Fix race for sending errors in watch

Kubernetes-commit: c8c2844aaf1d04835624ff2d46417492e10dec11
This commit is contained in:
Wojciech Tyczyński 2025-03-24 14:02:07 +01:00 committed by Kubernetes Publisher
parent beaef1d3ec
commit b9e86eb851
3 changed files with 113 additions and 32 deletions

View File

@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
for { for {
select { select {
case e := <-wc.incomingEventChan: case e := <-wc.incomingEventChan:
res := wc.transform(e) res, err := wc.transform(e)
if err != nil {
wc.sendError(err)
return
}
if res == nil { if res == nil {
continue continue
} }
@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{ p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan, wc: wc,
processFunc: wc.transform, processingQueue: make(chan chan *processingResult, processEventConcurrency-1),
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
objectType: wc.watcher.objectType, objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource, groupResource: wc.watcher.groupResource,
@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
}() }()
} }
type concurrentOrderedEventProcessing struct { type processingResult struct {
input chan *event event *watch.Event
processFunc func(*event) *watch.Event err error
output chan watch.Event }
processingQueue chan chan *watch.Event type concurrentOrderedEventProcessing struct {
wc *watchChan
processingQueue chan chan *processingResult
// Metadata for logging // Metadata for logging
objectType string objectType string
groupResource schema.GroupResource groupResource schema.GroupResource
@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case e = <-p.input: case e = <-p.wc.incomingEventChan:
} }
processingResponse := make(chan *watch.Event, 1) processingResponse := make(chan *processingResult, 1)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case p.processingQueue <- processingResponse: case p.processingQueue <- processingResponse:
} }
wg.Add(1) wg.Add(1)
go func(e *event, response chan<- *watch.Event) { go func(e *event, response chan<- *processingResult) {
defer wg.Done() defer wg.Done()
responseEvent, err := p.wc.transform(e)
select { select {
case <-ctx.Done(): case <-ctx.Done():
case response <- p.processFunc(e): case response <- &processingResult{event: responseEvent, err: err}:
} }
}(e, processingResponse) }(e, processingResponse)
} }
} }
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event var processingResponse chan *processingResult
var e *watch.Event var r *processingResult
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case e = <-processingResponse: case r = <-processingResponse:
} }
if e == nil { if r.err != nil {
p.wc.sendError(r.err)
return
}
if r.event == nil {
continue continue
} }
if len(p.output) == cap(p.output) { if len(p.wc.resultChan) == cap(p.wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource) klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
} }
// If user couldn't receive results fast enough, we also block incoming events from watcher. // If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage. // Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher. // The worst case would be closing the fast watcher.
select { select {
case <-ctx.Done(): case p.wc.resultChan <- *r.event:
case <-p.wc.ctx.Done():
return return
case p.output <- *e:
} }
} }
} }
@ -561,12 +572,11 @@ func (wc *watchChan) acceptAll() bool {
} }
// transform transforms an event into a result for user if not filtered. // transform transforms an event into a result for user if not filtered.
func (wc *watchChan) transform(e *event) (res *watch.Event) { func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
curObj, oldObj, err := wc.prepareObjs(e) curObj, oldObj, err := wc.prepareObjs(e)
if err != nil { if err != nil {
klog.Errorf("failed to prepare current and previous objects: %v", err) klog.Errorf("failed to prepare current and previous objects: %v", err)
wc.sendError(err) return nil, err
return nil
} }
switch { switch {
@ -574,12 +584,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
object := wc.watcher.newFunc() object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err) klog.Errorf("failed to propagate object version: %v", err)
return nil return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
} }
if e.isInitialEventsEndBookmark { if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err)) return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
return nil
} }
} }
res = &watch.Event{ res = &watch.Event{
@ -588,7 +597,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
case e.isDeleted: case e.isDeleted:
if !wc.filter(oldObj) { if !wc.filter(oldObj) {
return nil return nil, nil
} }
res = &watch.Event{ res = &watch.Event{
Type: watch.Deleted, Type: watch.Deleted,
@ -596,7 +605,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
case e.isCreated: case e.isCreated:
if !wc.filter(curObj) { if !wc.filter(curObj) {
return nil return nil, nil
} }
res = &watch.Event{ res = &watch.Event{
Type: watch.Added, Type: watch.Added,
@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
Type: watch.Modified, Type: watch.Modified,
Object: curObj, Object: curObj,
} }
return res return res, nil
} }
curObjPasses := wc.filter(curObj) curObjPasses := wc.filter(curObj)
oldObjPasses := wc.filter(oldObj) oldObjPasses := wc.filter(oldObj)
@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
} }
} }
return res return res, nil
} }
func transformErrorToEvent(err error) *watch.Event { func transformErrorToEvent(err error) *watch.Event {

View File

@ -155,6 +155,11 @@ func TestWatchListMatchSingle(t *testing.T) {
storagetesting.RunWatchListMatchSingle(ctx, t, store) storagetesting.RunWatchListMatchSingle(ctx, t, store)
} }
// TestWatchErrorEventIsBlockingFurtherEvent runs the shared
// RunWatchErrorIsBlockingFurtherEvents storage test against the store
// returned by testSetup, wrapped in storeWithPrefixTransformer so that the
// shared test can swap the prefix transformer.
func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
	ctx, store, _ := testSetup(t)
	wrapped := &storeWithPrefixTransformer{store}
	storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, wrapped)
}
// ======================================================================= // =======================================================================
// Implementation-specific tests are following. // Implementation-specific tests are following.
// The following tests are exercising the details of the implementation // The following tests are exercising the details of the implementation

