Fix bug for inconsistent lists served from etcd
Kubernetes-commit: a5b60c352d4457d96a90ddf1479f0833ef36f7d4
This commit is contained in:
parent
5e8a2ce77c
commit
6e6045833c
|
@ -572,7 +572,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
fromRV = &parsedRV
|
||||
}
|
||||
|
||||
var returnedRV, continueRV int64
|
||||
var returnedRV, continueRV, withRev int64
|
||||
var continueKey string
|
||||
switch {
|
||||
case s.pagingEnabled && len(pred.Continue) > 0:
|
||||
|
@ -593,7 +593,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
// continueRV==0 is invalid.
|
||||
// If continueRV < 0, the request is for the latest resource version.
|
||||
if continueRV > 0 {
|
||||
options = append(options, clientv3.WithRev(continueRV))
|
||||
withRev = continueRV
|
||||
returnedRV = continueRV
|
||||
}
|
||||
case s.pagingEnabled && pred.Limit > 0:
|
||||
|
@ -604,11 +604,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
withRev = returnedRV
|
||||
case "": // legacy case
|
||||
if *fromRV > 0 {
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
withRev = returnedRV
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
|
@ -625,7 +625,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
withRev = returnedRV
|
||||
case "": // legacy case
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
|
@ -634,6 +634,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
|
||||
options = append(options, clientv3.WithPrefix())
|
||||
}
|
||||
if withRev != 0 {
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
|
||||
// loop until we have filled the requested limit from etcd or there are no more results
|
||||
var lastKey []byte
|
||||
|
@ -695,6 +698,10 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||
break
|
||||
}
|
||||
key = string(lastKey) + "\x00"
|
||||
if withRev == 0 {
|
||||
withRev = returnedRV
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
}
|
||||
|
||||
// instruct the client to begin querying from immediately after the last key we returned
|
||||
|
|
|
@ -1963,3 +1963,115 @@ func Test_growSlice(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// fancyTransformer creates next object on each call to
|
||||
// TransformFromStorage call.
|
||||
type fancyTransformer struct {
|
||||
transformer value.Transformer
|
||||
store *store
|
||||
|
||||
lock sync.Mutex
|
||||
index int
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
|
||||
if err := t.createObject(); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return t.transformer.TransformFromStorage(b, ctx)
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
|
||||
return t.transformer.TransformToStorage(b, ctx)
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) createObject() error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
t.index++
|
||||
key := fmt.Sprintf("pod-%d", t.index)
|
||||
obj := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: key,
|
||||
Labels: map[string]string{
|
||||
"even": strconv.FormatBool(t.index%2 == 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
out := &example.Pod{}
|
||||
return t.store.Create(context.TODO(), key, obj, out, 0)
|
||||
}
|
||||
|
||||
func TestConsistentList(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
transformer := &fancyTransformer{
|
||||
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
|
||||
}
|
||||
store := newStore(cluster.RandClient(), true, codec, "", transformer)
|
||||
transformer.store = store
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
if err := transformer.createObject(); err != nil {
|
||||
t.Fatalf("failed to create object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*example.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object")
|
||||
}
|
||||
return labels.Set(pod.Labels), nil, nil
|
||||
}
|
||||
predicate := storage.SelectionPredicate{
|
||||
Label: labels.Set{"even": "true"}.AsSelector(),
|
||||
GetAttrs: getAttrs,
|
||||
Limit: 4,
|
||||
}
|
||||
|
||||
result1 := example.PodList{}
|
||||
if err := store.List(context.TODO(), "/", storage.ListOptions{Predicate: predicate}, &result1); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
// List objects from the returned resource version.
|
||||
options := storage.ListOptions{
|
||||
Predicate: predicate,
|
||||
ResourceVersion: result1.ResourceVersion,
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
}
|
||||
|
||||
result2 := example.PodList{}
|
||||
if err := store.List(context.TODO(), "/", options, &result2); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(result1, result2) {
|
||||
t.Errorf("inconsistent lists: %#v, %#v", result1, result2)
|
||||
}
|
||||
|
||||
// Now also verify the ResourceVersionMatchNotOlderThan.
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
result3 := example.PodList{}
|
||||
if err := store.List(context.TODO(), "/", options, &result3); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
options.ResourceVersion = result3.ResourceVersion
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchExact
|
||||
|
||||
result4 := example.PodList{}
|
||||
if err := store.List(context.TODO(), "/", options, &result4); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(result3, result4) {
|
||||
t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue