pkg/storage/etcd3: be more precise in watch test

Previously, this test assumed that:
 - a global watch would return only an event for the key in question
 - only the delete event in question would be returned

Neither of these assumptions are correct for an etcd backend as long
as any other clients are interacting with the system. This commit
makes the watch more specific and extracts the correct event.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>

Kubernetes-commit: 2631c0a0f959bd67aa455045dce33e77150ab5f8
This commit is contained in:
Steve Kuznetsov 2022-03-23 12:59:53 -08:00 committed by Kubernetes Publisher
parent 83e26430f0
commit 42854c1734
1 changed files with 26 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import (
"testing" "testing"
"time" "time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
@ -293,24 +294,26 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
etcdW := client.Watch(ctx, "/", clientv3.WithPrefix()) rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj)
if err != nil {
t.Fatalf("failed to parse resourceVersion on stored object: %v", err)
}
etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv)))
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
t.Fatalf("Delete failed: %v", err) t.Fatalf("Delete failed: %v", err)
} }
var e watch.Event var e watch.Event
var wres clientv3.WatchResponse
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
select { select {
case e = <-w.ResultChan(): case e = <-w.ResultChan():
case <-watchCtx.Done(): case <-watchCtx.Done():
t.Fatalf("timed out waiting for watch event") t.Fatalf("timed out waiting for watch event")
} }
select { deletedRV, err := deletedRevision(watchCtx, etcdW)
case wres = <-etcdW: if err != nil {
case <-watchCtx.Done(): t.Fatalf("did not see delete event in raw watch: %v", err)
t.Fatalf("timed out waiting for raw watch event")
} }
watchedDeleteObj := e.Object.(*example.Pod) watchedDeleteObj := e.Object.(*example.Pod)
@ -318,9 +321,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("ParseWatchResourceVersion failed: %v", err) t.Fatalf("ParseWatchResourceVersion failed: %v", err)
} }
if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { if int64(watchedDeleteRev) != deletedRV {
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
watchedDeleteRev, wres.Events[0].Kv.ModRevision) watchedDeleteRev, deletedRV)
}
}
func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) {
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case wres := <-watch:
for _, evt := range wres.Events {
if evt.Type == mvccpb.DELETE && evt.Kv != nil {
return evt.Kv.ModRevision, nil
}
}
}
} }
} }