Compare commits
1 Commits
awly/cli-j
...
bradfitz/d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68f95f7c53 |
@@ -41,6 +41,7 @@ func startMeshWithHost(s *derp.Server, host string) error {
|
||||
return err
|
||||
}
|
||||
c.MeshKey = s.MeshKey()
|
||||
c.WatchConnectionChanges = true
|
||||
|
||||
// For meshed peers within a region, connect via VPC addresses.
|
||||
c.SetURLDialer(func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
|
||||
@@ -56,6 +56,12 @@ type Client struct {
|
||||
MeshKey string // optional; for trusted clients
|
||||
IsProber bool // optional; for probers to optional declare themselves as such
|
||||
|
||||
// WatchConnectionChanges is whether the client wishes to subscribe to
|
||||
// notifications about clients connecting & disconnecting.
|
||||
//
|
||||
// Only trusted connections (using MeshKey) are allowed to use this.
|
||||
WatchConnectionChanges bool
|
||||
|
||||
// BaseContext, if non-nil, returns the base context to use for dialing a
|
||||
// new derp server. If nil, context.Background is used.
|
||||
// In either case, additional timeouts may be added to the base context.
|
||||
@@ -80,6 +86,7 @@ type Client struct {
|
||||
addrFamSelAtomic syncs.AtomicValue[AddressFamilySelector]
|
||||
|
||||
mu sync.Mutex
|
||||
started bool // true upon first connect, never transitions to false
|
||||
preferred bool
|
||||
canAckPings bool
|
||||
closed bool
|
||||
@@ -142,6 +149,15 @@ func NewClient(privateKey key.NodePrivate, serverURL string, logf logger.Logf) (
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// isStarted reports whether this client has been used yet.
|
||||
//
|
||||
// If if reports false, it may still have its exported fields configured.
|
||||
func (c *Client) isStarted() bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.started
|
||||
}
|
||||
|
||||
// Connect connects or reconnects to the server, unless already connected.
|
||||
// It returns nil if there was already a good connection, or if one was made.
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
@@ -284,6 +300,7 @@ func useWebsockets() bool {
|
||||
func (c *Client) connect(ctx context.Context, caller string) (client *derp.Client, connGen int, err error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.started = true
|
||||
if c.closed {
|
||||
return nil, 0, ErrClientClosed
|
||||
}
|
||||
@@ -495,6 +512,13 @@ func (c *Client) connect(ctx context.Context, caller string) (client *derp.Clien
|
||||
}
|
||||
}
|
||||
|
||||
if c.WatchConnectionChanges {
|
||||
if err := derpClient.WatchConnectionChanges(); err != nil {
|
||||
go httpConn.Close()
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
c.serverPubKey = derpClient.ServerPublicKey()
|
||||
c.client = derpClient
|
||||
c.netConn = tcpConn
|
||||
@@ -956,22 +980,6 @@ func (c *Client) NotePreferred(v bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// WatchConnectionChanges sends a request to subscribe to
|
||||
// notifications about clients connecting & disconnecting.
|
||||
//
|
||||
// Only trusted connections (using MeshKey) are allowed to use this.
|
||||
func (c *Client) WatchConnectionChanges() error {
|
||||
client, _, err := c.connect(c.newContext(), "derphttp.Client.WatchConnectionChanges")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = client.WatchConnectionChanges()
|
||||
if err != nil {
|
||||
c.closeForReconnect(client)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ClosePeer asks the server to close target's TCP connection.
|
||||
//
|
||||
// Only trusted connections (using MeshKey) are allowed to use this.
|
||||
|
||||
@@ -14,20 +14,30 @@ import (
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// RunWatchConnectionLoop loops until ctx is done, sending WatchConnectionChanges and subscribing to
|
||||
// connection changes.
|
||||
// RunWatchConnectionLoop loops until ctx is done, sending
|
||||
// WatchConnectionChanges and subscribing to connection changes.
|
||||
//
|
||||
// If the server's public key is ignoreServerKey, RunWatchConnectionLoop returns.
|
||||
// If the server's public key is ignoreServerKey, RunWatchConnectionLoop
|
||||
// returns.
|
||||
//
|
||||
// Otherwise, the add and remove funcs are called as clients come & go.
|
||||
//
|
||||
// infoLogf, if non-nil, is the logger to write periodic status
|
||||
// updates about how many peers are on the server. Error log output is
|
||||
// set to the c's logger, regardless of infoLogf's value.
|
||||
// infoLogf, if non-nil, is the logger to write periodic status updates about
|
||||
// how many peers are on the server. Error log output is set to the c's logger,
|
||||
// regardless of infoLogf's value.
|
||||
//
|
||||
// To force RunWatchConnectionLoop to return quickly, its ctx needs to
|
||||
// be closed, and c itself needs to be closed.
|
||||
// To force RunWatchConnectionLoop to return quickly, its ctx needs to be
|
||||
// closed, and c itself needs to be closed.
|
||||
//
|
||||
// It is a fatal error to call this on an already-started Client withoutq having
|
||||
// initialized Client.WatchConnectionChanges to true.
|
||||
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) {
|
||||
if !c.WatchConnectionChanges {
|
||||
if c.isStarted() {
|
||||
panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.RunWatchConnectionLoop")
|
||||
}
|
||||
c.WatchConnectionChanges = true
|
||||
}
|
||||
if infoLogf == nil {
|
||||
infoLogf = logger.Discard
|
||||
}
|
||||
@@ -101,14 +111,6 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
err := c.WatchConnectionChanges()
|
||||
if err != nil {
|
||||
clear()
|
||||
logf("WatchConnectionChanges: %v", err)
|
||||
sleep(retryInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ServerPublicKey() == ignoreServerKey {
|
||||
logf("detected self-connect; ignoring host")
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user