Compare commits
1 Commits
nickkhyl/h
...
bradfitz/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f141a2852 |
@@ -45,24 +45,24 @@ func (g *LoginGoal) sendLogoutError(err error) {
|
||||
|
||||
var _ Client = (*Auto)(nil)
|
||||
|
||||
// waitUnpause waits until the client is unpaused then returns. It only
|
||||
// returns an error if the client is closed.
|
||||
func (c *Auto) waitUnpause(routineLogName string) error {
|
||||
// waitUnpause waits until either the client is unpaused or the Auto client is
|
||||
// shut down. It reports whether the client should keep running (i.e. it's not
|
||||
// closed).
|
||||
func (c *Auto) waitUnpause(routineLogName string) (keepRunning bool) {
|
||||
c.mu.Lock()
|
||||
if !c.paused {
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
defer c.mu.Unlock()
|
||||
return !c.closed
|
||||
}
|
||||
unpaused := c.unpausedChanLocked()
|
||||
c.mu.Unlock()
|
||||
|
||||
c.logf("%s: awaiting unpause", routineLogName)
|
||||
select {
|
||||
case <-unpaused:
|
||||
v := <-unpaused
|
||||
if v {
|
||||
c.logf("%s: unpaused", routineLogName)
|
||||
return nil
|
||||
case <-c.quit:
|
||||
return errors.New("quit")
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// updateRoutine is responsible for informing the server of worthy changes to
|
||||
@@ -76,7 +76,7 @@ func (c *Auto) updateRoutine() {
|
||||
var lastUpdateGenInformed updateGen
|
||||
|
||||
for {
|
||||
if err := c.waitUnpause("updateRoutine"); err != nil {
|
||||
if !c.waitUnpause("updateRoutine") {
|
||||
c.logf("updateRoutine: exiting")
|
||||
return
|
||||
}
|
||||
@@ -86,19 +86,11 @@ func (c *Auto) updateRoutine() {
|
||||
needUpdate := gen > 0 && gen != lastUpdateGenInformed && c.loggedIn
|
||||
c.mu.Unlock()
|
||||
|
||||
if needUpdate {
|
||||
select {
|
||||
case <-c.quit:
|
||||
c.logf("updateRoutine: exiting")
|
||||
return
|
||||
default:
|
||||
}
|
||||
} else {
|
||||
if !needUpdate {
|
||||
// Nothing to do, wait for a signal.
|
||||
select {
|
||||
case <-c.quit:
|
||||
c.logf("updateRoutine: exiting")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
continue
|
||||
case <-c.updateCh:
|
||||
continue
|
||||
}
|
||||
@@ -141,7 +133,6 @@ type Auto struct {
|
||||
logf logger.Logf
|
||||
closed bool
|
||||
updateCh chan struct{} // readable when we should inform the server of a change
|
||||
newMapCh chan struct{} // readable when we must restart a map request
|
||||
observer Observer // called to update Client status; always non-nil
|
||||
|
||||
unregisterHealthWatch func()
|
||||
@@ -155,7 +146,7 @@ type Auto struct {
|
||||
lastUpdateGen updateGen
|
||||
|
||||
paused bool // whether we should stop making HTTP requests
|
||||
unpauseWaiters []chan struct{}
|
||||
unpauseWaiters []chan bool
|
||||
loggedIn bool // true if currently logged in
|
||||
loginGoal *LoginGoal // non-nil if some login activity is desired
|
||||
synced bool // true if our netmap is up-to-date
|
||||
@@ -165,7 +156,6 @@ type Auto struct {
|
||||
mapCtx context.Context // context used for netmap and update requests
|
||||
authCancel func() // cancel authCtx
|
||||
mapCancel func() // cancel mapCtx
|
||||
quit chan struct{} // when closed, goroutines should all exit
|
||||
authDone chan struct{} // when closed, authRoutine is done
|
||||
mapDone chan struct{} // when closed, mapRoutine is done
|
||||
updateDone chan struct{} // when closed, updateRoutine is done
|
||||
@@ -206,8 +196,6 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
|
||||
clock: opts.Clock,
|
||||
logf: opts.Logf,
|
||||
updateCh: make(chan struct{}, 1),
|
||||
newMapCh: make(chan struct{}, 1),
|
||||
quit: make(chan struct{}),
|
||||
authDone: make(chan struct{}),
|
||||
mapDone: make(chan struct{}),
|
||||
updateDone: make(chan struct{}),
|
||||
@@ -241,7 +229,7 @@ func (c *Auto) SetPaused(paused bool) {
|
||||
c.cancelMapCtxLocked()
|
||||
} else {
|
||||
for _, ch := range c.unpauseWaiters {
|
||||
close(ch)
|
||||
ch <- true
|
||||
}
|
||||
c.unpauseWaiters = nil
|
||||
}
|
||||
@@ -325,16 +313,6 @@ func (c *Auto) restartMap() {
|
||||
c.mu.Unlock()
|
||||
|
||||
c.logf("[v1] restartMap: synced=%v", synced)
|
||||
|
||||
select {
|
||||
case c.newMapCh <- struct{}{}:
|
||||
c.logf("[v1] restartMap: wrote to channel")
|
||||
default:
|
||||
// if channel write failed, then there was already
|
||||
// an outstanding newMapCh request. One is enough,
|
||||
// since it'll always use the latest endpoints.
|
||||
c.logf("[v1] restartMap: channel was full")
|
||||
}
|
||||
c.updateControl()
|
||||
}
|
||||
|
||||
@@ -343,6 +321,10 @@ func (c *Auto) authRoutine() {
|
||||
bo := backoff.NewBackoff("authRoutine", c.logf, 30*time.Second)
|
||||
|
||||
for {
|
||||
if !c.waitUnpause("authRoutine") {
|
||||
c.logf("authRoutine: exiting")
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
goal := c.loginGoal
|
||||
ctx := c.authCtx
|
||||
@@ -353,13 +335,6 @@ func (c *Auto) authRoutine() {
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.quit:
|
||||
c.logf("[v1] authRoutine: quit")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
report := func(err error, msg string) {
|
||||
c.logf("[v1] %s: %v", msg, err)
|
||||
// don't send status updates for context errors,
|
||||
@@ -476,12 +451,12 @@ func (c *Auto) DirectForTest() *Direct {
|
||||
return c.direct
|
||||
}
|
||||
|
||||
// unpausedChanLocked returns a new channel that is closed when the
|
||||
// current Auto pause is unpaused.
|
||||
// unpausedChanLocked returns a new channel that gets sent
|
||||
// either a true when unpaused or false on Auto.Shutdown.
|
||||
//
|
||||
// c.mu must be held
|
||||
func (c *Auto) unpausedChanLocked() <-chan struct{} {
|
||||
unpaused := make(chan struct{})
|
||||
func (c *Auto) unpausedChanLocked() <-chan bool {
|
||||
unpaused := make(chan bool, 1)
|
||||
c.unpauseWaiters = append(c.unpauseWaiters, unpaused)
|
||||
return unpaused
|
||||
}
|
||||
@@ -523,7 +498,7 @@ func (c *Auto) mapRoutine() {
|
||||
}
|
||||
|
||||
for {
|
||||
if err := c.waitUnpause("mapRoutine"); err != nil {
|
||||
if !c.waitUnpause("mapRoutine") {
|
||||
c.logf("mapRoutine: exiting")
|
||||
return
|
||||
}
|
||||
@@ -534,13 +509,6 @@ func (c *Auto) mapRoutine() {
|
||||
ctx := c.mapCtx
|
||||
c.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.quit:
|
||||
c.logf("mapRoutine: quit")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
report := func(err error, msg string) {
|
||||
c.logf("[v1] %s: %v", msg, err)
|
||||
err = fmt.Errorf("%s: %w", msg, err)
|
||||
@@ -558,36 +526,33 @@ func (c *Auto) mapRoutine() {
|
||||
// c.state is set by authRoutine()
|
||||
c.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
c.logf("[v1] mapRoutine: context done.")
|
||||
case <-c.newMapCh:
|
||||
c.logf("[v1] mapRoutine: new map needed while idle.")
|
||||
}
|
||||
} else {
|
||||
health.SetOutOfPollNetMap()
|
||||
<-ctx.Done()
|
||||
c.logf("[v1] mapRoutine: context done.")
|
||||
continue
|
||||
|
||||
err := c.direct.PollNetMap(ctx, mrs)
|
||||
}
|
||||
health.SetOutOfPollNetMap()
|
||||
|
||||
health.SetOutOfPollNetMap()
|
||||
c.mu.Lock()
|
||||
c.synced = false
|
||||
if c.state == StateSynchronized {
|
||||
c.state = StateAuthenticated
|
||||
}
|
||||
paused := c.paused
|
||||
c.mu.Unlock()
|
||||
err := c.direct.PollNetMap(ctx, mrs)
|
||||
|
||||
if paused {
|
||||
mrs.bo.BackOff(ctx, nil)
|
||||
c.logf("mapRoutine: paused")
|
||||
continue
|
||||
}
|
||||
health.SetOutOfPollNetMap()
|
||||
c.mu.Lock()
|
||||
c.synced = false
|
||||
if c.state == StateSynchronized {
|
||||
c.state = StateAuthenticated
|
||||
}
|
||||
paused := c.paused
|
||||
c.mu.Unlock()
|
||||
|
||||
report(err, "PollNetMap")
|
||||
mrs.bo.BackOff(ctx, err)
|
||||
if paused {
|
||||
mrs.bo.BackOff(ctx, nil)
|
||||
c.logf("mapRoutine: paused")
|
||||
continue
|
||||
}
|
||||
|
||||
report(err, "PollNetMap")
|
||||
mrs.bo.BackOff(ctx, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -735,13 +700,16 @@ func (c *Auto) Shutdown() {
|
||||
c.closed = true
|
||||
c.cancelAuthCtxLocked()
|
||||
c.cancelMapCtxLocked()
|
||||
for _, ch := range c.unpauseWaiters {
|
||||
ch <- false
|
||||
}
|
||||
c.unpauseWaiters = nil
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
c.logf("client.Shutdown")
|
||||
if !closed {
|
||||
c.unregisterHealthWatch()
|
||||
close(c.quit)
|
||||
<-c.authDone
|
||||
<-c.mapDone
|
||||
<-c.updateDone
|
||||
|
||||
Reference in New Issue
Block a user