Revert "add timeout support for watch"

This reverts commit c676e234cc1036a2a1147f2eeef9c89468671330.

Kubernetes-commit: 30a56bab42ab14a3a2ee21f5581b02525d4fd110
Benjamin Elder 2019-04-01 18:39:17 -07:00 committed by Kubernetes Publisher
parent 8b6e55f1c0
commit 8ad12b3912
3 changed files with 12 additions and 59 deletions


@@ -249,8 +249,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
 	}
 	klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)
-	ctx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
 	watcher, err := rw.Watch(ctx, &opts)
 	if err != nil {
 		scope.err(err, w, req)

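For reference, the server-side pattern removed above is plain context.WithTimeout from the Go standard library. A minimal runnable sketch of the same idea, where openWatch is a hypothetical stand-in for rw.Watch rather than real apiserver code:

package main

import (
	"context"
	"fmt"
	"time"
)

// openWatch stands in for rw.Watch: it blocks until the context ends.
func openWatch(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	// The reverted handler derived this from the request's timeout parameter.
	timeout := 50 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // releases the deadline's timer even on early return

	// The watch ends on its own once the deadline passes.
	fmt.Println(openWatch(ctx)) // context deadline exceeded
}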

@@ -19,7 +19,6 @@ package cacher
 import (
 	"context"
 	"fmt"
-	"math"
 	"net/http"
 	"reflect"
 	"sync"
@@ -365,16 +364,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		chanSize = 1000
 	}
 
-	// Determine watch timeout
-	timeout := time.Duration(math.MaxInt64)
-	if deadline, ok := ctx.Deadline(); ok {
-		timeout = deadline.Sub(time.Now())
-	}
 	// Create a watcher here to reduce memory allocations under lock,
 	// given that memory allocation may trigger GC and block the thread.
 	// Also note that emptyFunc is a placeholder, until we will be able
 	// to compute watcher.forget function (which has to happen under lock).
-	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, timeout)
+	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
 
 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
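The block removed above converted the context's deadline into a duration, defaulting to effectively infinite when no deadline was set. A standalone sketch of that conversion (watchTimeout is a hypothetical name; time.Until(d) is the idiomatic spelling of d.Sub(time.Now())):

package main

import (
	"context"
	"fmt"
	"math"
	"time"
)

// watchTimeout mirrors the reverted logic: effectively infinite unless
// the incoming context carries a deadline.
func watchTimeout(ctx context.Context) time.Duration {
	timeout := time.Duration(math.MaxInt64)
	if deadline, ok := ctx.Deadline(); ok {
		timeout = time.Until(deadline)
	}
	return timeout
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(watchTimeout(ctx))                  // roughly 1s
	fmt.Println(watchTimeout(context.Background())) // ~292 years, i.e. never
}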
@@ -407,7 +401,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		c.watcherIdx++
 	}()
 
-	go watcher.process(ctx, initEvents, watchRV)
+	go watcher.process(initEvents, watchRV)
 	return watcher, nil
 }
@@ -894,34 +888,9 @@ type cacheWatcher struct {
 	stopped   bool
 	forget    func()
 	versioner storage.Versioner
-	timer     *time.Timer
 }
 
-var timerPool sync.Pool
-
-func newTimer(d time.Duration) *time.Timer {
-	t, ok := timerPool.Get().(*time.Timer)
-	if ok {
-		t.Reset(d)
-	} else {
-		t = time.NewTimer(d)
-	}
-	return t
-}
-
-func freeTimer(timer *time.Timer) {
-	if !timer.Stop() {
-		// Consume triggered (but not yet received) timer event
-		// so that future reuse does not get a spurious timeout.
-		select {
-		case <-timer.C:
-		default:
-		}
-	}
-	timerPool.Put(timer)
-}
-
-func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, timeout time.Duration) *cacheWatcher {
+func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
 	return &cacheWatcher{
 		input:     make(chan *watchCacheEvent, chanSize),
 		result:    make(chan watch.Event, chanSize),
@@ -930,7 +899,6 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
 		stopped:   false,
 		forget:    forget,
 		versioner: versioner,
-		timer:     newTimer(timeout),
 	}
 }
@@ -951,7 +919,6 @@ func (c *cacheWatcher) stop() {
 		c.stopped = true
 		close(c.done)
 		close(c.input)
-		freeTimer(c.timer)
 	}
 }
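The deleted newTimer/freeTimer helpers pooled timers via sync.Pool; the drain in freeTimer is what prevents a fired-but-unread timer from handing a stale tick to its next user. A self-contained sketch of the same pattern, outside the cacher:

package main

import (
	"fmt"
	"sync"
	"time"
)

var timerPool sync.Pool

// newTimer reuses a pooled timer when one is available.
func newTimer(d time.Duration) *time.Timer {
	if t, ok := timerPool.Get().(*time.Timer); ok {
		t.Reset(d)
		return t
	}
	return time.NewTimer(d)
}

// freeTimer parks a timer in the pool. When Stop reports the timer has
// already fired, any pending tick is consumed so the next Reset does not
// observe a spurious timeout.
func freeTimer(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	timerPool.Put(t)
}

func main() {
	t := newTimer(10 * time.Millisecond)
	<-t.C // timer fires
	freeTimer(t)

	t = newTimer(10 * time.Millisecond) // likely the same timer, reset
	<-t.C
	freeTimer(t)
	fmt.Println("pooled timer reused without stale ticks")
}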
@@ -1040,7 +1007,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
 	}
 }
 
-func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
+func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
 	defer utilruntime.HandleCrash()
 
 	// Check how long we are processing initEvents.
@@ -1076,20 +1043,10 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
 	defer close(c.result)
 	defer c.Stop()
-	for {
-		select {
-		case event, ok := <-c.input:
-			if !ok {
-				return
-			}
-			// only send events newer than resourceVersion
-			if event.ResourceVersion > resourceVersion {
-				c.sendWatchCacheEvent(event)
-			}
-		case <-ctx.Done():
-			return
-		case <-c.timer.C:
-			return
-		}
+	for event := range c.input {
+		// only send events newer than resourceVersion
+		if event.ResourceVersion > resourceVersion {
+			c.sendWatchCacheEvent(event)
+		}
 	}
 }
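The revert swaps the three-way select back for a plain range over the input channel, so the goroutine now exits only when c.input is closed by stop(), not on context cancellation or timer expiry. A sketch contrasting the two loop shapes (event, rangeLoop, and selectLoop are hypothetical names, not cacher code):

package main

import (
	"context"
	"fmt"
	"time"
)

type event struct{ rv uint64 }

// rangeLoop is the restored shape: it terminates only when input closes.
func rangeLoop(input <-chan event, minRV uint64) {
	for ev := range input {
		if ev.rv > minRV { // only send events newer than resourceVersion
			fmt.Println("send", ev.rv)
		}
	}
}

// selectLoop is the reverted shape: it can additionally stop on context
// cancellation or a timeout, at the cost of a busier loop body.
func selectLoop(ctx context.Context, input <-chan event, minRV uint64, timer *time.Timer) {
	for {
		select {
		case ev, ok := <-input:
			if !ok {
				return
			}
			if ev.rv > minRV {
				fmt.Println("send", ev.rv)
			}
		case <-ctx.Done():
			return
		case <-timer.C:
			return
		}
	}
}

func main() {
	in := make(chan event, 3)
	in <- event{1}
	in <- event{2}
	in <- event{3}
	close(in)
	rangeLoop(in, 1) // prints: send 2, send 3
}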


@@ -19,7 +19,6 @@ package cacher
 import (
 	"context"
 	"fmt"
-	"math"
 	"reflect"
 	"strconv"
 	"sync"
@@ -64,8 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
-	go w.process(context.Background(), initEvents, 0)
+	w = newCacheWatcher(0, filter, forget, testVersioner{})
+	go w.process(initEvents, 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -183,9 +182,8 @@ TestCase:
 		for j := range testCase.events {
 			testCase.events[j].ResourceVersion = uint64(j) + 1
 		}
-
-		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
-		go w.process(context.Background(), testCase.events, 0)
+		w := newCacheWatcher(0, filter, forget, testVersioner{})
+		go w.process(testCase.events, 0)
 		ch := w.ResultChan()
 		for j, event := range testCase.expected {
 			e := <-ch