diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 7f558a588..2129c333d 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -33,6 +33,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/conversion" @@ -69,6 +70,7 @@ type store struct { versioner storage.Versioner transformer value.Transformer pathPrefix string + groupResource schema.GroupResource watcher *watcher pagingEnabled bool leaseManager *leaseManager @@ -83,11 +85,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { - return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig) +func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { + return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig) } -func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { +func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -98,9 +100,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' - pathPrefix: path.Join("/", prefix), - watcher: newWatcher(c, codec, newFunc, versioner, transformer), - leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), + pathPrefix: path.Join("/", prefix), + groupResource: groupResource, + watcher: newWatcher(c, codec, newFunc, versioner, transformer), + leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } return result } diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 5af5f4b93..69f199624 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" @@ -1000,7 +1001,7 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { func TestTransformationFailure(t *testing.T) { client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() preset := []struct { @@ -1076,8 +1077,8 @@ func TestList(t *testing.T) { client := testserver.RunEtcd(t, nil) defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) - disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) + disablePagingStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1573,7 +1574,7 @@ func TestListContinuation(t *testing.T) { transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1733,7 +1734,7 @@ func TestListContinuationWithFilter(t *testing.T) { transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() preset := []struct { @@ -1835,7 +1836,7 @@ func TestListContinuationWithFilter(t *testing.T) { func TestListInconsistentContinuation(t *testing.T) { client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1983,7 +1984,7 @@ func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) { // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 // for testing purposes. See apimachinery/pkg/util/wait/wait.go - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ ReuseDurationSeconds: 1, MaxObjectCount: defaultLeaseMaxObjectCount, }) @@ -2027,7 +2028,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, nil, configuredPrefix, schema.GroupResource{Resource: "widgets"}, transformer, true, NewDefaultLeaseManagerConfig()) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2192,7 +2193,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig()) transformer.store = store for i := 0; i < 5; i++ { @@ -2296,7 +2297,7 @@ func TestCount(t *testing.T) { func TestLeaseMaxObjectCount(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) client := testserver.RunEtcd(t, nil) - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, MaxObjectCount: 2, }) diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 068e61281..0361548e0 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -221,13 +221,13 @@ func TestWatchFromNoneZero(t *testing.T) { func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} client := testserver.RunEtcd(t, nil) - invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) + invalidStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) + validStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil @@ -327,7 +327,7 @@ func TestProgressNotify(t *testing.T) { clusterConfig := testserver.NewTestConfig(t) clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second client := testserver.RunEtcd(t, clusterConfig) - store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() key := "/somekey" diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index 1d0b103c7..66f8e5214 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -276,7 +276,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil + return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 2317d786a..32f03d762 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} } func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) return server, storage }