Compare commits
6 Commits
marwan/tmp
...
rate-limit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c7ceb07f4 | ||
|
|
5b12473d70 | ||
|
|
db90b611f2 | ||
|
|
ac5039ab39 | ||
|
|
8a94ccc5a2 | ||
|
|
ca6675386b |
@@ -21,6 +21,7 @@ import (
|
||||
"tailscale.com/ipn/ipnserver"
|
||||
"tailscale.com/logpolicy"
|
||||
"tailscale.com/paths"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/wgengine"
|
||||
"tailscale.com/wgengine/magicsock"
|
||||
)
|
||||
@@ -51,6 +52,7 @@ func main() {
|
||||
socketpath := getopt.StringLong("socket", 's', paths.DefaultTailscaledSocket(), "Path of the service unix socket")
|
||||
|
||||
logf := wgengine.RusagePrefixLog(log.Printf)
|
||||
logf = logger.RateLimitedFn(logf, 1, 1, 100)
|
||||
|
||||
err := fixconsole.FixConsoleIfNeeded()
|
||||
if err != nil {
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"tailscale.com/control/controlclient"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/wgengine"
|
||||
"tailscale.com/wgengine/magicsock"
|
||||
"tailscale.com/wgengine/router"
|
||||
@@ -191,12 +192,10 @@ type testNode struct {
|
||||
// Create a new IPN node.
|
||||
func newNode(t *testing.T, prefix string, https *httptest.Server, weirdPrefs bool) testNode {
|
||||
t.Helper()
|
||||
logfe := func(fmt string, args ...interface{}) {
|
||||
t.Logf(prefix+".e: "+fmt, args...)
|
||||
}
|
||||
logf := func(fmt string, args ...interface{}) {
|
||||
t.Logf(prefix+": "+fmt, args...)
|
||||
}
|
||||
|
||||
logfe := logger.WithPrefix(t.Logf, prefix+"e: ")
|
||||
|
||||
logf := logger.WithPrefix(t.Logf, prefix+": ")
|
||||
|
||||
var err error
|
||||
httpc := https.Client()
|
||||
|
||||
@@ -8,12 +8,18 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Logf is the basic Tailscale logger type: a printf-like func.
|
||||
// Like log.Printf, the format need not end in a newline.
|
||||
// Logf functions should be safe for concurrent use.
|
||||
type Logf func(format string, args ...interface{})
|
||||
|
||||
// WithPrefix wraps f, prefixing each format with the provided prefix.
|
||||
@@ -42,3 +48,56 @@ func (w funcWriter) Write(p []byte) (int, error) {
|
||||
|
||||
// Discard is a Logf that throws away the logs given to it.
|
||||
func Discard(string, ...interface{}) {}
|
||||
|
||||
// limitData is used to keep track of each format string's associated
|
||||
// rate-limiting data.
|
||||
type limitData struct {
|
||||
lim *rate.Limiter // the token bucket associated with this string
|
||||
msgBlocked bool // whether a "duplicate error" message has already been logged
|
||||
ele *list.Element // list element used to access this string in the cache
|
||||
}
|
||||
|
||||
// RateLimitedFn implements rate limiting by fstring on a given Logf.
|
||||
// Messages are allowed through at a maximum of f messages/second, in
|
||||
// bursts of up to b messages at a time. Up to m strings will be held at a time.
|
||||
func RateLimitedFn(logf Logf, f float64, b int, m int) Logf {
|
||||
r := rate.Limit(f)
|
||||
msgLim := make(map[string]*limitData)
|
||||
msgCache := list.New() // a rudimentary LRU that limits the size of the map
|
||||
mu := &sync.Mutex{}
|
||||
|
||||
return func(format string, args ...interface{}) {
|
||||
mu.Lock()
|
||||
rl, ok := msgLim[format]
|
||||
if ok {
|
||||
msgCache.MoveToFront(rl.ele)
|
||||
if rl.lim.Allow() {
|
||||
mu.Lock()
|
||||
rl.msgBlocked = false
|
||||
mu.Unlock()
|
||||
logf(format, args...)
|
||||
} else {
|
||||
if !rl.msgBlocked {
|
||||
rl.msgBlocked = true
|
||||
mu.Unlock()
|
||||
logf("Repeated messages were suppressed by rate limiting. Original message: %s",
|
||||
fmt.Sprintf(format, args...))
|
||||
} else {
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
msgLim[format] = &limitData{rate.NewLimiter(r, b), false, msgCache.PushFront(format)}
|
||||
msgLim[format].lim.Allow()
|
||||
mu.Unlock()
|
||||
logf(format, args...)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
if msgCache.Len() > m {
|
||||
delete(msgLim, msgCache.Back().Value.(string))
|
||||
msgCache.Remove(msgCache.Back())
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"testing"
|
||||
)
|
||||
@@ -19,3 +20,43 @@ func TestStdLogger(t *testing.T) {
|
||||
lg := StdLogger(t.Logf)
|
||||
lg.Printf("plumbed through")
|
||||
}
|
||||
|
||||
func TestRateLimiter(t *testing.T) {
|
||||
|
||||
// Testing function. args[0] should indicate what should
|
||||
logTester := func(want []string) Logf {
|
||||
i := 0
|
||||
return func(format string, args ...interface{}) {
|
||||
got := fmt.Sprintf(format, args...)
|
||||
if i >= len(want) {
|
||||
t.Fatalf("Logging continued past end of expected input: %s", got)
|
||||
}
|
||||
if got != want[i] {
|
||||
t.Fatalf("wanted: %s \n got: %s", want[i], got)
|
||||
}
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
want := []string{
|
||||
"boring string with constant formatting (constant)",
|
||||
"templated format string no. 0",
|
||||
"Repeated messages were suppressed by rate limiting. Original message: boring string with constant formatting (constant)",
|
||||
"Repeated messages were suppressed by rate limiting. Original message: templated format string no. 1",
|
||||
"Make sure this string makes it through the rest (that are blocked) 4",
|
||||
"4 shouldn't get filtered.",
|
||||
}
|
||||
|
||||
lg := RateLimitedFn(logTester(want), 1, 1, 50)
|
||||
var prefixed Logf
|
||||
for i := 0; i < 10; i++ {
|
||||
lg("boring string with constant formatting %s", "(constant)")
|
||||
lg("templated format string no. %d", i)
|
||||
if i == 4 {
|
||||
lg("Make sure this string makes it through the rest (that are blocked) %d", i)
|
||||
prefixed = WithPrefix(lg, string('0'+i))
|
||||
prefixed(" shouldn't get filtered.")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user