diff --git a/go.mod b/go.mod
index dfa426522..50d8ba91e 100644
--- a/go.mod
+++ b/go.mod
@@ -50,7 +50,7 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
 	k8s.io/api v0.0.0-20250423231958-d18a46229505
 	k8s.io/apimachinery v0.0.0-20250423231524-954960919938
-	k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd
+	k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657
 	k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349
 	k8s.io/klog/v2 v2.130.1
 	k8s.io/kms v0.0.0-20250401105328-de9f6e9dd930
diff --git a/go.sum b/go.sum
index d52f86c6a..827703ba4 100644
--- a/go.sum
+++ b/go.sum
@@ -371,8 +371,8 @@ k8s.io/api v0.0.0-20250423231958-d18a46229505 h1:CzcGzeX7BpE8DYfaOipLutwMTAZHcii
 k8s.io/api v0.0.0-20250423231958-d18a46229505/go.mod h1:PEZtCkQxQ/XohDih7vofdKAdIIiS9kFruj1lF/EUMLs=
 k8s.io/apimachinery v0.0.0-20250423231524-954960919938 h1:yoIMbzO4of8M4auqFKjNsbFlHJG9jCuoD+4sUJUPdn4=
 k8s.io/apimachinery v0.0.0-20250423231524-954960919938/go.mod h1:tJ77gZ1upNffdrQVxg+oIoEmvSIyTbz3RIPi9HKw+nw=
-k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd h1:rR8S6D2gcZ8zGdIobs1g8HLnwbSVvnTOXaI9LaS/TR4=
-k8s.io/client-go v0.0.0-20250423232513-451ac0fcb5bd/go.mod h1:NKnTaoOD5AOCgg0t9ZlglMkD6GXYAdkN5RfI1BYA4lo=
+k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657 h1:qBJHoi86zzpRsCHtDZnj0bccNrz7PaAwrdZB1uGT3xY=
+k8s.io/client-go v0.0.0-20250424032238-82c2b6c71657/go.mod h1:NKnTaoOD5AOCgg0t9ZlglMkD6GXYAdkN5RfI1BYA4lo=
 k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349 h1:TdFXFBAukREkHLOoqrybaIpwLRgmdc4ntO4nqP9ly3A=
 k8s.io/component-base v0.0.0-20250423233653-88ca0abd8349/go.mod h1:d9qYA5xnPzVV2QGRVxRQkFCFhomjWs865vF8Nvfvhf8=
 k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go
index e2141395b..17cc24f0d 100644
--- a/pkg/storage/etcd3/watcher.go
+++ b/pkg/storage/etcd3/watcher.go
@@ -91,7 +91,6 @@ type watchChan struct {
 	cancel            context.CancelFunc
 	incomingEventChan chan *event
 	resultChan        chan watch.Event
-	errChan           chan error
 }
 
 // Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@@ -135,7 +134,6 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
 		internalPred:      pred,
 		incomingEventChan: make(chan *event, incomingBufSize),
 		resultChan:        make(chan watch.Event, outgoingBufSize),
-		errChan:           make(chan error, 1),
 	}
 	if pred.Empty() {
 		// The filter doesn't filter out any object.
@@ -234,18 +232,6 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
 	wc.processEvents(&resultChanWG)
 
 	select {
-	case err := <-wc.errChan:
-		if isCancelError(err) {
-			break
-		}
-		errResult := transformErrorToEvent(err)
-		if errResult != nil {
-			// error result is guaranteed to be received by user before closing ResultChan.
-			select {
-			case wc.resultChan <- *errResult:
-			case <-wc.ctx.Done(): // user has given up all results
-			}
-		}
 	case <-watchClosedCh:
 	case <-wc.ctx.Done(): // user cancel
 	}
@@ -309,7 +295,7 @@ func (wc *watchChan) sync() error {
 		// send items from the response until no more results
 		for i, kv := range getResp.Kvs {
 			lastKey = kv.Key
-			wc.sendEvent(parseKV(kv))
+			wc.queueEvent(parseKV(kv))
 			// free kv early. Long lists can take O(seconds) to decode.
 			getResp.Kvs[i] = nil
 		}
@@ -378,7 +364,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
 			}
 		}
 		if initialEventsEndBookmarkRequired {
-			wc.sendEvent(func() *event {
+			wc.queueEvent(func() *event {
 				e := progressNotifyEvent(wc.initialRev)
 				e.isInitialEventsEndBookmark = true
 				return e
@@ -397,11 +383,16 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
 			err := wres.Err()
 			// If there is an error on server (e.g. compaction), the channel will return it before closed.
 			logWatchChannelErr(err)
+			// sendError doesn't guarantee that no more items will be put into resultChan.
+			// However, by returning from startWatching here, we guarantee that events
+			// with a higher resourceVersion than the error will not be queued, and thus
+			// also not processed and sent to the user.
+			// TODO(wojtek-t): Figure out if we can synchronously prevent more events.
 			wc.sendError(err)
 			return
 		}
 		if wres.IsProgressNotify() {
-			wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
+			wc.queueEvent(progressNotifyEvent(wres.Header.GetRevision()))
 			metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
 			continue
 		}
@@ -411,10 +402,15 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
 			parsedEvent, err := parseEvent(e)
 			if err != nil {
 				logWatchChannelErr(err)
+				// sendError doesn't guarantee that no more items will be put into resultChan.
+				// However, by returning from startWatching here, we guarantee that events
+				// with a higher resourceVersion than the error will not be queued, and thus
+				// also not processed and sent to the user.
+				// TODO(wojtek-t): Figure out if we can synchronously prevent more events.
 				wc.sendError(err)
 				return
 			}
-			wc.sendEvent(parsedEvent)
+			wc.queueEvent(parsedEvent)
 		}
 	}
 	// When we come to this point, it's only possible that client side ends the watch.
@@ -447,15 +443,7 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
 			if res == nil {
 				continue
 			}
-			if len(wc.resultChan) == cap(wc.resultChan) {
-				klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
-			}
-			// If user couldn't receive results fast enough, we also block incoming events from watcher.
-			// Because storing events in local will cause more memory usage.
-			// The worst case would be closing the fast watcher.
-			select {
-			case wc.resultChan <- *res:
-			case <-wc.ctx.Done():
+			if !wc.sendEvent(res) {
 				return
 			}
 		case <-wc.ctx.Done():
@@ -545,15 +533,7 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
 		if r.event == nil {
 			continue
 		}
-		if len(p.wc.resultChan) == cap(p.wc.resultChan) {
-			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
-		}
-		// If user couldn't receive results fast enough, we also block incoming events from watcher.
-		// Because storing events in local will cause more memory usage.
-		// The worst case would be closing the fast watcher.
-		select {
-		case p.wc.resultChan <- *r.event:
-		case <-p.wc.ctx.Done():
+		if !p.wc.sendEvent(r.event) {
 			return
 		}
 	}
@@ -654,14 +634,44 @@ func transformErrorToEvent(err error) *watch.Event {
 	}
 }
 
+// sendError synchronously puts an error event into resultChan and
+// triggers cancelling all goroutines.
 func (wc *watchChan) sendError(err error) {
-	select {
-	case wc.errChan <- err:
-	case <-wc.ctx.Done():
+	// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
+	// It's fine to double cancel.
+	defer wc.cancel()
+
+	if isCancelError(err) {
+		return
+	}
+	errResult := transformErrorToEvent(err)
+	if errResult != nil {
+		// error result is guaranteed to be received by user before closing ResultChan.
+		select {
+		case wc.resultChan <- *errResult:
+		case <-wc.ctx.Done(): // user has given up all results
+		}
 	}
 }
 
-func (wc *watchChan) sendEvent(e *event) {
+// sendEvent synchronously puts an event into resultChan.
+// Returns true if it was successful.
+func (wc *watchChan) sendEvent(event *watch.Event) bool {
+	if len(wc.resultChan) == cap(wc.resultChan) {
+		klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
+	}
+	// If user couldn't receive results fast enough, we also block incoming events from watcher.
+	// Because storing events in local will cause more memory usage.
+	// The worst case would be closing the fast watcher.
+	select {
+	case wc.resultChan <- *event:
+		return true
+	case <-wc.ctx.Done():
+		return false
+	}
+}
+
+func (wc *watchChan) queueEvent(e *event) {
 	if len(wc.incomingEventChan) == incomingBufSize {
 		klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
 	}
diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go
index 11afc6153..a72ba7dec 100644
--- a/pkg/storage/etcd3/watcher_test.go
+++ b/pkg/storage/etcd3/watcher_test.go
@@ -174,7 +174,6 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
 	w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
 	// make resultChan and errChan blocking to ensure ordering.
 	w.resultChan = make(chan watch.Event)
-	w.errChan = make(chan error)
 	// The event flow goes like:
 	// - first we send an error, it should block on resultChan.
 	// - Then we cancel ctx. The blocking on resultChan should be freed up
@@ -185,8 +184,13 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
 	go func() {
 		w.run(false, true)
 		wg.Done()
 	}()
-	w.errChan <- fmt.Errorf("some error")
+	wg.Add(1)
+	go func() {
+		w.sendError(fmt.Errorf("some error"))
+		wg.Done()
+	}()
 	cancel()
+	// Ensure that both run() and sendError() don't hang forever.
 	wg.Wait()
 }
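Note for reviewers: the heart of this patch is replacing the one-shot errChan (previously drained once in run()) with a synchronous sendError() that tries to deliver a single error event and then, via defer, cancels the watchChan's context so every remaining goroutine stops. Below is a minimal, self-contained Go sketch of that deliver-then-cancel pattern. The event and watchChan types here are simplified stand-ins, not the apiserver code; the real sendError additionally routes errors through isCancelError and transformErrorToEvent.

	package main

	import (
		"context"
		"errors"
		"fmt"
	)

	type event struct {
		err error
		val int
	}

	type watchChan struct {
		ctx        context.Context
		cancel     context.CancelFunc
		resultChan chan event
	}

	// sendError tries to deliver exactly one error event, then cancels the
	// shared context (deferred) so all producer goroutines stop. If the
	// consumer has already cancelled, the error is dropped instead of blocking.
	func (wc *watchChan) sendError(err error) {
		defer wc.cancel() // double cancel is fine
		select {
		case wc.resultChan <- event{err: err}:
		case <-wc.ctx.Done(): // consumer gave up all results
		}
	}

	// sendEvent blocks until the consumer reads the event or the context is
	// cancelled, and reports whether the event was delivered.
	func (wc *watchChan) sendEvent(e event) bool {
		select {
		case wc.resultChan <- e:
			return true
		case <-wc.ctx.Done():
			return false
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		wc := &watchChan{ctx: ctx, cancel: cancel, resultChan: make(chan event)}

		go func() {
			for i := 0; ; i++ {
				if i == 3 { // simulate a watch failure after a few events
					wc.sendError(errors.New("compaction"))
					return // nothing newer than the error gets queued
				}
				if !wc.sendEvent(event{val: i}) {
					return
				}
			}
		}()

		for e := range wc.resultChan {
			if e.err != nil {
				fmt.Println("terminal error:", e.err)
				return
			}
			fmt.Println("event:", e.val)
		}
	}

Returning from the producer immediately after sendError is what provides the ordering guarantee the new comments in startWatching describe: once the error is handed off, no event newer than it can ever be queued, and every subsequent sendEvent fails fast on the cancelled context.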