diff --git a/pkg/storage/cacher/cacher_testing_utils_test.go b/pkg/storage/cacher/cacher_testing_utils_test.go index 4f1fed104..a7f68b543 100644 --- a/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/pkg/storage/cacher/cacher_testing_utils_test.go @@ -57,16 +57,20 @@ func newPodList() runtime.Object { return &example.PodList{} } func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + versioner := storage.APIObjectVersioner{} + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) storage := etcd3.New( server.V3Client, - apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), + codec, newPod, newPodList, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), - etcd3.NewDefaultLeaseManagerConfig()) + etcd3.NewDefaultLeaseManagerConfig(), + etcd3.NewDefaultDecoder(codec, versioner), + versioner) return server, storage } diff --git a/pkg/storage/etcd3/decoder.go b/pkg/storage/etcd3/decoder.go new file mode 100644 index 000000000..f70101529 --- /dev/null +++ b/pkg/storage/etcd3/decoder.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" + endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/storage" + + "k8s.io/klog/v2" +) + +// NewDefaultDecoder returns the default decoder for etcd3 store +func NewDefaultDecoder(codec runtime.Codec, versioner storage.Versioner) Decoder { + return &defaultDecoder{ + codec: codec, + versioner: versioner, + } +} + +// Decoder is used by the etcd storage implementation to decode +// transformed data from the storage into an object +type Decoder interface { + // Decode decodes value of bytes into object. It will also + // set the object resource version to rev. + // On success, objPtr would be set to the object. + Decode(value []byte, objPtr runtime.Object, rev int64) error + + // DecodeListItem decodes bytes value in array into object. + DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) +} + +var _ Decoder = &defaultDecoder{} + +type defaultDecoder struct { + codec runtime.Codec + versioner storage.Versioner +} + +// decode decodes value of bytes into object. It will also set the object resource version to rev. +// On success, objPtr would be set to the object. +func (d *defaultDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error { + if _, err := conversion.EnforcePtr(objPtr); err != nil { + // nolint:errorlint // this code was moved from store.go as is + return fmt.Errorf("unable to convert output object to pointer: %v", err) + } + _, _, err := d.codec.Decode(value, nil, objPtr) + if err != nil { + return err + } + // being unable to set the version does not prevent the object from being extracted + if err := d.versioner.UpdateObject(objPtr, uint64(rev)); err != nil { + klog.Errorf("failed to update object version: %v", err) + } + return nil +} + +// decodeListItem decodes bytes value in array into object. +func (d *defaultDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) { + startedAt := time.Now() + defer func() { + endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt)) + }() + + obj, _, err := d.codec.Decode(data, nil, newItemFunc()) + if err != nil { + return nil, err + } + + if err := d.versioner.UpdateObject(obj, rev); err != nil { + klog.Errorf("failed to update object version: %v", err) + } + + return obj, nil +} diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index f79364058..37e454e90 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -38,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" - endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/metrics" @@ -82,6 +81,7 @@ type store struct { groupResourceString string watcher *watcher leaseManager *leaseManager + decoder Decoder } func (s *store) RequestWatchProgress(ctx context.Context) error { @@ -99,12 +99,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface { - return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig) +func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface { + return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner) } -func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store { - versioner := storage.APIObjectVersioner{} +func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store { // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' @@ -137,6 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func groupResourceString: groupResource.String(), watcher: w, leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), + decoder: decoder, } w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { @@ -182,7 +182,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou return storage.NewInternalError(err) } - err = decode(s.codec, s.versioner, data, out, kv.ModRevision) + err = s.decoder.Decode(data, out, kv.ModRevision) if err != nil { recordDecodeError(s.groupResourceString, preparedKey) return err @@ -248,7 +248,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, if out != nil { putResp := txnResp.Responses[0].GetResponsePut() - err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision) + err = s.decoder.Decode(data, out, putResp.Header.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) recordDecodeError(s.groupResourceString, preparedKey) @@ -379,7 +379,7 @@ func (s *store) conditionalDelete( return errors.New("invalid DeleteRange response - nil header") } if !skipTransformDecode { - err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision) + err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision) if err != nil { recordDecodeError(s.groupResourceString, key) return err @@ -496,7 +496,7 @@ func (s *store) GuaranteedUpdate( } // recheck that the data from etcd is not stale before short-circuiting a write if !origState.stale { - err = decode(s.codec, s.versioner, origState.data, destination, origState.rev) + err = s.decoder.Decode(origState.data, destination, origState.rev) if err != nil { recordDecodeError(s.groupResourceString, preparedKey) return err @@ -547,7 +547,7 @@ func (s *store) GuaranteedUpdate( } putResp := txnResp.Responses[0].GetResponsePut() - err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision) + err = s.decoder.Decode(data, destination, putResp.Header.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) recordDecodeError(s.groupResourceString, preparedKey) @@ -779,7 +779,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption default: } - obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc) + obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc) if err != nil { recordDecodeError(s.groupResourceString, string(kv.Key)) return err @@ -939,7 +939,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key state.data = data state.stale = stale - if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { + if err := s.decoder.Decode(state.data, state.obj, state.rev); err != nil { recordDecodeError(s.groupResourceString, key) return nil, err } @@ -1046,42 +1046,6 @@ func (s *store) prepareKey(key string) (string, error) { return s.pathPrefix + key[startIndex:], nil } -// decode decodes value of bytes into object. It will also set the object resource version to rev. -// On success, objPtr would be set to the object. -func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error { - if _, err := conversion.EnforcePtr(objPtr); err != nil { - return fmt.Errorf("unable to convert output object to pointer: %v", err) - } - _, _, err := codec.Decode(value, nil, objPtr) - if err != nil { - return err - } - // being unable to set the version does not prevent the object from being extracted - if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil { - klog.Errorf("failed to update object version: %v", err) - } - return nil -} - -// decodeListItem decodes bytes value in array into object. -func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) { - startedAt := time.Now() - defer func() { - endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt)) - }() - - obj, _, err := codec.Decode(data, nil, newItemFunc()) - if err != nil { - return nil, err - } - - if err := versioner.UpdateObject(obj, rev); err != nil { - klog.Errorf("failed to update object version: %v", err) - } - - return obj, nil -} - // recordDecodeError record decode error split by object type. func recordDecodeError(resource string, key string) { metrics.RecordDecodeError(resource) diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 7036a7383..43d2f8475 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -566,6 +566,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli if setupOpts.recorderEnabled { client.KV = &clientRecorder{KV: client.KV} } + versioner := storage.APIObjectVersioner{} store := newStore( client, setupOpts.codec, @@ -576,6 +577,8 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli setupOpts.groupResource, setupOpts.transformer, setupOpts.leaseConfig, + NewDefaultDecoder(setupOpts.codec, versioner), + versioner, ) ctx := context.Background() return ctx, store, client diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index 9cd0b981b..d629087d7 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -459,7 +459,10 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu if transformer == nil { transformer = identity.NewEncryptCheckTransformer() } - return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil + + versioner := storage.APIObjectVersioner{} + decoder := etcd3.NewDefaultDecoder(c.Codec, versioner) + return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index ed9f43ef6..fd8bca745 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -84,8 +84,10 @@ func TestHighWaterMark(t *testing.T) { func TestGetCurrentResourceVersionFromStorage(t *testing.T) { // test data newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { + versioner := storage.APIObjectVersioner{} + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner) return server, storage } server, etcdStorage := newEtcdTestStorage(t, "")