allow drainer to be reset (#2572)

* allow drainer to be reset

* ensure Reset can be called multiple times

* add comment explaining the use of the channel

* bump tolerance as it flaked in actions

* update godoc

* fix linting and comments
This commit is contained in:
Dave Protasowski 2022-08-15 17:52:48 -04:00 committed by GitHub
parent 894c2f20c9
commit d02dcd0b03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 16 deletions

View File

@ -70,12 +70,12 @@ type Drainer struct {
// after Drain is called before it may return.
QuietPeriod time.Duration
// once is used to initialize timer
once sync.Once
// timer is used to orchestrate the drain.
timer timer
// used to synchronize callers of Drain and Reset
ch chan struct{}
// HealthCheckUAPrefixes are the additional user agent prefixes that trigger the
// drainer's health check
HealthCheckUAPrefixes []string
@ -106,7 +106,7 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
d.reset()
d.resetTimer()
d.Inner.ServeHTTP(w, r)
}
@ -115,19 +115,42 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (d *Drainer) Drain() {
// Note: until the first caller exits, the others
// will wait blocked as well.
// NOTE(review): this span looks like a rendered diff that interleaves the
// removed sync.Once-based body with the added channel-based body; the
// comments added below describe the channel-based lines only — confirm
// against the real file before relying on them.
d.once.Do(func() {
t := func() timer {
d.Lock()
defer d.Unlock()
if d.QuietPeriod <= 0 {
d.QuietPeriod = network.DefaultDrainTimeout
ch := func() chan struct{} {
d.Lock()
defer d.Unlock()
// A drain is already recorded on the Drainer: return its channel so this
// caller waits on the same signal instead of starting another timer.
if d.ch != nil {
return d.ch
}
// A non-positive QuietPeriod is replaced with the package default.
if d.QuietPeriod <= 0 {
d.QuietPeriod = network.DefaultDrainTimeout
}
timer := newTimer(d.QuietPeriod)
ch := make(chan struct{})
// This goroutine closes ch when the timer ticks; if ch is closed first it
// simply exits without closing again.
go func() {
select {
case <-ch:
// closed by reset
case <-timer.tickChan():
close(ch)
}
d.timer = newTimer(d.QuietPeriod)
return d.timer
}()
<-t.tickChan()
})
// Store the channel and timer on the Drainer while the lock is still held;
// Reset reads both fields.
d.ch = ch
d.timer = timer
return ch
}()
// Block the caller until ch is closed.
<-ch
}
// drainTimer discards at most one value that is immediately receivable on tc.
// It never blocks: when nothing is ready (including when tc is nil) it
// returns straight away.
func drainTimer(tc <-chan time.Time) {
	select {
	default:
		// Nothing is ready to be received; do not wait for a tick.
	case <-tc:
		// A value was ready and has been thrown away.
	}
}
// isHealthcheckRequest validates if the request has a user agent that is for healthcheck
@ -145,8 +168,27 @@ func (d *Drainer) isHealthCheckRequest(r *http.Request) bool {
return false
}
// reset resets the drain timer to the full amount of time.
func (d *Drainer) reset() {
// Reset interrupts Drain and clears the drainer's internal state.
// Thus further calls to Drain will block and wait for the entire QuietPeriod.
//
// Reset may be called any number of times, including when no drain is in
// progress and after a drain has already run to completion.
func (d *Drainer) Reset() {
	d.Lock()
	defer d.Unlock()

	// closeHere records whether Reset is the party that must close d.ch.
	// The goroutine started by Drain closes the channel itself once it has
	// received the timer tick, and closing a channel twice panics, so exactly
	// one side may perform the close.
	closeHere := true

	if d.timer != nil {
		if !d.timer.Stop() {
			// The timer could not be stopped. Whoever receives the tick owns
			// the close: if it is still pending we take it here, which leaves
			// the Drain goroutine parked until we close d.ch below; otherwise
			// that goroutine took the tick and has closed, or is about to
			// close, the channel on its own.
			// NOTE(review): assumes Stop reports false only once the tick has
			// been delivered, as time.Timer does — confirm for any other
			// timer implementation used here.
			select {
			case <-d.timer.tickChan():
			default:
				closeHere = false
			}
		}
		// Always drop the timer so no stale, already-fired timer is left behind.
		d.timer = nil
	}

	if d.ch != nil {
		if closeHere {
			close(d.ch)
		}
		d.ch = nil
	}
}
func (d *Drainer) resetTimer() {
if func() bool {
d.RLock()
defer d.RUnlock()

View File

@ -490,3 +490,56 @@ func TestServeKProbe(t *testing.T) {
t.Errorf("Probe status = %d, wanted %d", got, want)
}
}
// TestReset checks that Reset releases every goroutine blocked in Drain, that
// a second Reset is harmless, and that a Drain issued afterwards waits for
// roughly the full QuietPeriod again.
func TestReset(t *testing.T) {
	d := Drainer{QuietPeriod: 5 * time.Second}

	// startDrain runs Drain in the background and returns a channel that is
	// closed once that call returns.
	startDrain := func() <-chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			d.Drain()
		}()
		return done
	}
	first, second := startDrain(), startDrain()

	// Give both goroutines time to enter Drain before interrupting them.
	time.Sleep(50 * time.Millisecond)
	d.Reset()

	waiters := []struct {
		done    <-chan struct{}
		failure string
	}{
		{done: first, failure: "Reset didn't unblock first Drain"},
		{done: second, failure: "Reset didn't unblock second Drain"},
	}
	for _, w := range waiters {
		select {
		case <-w.done:
		case <-time.After(time.Second):
			t.Fatal(w.failure)
		}
	}

	// Calling reset again should be a noop
	d.Reset()

	d.QuietPeriod = time.Second / 2
	began := time.Now()
	d.Drain()
	elapsed := time.Since(began)

	// The drain must take QuietPeriod, give or take the tolerance.
	const tolerance = 50 * time.Millisecond
	if elapsed < d.QuietPeriod-tolerance || elapsed > d.QuietPeriod+tolerance {
		t.Error("expected to drain to wait QuietPeriod time after reset")
	}
}