Compare commits

...

3 Commits

Author SHA1 Message Date
Jonathan Nobels
b24b36fcc8 ipn, tstime : add opt in rate limiting for netmap updates on the IPN bus
updates tailscale/corp#24553

Adds opt-in rate limiting to limit netmap updates to, at most, one every
3 seconds when the client includes the NotifyRateLimitNetmaps option
in the ipn bus watcher opts.   This should mitigate issues with excessive
memory and CPU usage in clients on large, busy tailnets.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
2024-11-15 15:14:16 -05:00
Brad Fitzpatrick
8fd471ce57 control/controlclient: disable https on for http://localhost:$port URLs
Previously we required the program to be running in a test or have
TS_CONTROL_IS_PLAINTEXT_HTTP before we disabled its https fallback
on "http" schema control URLs to localhost with ports.

But nobody accidentally does all three of "http", explicit port
number, localhost and doesn't mean it. And when they mean it, they're
testing a localhost dev control server (like I was) and don't want 443
getting involved.

As of the changes for #13597, this became more annoying in that we
were trying to use a port which wasn't even available.

Updates #13597

Change-Id: Icd00bca56043d2da58ab31de7aa05a3b269c490f
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2024-11-14 12:12:16 -08:00
Brad Fitzpatrick
e73cfd9700 go.toolchain.rev: bump from Go 1.23.1 to Go 1.23.3
Updates #14100

Change-Id: I57f9d4260be15ce1daebe4a9782910aba3fb9dc9
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2024-11-14 10:57:49 -08:00
7 changed files with 324 additions and 17 deletions

View File

