Plumb the schema.GroupResource into etcd3 Store struct

Kubernetes-commit: 6f8019aae95db85552381f7e7066a52fef61be7a
This commit is contained in:
Mike Spreitzer 2021-09-13 16:54:34 -04:00 committed by Kubernetes Publisher
parent 80817993b9
commit 31ec50ec43
5 changed files with 26 additions and 21 deletions

View File

@ -33,6 +33,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
@ -69,6 +70,7 @@ type store struct {
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
groupResource schema.GroupResource
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
@ -83,11 +85,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
@ -98,9 +100,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
pathPrefix: path.Join("/", prefix),
groupResource: groupResource,
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return result
}

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
@ -1000,7 +1001,7 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
func TestTransformationFailure(t *testing.T) {
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
preset := []struct {
@ -1076,8 +1077,8 @@ func TestList(t *testing.T) {
client := testserver.RunEtcd(t, nil)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
disablePagingStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
// Setup storage with the following structure:
@ -1573,7 +1574,7 @@ func TestListContinuation(t *testing.T) {
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
// Setup storage with the following structure:
@ -1733,7 +1734,7 @@ func TestListContinuationWithFilter(t *testing.T) {
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
preset := []struct {
@ -1835,7 +1836,7 @@ func TestListContinuationWithFilter(t *testing.T) {
func TestListInconsistentContinuation(t *testing.T) {
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
// Setup storage with the following structure:
@ -1983,7 +1984,7 @@ func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) {
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: 1,
MaxObjectCount: defaultLeaseMaxObjectCount,
})
@ -2027,7 +2028,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, nil, configuredPrefix, schema.GroupResource{Resource: "widgets"}, transformer, true, NewDefaultLeaseManagerConfig())
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
@ -2192,7 +2193,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
transformer.store = store
for i := 0; i < 5; i++ {
@ -2296,7 +2297,7 @@ func TestCount(t *testing.T) {
func TestLeaseMaxObjectCount(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
client := testserver.RunEtcd(t, nil)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
MaxObjectCount: 2,
})

View File

@ -221,13 +221,13 @@ func TestWatchFromNoneZero(t *testing.T) {
func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
client := testserver.RunEtcd(t, nil)
invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
invalidStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
validStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
@ -327,7 +327,7 @@ func TestProgressNotify(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second
client := testserver.RunEtcd(t, clusterConfig)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
key := "/somekey"

View File

@ -276,7 +276,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}