View File

@ -1698,6 +1698,73 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
} }
// RunWatchErrorIsBlockingFurtherEvents verifies that a watch which hits an
// error does not deliver any non-error event afterwards.
//
// Scenario:
//  1. Pods foo and bar are created with the store's original transformer.
//  2. The prefix transformer is switched to a different prefix, so data
//     written in step 1 can no longer be transformed from storage.
//  3. Pod baz is created with the new transformer.
//  4. Several watches are opened at foo's resource version, so the first
//     event each of them processes is the creation of bar.
//
// Every event observed on the result channel must be of type watch.Error;
// in particular, the ADDED event for baz must never be emitted.
func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
	// mustCreate stores a pod with the given name in namespace "test" and
	// returns the stored object. Setup failures abort the test immediately:
	// continuing without a valid object (and therefore without a valid
	// ResourceVersion for foo) would make the checks below meaningless.
	// It is only called from the test goroutine, so t.Fatalf is safe here.
	mustCreate := func(name string) *example.Pod {
		pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: name}}
		key := fmt.Sprintf("/pods/%s/%s", pod.Namespace, pod.Name)
		created := &example.Pod{}
		if err := store.Create(context.Background(), key, pod, created, 0); err != nil {
			t.Fatalf("failed to create object: %v", err)
		}
		return created
	}

	fooCreated := mustCreate("foo")
	mustCreate("bar")

	// Update transformer so that the objects written so far (foo and bar)
	// become effectively corrupted when read back.
	revertTransformer := store.UpdatePrefixTransformer(
		func(transformer *PrefixTransformer) value.Transformer {
			transformer.prefix = []byte("other-prefix")
			return transformer
		})
	defer revertTransformer()

	// baz is written with the updated transformer, so on its own it would
	// produce a regular ADDED event.
	mustCreate("baz")

	opts := storage.ListOptions{
		ResourceVersion: fooCreated.ResourceVersion,
		Predicate:       storage.Everything,
		Recursive:       true,
	}

	// Run N concurrent watches. Given the asynchronous nature, we increase the
	// probability of hitting the race in at least one of those watches.
	const concurrentWatches = 10
	var wg sync.WaitGroup
	for i := 0; i < concurrentWatches; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only t.Errorf is used inside this goroutine: t.Fatalf must
			// not be called from goroutines other than the test's own.
			w, err := store.Watch(ctx, "/pods", opts)
			if err != nil {
				t.Errorf("failed to create watch: %v", err)
				return
			}
			// Release the watch once this goroutine is done with it.
			defer w.Stop()

			// We issue the watch starting from object bar.
			// The object fails TransformFromStorage and generates ERROR watch event.
			// The further events (i.e. ADDED event for baz object) should not be
			// emitted, so we verify no events other than ERROR type are emitted.
			// The loop ends when the result channel is closed.
			for event := range w.ResultChan() {
				if event.Type != watch.Error {
					t.Errorf("unexpected event: %#v", event)
				}
			}
		}()
	}
	wg.Wait()
}
func makePod(namePrefix string) *example.Pod { func makePod(namePrefix string) *example.Pod {
return &example.Pod{ return &example.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{