567 lines
18 KiB
Go
567 lines
18 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cacher
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
goruntime "runtime"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/apiserver/pkg/apis/example"
|
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
|
"k8s.io/apiserver/pkg/storage"
|
|
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
|
"k8s.io/utils/clock"
|
|
testingclock "k8s.io/utils/clock/testing"
|
|
)
|
|
|
|
const (
|
|
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
|
|
watchCacheDefaultCapacity = 100
|
|
)
|
|
|
|
func init() {
|
|
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
|
utilruntime.Must(example.AddToScheme(scheme))
|
|
utilruntime.Must(examplev1.AddToScheme(scheme))
|
|
}
|
|
|
|
// GetPodAttrs returns labels and fields of a given object for filtering purposes.
|
|
func GetPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
|
pod, ok := obj.(*example.Pod)
|
|
if !ok {
|
|
return nil, nil, fmt.Errorf("not a pod")
|
|
}
|
|
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
|
|
}
|
|
|
|
// PodToSelectableFields returns a field set that represents the object
|
|
// TODO: fields are not labels, and the validation rules for them do not apply.
|
|
func PodToSelectableFields(pod *example.Pod) fields.Set {
|
|
// The purpose of allocation with a given number of elements is to reduce
|
|
// amount of allocations needed to create the fields.Set. If you add any
|
|
// field here or the number of object-meta related fields changes, this should
|
|
// be adjusted.
|
|
podSpecificFieldsSet := make(fields.Set, 5)
|
|
podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
|
|
podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
|
|
podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
|
|
return AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
|
|
}
|
|
|
|
func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, hasNamespaceField bool) fields.Set {
|
|
source["metadata.name"] = objectMeta.Name
|
|
if hasNamespaceField {
|
|
source["metadata.namespace"] = objectMeta.Namespace
|
|
}
|
|
return source
|
|
}
|
|
|
|
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) {
|
|
prefix := "pods"
|
|
v := storage.APIObjectVersioner{}
|
|
config := Config{
|
|
Storage: s,
|
|
Versioner: v,
|
|
GroupResource: schema.GroupResource{Resource: "pods"},
|
|
ResourcePrefix: prefix,
|
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
|
GetAttrsFunc: GetPodAttrs,
|
|
NewFunc: newPod,
|
|
NewListFunc: newPodList,
|
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
|
Clock: clock,
|
|
}
|
|
cacher, err := NewCacherFromConfig(config)
|
|
return cacher, v, err
|
|
}
|
|
|
|
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
|
|
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
return obj.DeepCopyObject(), nil, nil
|
|
}
|
|
key := "pods/" + obj.Namespace + "/" + obj.Name
|
|
if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
obj.ResourceVersion = ""
|
|
result := &example.Pod{}
|
|
if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func checkStorageInvariants(ctx context.Context, t *testing.T, key string) {
|
|
// No-op function since cacher simply passes object creation to the underlying storage.
|
|
}
|
|
|
|
func TestCreate(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestCreate(ctx, t, cacher, checkStorageInvariants)
|
|
}
|
|
|
|
func TestCreateWithTTL(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestCreateWithTTL(ctx, t, cacher)
|
|
}
|
|
|
|
func TestCreateWithKeyExist(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestCreateWithKeyExist(ctx, t, cacher)
|
|
}
|
|
|
|
func TestGet(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestGet(ctx, t, cacher)
|
|
}
|
|
|
|
func TestUnconditionalDelete(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestUnconditionalDelete(ctx, t, cacher)
|
|
}
|
|
|
|
func TestConditionalDelete(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestConditionalDelete(ctx, t, cacher)
|
|
}
|
|
|
|
func TestDeleteWithSuggestion(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestDeleteWithSuggestion(ctx, t, cacher)
|
|
}
|
|
|
|
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestDeleteWithSuggestionAndConflict(ctx, t, cacher)
|
|
}
|
|
|
|
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestDeleteWithSuggestionOfDeletedObject(ctx, t, cacher)
|
|
}
|
|
|
|
func TestValidateDeletionWithSuggestion(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestValidateDeletionWithSuggestion(ctx, t, cacher)
|
|
}
|
|
|
|
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestPreconditionalDeleteWithSuggestion(ctx, t, cacher)
|
|
}
|
|
|
|
func TestList(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestList(ctx, t, cacher, true)
|
|
}
|
|
|
|
func TestListWithoutPaging(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t, withoutPaging)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestListWithoutPaging(ctx, t, cacher)
|
|
}
|
|
|
|
func TestGetListNonRecursive(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestGetListNonRecursive(ctx, t, cacher)
|
|
}
|
|
|
|
func checkStorageCalls(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
|
|
// No-op function for now, since cacher passes pagination calls to underlying storage.
|
|
}
|
|
|
|
func TestListContinuation(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestListContinuation(ctx, t, cacher, checkStorageCalls)
|
|
}
|
|
|
|
func TestListPaginationRareObject(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestListPaginationRareObject(ctx, t, cacher, checkStorageCalls)
|
|
}
|
|
|
|
func TestListContinuationWithFilter(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestListContinuationWithFilter(ctx, t, cacher, checkStorageCalls)
|
|
}
|
|
|
|
func TestListInconsistentContinuation(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestConsistentList(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestGuaranteedUpdate(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestGuaranteedUpdateWithTTL(ctx, t, cacher)
|
|
}
|
|
|
|
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestGuaranteedUpdateWithConflict(ctx, t, cacher)
|
|
}
|
|
|
|
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx, t, cacher)
|
|
}
|
|
|
|
func TestTransformationFailure(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestCount(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestCount(ctx, t, cacher)
|
|
}
|
|
|
|
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
|
_, _, line, _ := goruntime.Caller(1)
|
|
select {
|
|
case event := <-w.ResultChan():
|
|
if e, a := eventType, event.Type; e != a {
|
|
t.Logf("(called from line %d)", line)
|
|
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
|
|
}
|
|
object := event.Object
|
|
if co, ok := object.(runtime.CacheableObject); ok {
|
|
object = co.GetObject()
|
|
}
|
|
if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
|
|
t.Logf("(called from line %d)", line)
|
|
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Logf("(called from line %d)", line)
|
|
t.Errorf("Timed out waiting for an event")
|
|
}
|
|
}
|
|
|
|
func TestWatch(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatch(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchFromZero(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatchFromZero(ctx, t, cacher, nil)
|
|
}
|
|
|
|
func TestDeleteTriggerWatch(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestDeleteTriggerWatch(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchFromNonZero(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatchFromNonZero(ctx, t, cacher)
|
|
}
|
|
|
|
func TestDelayedWatchDelivery(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchError(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestWatchContextCancel(t *testing.T) {
|
|
// TODO(#109831): Enable use of this test and run it.
|
|
}
|
|
|
|
func TestWatcherTimeout(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatcherTimeout(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchInitializationSignal(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatchInitializationSignal(ctx, t, cacher)
|
|
}
|
|
|
|
func TestClusterScopedWatch(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestClusterScopedWatch(ctx, t, cacher)
|
|
}
|
|
|
|
func TestNamespaceScopedWatch(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
|
|
}
|
|
|
|
// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the
|
|
// scenarios that are not yet covered by it and get rid of this test.
|
|
func TestWatchDeprecated(t *testing.T) {
|
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
|
|
defer server.Terminate(t)
|
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
|
cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock)
|
|
if err != nil {
|
|
t.Fatalf("Couldn't create cacher: %v", err)
|
|
}
|
|
defer cacher.Stop()
|
|
|
|
podFoo := makeTestPodWithName("foo")
|
|
podBar := makeTestPodWithName("bar")
|
|
|
|
podFooPrime := makeTestPodWithName("foo")
|
|
podFooPrime.Spec.NodeName = "fakeNode"
|
|
|
|
podFooBis := makeTestPodWithName("foo")
|
|
podFooBis.Spec.NodeName = "anotherFakeNode"
|
|
|
|
podFooNS2 := makeTestPodWithName("foo")
|
|
podFooNS2.Namespace += "2"
|
|
|
|
// initialVersion is used to initate the watcher at the beginning of the world,
|
|
// which is not defined precisely in etcd.
|
|
initialVersion, err := cacher.LastSyncResourceVersion()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
startVersion := strconv.Itoa(int(initialVersion))
|
|
|
|
// Set up Watch for object "podFoo".
|
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
defer watcher.Stop()
|
|
|
|
// Create in another namespace first to make sure events from other namespaces don't get delivered
|
|
updatePod(t, etcdStorage, podFooNS2, nil)
|
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
_ = updatePod(t, etcdStorage, podBar, nil)
|
|
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
|
|
|
|
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
|
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
|
|
|
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
defer initialWatcher.Stop()
|
|
|
|
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
|
|
|
|
// Now test watch from "now".
|
|
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
defer nowWatcher.Stop()
|
|
|
|
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
|
|
|
|
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
|
|
|
|
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
|
|
|
|
// Add watchCacheDefaultCapacity events to make current watch cache full.
|
|
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
|
|
for i := 0; i < watchCacheDefaultCapacity; i++ {
|
|
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
|
|
podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i))
|
|
updatePod(t, etcdStorage, podFoo, nil)
|
|
}
|
|
|
|
// Check whether we get too-old error via the watch channel
|
|
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
|
|
if err != nil {
|
|
t.Fatalf("Expected no direct error, got %v", err)
|
|
}
|
|
defer tooOldWatcher.Stop()
|
|
|
|
// Ensure we get a "Gone" error.
|
|
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
|
|
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
|
|
}
|
|
|
|
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, cacher)
|
|
}
|
|
|
|
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
|
ctx, cacher, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher)
|
|
}
|
|
|
|
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
|
|
ctx, store, terminate := testSetup(t)
|
|
t.Cleanup(terminate)
|
|
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
|
|
}
|
|
|
|
// ===================================================
|
|
// Test-setup related function are following.
|
|
// ===================================================
|
|
|
|
type tearDownFunc func()
|
|
|
|
type setupOptions struct {
|
|
resourcePrefix string
|
|
keyFunc func(runtime.Object) (string, error)
|
|
indexerFuncs map[string]storage.IndexerFunc
|
|
pagingEnabled bool
|
|
clock clock.Clock
|
|
}
|
|
|
|
type setupOption func(*setupOptions)
|
|
|
|
func withDefaults(options *setupOptions) {
|
|
prefix := "/pods"
|
|
|
|
options.resourcePrefix = prefix
|
|
options.keyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }
|
|
options.pagingEnabled = true
|
|
options.clock = clock.RealClock{}
|
|
}
|
|
|
|
func withClusterScopedKeyFunc(options *setupOptions) {
|
|
options.keyFunc = func(obj runtime.Object) (string, error) {
|
|
return storage.NoNamespaceKeyFunc(options.resourcePrefix, obj)
|
|
}
|
|
}
|
|
|
|
func withSpecNodeNameIndexerFuncs(options *setupOptions) {
|
|
options.indexerFuncs = map[string]storage.IndexerFunc{
|
|
"spec.nodeName": func(obj runtime.Object) string {
|
|
pod, ok := obj.(*example.Pod)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return pod.Spec.NodeName
|
|
},
|
|
}
|
|
}
|
|
|
|
func withoutPaging(options *setupOptions) {
|
|
options.pagingEnabled = false
|
|
}
|
|
|
|
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
|
|
setupOpts := setupOptions{}
|
|
opts = append([]setupOption{withDefaults}, opts...)
|
|
for _, opt := range opts {
|
|
opt(&setupOpts)
|
|
}
|
|
|
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), setupOpts.pagingEnabled)
|
|
// Inject one list error to make sure we test the relist case.
|
|
wrappedStorage := &storagetesting.StorageInjectingListErrors{
|
|
Interface: etcdStorage,
|
|
Errors: 1,
|
|
}
|
|
|
|
config := Config{
|
|
Storage: wrappedStorage,
|
|
Versioner: storage.APIObjectVersioner{},
|
|
GroupResource: schema.GroupResource{Resource: "pods"},
|
|
ResourcePrefix: setupOpts.resourcePrefix,
|
|
KeyFunc: setupOpts.keyFunc,
|
|
GetAttrsFunc: GetPodAttrs,
|
|
NewFunc: newPod,
|
|
NewListFunc: newPodList,
|
|
IndexerFuncs: setupOpts.indexerFuncs,
|
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
|
Clock: setupOpts.clock,
|
|
}
|
|
cacher, err := NewCacherFromConfig(config)
|
|
if err != nil {
|
|
t.Fatalf("Failed to initialize cacher: %v", err)
|
|
}
|
|
ctx := context.Background()
|
|
terminate := func() {
|
|
cacher.Stop()
|
|
server.Terminate(t)
|
|
}
|
|
|
|
// Since some tests depend on the fact that GetList shouldn't fail,
|
|
// we wait until the error from the underlying storage is consumed.
|
|
if err := wait.PollInfinite(100*time.Millisecond, wrappedStorage.ErrorsConsumed); err != nil {
|
|
t.Fatalf("Failed to inject list errors: %v", err)
|
|
}
|
|
|
|
return ctx, cacher, terminate
|
|
}
|