Compare commits
1 Commits
awly/cli-j
...
jknodt/der
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a317a963b |
@@ -41,6 +41,7 @@ import (
|
||||
"tailscale.com/client/tailscale"
|
||||
"tailscale.com/disco"
|
||||
"tailscale.com/metrics"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/version"
|
||||
@@ -129,6 +130,8 @@ type Server struct {
|
||||
removePktForwardOther expvar.Int
|
||||
avgQueueDuration *uint64 // In milliseconds; accessed atomically
|
||||
|
||||
avgFlowDuration *uint64 // In seconds; accessed atomically
|
||||
|
||||
// verifyClients only accepts client connections to the DERP server if the clientKey is a
|
||||
// known peer in the network, as specified by a running tailscaled's client's local api.
|
||||
verifyClients bool
|
||||
@@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
|
||||
watchers: map[*sclient]bool{},
|
||||
sentTo: map[key.Public]map[key.Public]int64{},
|
||||
avgQueueDuration: new(uint64),
|
||||
avgFlowDuration: new(uint64),
|
||||
keyOfAddr: map[netaddr.IPPort]key.Public{},
|
||||
}
|
||||
s.initMetacert()
|
||||
@@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
|
||||
// minute is less than the client's 2 minute
|
||||
// inactivity timeout.
|
||||
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
|
||||
|
||||
flows: make(map[key.Public]*flow),
|
||||
}
|
||||
|
||||
if c.canMesh {
|
||||
@@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.SweepFlows()
|
||||
}
|
||||
}
|
||||
|
||||
// SweepFlows deletes old flows from this client, removing any old items and
|
||||
// updating avgFlowDuration.
|
||||
func (c *sclient) SweepFlows() {
|
||||
|
||||
// delete flows which are dirty, but only when a large number have accumulated
|
||||
if atomic.LoadUint32(&c.dirtyFlows) > 5 {
|
||||
total := 0
|
||||
totalFlowDurationSec := 0.0
|
||||
// TODO do we need to iterate through the entire flow set?
|
||||
for k, f := range c.flows {
|
||||
if f.idle.Get() {
|
||||
delete(c.flows, k)
|
||||
total++
|
||||
totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds()
|
||||
}
|
||||
}
|
||||
atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1))
|
||||
|
||||
avgFlowDurationSec := totalFlowDurationSec / float64(total)
|
||||
for {
|
||||
old := atomic.LoadUint64(c.s.avgFlowDuration)
|
||||
newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1)
|
||||
if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
|
||||
var fwd PacketForwarder
|
||||
s.mu.Lock()
|
||||
dst := s.clients[dstKey]
|
||||
if dst == nil {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
} else {
|
||||
s.notePeerSendLocked(c.key, dst)
|
||||
var dst *sclient
|
||||
|
||||
f, ok := c.flows[dstKey]
|
||||
if ok {
|
||||
if f.dst != nil {
|
||||
select {
|
||||
case <-f.dst.done:
|
||||
default:
|
||||
dst = f.dst
|
||||
}
|
||||
}
|
||||
} else /* dst still is nil */ {
|
||||
s.mu.Lock()
|
||||
dst = s.clients[dstKey]
|
||||
if dst == nil {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
} else {
|
||||
s.notePeerSendLocked(c.key, dst)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if dst != nil {
|
||||
f = &flow{
|
||||
c: c,
|
||||
dst: dst,
|
||||
idleTimer: time.AfterFunc(FlowTimerDuration, f.close),
|
||||
createdAt: time.Now(),
|
||||
}
|
||||
c.flows[dstKey] = f
|
||||
}
|
||||
}
|
||||
|
||||
if f != nil {
|
||||
f.sawPacket(uint64(len(contents)))
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if dst == nil {
|
||||
if fwd != nil {
|
||||
s.packetsForwardedOut.Add(1)
|
||||
if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil {
|
||||
// TODO:
|
||||
// TODO: failed to forward packet when intended to send it.
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
@@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
p := pkt{
|
||||
return c.sendPkt(dst, pkt{
|
||||
bs: contents,
|
||||
enqueuedAt: time.Now(),
|
||||
src: c.key,
|
||||
}
|
||||
return c.sendPkt(dst, p)
|
||||
})
|
||||
}
|
||||
|
||||
// dropReason is why we dropped a DERP frame.
|
||||
@@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
|
||||
return srcKey, dstKey, contents, nil
|
||||
}
|
||||
|
||||
// FlowTimerDuration is the time which we consider a flow to be active.
|
||||
// After this time, we consider it to be idle, and can never return to an active state.
|
||||
var FlowTimerDuration = time.Minute
|
||||
|
||||
type flow struct {
|
||||
c *sclient // source of flow
|
||||
dst *sclient // non-nil for local client; nil means use forwarder
|
||||
|
||||
// Various stats:
|
||||
pkts uint64
|
||||
bytes uint64
|
||||
|
||||
idleTimer *time.Timer
|
||||
createdAt time.Time
|
||||
closedAt time.Time
|
||||
|
||||
// idle indicates this flow expired once, and should be cleaned up.
|
||||
// Once set, it should always remain true.
|
||||
idle syncs.AtomicBool
|
||||
}
|
||||
|
||||
func (f *flow) sawPacket(bytes uint64) {
|
||||
if f.idle.Get() {
|
||||
return
|
||||
}
|
||||
f.pkts += 1
|
||||
f.bytes += bytes
|
||||
// do not care if f.close is called twice, as it's idempotent.
|
||||
f.idleTimer.Reset(FlowTimerDuration)
|
||||
}
|
||||
|
||||
func (f *flow) close() {
|
||||
if changed := f.idle.Swap(true); !changed {
|
||||
return
|
||||
}
|
||||
f.closedAt = time.Now()
|
||||
atomic.AddUint32(&f.c.dirtyFlows, 1)
|
||||
// TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed?
|
||||
}
|
||||
|
||||
// sclient is a client connection to the server.
|
||||
//
|
||||
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
|
||||
@@ -1001,6 +1104,10 @@ type sclient struct {
|
||||
connectedAt time.Time
|
||||
preferred bool
|
||||
|
||||
// dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up.
|
||||
dirtyFlows uint32
|
||||
flows map[key.Public]*flow
|
||||
|
||||
// Owned by sender, not thread-safe.
|
||||
bw *bufio.Writer
|
||||
|
||||
@@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var {
|
||||
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
|
||||
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
|
||||
}))
|
||||
m.Set("average_flow_duration_sec", expvar.Func(func() interface{} {
|
||||
return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration))
|
||||
}))
|
||||
var expvarVersion expvar.String
|
||||
expvarVersion.Set(version.Long)
|
||||
m.Set("version", &expvarVersion)
|
||||
|
||||
Reference in New Issue
Block a user