Compare commits

...

1 Commits

Author SHA1 Message Date
Brad Fitzpatrick
a794630f60 wgengine/magicsock: add controlknob tunable for session timeout experiments
Updates #TODO

Change-Id: Ifb7ee2b69545cbc457aa2bf4c4744f431edb36e2
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2023-10-06 06:59:17 -07:00
5 changed files with 50 additions and 8 deletions

View File

@@ -7,7 +7,9 @@ package controlknobs
import (
"slices"
"strconv"
"sync/atomic"
"time"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
@@ -52,6 +54,10 @@ type Knobs struct {
// DisableDNSForwarderTCPRetries is whether the DNS forwarder should
// skip retrying truncated queries over TCP.
DisableDNSForwarderTCPRetries atomic.Bool
// MagicsockSessionActiveTimeout is an alternate magicsock session timeout
// duration to use. If zero or unset, the default is used.
MagicsockSessionActiveTimeout syncs.AtomicValue[time.Duration]
}
// UpdateFromNodeAttributes updates k (if non-nil) based on the provided self
@@ -91,6 +97,17 @@ func (k *Knobs) UpdateFromNodeAttributes(selfNodeAttrs []tailcfg.NodeCapability,
k.DisableDeltaUpdates.Store(disableDeltaUpdates)
k.PeerMTUEnable.Store(peerMTUEnable)
k.DisableDNSForwarderTCPRetries.Store(dnsForwarderDisableTCPRetries)
var timeout time.Duration
if vv := capMap[tailcfg.NodeAttrMagicsockSessionTimeout]; len(vv) > 0 {
if v, _ := strconv.Unquote(string(vv[0])); v != "" {
timeout, _ = time.ParseDuration(v)
timeout = max(timeout, 0)
}
}
if was := k.MagicsockSessionActiveTimeout.Load(); was != timeout {
k.MagicsockSessionActiveTimeout.Store(timeout)
}
}
// AsDebugJSON returns k as something that can be marshalled with json.Marshal
@@ -109,5 +126,6 @@ func (k *Knobs) AsDebugJSON() map[string]any {
"DisableDeltaUpdates": k.DisableDeltaUpdates.Load(),
"PeerMTUEnable": k.PeerMTUEnable.Load(),
"DisableDNSForwarderTCPRetries": k.DisableDNSForwarderTCPRetries.Load(),
"MagicsockSessionActiveTimeout": k.MagicsockSessionActiveTimeout.Load().String(),
}
}

View File

@@ -2123,6 +2123,11 @@ const (
// NodeAttrDNSForwarderDisableTCPRetries disables retrying truncated
// DNS queries over TCP if the response is truncated.
NodeAttrDNSForwarderDisableTCPRetries NodeCapability = "dns-forwarder-disable-tcp-retries"
// NodeAttrMagicsockSessionTimeout sets the magicsock session timeout.
// It must have an associated string value, formatted by time.Duration.String
// and parsable by time.ParseDuration. If invalid or unset, the default is used.
NodeAttrMagicsockSessionTimeout NodeCapability = "magicsock-session-timeout"
)
// SetDNSRequest is a request to add a DNS record.

View File

