Merge pull request #131162 from wojtek-t/simplify_etcd3_watcher

Simplify etcd3 watcher

Kubernetes-commit: db21f3df3f44b5b4545c8164d0d6030b01db2ed0
This commit is contained in:
Kubernetes Publisher 2025-04-23 17:08:51 -07:00
commit c160237b46
4 changed files with 59 additions and 45 deletions

2
go.mod
View File

@ -50,7 +50,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.0.0-20250423231958-d18a46229505
k8s.io/apimachinery v0.0.0-20250423231524-954960919938
k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd
k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657
k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349
k8s.io/klog/v2 v2.130.1
k8s.io/kms v0.0.0-20250401105328-de9f6e9dd930

4
go.sum
View File

@ -371,8 +371,8 @@ k8s.io/api v0.0.0-20250423231958-d18a46229505 h1:CzcGzeX7BpE8DYfaOipLutwMTAZHcii
k8s.io/api v0.0.0-20250423231958-d18a46229505/go.mod h1:PEZtCkQxQ/XohDih7vofdKAdIIiS9kFruj1lF/EUMLs=
k8s.io/apimachinery v0.0.0-20250423231524-954960919938 h1:yoIMbzO4of8M4auqFKjNsbFlHJG9jCuoD+4sUJUPdn4=
k8s.io/apimachinery v0.0.0-20250423231524-954960919938/go.mod h1:tJ77gZ1upNffdrQVxg+oIoEmvSIyTbz3RIPi9HKw+nw=
k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd h1:rR8S6D2gcZ8zGdIobs1g8HLnwbSVvnTOXaI9LaS/TR4=
k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd/go.mod h1:NKnTaoOD5AOCgg0t9ZlglMkD6GXYAdkN5RfI1BYA4lo=
k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657 h1:qBJHoi86zzpRsCHtDZnj0bccNrz7PaAwrdZB1uGT3xY=
k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657/go.mod h1:NKnTaoOD5AOCgg0t9ZlglMkD6GXYAdkN5RfI1BYA4lo=
k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349 h1:TdFXFBAukREkHLOoqrybaIpwLRgmdc4ntO4nqP9ly3A=
k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349/go.mod h1:d9qYA5xnPzVV2QGRVxRQkFCFhomjWs865vF8Nvfvhf8=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=

View File

@ -91,7 +91,6 @@ type watchChan struct {
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
errChan chan error
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -135,7 +134,6 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
errChan: make(chan error, 1),
}
if pred.Empty() {
// The filter doesn't filter out any object.
@ -234,18 +232,6 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
wc.processEvents(&resultChanWG)
select {
case err := <-wc.errChan:
if isCancelError(err) {
break
}
errResult := transformErrorToEvent(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
case <-watchClosedCh:
case <-wc.ctx.Done(): // user cancel
}
@ -309,7 +295,7 @@ func (wc *watchChan) sync() error {
// send items from the response until no more results
for i, kv := range getResp.Kvs {
lastKey = kv.Key
wc.sendEvent(parseKV(kv))
wc.queueEvent(parseKV(kv))
// free kv early. Long lists can take O(seconds) to decode.
getResp.Kvs[i] = nil
}
@ -378,7 +364,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
}
}
if initialEventsEndBookmarkRequired {
wc.sendEvent(func() *event {
wc.queueEvent(func() *event {
e := progressNotifyEvent(wc.initialRev)
e.isInitialEventsEndBookmark = true
return e
@ -397,11 +383,16 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
logWatchChannelErr(err)
// sendError doesn't guarantee that no more items will be put into resultChan.
// However, by returning from startWatching here, we guarantee that events
// with a higher resourceVersion than the error will not be queued, and thus
// will also not be processed and sent to the user.
// TODO(wojtek-t): Figure out if we can synchronously prevent more events.
wc.sendError(err)
return
}
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
wc.queueEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
continue
}
@ -411,10 +402,15 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
parsedEvent, err := parseEvent(e)
if err != nil {
logWatchChannelErr(err)
// sendError doesn't guarantee that no more items will be put into resultChan.
// However, by returning from startWatching here, we guarantee that events
// with a higher resourceVersion than the error will not be queued, and thus
// will also not be processed and sent to the user.
// TODO(wojtek-t): Figure out if we can synchronously prevent more events.
wc.sendError(err)
return
}
wc.sendEvent(parsedEvent)
wc.queueEvent(parsedEvent)
}
}
// When we come to this point, it's only possible that client side ends the watch.
@ -447,15 +443,7 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
if res == nil {
continue
}
if len(wc.resultChan) == cap(wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *res:
case <-wc.ctx.Done():
if !wc.sendEvent(res) {
return
}
case <-wc.ctx.Done():
@ -545,15 +533,7 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
if r.event == nil {
continue
}
if len(p.wc.resultChan) == cap(p.wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case p.wc.resultChan <- *r.event:
case <-p.wc.ctx.Done():
if !p.wc.sendEvent(r.event) {
return
}
}
@ -654,14 +634,44 @@ func transformErrorToEvent(err error) *watch.Event {
}
}
// sendError synchronously puts an error event into resultChan and
// triggers cancellation of all goroutines.
func (wc *watchChan) sendError(err error) {
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
defer wc.cancel()
if isCancelError(err) {
return
}
errResult := transformErrorToEvent(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
}
func (wc *watchChan) sendEvent(e *event) {
// sendEvent synchronously puts an event into resultChan.
// Returns true if it was successful.
func (wc *watchChan) sendEvent(event *watch.Event) bool {
if len(wc.resultChan) == cap(wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If the user can't receive results fast enough, we also block incoming events from the watcher,
// because buffering events locally would cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *event:
return true
case <-wc.ctx.Done():
return false
}
}
func (wc *watchChan) queueEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}

View File

@ -174,7 +174,6 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resultChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
// The event flow goes like:
// - first we send an error, it should block on resultChan.
// - Then we cancel ctx. The blocking on resultChan should be freed up
@ -185,8 +184,13 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
w.run(false, true)
wg.Done()
}()
w.errChan <- fmt.Errorf("some error")
wg.Add(1)
go func() {
w.sendError(fmt.Errorf("some error"))
wg.Done()
}()
cancel()
// Ensure that both run() and sendError() don't hang forever.
wg.Wait()
}