From e40efde837e61f2a06cd59cfe21f9cce29cebf9b Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 12:40:41 +0200 Subject: [PATCH] Implement etcd3 progress-notify feature in etcd3 layer Kubernetes-commit: 56e72841b6005740453828a9f4f7a9a1b9a831f6 --- pkg/storage/etcd3/event.go | 20 +++++++---- pkg/storage/etcd3/metrics/metrics.go | 14 ++++++++ pkg/storage/etcd3/store.go | 6 ++-- pkg/storage/etcd3/store_test.go | 1 - pkg/storage/etcd3/watcher.go | 50 +++++++++++++++++++++++++--- pkg/storage/etcd3/watcher_test.go | 35 +++++++++++++++++-- pkg/storage/interfaces.go | 3 ++ 7 files changed, 112 insertions(+), 17 deletions(-) diff --git a/pkg/storage/etcd3/event.go b/pkg/storage/etcd3/event.go index c4e1f8032..83e52c064 100644 --- a/pkg/storage/etcd3/event.go +++ b/pkg/storage/etcd3/event.go @@ -23,12 +23,13 @@ import ( ) type event struct { - key string - value []byte - prevValue []byte - rev int64 - isDeleted bool - isCreated bool + key string + value []byte + prevValue []byte + rev int64 + isDeleted bool + isCreated bool + isProgressNotify bool } // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. @@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) { } return ret, nil } + +func progressNotifyEvent(rev int64) *event { + return &event{ + rev: rev, + isProgressNotify: true, + } +} diff --git a/pkg/storage/etcd3/metrics/metrics.go b/pkg/storage/etcd3/metrics/metrics.go index 4ba0b14fd..1f001406a 100644 --- a/pkg/storage/etcd3/metrics/metrics.go +++ b/pkg/storage/etcd3/metrics/metrics.go @@ -61,6 +61,14 @@ var ( }, []string{"endpoint"}, ) + etcdBookmarkCounts = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Name: "etcd_bookmark_counts", + Help: "Number of etcd bookmarks (progress notify events) split by kind.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) ) var registerMetrics sync.Once @@ -72,6 +80,7 @@ func Register() { legacyregistry.MustRegister(etcdRequestLatency) legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(dbTotalSize) + legacyregistry.MustRegister(etcdBookmarkCounts) }) } @@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) { etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime)) } +// RecordEtcdBookmark updates the etcd_bookmark_counts metric. +func RecordEtcdBookmark(resource string) { + etcdBookmarkCounts.WithLabelValues(resource).Inc() +} + // Reset resets the etcd_request_duration_seconds metric. func Reset() { etcdRequestLatency.Reset() diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 0b294da0e..a7307f46f 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -87,7 +87,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) } -func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, c // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), - watcher: newWatcher(c, codec, versioner, transformer), + watcher: newWatcher(c, codec, newFunc, versioner, transformer), leaseManager: newDefaultLeaseManager(c), } return result @@ -784,7 +784,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, return nil, err } key = path.Join(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate) + return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 191cfe677..e48462a8e 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -2083,7 +2083,6 @@ func TestConsistentList(t *testing.T) { if !reflect.DeepEqual(result3, result4) { t.Errorf("inconsistent lists: %#v, %#v", result3, result4) } - } func TestCount(t *testing.T) { diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 793495f3e..bd87382e8 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "reflect" "strconv" "strings" "sync" @@ -29,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/value" "go.etcd.io/etcd/clientv3" @@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) { type watcher struct { client *clientv3.Client codec runtime.Codec + newFunc func() runtime.Object + objectType string versioner storage.Versioner transformer value.Transformer } @@ -78,6 +82,7 @@ type watchChan struct { key string initialRev int64 recursive bool + progressNotify bool internalPred storage.SelectionPredicate ctx context.Context cancel context.CancelFunc @@ -86,13 +91,20 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher { - return &watcher{ +func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher { + res := &watcher{ client: client, codec: codec, + newFunc: newFunc, versioner: versioner, transformer: transformer, } + if newFunc == nil { + res.objectType = "" + } else { + res.objectType = reflect.TypeOf(newFunc()).String() + } + return res } // Watch watches on a key and returns a watch.Interface that transfers relevant notifications. @@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage. // If recursive is false, it watches on given key. // If recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if pred matches the change, it will be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) { if recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, recursive, pred) + wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred) go wc.run() return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, key: key, initialRev: rev, recursive: recursive, + progressNotify: progressNotify, internalPred: pred, incomingEventChan: make(chan *event, incomingBufSize), resultChan: make(chan watch.Event, outgoingBufSize), @@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { if wc.recursive { opts = append(opts, clientv3.WithPrefix()) } + if wc.progressNotify { + opts = append(opts, clientv3.WithProgressNotify()) + } wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...) for wres := range wch { if wres.Err() != nil { @@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { wc.sendError(err) return } + if wres.IsProgressNotify() { + wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision())) + metrics.RecordEtcdBookmark(wc.watcher.objectType) + continue + } + for _, e := range wres.Events { parsedEvent, err := parseEvent(e) if err != nil { @@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { } switch { + case e.isProgressNotify: + if wc.watcher.newFunc == nil { + return nil + } + object := wc.watcher.newFunc() + if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { + klog.Errorf("failed to propagate object version: %v", err) + return nil + } + res = &watch.Event{ + Type: watch.Bookmark, + Object: object, + } case e.isDeleted: if !wc.filter(oldObj) { return nil @@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) { } func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { + if e.isProgressNotify { + // progressNotify events doesn't contain neither current nor previous object version, + return nil, nil, nil + } + if !e.isDeleted { data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key)) if err != nil { diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 4a1dda946..e706edf3d 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) { cancel() // When we watch with a canceled context, we should detect that it's context canceled. // We won't take it as error and also close the watcher. - w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) + w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything) if err != nil { t.Fatal(err) } @@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, cluster := testSetup(t) defer cluster.Terminate(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) // make resutlChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error) @@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { } } +func TestProgressNotify(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + clusterConfig := &integration.ClusterConfig{ + Size: 1, + WatchProgressNotifyInterval: time.Second, + } + cluster := integration.NewClusterV3(t, clusterConfig) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + key := "/somekey" + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} + out := &example.Pod{} + if err := store.Create(ctx, key, input, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + + opts := storage.ListOptions{ + ResourceVersion: out.ResourceVersion, + Predicate: storage.Everything, + ProgressNotify: true, + } + w, err := store.Watch(ctx, key, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}} + testCheckResult(t, 0, watch.Bookmark, w, result) +} + type testWatchStruct struct { obj *example.Pod expectEvent bool diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index fb3002080..6249b5cc5 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -269,4 +269,7 @@ type ListOptions struct { ResourceVersionMatch metav1.ResourceVersionMatch // Predicate provides the selection rules for the list operation. Predicate SelectionPredicate + // ProgressNotify determines whether storage-originated bookmark (progress notify) events should + // be delivered to the users. The option is ignored for non-watch requests. + ProgressNotify bool }