Compare commits
3 Commits
andrew/dns
...
jonathan/n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b24b36fcc8 | ||
|
|
8fd471ce57 | ||
|
|
e73cfd9700 |
@@ -17,7 +17,6 @@ import (
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"tailscale.com/control/controlhttp"
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/internal/noiseconn"
|
||||
"tailscale.com/net/dnscache"
|
||||
@@ -30,7 +29,6 @@ import (
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/multierr"
|
||||
"tailscale.com/util/singleflight"
|
||||
"tailscale.com/util/testenv"
|
||||
)
|
||||
|
||||
// NoiseClient provides a http.Client to connect to tailcontrol over
|
||||
@@ -107,11 +105,6 @@ type NoiseOpts struct {
|
||||
DialPlan func() *tailcfg.ControlDialPlan
|
||||
}
|
||||
|
||||
// controlIsPlaintext is whether we should assume that the controlplane is only accessible
|
||||
// over plaintext HTTP (as the first hop, before the ts2021 encryption begins).
|
||||
// This is used by some tests which don't have a real TLS certificate.
|
||||
var controlIsPlaintext = envknob.RegisterBool("TS_CONTROL_IS_PLAINTEXT_HTTP")
|
||||
|
||||
// NewNoiseClient returns a new noiseClient for the provided server and machine key.
|
||||
// serverURL is of the form https://<host>:<port> (no trailing slash).
|
||||
//
|
||||
@@ -129,7 +122,7 @@ func NewNoiseClient(opts NoiseOpts) (*NoiseClient, error) {
|
||||
if u.Scheme == "http" {
|
||||
httpPort = port
|
||||
httpsPort = "443"
|
||||
if (testenv.InTest() || controlIsPlaintext()) && (u.Hostname() == "127.0.0.1" || u.Hostname() == "localhost") {
|
||||
if u.Hostname() == "127.0.0.1" || u.Hostname() == "localhost" {
|
||||
httpsPort = ""
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -1 +1 @@
|
||||
bf15628b759344c6fc7763795a405ba65b8be5d7
|
||||
96578f73d04e1a231fa2a495ad3fa97747785bc6
|
||||
|
||||
@@ -73,6 +73,15 @@ const (
|
||||
NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
|
||||
|
||||
NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
|
||||
|
||||
NotifyRateLimitNetmaps // if set, rate limit netmap updates to once every DefaultNetmapRateLimit seconds
|
||||
)
|
||||
|
||||
const (
|
||||
// This is the minimum time between netmap updates when NotifyRateLimitNetmaps is included in the Notify opts.
|
||||
// 3 seconds should be sufficient to avoid flooding the UI with netmap updates on large/chatty tailnets without
|
||||
// causing noticable issues with the UI being out of date.
|
||||
DefaultNetmapRateLimit = time.Duration(3 * time.Second)
|
||||
)
|
||||
|
||||
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
|
||||
|
||||
@@ -82,6 +82,7 @@ import (
|
||||
"tailscale.com/tka"
|
||||
"tailscale.com/tsd"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/tstime/rate"
|
||||
"tailscale.com/types/appctype"
|
||||
"tailscale.com/types/dnstype"
|
||||
"tailscale.com/types/empty"
|
||||
@@ -370,6 +371,16 @@ type LocalBackend struct {
|
||||
// backend is healthy and captive portal detection is not required
|
||||
// (sending false).
|
||||
needsCaptiveDetection chan bool
|
||||
|
||||
// netmapRateLimiter rate limits netmap updates to to the IPN bus.
|
||||
// It should be nil if the ipn bus options do not include the rate limiting flag.
|
||||
// It is automatically created via setNetmapRateLimit.
|
||||
netmapRateLimiter *rate.Limiter
|
||||
|
||||
// deferredNetmapCancel is used to cancel deferred netmap updates which
|
||||
// were initially blocked due to rate limiting. We always attempt to send the latest
|
||||
// netmap once the rate limiter allows it, discarding any pending netmaps.
|
||||
deferredNetmapCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// HealthTracker returns the health tracker for the backend.
|
||||
@@ -475,6 +486,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
||||
captiveCtx: captiveCtx,
|
||||
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
|
||||
needsCaptiveDetection: make(chan bool),
|
||||
deferredNetmapCancel: nil,
|
||||
}
|
||||
mConn.SetNetInfoCallback(b.setNetInfo)
|
||||
|
||||
@@ -965,6 +977,11 @@ func (b *LocalBackend) Shutdown() {
|
||||
if b.notifyCancel != nil {
|
||||
b.notifyCancel()
|
||||
}
|
||||
|
||||
if b.deferredNetmapCancel != nil {
|
||||
b.deferredNetmapCancel()
|
||||
}
|
||||
|
||||
b.mu.Unlock()
|
||||
b.webClientShutdown()
|
||||
|
||||
@@ -1591,8 +1608,7 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
|
||||
|
||||
// Update the DERP map in the health package, which uses it for health notifications
|
||||
b.health.SetDERPMap(st.NetMap.DERPMap)
|
||||
|
||||
b.send(ipn.Notify{NetMap: st.NetMap})
|
||||
b.sendNetmap(st.NetMap)
|
||||
}
|
||||
if st.URL != "" {
|
||||
b.logf("Received auth URL: %.20v...", st.URL)
|
||||
@@ -1677,20 +1693,91 @@ func applySysPolicy(prefs *ipn.Prefs) (anyChange bool) {
|
||||
return anyChange
|
||||
}
|
||||
|
||||
// setNetmapRateLimit Sets the minimum interval between netmap updates on the IPN Bus (in seconds)
|
||||
// If interval is 0 or negative, the rate limiter is disabled. Netmap rate limiting is
|
||||
// disabled by default
|
||||
// b.mu must be held
|
||||
func (b *LocalBackend) setNetmapRateLimit(interval time.Duration) {
|
||||
if interval > 0 {
|
||||
b.netmapRateLimiter = rate.NewLimiter(rate.Every(interval), 1)
|
||||
} else {
|
||||
b.netmapRateLimiter = nil
|
||||
}
|
||||
}
|
||||
|
||||
// sendNetmap sends a netmap update to the IPN bus respecting the rate limiter. This function
|
||||
// should be used for all netmap updates on the IPN bus unless there is some critical reason that
|
||||
// a netmap update be sent immediately.
|
||||
//
|
||||
// A non-nil channel will be returned if the netmap update was deferred due to rate limiting. The channel will be closed
|
||||
// when the netmap update is handled. true or false will be sent to the channel to indicate whether
|
||||
// or not the netmap was sent or cancelled respectively. A nil return value indicates that the netmap
|
||||
// was sent immediately. The returned value is primarily useful for testing and you can safely ignore
|
||||
// it and just call this method at will.
|
||||
func (b *LocalBackend) sendNetmap(nm *netmap.NetworkMap) chan bool {
|
||||
notify := ipn.Notify{NetMap: nm}
|
||||
|
||||
b.mu.Lock()
|
||||
|
||||
// Cancel all pending netmap updates, they're stale and we have something newer
|
||||
if b.deferredNetmapCancel != nil {
|
||||
b.deferredNetmapCancel()
|
||||
}
|
||||
|
||||
// No rate limiter? Send it.
|
||||
// Rate limiter allows the send? Send it.
|
||||
if b.netmapRateLimiter == nil || b.netmapRateLimiter.Allow() {
|
||||
b.mu.Unlock()
|
||||
b.send(notify)
|
||||
return nil
|
||||
}
|
||||
|
||||
// We're rate limited. Defer the netmap update
|
||||
var ctx context.Context
|
||||
ctx, cancel := context.WithCancel(b.ctx)
|
||||
b.deferredNetmapCancel = cancel
|
||||
// The rate limiter is set to Limit() events per second. Convert that back to
|
||||
// the time interval we need to wait
|
||||
delay := b.netmapRateLimiter.Delay()
|
||||
b.mu.Unlock()
|
||||
|
||||
c := make(chan bool)
|
||||
|
||||
// Send the netmap update once the rate limiter allows it
|
||||
go func() {
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
b.send(notify)
|
||||
c <- true
|
||||
case <-ctx.Done():
|
||||
c <- false
|
||||
}
|
||||
close(c)
|
||||
}()
|
||||
return c
|
||||
}
|
||||
|
||||
var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil)
|
||||
|
||||
// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
|
||||
func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
|
||||
if !b.MagicConn().UpdateNetmapDelta(muts) {
|
||||
return false
|
||||
}
|
||||
|
||||
var notify *ipn.Notify // non-nil if we need to send a Notify
|
||||
// Will be sent if non nil
|
||||
var netmap *netmap.NetworkMap
|
||||
// Will send an empty notify if true and netmap is nil (for tests - see below)
|
||||
var sendEmpty = false
|
||||
|
||||
defer func() {
|
||||
if notify != nil {
|
||||
if netmap != nil {
|
||||
b.sendNetmap(netmap)
|
||||
} else if sendEmpty {
|
||||
notify := new(ipn.Notify)
|
||||
b.send(*notify)
|
||||
}
|
||||
}()
|
||||
|
||||
unlock := b.lockAndGetUnlock()
|
||||
defer unlock()
|
||||
if !b.updateNetmapDeltaLocked(muts) {
|
||||
@@ -1712,13 +1799,14 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
|
||||
slices.SortFunc(nm.Peers, func(a, b tailcfg.NodeView) int {
|
||||
return cmp.Compare(a.ID(), b.ID())
|
||||
})
|
||||
notify = &ipn.Notify{NetMap: nm}
|
||||
netmap = nm
|
||||
} else if testenv.InTest() {
|
||||
// In tests, send an empty Notify as a wake-up so end-to-end
|
||||
// integration tests in another repo can check on the status of
|
||||
// LocalBackend after processing deltas.
|
||||
notify = new(ipn.Notify)
|
||||
sendEmpty = true
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -2749,6 +2837,12 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
|
||||
cancel: cancel,
|
||||
}
|
||||
mak.Set(&b.notifyWatchers, sessionID, session)
|
||||
if mask&ipn.NotifyRateLimitNetmaps != 0 {
|
||||
b.setNetmapRateLimit(ipn.DefaultNetmapRateLimit)
|
||||
} else {
|
||||
b.setNetmapRateLimit(0)
|
||||
}
|
||||
|
||||
b.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
@@ -4989,6 +5083,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
|
||||
if authURL == "" {
|
||||
systemd.Status("Stopped; run 'tailscale up' to log in")
|
||||
}
|
||||
if b.deferredNetmapCancel != nil {
|
||||
b.deferredNetmapCancel()
|
||||
}
|
||||
case ipn.Starting, ipn.NeedsMachineAuth:
|
||||
b.authReconfig()
|
||||
// Needed so that UpdateEndpoints can run
|
||||
@@ -5001,7 +5098,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
|
||||
}
|
||||
systemd.Status("Connected; %s; %s", activeLogin, strings.Join(addrStrs, " "))
|
||||
case ipn.NoState:
|
||||
// Do nothing.
|
||||
if b.deferredNetmapCancel != nil {
|
||||
b.deferredNetmapCancel()
|
||||
}
|
||||
default:
|
||||
b.logf("[unexpected] unknown newState %#v", newState)
|
||||
}
|
||||
|
||||
@@ -572,6 +572,164 @@ func TestSetUseExitNodeEnabled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNetmapRateLimiting(t *testing.T) {
|
||||
b := new(LocalBackend)
|
||||
var cancel context.CancelFunc
|
||||
b.ctx, cancel = context.WithCancel(context.Background())
|
||||
b.logf = t.Logf
|
||||
b.setNetmapRateLimit(time.Duration(100 * time.Millisecond))
|
||||
|
||||
if b.netmapRateLimiter == nil {
|
||||
t.Fatalf("no netmapRateLimiter")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
b.netMap = new(netmap.NetworkMap)
|
||||
if g := b.sendNetmap(b.netMap); g != nil {
|
||||
t.Errorf("First should be immediately sent immediately")
|
||||
}
|
||||
|
||||
// We just sent a netmap, so these should all be rate limited. c1 should get cancelled.
|
||||
// c2 should be cancelled. c3 should be sent after 100ms.
|
||||
c1 := b.sendNetmap(b.netMap)
|
||||
c2 := b.sendNetmap(b.netMap)
|
||||
|
||||
// Let's spam a bunch more we won't track, just for fun
|
||||
for i := 0; i < 10; i++ {
|
||||
if g := b.sendNetmap(b.netMap); g == nil {
|
||||
t.Errorf("should have been deferred")
|
||||
}
|
||||
}
|
||||
|
||||
// This is our last netmap send. It should be deferred and sent after 100ms.
|
||||
c3 := b.sendNetmap(b.netMap)
|
||||
|
||||
// The first onnetmape should be cancelled
|
||||
select {
|
||||
case sent := <-c1:
|
||||
if sent {
|
||||
t.Errorf("Second netmap update was not cacncelled; sent got %v, want %v", sent, false)
|
||||
}
|
||||
}
|
||||
|
||||
// The second netmap should be cancelled
|
||||
select {
|
||||
case sent := <-c2:
|
||||
if sent {
|
||||
t.Errorf("Second netmap update was not cacncelled; got %v, sent want %v", sent, false)
|
||||
}
|
||||
}
|
||||
|
||||
// The last netmap should be sent after about 100ms
|
||||
select {
|
||||
case sent := <-c3:
|
||||
if !sent {
|
||||
t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true)
|
||||
}
|
||||
}
|
||||
|
||||
elapsed := time.Since(now)
|
||||
if elapsed < 90*time.Millisecond {
|
||||
t.Errorf("elapsed time %v is too short", elapsed)
|
||||
}
|
||||
if elapsed > 110*time.Millisecond {
|
||||
t.Errorf("elapsed time %v is too long", elapsed)
|
||||
}
|
||||
|
||||
// The rate limiter should be reset at this point and the next netmap should be sent immediately.
|
||||
if g := b.sendNetmap(b.netMap); g != nil {
|
||||
t.Errorf("netmap should be immediately sent immediately")
|
||||
}
|
||||
|
||||
// We're rate limited - becuase we just sent a netmap.
|
||||
// Lower the rate limit and make sure we can send again once the rate limit is up.
|
||||
b.setNetmapRateLimit(time.Duration(10 * time.Millisecond))
|
||||
time.Sleep(12 * time.Millisecond)
|
||||
if g := b.sendNetmap(b.netMap); g != nil {
|
||||
t.Errorf("netmap should be immediately sent immediately")
|
||||
}
|
||||
|
||||
// Check to make sure the cancellation function is properly set and does what it's
|
||||
// supposed to do.
|
||||
c4 := b.sendNetmap(b.netMap)
|
||||
b.deferredNetmapCancel()
|
||||
select {
|
||||
case sent := <-c4:
|
||||
if sent {
|
||||
t.Errorf("Fourth netmap should have been cancelled; sent got %v, want %v", sent, true)
|
||||
}
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestNetmapDeferral(t *testing.T) {
|
||||
b := new(LocalBackend)
|
||||
var cancel context.CancelFunc
|
||||
b.ctx, cancel = context.WithCancel(context.Background())
|
||||
b.logf = t.Logf
|
||||
|
||||
w := 40 * time.Millisecond
|
||||
|
||||
// Ensure that a deferred netmap gets sent with the correct delay
|
||||
b.setNetmapRateLimit(time.Duration(w))
|
||||
start := time.Now()
|
||||
b.sendNetmap(b.netMap)
|
||||
|
||||
// Snooze for 20ms
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// This one should be deferred and sent after ~40-20ms
|
||||
c := b.sendNetmap(b.netMap)
|
||||
select {
|
||||
case sent := <-c:
|
||||
if !sent {
|
||||
t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true)
|
||||
}
|
||||
}
|
||||
|
||||
slop := 5 * time.Millisecond
|
||||
g := time.Since(start) * time.Millisecond
|
||||
|
||||
// The difference between the elapsed time and the expected time should be within the slop
|
||||
// and our total time should always be slightly greater than the expected time.
|
||||
if w-g > slop || w > g {
|
||||
t.Errorf("elapsed time is too incorrect w:%v g:%v", w, g)
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestNetmapNoRateLimiting(t *testing.T) {
|
||||
b := new(LocalBackend)
|
||||
var cancel context.CancelFunc
|
||||
b.ctx, cancel = context.WithCancel(context.Background())
|
||||
b.logf = t.Logf
|
||||
|
||||
// A zero rate limit means send-at-will
|
||||
b.setNetmapRateLimit(0)
|
||||
|
||||
b.netMap = new(netmap.NetworkMap)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if g := b.sendNetmap(b.netMap); g != nil {
|
||||
t.Errorf("should be immediately sent immediately")
|
||||
}
|
||||
}
|
||||
|
||||
// A negative rate limit also means send-at-will
|
||||
b.setNetmapRateLimit(-1)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if g := b.sendNetmap(b.netMap); g != nil {
|
||||
t.Errorf("should be immediately sent immediately")
|
||||
}
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestFileTargets(t *testing.T) {
|
||||
b := new(LocalBackend)
|
||||
_, err := b.FileTargets()
|
||||
|
||||
@@ -56,6 +56,29 @@ func NewLimiter(r Limit, b int) *Limiter {
|
||||
return &Limiter{limit: r, burst: float64(b)}
|
||||
}
|
||||
|
||||
// Limit returns the maximum overall event rate.
|
||||
func (lim *Limiter) Limit() Limit {
|
||||
return lim.limit
|
||||
}
|
||||
|
||||
// Delay returns the approximate minimum duration before sufficient tokens
|
||||
// will be available to permit another event.
|
||||
func (lim *Limiter) Delay() time.Duration {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
|
||||
// Calculate the new number of tokens available due to the passage of time.
|
||||
elapsed := mono.Now().Sub(lim.last)
|
||||
tokens := lim.tokens + float64(lim.limit)*elapsed.Seconds()
|
||||
if tokens > lim.burst {
|
||||
tokens = lim.burst
|
||||
}
|
||||
|
||||
// Calculate the time until the next token is available.
|
||||
wait := time.Duration((1-tokens)/float64(lim.limit)*1e9) * time.Nanosecond
|
||||
return wait
|
||||
}
|
||||
|
||||
// Allow reports whether an event may happen now.
|
||||
func (lim *Limiter) Allow() bool {
|
||||
return lim.allow(mono.Now())
|
||||
|
||||
@@ -145,6 +145,31 @@ func TestSimultaneousRequests(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelay(t *testing.T) {
|
||||
lim := NewLimiter(Every(1*time.Second), 1)
|
||||
// We'll allow for 10 ms of slop to avoid flakiness.
|
||||
// w should always be just slightly greater than d
|
||||
slop := int64(10)
|
||||
|
||||
lim.Allow()
|
||||
|
||||
d := lim.Delay().Milliseconds()
|
||||
w := int64(1000)
|
||||
|
||||
if w-d > slop || d > w {
|
||||
t.Errorf("Delay() = %v want 1000", d)
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
w = 950
|
||||
d = lim.Delay().Milliseconds()
|
||||
|
||||
// ~50 milliseconds will have passed,
|
||||
if w-d > slop || d > w {
|
||||
t.Errorf("Delay() = %v want 950", d)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAllowN(b *testing.B) {
|
||||
lim := NewLimiter(Every(1*time.Second), 1)
|
||||
now := mono.Now()
|
||||
|
||||
Reference in New Issue
Block a user