diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index 5a0d207cb..6b34fc1e4 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -39,12 +39,15 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { + triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { - s, d := generic.NewRawStorage(storageConfig) + s, d, err := generic.NewRawStorage(storageConfig) + if err != nil { + return s, d, err + } if capacity <= 0 { klog.V(5).Infof("Storage caching is disabled for %T", newFunc()) - return s, d + return s, d, nil } if klog.V(5) { klog.Infof("Storage caching is enabled for %T with capacity %v", newFunc(), capacity) @@ -64,7 +67,10 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { TriggerPublisherFunc: triggerFunc, Codec: storageConfig.Codec, } - cacher := cacherstorage.NewCacherFromConfig(cacherConfig) + cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) + if err != nil { + return nil, func() {}, err + } destroyFunc := func() { cacher.Stop() d() @@ -75,7 +81,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { // merges as that shuts down storage properly RegisterStorageCleanup(destroyFunc) - return cacher, destroyFunc + return cacher, destroyFunc, nil } } diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index d951c96f1..ee3c4fb8d 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -1310,7 +1310,8 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { if e.Storage.Storage == nil { e.Storage.Codec = opts.StorageConfig.Codec - e.Storage.Storage, e.DestroyFunc = opts.Decorator( + var err error + e.Storage.Storage, e.DestroyFunc, err = opts.Decorator( opts.StorageConfig, prefix, keyFunc, @@ -1319,6 +1320,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { attrFunc, triggerFunc, ) + if err != nil { + return err + } e.StorageVersioner = opts.StorageConfig.EncodeVersioner if opts.CountMetricPollPeriod > 0 { diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index a753bdb14..32f050415 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -1561,7 +1561,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: sc.Codec, } - cacher := cacherstorage.NewCacherFromConfig(config) + cacher, err := cacherstorage.NewCacherFromConfig(config) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } d := destroyFunc s = cacher destroyFunc = func() { diff --git a/pkg/registry/generic/storage_decorator.go b/pkg/registry/generic/storage_decorator.go index f604a6f25..6509ef831 100644 --- a/pkg/registry/generic/storage_decorator.go +++ b/pkg/registry/generic/storage_decorator.go @@ -21,7 +21,6 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend/factory" - "k8s.io/klog" ) // StorageDecorator is a function signature for producing a storage.Interface @@ -33,7 +32,7 @@ type StorageDecorator func( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) + trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) // UndecoratedStorage returns the given a new storage from the given config // without any decoration. @@ -44,17 +43,13 @@ func UndecoratedStorage( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { + trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { return NewRawStorage(config) } // NewRawStorage creates the low level kv storage. This is a work-around for current // two layer of same storage interface. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. -func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) { - s, d, err := factory.Create(*config) - if err != nil { - klog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err) - } - return s, d +func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) { + return factory.Create(*config) } diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 10425d9dc..2c4c1a354 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -291,13 +291,13 @@ type Cacher struct { // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from // its internal cache and updating its cache in the background based on the // given configuration. -func NewCacherFromConfig(config Config) *Cacher { +func NewCacherFromConfig(config Config) (*Cacher, error) { stopCh := make(chan struct{}) obj := config.NewFunc() // Give this error when it is constructed rather than when you get the // first watch item, because it's much easier to track down that way. if err := runtime.CheckCodec(config.Codec, obj); err != nil { - panic("storage codec doesn't seem to match given type: " + err.Error()) + return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err) } clock := clock.RealClock{} @@ -363,7 +363,7 @@ func NewCacherFromConfig(config Config) *Cacher { ) }() - return cacher + return cacher, nil } func (c *Cacher) startCaching(stopChannel <-chan struct{}) { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 1c9481649..154cc524f 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -257,7 +257,7 @@ func init() { utilruntime.Must(examplev1.AddToScheme(scheme)) } -func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) { +func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, error) { prefix := "pods" config := Config{ CacheCapacity: cap, @@ -270,7 +270,8 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) { NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), } - return NewCacherFromConfig(config), testVersioner{} + cacher, err := NewCacherFromConfig(config) + return cacher, testVersioner{}, err } type dummyStorage struct { @@ -328,7 +329,10 @@ func (d *dummyStorage) Count(_ string) (int64, error) { func TestListWithLimitAndRV0(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 0) + cacher, _, err := newTestCacher(backingStorage, 0) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() pred := storage.SelectionPredicate{ @@ -341,7 +345,7 @@ func TestListWithLimitAndRV0(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy - err := cacher.List(context.TODO(), "pods/ns", "0", pred, result) + err = cacher.List(context.TODO(), "pods/ns", "0", pred, result) if err != nil { t.Errorf("List with Limit and RV=0 should be served from cache: %v", err) } @@ -354,7 +358,10 @@ func TestListWithLimitAndRV0(t *testing.T) { func TestGetToListWithLimitAndRV0(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 0) + cacher, _, err := newTestCacher(backingStorage, 0) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() pred := storage.SelectionPredicate{ @@ -367,7 +374,7 @@ func TestGetToListWithLimitAndRV0(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy - err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) + err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) if err != nil { t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err) } @@ -380,7 +387,10 @@ func TestGetToListWithLimitAndRV0(t *testing.T) { func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 1000) + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Wait until cacher is initialized. @@ -498,7 +508,10 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { func TestCacheWatcherStoppedOnDestroy(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 1000) + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Wait until cacher is initialized. @@ -577,7 +590,10 @@ func TestTimeBucketWatchersBasic(t *testing.T) { func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)() backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 1000) + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Wait until cacher is initialized. @@ -676,7 +692,10 @@ func TestCacherSendBookmarkEvents(t *testing.T) { func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 1000) + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Wait until cacher is initialized. @@ -686,7 +705,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) resourceVersion := uint64(1000) - err := cacher.watchCache.Add(&examplev1.Pod{ + err = cacher.watchCache.Add(&examplev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("pod-0"), Namespace: "ns", @@ -746,7 +765,10 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _ := newTestCacher(backingStorage, 1000) + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Wait until cacher is initialized. diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 76accf824..8c2cb0611 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -99,7 +99,7 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ return server, storage } -func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) { +func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) { prefix := "pods" v := etcd3.APIObjectVersioner{} config := cacherstorage.Config{ @@ -113,7 +113,8 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), } - return cacherstorage.NewCacherFromConfig(config), v + cacher, err := cacherstorage.NewCacherFromConfig(config) + return cacher, v, err } func makeTestPod(name string) *example.Pod { @@ -148,7 +149,10 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl func TestGet(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 10) + cacher, _, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() podFoo := makeTestPod("foo") @@ -179,7 +183,10 @@ func TestGet(t *testing.T) { func TestGetToList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 10) + cacher, _, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil) @@ -235,7 +242,10 @@ func TestGetToList(t *testing.T) { func TestList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 10) + cacher, _, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() podFoo := makeTestPod("foo") @@ -316,7 +326,10 @@ func TestList(t *testing.T) { func TestInfiniteList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() podFoo := makeTestPod("foo") @@ -372,7 +385,10 @@ func TestWatch(t *testing.T) { // Inject one list error to make sure we test the relist case. etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error + cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() podFoo := makeTestPod("foo") @@ -447,7 +463,10 @@ func TestWatch(t *testing.T) { func TestWatcherTimeout(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 10) + cacher, _, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // initialVersion is used to initate the watcher at the beginning of the world, @@ -489,7 +508,10 @@ func TestWatcherTimeout(t *testing.T) { func TestFiltering(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, _ := newTestCacher(etcdStorage, 10) + cacher, _, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // Ensure that the cacher is initialized, before creating any pods, @@ -551,7 +573,10 @@ func TestFiltering(t *testing.T) { func TestStartingResourceVersion(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // add 1 object @@ -609,7 +634,10 @@ func TestEmptyWatchEventCache(t *testing.T) { fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() // get rv of last pod created @@ -663,7 +691,10 @@ func TestEmptyWatchEventCache(t *testing.T) { func TestRandomWatchDeliver(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) @@ -789,7 +820,10 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) @@ -851,7 +885,10 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) - cacher, v := newTestCacher(etcdStorage, 10) + cacher, v, err := newTestCacher(etcdStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } defer cacher.Stop() pred := storage.Everything