Compare commits: main...percy/derp (1 commit)

Commit b341f3ffcf
@@ -52,6 +52,8 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/version"
+
+	"github.com/prometheus/client_golang/prometheus"
 )

 // verboseDropKeys is the set of destination public keys that should
@@ -139,37 +141,39 @@ type Server struct {
 	debug bool

 	// Counters:
 	packetsSent, bytesSent     expvar.Int
 	packetsRecv, bytesRecv     expvar.Int
 	packetsRecvByKind          metrics.LabelMap
 	packetsRecvDisco           *expvar.Int
 	packetsRecvOther           *expvar.Int
 	_                          align64
 	packetsForwardedOut        expvar.Int
 	packetsForwardedIn         expvar.Int
 	peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
 	peerGoneNotHereFrames      expvar.Int // number of peer not here frames sent
 	gotPing                    expvar.Int // number of ping frames from client
 	sentPong                   expvar.Int // number of pong frames enqueued to client
 	accepts                    expvar.Int
 	curClients                 expvar.Int
 	curClientsNotIdeal         expvar.Int
 	curHomeClients             expvar.Int // ones with preferred
 	dupClientKeys              expvar.Int // current number of public keys we have 2+ connections for
 	dupClientConns             expvar.Int // current number of connections sharing a public key
 	dupClientConnTotal         expvar.Int // total number of accepted connections when a dup key existed
 	unknownFrames              expvar.Int
 	homeMovesIn                expvar.Int // established clients announce home server moves in
 	homeMovesOut               expvar.Int // established clients announce home server moves out
 	multiForwarderCreated      expvar.Int
 	multiForwarderDeleted      expvar.Int
 	removePktForwardOther      expvar.Int
 	sclientWriteTimeouts       expvar.Int
 	avgQueueDuration           *uint64          // In milliseconds; accessed atomically
 	tcpRtt                     metrics.LabelMap // histogram
 	meshUpdateBatchSize        *metrics.Histogram
 	meshUpdateLoopCount        *metrics.Histogram
 	bufferedWriteFrames        *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsRecvDropRateByClient *prometheus.HistogramVec
+	packetsSentDropRateByClient *prometheus.HistogramVec

 	// verifyClientsLocalTailscaled only accepts client connections to the DERP
 	// server if the clientKey is a known peer in the network, as specified by a
@@ -381,8 +385,18 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 		meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
 		meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
 		bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
+		packetsRecvDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_recv",
+			Help:    "Histogram counting clients by the packet loss rate of packets received from those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
+		packetsSentDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_send",
+			Help:    "Histogram counting clients by the packet loss rate of packets sent to those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
 		keyOfAddr: map[netip.AddrPort]key.NodePublic{},
 		clock:     tstime.StdClock{},
 	}
 	s.initMetacert()
 	s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco))
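For context on the two new histograms: a prometheus.HistogramVec keeps a separate histogram per (kind, reason) label pair, and each Observe(v) counts v into every bucket whose upper bound is at least v. A minimal standalone sketch (metric name and values here are illustrative, not part of this change):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same bucket layout as the patch: drop-rate fractions from 0.1% to 100%.
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_drop_rate", // illustrative name
		Help:    "Clients by packet drop rate (sketch).",
		Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
	}, []string{"kind", "reason"})

	// A 2.5% drop rate recorded against the (disco, queue_tail) series.
	hv.WithLabelValues("disco", "queue_tail").Observe(0.025)

	reg := prometheus.NewRegistry()
	reg.MustRegister(hv)
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "has", len(mf.GetMetric()), "labeled series")
	}
}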
@@ -391,6 +405,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 	genPacketsDroppedCounters()

 	s.perClientSendQueueDepth = getPerClientSendQueueDepth()

+	go s.collectDropsByClient()
+
 	return s
 }
@@ -1105,7 +1121,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	} else {
 		c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 	}
-	s.recordDrop(contents, srcKey, dstKey, reason)
+	s.recordDrop(c, dst, contents, srcKey, dstKey, reason)
 	return nil
 }
@@ -1122,7 +1138,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	s := c.s

-	dstKey, contents, err := s.recvPacket(c.br, fl)
+	dstKey, contents, err := c.recvPacket(fl)
 	if err != nil {
 		return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
 	}
@@ -1158,7 +1174,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	} else {
 		c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 	}
-	s.recordDrop(contents, c.key, dstKey, reason)
+	s.recordDrop(c, dst, contents, c.key, dstKey, reason)
 	c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
 	return nil
 }
@@ -1196,16 +1212,108 @@ const (
 	dropReasonDupClient dropReason = "dup_client" // the public key is connected 2+ times (active/active, fighting)
 )

-func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
+type dropsByReason struct {
+	total            float64
+	unknownDest      float64
+	unknownDestOnFwd float64
+	goneDisconnected float64
+	queueHead        float64
+	queueTail        float64
+	writeError       float64
+	dupClient        float64
+}
+
+func (d *dropsByReason) recordDrop(reason dropReason) {
+	d.total += 1
+	switch reason {
+	case dropReasonUnknownDest:
+		d.unknownDest += 1
+	case dropReasonUnknownDestOnFwd:
+		d.unknownDestOnFwd += 1
+	case dropReasonGoneDisconnected:
+		d.goneDisconnected += 1
+	case dropReasonQueueHead:
+		d.queueHead += 1
+	case dropReasonQueueTail:
+		d.queueTail += 1
+	case dropReasonWriteError:
+		d.writeError += 1
+	case dropReasonDupClient:
+		d.dupClient += 1
+	}
+}
+
+func (d *dropsByReason) reset() {
+	d.total = 0
+	d.unknownDest = 0
+	d.unknownDestOnFwd = 0
+	d.goneDisconnected = 0
+	d.queueHead = 0
+	d.queueTail = 0
+	d.writeError = 0
+	d.dupClient = 0
+}
+
+// collect collects packet drop rates into the given HistogramVec. The rates
+// are labeled with the given kind and the relevant drop reasons. The rates are
+// calculated as the number of drops divided by the total number of packets.
+// If includeDrops is true, the total drops are added to the given total. This
+// is used for send statistics since the packets sent counter only includes
+// packets that weren't dropped.
+func (d *dropsByReason) collect(hv *prometheus.HistogramVec, kind packetKind, total float64, includeDrops bool) {
+	if includeDrops {
+		total += d.total
+	}
+
+	if total == 0 {
+		return
+	}
+
+	kindString := string(kind)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDest)).Observe(d.unknownDest / total)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDestOnFwd)).Observe(d.unknownDestOnFwd / total)
+	hv.WithLabelValues(kindString, string(dropReasonGoneDisconnected)).Observe(d.goneDisconnected / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueHead)).Observe(d.queueHead / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueTail)).Observe(d.queueTail / total)
+	hv.WithLabelValues(kindString, string(dropReasonWriteError)).Observe(d.writeError / total)
+	hv.WithLabelValues(kindString, string(dropReasonDupClient)).Observe(d.dupClient / total)
+}
+
+func (s *Server) recordDrop(src, dst *sclient, packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
 	labels := dropReasonKindLabels{
 		Reason: string(reason),
 	}
 	looksDisco := disco.LooksLikeDiscoWrapper(packetBytes)
+	if src != nil {
+		src.dropsMu.Lock()
+	}
+	if dst != nil {
+		dst.dropsMu.Lock()
+	}
 	if looksDisco {
 		labels.Kind = string(packetKindDisco)
+		if src != nil {
+			src.discoRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.discoSendDropsByReason.recordDrop(reason)
+		}
 	} else {
 		labels.Kind = string(packetKindOther)
+		if src != nil {
+			src.otherRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.otherSendDropsByReason.recordDrop(reason)
+		}
 	}
+	if src != nil {
+		src.dropsMu.Unlock()
+	}
+	if dst != nil {
+		dst.dropsMu.Unlock()
+	}
+
 	packetsDropped.Add(labels, 1)

 	if verboseDropKeys[dstKey] {
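A note on collect's includeDrops flag, since the asymmetry is easy to miss: the receive counters are incremented for every packet read off the wire, so packets that are later dropped are already part of the receive total, while the send counters only count packets actually written, so the drops must be added back to reconstruct the attempt count. A small sketch of the arithmetic (simplified, illustrative names):

package main

import "fmt"

// dropRate mirrors the division done in dropsByReason.collect (simplified).
func dropRate(drops, counted float64, includeDrops bool) float64 {
	total := counted
	if includeDrops {
		// Send side: counted packets exclude drops, so add them back.
		total += drops
	}
	if total == 0 {
		return 0 // collect() returns without observing in this case
	}
	return drops / total
}

func main() {
	// Receive side: 400 packets read, 4 of them later dropped -> 1%.
	fmt.Println(dropRate(4, 400, false)) // 0.01
	// Send side: 990 packets written, 10 dropped -> 10/1000 = 1%.
	fmt.Println(dropRate(10, 990, true)) // 0.01
}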
@@ -1220,7 +1328,6 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
 }

 func (c *sclient) sendPkt(dst *sclient, p pkt) error {
-	s := c.s
 	dstKey := dst.key

 	// Attempt to queue for sending up to 3 times. On each attempt, if
@@ -1233,7 +1340,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 	for attempt := 0; attempt < 3; attempt++ {
 		select {
 		case <-dst.done:
-			s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected)
+			c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonGoneDisconnected)
 			dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
 			return nil
 		default:
@@ -1247,7 +1354,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {

 		select {
 		case pkt := <-sendQueue:
-			s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
+			c.s.recordDrop(c, dst, pkt.bs, c.key, dstKey, dropReasonQueueHead)
 			c.recordQueueTime(pkt.enqueuedAt)
 		default:
 		}
@@ -1255,7 +1362,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 		// Failed to make room for packet. This can happen in a heavily
 		// contended queue with racing writers. Give up and tail-drop in
 		// this case to keep reader unblocked.
-		s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
+		c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonQueueTail)
 		dst.debugLogf("sendPkt attempt %d dropped, queue full")

 		return nil
@@ -1497,7 +1604,10 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
 	return clientKey, info, nil
 }

-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+func (c *sclient) recvPacket(frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+	s := c.s
+	br := c.br
+
 	if frameLen < keyLen {
 		return zpub, nil, errors.New("short send packet frame")
 	}
@@ -1512,12 +1622,15 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodeP
 	if _, err := io.ReadFull(br, contents); err != nil {
 		return zpub, nil, err
 	}

 	s.packetsRecv.Add(1)
 	s.bytesRecv.Add(int64(len(contents)))
 	if disco.LooksLikeDiscoWrapper(contents) {
 		s.packetsRecvDisco.Add(1)
+		c.packetsRecvDisco.Add(1)
 	} else {
 		s.packetsRecvOther.Add(1)
+		c.packetsRecvOther.Add(1)
 	}
 	return dstKey, contents, nil
 }
@@ -1598,6 +1711,15 @@ type sclient struct {
 	// client that it's trying to establish a direct connection
 	// through us with a peer we have no record of.
 	peerGoneLim *rate.Limiter
+
+	packetsRecvDisco, packetsRecvOther, packetsSentDisco, packetsSentOther atomic.Uint64
+
+	// dropsMu guards the below packet drop metrics
+	dropsMu                sync.Mutex
+	discoRecvDropsByReason dropsByReason
+	discoSendDropsByReason dropsByReason
+	otherRecvDropsByReason dropsByReason
+	otherSendDropsByReason dropsByReason
 }

 func (c *sclient) presentFlags() PeerPresentFlags {
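The four new atomic.Uint64 counters are windowed rather than cumulative: the collector (collectDropsByClient, later in this diff) reads and zeroes each one in a single step with Swap(0), so every histogram observation covers roughly one collection interval. A minimal sketch of that read-and-reset pattern:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var packetsRecvDisco atomic.Uint64

	// Hot path: one Add per received disco packet.
	packetsRecvDisco.Add(1)
	packetsRecvDisco.Add(1)

	// Collection tick: take this window's count and reset atomically,
	// so no increments are lost between the read and the reset.
	window := packetsRecvDisco.Swap(0)
	fmt.Println(window, packetsRecvDisco.Load()) // 2 0
}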
@@ -1708,9 +1830,9 @@ func (c *sclient) onSendLoopDone() {
 	for {
 		select {
 		case pkt := <-c.sendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		case pkt := <-c.discoSendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		default:
 			return
 		}
@@ -1917,7 +2039,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
 	defer func() {
 		// Stats update.
 		if err != nil {
-			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
+			c.s.recordDrop(nil, c, contents, srcKey, c.key, dropReasonWriteError)
 		} else {
 			c.s.packetsSent.Add(1)
 			c.s.bytesSent.Add(int64(len(contents)))
@@ -2149,6 +2271,8 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
 	m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
 	m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+	m.Set("packets_recv_drop_rate_by_client", collectorVar{s.packetsRecvDropRateByClient})
+	m.Set("packets_send_drop_rate_by_client", collectorVar{s.packetsSentDropRateByClient})
 	var expvarVersion expvar.String
 	expvarVersion.Set(version.Long())
 	m.Set("version", &expvarVersion)
@@ -2341,3 +2465,43 @@ func (w *lazyBufioWriter) Flush() error {

 	return err
 }
+
+// collectDropsByClient periodically collects per-client packet drop rates
+// into the server's drop-rate histograms. It observes drop rates for all
+// active clients every 10 seconds.
+func (s *Server) collectDropsByClient() {
+	t := time.NewTicker(10 * time.Second)
+
+	var clients []*clientSet
+
+	for {
+		select {
+		case <-t.C:
+			clients = clients[:0]
+			s.mu.Lock()
+			for _, cs := range s.clients {
+				clients = append(clients, cs)
+			}
+			s.mu.Unlock()
+
+			for _, cs := range clients {
+				if c := cs.activeClient.Load(); c != nil {
+					discoRecv, otherRecv, discoSent, otherSent := c.packetsRecvDisco.Swap(0), c.packetsRecvOther.Swap(0), c.packetsSentDisco.Swap(0), c.packetsSentOther.Swap(0)
+					c.dropsMu.Lock()
+					c.discoRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindDisco, float64(discoRecv), false)
+					c.otherRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindOther, float64(otherRecv), false)
+					c.discoSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindDisco, float64(discoSent), true)
+					c.otherSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindOther, float64(otherSent), true)
+					c.discoRecvDropsByReason.reset()
+					c.otherRecvDropsByReason.reset()
+					c.discoSendDropsByReason.reset()
+					c.otherSendDropsByReason.reset()
+					c.dropsMu.Unlock()
+				}
+			}
+
+			// clients does retain clientSets, but these are fairly lightweight since they just point at actual clients
+			clients = clients[:0]
+		}
+	}
+}

derp/prometheus_adapter.go (new file, 31 lines)
@@ -0,0 +1,31 @@
+package derp
+
+import (
+	"io"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
+)
+
+var (
+	expFormat = expfmt.NewFormat(expfmt.TypeTextPlain)
+)
+
+// collectorVar implements expvar.Var and metrics.PrometheusWriter
+type collectorVar struct {
+	prometheus.Collector
+}
+
+func (cw collectorVar) String() string {
+	return `"CollectorVar"`
+}
+
+func (cw collectorVar) WritePrometheus(w io.Writer, name string) {
+	reg := prometheus.NewRegistry()
+	_ = reg.Register(cw)
+	mfs, _ := reg.Gather()
+	enc := expfmt.NewEncoder(w, expFormat)
+	for _, mf := range mfs {
+		_ = enc.Encode(mf)
+	}
+}
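For a sense of how this adapter is consumed: the ExpVar hunk above wraps each HistogramVec as collectorVar{...}, so tailscale's expvar-based debug endpoint can render it through WritePrometheus. A hedged usage sketch, written as if it sat inside package derp (collectorVar is unexported; the counter here is illustrative):

package derp

import (
	"os"

	"github.com/prometheus/client_golang/prometheus"
)

func exampleCollectorVar() {
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_total", // illustrative metric
		Help: "Counter used to demonstrate collectorVar.",
	})
	c.Add(3)

	// Gathers the collector into a throwaway registry and writes the
	// Prometheus text exposition format, e.g.:
	//   # HELP example_total Counter used to demonstrate collectorVar.
	//   # TYPE example_total counter
	//   example_total 3
	collectorVar{c}.WritePrometheus(os.Stdout, "example_total")
}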