Compare commits

...

9 Commits
qnap ... flakes

Author SHA1 Message Date
Flakes Updater
93aa395223 go.mod.sri: update SRI hash for go.mod changes
Signed-off-by: Flakes Updater <noreply+flakes-updater@tailscale.com>
2025-03-13 17:38:34 +00:00
Patrick O'Doherty
f0b395d851 go.mod update golang.org/x/net to 0.36.0 for govulncheck (#15296)
Updates #cleanup

Signed-off-by: Patrick O'Doherty <patrick@tailscale.com>
2025-03-13 10:37:42 -07:00
M. J. Fromberger
0663412559 util/eventbus: add basic throughput benchmarks (#15284)
Shovel small events through the pipeine as fast as possible in a few basic
configurations, to establish some baseline performance numbers.

Updates #15160

Change-Id: I1dcbbd1109abb7b93aa4dcb70da57f183eb0e60e
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-13 08:06:20 -07:00
Paul Scott
eb680edbce cmd/testwrapper: print failed tests preventing retry (#15270)
Updates tailscale/corp#26637

Signed-off-by: Paul Scott <paul@tailscale.com>
2025-03-13 14:21:29 +00:00
Irbe Krumina
cd391b37a6 ipn/ipnlocal, envknob: make it possible to configure the cert client to act in read-only mode (#15250)
* ipn/ipnlocal,envknob: add some primitives for HA replica cert share.

Add an envknob for configuring
an instance's cert store as read-only, so that it
does not attempt to issue or renew TLS credentials,
only reads them from its cert store.
This will be used by the Kubernetes Operator's HA Ingress
to enable multiple replicas serving the same HTTPS endpoint
to be able to share the same cert.

Also some minor refactor to allow adding more tests
for cert retrieval logic.


Signed-off-by: Irbe Krumina <irbe@tailscale.com>
2025-03-13 14:14:03 +00:00
Will Norris
45ecc0f85a tsweb: add title to DebugHandler and helper registration methods
Allow customizing the title on the debug index page.  Also add methods
for registering http.HandlerFunc to make it a little easier on callers.

Updates tailscale/corp#27058

Change-Id: Ia101a4a3005adb9118051b3416f5a64a4a45987d
Signed-off-by: Will Norris <will@tailscale.com>
2025-03-12 19:21:25 -07:00
David Anderson
6d217d81d1 util/eventbus: add a helper program for bus development
The demo program generates a stream of made up bus events between
a number of bus actors, as a way to generate some interesting activity
to show on the bus debug page.

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-12 17:47:47 -07:00
David Anderson
d83024a63f util/eventbus: add a debug HTTP handler for the bus
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-12 17:47:47 -07:00
Andrew Dunham
640b2fa3ae net/netmon, wgengine/magicsock: be quieter with portmapper logs
This adds a new helper to the netmon package that allows us to
rate-limit log messages, so that they only print once per (major)
LinkChange event. We then use this when constructing the portmapper, so
that we don't keep spamming logs forever on the same network.

Updates #13145

Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: I6e7162509148abea674f96efd76be9dffb373ae4
2025-03-12 17:45:26 -04:00
25 changed files with 1152 additions and 18 deletions

View File

@@ -259,6 +259,7 @@ func main() {
fmt.Printf("\n\nAttempt #%d: Retrying flaky tests:\n\nflakytest failures JSON: %s\n\n", thisRun.attempt, j)
}
fatalFailures := make(map[string]struct{}) // pkg.Test key
toRetry := make(map[string][]*testAttempt) // pkg -> tests to retry
for _, pt := range thisRun.tests {
ch := make(chan *testAttempt)
@@ -301,11 +302,24 @@ func main() {
if tr.isMarkedFlaky {
toRetry[tr.pkg] = append(toRetry[tr.pkg], tr)
} else {
fatalFailures[tr.pkg+"."+tr.testName] = struct{}{}
failed = true
}
}
if failed {
fmt.Println("\n\nNot retrying flaky tests because non-flaky tests failed.")
// Print the list of non-flakytest failures.
// We will later analyze the retried GitHub Action runs to see
// if non-flakytest failures succeeded upon retry. This will
// highlight tests which are flaky but not yet flagged as such.
if len(fatalFailures) > 0 {
tests := slicesx.MapKeys(fatalFailures)
sort.Strings(tests)
j, _ := json.Marshal(tests)
fmt.Printf("non-flakytest failures: %s\n", j)
}
fmt.Println()
os.Exit(1)
}

View File

@@ -417,6 +417,23 @@ func App() string {
return ""
}
// IsCertShareReadOnlyMode returns true if this replica should never attempt to
// issue or renew TLS credentials for any of the HTTPS endpoints that it is
// serving. It should only return certs found in its cert store. Currently,
// this is used by the Kubernetes Operator's HA Ingress via VIPServices, where
// multiple Ingress proxy instances serve the same HTTPS endpoint with a shared
// TLS credentials. The TLS credentials should only be issued by one of the
// replicas.
// For HTTPS Ingress the operator and containerboot ensure
// that read-only replicas will not be serving the HTTPS endpoints before there
// is a shared cert available.
func IsCertShareReadOnlyMode() bool {
m := String("TS_CERT_SHARE_MODE")
return m == modeRO
}
const modeRO = "ro"
// CrashOnUnexpected reports whether the Tailscale client should panic
// on unexpected conditions. If TS_DEBUG_CRASH_ON_UNEXPECTED is set, that's
// used. Otherwise the default value is true for unstable builds.

View File

@@ -130,4 +130,4 @@
in
flake-utils.lib.eachDefaultSystem (system: flakeForSystem nixpkgs system);
}
# nix-direnv cache busting line: sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
# nix-direnv cache busting line: sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=

2
go.mod
View File

@@ -97,7 +97,7 @@ require (
golang.org/x/crypto v0.35.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/mod v0.23.0
golang.org/x/net v0.35.0
golang.org/x/net v0.36.0
golang.org/x/oauth2 v0.26.0
golang.org/x/sync v0.11.0
golang.org/x/sys v0.30.0

View File

@@ -1 +1 @@
sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=

4
go.sum
View File

@@ -1135,8 +1135,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -119,6 +119,9 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
}
if pair, err := getCertPEMCached(cs, domain, now); err == nil {
if envknob.IsCertShareReadOnlyMode() {
return pair, nil
}
// If we got here, we have a valid unexpired cert.
// Check whether we should start an async renewal.
shouldRenew, err := b.shouldStartDomainRenewal(cs, domain, now, pair, minValidity)
@@ -134,7 +137,7 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
if minValidity == 0 {
logf("starting async renewal")
// Start renewal in the background, return current valid cert.
go b.getCertPEM(context.Background(), cs, logf, traceACME, domain, now, minValidity)
b.goTracker.Go(func() { getCertPEM(context.Background(), b, cs, logf, traceACME, domain, now, minValidity) })
return pair, nil
}
// If the caller requested a specific validity duration, fall through
@@ -142,7 +145,11 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
logf("starting sync renewal")
}
pair, err := b.getCertPEM(ctx, cs, logf, traceACME, domain, now, minValidity)
if envknob.IsCertShareReadOnlyMode() {
return nil, fmt.Errorf("retrieving cached TLS certificate failed and cert store is configured in read-only mode, not attempting to issue a new certificate: %w", err)
}
pair, err := getCertPEM(ctx, b, cs, logf, traceACME, domain, now, minValidity)
if err != nil {
logf("getCertPEM: %v", err)
return nil, err
@@ -358,7 +365,29 @@ type certStateStore struct {
testRoots *x509.CertPool
}
// TLSCertKeyReader is an interface implemented by state stores where it makes
// sense to read the TLS cert and key in a single operation that can be
// distinguished from generic state value reads. Currently this is only implemented
// by the kubestore.Store, which, in some cases, need to read cert and key from a
// non-cached TLS Secret.
type TLSCertKeyReader interface {
ReadTLSCertAndKey(domain string) ([]byte, []byte, error)
}
func (s certStateStore) Read(domain string, now time.Time) (*TLSCertKeyPair, error) {
// If we're using a store that supports atomic reads, use that
if kr, ok := s.StateStore.(TLSCertKeyReader); ok {
cert, key, err := kr.ReadTLSCertAndKey(domain)
if err != nil {
return nil, err
}
if !validCertPEM(domain, key, cert, s.testRoots, now) {
return nil, errCertExpired
}
return &TLSCertKeyPair{CertPEM: cert, KeyPEM: key, Cached: true}, nil
}
// Otherwise fall back to separate reads
certPEM, err := s.ReadState(ipn.StateKey(domain + ".crt"))
if err != nil {
return nil, err
@@ -446,7 +475,9 @@ func getCertPEMCached(cs certStore, domain string, now time.Time) (p *TLSCertKey
return cs.Read(domain, now)
}
func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
// getCertPem checks if a cert needs to be renewed and if so, renews it.
// It can be overridden in tests.
var getCertPEM = func(ctx context.Context, b *LocalBackend, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
acmeMu.Lock()
defer acmeMu.Unlock()

View File

@@ -6,6 +6,7 @@
package ipnlocal
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
@@ -14,11 +15,17 @@ import (
"embed"
"encoding/pem"
"math/big"
"os"
"path/filepath"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"tailscale.com/envknob"
"tailscale.com/ipn/store/mem"
"tailscale.com/tstest"
"tailscale.com/types/logger"
"tailscale.com/util/must"
)
func TestValidLookingCertDomain(t *testing.T) {
@@ -221,3 +228,151 @@ func TestDebugACMEDirectoryURL(t *testing.T) {
})
}
}
func TestGetCertPEMWithValidity(t *testing.T) {
const testDomain = "example.com"
b := &LocalBackend{
store: &mem.Store{},
varRoot: t.TempDir(),
ctx: context.Background(),
logf: t.Logf,
}
certDir, err := b.certDir()
if err != nil {
t.Fatalf("certDir error: %v", err)
}
if _, err := b.getCertStore(); err != nil {
t.Fatalf("getCertStore error: %v", err)
}
testRoot, err := certTestFS.ReadFile("testdata/rootCA.pem")
if err != nil {
t.Fatal(err)
}
roots := x509.NewCertPool()
if !roots.AppendCertsFromPEM(testRoot) {
t.Fatal("Unable to add test CA to the cert pool")
}
testX509Roots = roots
defer func() { testX509Roots = nil }()
tests := []struct {
name string
now time.Time
// storeCerts is true if the test cert and key should be written to store.
storeCerts bool
readOnlyMode bool // TS_READ_ONLY_CERTS env var
wantAsyncRenewal bool // async issuance should be started
wantIssuance bool // sync issuance should be started
wantErr bool
}{
{
name: "valid_no_renewal",
now: time.Date(2023, time.February, 20, 0, 0, 0, 0, time.UTC),
storeCerts: true,
wantAsyncRenewal: false,
wantIssuance: false,
wantErr: false,
},
{
name: "issuance_needed",
now: time.Date(2023, time.February, 20, 0, 0, 0, 0, time.UTC),
storeCerts: false,
wantAsyncRenewal: false,
wantIssuance: true,
wantErr: false,
},
{
name: "renewal_needed",
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
storeCerts: true,
wantAsyncRenewal: true,
wantIssuance: false,
wantErr: false,
},
{
name: "renewal_needed_read_only_mode",
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
storeCerts: true,
readOnlyMode: true,
wantAsyncRenewal: false,
wantIssuance: false,
wantErr: false,
},
{
name: "no_certs_read_only_mode",
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
storeCerts: false,
readOnlyMode: true,
wantAsyncRenewal: false,
wantIssuance: false,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.readOnlyMode {
envknob.Setenv("TS_CERT_SHARE_MODE", "ro")
}
os.RemoveAll(certDir)
if tt.storeCerts {
os.MkdirAll(certDir, 0755)
if err := os.WriteFile(filepath.Join(certDir, "example.com.crt"),
must.Get(os.ReadFile("testdata/example.com.pem")), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(certDir, "example.com.key"),
must.Get(os.ReadFile("testdata/example.com-key.pem")), 0644); err != nil {
t.Fatal(err)
}
}
b.clock = tstest.NewClock(tstest.ClockOpts{Start: tt.now})
allDone := make(chan bool, 1)
defer b.goTracker.AddDoneCallback(func() {
b.mu.Lock()
defer b.mu.Unlock()
if b.goTracker.RunningGoroutines() > 0 {
return
}
select {
case allDone <- true:
default:
}
})()
// Set to true if get getCertPEM is called. GetCertPEM can be called in a goroutine for async
// renewal or in the main goroutine if issuance is required to obtain valid TLS credentials.
getCertPemWasCalled := false
getCertPEM = func(ctx context.Context, b *LocalBackend, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
getCertPemWasCalled = true
return nil, nil
}
prevGoRoutines := b.goTracker.StartedGoroutines()
_, err = b.GetCertPEMWithValidity(context.Background(), testDomain, 0)
if (err != nil) != tt.wantErr {
t.Errorf("b.GetCertPemWithValidity got err %v, wants error: '%v'", err, tt.wantErr)
}
// GetCertPEMWithValidity calls getCertPEM in a goroutine if async renewal is needed. That's the
// only goroutine it starts, so this can be used to test if async renewal was started.
gotAsyncRenewal := b.goTracker.StartedGoroutines()-prevGoRoutines != 0
if gotAsyncRenewal {
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for goroutines to finish")
case <-allDone:
}
}
// Verify that async renewal was triggered if expected.
if tt.wantAsyncRenewal != gotAsyncRenewal {
t.Fatalf("wants getCertPem to be called async: %v, got called %v", tt.wantAsyncRenewal, gotAsyncRenewal)
}
// Verify that (non-async) issuance was started if expected.
gotIssuance := getCertPemWasCalled && !gotAsyncRenewal
if tt.wantIssuance != gotIssuance {
t.Errorf("wants getCertPem to be called: %v, got called %v", tt.wantIssuance, gotIssuance)
}
})
}
}

42
net/netmon/loghelper.go Normal file
View File

@@ -0,0 +1,42 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package netmon
import (
"sync"
"tailscale.com/types/logger"
)
// LinkChangeLogLimiter returns a new [logger.Logf] that logs each unique
// format string to the underlying logger only once per major LinkChange event.
//
// The returned function should be called when the logger is no longer needed,
// to release resources from the Monitor.
func LinkChangeLogLimiter(logf logger.Logf, nm *Monitor) (_ logger.Logf, unregister func()) {
var formatSeen sync.Map // map[string]bool
unregister = nm.RegisterChangeCallback(func(cd *ChangeDelta) {
// If we're in a major change or a time jump, clear the seen map.
if cd.Major || cd.TimeJumped {
formatSeen.Clear()
}
})
return func(format string, args ...any) {
// We only store 'true' in the map, so if it's present then it
// means we've already logged this format string.
_, loaded := formatSeen.LoadOrStore(format, true)
if loaded {
// TODO(andrew-d): we may still want to log this
// message every N minutes (1x/hour?) even if it's been
// seen, so that debugging doesn't require searching
// back in the logs for an unbounded amount of time.
//
// See: https://github.com/tailscale/tailscale/issues/13145
return
}
logf(format, args...)
}, unregister
}

View File

@@ -0,0 +1,78 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package netmon
import (
"bytes"
"fmt"
"testing"
)
func TestLinkChangeLogLimiter(t *testing.T) {
mon, err := New(t.Logf)
if err != nil {
t.Fatal(err)
}
defer mon.Close()
var logBuffer bytes.Buffer
logf := func(format string, args ...any) {
t.Logf("captured log: "+format, args...)
if format[len(format)-1] != '\n' {
format += "\n"
}
fmt.Fprintf(&logBuffer, format, args...)
}
logf, unregister := LinkChangeLogLimiter(logf, mon)
defer unregister()
// Log once, which should write to our log buffer.
logf("hello %s", "world")
if got := logBuffer.String(); got != "hello world\n" {
t.Errorf("unexpected log buffer contents: %q", got)
}
// Log again, which should not write to our log buffer.
logf("hello %s", "andrew")
if got := logBuffer.String(); got != "hello world\n" {
t.Errorf("unexpected log buffer contents: %q", got)
}
// Log a different message, which should write to our log buffer.
logf("other message")
if got := logBuffer.String(); got != "hello world\nother message\n" {
t.Errorf("unexpected log buffer contents: %q", got)
}
// Synthesize a fake major change event, which should clear the format
// string cache and allow the next log to write to our log buffer.
//
// InjectEvent doesn't work because it's not a major event, so we
// instead reach into the netmon and grab the callback, and then call
// it ourselves.
mon.mu.Lock()
var cb func(*ChangeDelta)
for _, c := range mon.cbs {
cb = c
break
}
mon.mu.Unlock()
cb(&ChangeDelta{Major: true})
logf("hello %s", "world")
if got := logBuffer.String(); got != "hello world\nother message\nhello world\n" {
t.Errorf("unexpected log buffer contents: %q", got)
}
// Unregistering the callback should clear our 'cbs' set.
unregister()
mon.mu.Lock()
if len(mon.cbs) != 0 {
t.Errorf("expected no callbacks, got %v", mon.cbs)
}
mon.mu.Unlock()
}

View File

@@ -16,4 +16,4 @@
) {
src = ./.;
}).shellNix
# nix-direnv cache busting line: sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
# nix-direnv cache busting line: sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=

View File

@@ -34,6 +34,7 @@ type DebugHandler struct {
kvs []func(io.Writer) // output one <li>...</li> each, see KV()
urls []string // one <li>...</li> block with link each
sections []func(io.Writer, *http.Request) // invoked in registration order prior to outputting </body>
title string // title displayed on index page
}
// Debugger returns the DebugHandler registered on mux at /debug/,
@@ -44,7 +45,8 @@ func Debugger(mux *http.ServeMux) *DebugHandler {
return d
}
ret := &DebugHandler{
mux: mux,
mux: mux,
title: fmt.Sprintf("%s debug", version.CmdName()),
}
mux.Handle("/debug/", ret)
@@ -85,7 +87,7 @@ func (d *DebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
AddBrowserHeaders(w)
f := func(format string, args ...any) { fmt.Fprintf(w, format, args...) }
f("<html><body><h1>%s debug</h1><ul>", version.CmdName())
f("<html><body><h1>%s</h1><ul>", html.EscapeString(d.title))
for _, kv := range d.kvs {
kv(w)
}
@@ -103,14 +105,20 @@ func (d *DebugHandler) handle(slug string, handler http.Handler) string {
return href
}
// Handle registers handler at /debug/<slug> and creates a descriptive
// entry in /debug/ for it.
// Handle registers handler at /debug/<slug> and adds a link to it
// on /debug/ with the provided description.
func (d *DebugHandler) Handle(slug, desc string, handler http.Handler) {
href := d.handle(slug, handler)
d.URL(href, desc)
}
// HandleSilent registers handler at /debug/<slug>. It does not create
// Handle registers handler at /debug/<slug> and adds a link to it
// on /debug/ with the provided description.
func (d *DebugHandler) HandleFunc(slug, desc string, handler http.HandlerFunc) {
d.Handle(slug, desc, handler)
}
// HandleSilent registers handler at /debug/<slug>. It does not add
// a descriptive entry in /debug/ for it. This should be used
// sparingly, for things that need to be registered but would pollute
// the list of debug links.
@@ -118,6 +126,14 @@ func (d *DebugHandler) HandleSilent(slug string, handler http.Handler) {
d.handle(slug, handler)
}
// HandleSilent registers handler at /debug/<slug>. It does not add
// a descriptive entry in /debug/ for it. This should be used
// sparingly, for things that need to be registered but would pollute
// the list of debug links.
func (d *DebugHandler) HandleSilentFunc(slug string, handler http.HandlerFunc) {
d.HandleSilent(slug, handler)
}
// KV adds a key/value list item to /debug/.
func (d *DebugHandler) KV(k string, v any) {
val := html.EscapeString(fmt.Sprintf("%v", v))
@@ -149,6 +165,11 @@ func (d *DebugHandler) Section(f func(w io.Writer, r *http.Request)) {
d.sections = append(d.sections, f)
}
// Title sets the title at the top of the debug page.
func (d *DebugHandler) Title(title string) {
d.title = title
}
func gcHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("running GC...\n"))
if f, ok := w.(http.Flusher); ok {

View File

@@ -0,0 +1,6 @@
<li id="monitor" hx-swap-oob="afterbegin">
<details>
<summary>{{.Count}}: {{.Type}} from {{.Event.From.Name}}, {{len .Event.To}} recipients</summary>
{{.Event.Event}}
</details>
</li>

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,97 @@
<!DOCTYPE html>
<html>
<head>
<script src="bus/htmx.min.js"></script>
<script src="bus/htmx-websocket.min.js"></script>
<link rel="stylesheet" href="bus/style.css">
</head>
<body hx-ext="ws">
<h1>Event bus</h1>
<section>
<h2>General</h2>
{{with $.PublishQueue}}
{{len .}} pending
{{end}}
<button hx-post="bus/monitor" hx-swap="outerHTML">Monitor all events</button>
</section>
<section>
<h2>Clients</h2>
<table>
<thead>
<tr>
<th>Name</th>
<th>Publishing</th>
<th>Subscribing</th>
<th>Pending</th>
</tr>
</thead>
{{range .Clients}}
<tr id="{{.Name}}">
<td>{{.Name}}</td>
<td class="list">
<ul>
{{range .Publish}}
<li><a href="#{{.}}">{{.}}</a></li>
{{end}}
</ul>
</td>
<td class="list">
<ul>
{{range .Subscribe}}
<li><a href="#{{.}}">{{.}}</a></li>
{{end}}
</ul>
</td>
<td>
{{len ($.SubscribeQueue .Client)}}
</td>
</tr>
{{end}}
</table>
</section>
<section>
<h2>Types</h2>
{{range .Types}}
<section id="{{.}}">
<h3>{{.Name}}</h3>
<h4>Definition</h4>
<code>{{prettyPrintStruct .}}</code>
<h4>Published by:</h4>
{{if len (.Publish)}}
<ul>
{{range .Publish}}
<li><a href="#{{.Name}}">{{.Name}}</a></li>
{{end}}
</ul>
{{else}}
<ul>
<li>No publishers.</li>
</ul>
{{end}}
<h4>Received by:</h4>
{{if len (.Subscribe)}}
<ul>
{{range .Subscribe}}
<li><a href="#{{.Name}}">{{.Name}}</a></li>
{{end}}
</ul>
{{else}}
<ul>
<li>No subscribers.</li>
</ul>
{{end}}
</section>
{{end}}
</section>
</body>
</html>

View File

@@ -0,0 +1,5 @@
<div>
<ul id="monitor" ws-connect="bus/monitor">
</ul>
<button hx-get="bus" hx-target="body">Stop monitoring</button>
</div>

View File

@@ -0,0 +1,90 @@
/* CSS reset, thanks Josh Comeau: https://www.joshwcomeau.com/css/custom-css-reset/ */
*, *::before, *::after { box-sizing: border-box; }
* { margin: 0; }
input, button, textarea, select { font: inherit; }
p, h1, h2, h3, h4, h5, h6 { overflow-wrap: break-word; }
p { text-wrap: pretty; }
h1, h2, h3, h4, h5, h6 { text-wrap: balance; }
#root, #__next { isolation: isolate; }
body {
line-height: 1.5;
-webkit-font-smoothing: antialiased;
}
img, picture, video, canvas, svg {
display: block;
max-width: 100%;
}
/* Local styling begins */
body {
padding: 12px;
}
div {
width: 100%;
}
section {
display: flex;
flex-direction: column;
flex-gap: 6px;
align-items: flex-start;
padding: 12px 0;
}
section > * {
margin-left: 24px;
}
section > h2, section > h3 {
margin-left: 0;
padding-bottom: 6px;
padding-top: 12px;
}
details {
padding-bottom: 12px;
}
table {
table-layout: fixed;
width: calc(100% - 48px);
border-collapse: collapse;
border: 1px solid black;
}
th, td {
padding: 12px;
border: 1px solid black;
}
td.list {
vertical-align: top;
}
ul {
list-style: none;
}
td ul {
margin: 0;
padding: 0;
}
code {
padding: 12px;
white-space: pre;
}
#monitor {
width: calc(100% - 48px);
resize: vertical;
padding: 12px;
overflow: scroll;
height: 15lh;
border: 1px inset;
min-height: 1em;
display: flex;
flex-direction: column-reverse;
}

125
util/eventbus/bench_test.go Normal file
View File

@@ -0,0 +1,125 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus_test
import (
"math/rand/v2"
"testing"
"tailscale.com/util/eventbus"
)
func BenchmarkBasicThroughput(b *testing.B) {
bus := eventbus.New()
pcli := bus.Client(b.Name() + "-pub")
scli := bus.Client(b.Name() + "-sub")
type emptyEvent [0]byte
// One publisher and a corresponding subscriber shoveling events as fast as
// they can through the plumbing.
pub := eventbus.Publish[emptyEvent](pcli)
sub := eventbus.Subscribe[emptyEvent](scli)
go func() {
for {
select {
case <-sub.Events():
continue
case <-sub.Done():
return
}
}
}()
for b.Loop() {
pub.Publish(emptyEvent{})
}
bus.Close()
}
func BenchmarkSubsThroughput(b *testing.B) {
bus := eventbus.New()
pcli := bus.Client(b.Name() + "-pub")
scli1 := bus.Client(b.Name() + "-sub1")
scli2 := bus.Client(b.Name() + "-sub2")
type emptyEvent [0]byte
// One publisher and two subscribers shoveling events as fast as they can
// through the plumbing.
pub := eventbus.Publish[emptyEvent](pcli)
sub1 := eventbus.Subscribe[emptyEvent](scli1)
sub2 := eventbus.Subscribe[emptyEvent](scli2)
for _, sub := range []*eventbus.Subscriber[emptyEvent]{sub1, sub2} {
go func() {
for {
select {
case <-sub.Events():
continue
case <-sub.Done():
return
}
}
}()
}
for b.Loop() {
pub.Publish(emptyEvent{})
}
bus.Close()
}
func BenchmarkMultiThroughput(b *testing.B) {
bus := eventbus.New()
cli := bus.Client(b.Name())
type eventA struct{}
type eventB struct{}
// Two disjoint event streams routed through the global order.
apub := eventbus.Publish[eventA](cli)
asub := eventbus.Subscribe[eventA](cli)
bpub := eventbus.Publish[eventB](cli)
bsub := eventbus.Subscribe[eventB](cli)
go func() {
for {
select {
case <-asub.Events():
continue
case <-asub.Done():
return
}
}
}()
go func() {
for {
select {
case <-bsub.Events():
continue
case <-bsub.Done():
return
}
}
}()
var rng uint64
var bits int
for b.Loop() {
if bits == 0 {
rng = rand.Uint64()
bits = 64
}
if rng&1 == 0 {
apub.Publish(eventA{})
} else {
bpub.Publish(eventB{})
}
rng >>= 1
bits--
}
bus.Close()
}

View File

@@ -73,8 +73,8 @@ func (b *Bus) Client(name string) *Client {
}
// Debugger returns the debugging facility for the bus.
func (b *Bus) Debugger() Debugger {
return Debugger{b}
func (b *Bus) Debugger() *Debugger {
return &Debugger{b}
}
// Close closes the bus. Implicitly closes all clients, publishers and

View File

@@ -0,0 +1,103 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// debug-demo is a program that serves a bus's debug interface over
// HTTP, then generates some fake traffic from a handful of
// clients. It is an aid to development, to have something to present
// on the debug interfaces while writing them.
package main
import (
"log"
"math/rand/v2"
"net/http"
"net/netip"
"time"
"tailscale.com/tsweb"
"tailscale.com/types/key"
"tailscale.com/util/eventbus"
)
func main() {
b := eventbus.New()
c := b.Client("RouteMonitor")
go testPub[RouteAdded](c, 5*time.Second)
go testPub[RouteRemoved](c, 5*time.Second)
c = b.Client("ControlClient")
go testPub[PeerAdded](c, 3*time.Second)
go testPub[PeerRemoved](c, 6*time.Second)
c = b.Client("Portmapper")
go testPub[PortmapAcquired](c, 10*time.Second)
go testPub[PortmapLost](c, 15*time.Second)
go testSub[RouteAdded](c)
c = b.Client("WireguardConfig")
go testSub[PeerAdded](c)
go testSub[PeerRemoved](c)
c = b.Client("Magicsock")
go testPub[PeerPathChanged](c, 5*time.Second)
go testSub[RouteAdded](c)
go testSub[RouteRemoved](c)
go testSub[PortmapAcquired](c)
go testSub[PortmapLost](c)
m := http.NewServeMux()
d := tsweb.Debugger(m)
b.Debugger().RegisterHTTP(d)
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/bus", http.StatusFound)
})
log.Printf("Serving debug interface at http://localhost:8185/debug/bus")
http.ListenAndServe(":8185", m)
}
func testPub[T any](c *eventbus.Client, every time.Duration) {
p := eventbus.Publish[T](c)
for {
jitter := time.Duration(rand.N(2000)) * time.Millisecond
time.Sleep(jitter)
var zero T
log.Printf("%s publish: %T", c.Name(), zero)
p.Publish(zero)
time.Sleep(every)
}
}
func testSub[T any](c *eventbus.Client) {
s := eventbus.Subscribe[T](c)
for v := range s.Events() {
log.Printf("%s received: %T", c.Name(), v)
}
}
type RouteAdded struct {
Prefix netip.Prefix
Via netip.Addr
Priority int
}
type RouteRemoved struct {
Prefix netip.Addr
}
type PeerAdded struct {
ID int
Key key.NodePublic
}
type PeerRemoved struct {
ID int
Key key.NodePublic
}
type PortmapAcquired struct {
Endpoint netip.Addr
}
type PortmapLost struct {
Endpoint netip.Addr
}
type PeerPathChanged struct {
ID int
EndpointID int
Quality int
}

View File

@@ -4,11 +4,14 @@
package eventbus
import (
"cmp"
"fmt"
"reflect"
"slices"
"sync"
"sync/atomic"
"tailscale.com/tsweb"
)
// A Debugger offers access to a bus's privileged introspection and
@@ -29,7 +32,11 @@ type Debugger struct {
// Clients returns a list of all clients attached to the bus.
func (d *Debugger) Clients() []*Client {
return d.bus.listClients()
ret := d.bus.listClients()
slices.SortFunc(ret, func(a, b *Client) int {
return cmp.Compare(a.Name(), b.Name())
})
return ret
}
// PublishQueue returns the contents of the publish queue.
@@ -130,6 +137,8 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
return client.subscribeTypes()
}
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) }
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
sync.Mutex

