Compare commits

...

3 Commits

Author SHA1 Message Date
James Tucker
6a69c05c04 control/controlclient: add debug netmap refcount
Signed-off-by: Brad Fitzpatrick <brad@tailscale.com>
Signed-off-by: James Tucker <james@tailscale.com>
2025-01-24 13:37:41 -08:00
Andrew Dunham
a62fb13b63 util/execqueue: add metrics
Expose enough metrics to get a sense of queue depth, use and if it has
stalled.

Updates tailscale/corp#26058

Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: I271ac8d03f3db587a33aca6964fe92f2833e1251
2025-01-24 13:37:41 -08:00
Brad Fitzpatrick
0b4d9065e6 control/controlclient: skip SetControlClientStatus when queue has newer results later
Updates tailscale/corp#26058

Change-Id: I3033d235ca49f9739fdf3deaf603eea4ec3e407e
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2025-01-24 13:36:28 -08:00
3 changed files with 93 additions and 2 deletions

View File

@@ -6,12 +6,14 @@ package controlclient
import (
"context"
"errors"
"expvar"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"tailscale.com/envknob"
"tailscale.com/logtail/backoff"
"tailscale.com/net/sockstats"
"tailscale.com/tailcfg"
@@ -131,6 +133,8 @@ type Auto struct {
// the server.
lastUpdateGen updateGen
lastStatus atomic.Pointer[Status]
paused bool // whether we should stop making HTTP requests
unpauseWaiters []chan bool // chans that gets sent true (once) on wake, or false on Shutdown
loggedIn bool // true if currently logged in
@@ -596,21 +600,61 @@ func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkM
// not logged in.
nm = nil
}
new := Status{
newSt := &Status{
URL: url,
Persist: p,
NetMap: nm,
Err: err,
state: state,
}
c.lastStatus.Store(newSt)
// Launch a new goroutine to avoid blocking the caller while the observer
// does its thing, which may result in a call back into the client.
c.observerQueue.Add(func() {
c.observer.SetControlClientStatus(c, new)
if c.canSkipStatus(newSt) {
metricSkippable.Add(1)
if debugSkipQueue() {
metricSkipped.Add(1)
return
}
}
c.observer.SetControlClientStatus(c, *newSt)
c.lastStatus.CompareAndSwap(newSt, nil)
})
}
var debugSkipQueue = envknob.RegisterBool("TS_DEBUG_SKIP_STATUS_QUEUE")
var (
metricSkippable = expvar.NewInt("controlclient_auto_status_queue_skippable")
metricSkipped = expvar.NewInt("controlclient_auto_status_queue_skipped")
)
func (c *Auto) canSkipStatus(s1 *Status) bool {
s2 := c.lastStatus.Load()
if s2 == nil {
return false
}
if s1 == s2 {
return false
}
if s1.Err != nil || s1.URL != "" {
return false
}
if !s1.Persist.Equals(s2.Persist) || s1.state != s2.state {
return false
}
if s1.NetMap != nil && s2.NetMap != nil {
// If s1 doesn't have an error and doesn't have a URL
// and is otherwise identical to the latest one
// other than the a different NetMap, then this is
// skippable.
return true
}
return false
}
func (c *Auto) Login(flags LoginFlags) {
c.logf("client.Login(%v)", flags)

View File

@@ -15,6 +15,7 @@ import (
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"tailscale.com/control/controlknobs"
@@ -817,6 +818,8 @@ func (ms *mapSession) sortedPeers() []tailcfg.NodeView {
return ret
}
var numNetmaps atomic.Int64
// netmap returns a fully populated NetworkMap from the last state seen from
// a call to updateStateFromResponse, filling in omitted
// information from prior MapResponse values.
@@ -841,6 +844,10 @@ func (ms *mapSession) netmap() *netmap.NetworkMap {
TKAEnabled: ms.lastTKAInfo != nil && !ms.lastTKAInfo.Disabled,
MaxKeyDuration: ms.lastMaxExpiry,
}
ms.logf("XXX NetMap++ => %v", numNetmaps.Add(1))
runtime.SetFinalizer(nm, func(nm *netmap.NetworkMap) {
ms.logf("XXX NetMap-- => %v", numNetmaps.Add(-1))
})
if ms.lastTKAInfo != nil && ms.lastTKAInfo.Head != "" {
if err := nm.TKAHead.UnmarshalText([]byte(ms.lastTKAInfo.Head)); err != nil {

View File

@@ -7,7 +7,11 @@ package execqueue
import (
"context"
"errors"
"expvar"
"fmt"
"sync"
"sync/atomic"
"time"
)
type ExecQueue struct {
@@ -16,9 +20,36 @@ type ExecQueue struct {
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func()
// metrics follow
metricsRegisterOnce sync.Once
metricInserts expvar.Int
metricRemovals expvar.Int
metricQueueLastDrain expvar.Int // unix millis
}
// This is extremely silly but is for debugging
var metricsCounter atomic.Int64
// registerMetrics registers the queue's metrics with expvar, using a unique name.
func (q *ExecQueue) registerMetrics() {
q.metricsRegisterOnce.Do(func() {
m := new(expvar.Map).Init()
m.Set("inserts", &q.metricInserts)
m.Set("removals", &q.metricRemovals)
m.Set("length", expvar.Func(func() any {
return q.metricInserts.Value() - q.metricRemovals.Value()
}))
m.Set("last_drain", &q.metricQueueLastDrain)
name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1))
expvar.Publish(name, m)
})
}
func (q *ExecQueue) Add(f func()) {
q.registerMetrics()
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
@@ -26,6 +57,7 @@ func (q *ExecQueue) Add(f func()) {
}
if q.inFlight {
q.queue = append(q.queue, f)
q.metricInserts.Add(1)
} else {
q.inFlight = true
go q.run(f)
@@ -35,6 +67,8 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
q.registerMetrics()
for {
if err := q.Wait(ctx); err != nil {
return err
@@ -61,11 +95,13 @@ func (q *ExecQueue) run(f func()) {
f := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
q.metricRemovals.Add(1)
q.mu.Unlock()
f()
q.mu.Lock()
}
q.inFlight = false
q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli()))
q.queue = nil
if q.doneWaiter != nil {
close(q.doneWaiter)
@@ -76,6 +112,8 @@ func (q *ExecQueue) run(f func()) {
// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
q.registerMetrics()
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
@@ -83,6 +121,8 @@ func (q *ExecQueue) Shutdown() {
// Wait waits for the queue to be empty.
func (q *ExecQueue) Wait(ctx context.Context) error {
q.registerMetrics()
q.mu.Lock()
waitCh := q.doneWaiter
if q.inFlight && waitCh == nil {