Merge pull request #94364 from wojtek-t/efficient_watch_resumption

Efficient watch resumption

Kubernetes-commit: 0f39af90ed39794ceea426aa0f77de67b1392308
This commit is contained in:
Kubernetes Publisher 2020-09-25 15:42:48 -07:00
commit e996d15d55
21 changed files with 209 additions and 65 deletions

2
Godeps/Godeps.json generated
View File

@ -680,7 +680,7 @@
},
{
"ImportPath": "k8s.io/client-go",
"Rev": "f8c0b224ad5a"
"Rev": "a0a9b7d9d5e2"
},
{
"ImportPath": "k8s.io/component-base",

4
go.mod
View File

@ -43,7 +43,7 @@ require (
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.0.0-20200922195808-5bb35d2636ca
k8s.io/apimachinery v0.0.0-20200922235617-829ed199f4e0
k8s.io/client-go v0.0.0-20200924040014-f8c0b224ad5a
k8s.io/client-go v0.0.0-20200926000026-a0a9b7d9d5e2
k8s.io/component-base v0.0.0-20200911092040-c985e940ef8f
k8s.io/klog/v2 v2.2.0
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6
@ -56,6 +56,6 @@ require (
replace (
k8s.io/api => k8s.io/api v0.0.0-20200922195808-5bb35d2636ca
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200922235617-829ed199f4e0
k8s.io/client-go => k8s.io/client-go v0.0.0-20200924040014-f8c0b224ad5a
k8s.io/client-go => k8s.io/client-go v0.0.0-20200926000026-a0a9b7d9d5e2
k8s.io/component-base => k8s.io/component-base v0.0.0-20200911092040-c985e940ef8f
)

2
go.sum
View File

@ -504,7 +504,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/api v0.0.0-20200922195808-5bb35d2636ca/go.mod h1:FAsg3Y/xcbFIbnyfpgARCQ8di7NxpIRZWbawQTkHTDo=
k8s.io/apimachinery v0.0.0-20200922235617-829ed199f4e0/go.mod h1:DnPGDnARWFvYa3pMHgSxtbZb7gpzzAZ1pTfaUNDVlmA=
k8s.io/client-go v0.0.0-20200924040014-f8c0b224ad5a/go.mod h1:0TFqP6nDzw4kbUbyCXc0xcULYS2MvbAQIAaEhgnPPAA=
k8s.io/client-go v0.0.0-20200926000026-a0a9b7d9d5e2/go.mod h1:0TFqP6nDzw4kbUbyCXc0xcULYS2MvbAQIAaEhgnPPAA=
k8s.io/component-base v0.0.0-20200911092040-c985e940ef8f/go.mod h1:KLUf6+rBAAlh4P5aX9t725mVdFgvY6LfYzl+QOveAV4=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=

View File

@ -138,6 +138,12 @@ const (
//
// Allows sending warning headers in API responses.
WarningHeaders featuregate.Feature = "WarningHeaders"
// owner: @wojtek-t
// alpha: v1.20
//
// Allows for updating watchcache resource version with progress notify events.
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
)
func init() {
@ -148,18 +154,19 @@ func init() {
// To add a new feature, define a key for it above and add it here. The features will be
// available throughout Kubernetes binaries.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
}

View File

@ -38,7 +38,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc)
s, destroy, err := factory.Create(*sc, nil)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}

View File

@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator {
triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig)
s, d, err := generic.NewRawStorage(storageConfig, newFunc)
if err != nil {
return s, d, err
}

View File

@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
newFunc := func() runtime.Object { return &example.Pod{} }
newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc)
s, dFunc, err := factory.Create(*sc, newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} },
NewFunc: newFunc,
NewListFunc: newListFunc,
Codec: sc.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(config)

View File

@ -47,12 +47,12 @@ func UndecoratedStorage(
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
return NewRawStorage(config, newFunc)
}
// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config)
func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc)
}

View File

@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
opts.ProgressNotify = true
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error

View File

@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
w.capacity = capacity
}
func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
rv, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
klog.Errorf("Couldn't parse resourceVersion: %v", err)
return
}
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
}
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()

View File

@ -23,12 +23,13 @@ import (
)
type event struct {
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
isProgressNotify bool
}
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) {
}
return ret, nil
}
func progressNotifyEvent(rev int64) *event {
return &event{
rev: rev,
isProgressNotify: true,
}
}