238
util/eventbus/debughttp.go Normal file
View File

@@ -0,0 +1,238 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"bytes"
"cmp"
"embed"
"fmt"
"html/template"
"io"
"io/fs"
"log"
"net/http"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"github.com/coder/websocket"
"tailscale.com/tsweb"
)
type httpDebugger struct {
*Debugger
}
func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) {
dh := httpDebugger{d}
td.Handle("bus", "Event bus", dh)
td.HandleSilent("bus/monitor", http.HandlerFunc(dh.serveMonitor))
td.HandleSilent("bus/style.css", serveStatic("style.css"))
td.HandleSilent("bus/htmx.min.js", serveStatic("htmx.min.js.gz"))
td.HandleSilent("bus/htmx-websocket.min.js", serveStatic("htmx-websocket.min.js.gz"))
}
//go:embed assets/*.html
var templatesSrc embed.FS
var templates = sync.OnceValue(func() *template.Template {
d, err := fs.Sub(templatesSrc, "assets")
if err != nil {
panic(fmt.Errorf("getting eventbus debughttp templates subdir: %w", err))
}
ret := template.New("").Funcs(map[string]any{
"prettyPrintStruct": prettyPrintStruct,
})
return template.Must(ret.ParseFS(d, "*"))
})
//go:generate go run fetch-htmx.go
//go:embed assets/*.css assets/*.min.js.gz
var static embed.FS
func serveStatic(name string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasSuffix(name, ".css"):
w.Header().Set("Content-Type", "text/css")
case strings.HasSuffix(name, ".min.js.gz"):
w.Header().Set("Content-Type", "text/javascript")
w.Header().Set("Content-Encoding", "gzip")
case strings.HasSuffix(name, ".js"):
w.Header().Set("Content-Type", "text/javascript")
default:
http.Error(w, "not found", http.StatusNotFound)
return
}
f, err := static.Open(filepath.Join("assets", name))
if err != nil {
http.Error(w, fmt.Sprintf("opening asset: %v", err), http.StatusInternalServerError)
return
}
defer f.Close()
if _, err := io.Copy(w, f); err != nil {
http.Error(w, fmt.Sprintf("serving asset: %v", err), http.StatusInternalServerError)
return
}
})
}
func render(w http.ResponseWriter, name string, data any) {
err := templates().ExecuteTemplate(w, name+".html", data)
if err != nil {
err := fmt.Errorf("rendering template: %v", err)
log.Print(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func (h httpDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type clientInfo struct {
*Client
Publish []reflect.Type
Subscribe []reflect.Type
}
type typeInfo struct {
reflect.Type
Publish []*Client
Subscribe []*Client
}
type info struct {
*Debugger
Clients map[string]*clientInfo
Types map[string]*typeInfo
}
data := info{
Debugger: h.Debugger,
Clients: map[string]*clientInfo{},
Types: map[string]*typeInfo{},
}
getTypeInfo := func(t reflect.Type) *typeInfo {
if data.Types[t.Name()] == nil {
data.Types[t.Name()] = &typeInfo{
Type: t,
}
}
return data.Types[t.Name()]
}
for _, c := range h.Clients() {
ci := &clientInfo{
Client: c,
Publish: h.PublishTypes(c),
Subscribe: h.SubscribeTypes(c),
}
slices.SortFunc(ci.Publish, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
slices.SortFunc(ci.Subscribe, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
data.Clients[c.Name()] = ci
for _, t := range ci.Publish {
ti := getTypeInfo(t)
ti.Publish = append(ti.Publish, c)
}
for _, t := range ci.Subscribe {
ti := getTypeInfo(t)
ti.Subscribe = append(ti.Subscribe, c)
}
}
render(w, "main", data)
}
func (h httpDebugger) serveMonitor(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
h.serveMonitorStream(w, r)
return
}
render(w, "monitor", nil)
}
func (h httpDebugger) serveMonitorStream(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, nil)
if err != nil {
return
}
defer conn.CloseNow()
wsCtx := conn.CloseRead(r.Context())
mon := h.WatchBus()
defer mon.Close()
i := 0
for {
select {
case <-r.Context().Done():
return
case <-wsCtx.Done():
return
case <-mon.Done():
return
case event := <-mon.Events():
msg, err := conn.Writer(r.Context(), websocket.MessageText)
if err != nil {
return
}
data := map[string]any{
"Count": i,
"Type": reflect.TypeOf(event.Event),
"Event": event,
}
i++
if err := templates().ExecuteTemplate(msg, "event.html", data); err != nil {
log.Println(err)
return
}
if err := msg.Close(); err != nil {
return
}
}
}
}
func prettyPrintStruct(t reflect.Type) string {
if t.Kind() != reflect.Struct {
return t.String()
}
var rec func(io.Writer, int, reflect.Type)
rec = func(out io.Writer, indent int, t reflect.Type) {
ind := strings.Repeat(" ", indent)
fmt.Fprintf(out, "%s", t.String())
fs := collectFields(t)
if len(fs) > 0 {
io.WriteString(out, " {\n")
for _, f := range fs {
fmt.Fprintf(out, "%s %s ", ind, f.Name)
if f.Type.Kind() == reflect.Struct {
rec(out, indent+1, f.Type)
} else {
fmt.Fprint(out, f.Type)
}
io.WriteString(out, "\n")
}
fmt.Fprintf(out, "%s}", ind)
}
}
var ret bytes.Buffer
rec(&ret, 0, t)
return ret.String()
}
func collectFields(t reflect.Type) (ret []reflect.StructField) {
for _, f := range reflect.VisibleFields(t) {
if !f.IsExported() {
continue
}
ret = append(ret, f)
}
return ret
}

View File

@@ -0,0 +1,93 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build ignore
// Program fetch-htmx fetches and installs local copies of the HTMX
// library and its dependencies, used by the debug UI. It is meant to
// be run via go generate.
package main
import (
"compress/gzip"
"crypto/sha512"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"os"
)
func main() {
// Hash from https://htmx.org/docs/#installing
htmx, err := fetchHashed("https://unpkg.com/htmx.org@2.0.4", "HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+")
if err != nil {
log.Fatalf("fetching htmx: %v", err)
}
// Hash SHOULD be from https://htmx.org/extensions/ws/ , but the
// hash is currently incorrect, see
// https://github.com/bigskysoftware/htmx-extensions/issues/153
//
// Until that bug is resolved, hash was obtained by rebuilding the
// extension from git source, and verifying that the hash matches
// what unpkg is serving.
ws, err := fetchHashed("https://unpkg.com/htmx-ext-ws@2.0.2", "932iIqjARv+Gy0+r6RTGrfCkCKS5MsF539Iqf6Vt8L4YmbnnWI2DSFoMD90bvXd0")
if err != nil {
log.Fatalf("fetching htmx-websockets: %v", err)
}
if err := writeGz("assets/htmx.min.js.gz", htmx); err != nil {
log.Fatalf("writing htmx.min.js.gz: %v", err)
}
if err := writeGz("assets/htmx-websocket.min.js.gz", ws); err != nil {
log.Fatalf("writing htmx-websocket.min.js.gz: %v", err)
}
}
func writeGz(path string, bs []byte) error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
g, err := gzip.NewWriterLevel(f, gzip.BestCompression)
if err != nil {
return err
}
if _, err := g.Write(bs); err != nil {
return err
}
if err := g.Flush(); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return nil
}
func fetchHashed(url, wantHash string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetching %q returned error status: %s", url, resp.Status)
}
ret, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading file from %q: %v", url, err)
}
h := sha512.Sum384(ret)
got := base64.StdEncoding.EncodeToString(h[:])
if got != wantHash {
return nil, fmt.Errorf("wrong hash for %q: got %q, want %q", url, got, wantHash)
}
return ret, nil
}

