Compare commits
3 Commits
andrew/exe
...
raggi/hell
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a69c05c04 | ||
|
|
a62fb13b63 | ||
|
|
0b4d9065e6 |
@@ -6,12 +6,14 @@ package controlclient
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/logtail/backoff"
|
||||
"tailscale.com/net/sockstats"
|
||||
"tailscale.com/tailcfg"
|
||||
@@ -131,6 +133,8 @@ type Auto struct {
|
||||
// the server.
|
||||
lastUpdateGen updateGen
|
||||
|
||||
lastStatus atomic.Pointer[Status]
|
||||
|
||||
paused bool // whether we should stop making HTTP requests
|
||||
unpauseWaiters []chan bool // chans that gets sent true (once) on wake, or false on Shutdown
|
||||
loggedIn bool // true if currently logged in
|
||||
@@ -596,21 +600,61 @@ func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkM
|
||||
// not logged in.
|
||||
nm = nil
|
||||
}
|
||||
new := Status{
|
||||
newSt := &Status{
|
||||
URL: url,
|
||||
Persist: p,
|
||||
NetMap: nm,
|
||||
Err: err,
|
||||
state: state,
|
||||
}
|
||||
c.lastStatus.Store(newSt)
|
||||
|
||||
// Launch a new goroutine to avoid blocking the caller while the observer
|
||||
// does its thing, which may result in a call back into the client.
|
||||
c.observerQueue.Add(func() {
|
||||
c.observer.SetControlClientStatus(c, new)
|
||||
if c.canSkipStatus(newSt) {
|
||||
metricSkippable.Add(1)
|
||||
if debugSkipQueue() {
|
||||
metricSkipped.Add(1)
|
||||
return
|
||||
}
|
||||
}
|
||||
c.observer.SetControlClientStatus(c, *newSt)
|
||||
c.lastStatus.CompareAndSwap(newSt, nil)
|
||||
})
|
||||
}
|
||||
|
||||
var debugSkipQueue = envknob.RegisterBool("TS_DEBUG_SKIP_STATUS_QUEUE")
|
||||
|
||||
var (
|
||||
metricSkippable = expvar.NewInt("controlclient_auto_status_queue_skippable")
|
||||
metricSkipped = expvar.NewInt("controlclient_auto_status_queue_skipped")
|
||||
)
|
||||
|
||||
func (c *Auto) canSkipStatus(s1 *Status) bool {
|
||||
s2 := c.lastStatus.Load()
|
||||
if s2 == nil {
|
||||
return false
|
||||
}
|
||||
if s1 == s2 {
|
||||
return false
|
||||
}
|
||||
if s1.Err != nil || s1.URL != "" {
|
||||
return false
|
||||
}
|
||||
if !s1.Persist.Equals(s2.Persist) || s1.state != s2.state {
|
||||
return false
|
||||
}
|
||||
if s1.NetMap != nil && s2.NetMap != nil {
|
||||
// If s1 doesn't have an error and doesn't have a URL
|
||||
// and is otherwise identical to the latest one
|
||||
// other than the a different NetMap, then this is
|
||||
// skippable.
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *Auto) Login(flags LoginFlags) {
|
||||
c.logf("client.Login(%v)", flags)
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"tailscale.com/control/controlknobs"
|
||||
@@ -817,6 +818,8 @@ func (ms *mapSession) sortedPeers() []tailcfg.NodeView {
|
||||
return ret
|
||||
}
|
||||
|
||||
var numNetmaps atomic.Int64
|
||||
|
||||
// netmap returns a fully populated NetworkMap from the last state seen from
|
||||
// a call to updateStateFromResponse, filling in omitted
|
||||
// information from prior MapResponse values.
|
||||
@@ -841,6 +844,10 @@ func (ms *mapSession) netmap() *netmap.NetworkMap {
|
||||
TKAEnabled: ms.lastTKAInfo != nil && !ms.lastTKAInfo.Disabled,
|
||||
MaxKeyDuration: ms.lastMaxExpiry,
|
||||
}
|
||||
ms.logf("XXX NetMap++ => %v", numNetmaps.Add(1))
|
||||
runtime.SetFinalizer(nm, func(nm *netmap.NetworkMap) {
|
||||
ms.logf("XXX NetMap-- => %v", numNetmaps.Add(-1))
|
||||
})
|
||||
|
||||
if ms.lastTKAInfo != nil && ms.lastTKAInfo.Head != "" {
|
||||
if err := nm.TKAHead.UnmarshalText([]byte(ms.lastTKAInfo.Head)); err != nil {
|
||||
|
||||
@@ -7,7 +7,11 @@ package execqueue
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ExecQueue struct {
|
||||
@@ -16,9 +20,36 @@ type ExecQueue struct {
|
||||
inFlight bool // whether a goroutine is running q.run
|
||||
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
|
||||
queue []func()
|
||||
|
||||
// metrics follow
|
||||
metricsRegisterOnce sync.Once
|
||||
metricInserts expvar.Int
|
||||
metricRemovals expvar.Int
|
||||
metricQueueLastDrain expvar.Int // unix millis
|
||||
}
|
||||
|
||||
// This is extremely silly but is for debugging
|
||||
var metricsCounter atomic.Int64
|
||||
|
||||
// registerMetrics registers the queue's metrics with expvar, using a unique name.
|
||||
func (q *ExecQueue) registerMetrics() {
|
||||
q.metricsRegisterOnce.Do(func() {
|
||||
m := new(expvar.Map).Init()
|
||||
m.Set("inserts", &q.metricInserts)
|
||||
m.Set("removals", &q.metricRemovals)
|
||||
m.Set("length", expvar.Func(func() any {
|
||||
return q.metricInserts.Value() - q.metricRemovals.Value()
|
||||
}))
|
||||
m.Set("last_drain", &q.metricQueueLastDrain)
|
||||
|
||||
name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1))
|
||||
expvar.Publish(name, m)
|
||||
})
|
||||
}
|
||||
|
||||
func (q *ExecQueue) Add(f func()) {
|
||||
q.registerMetrics()
|
||||
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if q.closed {
|
||||
@@ -26,6 +57,7 @@ func (q *ExecQueue) Add(f func()) {
|
||||
}
|
||||
if q.inFlight {
|
||||
q.queue = append(q.queue, f)
|
||||
q.metricInserts.Add(1)
|
||||
} else {
|
||||
q.inFlight = true
|
||||
go q.run(f)
|
||||
@@ -35,6 +67,8 @@ func (q *ExecQueue) Add(f func()) {
|
||||
// RunSync waits for the queue to be drained and then synchronously runs f.
|
||||
// It returns an error if the queue is closed before f is run or ctx expires.
|
||||
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
|
||||
q.registerMetrics()
|
||||
|
||||
for {
|
||||
if err := q.Wait(ctx); err != nil {
|
||||
return err
|
||||
@@ -61,11 +95,13 @@ func (q *ExecQueue) run(f func()) {
|
||||
f := q.queue[0]
|
||||
q.queue[0] = nil
|
||||
q.queue = q.queue[1:]
|
||||
q.metricRemovals.Add(1)
|
||||
q.mu.Unlock()
|
||||
f()
|
||||
q.mu.Lock()
|
||||
}
|
||||
q.inFlight = false
|
||||
q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli()))
|
||||
q.queue = nil
|
||||
if q.doneWaiter != nil {
|
||||
close(q.doneWaiter)
|
||||
@@ -76,6 +112,8 @@ func (q *ExecQueue) run(f func()) {
|
||||
|
||||
// Shutdown asynchronously signals the queue to stop.
|
||||
func (q *ExecQueue) Shutdown() {
|
||||
q.registerMetrics()
|
||||
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
q.closed = true
|
||||
@@ -83,6 +121,8 @@ func (q *ExecQueue) Shutdown() {
|
||||
|
||||
// Wait waits for the queue to be empty.
|
||||
func (q *ExecQueue) Wait(ctx context.Context) error {
|
||||
q.registerMetrics()
|
||||
|
||||
q.mu.Lock()
|
||||
waitCh := q.doneWaiter
|
||||
if q.inFlight && waitCh == nil {
|
||||
|
||||
Reference in New Issue
Block a user