@@ -83,6 +83,13 @@ type endpoint struct {
isWireguardOnly bool // whether the endpoint is WireGuard only
}
// sessionActiveTimeout reports the session-active timeout that applies to ep.
// For a non-nil ep it defers to the owning Conn (ep.c); a nil ep falls back
// to sessionActiveTimeoutDefault.
func (ep *endpoint) sessionActiveTimeout() time.Duration {
	if ep != nil {
		return ep.c.sessionActiveTimeout()
	}
	return sessionActiveTimeoutDefault
}
// endpointDisco is the current disco key and short string for an endpoint. This
// structure is immutable.
type endpointDisco struct {
@@ -104,6 +111,8 @@ type sentPing struct {
// an endpoint. (The subject is the endpoint.endpointState
// map key)
type endpointState struct {
ep *endpoint
// all fields guarded by endpoint.mu
// lastPing is the last (outgoing) ping time.
@@ -169,7 +178,7 @@ func (st *endpointState) shouldDeleteLocked() bool {
return st.index == indexSentinelDeleted
default:
// This was an endpoint discovered at runtime.
return time.Since(st.lastGotPing) > sessionActiveTimeout
return time.Since(st.lastGotPing) > st.ep.sessionActiveTimeout()
}
}
@@ -411,7 +420,7 @@ func (de *endpoint) heartbeat() {
return
}
if mono.Since(de.lastSend) > sessionActiveTimeout {
if mono.Since(de.lastSend) > de.c.sessionActiveTimeout() {
// Session's idle. Stop heartbeating.
de.c.dlogf("[v1] magicsock: disco: ending heartbeats for idle session to %v (%v)", de.publicKey.ShortString(), de.discoShort())
return
@@ -876,7 +885,7 @@ func (de *endpoint) setEndpointsLocked(eps interface {
if st, ok := de.endpointState[ipp]; ok {
st.index = int16(i)
} else {
de.endpointState[ipp] = &endpointState{index: int16(i)}
de.endpointState[ipp] = &endpointState{ep: de, index: int16(i)}
newIpps = append(newIpps, ipp)
}
}
@@ -924,6 +933,7 @@ func (de *endpoint) addCandidateEndpoint(ep netip.AddrPort, forRxPingTxID stun.T
// Newly discovered endpoint. Exciting!
de.c.dlogf("[v1] magicsock: disco: adding %v as candidate endpoint for %v (%s)", ep, de.discoShort(), de.publicKey.ShortString())
de.endpointState[ep] = &endpointState{
ep: de,
lastGotPing: time.Now(),
lastGotPingTxID: forRxPingTxID,
}
@@ -1261,7 +1271,7 @@ func (de *endpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
now := mono.Now()
ps.LastWrite = de.lastSend.WallTime()
ps.Active = now.Sub(de.lastSend) < sessionActiveTimeout
ps.Active = now.Sub(de.lastSend) < de.c.sessionActiveTimeout()
if udpAddr, derpAddr, _ := de.addrForSendLocked(now); udpAddr.IsValid() && !derpAddr.IsValid() {
ps.CurAddr = udpAddr.String()

View File

@@ -2185,7 +2185,7 @@ func (c *Conn) shouldDoPeriodicReSTUNLocked() bool {
if debugReSTUNStopOnIdle() {
c.logf("magicsock: periodicReSTUN: idle for %v", idleFor.Round(time.Second))
}
if idleFor > sessionActiveTimeout {
if idleFor > c.sessionActiveTimeout() {
if c.controlKnobs != nil && c.controlKnobs.ForceBackgroundSTUN.Load() {
// Overridden by control.
return true
@@ -2657,11 +2657,11 @@ func (c *Conn) SetStatistics(stats *connstats.Statistics) {
}
const (
// sessionActiveTimeout is how long since the last activity we
// sessionActiveTimeoutDefault is how long since the last activity we
// try to keep an established endpoint peering alive.
// It's also the idle time at which we stop doing STUN queries to
// keep NAT mappings alive.
sessionActiveTimeout = 45 * time.Second
sessionActiveTimeoutDefault = 45 * time.Second
// upgradeInterval is how often we try to upgrade to a better path
// even if we have some non-DERP route that works.
@@ -2729,6 +2729,15 @@ func portableTrySetSocketBuffer(pconn nettype.PacketConn, logf logger.Logf) {
}
}
// sessionActiveTimeout returns the session-active timeout in effect for c.
// If c has control knobs and their MagicsockSessionActiveTimeout value is
// non-zero, that value wins; in every other case the result is
// sessionActiveTimeoutDefault.
func (c *Conn) sessionActiveTimeout() time.Duration {
	knobs := c.controlKnobs
	if knobs == nil {
		return sessionActiveTimeoutDefault
	}
	override := knobs.MagicsockSessionActiveTimeout.Load()
	if override == 0 {
		// Zero means "not set"; use the built-in default.
		return sessionActiveTimeoutDefault
	}
	return override
}
// derpStr returns a copy of s in which every occurrence of the DERP IP
// prefix "127.3.3.40:" has been rewritten to "derp-".
func derpStr(s string) string {
	const (
		derpIPPrefix = "127.3.3.40:"
		derpLabel    = "derp-"
	)
	return strings.ReplaceAll(s, derpIPPrefix, derpLabel)
}

View File

@@ -2851,7 +2851,7 @@ func TestAddrForSendLockedForWireGuardOnly(t *testing.T) {
}
for _, epd := range test.ep {
endpoint.endpointState[epd.addrPort] = &endpointState{}
endpoint.endpointState[epd.addrPort] = &endpointState{ep: endpoint}
}
udpAddr, _, shouldPing := endpoint.addrForSendLocked(testTime)
if udpAddr.IsValid() != test.validAddr {