Compare commits

..

13 Commits

Author SHA1 Message Date
M. J. Fromberger
0efeb032e6 wgengine/magicsock: subscribe to portmapper updates
When an event bus is plumbed in, use it to subscribe and react to port mapping
updates instead of using the client's callback mechanism. For now, the callback
remains available as a fallback when an event bus is not provided.

Updates #15160

Change-Id: I026adca44bf6187692ee87ae8ec02641c12f7774
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
David Anderson
d7e92345eb net/netmon: publish events to event bus
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-26 08:50:51 -07:00
David Anderson
39449b12ec derp/derphttp: remove ban on websockets dependency
The event bus's debug page uses websockets.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-26 08:50:51 -07:00
David Anderson
f6547cd990 cmd/tailscaled: clean up unnecessary logf indirection #cleanup
Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
cf230a8362 net/portmapper: fire an event when a port mapping is updated (#15371)
When an event bus is configured publish an event each time a new port mapping
is updated. Publication is unconditional and occurs prior to calling any
callback that is registered. For now, the callback is still fired in a separate
goroutine as before -- later, those callbacks should become subscriptions to
the published event.

For now, the event type is defined as a new type here in the package. We will
want to move it to a more central package when there are subscribers. The event
wrapper is effectively a subset of the data exported by the internal mapping
interface, but on a concrete struct so the bus plumbing can inspect it.

Updates #15160

Change-Id: I951f212429ac791223af8d75b6eb39a0d2a0053a
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
039605e500 all: update the tsd.System constructor name (#15372)
Replace NewSystemWithEventBus with plain NewSystem, and update all usage.
See https://github.com/tailscale/tailscale/pull/15355#discussion_r2003910766

Updates #15160

Change-Id: I64d337f09576b41d9ad78eba301a74b9a9d6ebf4
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
8d6ff1c66a {wgengine,util/portmapper}: add and plumb an event bus (#15359)
Updates #15160

Change-Id: I2510fb4a8905fb0abe8a8e0c5b81adb15d50a6f8
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
c654c5bd7a portmapper: update NewClient to use a Config argument
In preparation for adding more parameters (and later, moving some away), rework
the portmapper constructor to accept its arguments on a Config struct rather
than positionally.

This is a breaking change to the function signature, but one that is very easy
to update, and a search of GitHub reveals only six instances of usage outside
clones and forks of Tailscale itself, that are not direct copies of the code
fixed up here.

While we could stub in another constructor, I think it is safe to let those
folks do the update in-place, since their usage is already affected by other
changes we can't test for anyway.

Updates #15160

Change-Id: I9f8a5e12b38885074c98894b7376039261b43f43
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
981d721d20 wgengine: plumb an event bus into the userspace engine
Updates #15160

Change-Id: Ia695ccdddd09cd950de22abd000d4c531d6bf3c8
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
a612b2de5a tsnet: shut down the event bus on Close
Updates #15160

Change-Id: I29c8194b4b41e95848e5f160e9970db352588449
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
M. J. Fromberger
f5ddc0d6c3 all: construct new System values with an event bus pre-populated
Although, at the moment, we do not yet require an event bus to be present, as
we start to add more pieces we will want to ensure it is always available.  Add
a new constructor and replace existing uses of new(tsd.System) throughout.
Update generated files for import changes.

Updates #15160

Change-Id: Ie5460985571ade87b8eac8b416948c7f49f0f64b
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-03-26 08:50:51 -07:00
David Anderson
cf85d4a3b1 tsd: wire up the event bus to tailscaled
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-26 08:50:51 -07:00
David Anderson
b2b1737f86 tsweb: don't hook up pprof handlers in javascript builds
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-26 08:50:51 -07:00
60 changed files with 568 additions and 233 deletions

View File

@@ -1 +1 @@
1.83.0
1.81.0

View File

@@ -128,17 +128,16 @@ func (m *manualCertManager) TLSConfig() *tls.Config {
}
func (m *manualCertManager) getCertificate(hi *tls.ClientHelloInfo) (*tls.Certificate, error) {
// if hi.ServerName != m.hostname && !m.noHostname {
// return nil, fmt.Errorf("cert mismatch with hostname: %q", hi.ServerName)
// }
if hi.ServerName != m.hostname && !m.noHostname {
return nil, fmt.Errorf("cert mismatch with hostname: %q", hi.ServerName)
}
// Return a shallow copy of the cert so the caller can append to its
// Certificate field.
// certCopy := new(tls.Certificate)
// *certCopy = *m.cert
// certCopy.Certificate = certCopy.Certificate[:len(certCopy.Certificate):len(certCopy.Certificate)]
// return certCopy, nil
return m.cert, nil
certCopy := new(tls.Certificate)
*certCopy = *m.cert
certCopy.Certificate = certCopy.Certificate[:len(certCopy.Certificate):len(certCopy.Certificate)]
return certCopy, nil
}
func (m *manualCertManager) HTTPHandler(fallback http.Handler) http.Handler {

View File

@@ -155,6 +155,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa
💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/hostinfo+
tailscale.com/util/eventbus from tailscale.com/net/netmon
💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale
tailscale.com/util/lineiter from tailscale.com/hostinfo+
@@ -308,7 +309,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa
hash/fnv from google.golang.org/protobuf/internal/detrand
hash/maphash from go4.org/mem
html from net/http/pprof+
html/template from tailscale.com/cmd/derper
html/template from tailscale.com/cmd/derper+
internal/abi from crypto/x509/internal/macos+
internal/asan from internal/runtime/maps+
internal/bisect from internal/godebug

View File

@@ -82,6 +82,10 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
L github.com/aws/smithy-go/waiter from github.com/aws/aws-sdk-go-v2/service/ssm
github.com/beorn7/perks/quantile from github.com/prometheus/client_golang/prometheus
💣 github.com/cespare/xxhash/v2 from github.com/prometheus/client_golang/prometheus
github.com/coder/websocket from tailscale.com/util/eventbus
github.com/coder/websocket/internal/errd from github.com/coder/websocket
github.com/coder/websocket/internal/util from github.com/coder/websocket
github.com/coder/websocket/internal/xsync from github.com/coder/websocket
L github.com/coreos/go-iptables/iptables from tailscale.com/util/linuxfw
💣 github.com/davecgh/go-spew/spew from k8s.io/apimachinery/pkg/util/dump
W 💣 github.com/dblohm7/wingoes from github.com/dblohm7/wingoes/com+
@@ -903,7 +907,8 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
tailscale.com/tstime from tailscale.com/cmd/k8s-operator+
tailscale.com/tstime/mono from tailscale.com/net/tstun+
tailscale.com/tstime/rate from tailscale.com/derp+
tailscale.com/tsweb/varz from tailscale.com/util/usermetric
tailscale.com/tsweb from tailscale.com/util/eventbus
tailscale.com/tsweb/varz from tailscale.com/util/usermetric+
tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal
tailscale.com/types/bools from tailscale.com/tsnet
tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+
@@ -932,6 +937,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
💣 tailscale.com/util/deephash from tailscale.com/ipn/ipnlocal+
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics+
tailscale.com/util/dnsname from tailscale.com/appc+
tailscale.com/util/eventbus from tailscale.com/tsd+
tailscale.com/util/execqueue from tailscale.com/appc+
tailscale.com/util/goroutines from tailscale.com/ipn/ipnlocal
tailscale.com/util/groupmember from tailscale.com/client/web+
@@ -1149,7 +1155,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
hash/fnv from google.golang.org/protobuf/internal/detrand
hash/maphash from go4.org/mem
html from html/template+
html/template from github.com/gorilla/csrf
html/template from github.com/gorilla/csrf+
internal/abi from crypto/x509/internal/macos+
internal/asan from internal/runtime/maps+
internal/bisect from internal/godebug

View File

@@ -43,6 +43,7 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
"tailscale.com/util/must"
)
@@ -956,7 +957,10 @@ func runTS2021(ctx context.Context, args []string) error {
logf = log.Printf
}
netMon, err := netmon.New(logger.WithPrefix(logf, "netmon: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(logf, "netmon: "))
if err != nil {
return fmt.Errorf("creating netmon: %w", err)
}

View File

@@ -24,6 +24,7 @@ import (
"tailscale.com/net/tlsdial"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
var netcheckCmd = &ffcli.Command{
@@ -48,14 +49,19 @@ var netcheckArgs struct {
func runNetcheck(ctx context.Context, args []string) error {
logf := logger.WithPrefix(log.Printf, "portmap: ")
netMon, err := netmon.New(logf)
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil {
return err
}
// Ensure that we close the portmapper after running a netcheck; this
// will release any port mappings created.
pm := portmapper.NewClient(logf, netMon, nil, nil, nil)
pm := portmapper.NewClient(portmapper.Config{
Logf: logf,
NetMon: netMon,
})
defer pm.Close()
c := &netcheck.Client{

View File

@@ -5,6 +5,10 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
W 💣 github.com/alexbrainman/sspi from github.com/alexbrainman/sspi/internal/common+
W github.com/alexbrainman/sspi/internal/common from github.com/alexbrainman/sspi/negotiate
W 💣 github.com/alexbrainman/sspi/negotiate from tailscale.com/net/tshttpproxy
github.com/coder/websocket from tailscale.com/util/eventbus
github.com/coder/websocket/internal/errd from github.com/coder/websocket
github.com/coder/websocket/internal/util from github.com/coder/websocket
github.com/coder/websocket/internal/xsync from github.com/coder/websocket
L github.com/coreos/go-iptables/iptables from tailscale.com/util/linuxfw
W 💣 github.com/dblohm7/wingoes from github.com/dblohm7/wingoes/pe+
W 💣 github.com/dblohm7/wingoes/pe from tailscale.com/util/winutil/authenticode
@@ -89,6 +93,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
tailscale.com/drive from tailscale.com/client/local+
tailscale.com/envknob from tailscale.com/client/local+
tailscale.com/envknob/featureknob from tailscale.com/client/web
tailscale.com/feature from tailscale.com/tsweb
tailscale.com/feature/capture/dissector from tailscale.com/cmd/tailscale/cli
tailscale.com/health from tailscale.com/net/tlsdial+
tailscale.com/health/healthmsg from tailscale.com/cmd/tailscale/cli
@@ -131,7 +136,8 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
tailscale.com/tstime from tailscale.com/control/controlhttp+
tailscale.com/tstime/mono from tailscale.com/tstime/rate
tailscale.com/tstime/rate from tailscale.com/cmd/tailscale/cli+
tailscale.com/tsweb/varz from tailscale.com/util/usermetric
tailscale.com/tsweb from tailscale.com/util/eventbus
tailscale.com/tsweb/varz from tailscale.com/util/usermetric+
tailscale.com/types/dnstype from tailscale.com/tailcfg+
tailscale.com/types/empty from tailscale.com/ipn
tailscale.com/types/ipproto from tailscale.com/ipn+
@@ -156,6 +162,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+
tailscale.com/util/eventbus from tailscale.com/net/portmapper+
tailscale.com/util/groupmember from tailscale.com/client/web
💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale+
@@ -166,6 +173,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
tailscale.com/util/must from tailscale.com/clientupdate/distsign+
tailscale.com/util/nocasemaps from tailscale.com/types/ipproto
tailscale.com/util/quarantine from tailscale.com/cmd/tailscale/cli
tailscale.com/util/rands from tailscale.com/tsweb
tailscale.com/util/set from tailscale.com/derp+
tailscale.com/util/singleflight from tailscale.com/net/dnscache+
tailscale.com/util/slicesx from tailscale.com/net/dns/recursive+
@@ -328,7 +336,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
hash/crc32 from compress/gzip+
hash/maphash from go4.org/mem
html from html/template+
html/template from github.com/gorilla/csrf
html/template from github.com/gorilla/csrf+
image from github.com/skip2/go-qrcode+
image/color from github.com/skip2/go-qrcode+
image/png from github.com/skip2/go-qrcode
@@ -352,7 +360,8 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
internal/nettrace from net+
internal/oserror from io/fs+
internal/poll from net+
internal/profilerecord from runtime
internal/profile from net/http/pprof
internal/profilerecord from runtime+
internal/race from internal/poll+
internal/reflectlite from context+
internal/runtime/atomic from internal/runtime/exithook+
@@ -394,6 +403,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
net/http/httputil from tailscale.com/client/web+
net/http/internal from net/http+
net/http/internal/ascii from net/http+
net/http/pprof from tailscale.com/tsweb
net/netip from go4.org/netipx+
net/textproto from golang.org/x/net/http/httpguts+
net/url from crypto/x509+
@@ -408,6 +418,8 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
regexp/syntax from regexp
runtime from archive/tar+
runtime/debug from tailscale.com+
runtime/pprof from net/http/pprof
runtime/trace from net/http/pprof
slices from tailscale.com/client/web+
sort from compress/flate+
strconv from archive/tar+

View File

@@ -27,6 +27,7 @@ import (
"tailscale.com/net/tshttpproxy"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/util/eventbus"
)
var debugArgs struct {
@@ -72,11 +73,14 @@ func debugMode(args []string) error {
}
func runMonitor(ctx context.Context, loop bool) error {
b := eventbus.New()
defer b.Close()
dump := func(st *netmon.State) {
j, _ := json.MarshalIndent(st, "", " ")
os.Stderr.Write(j)
}
mon, err := netmon.New(log.Printf)
mon, err := netmon.New(b, log.Printf)
if err != nil {
return err
}

View File

@@ -81,6 +81,10 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
L github.com/aws/smithy-go/transport/http from github.com/aws/aws-sdk-go-v2/aws/middleware+
L github.com/aws/smithy-go/transport/http/internal/io from github.com/aws/smithy-go/transport/http
L github.com/aws/smithy-go/waiter from github.com/aws/aws-sdk-go-v2/service/ssm
github.com/coder/websocket from tailscale.com/util/eventbus
github.com/coder/websocket/internal/errd from github.com/coder/websocket
github.com/coder/websocket/internal/util from github.com/coder/websocket
github.com/coder/websocket/internal/xsync from github.com/coder/websocket
L github.com/coreos/go-iptables/iptables from tailscale.com/util/linuxfw
LD 💣 github.com/creack/pty from tailscale.com/ssh/tailssh
W 💣 github.com/dblohm7/wingoes from github.com/dblohm7/wingoes/com+
@@ -353,6 +357,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
tailscale.com/tstime from tailscale.com/control/controlclient+
tailscale.com/tstime/mono from tailscale.com/net/tstun+
tailscale.com/tstime/rate from tailscale.com/derp+
tailscale.com/tsweb from tailscale.com/util/eventbus
tailscale.com/tsweb/varz from tailscale.com/cmd/tailscaled+
tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal
tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+
@@ -382,6 +387,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
💣 tailscale.com/util/deephash from tailscale.com/ipn/ipnlocal+
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics+
tailscale.com/util/dnsname from tailscale.com/appc+
tailscale.com/util/eventbus from tailscale.com/tsd+
tailscale.com/util/execqueue from tailscale.com/control/controlclient+
tailscale.com/util/goroutines from tailscale.com/ipn/ipnlocal
tailscale.com/util/groupmember from tailscale.com/client/web+
@@ -587,7 +593,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
hash/crc32 from compress/gzip+
hash/maphash from go4.org/mem
html from html/template+
html/template from github.com/gorilla/csrf
html/template from github.com/gorilla/csrf+
internal/abi from crypto/x509/internal/macos+
internal/asan from internal/runtime/maps+
internal/bisect from internal/godebug

View File

@@ -339,7 +339,9 @@ var debugMux *http.ServeMux
func run() (err error) {
var logf logger.Logf = log.Printf
sys := new(tsd.System)
// Install an event bus as early as possible, so that it's
// available universally when setting up everything else.
sys := tsd.NewSystem()
// Parse config, if specified, to fail early if it's invalid.
var conf *conffile.Config
@@ -354,9 +356,7 @@ func run() (err error) {
var netMon *netmon.Monitor
isWinSvc := isWindowsService()
if !isWinSvc {
netMon, err = netmon.New(func(format string, args ...any) {
logf(format, args...)
})
netMon, err = netmon.New(sys.Bus.Get(), logf)
if err != nil {
return fmt.Errorf("netmon.New: %w", err)
}

View File

@@ -327,8 +327,8 @@ func beWindowsSubprocess() bool {
log.Printf("Error pre-loading \"%s\": %v", fqWintunPath, err)
}
sys := new(tsd.System)
netMon, err := netmon.New(log.Printf)
sys := tsd.NewSystem()
netMon, err := netmon.New(sys.Bus.Get(), log.Printf)
if err != nil {
log.Fatalf("Could not create netMon: %v", err)
}

View File

@@ -100,7 +100,7 @@ func newIPN(jsConfig js.Value) map[string]any {
logtail := logtail.NewLogger(c, log.Printf)
logf := logtail.Logf
sys := new(tsd.System)
sys := tsd.NewSystem()
sys.Set(store)
dialer := &tsdial.Dialer{Logf: logf}
eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{

View File

@@ -32,7 +32,6 @@ import (
"tailscale.com/net/tsdial"
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstest/deptest"
"tailscale.com/tstime"
"tailscale.com/types/key"
"tailscale.com/types/logger"
@@ -822,14 +821,3 @@ func (c *closeTrackConn) Close() error {
c.d.noteClose(c)
return c.Conn.Close()
}
func TestDeps(t *testing.T) {
deptest.DepChecker{
GOOS: "darwin",
GOARCH: "arm64",
BadDeps: map[string]string{
// Only the controlhttpserver needs WebSockets...
"github.com/coder/websocket": "controlhttp client shouldn't need websockets",
},
}.Check(t)
}

View File

@@ -17,9 +17,7 @@ import (
"tailscale.com/derp"
"tailscale.com/net/netmon"
"tailscale.com/tstest/deptest"
"tailscale.com/types/key"
"tailscale.com/util/set"
)
func TestSendRecv(t *testing.T) {
@@ -487,23 +485,3 @@ func TestProbe(t *testing.T) {
}
}
}
func TestDeps(t *testing.T) {
deptest.DepChecker{
GOOS: "darwin",
GOARCH: "arm64",
BadDeps: map[string]string{
"github.com/coder/websocket": "shouldn't link websockets except on js/wasm",
},
}.Check(t)
deptest.DepChecker{
GOOS: "darwin",
GOARCH: "arm64",
Tags: "ts_debug_websockets",
WantDeps: set.Of(
"github.com/coder/websocket",
),
}.Check(t)
}

View File

@@ -436,7 +436,7 @@ func (panicOnUseTransport) RoundTrip(*http.Request) (*http.Response, error) {
}
func newTestLocalBackend(t testing.TB) *LocalBackend {
return newTestLocalBackendWithSys(t, new(tsd.System))
return newTestLocalBackendWithSys(t, tsd.NewSystem())
}
// newTestLocalBackendWithSys creates a new LocalBackend with the given tsd.System.
@@ -448,7 +448,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend {
sys.Set(new(mem.Store))
}
if _, ok := sys.Engine.GetOK(); !ok {
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err)
}
@@ -4411,10 +4411,10 @@ func newLocalBackendWithTestControl(t *testing.T, enableLogging bool, newControl
if enableLogging {
logf = tstest.WhileTestRunningLogger(t)
}
sys := new(tsd.System)
sys := tsd.NewSystem()
store := new(mem.Store)
sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err)
}
@@ -4859,9 +4859,8 @@ func TestConfigFileReload(t *testing.T) {
// Create backend with initial config
tc.initial.Path = path
tc.initial.Raw = initialJSON
sys := &tsd.System{
InitialConfig: tc.initial,
}
sys := tsd.NewSystem()
sys.InitialConfig = tc.initial
b := newTestLocalBackendWithSys(t, sys)
// Update config file

View File

@@ -47,10 +47,10 @@ func TestLocalLogLines(t *testing.T) {
idA := logid(0xaa)
// set up a LocalBackend, super bare bones. No functional data.
sys := new(tsd.System)
sys := tsd.NewSystem()
store := new(mem.Store)
sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatal(err)
}

View File

@@ -34,6 +34,7 @@ import (
"tailscale.com/tstest"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
"tailscale.com/util/eventbus"
"tailscale.com/util/must"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine"
@@ -643,9 +644,12 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) {
h.isSelf = false
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
h.ps = &peerAPIServer{
b: &LocalBackend{
@@ -695,9 +699,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector
if shouldStore {
@@ -768,10 +775,12 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
rc := &appctest.RouteCollector{}
ht := new(health.Tracker)
reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector
if shouldStore {
@@ -833,10 +842,12 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
rc := &appctest.RouteCollector{}
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector
if shouldStore {

View File

@@ -877,11 +877,12 @@ func newTestBackend(t *testing.T) *LocalBackend {
logf = logger.WithPrefix(tstest.WhileTestRunningLogger(t), "... ")
}
sys := &tsd.System{}
sys := tsd.NewSystem()
e, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
})
if err != nil {
t.Fatal(err)

View File

@@ -295,10 +295,10 @@ func TestStateMachine(t *testing.T) {
c := qt.New(t)
logf := tstest.WhileTestRunningLogger(t)
sys := new(tsd.System)
sys := tsd.NewSystem()
store := new(testStateStorage)
sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err)
}
@@ -934,9 +934,9 @@ func TestStateMachine(t *testing.T) {
func TestEditPrefsHasNoKeys(t *testing.T) {
logf := tstest.WhileTestRunningLogger(t)
sys := new(tsd.System)
sys := tsd.NewSystem()
sys.Set(new(mem.Store))
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err)
}
@@ -1014,10 +1014,10 @@ func TestWGEngineStatusRace(t *testing.T) {
t.Skip("test fails")
c := qt.New(t)
logf := tstest.WhileTestRunningLogger(t)
sys := new(tsd.System)
sys := tsd.NewSystem()
sys.Set(new(mem.Store))
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set)
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.Bus.Get())
c.Assert(err, qt.IsNil)
t.Cleanup(eng.Close)
sys.Set(eng)

View File

@@ -517,12 +517,12 @@ type newControlClientFn func(tb testing.TB, opts controlclient.Options) controlc
func newLocalBackendWithTestControl(tb testing.TB, newControl newControlClientFn, enableLogging bool) *ipnlocal.LocalBackend {
tb.Helper()
sys := &tsd.System{}
sys := tsd.NewSystem()
store := &mem.Store{}
sys.Set(store)
logf := testLogger(tb, enableLogging)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
tb.Fatalf("NewFakeUserspaceEngine: %v", err)
}

View File

@@ -56,6 +56,7 @@ import (
"tailscale.com/types/ptr"
"tailscale.com/types/tkatype"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/httphdr"
"tailscale.com/util/httpm"
"tailscale.com/util/mak"
@@ -818,23 +819,31 @@ func (h *Handler) serveDebugPortmap(w http.ResponseWriter, r *http.Request) {
done := make(chan bool, 1)
var c *portmapper.Client
c = portmapper.NewClient(logger.WithPrefix(logf, "portmapper: "), h.b.NetMon(), debugKnobs, h.b.ControlKnobs(), func() {
logf("portmapping changed.")
logf("have mapping: %v", c.HaveMapping())
c = portmapper.NewClient(portmapper.Config{
Logf: logger.WithPrefix(logf, "portmapper: "),
NetMon: h.b.NetMon(),
DebugKnobs: debugKnobs,
ControlKnobs: h.b.ControlKnobs(),
OnChange: func() {
logf("portmapping changed.")
logf("have mapping: %v", c.HaveMapping())
if ext, ok := c.GetCachedMappingOrStartCreatingOne(); ok {
logf("cb: mapping: %v", ext)
select {
case done <- true:
default:
if ext, ok := c.GetCachedMappingOrStartCreatingOne(); ok {
logf("cb: mapping: %v", ext)
select {
case done <- true:
default:
}
return
}
return
}
logf("cb: no mapping")
logf("cb: no mapping")
},
})
defer c.Close()
netMon, err := netmon.New(logger.WithPrefix(logf, "monitor: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(logf, "monitor: "))
if err != nil {
logf("error creating monitor: %v", err)
return

View File

@@ -336,10 +336,10 @@ func TestServeWatchIPNBus(t *testing.T) {
func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend {
var logf logger.Logf = logger.Discard
sys := new(tsd.System)
sys := tsd.NewSystem()
store := new(mem.Store)
sys.Set(store)
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err)
}

View File

@@ -29,6 +29,7 @@ import (
"tailscale.com/net/tsdial"
"tailscale.com/tstest"
"tailscale.com/types/dnstype"
"tailscale.com/util/eventbus"
)
func (rr resolverAndDelay) String() string {
@@ -454,7 +455,9 @@ func makeLargeResponse(tb testing.TB, domain string) (request, response []byte)
func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports ...uint16) ([]byte, error) {
logf := tstest.WhileTestRunningLogger(tb)
netMon, err := netmon.New(logf)
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil {
tb.Fatal(err)
}

View File

@@ -31,6 +31,7 @@ import (
"tailscale.com/types/dnstype"
"tailscale.com/types/logger"
"tailscale.com/util/dnsname"
"tailscale.com/util/eventbus"
)
var (
@@ -1059,7 +1060,10 @@ func TestForwardLinkSelection(t *testing.T) {
// routes differently.
specialIP := netaddr.IPv4(1, 2, 3, 4)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, ".... netmon: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, ".... netmon: "))
if err != nil {
t.Fatal(err)
}

View File

@@ -15,6 +15,7 @@ import (
"tailscale.com/net/netmon"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
func TestGetDERPMap(t *testing.T) {
@@ -185,7 +186,10 @@ func TestLookup(t *testing.T) {
logf, closeLogf := logger.LogfCloser(t.Logf)
defer closeLogf()
netMon, err := netmon.New(logf)
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -7,10 +7,14 @@ import (
"bytes"
"fmt"
"testing"
"tailscale.com/util/eventbus"
)
func TestLinkChangeLogLimiter(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -16,6 +16,7 @@ import (
"tailscale.com/types/logger"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/set"
)
@@ -50,7 +51,10 @@ type osMon interface {
// Monitor represents a monitoring instance.
type Monitor struct {
logf logger.Logf
logf logger.Logf
b *eventbus.Client
changed *eventbus.Publisher[*ChangeDelta]
om osMon // nil means not supported on this platform
change chan bool // send false to wake poller, true to also force ChangeDeltas be sent
stop chan struct{} // closed on Stop
@@ -114,21 +118,23 @@ type ChangeDelta struct {
// New instantiates and starts a monitoring instance.
// The returned monitor is inactive until it's started by the Start method.
// Use RegisterChangeCallback to get notified of network changes.
func New(logf logger.Logf) (*Monitor, error) {
func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) {
logf = logger.WithPrefix(logf, "monitor: ")
m := &Monitor{
logf: logf,
b: bus.Client("netmon"),
change: make(chan bool, 1),
stop: make(chan struct{}),
lastWall: wallTime(),
}
m.changed = eventbus.Publish[*ChangeDelta](m.b)
st, err := m.interfaceStateUncached()
if err != nil {
return nil, err
}
m.ifState = st
m.om, err = newOSMon(logf, m)
m.om, err = newOSMon(bus, logf, m)
if err != nil {
return nil, err
}
@@ -465,6 +471,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) {
if delta.TimeJumped {
metricChangeTimeJump.Add(1)
}
m.changed.Publish(delta)
for _, cb := range m.cbs {
go cb(delta)
}

View File

@@ -13,6 +13,7 @@ import (
"golang.org/x/sys/unix"
"tailscale.com/net/netaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
const debugRouteMessages = false
@@ -24,7 +25,7 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false }
func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, _ *Monitor) (osMon, error) {
fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0)
if err != nil {
return nil, err

View File

@@ -10,6 +10,7 @@ import (
"strings"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
// unspecifiedMessage is a minimal message implementation that should not
@@ -24,7 +25,7 @@ type devdConn struct {
conn net.Conn
}
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe")
if err != nil {
logf("devd dial error: %v, falling back to polling method", err)

View File

@@ -16,6 +16,7 @@ import (
"tailscale.com/envknob"
"tailscale.com/net/tsaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
var debugNetlinkMessages = envknob.RegisterBool("TS_DEBUG_NETLINK")
@@ -27,15 +28,26 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false }
// RuleDeleted reports that one of Tailscale's policy routing rules
// was deleted.
type RuleDeleted struct {
// Table is the table number that the deleted rule referenced.
Table uint8
// Priority is the lookup priority of the deleted rule.
Priority uint32
}
// nlConn wraps a *netlink.Conn and returns a monitor.Message
// instead of a netlink.Message. Currently, messages are discarded,
// but down the line, when messages trigger different logic depending
// on the type of event, this provides the capability of handling
// each architecture-specific message in a generic fashion.
type nlConn struct {
logf logger.Logf
conn *netlink.Conn
buffered []netlink.Message
busClient *eventbus.Client
rulesDeleted *eventbus.Publisher[RuleDeleted]
logf logger.Logf
conn *netlink.Conn
buffered []netlink.Message
// addrCache maps interface indices to a set of addresses, and is
// used to suppress duplicate RTM_NEWADDR messages. It is populated
@@ -44,7 +56,7 @@ type nlConn struct {
addrCache map[uint32]map[netip.Addr]bool
}
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(bus *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{
// Routes get us most of the events of interest, but we need
// address as well to cover things like DHCP deciding to give
@@ -59,12 +71,22 @@ func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
return newPollingMon(logf, m)
}
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil
client := bus.Client("netmon-iprules")
return &nlConn{
busClient: client,
rulesDeleted: eventbus.Publish[RuleDeleted](client),
logf: logf,
conn: conn,
addrCache: make(map[uint32]map[netip.Addr]bool),
}, nil
}
func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
func (c *nlConn) Close() error { return c.conn.Close() }
func (c *nlConn) Close() error {
c.busClient.Close()
return c.conn.Close()
}
func (c *nlConn) Receive() (message, error) {
if len(c.buffered) == 0 {
@@ -219,6 +241,10 @@ func (c *nlConn) Receive() (message, error) {
// On `ip -4 rule del pref 5210 table main`, logs:
// monitor: ip rule deleted: {Family:2 DstLength:0 SrcLength:0 Tos:0 Table:254 Protocol:0 Scope:0 Type:1 Flags:0 Attributes:{Dst:<nil> Src:<nil> Gateway:<nil> OutIface:0 Priority:5210 Table:254 Mark:4294967295 Expires:<nil> Metrics:<nil> Multipath:[]}}
}
c.rulesDeleted.Publish(RuleDeleted{
Table: rmsg.Table,
Priority: rmsg.Attributes.Priority,
})
rdm := ipRuleDeletedMessage{
table: rmsg.Table,
priority: rmsg.Attributes.Priority,

View File

@@ -7,9 +7,10 @@ package netmon
import (
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
return newPollingMon(logf, m)
}

View File

@@ -11,11 +11,15 @@ import (
"testing"
"time"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak"
)
func TestMonitorStartClose(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -26,7 +30,10 @@ func TestMonitorStartClose(t *testing.T) {
}
func TestMonitorJustClose(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -36,7 +43,10 @@ func TestMonitorJustClose(t *testing.T) {
}
func TestMonitorInjectEvent(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -71,7 +81,11 @@ func TestMonitorMode(t *testing.T) {
default:
t.Skipf(`invalid --monitor value: must be "raw" or "callback"`)
}
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -13,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/windows/tunnel/winipcfg"
"tailscale.com/net/tsaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
var (
@@ -45,7 +46,7 @@ type winMon struct {
noDeadlockTicker *time.Ticker
}
func newOSMon(logf logger.Logf, pm *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, pm *Monitor) (osMon, error) {
m := &winMon{
logf: logf,
isActive: pm.isActive,

View File

@@ -10,6 +10,7 @@ import (
"testing"
"tailscale.com/net/netmon"
"tailscale.com/util/eventbus"
)
type conn struct {
@@ -72,7 +73,10 @@ func TestCheckReversePathFiltering(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skipf("skipping on %s", runtime.GOOS)
}
netMon, err := netmon.New(t.Logf)
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -19,6 +19,7 @@ import (
"tailscale.com/net/netmon"
"tailscale.com/syncs"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
// TestIGD is an IGD (Internet Gateway Device) for testing. It supports fake
@@ -258,15 +259,25 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) {
}
}
func newTestClient(t *testing.T, igd *TestIGD) *Client {
// newTestClient configures a new test client connected to igd for mapping updates.
// If bus != nil, update events are published to it.
// A cleanup for the resulting client is added to t.
func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client {
var c *Client
c = NewClient(t.Logf, netmon.NewStatic(), nil, new(controlknobs.Knobs), func() {
t.Logf("port map changed")
t.Logf("have mapping: %v", c.HaveMapping())
c = NewClient(Config{
Logf: t.Logf,
NetMon: netmon.NewStatic(),
ControlKnobs: new(controlknobs.Knobs),
EventBus: bus,
OnChange: func() {
t.Logf("port map changed")
t.Logf("have mapping: %v", c.HaveMapping())
},
})
c.testPxPPort = igd.TestPxPPort()
c.testUPnPPort = igd.TestUPnPPort()
c.netMon = netmon.NewStatic()
c.SetGatewayLookupFunc(testIPAndGateway)
t.Cleanup(func() { c.Close() })
return c
}

View File

@@ -31,6 +31,7 @@ import (
"tailscale.com/types/logger"
"tailscale.com/types/nettype"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
)
var disablePortMapperEnv = envknob.RegisterBool("TS_DISABLE_PORTMAPPER")
@@ -84,6 +85,11 @@ const trustServiceStillAvailableDuration = 10 * time.Minute
// Client is a port mapping client.
type Client struct {
// The following two fields must either both be nil, or both non-nil.
// Both are immutable after construction.
pubClient *eventbus.Client
updates *eventbus.Publisher[Mapping]
logf logger.Logf
netMon *netmon.Monitor // optional; nil means interfaces will be looked up on-demand
controlKnobs *controlknobs.Knobs
@@ -201,32 +207,56 @@ func (m *pmpMapping) Release(ctx context.Context) {
uc.WriteToUDPAddrPort(pkt, m.gw)
}
// NewClient returns a new portmapping client.
//
// The netMon parameter is required.
//
// The debug argument allows configuring the behaviour of the portmapper for
// debugging; if nil, a sensible set of defaults will be used.
//
// The controlKnobs, if non-nil, specifies the control knobs from the control
// plane that might disable portmapping.
//
// The optional onChange argument specifies a func to run in a new goroutine
// whenever the port mapping status has changed. If nil, it doesn't make a
// callback.
func NewClient(logf logger.Logf, netMon *netmon.Monitor, debug *DebugKnobs, controlKnobs *controlknobs.Knobs, onChange func()) *Client {
if netMon == nil {
// Config carries the settings for a [Client].
type Config struct {
// EventBus, if non-nil, is used for event publication and subscription by
// portmapper clients created from this config.
//
// TODO(creachadair): As of 2025-03-19 this is optional, but is intended to
// become required non-nil.
EventBus *eventbus.Bus
// Logf is called to generate text logs for the client. If nil, logger.Discard is used.
Logf logger.Logf
// NetMon is the network monitor used by the client. It must be non-nil.
NetMon *netmon.Monitor
// DebugKnobs, if non-nil, configure the behaviour of the portmapper for
// debugging. If nil, a sensible set of defaults will be used.
DebugKnobs *DebugKnobs
// ControlKnobs, if non-nil, specifies knobs from the control plane that
// might disable port mapping.
ControlKnobs *controlknobs.Knobs
// OnChange is called to run in a new goroutine whenever the port mapping
// status has changed. If nil, no callback is issued.
OnChange func()
}
// NewClient constructs a new portmapping [Client] from c. It will panic if any
// required parameters are omitted.
func NewClient(c Config) *Client {
if c.NetMon == nil {
panic("nil netMon")
}
ret := &Client{
logf: logf,
netMon: netMon,
logf: c.Logf,
netMon: c.NetMon,
ipAndGateway: netmon.LikelyHomeRouterIP, // TODO(bradfitz): move this to method on netMon
onChange: onChange,
controlKnobs: controlKnobs,
onChange: c.OnChange,
controlKnobs: c.ControlKnobs,
}
if debug != nil {
ret.debug = *debug
if c.EventBus != nil {
ret.pubClient = c.EventBus.Client("portmapper")
ret.updates = eventbus.Publish[Mapping](ret.pubClient)
}
if ret.logf == nil {
ret.logf = logger.Discard
}
if c.DebugKnobs != nil {
ret.debug = *c.DebugKnobs
}
return ret
}
@@ -256,6 +286,10 @@ func (c *Client) Close() error {
}
c.closed = true
c.invalidateMappingsLocked(true)
if c.updates != nil {
c.updates.Close()
c.pubClient.Close()
}
// TODO: close some future ever-listening UDP socket(s),
// waiting for multicast announcements from router.
return nil
@@ -467,11 +501,30 @@ func (c *Client) createMapping() {
c.runningCreate = false
}()
if _, err := c.createOrGetMapping(ctx); err == nil && c.onChange != nil {
go c.onChange()
} else if err != nil && !IsNoMappingError(err) {
c.logf("createOrGetMapping: %v", err)
mapping, _, err := c.createOrGetMapping(ctx)
if err != nil {
if !IsNoMappingError(err) {
c.logf("createOrGetMapping: %v", err)
}
return
}
c.updates.Publish(Mapping{
External: mapping.External(),
Type: mapping.MappingType(),
GoodUntil: mapping.GoodUntil(),
})
if c.onChange != nil {
go c.onChange()
}
}
// Mapping is an event recording the allocation of a port mapping.
type Mapping struct {
External netip.AddrPort
Type string
GoodUntil time.Time
// TODO(creachadair): Record whether we reused an existing mapping?
}
// wildcardIP is used when the previous external IP is not known for PCP port mapping.
@@ -482,19 +535,19 @@ var wildcardIP = netip.MustParseAddr("0.0.0.0")
//
// If no mapping is available, the error will be of type
// NoMappingError; see IsNoMappingError.
func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPort, err error) {
func (c *Client) createOrGetMapping(ctx context.Context) (mapping mapping, external netip.AddrPort, err error) {
if c.debug.disableAll() {
return netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled}
return nil, netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled}
}
if c.debug.DisableUPnP && c.debug.DisablePCP && c.debug.DisablePMP {
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
gw, myIP, ok := c.gatewayAndSelfIP()
if !ok {
return netip.AddrPort{}, NoMappingError{ErrGatewayRange}
return nil, netip.AddrPort{}, NoMappingError{ErrGatewayRange}
}
if gw.Is6() {
return netip.AddrPort{}, NoMappingError{ErrGatewayIPv6}
return nil, netip.AddrPort{}, NoMappingError{ErrGatewayIPv6}
}
now := time.Now()
@@ -523,6 +576,17 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
return
}
// TODO(creachadair): This is more subtle than it should be. Ideally we
// would just return the mapping directly, but there are many different
// paths through the function with carefully-balanced locks, and not all
// the paths have a mapping to return. As a workaround, while we're here
// doing cleanup under the lock, grab the final mapping value and return
// it, so the caller does not need to grab the lock again and potentially
// race with a later update. The mapping itself is concurrency-safe.
//
// We should restructure this code so the locks are properly scoped.
mapping = c.mapping
// Print the internal details of each mapping if we're being verbose.
if c.debug.VerboseLogs {
c.logf("successfully obtained mapping: now=%d external=%v type=%s mapping=%s",
@@ -548,7 +612,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if now.Before(m.RenewAfter()) {
defer c.mu.Unlock()
reusedExisting = true
return m.External(), nil
return nil, m.External(), nil
}
// The mapping might still be valid, so just try to renew it.
prevPort = m.External().Port()
@@ -557,10 +621,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if c.debug.DisablePCP && c.debug.DisablePMP {
c.mu.Unlock()
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return external, nil
return nil, external, nil
}
c.vlogf("fallback to UPnP due to PCP and PMP being disabled failed")
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
// If we just did a Probe (e.g. via netchecker) but didn't
@@ -587,16 +651,16 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Unlock()
// fallback to UPnP portmapping
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return external, nil
return nil, external, nil
}
c.vlogf("fallback to UPnP due to no PCP and PMP failed")
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
c.mu.Unlock()
uc, err := c.listenPacket(ctx, "udp4", ":0")
if err != nil {
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
defer uc.Close()
@@ -616,7 +680,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
} else {
// Ask for our external address if needed.
@@ -625,7 +689,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
}
@@ -634,7 +698,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
}
@@ -643,13 +707,13 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
n, src, err := uc.ReadFromUDPAddrPort(res)
if err != nil {
if ctx.Err() == context.Canceled {
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
// fallback to UPnP portmapping
if mapping, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return mapping, nil
return nil, mapping, nil
}
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
src = netaddr.Unmap(src)
if !src.IsValid() {
@@ -665,7 +729,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
continue
}
if pres.ResultCode != 0 {
return netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)}
return nil, netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)}
}
if pres.OpCode == pmpOpReply|pmpOpMapPublicAddr {
m.external = netip.AddrPortFrom(pres.PublicAddr, m.external.Port())
@@ -683,7 +747,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if err != nil {
c.logf("failed to get PCP mapping: %v", err)
// PCP should only have a single packet response
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
pcpMapping.c = c
pcpMapping.internal = m.internal
@@ -691,10 +755,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Lock()
defer c.mu.Unlock()
c.mapping = pcpMapping
return pcpMapping.external, nil
return pcpMapping, pcpMapping.external, nil
default:
c.logf("unknown PMP/PCP version number: %d %v", version, res[:n])
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
}
@@ -702,7 +766,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Lock()
defer c.mu.Unlock()
c.mapping = m
return m.external, nil
return nil, m.external, nil
}
}
}

View File

@@ -12,20 +12,21 @@ import (
"time"
"tailscale.com/control/controlknobs"
"tailscale.com/util/eventbus"
)
func TestCreateOrGetMapping(t *testing.T) {
if v, _ := strconv.ParseBool(os.Getenv("HIT_NETWORK")); !v {
t.Skip("skipping test without HIT_NETWORK=1")
}
c := NewClient(t.Logf, nil, nil, new(controlknobs.Knobs), nil)
c := NewClient(Config{Logf: t.Logf, ControlKnobs: new(controlknobs.Knobs)})
defer c.Close()
c.SetLocalPort(1234)
for i := range 2 {
if i > 0 {
time.Sleep(100 * time.Millisecond)
}
ext, err := c.createOrGetMapping(context.Background())
_, ext, err := c.createOrGetMapping(context.Background())
t.Logf("Got: %v, %v", ext, err)
}
}
@@ -34,7 +35,7 @@ func TestClientProbe(t *testing.T) {
if v, _ := strconv.ParseBool(os.Getenv("HIT_NETWORK")); !v {
t.Skip("skipping test without HIT_NETWORK=1")
}
c := NewClient(t.Logf, nil, nil, new(controlknobs.Knobs), nil)
c := NewClient(Config{Logf: t.Logf, ControlKnobs: new(controlknobs.Knobs)})
defer c.Close()
for i := range 3 {
if i > 0 {
@@ -49,13 +50,13 @@ func TestClientProbeThenMap(t *testing.T) {
if v, _ := strconv.ParseBool(os.Getenv("HIT_NETWORK")); !v {
t.Skip("skipping test without HIT_NETWORK=1")
}
c := NewClient(t.Logf, nil, nil, new(controlknobs.Knobs), nil)
c := NewClient(Config{Logf: t.Logf, ControlKnobs: new(controlknobs.Knobs)})
defer c.Close()
c.debug.VerboseLogs = true
c.SetLocalPort(1234)
res, err := c.Probe(context.Background())
t.Logf("Probe: %+v, %v", res, err)
ext, err := c.createOrGetMapping(context.Background())
_, ext, err := c.createOrGetMapping(context.Background())
t.Logf("createOrGetMapping: %v, %v", ext, err)
}
@@ -66,9 +67,8 @@ func TestProbeIntegration(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on pxp=%v, upnp=%v", c.testPxPPort, c.testUPnPPort)
defer c.Close()
res, err := c.Probe(context.Background())
if err != nil {
@@ -101,8 +101,7 @@ func TestPCPIntegration(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
res, err := c.Probe(context.Background())
if err != nil {
t.Fatalf("probe failed: %v", err)
@@ -114,7 +113,7 @@ func TestPCPIntegration(t *testing.T) {
t.Fatalf("probe did not see pcp: %+v", res)
}
external, err := c.createOrGetMapping(context.Background())
_, external, err := c.createOrGetMapping(context.Background())
if err != nil {
t.Fatalf("failed to get mapping: %v", err)
}
@@ -136,3 +135,29 @@ func TestGetUPnPErrorsMetric(t *testing.T) {
getUPnPErrorsMetric(0)
getUPnPErrorsMetric(-100)
}
func TestUpdateEvent(t *testing.T) {
igd, err := NewTestIGD(t.Logf, TestIGDOptions{PCP: true})
if err != nil {
t.Fatalf("Create test gateway: %v", err)
}
bus := eventbus.New()
defer bus.Close()
sub := eventbus.Subscribe[Mapping](bus.Client("TestUpdateEvent"))
c := newTestClient(t, igd, bus)
if _, err := c.Probe(t.Context()); err != nil {
t.Fatalf("Probe failed: %v", err)
}
c.GetCachedMappingOrStartCreatingOne()
select {
case evt := <-sub.Events():
t.Logf("Received portmap update: %+v", evt)
case <-sub.Done():
t.Error("Subscriber closed prematurely")
case <-time.After(5 * time.Second):
t.Error("Timed out waiting for an update event")
}
}

View File

@@ -163,9 +163,8 @@ func TestSelectBestService(t *testing.T) {
Desc: rootDesc,
Control: tt.control,
})
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
// Ensure that we're using the HTTP client that talks to our test IGD server
ctx := context.Background()

View File

@@ -586,9 +586,8 @@ func TestGetUPnPPortMapping(t *testing.T) {
},
})
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
c.debug.VerboseLogs = true
@@ -689,10 +688,9 @@ func TestGetUPnPPortMapping_LeaseDuration(t *testing.T) {
})
ctx := context.Background()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
// Actually test the UPnP port mapping.
mustProbeUPnP(t, ctx, c)
@@ -735,8 +733,7 @@ func TestGetUPnPPortMapping_NoValidServices(t *testing.T) {
Desc: noSupportedServicesRootDesc,
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()
@@ -778,8 +775,7 @@ func TestGetUPnPPortMapping_Legacy(t *testing.T) {
},
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()
@@ -806,9 +802,8 @@ func TestGetUPnPPortMappingNoResponses(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
c.debug.VerboseLogs = true
@@ -939,8 +934,7 @@ func TestGetUPnPPortMapping_Invalid(t *testing.T) {
},
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()

View File

@@ -1037,8 +1037,8 @@ func TestSSHAuthFlow(t *testing.T) {
func TestSSH(t *testing.T) {
var logf logger.Logf = t.Logf
sys := &tsd.System{}
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
sys := tsd.NewSystem()
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil {
t.Fatal(err)
}

View File

@@ -34,6 +34,7 @@ import (
"tailscale.com/net/tstun"
"tailscale.com/proxymap"
"tailscale.com/types/netmap"
"tailscale.com/util/eventbus"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine"
"tailscale.com/wgengine/magicsock"
@@ -41,7 +42,12 @@ import (
)
// System contains all the subsystems of a Tailscale node (tailscaled, etc.)
//
// A valid System value must always have a non-nil Bus populated. Callers must
// ensure this before using the value further. Call [NewSystem] to obtain a
// value ready to use.
type System struct {
Bus SubSystem[*eventbus.Bus]
Dialer SubSystem[*tsdial.Dialer]
DNSManager SubSystem[*dns.Manager] // can get its *resolver.Resolver from DNSManager.Resolver
Engine SubSystem[wgengine.Engine]
@@ -74,6 +80,14 @@ type System struct {
userMetricsRegistry usermetric.Registry
}
// NewSystem constructs a new otherwise-empty [System] with a
// freshly-constructed event bus populated.
func NewSystem() *System {
sys := new(System)
sys.Set(eventbus.New())
return sys
}
// NetstackImpl is the interface that *netstack.Impl implements.
// It's an interface for circular dependency reasons: netstack.Impl
// references LocalBackend, and LocalBackend has a tsd.System.
@@ -86,6 +100,8 @@ type NetstackImpl interface {
// has already been set.
func (s *System) Set(v any) {
switch v := v.(type) {
case *eventbus.Bus:
s.Bus.Set(v)
case *netmon.Monitor:
s.NetMon.Set(v)
case *dns.Manager:

View File

@@ -435,8 +435,11 @@ func (s *Server) Close() error {
for _, ln := range s.listeners {
ln.closeLocked()
}
wg.Wait()
if bus := s.sys.Bus.Get(); bus != nil {
bus.Close()
}
s.closed = true
return nil
}
@@ -558,13 +561,13 @@ func (s *Server) start() (reterr error) {
s.Logf(format, a...)
}
sys := new(tsd.System)
sys := tsd.NewSystem()
s.sys = sys
if err := s.startLogger(&closePool, sys.HealthTracker(), tsLogf); err != nil {
return err
}
s.netMon, err = netmon.New(tsLogf)
s.netMon, err = netmon.New(sys.Bus.Get(), tsLogf)
if err != nil {
return err
}
@@ -572,6 +575,7 @@ func (s *Server) start() (reterr error) {
s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used)
eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{
EventBus: sys.Bus.Get(),
ListenPort: s.Port,
NetMon: s.netMon,
Dialer: s.dialer,

View File

@@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger"
_ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare"
_ "tailscale.com/version"

View File

@@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger"
_ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare"
_ "tailscale.com/version"

View File

@@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger"
_ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare"
_ "tailscale.com/version"

View File

@@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger"
_ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare"
_ "tailscale.com/version"

View File

@@ -56,6 +56,7 @@ import (
_ "tailscale.com/types/logger"
_ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr"
_ "tailscale.com/util/osdiag"
_ "tailscale.com/util/osshare"

View File

@@ -9,7 +9,6 @@ import (
"html"
"io"
"net/http"
"net/http/pprof"
"net/url"
"os"
"runtime"
@@ -64,16 +63,7 @@ func Debugger(mux *http.ServeMux) *DebugHandler {
ret.Handle("varz", "Metrics (Prometheus)", http.HandlerFunc(varz.Handler))
}
// pprof.Index serves everything that runtime/pprof.Lookup finds:
// goroutine, threadcreate, heap, allocs, block, mutex
ret.Handle("pprof/", "pprof (index)", http.HandlerFunc(pprof.Index))
// But register the other ones from net/http/pprof directly:
ret.HandleSilent("pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
ret.HandleSilent("pprof/profile", http.HandlerFunc(pprof.Profile))
ret.HandleSilent("pprof/symbol", http.HandlerFunc(pprof.Symbol))
ret.HandleSilent("pprof/trace", http.HandlerFunc(pprof.Trace))
ret.URL("/debug/pprof/goroutine?debug=1", "Goroutines (collapsed)")
ret.URL("/debug/pprof/goroutine?debug=2", "Goroutines (full)")
addProfilingHandlers(ret)
ret.Handle("gc", "force GC", http.HandlerFunc(gcHandler))
hostname, err := os.Hostname()
if err == nil {

24
tsweb/pprof_default.go Normal file
View File

@@ -0,0 +1,24 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !js && !wasm
package tsweb
import (
"net/http"
"net/http/pprof"
)
func addProfilingHandlers(d *DebugHandler) {
// pprof.Index serves everything that runtime/pprof.Lookup finds:
// goroutine, threadcreate, heap, allocs, block, mutex
d.Handle("pprof/", "pprof (index)", http.HandlerFunc(pprof.Index))
// But register the other ones from net/http/pprof directly:
d.HandleSilent("pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
d.HandleSilent("pprof/profile", http.HandlerFunc(pprof.Profile))
d.HandleSilent("pprof/symbol", http.HandlerFunc(pprof.Symbol))
d.HandleSilent("pprof/trace", http.HandlerFunc(pprof.Trace))
d.URL("/debug/pprof/goroutine?debug=1", "Goroutines (collapsed)")
d.URL("/debug/pprof/goroutine?debug=2", "Goroutines (full)")
}

10
tsweb/pprof_js.go Normal file
View File

@@ -0,0 +1,10 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build js && wasm
package tsweb
func addProfilingHandlers(d *DebugHandler) {
// No pprof in js builds, pprof doesn't work and bloats the build.
}

View File

@@ -15,7 +15,6 @@ import (
"io"
"net"
"net/http"
_ "net/http/pprof"
"net/netip"
"net/url"
"os"

View File

@@ -46,7 +46,7 @@ func setupWGTest(b *testing.B, logf logger.Logf, traf *TrafficGen, a1, a2 netip.
logf: logger.WithPrefix(logf, "tun1: "),
traf: traf,
}
s1 := new(tsd.System)
s1 := tsd.NewSystem()
e1, err := wgengine.NewUserspaceEngine(l1, wgengine.Config{
Router: router.NewFake(l1),
NetMon: nil,
@@ -73,7 +73,7 @@ func setupWGTest(b *testing.B, logf logger.Logf, traf *TrafficGen, a1, a2 netip.
logf: logger.WithPrefix(logf, "tun2: "),
traf: traf,
}
s2 := new(tsd.System)
s2 := tsd.NewSystem()
e2, err := wgengine.NewUserspaceEngine(l2, wgengine.Config{
Router: router.NewFake(l2),
NetMon: nil,

View File

@@ -56,6 +56,7 @@ import (
"tailscale.com/types/nettype"
"tailscale.com/types/views"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak"
"tailscale.com/util/ringbuffer"
"tailscale.com/util/set"
@@ -136,6 +137,8 @@ type Conn struct {
// This block mirrors the contents and field order of the Options
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
eventClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@@ -401,8 +404,15 @@ func (c *Conn) dlogf(format string, a ...any) {
// Options contains options for Listen.
type Options struct {
// Logf optionally provides a log function to use.
// Must not be nil.
// EventBus, if non-nil, is used for event publication and subscription by
// each Conn created from these Options.
//
// TODO(creachadair): As of 2025-03-19 this is optional, but is intended to
// become required non-nil.
EventBus *eventbus.Bus
// Logf provides a log function to use. It must not be nil.
// Use [logger.Discard] to disrcard logs.
Logf logger.Logf
// Port is the port to listen on.
@@ -529,6 +539,7 @@ func NewConn(opts Options) (*Conn, error) {
}
c := newConn(opts.logf())
c.eventBus = opts.EventBus
c.port.Store(uint32(opts.Port))
c.controlKnobs = opts.ControlKnobs
c.epFunc = opts.endpointsFunc()
@@ -537,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) {
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// If an event bus is enabled, subscribe to portmapping changes; otherwise
// use the callback mechanism of portmapper.Client.
//
// TODO(creachadair): Remove the switch once the event bus is mandatory.
onPortMapChanged := c.onPortMapChanged
if c.eventBus != nil {
c.eventClient = c.eventBus.Client("magicsock.Conn")
pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
go func() {
defer pmSub.Close()
for {
select {
case <-pmSub.Events():
c.onPortMapChanged()
case <-pmSub.Done():
return
}
}
}()
// Disable the explicit callback from the portmapper, the subscriber handles it.
onPortMapChanged = nil
}
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
@@ -544,7 +580,14 @@ func NewConn(opts Options) (*Conn, error) {
portMapOpts := &portmapper.DebugKnobs{
DisableAll: func() bool { return opts.DisablePortMapper || c.onlyTCP443.Load() },
}
c.portMapper = portmapper.NewClient(portmapperLogf, opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
c.portMapper = portmapper.NewClient(portmapper.Config{
EventBus: c.eventBus,
Logf: portmapperLogf,
NetMon: opts.NetMon,
DebugKnobs: portMapOpts,
ControlKnobs: opts.ControlKnobs,
OnChange: onPortMapChanged,
})
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
c.health = opts.HealthTracker
@@ -2461,6 +2504,9 @@ func (c *connBind) Close() error {
if c.closeDisco6 != nil {
c.closeDisco6.Close()
}
if c.eventClient != nil {
c.eventClient.Close()
}
// Send an empty read result to unblock receiveDERP,
// which will then check connBind.Closed.
// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.

View File

@@ -62,6 +62,7 @@ import (
"tailscale.com/types/nettype"
"tailscale.com/types/ptr"
"tailscale.com/util/cibuild"
"tailscale.com/util/eventbus"
"tailscale.com/util/must"
"tailscale.com/util/racebuild"
"tailscale.com/util/set"
@@ -173,7 +174,10 @@ func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, der
func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap, privateKey key.NodePrivate) *magicStack {
t.Helper()
netMon, err := netmon.New(logf)
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil {
t.Fatalf("netmon.New: %v", err)
}
@@ -390,7 +394,10 @@ func TestNewConn(t *testing.T) {
}
}
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
t.Fatalf("netmon.New: %v", err)
}
@@ -523,7 +530,10 @@ func TestDeviceStartStop(t *testing.T) {
tstest.PanicOnLog()
tstest.ResourceCheck(t)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
t.Fatalf("netmon.New: %v", err)
}
@@ -1362,7 +1372,10 @@ func newTestConn(t testing.TB) *Conn {
t.Helper()
port := pickPort(t)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: "))
bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
t.Fatalf("netmon.New: %v", err)
}
@@ -3117,7 +3130,10 @@ func TestMaybeRebindOnError(t *testing.T) {
}
func TestNetworkDownSendErrors(t *testing.T) {
netMon := must.Get(netmon.New(t.Logf))
bus := eventbus.New()
defer bus.Close()
netMon := must.Get(netmon.New(bus, t.Logf))
defer netMon.Close()
reg := new(usermetric.Registry)

View File

@@ -44,13 +44,14 @@ func TestInjectInboundLeak(t *testing.T) {
t.Logf(format, args...)
}
}
sys := new(tsd.System)
sys := tsd.NewSystem()
eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
Tun: tunDev,
Dialer: dialer,
SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
})
if err != nil {
t.Fatal(err)
@@ -100,7 +101,7 @@ func getMemStats() (ms runtime.MemStats) {
func makeNetstack(tb testing.TB, config func(*Impl)) *Impl {
tunDev := tstun.NewFake()
sys := &tsd.System{}
sys := tsd.NewSystem()
sys.Set(new(mem.Store))
dialer := new(tsdial.Dialer)
logf := tstest.WhileTestRunningLogger(tb)
@@ -110,6 +111,7 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl {
SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
})
if err != nil {
tb.Fatal(err)

View File

@@ -27,6 +27,7 @@ import (
"tailscale.com/net/tsaddr"
"tailscale.com/tstest"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
"tailscale.com/util/linuxfw"
"tailscale.com/version/distro"
)
@@ -363,7 +364,9 @@ ip route add throw 192.168.0.0/24 table 52` + basic,
},
}
mon, err := netmon.New(logger.Discard)
bus := eventbus.New()
defer bus.Close()
mon, err := netmon.New(bus, logger.Discard)
if err != nil {
t.Fatal(err)
}
@@ -973,7 +976,10 @@ func newLinuxRootTest(t *testing.T) *linuxTest {
logf := lt.logOutput.Logf
mon, err := netmon.New(logger.Discard)
bus := eventbus.New()
defer bus.Close()
mon, err := netmon.New(bus, logger.Discard)
if err != nil {
lt.Close()
t.Fatal(err)

View File

@@ -46,6 +46,7 @@ import (
"tailscale.com/types/views"
"tailscale.com/util/clientmetric"
"tailscale.com/util/deephash"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak"
"tailscale.com/util/set"
"tailscale.com/util/testenv"
@@ -89,8 +90,12 @@ const statusPollInterval = 1 * time.Minute
const networkLoggerUploadTimeout = 5 * time.Second
type userspaceEngine struct {
// eventBus will eventually become required, but for now may be nil.
// TODO(creachadair): Enforce that this is non-nil at construction.
eventBus *eventbus.Bus
logf logger.Logf
wgLogger *wglog.Logger //a wireguard-go logging wrapper
wgLogger *wglog.Logger // a wireguard-go logging wrapper
reqCh chan struct{}
waitCh chan struct{} // chan is closed when first Close call completes; contrast with closing bool
timeNow func() mono.Time
@@ -227,6 +232,13 @@ type Config struct {
// DriveForLocal, if populated, will cause the engine to expose a Taildrive
// listener at 100.100.100.100:8080.
DriveForLocal drive.FileSystemForLocal
// EventBus, if non-nil, is used for event publication and subscription by
// the Engine and its subsystems.
//
// TODO(creachadair): As of 2025-03-19 this is optional, but is intended to
// become required non-nil.
EventBus *eventbus.Bus
}
// NewFakeUserspaceEngine returns a new userspace engine for testing.
@@ -255,6 +267,8 @@ func NewFakeUserspaceEngine(logf logger.Logf, opts ...any) (Engine, error) {
conf.HealthTracker = v
case *usermetric.Registry:
conf.Metrics = v
case *eventbus.Bus:
conf.EventBus = v
default:
return nil, fmt.Errorf("unknown option type %T", v)
}
@@ -323,6 +337,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
}
e := &userspaceEngine{
eventBus: conf.EventBus,
timeNow: mono.Now,
logf: logf,
reqCh: make(chan struct{}, 1),
@@ -348,7 +363,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
if conf.NetMon != nil {
e.netMon = conf.NetMon
} else {
mon, err := netmon.New(logf)
mon, err := netmon.New(conf.EventBus, logf)
if err != nil {
return nil, err
}
@@ -389,6 +404,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
}
}
magicsockOpts := magicsock.Options{
EventBus: e.eventBus,
Logf: logf,
Port: conf.ListenPort,
EndpointsFunc: endpointsFn,

View File

@@ -16,13 +16,14 @@ import (
)
func TestIsNetstack(t *testing.T) {
sys := new(tsd.System)
sys := tsd.NewSystem()
e, err := wgengine.NewUserspaceEngine(
tstest.WhileTestRunningLogger(t),
wgengine.Config{
SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
},
)
if err != nil {
@@ -66,7 +67,7 @@ func TestIsNetstackRouter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sys := &tsd.System{}
sys := tsd.NewSystem()
if tt.setNetstackRouter {
sys.NetstackRouter.Set(true)
}
@@ -74,6 +75,7 @@ func TestIsNetstackRouter(t *testing.T) {
conf.SetSubsystem = sys.Set
conf.HealthTracker = sys.HealthTracker()
conf.Metrics = sys.UserMetricsRegistry()
conf.EventBus = sys.Bus.Get()
e, err := wgengine.NewUserspaceEngine(logger.Discard, conf)
if err != nil {
t.Fatal(err)

View File

@@ -25,6 +25,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/netmap"
"tailscale.com/types/opt"
"tailscale.com/util/eventbus"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/router"
"tailscale.com/wgengine/wgcfg"
@@ -100,9 +101,12 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
}
func TestUserspaceEngineReconfig(t *testing.T) {
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus)
if err != nil {
t.Fatal(err)
}
@@ -166,13 +170,16 @@ func TestUserspaceEnginePortReconfig(t *testing.T) {
var knobs controlknobs.Knobs
bus := eventbus.New()
defer bus.Close()
// Keep making a wgengine until we find an unused port
var ue *userspaceEngine
ht := new(health.Tracker)
reg := new(usermetric.Registry)
for i := range 100 {
attempt := uint16(defaultPort + i)
e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg)
e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg, bus)
if err != nil {
t.Fatal(err)
}
@@ -251,9 +258,11 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) {
var knobs controlknobs.Knobs
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg)
e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg, bus)
if err != nil {
t.Fatal(err)
}

View File

@@ -9,6 +9,7 @@ import (
"time"
"tailscale.com/health"
"tailscale.com/util/eventbus"
"tailscale.com/util/usermetric"
)
@@ -24,9 +25,11 @@ func TestWatchdog(t *testing.T) {
t.Run("default watchdog does not fire", func(t *testing.T) {
t.Parallel()
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker)
reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus)
if err != nil {
t.Fatal(err)
}