Propagate error from creating cacher and storage decorators up

Kubernetes-commit: ee13be28845e8c55f6e2a2207ca4f46cd67fdb4e
This commit is contained in:
wojtekt 2019-07-15 09:34:08 +02:00 committed by Kubernetes Publisher
parent 4775d46683
commit fa23498ce7
7 changed files with 112 additions and 45 deletions

View File

@ -39,12 +39,15 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
s, d := generic.NewRawStorage(storageConfig)
s, d, err := generic.NewRawStorage(storageConfig)
if err != nil {
return s, d, err
}
if capacity <= 0 {
klog.V(5).Infof("Storage caching is disabled for %T", newFunc())
return s, d
return s, d, nil
}
if klog.V(5) {
klog.Infof("Storage caching is enabled for %T with capacity %v", newFunc(), capacity)
@ -64,7 +67,10 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
}
cacher := cacherstorage.NewCacherFromConfig(cacherConfig)
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
if err != nil {
return nil, func() {}, err
}
destroyFunc := func() {
cacher.Stop()
d()
@ -75,7 +81,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
// merges as that shuts down storage properly
RegisterStorageCleanup(destroyFunc)
return cacher, destroyFunc
return cacher, destroyFunc, nil
}
}

View File

@ -1310,7 +1310,8 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
e.Storage.Storage, e.DestroyFunc = opts.Decorator(
var err error
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
@ -1319,6 +1320,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
attrFunc,
triggerFunc,
)
if err != nil {
return err
}
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {

View File

@ -1561,7 +1561,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: sc.Codec,
}
cacher := cacherstorage.NewCacherFromConfig(config)
cacher, err := cacherstorage.NewCacherFromConfig(config)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
d := destroyFunc
s = cacher
destroyFunc = func() {

View File

@ -21,7 +21,6 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/klog"
)
// StorageDecorator is a function signature for producing a storage.Interface
@ -33,7 +32,7 @@ type StorageDecorator func(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error)
// UndecoratedStorage returns the given a new storage from the given config
// without any decoration.
@ -44,17 +43,13 @@ func UndecoratedStorage(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
}
// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
s, d, err := factory.Create(*config)
if err != nil {
klog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
}
return s, d
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config)
}

View File

@ -291,13 +291,13 @@ type Cacher struct {
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) *Cacher {
func NewCacherFromConfig(config Config) (*Cacher, error) {
stopCh := make(chan struct{})
obj := config.NewFunc()
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err)
}
clock := clock.RealClock{}
@ -363,7 +363,7 @@ func NewCacherFromConfig(config Config) *Cacher {
)
}()
return cacher
return cacher, nil
}
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {

View File

@ -257,7 +257,7 @@ func init() {
utilruntime.Must(examplev1.AddToScheme(scheme))
}
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, error) {
prefix := "pods"
config := Config{
CacheCapacity: cap,
@ -270,7 +270,8 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return NewCacherFromConfig(config), testVersioner{}
cacher, err := NewCacherFromConfig(config)
return cacher, testVersioner{}, err
}
type dummyStorage struct {
@ -328,7 +329,10 @@ func (d *dummyStorage) Count(_ string) (int64, error) {
func TestListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
pred := storage.SelectionPredicate{
@ -341,7 +345,7 @@ func TestListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
err = cacher.List(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
}
@ -354,7 +358,10 @@ func TestListWithLimitAndRV0(t *testing.T) {
func TestGetToListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
pred := storage.SelectionPredicate{
@ -367,7 +374,7 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
}
@ -380,7 +387,10 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
@ -498,7 +508,10 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
@ -577,7 +590,10 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)()
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
@ -676,7 +692,10 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
@ -686,7 +705,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
resourceVersion := uint64(1000)
err := cacher.watchCache.Add(&examplev1.Pod{
err = cacher.watchCache.Add(&examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-0"),
Namespace: "ns",
@ -746,7 +765,10 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.

View File

@ -99,7 +99,7 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
return server, storage
}
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) {
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
prefix := "pods"
v := etcd3.APIObjectVersioner{}
config := cacherstorage.Config{
@ -113,7 +113,8 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return cacherstorage.NewCacherFromConfig(config), v
cacher, err := cacherstorage.NewCacherFromConfig(config)
return cacher, v, err
}
func makeTestPod(name string) *example.Pod {
@ -148,7 +149,10 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
cacher, _, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -179,7 +183,10 @@ func TestGet(t *testing.T) {
func TestGetToList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
cacher, _, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
@ -235,7 +242,10 @@ func TestGetToList(t *testing.T) {
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
cacher, _, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -316,7 +326,10 @@ func TestList(t *testing.T) {
func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -372,7 +385,10 @@ func TestWatch(t *testing.T) {
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -447,7 +463,10 @@ func TestWatch(t *testing.T) {
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
cacher, _, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// initialVersion is used to initate the watcher at the beginning of the world,
@ -489,7 +508,10 @@ func TestWatcherTimeout(t *testing.T) {
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
cacher, _, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Ensure that the cacher is initialized, before creating any pods,
@ -551,7 +573,10 @@ func TestFiltering(t *testing.T) {
func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// add 1 object
@ -609,7 +634,10 @@ func TestEmptyWatchEventCache(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// get rv of last pod created
@ -663,7 +691,10 @@ func TestEmptyWatchEventCache(t *testing.T) {
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
@ -789,7 +820,10 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
@ -851,7 +885,10 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
cacher, v, err := newTestCacher(etcdStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
pred := storage.Everything