Compare commits

...

1 Commits

Author SHA1 Message Date
Brad Fitzpatrick
4f141a2852 control/controlclient: remove some channels (quit, updateCh)
Updates #cleanup

Change-Id: Ie4820b64f1ad276129e4f81a609fa14e7e04604b
Co-authored-by: Maisem Ali <maisem@tailscale.com>
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2023-08-31 22:22:04 -07:00

View File

@@ -45,24 +45,24 @@ func (g *LoginGoal) sendLogoutError(err error) {
var _ Client = (*Auto)(nil)
// waitUnpause waits until the client is unpaused then returns. It only
// returns an error if the client is closed.
func (c *Auto) waitUnpause(routineLogName string) error {
// waitUnpause waits until either the client is unpaused or the Auto client is
// shut down. It reports whether the client should keep running (i.e. it's not
// closed).
func (c *Auto) waitUnpause(routineLogName string) (keepRunning bool) {
c.mu.Lock()
if !c.paused {
c.mu.Unlock()
return nil
defer c.mu.Unlock()
return !c.closed
}
unpaused := c.unpausedChanLocked()
c.mu.Unlock()
c.logf("%s: awaiting unpause", routineLogName)
select {
case <-unpaused:
v := <-unpaused
if v {
c.logf("%s: unpaused", routineLogName)
return nil
case <-c.quit:
return errors.New("quit")
}
return v
}
// updateRoutine is responsible for informing the server of worthy changes to
@@ -76,7 +76,7 @@ func (c *Auto) updateRoutine() {
var lastUpdateGenInformed updateGen
for {
if err := c.waitUnpause("updateRoutine"); err != nil {
if !c.waitUnpause("updateRoutine") {
c.logf("updateRoutine: exiting")
return
}
@@ -86,19 +86,11 @@ func (c *Auto) updateRoutine() {
needUpdate := gen > 0 && gen != lastUpdateGenInformed && c.loggedIn
c.mu.Unlock()
if needUpdate {
select {
case <-c.quit:
c.logf("updateRoutine: exiting")
return
default:
}
} else {
if !needUpdate {
// Nothing to do, wait for a signal.
select {
case <-c.quit:
c.logf("updateRoutine: exiting")
return
case <-ctx.Done():
continue
case <-c.updateCh:
continue
}
@@ -141,7 +133,6 @@ type Auto struct {
logf logger.Logf
closed bool
updateCh chan struct{} // readable when we should inform the server of a change
newMapCh chan struct{} // readable when we must restart a map request
observer Observer // called to update Client status; always non-nil
unregisterHealthWatch func()
@@ -155,7 +146,7 @@ type Auto struct {
lastUpdateGen updateGen
paused bool // whether we should stop making HTTP requests
unpauseWaiters []chan struct{}
unpauseWaiters []chan bool
loggedIn bool // true if currently logged in
loginGoal *LoginGoal // non-nil if some login activity is desired
synced bool // true if our netmap is up-to-date
@@ -165,7 +156,6 @@ type Auto struct {
mapCtx context.Context // context used for netmap and update requests
authCancel func() // cancel authCtx
mapCancel func() // cancel mapCtx
quit chan struct{} // when closed, goroutines should all exit
authDone chan struct{} // when closed, authRoutine is done
mapDone chan struct{} // when closed, mapRoutine is done
updateDone chan struct{} // when closed, updateRoutine is done
@@ -206,8 +196,6 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
clock: opts.Clock,
logf: opts.Logf,
updateCh: make(chan struct{}, 1),
newMapCh: make(chan struct{}, 1),
quit: make(chan struct{}),
authDone: make(chan struct{}),
mapDone: make(chan struct{}),
updateDone: make(chan struct{}),
@@ -241,7 +229,7 @@ func (c *Auto) SetPaused(paused bool) {
c.cancelMapCtxLocked()
} else {
for _, ch := range c.unpauseWaiters {
close(ch)
ch <- true
}
c.unpauseWaiters = nil
}
@@ -325,16 +313,6 @@ func (c *Auto) restartMap() {
c.mu.Unlock()
c.logf("[v1] restartMap: synced=%v", synced)
select {
case c.newMapCh <- struct{}{}:
c.logf("[v1] restartMap: wrote to channel")
default:
// if channel write failed, then there was already
// an outstanding newMapCh request. One is enough,
// since it'll always use the latest endpoints.
c.logf("[v1] restartMap: channel was full")
}
c.updateControl()
}
@@ -343,6 +321,10 @@ func (c *Auto) authRoutine() {
bo := backoff.NewBackoff("authRoutine", c.logf, 30*time.Second)
for {
if !c.waitUnpause("authRoutine") {
c.logf("authRoutine: exiting")
return
}
c.mu.Lock()
goal := c.loginGoal
ctx := c.authCtx
@@ -353,13 +335,6 @@ func (c *Auto) authRoutine() {
}
c.mu.Unlock()
select {
case <-c.quit:
c.logf("[v1] authRoutine: quit")
return
default:
}
report := func(err error, msg string) {
c.logf("[v1] %s: %v", msg, err)
// don't send status updates for context errors,
@@ -476,12 +451,12 @@ func (c *Auto) DirectForTest() *Direct {
return c.direct
}
// unpausedChanLocked returns a new channel that is closed when the
// current Auto pause is unpaused.
// unpausedChanLocked returns a new channel that gets sent
// either a true when unpaused or false on Auto.Shutdown.
//
// c.mu must be held
func (c *Auto) unpausedChanLocked() <-chan struct{} {
unpaused := make(chan struct{})
func (c *Auto) unpausedChanLocked() <-chan bool {
unpaused := make(chan bool, 1)
c.unpauseWaiters = append(c.unpauseWaiters, unpaused)
return unpaused
}
@@ -523,7 +498,7 @@ func (c *Auto) mapRoutine() {
}
for {
if err := c.waitUnpause("mapRoutine"); err != nil {
if !c.waitUnpause("mapRoutine") {
c.logf("mapRoutine: exiting")
return
}
@@ -534,13 +509,6 @@ func (c *Auto) mapRoutine() {
ctx := c.mapCtx
c.mu.Unlock()
select {
case <-c.quit:
c.logf("mapRoutine: quit")
return
default:
}
report := func(err error, msg string) {
c.logf("[v1] %s: %v", msg, err)
err = fmt.Errorf("%s: %w", msg, err)
@@ -558,36 +526,33 @@ func (c *Auto) mapRoutine() {
// c.state is set by authRoutine()
c.mu.Unlock()
select {
case <-ctx.Done():
c.logf("[v1] mapRoutine: context done.")
case <-c.newMapCh:
c.logf("[v1] mapRoutine: new map needed while idle.")
}
} else {
health.SetOutOfPollNetMap()
<-ctx.Done()
c.logf("[v1] mapRoutine: context done.")
continue
err := c.direct.PollNetMap(ctx, mrs)
}
health.SetOutOfPollNetMap()
health.SetOutOfPollNetMap()
c.mu.Lock()
c.synced = false
if c.state == StateSynchronized {
c.state = StateAuthenticated
}
paused := c.paused
c.mu.Unlock()
err := c.direct.PollNetMap(ctx, mrs)
if paused {
mrs.bo.BackOff(ctx, nil)
c.logf("mapRoutine: paused")
continue
}
health.SetOutOfPollNetMap()
c.mu.Lock()
c.synced = false
if c.state == StateSynchronized {
c.state = StateAuthenticated
}
paused := c.paused
c.mu.Unlock()
report(err, "PollNetMap")
mrs.bo.BackOff(ctx, err)
if paused {
mrs.bo.BackOff(ctx, nil)
c.logf("mapRoutine: paused")
continue
}
report(err, "PollNetMap")
mrs.bo.BackOff(ctx, err)
continue
}
}
@@ -735,13 +700,16 @@ func (c *Auto) Shutdown() {
c.closed = true
c.cancelAuthCtxLocked()
c.cancelMapCtxLocked()
for _, ch := range c.unpauseWaiters {
ch <- false
}
c.unpauseWaiters = nil
}
c.mu.Unlock()
c.logf("client.Shutdown")
if !closed {
c.unregisterHealthWatch()
close(c.quit)
<-c.authDone
<-c.mapDone
<-c.updateDone