Compare commits

1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | ef68b4c004 |  |

derp/derp_server.go
@@ -48,6 +48,7 @@ import (
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 	"tailscale.com/util/ctxkey"
+	"tailscale.com/util/lru"
 	"tailscale.com/util/mak"
 	"tailscale.com/util/set"
 	"tailscale.com/util/slicesx"
@@ -178,11 +179,14 @@ type Server struct {
 	verifyClientsURL         string
 	verifyClientsURLFailOpen bool

-	mu       sync.Mutex
-	closed   bool
-	netConns map[Conn]chan struct{} // chan is closed when conn closes
-	clients  map[key.NodePublic]*clientSet
-	watchers set.Set[*sclient] // mesh peers
+	mu             sync.Mutex
+	closed         bool
+	flow           map[flowKey]*flow
+	flows          []*flow // slice of values of flow map
+	flowCleanIndex int
+	netConns       map[Conn]chan struct{} // chan is closed when conn closes
+	clients        map[key.NodePublic]*clientSet
+	watchers       set.Set[*sclient] // mesh peers
 	// clientsMesh tracks all clients in the cluster, both locally
 	// and to mesh peers. If the value is nil, that means the
 	// peer is only local (and thus in the clients Map, but not
@@ -368,6 +372,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 		packetsDroppedType: metrics.LabelMap{Label: "type"},
 		clients:            map[key.NodePublic]*clientSet{},
 		clientsMesh:        map[key.NodePublic]PacketForwarder{},
+		flow:               map[flowKey]*flow{},
 		netConns:           map[Conn]chan struct{}{},
 		memSys0:            ms.Sys,
 		watchers:           set.Set[*sclient]{},
@@ -901,9 +906,20 @@ func (s *Server) debugLogf(format string, v ...any) {
 	}
 }

+// onRunLoopDone is called when the run loop is done
+// to clean up.
+//
+// It must only be called from the [sclient.run] goroutine.
+func (c *sclient) onRunLoopDone() {
+	c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) {
+		peer.f.ref.Add(-1)
+	})
+}
+
 // run serves the client until there's an error.
 // If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
 func (c *sclient) run(ctx context.Context) error {
+	defer c.onRunLoopDone()
 	// Launch sender, but don't return from run until sender goroutine is done.
 	var grp errgroup.Group
 	sendCtx, cancelSender := context.WithCancel(ctx)
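The comment in run above describes a join pattern: the sender goroutine is launched via errgroup, and run cancels it and waits for it before returning. A minimal standalone sketch of the same shape, using golang.org/x/sync/errgroup; readLoop and sendLoop are hypothetical stand-ins for the real frame handlers:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// run mirrors the shape of sclient.run: launch a sender goroutine,
// do the read work, then cancel the sender and wait for it so the
// function never returns while the goroutine is still running.
func run(ctx context.Context) error {
	var grp errgroup.Group
	sendCtx, cancelSender := context.WithCancel(ctx)
	grp.Go(func() error { return sendLoop(sendCtx) })

	err := readLoop(ctx) // blocks until the peer hangs up or errors
	cancelSender()       // tell the sender to stop...
	if werr := grp.Wait(); werr != nil && err == nil {
		err = werr // ...and don't return before it has.
	}
	return err
}

func sendLoop(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func readLoop(ctx context.Context) error {
	return errors.New("peer hung up")
}

func main() {
	fmt.Println(run(context.Background()))
}
```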
@@ -1066,6 +1082,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	var dst *sclient

 	s.mu.Lock()
+	flo := s.getMakeFlowLocked(srcKey, dstKey)
 	if set, ok := s.clients[dstKey]; ok {
 		dstLen = set.Len()
 		dst = set.activeClient.Load()
@@ -1088,7 +1105,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	return c.sendPkt(dst, pkt{
 		bs:         contents,
 		enqueuedAt: c.s.clock.Now(),
-		src:        srcKey,
+		flow:       flo,
 	})
 }
@@ -1101,22 +1118,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 		return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
 	}

-	var fwd PacketForwarder
-	var dstLen int
-	var dst *sclient
-
-	s.mu.Lock()
-	if set, ok := s.clients[dstKey]; ok {
-		dstLen = set.Len()
-		dst = set.activeClient.Load()
-	}
-	if dst == nil && dstLen < 1 {
-		fwd = s.clientsMesh[dstKey]
-	}
-	s.mu.Unlock()
+	flo, dst, fwd := c.lookupDest(dstKey)
+	flo.noteActivity()

 	if dst == nil {
 		if fwd != nil {
+			flo.pktSendRegion.Add(1)
+			flo.byteSendRegion.Add(1)
 			s.packetsForwardedOut.Add(1)
 			err := fwd.ForwardPacket(c.key, dstKey, contents)
 			c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
@@ -1126,22 +1134,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 			}
 			return nil
 		}
+		flo.dropUnknownDest.Add(1)
 		reason := dropReasonUnknownDest
-		if dstLen > 1 {
-			reason = dropReasonDupClient
-		} else {
-			c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
-		}
+		c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 		s.recordDrop(contents, c.key, dstKey, reason)
 		c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
 		return nil
 	}
 	c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())

+	flo.pktSendLocal.Add(1)
+	flo.byteSendLocal.Add(1)
+
 	p := pkt{
 		bs:         contents,
 		enqueuedAt: c.s.clock.Now(),
-		src:        c.key,
+		flow:       flo,
 	}
 	return c.sendPkt(dst, p)
 }
@@ -1189,6 +1197,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
 }

 func (c *sclient) sendPkt(dst *sclient, p pkt) error {
+	// TODO(bradfitz): bump metrics on p.flow
 	s := c.s
 	dstKey := dst.key
@@ -1550,6 +1559,7 @@ type sclient struct {
 	br          *bufio.Reader
 	connectedAt time.Time
 	preferred   bool
+	flows       lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest

 	// Owned by sendLoop, not thread-safe.
 	sawSrc map[key.NodePublic]set.Handle
@@ -1605,8 +1615,12 @@ type pkt struct {
 	// The memory is owned by pkt.
 	bs []byte

-	// src is the who's the sender of the packet.
-	src key.NodePublic
+	// flow is the flow stats from the src to the dest.
+	flow *flow
 }

+func (p pkt) src() key.NodePublic {
+	return p.flow.flowKey.Value().src
+}
+
 // peerGoneMsg is a request to write a peerGone frame to an sclient
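With the src field gone, pkt recovers its sender from the flow's interned key. The sketch below isolates that trick using only the standard-library unique package (Go 1.23+); pairKey is a hypothetical stand-in for flowKey:

```go
package main

import (
	"fmt"
	"unique"
)

// pairKey stands in for derp's flowKey: a comparable struct that
// many packets share. Interning it with unique.Make stores each
// distinct pair once, and every handle is a single word.
type pairKey struct {
	src, dst string
}

func main() {
	h1 := unique.Make(pairKey{"nodeA", "nodeB"})
	h2 := unique.Make(pairKey{"nodeA", "nodeB"})

	// Handles to equal values are identical, so comparing handles
	// is a word comparison rather than a struct comparison.
	fmt.Println(h1 == h2) // true

	// Value() recovers the canonical copy, which is how pkt.src()
	// above derives the sender without a per-packet field.
	fmt.Println(h1.Value().src) // nodeA
}
```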
@@ -1677,14 +1691,13 @@ func (c *sclient) onSendLoopDone() {
 	for {
 		select {
 		case pkt := <-c.sendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
 		case pkt := <-c.discoSendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
 		default:
 			return
 		}
 	}
-
 }
@@ -1713,11 +1726,11 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 			werr = c.sendMeshUpdates()
 			continue
 		case msg := <-c.sendQueue:
-			werr = c.sendPacket(msg.src, msg.bs)
+			werr = c.sendPacket(msg.src(), msg.bs)
 			c.recordQueueTime(msg.enqueuedAt)
 			continue
 		case msg := <-c.discoSendQueue:
-			werr = c.sendPacket(msg.src, msg.bs)
+			werr = c.sendPacket(msg.src(), msg.bs)
 			c.recordQueueTime(msg.enqueuedAt)
 			continue
 		case msg := <-c.sendPongCh:
@@ -1747,10 +1760,10 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		case <-c.meshUpdate:
 			werr = c.sendMeshUpdates()
 		case msg := <-c.sendQueue:
-			werr = c.sendPacket(msg.src, msg.bs)
+			werr = c.sendPacket(msg.src(), msg.bs)
 			c.recordQueueTime(msg.enqueuedAt)
 		case msg := <-c.discoSendQueue:
-			werr = c.sendPacket(msg.src, msg.bs)
+			werr = c.sendPacket(msg.src(), msg.bs)
 			c.recordQueueTime(msg.enqueuedAt)
 		case msg := <-c.sendPongCh:
 			werr = c.sendPong(msg)
derp/flow.go (new file, 182 lines)

@@ -0,0 +1,182 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package derp

import (
	"sync/atomic"
	"time"
	"unique"

	"tailscale.com/types/key"
)

type flowKey struct {
	src, dst key.NodePublic
}
// flow tracks metadata about a directional flow of packets from a source
// node to a destination node. The public key of the src is known
// by the caller.
type flow struct {
	createdUnixNano int64                  // int64 instead of time.Time to keep flow smaller
	index           int                    // index in Server.flows slice, or -1 if not present; guarded by Server.mu
	flowKey         unique.Handle[flowKey] // TODO: make this a unique handle of two unique handles for each NodePublic?

	roughActivityUnixTime atomic.Int64 // unix sec of recent activity, updated at most once a minute
	pktSendRegion         atomic.Int64
	byteSendRegion        atomic.Int64
	pktSendLocal          atomic.Int64
	byteSendLocal         atomic.Int64
	dropUnknownDest       atomic.Int64 // no local or region client for dest
	dropGone              atomic.Int64

	// ref is the reference count of things (*Server, *sclient) holding on
	// to this flow. As of 2024-09-18 it is only informational
	// and not used for anything. The Server adds/removes a ref count when
	// the flow is added to/removed from its map, and each of the 0, 1, or
	// more sclients for a given recently active flow also adds/removes a
	// ref count.
	//
	// This might be used in the future as an alternate Server.flow eviction
	// strategy, but for now it's just a debug tool. We do want flow stats
	// to survive brief client disconnections, so we do want the Server
	// to keep at least a momentary ref count alive.
	ref atomic.Int64
}
// noteActivity updates f.roughActivityUnixTime if it's been
// more than a minute since the last update.
func (f *flow) noteActivity() {
	now := time.Now().Unix()
	if now-f.roughActivityUnixTime.Load() > 60 {
		f.roughActivityUnixTime.Store(now)
	}
}
// getMakeFlow either gets or makes a new flow for the given source and
// destination nodes.
func (s *Server) getMakeFlow(src, dst key.NodePublic) *flow {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.getMakeFlowLocked(src, dst)
}

func (s *Server) getMakeFlowLocked(src, dst key.NodePublic) *flow {
	k := flowKey{src, dst}
	f, ok := s.flow[k]
	if ok {
		return f
	}
	now := time.Now()
	f = &flow{
		createdUnixNano: now.UnixNano(),
		index:           len(s.flows),
		flowKey:         unique.Make(k),
	}
	f.roughActivityUnixTime.Store(now.Unix())
	f.ref.Add(1) // for Server's ref in the s.flows map itself

	// As penance for the one flow we're about to add to the map and slice
	// above, check two old flows for removal. We roll around and around the
	// flows slice, so this is a simple way to eventually check everything for
	// removal before we double in size.
	for range 2 {
		s.maybeCleanOldFlowLocked()
	}

	s.flow[k] = f
	s.flows = append(s.flows, f)

	return f
}
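The "penance" comment is doing real work: probing two old entries per insertion means the rotating index sweeps the whole slice faster than the slice can grow, so stale flows are bounded without ever needing a global scan. A toy model of that invariant, with an assumed expiry predicate in place of the one-hour activity check:

```go
package main

import "fmt"

// store is a toy model of Server.flow/Server.flows: a map for O(1)
// lookup plus a slice that a rotating index probes for eviction.
type store struct {
	m          map[int]bool
	s          []int
	cleanIndex int
	expired    func(int) bool // assumed stand-in for the one-hour check
}

// maybeCleanOne probes a single slot, mirroring maybeCleanOldFlowLocked.
func (st *store) maybeCleanOne() {
	if len(st.s) == 0 {
		return
	}
	st.cleanIndex++
	if st.cleanIndex >= len(st.s) {
		st.cleanIndex = 0
	}
	v := st.s[st.cleanIndex]
	if st.expired(v) {
		delete(st.m, v)
		st.s[st.cleanIndex] = st.s[len(st.s)-1] // swap-remove
		st.s = st.s[:len(st.s)-1]
	}
}

// add pays two probes of "penance" per insertion, like getMakeFlowLocked.
func (st *store) add(v int) {
	for range 2 {
		st.maybeCleanOne()
	}
	st.m[v] = true
	st.s = append(st.s, v)
}

func main() {
	st := &store{m: map[int]bool{}, expired: func(v int) bool { return v%2 == 0 }}
	for i := range 10_000 {
		st.add(i)
	}
	// Half of what we inserted was immediately evictable; the rotating
	// two-per-insert sweep keeps the total close to the live half.
	fmt.Println(len(st.s))
}
```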
func (s *Server) maybeCleanOldFlowLocked() {
	if len(s.flows) == 0 {
		return
	}
	s.flowCleanIndex++
	if s.flowCleanIndex >= len(s.flows) {
		s.flowCleanIndex = 0
	}
	f := s.flows[s.flowCleanIndex]

	now := time.Now().Unix()
	ageSec := now - f.roughActivityUnixTime.Load()
	if ageSec > 3600 {
		// No activity in an hour. Remove it.
		delete(s.flow, f.flowKey.Value())
		holeIdx := f.index
		s.flows[holeIdx] = s.flows[len(s.flows)-1]
		s.flows[holeIdx].index = holeIdx
		s.flows = s.flows[:len(s.flows)-1]
		f.ref.Add(-1)
		return
	}
}
type flowAndClientSet struct {
	f  *flow      // always non-nil
	cs *clientSet // may be nil if peer not connected/known
}
// lookupDest returns the flow (always non-nil) and sclient and/or
// PacketForwarder (at least one of which will be nil, possibly both) for the
// given destination node.
//
// It must only be called from the [sclient.run] goroutine.
func (c *sclient) lookupDest(dst key.NodePublic) (_ *flow, _ *sclient, fwd PacketForwarder) {
	peer, ok := c.flows.GetOk(dst)
	if ok && peer.cs != nil {
		if c := peer.cs.activeClient.Load(); c != nil {
			// Common case for hot flows within the same node: we know the
			// clientSet and no mutex is needed.
			return peer.f, c, nil
		}
	}

	if peer.f == nil {
		peer.f = c.s.getMakeFlow(c.key, dst)
		peer.f.ref.Add(1)
		// At least store the flow in the map, even if we don't find the
		// clientSet later. In theory we could coalesce this map write with a
		// possible one later, but they should be rare and uncontended so we
		// don't care as of 2024-09-18.
		c.flows.Set(dst, peer)
		c.maybeCleanFlows()
	}

	srv := c.s
	srv.mu.Lock()
	set, ok := srv.clients[dst]
	if ok {
		if c := set.activeClient.Load(); c != nil {
			srv.mu.Unlock()
			peer.cs = set
			c.flows.Set(dst, peer)
			c.maybeCleanFlows()
			return peer.f, c, nil
		}
		fwd = srv.clientsMesh[dst]
	}
	srv.mu.Unlock()
	return peer.f, nil, fwd // fwd may be nil too
}
// maybeCleanFlows cleans the oldest element from the client flows cache if
// it's too big.
//
// It must only be called from the [sclient.run] goroutine.
func (c *sclient) maybeCleanFlows() {
	const maxClientFlowTrack = 100
	if c.flows.Len() <= maxClientFlowTrack {
		return
	}

	oldest, _ := c.flows.OldestKey()
	facs, ok := c.flows.PeekOk(oldest)
	if !ok {
		panic("lookupDest: OldestKey lied")
	}
	facs.f.ref.Add(-1)
	c.flows.Delete(oldest)
}
derp/flow_test.go (new file, 52 lines)

@@ -0,0 +1,52 @@
package derp

import (
	"testing"
	"unique"

	"go4.org/mem"
	"tailscale.com/types/key"
)

func BenchmarkUnique(b *testing.B) {
	var keys [100]key.NodePublic
	for i := range keys {
		keys[i] = key.NodePublicFromRaw32(mem.B([]byte{31: byte(i)}))
	}
	b.Run("raw", func(b *testing.B) {
		m := map[flowKey]bool{}
		for range b.N {
			for _, k := range keys {
				key := flowKey{k, k}
				if _, ok := m[key]; !ok {
					m[key] = true
				}
			}
		}
	})
	b.Run("unique-tightmake", func(b *testing.B) {
		m := map[unique.Handle[flowKey]]bool{}
		for range b.N {
			for _, k := range keys {
				key := unique.Make(flowKey{k, k})
				if _, ok := m[key]; !ok {
					m[key] = true
				}
			}
		}
	})
	b.Run("unique-makeonce", func(b *testing.B) {
		m := map[unique.Handle[flowKey]]bool{}
		ukeys := make([]unique.Handle[flowKey], len(keys))
		for i, k := range keys {
			ukeys[i] = unique.Make(flowKey{k, k})
		}
		for range b.N {
			for _, key := range ukeys {
				if _, ok := m[key]; !ok {
					m[key] = true
				}
			}
		}
	})
}
util/lru/lru.go

@@ -133,6 +133,15 @@ func (c *Cache[K, V]) DeleteOldest() {
 	}
 }

+// OldestKey returns the oldest key, without bumping it to the head.
+// If the cache is empty, it returns ok false.
+func (c *Cache[K, V]) OldestKey() (key K, ok bool) {
+	if c.head == nil {
+		return key, false
+	}
+	return c.head.prev.key, true
+}
+
 // Len returns the number of items in the cache.
 func (c *Cache[K, V]) Len() int { return len(c.lookup) }
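OldestKey pairs with PeekOk and Delete (both used by maybeCleanFlows above) to evict the least-recent entry without bumping it. A small usage sketch against this Cache API, assuming the zero value is ready to use as it is for sclient.flows:

```go
package main

import (
	"fmt"

	"tailscale.com/util/lru"
)

func main() {
	var c lru.Cache[string, int] // zero value usable, as in sclient.flows
	c.Set("a", 1)
	c.Set("b", 2)
	c.Set("c", 3)

	c.GetOk("a") // bump "a" to most-recent; "b" is now the oldest

	// Peek at the eviction candidate without disturbing recency,
	// then delete it explicitly, as sclient.maybeCleanFlows does.
	if k, ok := c.OldestKey(); ok {
		if v, ok := c.PeekOk(k); ok {
			fmt.Println("evicting", k, v) // evicting b 2
		}
		c.Delete(k)
	}
	fmt.Println(c.Len()) // 2
}
```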