Compare commits

...

1 Commits

Author SHA1 Message Date
Brad Fitzpatrick
0a317a963b derp: add session flows
A flow tracks exchanges from some source to destination, for the purpose of better understanding
who is using DERP. It currently tracks the number of packets sent, as well as the number of
bytes sent with the packets.

Signed-off-by: julianknodt <julianknodt@gmail.com>
2021-07-27 15:17:56 -07:00

View File

@@ -41,6 +41,7 @@ import (
"tailscale.com/client/tailscale"
"tailscale.com/disco"
"tailscale.com/metrics"
"tailscale.com/syncs"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/version"
@@ -129,6 +130,8 @@ type Server struct {
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
avgFlowDuration *uint64 // In seconds; accessed atomically
// verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api.
verifyClients bool
@@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
avgQueueDuration: new(uint64),
avgFlowDuration: new(uint64),
keyOfAddr: map[netaddr.IPPort]key.Public{},
}
s.initMetacert()
@@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
flows: make(map[key.Public]*flow),
}
if c.canMesh {
@@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error {
if err != nil {
return err
}
c.SweepFlows()
}
}
// SweepFlows deletes old flows from this client, removing any old items and
// updating avgFlowDuration.
func (c *sclient) SweepFlows() {
// delete flows which are dirty, but only when a large number have accumulated
if atomic.LoadUint32(&c.dirtyFlows) > 5 {
total := 0
totalFlowDurationSec := 0.0
// TODO do we need to iterate through the entire flow set?
for k, f := range c.flows {
if f.idle.Get() {
delete(c.flows, k)
total++
totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds()
}
}
atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1))
avgFlowDurationSec := totalFlowDurationSec / float64(total)
for {
old := atomic.LoadUint64(c.s.avgFlowDuration)
newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1)
if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) {
break
}
}
}
}
@@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
var fwd PacketForwarder
s.mu.Lock()
dst := s.clients[dstKey]
if dst == nil {
fwd = s.clientsMesh[dstKey]
} else {
s.notePeerSendLocked(c.key, dst)
var dst *sclient
f, ok := c.flows[dstKey]
if ok {
if f.dst != nil {
select {
case <-f.dst.done:
default:
dst = f.dst
}
}
} else /* dst still is nil */ {
s.mu.Lock()
dst = s.clients[dstKey]
if dst == nil {
fwd = s.clientsMesh[dstKey]
} else {
s.notePeerSendLocked(c.key, dst)
}
s.mu.Unlock()
if dst != nil {
f = &flow{
c: c,
dst: dst,
idleTimer: time.AfterFunc(FlowTimerDuration, f.close),
createdAt: time.Now(),
}
c.flows[dstKey] = f
}
}
if f != nil {
f.sawPacket(uint64(len(contents)))
}
s.mu.Unlock()
if dst == nil {
if fwd != nil {
s.packetsForwardedOut.Add(1)
if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil {
// TODO:
// TODO: failed to forward packet when intended to send it.
return nil
}
return nil
@@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil
}
p := pkt{
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: time.Now(),
src: c.key,
}
return c.sendPkt(dst, p)
})
}
// dropReason is why we dropped a DERP frame.
@@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
return srcKey, dstKey, contents, nil
}
// FlowTimerDuration is the time which we consider a flow to be active.
// After this time, we consider it to be idle, and can never return to an active state.
var FlowTimerDuration = time.Minute
type flow struct {
c *sclient // source of flow
dst *sclient // non-nil for local client; nil means use forwarder
// Various stats:
pkts uint64
bytes uint64
idleTimer *time.Timer
createdAt time.Time
closedAt time.Time
// idle indicates this flow expired once, and should be cleaned up.
// Once set, it should always remain true.
idle syncs.AtomicBool
}
func (f *flow) sawPacket(bytes uint64) {
if f.idle.Get() {
return
}
f.pkts += 1
f.bytes += bytes
// do not care if f.close is called twice, as it's idempotent.
f.idleTimer.Reset(FlowTimerDuration)
}
func (f *flow) close() {
if changed := f.idle.Swap(true); !changed {
return
}
f.closedAt = time.Now()
atomic.AddUint32(&f.c.dirtyFlows, 1)
// TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed?
}
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
@@ -1001,6 +1104,10 @@ type sclient struct {
connectedAt time.Time
preferred bool
// dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up.
dirtyFlows uint32
flows map[key.Public]*flow
// Owned by sender, not thread-safe.
bw *bufio.Writer
@@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
m.Set("average_flow_duration_sec", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration))
}))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)