From b9e86eb851744533a75a934441d740b55a2c2c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 24 Mar 2025 14:02:07 +0100 Subject: [PATCH] Fix race for sending errors in watch Kubernetes-commit: c8c2844aaf1d04835624ff2d46417492e10dec11 --- pkg/storage/etcd3/watcher.go | 73 ++++++++++++++++------------ pkg/storage/etcd3/watcher_test.go | 5 ++ pkg/storage/testing/watcher_tests.go | 67 +++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 32 deletions(-) diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index abb51b07f..e2141395b 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { for { select { case e := <-wc.incomingEventChan: - res := wc.transform(e) + res, err := wc.transform(e) + if err != nil { + wc.sendError(err) + return + } + if res == nil { continue } @@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { p := concurrentOrderedEventProcessing{ - input: wc.incomingEventChan, - processFunc: wc.transform, - output: wc.resultChan, - processingQueue: make(chan chan *watch.Event, processEventConcurrency-1), + wc: wc, + processingQueue: make(chan chan *processingResult, processEventConcurrency-1), objectType: wc.watcher.objectType, groupResource: wc.watcher.groupResource, @@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { }() } -type concurrentOrderedEventProcessing struct { - input chan *event - processFunc func(*event) *watch.Event - output chan watch.Event +type processingResult struct { + event *watch.Event + err error +} - processingQueue chan chan *watch.Event +type concurrentOrderedEventProcessing struct { + wc *watchChan + + processingQueue chan chan *processingResult // Metadata for logging objectType string groupResource schema.GroupResource @@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C select { case <-ctx.Done(): return - case e = <-p.input: + case e = <-p.wc.incomingEventChan: } - processingResponse := make(chan *watch.Event, 1) + processingResponse := make(chan *processingResult, 1) select { case <-ctx.Done(): return case p.processingQueue <- processingResponse: } wg.Add(1) - go func(e *event, response chan<- *watch.Event) { + go func(e *event, response chan<- *processingResult) { defer wg.Done() + responseEvent, err := p.wc.transform(e) select { case <-ctx.Done(): - case response <- p.processFunc(e): + case response <- &processingResult{event: responseEvent, err: err}: } }(e, processingResponse) } } func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { - var processingResponse chan *watch.Event - var e *watch.Event + var processingResponse chan *processingResult + var r *processingResult for { select { case <-ctx.Done(): @@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co select { case <-ctx.Done(): return - case e = <-processingResponse: + case r = <-processingResponse: } - if e == nil { + if r.err != nil { + p.wc.sendError(r.err) + return + } + if r.event == nil { continue } - if len(p.output) == cap(p.output) { - klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource) + if len(p.wc.resultChan) == cap(p.wc.resultChan) { + klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource) } // If user couldn't receive results fast enough, we also block incoming events from watcher. // Because storing events in local will cause more memory usage. // The worst case would be closing the fast watcher. select { - case <-ctx.Done(): + case p.wc.resultChan <- *r.event: + case <-p.wc.ctx.Done(): return - case p.output <- *e: } } } @@ -561,12 +572,11 @@ func (wc *watchChan) acceptAll() bool { } // transform transforms an event into a result for user if not filtered. -func (wc *watchChan) transform(e *event) (res *watch.Event) { +func (wc *watchChan) transform(e *event) (res *watch.Event, err error) { curObj, oldObj, err := wc.prepareObjs(e) if err != nil { klog.Errorf("failed to prepare current and previous objects: %v", err) - wc.sendError(err) - return nil + return nil, err } switch { @@ -574,12 +584,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { object := wc.watcher.newFunc() if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { klog.Errorf("failed to propagate object version: %v", err) - return nil + return nil, fmt.Errorf("failed to propagate object resource version: %w", err) } if e.isInitialEventsEndBookmark { if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { - wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err)) - return nil + return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err) } } res = &watch.Event{ @@ -588,7 +597,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { } case e.isDeleted: if !wc.filter(oldObj) { - return nil + return nil, nil } res = &watch.Event{ Type: watch.Deleted, @@ -596,7 +605,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { } case e.isCreated: if !wc.filter(curObj) { - return nil + return nil, nil } res = &watch.Event{ Type: watch.Added, @@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { Type: watch.Modified, Object: curObj, } - return res + return res, nil } curObjPasses := wc.filter(curObj) oldObjPasses := wc.filter(oldObj) @@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { } } } - return res + return res, nil } func transformErrorToEvent(err error) *watch.Event { diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 536b4026f..11afc6153 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -155,6 +155,11 @@ func TestWatchListMatchSingle(t *testing.T) { storagetesting.RunWatchListMatchSingle(ctx, t, store) } +func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store}) +} + // ======================================================================= // Implementation-specific tests are following. // The following tests are exercising the details of the implementation diff --git a/pkg/storage/testing/watcher_tests.go b/pkg/storage/testing/watcher_tests.go index 61335dea9..7e9e8b32a 100644 --- a/pkg/storage/testing/watcher_tests.go +++ b/pkg/storage/testing/watcher_tests.go @@ -1698,6 +1698,73 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) } +func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { + foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}} + fooKey := fmt.Sprintf("/pods/%s/%s", foo.Namespace, foo.Name) + fooCreated := &example.Pod{} + if err := store.Create(context.Background(), fooKey, foo, fooCreated, 0); err != nil { + t.Errorf("failed to create object: %v", err) + } + bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "bar"}} + barKey := fmt.Sprintf("/pods/%s/%s", bar.Namespace, bar.Name) + barCreated := &example.Pod{} + if err := store.Create(context.Background(), barKey, bar, barCreated, 0); err != nil { + t.Errorf("failed to create object: %v", err) + } + + // Update transformer to ensure that foo will become effectively corrupted. + revertTransformer := store.UpdatePrefixTransformer( + func(transformer *PrefixTransformer) value.Transformer { + transformer.prefix = []byte("other-prefix") + return transformer + }) + defer revertTransformer() + + baz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "baz"}} + bazKey := fmt.Sprintf("/pods/%s/%s", baz.Namespace, baz.Name) + bazCreated := &example.Pod{} + if err := store.Create(context.Background(), bazKey, baz, bazCreated, 0); err != nil { + t.Errorf("failed to create object: %v", err) + } + + opts := storage.ListOptions{ + ResourceVersion: fooCreated.ResourceVersion, + Predicate: storage.Everything, + Recursive: true, + } + + // Run N concurrent watches. Given the asynchronous nature, we increase the + // probability of hitting the race in at least one of those watches. + concurrentWatches := 10 + wg := sync.WaitGroup{} + for i := 0; i < concurrentWatches; i++ { + wg.Add(1) + go func() { + defer wg.Done() + w, err := store.Watch(ctx, "/pods", opts) + if err != nil { + t.Errorf("failed to create watch: %v", err) + return + } + + // We issue the watch starting from object bar. + // The object fails TransformFromStorage and generates ERROR watch event. + // The further events (i.e. ADDED event for baz object) should not be + // emitted, so we verify no events other than ERROR type are emitted. + for { + event, ok := <-w.ResultChan() + if !ok { + break + } + if event.Type != watch.Error { + t.Errorf("unexpected event: %#v", event) + } + } + }() + } + wg.Wait() +} + func makePod(namePrefix string) *example.Pod { return &example.Pod{ ObjectMeta: metav1.ObjectMeta{