refactor: extract decode functions into an interface for etcd3 store

Kubernetes-commit: 1d1a656d8de1cdb99deaa6ec771aa354616eaa16
This commit is contained in:
Abu Kashem 2024-09-24 07:19:19 -04:00 committed by Kubernetes Publisher
parent 44ff1c1665
commit 14881364b3
6 changed files with 122 additions and 52 deletions

View File

@ -57,16 +57,20 @@ func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
versioner := storage.APIObjectVersioner{}
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
codec,
newPod,
newPodList,
prefix,
"/pods",
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
etcd3.NewDefaultLeaseManagerConfig())
etcd3.NewDefaultLeaseManagerConfig(),
etcd3.NewDefaultDecoder(codec, versioner),
versioner)
return server, storage
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
)
// NewDefaultDecoder returns the default decoder for etcd3 store
func NewDefaultDecoder(codec runtime.Codec, versioner storage.Versioner) Decoder {
return &defaultDecoder{
codec: codec,
versioner: versioner,
}
}
// Decoder is used by the etcd storage implementation to decode
// transformed data from the storage into an object
type Decoder interface {
// Decode decodes value of bytes into object. It will also
// set the object resource version to rev.
// On success, objPtr would be set to the object.
Decode(value []byte, objPtr runtime.Object, rev int64) error
// DecodeListItem decodes bytes value in array into object.
DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error)
}
var _ Decoder = &defaultDecoder{}
type defaultDecoder struct {
codec runtime.Codec
versioner storage.Versioner
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func (d *defaultDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
// nolint:errorlint // this code was moved from store.go as is
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
_, _, err := d.codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := d.versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return nil
}
// decodeListItem decodes bytes value in array into object.
func (d *defaultDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := d.codec.Decode(data, nil, newItemFunc())
if err != nil {
return nil, err
}
if err := d.versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return obj, nil
}

View File

@ -38,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@ -82,6 +81,7 @@ type store struct {
groupResourceString string
watcher *watcher
leaseManager *leaseManager
decoder Decoder
}
func (s *store) RequestWatchProgress(ctx context.Context) error {
@ -99,12 +99,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@ -137,6 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
groupResourceString: groupResource.String(),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
decoder: decoder,
}
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
@ -182,7 +182,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return storage.NewInternalError(err)
}
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
err = s.decoder.Decode(data, out, kv.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
@ -248,7 +248,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
err = s.decoder.Decode(data, out, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -379,7 +379,7 @@ func (s *store) conditionalDelete(
return errors.New("invalid DeleteRange response - nil header")
}
if !skipTransformDecode {
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
@ -496,7 +496,7 @@ func (s *store) GuaranteedUpdate(
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
err = s.decoder.Decode(origState.data, destination, origState.rev)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
@ -547,7 +547,7 @@ func (s *store) GuaranteedUpdate(
}
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
err = s.decoder.Decode(data, destination, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -779,7 +779,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
default:
}
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc)
if err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
@ -939,7 +939,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
if err := s.decoder.Decode(state.data, state.obj, state.rev); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err
}
@ -1046,42 +1046,6 @@ func (s *store) prepareKey(key string) (string, error) {
return s.pathPrefix + key[startIndex:], nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return nil
}
// decodeListItem decodes bytes value in array into object.
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := codec.Decode(data, nil, newItemFunc())
if err != nil {
return nil, err
}
if err := versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return obj, nil
}
// recordDecodeError record decode error split by object type.
func recordDecodeError(resource string, key string) {
metrics.RecordDecodeError(resource)

View File

@ -566,6 +566,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
if setupOpts.recorderEnabled {
client.KV = &clientRecorder{KV: client.KV}
}
versioner := storage.APIObjectVersioner{}
store := newStore(
client,
setupOpts.codec,
@ -576,6 +577,8 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
setupOpts.groupResource,
setupOpts.transformer,
setupOpts.leaseConfig,
NewDefaultDecoder(setupOpts.codec, versioner),
versioner,
)
ctx := context.Background()
return ctx, store, client

View File

@ -459,7 +459,10 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
if transformer == nil {
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
versioner := storage.APIObjectVersioner{}
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -84,8 +84,10 @@ func TestHighWaterMark(t *testing.T) {
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
// test data
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
versioner := storage.APIObjectVersioner{}
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion)
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig())
storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner)
return server, storage
}
server, etcdStorage := newEtcdTestStorage(t, "")