Merge pull request #131020 from wojtek-t/fix_asynchronous_error

Fix race for sending errors in watch

Kubernetes-commit: b4d139094698687043b36f1c378dfeb1b654198a
This commit is contained in:
Kubernetes Publisher 2025-04-02 14:18:38 -07:00
commit d4f2fc56b5
5 changed files with 116 additions and 35 deletions

2
go.mod
View File

@ -48,7 +48,7 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0 gopkg.in/evanphx/json-patch.v4 v4.12.0
gopkg.in/go-jose/go-jose.v2 v2.6.3 gopkg.in/go-jose/go-jose.v2 v2.6.3
gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.0.0-20250401102040-dc8867983a6e k8s.io/api v0.0.0-20250401102041-e8d4d542f6a9
k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85 k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85
k8s.io/client-go v0.0.0-20250401103247-18a1faa115ed k8s.io/client-go v0.0.0-20250401103247-18a1faa115ed
k8s.io/component-base v0.0.0-20250401105049-cc24073df84f k8s.io/component-base v0.0.0-20250401105049-cc24073df84f

4
go.sum
View File

@ -367,8 +367,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20250401102040-dc8867983a6e h1:oY/yt517XLctfPgsZl3UBOunhhwj3p957k5ptQZIl7A= k8s.io/api v0.0.0-20250401102041-e8d4d542f6a9 h1:0tq9NMjwiQ6AO5k0Hift5kKGV6GQptV6MWHfoDRzSxU=
k8s.io/api v0.0.0-20250401102040-dc8867983a6e/go.mod h1:FFhGt5u35h2C+LO52B0n4kxNjVzqZJieArQBpannX+s= k8s.io/api v0.0.0-20250401102041-e8d4d542f6a9/go.mod h1:FFhGt5u35h2C+LO52B0n4kxNjVzqZJieArQBpannX+s=
k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85 h1:eN9elIIXhVi5aK03Qb89lKWm8zpKmuhNjZLMt7fXZig= k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85 h1:eN9elIIXhVi5aK03Qb89lKWm8zpKmuhNjZLMt7fXZig=
k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= k8s.io/apimachinery v0.0.0-20250401101805-7b4292bd2e85/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
k8s.io/client-go v0.0.0-20250401103247-18a1faa115ed h1:O80bglccVvjn9M6m1artcLIXlXn47cwP7XDXaz+Z7X4= k8s.io/client-go v0.0.0-20250401103247-18a1faa115ed h1:O80bglccVvjn9M6m1artcLIXlXn47cwP7XDXaz+Z7X4=

View File

@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
for { for {
select { select {
case e := <-wc.incomingEventChan: case e := <-wc.incomingEventChan:
res := wc.transform(e) res, err := wc.transform(e)
if err != nil {
wc.sendError(err)
return
}
if res == nil { if res == nil {
continue continue
} }
@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{ p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan, wc: wc,
processFunc: wc.transform, processingQueue: make(chan chan *processingResult, processEventConcurrency-1),
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
objectType: wc.watcher.objectType, objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource, groupResource: wc.watcher.groupResource,
@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
}() }()
} }
type concurrentOrderedEventProcessing struct { type processingResult struct {
input chan *event event *watch.Event
processFunc func(*event) *watch.Event err error
output chan watch.Event }
processingQueue chan chan *watch.Event type concurrentOrderedEventProcessing struct {
wc *watchChan
processingQueue chan chan *processingResult
// Metadata for logging // Metadata for logging
objectType string objectType string
groupResource schema.GroupResource groupResource schema.GroupResource
@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case e = <-p.input: case e = <-p.wc.incomingEventChan:
} }
processingResponse := make(chan *watch.Event, 1) processingResponse := make(chan *processingResult, 1)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case p.processingQueue <- processingResponse: case p.processingQueue <- processingResponse:
} }
wg.Add(1) wg.Add(1)
go func(e *event, response chan<- *watch.Event) { go func(e *event, response chan<- *processingResult) {
defer wg.Done() defer wg.Done()
responseEvent, err := p.wc.transform(e)
select { select {
case <-ctx.Done(): case <-ctx.Done():
case response <- p.processFunc(e): case response <- &processingResult{event: responseEvent, err: err}:
} }
}(e, processingResponse) }(e, processingResponse)
} }
} }
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event var processingResponse chan *processingResult
var e *watch.Event var r *processingResult
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case e = <-processingResponse: case r = <-processingResponse:
} }
if e == nil { if r.err != nil {
p.wc.sendError(r.err)
return
}
if r.event == nil {
continue continue
} }
if len(p.output) == cap(p.output) { if len(p.wc.resultChan) == cap(p.wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource) klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
} }
// If user couldn't receive results fast enough, we also block incoming events from watcher. // If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage. // Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher. // The worst case would be closing the fast watcher.
select { select {
case <-ctx.Done(): case p.wc.resultChan <- *r.event:
case <-p.wc.ctx.Done():
return return
case p.output <- *e:
} }
} }
} }
@ -561,12 +572,11 @@ func (wc *watchChan) acceptAll() bool {
} }
// transform transforms an event into a result for user if not filtered. // transform transforms an event into a result for user if not filtered.
func (wc *watchChan) transform(e *event) (res *watch.Event) { func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
curObj, oldObj, err := wc.prepareObjs(e) curObj, oldObj, err := wc.prepareObjs(e)
if err != nil { if err != nil {
klog.Errorf("failed to prepare current and previous objects: %v", err) klog.Errorf("failed to prepare current and previous objects: %v", err)
wc.sendError(err) return nil, err
return nil
} }
switch { switch {
@ -574,12 +584,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
object := wc.watcher.newFunc() object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err) klog.Errorf("failed to propagate object version: %v", err)
return nil return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
} }
if e.isInitialEventsEndBookmark { if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err)) return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
return nil
} }
} }
res = &watch.Event{ res = &watch.Event{
@ -588,7 +597,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
case e.isDeleted: case e.isDeleted:
if !wc.filter(oldObj) { if !wc.filter(oldObj) {
return nil return nil, nil
} }
res = &watch.Event{ res = &watch.Event{
Type: watch.Deleted, Type: watch.Deleted,
@ -596,7 +605,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
case e.isCreated: case e.isCreated:
if !wc.filter(curObj) { if !wc.filter(curObj) {
return nil return nil, nil
} }
res = &watch.Event{ res = &watch.Event{
Type: watch.Added, Type: watch.Added,
@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
Type: watch.Modified, Type: watch.Modified,
Object: curObj, Object: curObj,
} }
return res return res, nil
} }
curObjPasses := wc.filter(curObj) curObjPasses := wc.filter(curObj)
oldObjPasses := wc.filter(oldObj) oldObjPasses := wc.filter(oldObj)
@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
} }
} }
return res return res, nil
} }
func transformErrorToEvent(err error) *watch.Event { func transformErrorToEvent(err error) *watch.Event {

View File

@ -155,6 +155,11 @@ func TestWatchListMatchSingle(t *testing.T) {
storagetesting.RunWatchListMatchSingle(ctx, t, store) storagetesting.RunWatchListMatchSingle(ctx, t, store)
} }
// TestWatchErrorEventIsBlockingFurtherEvent runs the shared
// RunWatchErrorIsBlockingFurtherEvents storage test against the store
// returned by testSetup, wrapped in storeWithPrefixTransformer so that the
// shared test can swap the prefix transformer.
func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
	ctx, etcdStore, _ := testSetup(t)
	wrapped := &storeWithPrefixTransformer{etcdStore}
	storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, wrapped)
}
// ======================================================================= // =======================================================================
// Implementation-specific tests are following. // Implementation-specific tests are following.
// The following tests are exercising the details of the implementation // The following tests are exercising the details of the implementation

View File

@ -1698,6 +1698,73 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
} }
// RunWatchErrorIsBlockingFurtherEvents verifies that once a watch encounters
// an object it cannot process, it emits only ERROR events and never delivers
// events for objects that come after the failing one.
//
// The test creates pods foo and bar, then changes the store's prefix
// transformer, then creates pod baz. Watches are started from foo's resource
// version, so the first event they have to deliver is the one for bar.
//
// ctx is used for the watches; t receives all failures; store must support
// swapping its prefix transformer.
func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
	// createPod stores a pod with the given name in namespace "test" and
	// returns the object as written back by the store. A failure aborts the
	// test immediately: every later step depends on these fixtures, so
	// continuing would only produce misleading follow-up errors.
	createPod := func(name string) *example.Pod {
		t.Helper()
		pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: name}}
		key := fmt.Sprintf("/pods/%s/%s", pod.Namespace, pod.Name)
		created := &example.Pod{}
		if err := store.Create(context.Background(), key, pod, created, 0); err != nil {
			t.Fatalf("failed to create object: %v", err)
		}
		return created
	}

	fooCreated := createPod("foo")
	createPod("bar")

	// Switch the transformer prefix so that the objects created above (foo
	// and bar) become effectively corrupted from the store's point of view.
	revertTransformer := store.UpdatePrefixTransformer(
		func(transformer *PrefixTransformer) value.Transformer {
			transformer.prefix = []byte("other-prefix")
			return transformer
		})
	defer revertTransformer()

	// baz is written after the prefix change.
	createPod("baz")

	opts := storage.ListOptions{
		ResourceVersion: fooCreated.ResourceVersion,
		Predicate:       storage.Everything,
		Recursive:       true,
	}

	// Run N concurrent watches. Given the asynchronous nature, we increase the
	// probability of hitting the race in at least one of those watches.
	concurrentWatches := 10
	wg := sync.WaitGroup{}
	for i := 0; i < concurrentWatches; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Only t.Errorf is used inside this goroutine: t.Fatalf must be
			// called from the goroutine running the test function.
			w, err := store.Watch(ctx, "/pods", opts)
			if err != nil {
				t.Errorf("failed to create watch: %v", err)
				return
			}
			// Release the watch's resources once this goroutine is done.
			defer w.Stop()

			// We issue the watch starting from object bar.
			// The object fails TransformFromStorage and generates ERROR watch event.
			// The further events (i.e. ADDED event for baz object) should not be
			// emitted, so we verify no events other than ERROR type are emitted.
			// The loop ends when the result channel is closed.
			for event := range w.ResultChan() {
				if event.Type != watch.Error {
					t.Errorf("unexpected event: %#v", event)
				}
			}
		}()
	}
	wg.Wait()
}
func makePod(namePrefix string) *example.Pod { func makePod(namePrefix string) *example.Pod {
return &example.Pod{ return &example.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{