Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93aa395223 | ||
|
|
f0b395d851 | ||
|
|
0663412559 | ||
|
|
eb680edbce | ||
|
|
cd391b37a6 | ||
|
|
45ecc0f85a | ||
|
|
6d217d81d1 | ||
|
|
d83024a63f | ||
|
|
640b2fa3ae |
@@ -259,6 +259,7 @@ func main() {
|
||||
fmt.Printf("\n\nAttempt #%d: Retrying flaky tests:\n\nflakytest failures JSON: %s\n\n", thisRun.attempt, j)
|
||||
}
|
||||
|
||||
fatalFailures := make(map[string]struct{}) // pkg.Test key
|
||||
toRetry := make(map[string][]*testAttempt) // pkg -> tests to retry
|
||||
for _, pt := range thisRun.tests {
|
||||
ch := make(chan *testAttempt)
|
||||
@@ -301,11 +302,24 @@ func main() {
|
||||
if tr.isMarkedFlaky {
|
||||
toRetry[tr.pkg] = append(toRetry[tr.pkg], tr)
|
||||
} else {
|
||||
fatalFailures[tr.pkg+"."+tr.testName] = struct{}{}
|
||||
failed = true
|
||||
}
|
||||
}
|
||||
if failed {
|
||||
fmt.Println("\n\nNot retrying flaky tests because non-flaky tests failed.")
|
||||
|
||||
// Print the list of non-flakytest failures.
|
||||
// We will later analyze the retried GitHub Action runs to see
|
||||
// if non-flakytest failures succeeded upon retry. This will
|
||||
// highlight tests which are flaky but not yet flagged as such.
|
||||
if len(fatalFailures) > 0 {
|
||||
tests := slicesx.MapKeys(fatalFailures)
|
||||
sort.Strings(tests)
|
||||
j, _ := json.Marshal(tests)
|
||||
fmt.Printf("non-flakytest failures: %s\n", j)
|
||||
}
|
||||
fmt.Println()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
||||
@@ -417,6 +417,23 @@ func App() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// IsCertShareReadOnlyMode returns true if this replica should never attempt to
|
||||
// issue or renew TLS credentials for any of the HTTPS endpoints that it is
|
||||
// serving. It should only return certs found in its cert store. Currently,
|
||||
// this is used by the Kubernetes Operator's HA Ingress via VIPServices, where
|
||||
// multiple Ingress proxy instances serve the same HTTPS endpoint with a shared
|
||||
// TLS credentials. The TLS credentials should only be issued by one of the
|
||||
// replicas.
|
||||
// For HTTPS Ingress the operator and containerboot ensure
|
||||
// that read-only replicas will not be serving the HTTPS endpoints before there
|
||||
// is a shared cert available.
|
||||
func IsCertShareReadOnlyMode() bool {
|
||||
m := String("TS_CERT_SHARE_MODE")
|
||||
return m == modeRO
|
||||
}
|
||||
|
||||
const modeRO = "ro"
|
||||
|
||||
// CrashOnUnexpected reports whether the Tailscale client should panic
|
||||
// on unexpected conditions. If TS_DEBUG_CRASH_ON_UNEXPECTED is set, that's
|
||||
// used. Otherwise the default value is true for unstable builds.
|
||||
|
||||
@@ -130,4 +130,4 @@
|
||||
in
|
||||
flake-utils.lib.eachDefaultSystem (system: flakeForSystem nixpkgs system);
|
||||
}
|
||||
# nix-direnv cache busting line: sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
|
||||
# nix-direnv cache busting line: sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=
|
||||
|
||||
2
go.mod
2
go.mod
@@ -97,7 +97,7 @@ require (
|
||||
golang.org/x/crypto v0.35.0
|
||||
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
|
||||
golang.org/x/mod v0.23.0
|
||||
golang.org/x/net v0.35.0
|
||||
golang.org/x/net v0.36.0
|
||||
golang.org/x/oauth2 v0.26.0
|
||||
golang.org/x/sync v0.11.0
|
||||
golang.org/x/sys v0.30.0
|
||||
|
||||
@@ -1 +1 @@
|
||||
sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
|
||||
sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1135,8 +1135,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
|
||||
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
|
||||
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
|
||||
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
|
||||
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
|
||||
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
|
||||
@@ -119,6 +119,9 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
|
||||
}
|
||||
|
||||
if pair, err := getCertPEMCached(cs, domain, now); err == nil {
|
||||
if envknob.IsCertShareReadOnlyMode() {
|
||||
return pair, nil
|
||||
}
|
||||
// If we got here, we have a valid unexpired cert.
|
||||
// Check whether we should start an async renewal.
|
||||
shouldRenew, err := b.shouldStartDomainRenewal(cs, domain, now, pair, minValidity)
|
||||
@@ -134,7 +137,7 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
|
||||
if minValidity == 0 {
|
||||
logf("starting async renewal")
|
||||
// Start renewal in the background, return current valid cert.
|
||||
go b.getCertPEM(context.Background(), cs, logf, traceACME, domain, now, minValidity)
|
||||
b.goTracker.Go(func() { getCertPEM(context.Background(), b, cs, logf, traceACME, domain, now, minValidity) })
|
||||
return pair, nil
|
||||
}
|
||||
// If the caller requested a specific validity duration, fall through
|
||||
@@ -142,7 +145,11 @@ func (b *LocalBackend) GetCertPEMWithValidity(ctx context.Context, domain string
|
||||
logf("starting sync renewal")
|
||||
}
|
||||
|
||||
pair, err := b.getCertPEM(ctx, cs, logf, traceACME, domain, now, minValidity)
|
||||
if envknob.IsCertShareReadOnlyMode() {
|
||||
return nil, fmt.Errorf("retrieving cached TLS certificate failed and cert store is configured in read-only mode, not attempting to issue a new certificate: %w", err)
|
||||
}
|
||||
|
||||
pair, err := getCertPEM(ctx, b, cs, logf, traceACME, domain, now, minValidity)
|
||||
if err != nil {
|
||||
logf("getCertPEM: %v", err)
|
||||
return nil, err
|
||||
@@ -358,7 +365,29 @@ type certStateStore struct {
|
||||
testRoots *x509.CertPool
|
||||
}
|
||||
|
||||
// TLSCertKeyReader is an interface implemented by state stores where it makes
|
||||
// sense to read the TLS cert and key in a single operation that can be
|
||||
// distinguished from generic state value reads. Currently this is only implemented
|
||||
// by the kubestore.Store, which, in some cases, need to read cert and key from a
|
||||
// non-cached TLS Secret.
|
||||
type TLSCertKeyReader interface {
|
||||
ReadTLSCertAndKey(domain string) ([]byte, []byte, error)
|
||||
}
|
||||
|
||||
func (s certStateStore) Read(domain string, now time.Time) (*TLSCertKeyPair, error) {
|
||||
// If we're using a store that supports atomic reads, use that
|
||||
if kr, ok := s.StateStore.(TLSCertKeyReader); ok {
|
||||
cert, key, err := kr.ReadTLSCertAndKey(domain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !validCertPEM(domain, key, cert, s.testRoots, now) {
|
||||
return nil, errCertExpired
|
||||
}
|
||||
return &TLSCertKeyPair{CertPEM: cert, KeyPEM: key, Cached: true}, nil
|
||||
}
|
||||
|
||||
// Otherwise fall back to separate reads
|
||||
certPEM, err := s.ReadState(ipn.StateKey(domain + ".crt"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -446,7 +475,9 @@ func getCertPEMCached(cs certStore, domain string, now time.Time) (p *TLSCertKey
|
||||
return cs.Read(domain, now)
|
||||
}
|
||||
|
||||
func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
|
||||
// getCertPem checks if a cert needs to be renewed and if so, renews it.
|
||||
// It can be overridden in tests.
|
||||
var getCertPEM = func(ctx context.Context, b *LocalBackend, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
|
||||
acmeMu.Lock()
|
||||
defer acmeMu.Unlock()
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package ipnlocal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
@@ -14,11 +15,17 @@ import (
|
||||
"embed"
|
||||
"encoding/pem"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/ipn/store/mem"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/must"
|
||||
)
|
||||
|
||||
func TestValidLookingCertDomain(t *testing.T) {
|
||||
@@ -221,3 +228,151 @@ func TestDebugACMEDirectoryURL(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetCertPEMWithValidity(t *testing.T) {
|
||||
const testDomain = "example.com"
|
||||
b := &LocalBackend{
|
||||
store: &mem.Store{},
|
||||
varRoot: t.TempDir(),
|
||||
ctx: context.Background(),
|
||||
logf: t.Logf,
|
||||
}
|
||||
certDir, err := b.certDir()
|
||||
if err != nil {
|
||||
t.Fatalf("certDir error: %v", err)
|
||||
}
|
||||
if _, err := b.getCertStore(); err != nil {
|
||||
t.Fatalf("getCertStore error: %v", err)
|
||||
}
|
||||
testRoot, err := certTestFS.ReadFile("testdata/rootCA.pem")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(testRoot) {
|
||||
t.Fatal("Unable to add test CA to the cert pool")
|
||||
}
|
||||
testX509Roots = roots
|
||||
defer func() { testX509Roots = nil }()
|
||||
tests := []struct {
|
||||
name string
|
||||
now time.Time
|
||||
// storeCerts is true if the test cert and key should be written to store.
|
||||
storeCerts bool
|
||||
readOnlyMode bool // TS_READ_ONLY_CERTS env var
|
||||
wantAsyncRenewal bool // async issuance should be started
|
||||
wantIssuance bool // sync issuance should be started
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "valid_no_renewal",
|
||||
now: time.Date(2023, time.February, 20, 0, 0, 0, 0, time.UTC),
|
||||
storeCerts: true,
|
||||
wantAsyncRenewal: false,
|
||||
wantIssuance: false,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "issuance_needed",
|
||||
now: time.Date(2023, time.February, 20, 0, 0, 0, 0, time.UTC),
|
||||
storeCerts: false,
|
||||
wantAsyncRenewal: false,
|
||||
wantIssuance: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "renewal_needed",
|
||||
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
|
||||
storeCerts: true,
|
||||
wantAsyncRenewal: true,
|
||||
wantIssuance: false,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "renewal_needed_read_only_mode",
|
||||
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
|
||||
storeCerts: true,
|
||||
readOnlyMode: true,
|
||||
wantAsyncRenewal: false,
|
||||
wantIssuance: false,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "no_certs_read_only_mode",
|
||||
now: time.Date(2025, time.May, 1, 0, 0, 0, 0, time.UTC),
|
||||
storeCerts: false,
|
||||
readOnlyMode: true,
|
||||
wantAsyncRenewal: false,
|
||||
wantIssuance: false,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
if tt.readOnlyMode {
|
||||
envknob.Setenv("TS_CERT_SHARE_MODE", "ro")
|
||||
}
|
||||
|
||||
os.RemoveAll(certDir)
|
||||
if tt.storeCerts {
|
||||
os.MkdirAll(certDir, 0755)
|
||||
if err := os.WriteFile(filepath.Join(certDir, "example.com.crt"),
|
||||
must.Get(os.ReadFile("testdata/example.com.pem")), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(certDir, "example.com.key"),
|
||||
must.Get(os.ReadFile("testdata/example.com-key.pem")), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
b.clock = tstest.NewClock(tstest.ClockOpts{Start: tt.now})
|
||||
|
||||
allDone := make(chan bool, 1)
|
||||
defer b.goTracker.AddDoneCallback(func() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.goTracker.RunningGoroutines() > 0 {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case allDone <- true:
|
||||
default:
|
||||
}
|
||||
})()
|
||||
|
||||
// Set to true if get getCertPEM is called. GetCertPEM can be called in a goroutine for async
|
||||
// renewal or in the main goroutine if issuance is required to obtain valid TLS credentials.
|
||||
getCertPemWasCalled := false
|
||||
getCertPEM = func(ctx context.Context, b *LocalBackend, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time, minValidity time.Duration) (*TLSCertKeyPair, error) {
|
||||
getCertPemWasCalled = true
|
||||
return nil, nil
|
||||
}
|
||||
prevGoRoutines := b.goTracker.StartedGoroutines()
|
||||
_, err = b.GetCertPEMWithValidity(context.Background(), testDomain, 0)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("b.GetCertPemWithValidity got err %v, wants error: '%v'", err, tt.wantErr)
|
||||
}
|
||||
// GetCertPEMWithValidity calls getCertPEM in a goroutine if async renewal is needed. That's the
|
||||
// only goroutine it starts, so this can be used to test if async renewal was started.
|
||||
gotAsyncRenewal := b.goTracker.StartedGoroutines()-prevGoRoutines != 0
|
||||
if gotAsyncRenewal {
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out waiting for goroutines to finish")
|
||||
case <-allDone:
|
||||
}
|
||||
}
|
||||
// Verify that async renewal was triggered if expected.
|
||||
if tt.wantAsyncRenewal != gotAsyncRenewal {
|
||||
t.Fatalf("wants getCertPem to be called async: %v, got called %v", tt.wantAsyncRenewal, gotAsyncRenewal)
|
||||
}
|
||||
// Verify that (non-async) issuance was started if expected.
|
||||
gotIssuance := getCertPemWasCalled && !gotAsyncRenewal
|
||||
if tt.wantIssuance != gotIssuance {
|
||||
t.Errorf("wants getCertPem to be called: %v, got called %v", tt.wantIssuance, gotIssuance)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
42
net/netmon/loghelper.go
Normal file
42
net/netmon/loghelper.go
Normal file
@@ -0,0 +1,42 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package netmon
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// LinkChangeLogLimiter returns a new [logger.Logf] that logs each unique
|
||||
// format string to the underlying logger only once per major LinkChange event.
|
||||
//
|
||||
// The returned function should be called when the logger is no longer needed,
|
||||
// to release resources from the Monitor.
|
||||
func LinkChangeLogLimiter(logf logger.Logf, nm *Monitor) (_ logger.Logf, unregister func()) {
|
||||
var formatSeen sync.Map // map[string]bool
|
||||
unregister = nm.RegisterChangeCallback(func(cd *ChangeDelta) {
|
||||
// If we're in a major change or a time jump, clear the seen map.
|
||||
if cd.Major || cd.TimeJumped {
|
||||
formatSeen.Clear()
|
||||
}
|
||||
})
|
||||
|
||||
return func(format string, args ...any) {
|
||||
// We only store 'true' in the map, so if it's present then it
|
||||
// means we've already logged this format string.
|
||||
_, loaded := formatSeen.LoadOrStore(format, true)
|
||||
if loaded {
|
||||
// TODO(andrew-d): we may still want to log this
|
||||
// message every N minutes (1x/hour?) even if it's been
|
||||
// seen, so that debugging doesn't require searching
|
||||
// back in the logs for an unbounded amount of time.
|
||||
//
|
||||
// See: https://github.com/tailscale/tailscale/issues/13145
|
||||
return
|
||||
}
|
||||
|
||||
logf(format, args...)
|
||||
}, unregister
|
||||
}
|
||||
78
net/netmon/loghelper_test.go
Normal file
78
net/netmon/loghelper_test.go
Normal file
@@ -0,0 +1,78 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package netmon
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLinkChangeLogLimiter(t *testing.T) {
|
||||
mon, err := New(t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mon.Close()
|
||||
|
||||
var logBuffer bytes.Buffer
|
||||
logf := func(format string, args ...any) {
|
||||
t.Logf("captured log: "+format, args...)
|
||||
|
||||
if format[len(format)-1] != '\n' {
|
||||
format += "\n"
|
||||
}
|
||||
fmt.Fprintf(&logBuffer, format, args...)
|
||||
}
|
||||
|
||||
logf, unregister := LinkChangeLogLimiter(logf, mon)
|
||||
defer unregister()
|
||||
|
||||
// Log once, which should write to our log buffer.
|
||||
logf("hello %s", "world")
|
||||
if got := logBuffer.String(); got != "hello world\n" {
|
||||
t.Errorf("unexpected log buffer contents: %q", got)
|
||||
}
|
||||
|
||||
// Log again, which should not write to our log buffer.
|
||||
logf("hello %s", "andrew")
|
||||
if got := logBuffer.String(); got != "hello world\n" {
|
||||
t.Errorf("unexpected log buffer contents: %q", got)
|
||||
}
|
||||
|
||||
// Log a different message, which should write to our log buffer.
|
||||
logf("other message")
|
||||
if got := logBuffer.String(); got != "hello world\nother message\n" {
|
||||
t.Errorf("unexpected log buffer contents: %q", got)
|
||||
}
|
||||
|
||||
// Synthesize a fake major change event, which should clear the format
|
||||
// string cache and allow the next log to write to our log buffer.
|
||||
//
|
||||
// InjectEvent doesn't work because it's not a major event, so we
|
||||
// instead reach into the netmon and grab the callback, and then call
|
||||
// it ourselves.
|
||||
mon.mu.Lock()
|
||||
var cb func(*ChangeDelta)
|
||||
for _, c := range mon.cbs {
|
||||
cb = c
|
||||
break
|
||||
}
|
||||
mon.mu.Unlock()
|
||||
|
||||
cb(&ChangeDelta{Major: true})
|
||||
|
||||
logf("hello %s", "world")
|
||||
if got := logBuffer.String(); got != "hello world\nother message\nhello world\n" {
|
||||
t.Errorf("unexpected log buffer contents: %q", got)
|
||||
}
|
||||
|
||||
// Unregistering the callback should clear our 'cbs' set.
|
||||
unregister()
|
||||
mon.mu.Lock()
|
||||
if len(mon.cbs) != 0 {
|
||||
t.Errorf("expected no callbacks, got %v", mon.cbs)
|
||||
}
|
||||
mon.mu.Unlock()
|
||||
}
|
||||
@@ -16,4 +16,4 @@
|
||||
) {
|
||||
src = ./.;
|
||||
}).shellNix
|
||||
# nix-direnv cache busting line: sha256-xO1DuLWi6/lpA9ubA2ZYVJM+CkVNA5IaVGZxX9my0j0=
|
||||
# nix-direnv cache busting line: sha256-SiUkN6BQK1IQmLfkfPetzvYqRu9ENK6+6txtGxegF5Y=
|
||||
|
||||
@@ -34,6 +34,7 @@ type DebugHandler struct {
|
||||
kvs []func(io.Writer) // output one <li>...</li> each, see KV()
|
||||
urls []string // one <li>...</li> block with link each
|
||||
sections []func(io.Writer, *http.Request) // invoked in registration order prior to outputting </body>
|
||||
title string // title displayed on index page
|
||||
}
|
||||
|
||||
// Debugger returns the DebugHandler registered on mux at /debug/,
|
||||
@@ -44,7 +45,8 @@ func Debugger(mux *http.ServeMux) *DebugHandler {
|
||||
return d
|
||||
}
|
||||
ret := &DebugHandler{
|
||||
mux: mux,
|
||||
mux: mux,
|
||||
title: fmt.Sprintf("%s debug", version.CmdName()),
|
||||
}
|
||||
mux.Handle("/debug/", ret)
|
||||
|
||||
@@ -85,7 +87,7 @@ func (d *DebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
AddBrowserHeaders(w)
|
||||
f := func(format string, args ...any) { fmt.Fprintf(w, format, args...) }
|
||||
f("<html><body><h1>%s debug</h1><ul>", version.CmdName())
|
||||
f("<html><body><h1>%s</h1><ul>", html.EscapeString(d.title))
|
||||
for _, kv := range d.kvs {
|
||||
kv(w)
|
||||
}
|
||||
@@ -103,14 +105,20 @@ func (d *DebugHandler) handle(slug string, handler http.Handler) string {
|
||||
return href
|
||||
}
|
||||
|
||||
// Handle registers handler at /debug/<slug> and creates a descriptive
|
||||
// entry in /debug/ for it.
|
||||
// Handle registers handler at /debug/<slug> and adds a link to it
|
||||
// on /debug/ with the provided description.
|
||||
func (d *DebugHandler) Handle(slug, desc string, handler http.Handler) {
|
||||
href := d.handle(slug, handler)
|
||||
d.URL(href, desc)
|
||||
}
|
||||
|
||||
// HandleSilent registers handler at /debug/<slug>. It does not create
|
||||
// Handle registers handler at /debug/<slug> and adds a link to it
|
||||
// on /debug/ with the provided description.
|
||||
func (d *DebugHandler) HandleFunc(slug, desc string, handler http.HandlerFunc) {
|
||||
d.Handle(slug, desc, handler)
|
||||
}
|
||||
|
||||
// HandleSilent registers handler at /debug/<slug>. It does not add
|
||||
// a descriptive entry in /debug/ for it. This should be used
|
||||
// sparingly, for things that need to be registered but would pollute
|
||||
// the list of debug links.
|
||||
@@ -118,6 +126,14 @@ func (d *DebugHandler) HandleSilent(slug string, handler http.Handler) {
|
||||
d.handle(slug, handler)
|
||||
}
|
||||
|
||||
// HandleSilent registers handler at /debug/<slug>. It does not add
|
||||
// a descriptive entry in /debug/ for it. This should be used
|
||||
// sparingly, for things that need to be registered but would pollute
|
||||
// the list of debug links.
|
||||
func (d *DebugHandler) HandleSilentFunc(slug string, handler http.HandlerFunc) {
|
||||
d.HandleSilent(slug, handler)
|
||||
}
|
||||
|
||||
// KV adds a key/value list item to /debug/.
|
||||
func (d *DebugHandler) KV(k string, v any) {
|
||||
val := html.EscapeString(fmt.Sprintf("%v", v))
|
||||
@@ -149,6 +165,11 @@ func (d *DebugHandler) Section(f func(w io.Writer, r *http.Request)) {
|
||||
d.sections = append(d.sections, f)
|
||||
}
|
||||
|
||||
// Title sets the title at the top of the debug page.
|
||||
func (d *DebugHandler) Title(title string) {
|
||||
d.title = title
|
||||
}
|
||||
|
||||
func gcHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("running GC...\n"))
|
||||
if f, ok := w.(http.Flusher); ok {
|
||||
|
||||
6
util/eventbus/assets/event.html
Normal file
6
util/eventbus/assets/event.html
Normal file
@@ -0,0 +1,6 @@
|
||||
<li id="monitor" hx-swap-oob="afterbegin">
|
||||
<details>
|
||||
<summary>{{.Count}}: {{.Type}} from {{.Event.From.Name}}, {{len .Event.To}} recipients</summary>
|
||||
{{.Event.Event}}
|
||||
</details>
|
||||
</li>
|
||||
BIN
util/eventbus/assets/htmx-websocket.min.js.gz
Normal file
BIN
util/eventbus/assets/htmx-websocket.min.js.gz
Normal file
Binary file not shown.
BIN
util/eventbus/assets/htmx.min.js.gz
Normal file
BIN
util/eventbus/assets/htmx.min.js.gz
Normal file
Binary file not shown.
97
util/eventbus/assets/main.html
Normal file
97
util/eventbus/assets/main.html
Normal file
@@ -0,0 +1,97 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<script src="bus/htmx.min.js"></script>
|
||||
<script src="bus/htmx-websocket.min.js"></script>
|
||||
<link rel="stylesheet" href="bus/style.css">
|
||||
</head>
|
||||
<body hx-ext="ws">
|
||||
<h1>Event bus</h1>
|
||||
|
||||
<section>
|
||||
<h2>General</h2>
|
||||
{{with $.PublishQueue}}
|
||||
{{len .}} pending
|
||||
{{end}}
|
||||
|
||||
<button hx-post="bus/monitor" hx-swap="outerHTML">Monitor all events</button>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Clients</h2>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Publishing</th>
|
||||
<th>Subscribing</th>
|
||||
<th>Pending</th>
|
||||
</tr>
|
||||
</thead>
|
||||
{{range .Clients}}
|
||||
<tr id="{{.Name}}">
|
||||
<td>{{.Name}}</td>
|
||||
<td class="list">
|
||||
<ul>
|
||||
{{range .Publish}}
|
||||
<li><a href="#{{.}}">{{.}}</a></li>
|
||||
{{end}}
|
||||
</ul>
|
||||
</td>
|
||||
<td class="list">
|
||||
<ul>
|
||||
{{range .Subscribe}}
|
||||
<li><a href="#{{.}}">{{.}}</a></li>
|
||||
{{end}}
|
||||
</ul>
|
||||
</td>
|
||||
<td>
|
||||
{{len ($.SubscribeQueue .Client)}}
|
||||
</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</table>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Types</h2>
|
||||
|
||||
{{range .Types}}
|
||||
|
||||
<section id="{{.}}">
|
||||
<h3>{{.Name}}</h3>
|
||||
<h4>Definition</h4>
|
||||
<code>{{prettyPrintStruct .}}</code>
|
||||
|
||||
<h4>Published by:</h4>
|
||||
{{if len (.Publish)}}
|
||||
<ul>
|
||||
{{range .Publish}}
|
||||
<li><a href="#{{.Name}}">{{.Name}}</a></li>
|
||||
{{end}}
|
||||
</ul>
|
||||
{{else}}
|
||||
<ul>
|
||||
<li>No publishers.</li>
|
||||
</ul>
|
||||
{{end}}
|
||||
|
||||
<h4>Received by:</h4>
|
||||
{{if len (.Subscribe)}}
|
||||
<ul>
|
||||
{{range .Subscribe}}
|
||||
<li><a href="#{{.Name}}">{{.Name}}</a></li>
|
||||
{{end}}
|
||||
</ul>
|
||||
{{else}}
|
||||
<ul>
|
||||
<li>No subscribers.</li>
|
||||
</ul>
|
||||
{{end}}
|
||||
</section>
|
||||
{{end}}
|
||||
|
||||
</section>
|
||||
</body>
|
||||
</html>
|
||||
5
util/eventbus/assets/monitor.html
Normal file
5
util/eventbus/assets/monitor.html
Normal file
@@ -0,0 +1,5 @@
|
||||
<div>
|
||||
<ul id="monitor" ws-connect="bus/monitor">
|
||||
</ul>
|
||||
<button hx-get="bus" hx-target="body">Stop monitoring</button>
|
||||
</div>
|
||||
90
util/eventbus/assets/style.css
Normal file
90
util/eventbus/assets/style.css
Normal file
@@ -0,0 +1,90 @@
|
||||
/* CSS reset, thanks Josh Comeau: https://www.joshwcomeau.com/css/custom-css-reset/ */
|
||||
*, *::before, *::after { box-sizing: border-box; }
|
||||
* { margin: 0; }
|
||||
input, button, textarea, select { font: inherit; }
|
||||
p, h1, h2, h3, h4, h5, h6 { overflow-wrap: break-word; }
|
||||
p { text-wrap: pretty; }
|
||||
h1, h2, h3, h4, h5, h6 { text-wrap: balance; }
|
||||
#root, #__next { isolation: isolate; }
|
||||
body {
|
||||
line-height: 1.5;
|
||||
-webkit-font-smoothing: antialiased;
|
||||
}
|
||||
img, picture, video, canvas, svg {
|
||||
display: block;
|
||||
max-width: 100%;
|
||||
}
|
||||
|
||||
/* Local styling begins */
|
||||
|
||||
body {
|
||||
padding: 12px;
|
||||
}
|
||||
|
||||
div {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
section {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
flex-gap: 6px;
|
||||
align-items: flex-start;
|
||||
padding: 12px 0;
|
||||
}
|
||||
|
||||
section > * {
|
||||
margin-left: 24px;
|
||||
}
|
||||
|
||||
section > h2, section > h3 {
|
||||
margin-left: 0;
|
||||
padding-bottom: 6px;
|
||||
padding-top: 12px;
|
||||
}
|
||||
|
||||
details {
|
||||
padding-bottom: 12px;
|
||||
}
|
||||
|
||||
table {
|
||||
table-layout: fixed;
|
||||
width: calc(100% - 48px);
|
||||
border-collapse: collapse;
|
||||
border: 1px solid black;
|
||||
}
|
||||
|
||||
th, td {
|
||||
padding: 12px;
|
||||
border: 1px solid black;
|
||||
}
|
||||
|
||||
td.list {
|
||||
vertical-align: top;
|
||||
}
|
||||
|
||||
ul {
|
||||
list-style: none;
|
||||
}
|
||||
|
||||
td ul {
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
code {
|
||||
padding: 12px;
|
||||
white-space: pre;
|
||||
}
|
||||
|
||||
#monitor {
|
||||
width: calc(100% - 48px);
|
||||
resize: vertical;
|
||||
padding: 12px;
|
||||
overflow: scroll;
|
||||
height: 15lh;
|
||||
border: 1px inset;
|
||||
min-height: 1em;
|
||||
display: flex;
|
||||
flex-direction: column-reverse;
|
||||
}
|
||||
125
util/eventbus/bench_test.go
Normal file
125
util/eventbus/bench_test.go
Normal file
@@ -0,0 +1,125 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package eventbus_test
|
||||
|
||||
import (
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
|
||||
"tailscale.com/util/eventbus"
|
||||
)
|
||||
|
||||
func BenchmarkBasicThroughput(b *testing.B) {
|
||||
bus := eventbus.New()
|
||||
pcli := bus.Client(b.Name() + "-pub")
|
||||
scli := bus.Client(b.Name() + "-sub")
|
||||
|
||||
type emptyEvent [0]byte
|
||||
|
||||
// One publisher and a corresponding subscriber shoveling events as fast as
|
||||
// they can through the plumbing.
|
||||
pub := eventbus.Publish[emptyEvent](pcli)
|
||||
sub := eventbus.Subscribe[emptyEvent](scli)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sub.Events():
|
||||
continue
|
||||
case <-sub.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for b.Loop() {
|
||||
pub.Publish(emptyEvent{})
|
||||
}
|
||||
bus.Close()
|
||||
}
|
||||
|
||||
func BenchmarkSubsThroughput(b *testing.B) {
|
||||
bus := eventbus.New()
|
||||
pcli := bus.Client(b.Name() + "-pub")
|
||||
scli1 := bus.Client(b.Name() + "-sub1")
|
||||
scli2 := bus.Client(b.Name() + "-sub2")
|
||||
|
||||
type emptyEvent [0]byte
|
||||
|
||||
// One publisher and two subscribers shoveling events as fast as they can
|
||||
// through the plumbing.
|
||||
pub := eventbus.Publish[emptyEvent](pcli)
|
||||
sub1 := eventbus.Subscribe[emptyEvent](scli1)
|
||||
sub2 := eventbus.Subscribe[emptyEvent](scli2)
|
||||
|
||||
for _, sub := range []*eventbus.Subscriber[emptyEvent]{sub1, sub2} {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sub.Events():
|
||||
continue
|
||||
case <-sub.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for b.Loop() {
|
||||
pub.Publish(emptyEvent{})
|
||||
}
|
||||
bus.Close()
|
||||
}
|
||||
|
||||
func BenchmarkMultiThroughput(b *testing.B) {
|
||||
bus := eventbus.New()
|
||||
cli := bus.Client(b.Name())
|
||||
|
||||
type eventA struct{}
|
||||
type eventB struct{}
|
||||
|
||||
// Two disjoint event streams routed through the global order.
|
||||
apub := eventbus.Publish[eventA](cli)
|
||||
asub := eventbus.Subscribe[eventA](cli)
|
||||
bpub := eventbus.Publish[eventB](cli)
|
||||
bsub := eventbus.Subscribe[eventB](cli)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-asub.Events():
|
||||
continue
|
||||
case <-asub.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-bsub.Events():
|
||||
continue
|
||||
case <-bsub.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var rng uint64
|
||||
var bits int
|
||||
for b.Loop() {
|
||||
if bits == 0 {
|
||||
rng = rand.Uint64()
|
||||
bits = 64
|
||||
}
|
||||
if rng&1 == 0 {
|
||||
apub.Publish(eventA{})
|
||||
} else {
|
||||
bpub.Publish(eventB{})
|
||||
}
|
||||
rng >>= 1
|
||||
bits--
|
||||
}
|
||||
bus.Close()
|
||||
}
|
||||
@@ -73,8 +73,8 @@ func (b *Bus) Client(name string) *Client {
|
||||
}
|
||||
|
||||
// Debugger returns the debugging facility for the bus.
|
||||
func (b *Bus) Debugger() Debugger {
|
||||
return Debugger{b}
|
||||
func (b *Bus) Debugger() *Debugger {
|
||||
return &Debugger{b}
|
||||
}
|
||||
|
||||
// Close closes the bus. Implicitly closes all clients, publishers and
|
||||
|
||||
103
util/eventbus/debug-demo/main.go
Normal file
103
util/eventbus/debug-demo/main.go
Normal file
@@ -0,0 +1,103 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// debug-demo is a program that serves a bus's debug interface over
|
||||
// HTTP, then generates some fake traffic from a handful of
|
||||
// clients. It is an aid to development, to have something to present
|
||||
// on the debug interfaces while writing them.
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand/v2"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"time"
|
||||
|
||||
"tailscale.com/tsweb"
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/util/eventbus"
|
||||
)
|
||||
|
||||
func main() {
|
||||
b := eventbus.New()
|
||||
c := b.Client("RouteMonitor")
|
||||
go testPub[RouteAdded](c, 5*time.Second)
|
||||
go testPub[RouteRemoved](c, 5*time.Second)
|
||||
c = b.Client("ControlClient")
|
||||
go testPub[PeerAdded](c, 3*time.Second)
|
||||
go testPub[PeerRemoved](c, 6*time.Second)
|
||||
c = b.Client("Portmapper")
|
||||
go testPub[PortmapAcquired](c, 10*time.Second)
|
||||
go testPub[PortmapLost](c, 15*time.Second)
|
||||
go testSub[RouteAdded](c)
|
||||
c = b.Client("WireguardConfig")
|
||||
go testSub[PeerAdded](c)
|
||||
go testSub[PeerRemoved](c)
|
||||
c = b.Client("Magicsock")
|
||||
go testPub[PeerPathChanged](c, 5*time.Second)
|
||||
go testSub[RouteAdded](c)
|
||||
go testSub[RouteRemoved](c)
|
||||
go testSub[PortmapAcquired](c)
|
||||
go testSub[PortmapLost](c)
|
||||
|
||||
m := http.NewServeMux()
|
||||
d := tsweb.Debugger(m)
|
||||
b.Debugger().RegisterHTTP(d)
|
||||
|
||||
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "/debug/bus", http.StatusFound)
|
||||
})
|
||||
log.Printf("Serving debug interface at http://localhost:8185/debug/bus")
|
||||
http.ListenAndServe(":8185", m)
|
||||
}
|
||||
|
||||
func testPub[T any](c *eventbus.Client, every time.Duration) {
|
||||
p := eventbus.Publish[T](c)
|
||||
for {
|
||||
jitter := time.Duration(rand.N(2000)) * time.Millisecond
|
||||
time.Sleep(jitter)
|
||||
var zero T
|
||||
log.Printf("%s publish: %T", c.Name(), zero)
|
||||
p.Publish(zero)
|
||||
time.Sleep(every)
|
||||
}
|
||||
}
|
||||
|
||||
func testSub[T any](c *eventbus.Client) {
|
||||
s := eventbus.Subscribe[T](c)
|
||||
for v := range s.Events() {
|
||||
log.Printf("%s received: %T", c.Name(), v)
|
||||
}
|
||||
}
|
||||
|
||||
type RouteAdded struct {
|
||||
Prefix netip.Prefix
|
||||
Via netip.Addr
|
||||
Priority int
|
||||
}
|
||||
type RouteRemoved struct {
|
||||
Prefix netip.Addr
|
||||
}
|
||||
|
||||
type PeerAdded struct {
|
||||
ID int
|
||||
Key key.NodePublic
|
||||
}
|
||||
type PeerRemoved struct {
|
||||
ID int
|
||||
Key key.NodePublic
|
||||
}
|
||||
|
||||
type PortmapAcquired struct {
|
||||
Endpoint netip.Addr
|
||||
}
|
||||
type PortmapLost struct {
|
||||
Endpoint netip.Addr
|
||||
}
|
||||
|
||||
type PeerPathChanged struct {
|
||||
ID int
|
||||
EndpointID int
|
||||
Quality int
|
||||
}
|
||||
@@ -4,11 +4,14 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"tailscale.com/tsweb"
|
||||
)
|
||||
|
||||
// A Debugger offers access to a bus's privileged introspection and
|
||||
@@ -29,7 +32,11 @@ type Debugger struct {
|
||||
|
||||
// Clients returns a list of all clients attached to the bus.
|
||||
func (d *Debugger) Clients() []*Client {
|
||||
return d.bus.listClients()
|
||||
ret := d.bus.listClients()
|
||||
slices.SortFunc(ret, func(a, b *Client) int {
|
||||
return cmp.Compare(a.Name(), b.Name())
|
||||
})
|
||||
return ret
|
||||
}
|
||||
|
||||
// PublishQueue returns the contents of the publish queue.
|
||||
@@ -130,6 +137,8 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
|
||||
return client.subscribeTypes()
|
||||
}
|
||||
|
||||
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) }
|
||||
|
||||
// A hook collects hook functions that can be run as a group.
|
||||
type hook[T any] struct {
|
||||
sync.Mutex
|
||||
|
||||
238
util/eventbus/debughttp.go
Normal file
238
util/eventbus/debughttp.go
Normal file
@@ -0,0 +1,238 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"embed"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
"tailscale.com/tsweb"
|
||||
)
|
||||
|
||||
type httpDebugger struct {
|
||||
*Debugger
|
||||
}
|
||||
|
||||
func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) {
|
||||
dh := httpDebugger{d}
|
||||
td.Handle("bus", "Event bus", dh)
|
||||
td.HandleSilent("bus/monitor", http.HandlerFunc(dh.serveMonitor))
|
||||
td.HandleSilent("bus/style.css", serveStatic("style.css"))
|
||||
td.HandleSilent("bus/htmx.min.js", serveStatic("htmx.min.js.gz"))
|
||||
td.HandleSilent("bus/htmx-websocket.min.js", serveStatic("htmx-websocket.min.js.gz"))
|
||||
}
|
||||
|
||||
//go:embed assets/*.html
|
||||
var templatesSrc embed.FS
|
||||
|
||||
var templates = sync.OnceValue(func() *template.Template {
|
||||
d, err := fs.Sub(templatesSrc, "assets")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("getting eventbus debughttp templates subdir: %w", err))
|
||||
}
|
||||
ret := template.New("").Funcs(map[string]any{
|
||||
"prettyPrintStruct": prettyPrintStruct,
|
||||
})
|
||||
return template.Must(ret.ParseFS(d, "*"))
|
||||
})
|
||||
|
||||
//go:generate go run fetch-htmx.go
|
||||
|
||||
//go:embed assets/*.css assets/*.min.js.gz
|
||||
var static embed.FS
|
||||
|
||||
func serveStatic(name string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case strings.HasSuffix(name, ".css"):
|
||||
w.Header().Set("Content-Type", "text/css")
|
||||
case strings.HasSuffix(name, ".min.js.gz"):
|
||||
w.Header().Set("Content-Type", "text/javascript")
|
||||
w.Header().Set("Content-Encoding", "gzip")
|
||||
case strings.HasSuffix(name, ".js"):
|
||||
w.Header().Set("Content-Type", "text/javascript")
|
||||
default:
|
||||
http.Error(w, "not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
f, err := static.Open(filepath.Join("assets", name))
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("opening asset: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
if _, err := io.Copy(w, f); err != nil {
|
||||
http.Error(w, fmt.Sprintf("serving asset: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func render(w http.ResponseWriter, name string, data any) {
|
||||
err := templates().ExecuteTemplate(w, name+".html", data)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("rendering template: %v", err)
|
||||
log.Print(err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func (h httpDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
type clientInfo struct {
|
||||
*Client
|
||||
Publish []reflect.Type
|
||||
Subscribe []reflect.Type
|
||||
}
|
||||
type typeInfo struct {
|
||||
reflect.Type
|
||||
Publish []*Client
|
||||
Subscribe []*Client
|
||||
}
|
||||
type info struct {
|
||||
*Debugger
|
||||
Clients map[string]*clientInfo
|
||||
Types map[string]*typeInfo
|
||||
}
|
||||
|
||||
data := info{
|
||||
Debugger: h.Debugger,
|
||||
Clients: map[string]*clientInfo{},
|
||||
Types: map[string]*typeInfo{},
|
||||
}
|
||||
|
||||
getTypeInfo := func(t reflect.Type) *typeInfo {
|
||||
if data.Types[t.Name()] == nil {
|
||||
data.Types[t.Name()] = &typeInfo{
|
||||
Type: t,
|
||||
}
|
||||
}
|
||||
return data.Types[t.Name()]
|
||||
}
|
||||
|
||||
for _, c := range h.Clients() {
|
||||
ci := &clientInfo{
|
||||
Client: c,
|
||||
Publish: h.PublishTypes(c),
|
||||
Subscribe: h.SubscribeTypes(c),
|
||||
}
|
||||
slices.SortFunc(ci.Publish, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
|
||||
slices.SortFunc(ci.Subscribe, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
|
||||
data.Clients[c.Name()] = ci
|
||||
|
||||
for _, t := range ci.Publish {
|
||||
ti := getTypeInfo(t)
|
||||
ti.Publish = append(ti.Publish, c)
|
||||
}
|
||||
for _, t := range ci.Subscribe {
|
||||
ti := getTypeInfo(t)
|
||||
ti.Subscribe = append(ti.Subscribe, c)
|
||||
}
|
||||
}
|
||||
|
||||
render(w, "main", data)
|
||||
}
|
||||
|
||||
func (h httpDebugger) serveMonitor(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Upgrade") == "websocket" {
|
||||
h.serveMonitorStream(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
render(w, "monitor", nil)
|
||||
}
|
||||
|
||||
func (h httpDebugger) serveMonitorStream(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.CloseNow()
|
||||
wsCtx := conn.CloseRead(r.Context())
|
||||
|
||||
mon := h.WatchBus()
|
||||
defer mon.Close()
|
||||
|
||||
i := 0
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-wsCtx.Done():
|
||||
return
|
||||
case <-mon.Done():
|
||||
return
|
||||
case event := <-mon.Events():
|
||||
msg, err := conn.Writer(r.Context(), websocket.MessageText)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
data := map[string]any{
|
||||
"Count": i,
|
||||
"Type": reflect.TypeOf(event.Event),
|
||||
"Event": event,
|
||||
}
|
||||
i++
|
||||
if err := templates().ExecuteTemplate(msg, "event.html", data); err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
if err := msg.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func prettyPrintStruct(t reflect.Type) string {
|
||||
if t.Kind() != reflect.Struct {
|
||||
return t.String()
|
||||
}
|
||||
var rec func(io.Writer, int, reflect.Type)
|
||||
rec = func(out io.Writer, indent int, t reflect.Type) {
|
||||
ind := strings.Repeat(" ", indent)
|
||||
fmt.Fprintf(out, "%s", t.String())
|
||||
fs := collectFields(t)
|
||||
if len(fs) > 0 {
|
||||
io.WriteString(out, " {\n")
|
||||
for _, f := range fs {
|
||||
fmt.Fprintf(out, "%s %s ", ind, f.Name)
|
||||
if f.Type.Kind() == reflect.Struct {
|
||||
rec(out, indent+1, f.Type)
|
||||
} else {
|
||||
fmt.Fprint(out, f.Type)
|
||||
}
|
||||
io.WriteString(out, "\n")
|
||||
}
|
||||
fmt.Fprintf(out, "%s}", ind)
|
||||
}
|
||||
}
|
||||
|
||||
var ret bytes.Buffer
|
||||
rec(&ret, 0, t)
|
||||
return ret.String()
|
||||
}
|
||||
|
||||
func collectFields(t reflect.Type) (ret []reflect.StructField) {
|
||||
for _, f := range reflect.VisibleFields(t) {
|
||||
if !f.IsExported() {
|
||||
continue
|
||||
}
|
||||
ret = append(ret, f)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
93
util/eventbus/fetch-htmx.go
Normal file
93
util/eventbus/fetch-htmx.go
Normal file
@@ -0,0 +1,93 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build ignore
|
||||
|
||||
// Program fetch-htmx fetches and installs local copies of the HTMX
|
||||
// library and its dependencies, used by the debug UI. It is meant to
|
||||
// be run via go generate.
|
||||
package main
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"crypto/sha512"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Hash from https://htmx.org/docs/#installing
|
||||
htmx, err := fetchHashed("https://unpkg.com/htmx.org@2.0.4", "HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+")
|
||||
if err != nil {
|
||||
log.Fatalf("fetching htmx: %v", err)
|
||||
}
|
||||
|
||||
// Hash SHOULD be from https://htmx.org/extensions/ws/ , but the
|
||||
// hash is currently incorrect, see
|
||||
// https://github.com/bigskysoftware/htmx-extensions/issues/153
|
||||
//
|
||||
// Until that bug is resolved, hash was obtained by rebuilding the
|
||||
// extension from git source, and verifying that the hash matches
|
||||
// what unpkg is serving.
|
||||
ws, err := fetchHashed("https://unpkg.com/htmx-ext-ws@2.0.2", "932iIqjARv+Gy0+r6RTGrfCkCKS5MsF539Iqf6Vt8L4YmbnnWI2DSFoMD90bvXd0")
|
||||
if err != nil {
|
||||
log.Fatalf("fetching htmx-websockets: %v", err)
|
||||
}
|
||||
|
||||
if err := writeGz("assets/htmx.min.js.gz", htmx); err != nil {
|
||||
log.Fatalf("writing htmx.min.js.gz: %v", err)
|
||||
}
|
||||
if err := writeGz("assets/htmx-websocket.min.js.gz", ws); err != nil {
|
||||
log.Fatalf("writing htmx-websocket.min.js.gz: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func writeGz(path string, bs []byte) error {
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
g, err := gzip.NewWriterLevel(f, gzip.BestCompression)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := g.Write(bs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := g.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func fetchHashed(url, wantHash string) ([]byte, error) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("fetching %q returned error status: %s", url, resp.Status)
|
||||
}
|
||||
ret, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading file from %q: %v", url, err)
|
||||
}
|
||||
h := sha512.Sum384(ret)
|
||||
got := base64.StdEncoding.EncodeToString(h[:])
|
||||
if got != wantHash {
|
||||
return nil, fmt.Errorf("wrong hash for %q: got %q, want %q", url, got, wantHash)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
@@ -177,6 +177,10 @@ type Conn struct {
|
||||
// port mappings from NAT devices.
|
||||
portMapper *portmapper.Client
|
||||
|
||||
// portMapperLogfUnregister is the function to call to unregister
|
||||
// the portmapper log limiter.
|
||||
portMapperLogfUnregister func()
|
||||
|
||||
// derpRecvCh is used by receiveDERP to read DERP messages.
|
||||
// It must have buffer size > 0; see issue 3736.
|
||||
derpRecvCh chan derpReadResult
|
||||
@@ -532,10 +536,15 @@ func NewConn(opts Options) (*Conn, error) {
|
||||
c.idleFunc = opts.IdleFunc
|
||||
c.testOnlyPacketListener = opts.TestOnlyPacketListener
|
||||
c.noteRecvActivity = opts.NoteRecvActivity
|
||||
|
||||
// Don't log the same log messages possibly every few seconds in our
|
||||
// portmapper.
|
||||
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
|
||||
portmapperLogf, c.portMapperLogfUnregister = netmon.LinkChangeLogLimiter(portmapperLogf, opts.NetMon)
|
||||
portMapOpts := &portmapper.DebugKnobs{
|
||||
DisableAll: func() bool { return opts.DisablePortMapper || c.onlyTCP443.Load() },
|
||||
}
|
||||
c.portMapper = portmapper.NewClient(logger.WithPrefix(c.logf, "portmapper: "), opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
|
||||
c.portMapper = portmapper.NewClient(portmapperLogf, opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
|
||||
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
|
||||
c.netMon = opts.NetMon
|
||||
c.health = opts.HealthTracker
|
||||
@@ -2481,6 +2490,7 @@ func (c *Conn) Close() error {
|
||||
}
|
||||
c.stopPeriodicReSTUNTimerLocked()
|
||||
c.portMapper.Close()
|
||||
c.portMapperLogfUnregister()
|
||||
|
||||
c.peerMap.forEachEndpoint(func(ep *endpoint) {
|
||||
ep.stopAndReset()
|
||||
|
||||
Reference in New Issue
Block a user