@@ -17,7 +17,6 @@ import (
"golang.org/x/net/http2"
"tailscale.com/control/controlhttp"
"tailscale.com/envknob"
"tailscale.com/health"
"tailscale.com/internal/noiseconn"
"tailscale.com/net/dnscache"
@@ -30,7 +29,6 @@ import (
"tailscale.com/util/mak"
"tailscale.com/util/multierr"
"tailscale.com/util/singleflight"
"tailscale.com/util/testenv"
)
// NoiseClient provides a http.Client to connect to tailcontrol over
@@ -107,11 +105,6 @@ type NoiseOpts struct {
DialPlan func() *tailcfg.ControlDialPlan
}
// controlIsPlaintext is whether we should assume that the controlplane is only accessible
// over plaintext HTTP (as the first hop, before the ts2021 encryption begins).
// This is used by some tests which don't have a real TLS certificate.
var controlIsPlaintext = envknob.RegisterBool("TS_CONTROL_IS_PLAINTEXT_HTTP")
// NewNoiseClient returns a new noiseClient for the provided server and machine key.
// serverURL is of the form https://<host>:<port> (no trailing slash).
//
@@ -129,7 +122,7 @@ func NewNoiseClient(opts NoiseOpts) (*NoiseClient, error) {
if u.Scheme == "http" {
httpPort = port
httpsPort = "443"
if (testenv.InTest() || controlIsPlaintext()) && (u.Hostname() == "127.0.0.1" || u.Hostname() == "localhost") {
if u.Hostname() == "127.0.0.1" || u.Hostname() == "localhost" {
httpsPort = ""
}
} else {

View File

@@ -1 +1 @@
bf15628b759344c6fc7763795a405ba65b8be5d7
96578f73d04e1a231fa2a495ad3fa97747785bc6

View File

@@ -73,6 +73,15 @@ const (
NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
NotifyRateLimitNetmaps // if set, rate limit netmap updates to once every DefaultNetmapRateLimit seconds
)
const (
// This is the minimum time between netmap updates when NotifyRateLimitNetmaps is included in the Notify opts.
// 3 seconds should be sufficient to avoid flooding the UI with netmap updates on large/chatty tailnets without
// causing noticable issues with the UI being out of date.
DefaultNetmapRateLimit = time.Duration(3 * time.Second)
)
// Notify is a communication from a backend (e.g. tailscaled) to a frontend

View File

@@ -82,6 +82,7 @@ import (
"tailscale.com/tka"
"tailscale.com/tsd"
"tailscale.com/tstime"
"tailscale.com/tstime/rate"
"tailscale.com/types/appctype"
"tailscale.com/types/dnstype"
"tailscale.com/types/empty"
@@ -370,6 +371,16 @@ type LocalBackend struct {
// backend is healthy and captive portal detection is not required
// (sending false).
needsCaptiveDetection chan bool
// netmapRateLimiter rate limits netmap updates to to the IPN bus.
// It should be nil if the ipn bus options do not include the rate limiting flag.
// It is automatically created via setNetmapRateLimit.
netmapRateLimiter *rate.Limiter
// deferredNetmapCancel is used to cancel deferred netmap updates which
// were initially blocked due to rate limiting. We always attempt to send the latest
// netmap once the rate limiter allows it, discarding any pending netmaps.
deferredNetmapCancel context.CancelFunc
}
// HealthTracker returns the health tracker for the backend.
@@ -475,6 +486,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCtx: captiveCtx,
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool),
deferredNetmapCancel: nil,
}
mConn.SetNetInfoCallback(b.setNetInfo)
@@ -965,6 +977,11 @@ func (b *LocalBackend) Shutdown() {
if b.notifyCancel != nil {
b.notifyCancel()
}
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
b.mu.Unlock()
b.webClientShutdown()
@@ -1591,8 +1608,7 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
// Update the DERP map in the health package, which uses it for health notifications
b.health.SetDERPMap(st.NetMap.DERPMap)
b.send(ipn.Notify{NetMap: st.NetMap})
b.sendNetmap(st.NetMap)
}
if st.URL != "" {
b.logf("Received auth URL: %.20v...", st.URL)
@@ -1677,20 +1693,91 @@ func applySysPolicy(prefs *ipn.Prefs) (anyChange bool) {
return anyChange
}
// setNetmapRateLimit Sets the minimum interval between netmap updates on the IPN Bus (in seconds)
// If interval is 0 or negative, the rate limiter is disabled. Netmap rate limiting is
// disabled by default
// b.mu must be held
func (b *LocalBackend) setNetmapRateLimit(interval time.Duration) {
if interval > 0 {
b.netmapRateLimiter = rate.NewLimiter(rate.Every(interval), 1)
} else {
b.netmapRateLimiter = nil
}
}
// sendNetmap sends a netmap update to the IPN bus respecting the rate limiter. This function
// should be used for all netmap updates on the IPN bus unless there is some critical reason that
// a netmap update be sent immediately.
//
// A non-nil channel will be returned if the netmap update was deferred due to rate limiting. The channel will be closed
// when the netmap update is handled. true or false will be sent to the channel to indicate whether
// or not the netmap was sent or cancelled respectively. A nil return value indicates that the netmap
// was sent immediately. The returned value is primarily useful for testing and you can safely ignore
// it and just call this method at will.
func (b *LocalBackend) sendNetmap(nm *netmap.NetworkMap) chan bool {
notify := ipn.Notify{NetMap: nm}
b.mu.Lock()
// Cancel all pending netmap updates, they're stale and we have something newer
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
// No rate limiter? Send it.
// Rate limiter allows the send? Send it.
if b.netmapRateLimiter == nil || b.netmapRateLimiter.Allow() {
b.mu.Unlock()
b.send(notify)
return nil
}
// We're rate limited. Defer the netmap update
var ctx context.Context
ctx, cancel := context.WithCancel(b.ctx)
b.deferredNetmapCancel = cancel
// The rate limiter is set to Limit() events per second. Convert that back to
// the time interval we need to wait
delay := b.netmapRateLimiter.Delay()
b.mu.Unlock()
c := make(chan bool)
// Send the netmap update once the rate limiter allows it
go func() {
select {
case <-time.After(delay):
b.send(notify)
c <- true
case <-ctx.Done():
c <- false
}
close(c)
}()
return c
}
var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil)
// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
if !b.MagicConn().UpdateNetmapDelta(muts) {
return false
}
var notify *ipn.Notify // non-nil if we need to send a Notify
// Will be sent if non nil
var netmap *netmap.NetworkMap
// Will send an empty notify if true and netmap is nil (for tests - see below)
var sendEmpty = false
defer func() {
if notify != nil {
if netmap != nil {
b.sendNetmap(netmap)
} else if sendEmpty {
notify := new(ipn.Notify)
b.send(*notify)
}
}()
unlock := b.lockAndGetUnlock()
defer unlock()
if !b.updateNetmapDeltaLocked(muts) {
@@ -1712,13 +1799,14 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
slices.SortFunc(nm.Peers, func(a, b tailcfg.NodeView) int {
return cmp.Compare(a.ID(), b.ID())
})
notify = &ipn.Notify{NetMap: nm}
netmap = nm
} else if testenv.InTest() {
// In tests, send an empty Notify as a wake-up so end-to-end
// integration tests in another repo can check on the status of
// LocalBackend after processing deltas.
notify = new(ipn.Notify)
sendEmpty = true
}
return true
}
@@ -2749,6 +2837,12 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
cancel: cancel,
}
mak.Set(&b.notifyWatchers, sessionID, session)
if mask&ipn.NotifyRateLimitNetmaps != 0 {
b.setNetmapRateLimit(ipn.DefaultNetmapRateLimit)
} else {
b.setNetmapRateLimit(0)
}
b.mu.Unlock()
defer func() {
@@ -4989,6 +5083,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
if authURL == "" {
systemd.Status("Stopped; run 'tailscale up' to log in")
}
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
case ipn.Starting, ipn.NeedsMachineAuth:
b.authReconfig()
// Needed so that UpdateEndpoints can run
@@ -5001,7 +5098,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
}
systemd.Status("Connected; %s; %s", activeLogin, strings.Join(addrStrs, " "))
case ipn.NoState:
// Do nothing.
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
default:
b.logf("[unexpected] unknown newState %#v", newState)
}

View File

@@ -572,6 +572,164 @@ func TestSetUseExitNodeEnabled(t *testing.T) {
}
}
func TestNetmapRateLimiting(t *testing.T) {
b := new(LocalBackend)
var cancel context.CancelFunc
b.ctx, cancel = context.WithCancel(context.Background())
b.logf = t.Logf
b.setNetmapRateLimit(time.Duration(100 * time.Millisecond))
if b.netmapRateLimiter == nil {
t.Fatalf("no netmapRateLimiter")
}
now := time.Now()
b.netMap = new(netmap.NetworkMap)
if g := b.sendNetmap(b.netMap); g != nil {
t.Errorf("First should be immediately sent immediately")
}
// We just sent a netmap, so these should all be rate limited. c1 should get cancelled.
// c2 should be cancelled. c3 should be sent after 100ms.
c1 := b.sendNetmap(b.netMap)
c2 := b.sendNetmap(b.netMap)
// Let's spam a bunch more we won't track, just for fun
for i := 0; i < 10; i++ {
if g := b.sendNetmap(b.netMap); g == nil {
t.Errorf("should have been deferred")
}
}
// This is our last netmap send. It should be deferred and sent after 100ms.
c3 := b.sendNetmap(b.netMap)
// The first onnetmape should be cancelled
select {
case sent := <-c1:
if sent {
t.Errorf("Second netmap update was not cacncelled; sent got %v, want %v", sent, false)
}
}
// The second netmap should be cancelled
select {
case sent := <-c2:
if sent {
t.Errorf("Second netmap update was not cacncelled; got %v, sent want %v", sent, false)
}
}
// The last netmap should be sent after about 100ms
select {
case sent := <-c3:
if !sent {
t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true)
}
}
elapsed := time.Since(now)
if elapsed < 90*time.Millisecond {
t.Errorf("elapsed time %v is too short", elapsed)
}
if elapsed > 110*time.Millisecond {
t.Errorf("elapsed time %v is too long", elapsed)
}
// The rate limiter should be reset at this point and the next netmap should be sent immediately.
if g := b.sendNetmap(b.netMap); g != nil {
t.Errorf("netmap should be immediately sent immediately")
}
// We're rate limited - becuase we just sent a netmap.
// Lower the rate limit and make sure we can send again once the rate limit is up.
b.setNetmapRateLimit(time.Duration(10 * time.Millisecond))
time.Sleep(12 * time.Millisecond)
if g := b.sendNetmap(b.netMap); g != nil {
t.Errorf("netmap should be immediately sent immediately")
}
// Check to make sure the cancellation function is properly set and does what it's
// supposed to do.
c4 := b.sendNetmap(b.netMap)
b.deferredNetmapCancel()
select {
case sent := <-c4:
if sent {
t.Errorf("Fourth netmap should have been cancelled; sent got %v, want %v", sent, true)
}
}
cancel()
}
func TestNetmapDeferral(t *testing.T) {
b := new(LocalBackend)
var cancel context.CancelFunc
b.ctx, cancel = context.WithCancel(context.Background())
b.logf = t.Logf
w := 40 * time.Millisecond
// Ensure that a deferred netmap gets sent with the correct delay
b.setNetmapRateLimit(time.Duration(w))
start := time.Now()
b.sendNetmap(b.netMap)
// Snooze for 20ms
time.Sleep(20 * time.Millisecond)
// This one should be deferred and sent after ~40-20ms
c := b.sendNetmap(b.netMap)
select {
case sent := <-c:
if !sent {
t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true)
}
}
slop := 5 * time.Millisecond
g := time.Since(start) * time.Millisecond
// The difference between the elapsed time and the expected time should be within the slop
// and our total time should always be slightly greater than the expected time.
if w-g > slop || w > g {
t.Errorf("elapsed time is too incorrect w:%v g:%v", w, g)
}
cancel()
}
func TestNetmapNoRateLimiting(t *testing.T) {
b := new(LocalBackend)
var cancel context.CancelFunc
b.ctx, cancel = context.WithCancel(context.Background())
b.logf = t.Logf
// A zero rate limit means send-at-will
b.setNetmapRateLimit(0)
b.netMap = new(netmap.NetworkMap)
for i := 0; i < 10; i++ {
if g := b.sendNetmap(b.netMap); g != nil {
t.Errorf("should be immediately sent immediately")
}
}
// A negative rate limit also means send-at-will
b.setNetmapRateLimit(-1)
for i := 0; i < 10; i++ {
if g := b.sendNetmap(b.netMap); g != nil {
t.Errorf("should be immediately sent immediately")
}
}
cancel()
}
func TestFileTargets(t *testing.T) {
b := new(LocalBackend)
_, err := b.FileTargets()

View File

@@ -56,6 +56,29 @@ func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{limit: r, burst: float64(b)}
}
// Limit returns the maximum overall event rate.
func (lim *Limiter) Limit() Limit {
return lim.limit
}
// Delay returns the approximate minimum duration before sufficient tokens
// will be available to permit another event.
func (lim *Limiter) Delay() time.Duration {
lim.mu.Lock()
defer lim.mu.Unlock()
// Calculate the new number of tokens available due to the passage of time.
elapsed := mono.Now().Sub(lim.last)
tokens := lim.tokens + float64(lim.limit)*elapsed.Seconds()
if tokens > lim.burst {
tokens = lim.burst
}
// Calculate the time until the next token is available.
wait := time.Duration((1-tokens)/float64(lim.limit)*1e9) * time.Nanosecond
return wait
}
// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {
return lim.allow(mono.Now())

View File

@@ -145,6 +145,31 @@ func TestSimultaneousRequests(t *testing.T) {
}
}
func TestDelay(t *testing.T) {
lim := NewLimiter(Every(1*time.Second), 1)
// We'll allow for 10 ms of slop to avoid flakiness.
// w should always be just slightly greater than d
slop := int64(10)
lim.Allow()
d := lim.Delay().Milliseconds()
w := int64(1000)
if w-d > slop || d > w {
t.Errorf("Delay() = %v want 1000", d)
}
time.Sleep(50 * time.Millisecond)
w = 950
d = lim.Delay().Milliseconds()
// ~50 milliseconds will have passed,
if w-d > slop || d > w {
t.Errorf("Delay() = %v want 950", d)
}
}
func BenchmarkAllowN(b *testing.B) {
lim := NewLimiter(Every(1*time.Second), 1)
now := mono.Now()