refactor etcd store conditional delete

Kubernetes-commit: fecab0713b96bb0d528aea58900942ae0cb52260
This commit is contained in:
Abu Kashem 2024-09-19 13:14:52 -04:00 committed by Kubernetes Publisher
parent bd937b2b8b
commit f28acc6161
3 changed files with 78 additions and 15 deletions

View File

@ -271,13 +271,15 @@ func (s *store) Delete(
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject)
skipTransformDecode := false
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject, skipTransformDecode)
}
func (s *store) conditionalDelete(
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
getCurrentState := s.getCurrentState(ctx, key, v, false)
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, skipTransformDecode bool) error {
getCurrentState := s.getCurrentState(ctx, key, v, false, skipTransformDecode)
var origState *objState
var err error
@ -361,7 +363,7 @@ func (s *store) conditionalDelete(
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, getResp, key, v, false)
origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode)
if err != nil {
return err
}
@ -376,10 +378,12 @@ func (s *store) conditionalDelete(
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
if !skipTransformDecode {
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
}
}
return nil
}
@ -405,7 +409,8 @@ func (s *store) GuaranteedUpdate(
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound)
skipTransformDecode := false
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound, skipTransformDecode)
var origState *objState
var origStateIsCurrent bool
@ -531,7 +536,8 @@ func (s *store) GuaranteedUpdate(
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
skipTransformDecode := false
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode)
if err != nil {
return err
}
@ -878,7 +884,7 @@ func (s *store) watchContext(ctx context.Context) context.Context {
return clientv3.WithRequireLeader(ctx)
}
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool) func() (*objState, error) {
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) {
return func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
@ -886,11 +892,17 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, ignoreNotFound)
return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode)
}
}
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
// getState constructs a new objState from the given response from the storage.
// skipTransformDecode: if true, the function will neither transform the data
// from the storage nor decode it into an object; otherwise, data from the
// storage will be transformed and decoded.
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
// of the objState will be nil, and 'stale' will be set to true.
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},
}
@ -909,14 +921,24 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return nil, err
}
} else {
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
if skipTransformDecode {
// be explicit that we don't have the object
state.obj = nil
state.stale = true // this seems a more sane value here
return state, nil
}
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err

View File

@ -108,6 +108,8 @@ type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Obj
// ValidateObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. The function may NOT transform the provided
// object.
// NOTE: the object in obj may be nil if it cannot be read from the
// storage, due to transformation or decode error.
type ValidateObjectFunc func(ctx context.Context, obj runtime.Object) error
// ValidateAllObjectFunc is a "admit everything" instance of ValidateObjectFunc.

View File

@ -0,0 +1,39 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"errors"
"strings"
"testing"
)
func TestPreconditionsCheckWithNilObject(t *testing.T) {
p := &Preconditions{}
err := p.Check("foo", nil)
if err == nil {
t.Fatalf("expected an error")
}
var internalErr InternalError
if !errors.As(err, &internalErr) {
t.Fatalf("expected error to be of type: %T, but got: %#v", InternalError{}, err)
}
if want := "can't enforce preconditions"; !strings.Contains(internalErr.Error(), want) {
t.Errorf("expected error to contain %q", want)
}
}