View File

@ -61,6 +61,14 @@ var (
},
[]string{"endpoint"},
)
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
Help: "Number of etcd bookmarks (progress notify events) split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
@ -72,6 +80,7 @@ func Register() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.MustRegister(etcdBookmarkCounts)
})
}
@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
}
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
}
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()

View File

@ -83,11 +83,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer)
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefi
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
}
return result
@ -784,7 +784,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions,
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -99,6 +99,10 @@ func (p *prefixTransformer) resetReads() {
p.reads = 0
}
func newPod() runtime.Object {
return &example.Pod{}
}
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -818,7 +822,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
preset := []struct {
@ -895,8 +899,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1394,7 +1398,7 @@ func TestListContinuation(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background()
// Setup storage with the following structure:
@ -1556,7 +1560,7 @@ func TestListContinuationWithFilter(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background()
preset := []struct {
@ -1659,7 +1663,7 @@ func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1804,7 +1808,7 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
@ -1850,7 +1854,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer)
store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
@ -2017,7 +2021,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), true, codec, "", transformer)
store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer)
transformer.store = store
for i := 0; i < 5; i++ {
@ -2079,7 +2083,6 @@ func TestConsistentList(t *testing.T) {
if !reflect.DeepEqual(result3, result4) {
t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
}
}
func TestCount(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3"
@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) {
type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
versioner storage.Versioner
transformer value.Transformer
}
@ -78,6 +82,7 @@ type watchChan struct {
key string
initialRev int64
recursive bool
progressNotify bool
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
@ -86,13 +91,20 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
return &watcher{
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
res := &watcher{
client: client,
codec: codec,
newFunc: newFunc,
versioner: versioner,
transformer: transformer,
}
if newFunc == nil {
res.objectType = "<unknown>"
} else {
res.objectType = reflect.TypeOf(newFunc()).String()
}
return res
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
initialRev: rev,
recursive: recursive,
progressNotify: progressNotify,
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
if wc.progressNotify {
opts = append(opts, clientv3.WithProgressNotify())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
wc.sendError(err)
return
}
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
continue
}
for _, e := range wres.Events {
parsedEvent, err := parseEvent(e)
if err != nil {
@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
}
switch {
case e.isProgressNotify:
if wc.watcher.newFunc == nil {
return nil
}
object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err)
return nil
}
res = &watch.Event{
Type: watch.Bookmark,
Object: object,
}
case e.isDeleted:
if !wc.filter(oldObj) {
return nil
@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) {
}
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if e.isProgressNotify {
// progressNotify events doesn't contain neither current nor previous object version,
return nil, nil, nil
}
if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
if err != nil {

View File

@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) {
cancel()
// When we watch with a canceled context, we should detect that it's context canceled.
// We won't take it as error and also close the watcher.
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything)
if err != nil {
t.Fatal(err)
}
@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
}
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{
Size: 1,
WatchProgressNotifyInterval: time.Second,
}
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
key := "/somekey"
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}
out := &example.Pod{}
if err := store.Create(ctx, key, input, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
opts := storage.ListOptions{
ResourceVersion: out.ResourceVersion,
Predicate: storage.Everything,
ProgressNotify: true,
}
w, err := store.Watch(ctx, key, opts)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}}
testCheckResult(t, 0, watch.Bookmark, w, result)
}
type testWatchStruct struct {
obj *example.Pod
expectEvent bool

View File

@ -269,4 +269,7 @@ type ListOptions struct {
ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool
}

View File

@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
@ -217,7 +218,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
@ -249,7 +250,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -19,6 +19,7 @@ package factory
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -27,12 +28,12 @@ import (
type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
return newETCD3Storage(c, newFunc)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -75,7 +75,7 @@ func TestTLSConnection(t *testing.T) {
},
Codec: codec,
}
storage, destroyFunc, err := newETCD3Storage(cfg)
storage, destroyFunc, err := newETCD3Storage(cfg, nil)
defer destroyFunc()
if err != nil {
t.Fatal(err)

View File

@ -99,9 +99,12 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source
}
func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
return server, storage
}
@ -118,8 +121,8 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} },
NewFunc: newPod,
NewListFunc: newPodList,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock,
}