Implement etcd3 progress-notify feature in etcd3 layer

Kubernetes-commit: 56e72841b6005740453828a9f4f7a9a1b9a831f6
This commit is contained in:
wojtekt 2020-08-31 12:40:41 +02:00 committed by Kubernetes Publisher
parent a5000473c1
commit e40efde837
7 changed files with 112 additions and 17 deletions

View File

@ -23,12 +23,13 @@ import (
) )
type event struct { type event struct {
key string key string
value []byte value []byte
prevValue []byte prevValue []byte
rev int64 rev int64
isDeleted bool isDeleted bool
isCreated bool isCreated bool
isProgressNotify bool
} }
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) {
} }
return ret, nil return ret, nil
} }
func progressNotifyEvent(rev int64) *event {
return &event{
rev: rev,
isProgressNotify: true,
}
}

View File

@ -61,6 +61,14 @@ var (
}, },
[]string{"endpoint"}, []string{"endpoint"},
) )
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
Help: "Number of etcd bookmarks (progress notify events) split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -72,6 +80,7 @@ func Register() {
legacyregistry.MustRegister(etcdRequestLatency) legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize) legacyregistry.MustRegister(dbTotalSize)
legacyregistry.MustRegister(etcdBookmarkCounts)
}) })
} }
@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime)) etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
} }
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
}
// Reset resets the etcd_request_duration_seconds metric. // Reset resets the etcd_request_duration_seconds metric.
func Reset() { func Reset() {
etcdRequestLatency.Reset() etcdRequestLatency.Reset()

View File

@ -87,7 +87,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object,
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
} }
func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{} versioner := APIObjectVersioner{}
result := &store{ result := &store{
client: c, client: c,
@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, c
// no-op for default prefix of '/registry'. // no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer), watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c), leaseManager: newDefaultLeaseManager(c),
} }
return result return result
@ -784,7 +784,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions,
return nil, err return nil, err
} }
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate) return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
} }
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -2083,7 +2083,6 @@ func TestConsistentList(t *testing.T) {
if !reflect.DeepEqual(result3, result4) { if !reflect.DeepEqual(result3, result4) {
t.Errorf("inconsistent lists: %#v, %#v", result3, result4) t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
} }
} }
func TestCount(t *testing.T) { func TestCount(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"reflect"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) {
type watcher struct { type watcher struct {
client *clientv3.Client client *clientv3.Client
codec runtime.Codec codec runtime.Codec
newFunc func() runtime.Object
objectType string
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer transformer value.Transformer
} }
@ -78,6 +82,7 @@ type watchChan struct {
key string key string
initialRev int64 initialRev int64
recursive bool recursive bool
progressNotify bool
internalPred storage.SelectionPredicate internalPred storage.SelectionPredicate
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -86,13 +91,20 @@ type watchChan struct {
errChan chan error errChan chan error
} }
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher { func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
return &watcher{ res := &watcher{
client: client, client: client,
codec: codec, codec: codec,
newFunc: newFunc,
versioner: versioner, versioner: versioner,
transformer: transformer, transformer: transformer,
} }
if newFunc == nil {
res.objectType = "<unknown>"
} else {
res.objectType = reflect.TypeOf(newFunc()).String()
}
return res
} }
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications. // Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
// If recursive is false, it watches on given key. // If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself. // If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned. // pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) { func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") { if recursive && !strings.HasSuffix(key, "/") {
key += "/" key += "/"
} }
wc := w.createWatchChan(ctx, key, rev, recursive, pred) wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run() go wc.run()
return wc, nil return wc, nil
} }
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan { func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{ wc := &watchChan{
watcher: w, watcher: w,
key: key, key: key,
initialRev: rev, initialRev: rev,
recursive: recursive, recursive: recursive,
progressNotify: progressNotify,
internalPred: pred, internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize), incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize), resultChan: make(chan watch.Event, outgoingBufSize),
@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.recursive { if wc.recursive {
opts = append(opts, clientv3.WithPrefix()) opts = append(opts, clientv3.WithPrefix())
} }
if wc.progressNotify {
opts = append(opts, clientv3.WithProgressNotify())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...) wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch { for wres := range wch {
if wres.Err() != nil { if wres.Err() != nil {
@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
wc.sendError(err) wc.sendError(err)
return return
} }
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
continue
}
for _, e := range wres.Events { for _, e := range wres.Events {
parsedEvent, err := parseEvent(e) parsedEvent, err := parseEvent(e)
if err != nil { if err != nil {
@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
} }
switch { switch {
case e.isProgressNotify:
if wc.watcher.newFunc == nil {
return nil
}
object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err)
return nil
}
res = &watch.Event{
Type: watch.Bookmark,
Object: object,
}
case e.isDeleted: case e.isDeleted:
if !wc.filter(oldObj) { if !wc.filter(oldObj) {
return nil return nil
@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) {
} }
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if e.isProgressNotify {
// progressNotify events doesn't contain neither current nor previous object version,
return nil, nil, nil
}
if !e.isDeleted { if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key)) data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
if err != nil { if err != nil {

View File

@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) {
cancel() cancel()
// When we watch with a canceled context, we should detect that it's context canceled. // When we watch with a canceled context, we should detect that it's context canceled.
// We won't take it as error and also close the watcher. // We won't take it as error and also close the watcher.
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t) origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx) ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering. // make resutlChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event) w.resultChan = make(chan watch.Event)
w.errChan = make(chan error) w.errChan = make(chan error)
@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
} }
} }
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{
Size: 1,
WatchProgressNotifyInterval: time.Second,
}
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
key := "/somekey"
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}
out := &example.Pod{}
if err := store.Create(ctx, key, input, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
opts := storage.ListOptions{
ResourceVersion: out.ResourceVersion,
Predicate: storage.Everything,
ProgressNotify: true,
}
w, err := store.Watch(ctx, key, opts)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}}
testCheckResult(t, 0, watch.Bookmark, w, result)
}
type testWatchStruct struct { type testWatchStruct struct {
obj *example.Pod obj *example.Pod
expectEvent bool expectEvent bool

View File

@ -269,4 +269,7 @@ type ListOptions struct {
ResourceVersionMatch metav1.ResourceVersionMatch ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation. // Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate Predicate SelectionPredicate
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool
} }