View File

@@ -177,6 +177,10 @@ type Conn struct {
// port mappings from NAT devices.
portMapper *portmapper.Client
// portMapperLogfUnregister is the function to call to unregister
// the portmapper log limiter.
portMapperLogfUnregister func()
// derpRecvCh is used by receiveDERP to read DERP messages.
// It must have buffer size > 0; see issue 3736.
derpRecvCh chan derpReadResult
@@ -532,10 +536,15 @@ func NewConn(opts Options) (*Conn, error) {
c.idleFunc = opts.IdleFunc
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
portmapperLogf, c.portMapperLogfUnregister = netmon.LinkChangeLogLimiter(portmapperLogf, opts.NetMon)
portMapOpts := &portmapper.DebugKnobs{
DisableAll: func() bool { return opts.DisablePortMapper || c.onlyTCP443.Load() },
}
c.portMapper = portmapper.NewClient(logger.WithPrefix(c.logf, "portmapper: "), opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
c.portMapper = portmapper.NewClient(portmapperLogf, opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
c.health = opts.HealthTracker
@@ -2481,6 +2490,7 @@ func (c *Conn) Close() error {
}
c.stopPeriodicReSTUNTimerLocked()
c.portMapper.Close()
c.portMapperLogfUnregister()
c.peerMap.forEachEndpoint(func(ep *endpoint) {
ep.stopAndReset()