From 55fd399ad43e20b4eaac197223de4c36bee0d2f4 Mon Sep 17 00:00:00 2001 From: deads2k Date: Tue, 31 Jan 2017 11:47:19 -0500 Subject: [PATCH] move pkg/storage to apiserver --- pkg/storage/OWNERS | 41 + pkg/storage/cacher.go | 958 ++++++++++++++++++++++++ pkg/storage/cacher_whitebox_test.go | 56 ++ pkg/storage/doc.go | 18 + pkg/storage/errors.go | 170 +++++ pkg/storage/interfaces.go | 181 +++++ pkg/storage/selection_predicate.go | 90 +++ pkg/storage/selection_predicate_test.go | 119 +++ pkg/storage/time_budget.go | 95 +++ pkg/storage/time_budget_test.go | 53 ++ pkg/storage/util.go | 161 ++++ pkg/storage/util_test.go | 136 ++++ pkg/storage/watch_cache.go | 468 ++++++++++++ pkg/storage/watch_cache_test.go | 367 +++++++++ 14 files changed, 2913 insertions(+) create mode 100644 pkg/storage/OWNERS create mode 100644 pkg/storage/cacher.go create mode 100644 pkg/storage/cacher_whitebox_test.go create mode 100644 pkg/storage/doc.go create mode 100644 pkg/storage/errors.go create mode 100644 pkg/storage/interfaces.go create mode 100644 pkg/storage/selection_predicate.go create mode 100644 pkg/storage/selection_predicate_test.go create mode 100644 pkg/storage/time_budget.go create mode 100644 pkg/storage/time_budget_test.go create mode 100644 pkg/storage/util.go create mode 100644 pkg/storage/util_test.go create mode 100644 pkg/storage/watch_cache.go create mode 100644 pkg/storage/watch_cache_test.go diff --git a/pkg/storage/OWNERS b/pkg/storage/OWNERS new file mode 100644 index 000000000..a107a01a5 --- /dev/null +++ b/pkg/storage/OWNERS @@ -0,0 +1,41 @@ +approvers: +- lavalamp +- liggitt +- timothysc +- wojtek-t +- xiang90 +reviewers: +- thockin +- lavalamp +- smarterclayton +- wojtek-t +- deads2k +- derekwaynecarr +- caesarxuchao +- mikedanese +- liggitt +- erictune +- davidopp +- pmorie +- luxas +- janetkuo +- roberthbailey +- ncdc +- timstclair +- timothysc +- soltysh +- dims +- madhusudancs +- hongchaodeng +- krousey +- fgrzadkowski +- xiang90 +- mml +- ingvagabund +- resouer +- mbohlool +- pweil- +- lixiaobing10051267 +- mqliang +- feihujiang +- rrati diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go new file mode 100644 index 000000000..fffac82a8 --- /dev/null +++ b/pkg/storage/cacher.go @@ -0,0 +1,958 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "fmt" + "net/http" + "reflect" + "strconv" + "sync" + "time" + + "github.com/golang/glog" + "golang.org/x/net/context" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + utiltrace "k8s.io/apiserver/pkg/util/trace" + "k8s.io/client-go/tools/cache" +) + +// CacherConfig contains the configuration for a given Cache. 
+type CacherConfig struct {
+	// Maximum size of the history cached in memory.
+	CacheCapacity int
+
+	// An underlying storage.Interface.
+	Storage Interface
+
+	// An underlying storage.Versioner.
+	Versioner Versioner
+
+	Copier runtime.ObjectCopier
+
+	// The Cache will be caching objects of a given Type and assumes that they
+	// are all stored under ResourcePrefix directory in the underlying database.
+	Type           interface{}
+	ResourcePrefix string
+
+	// KeyFunc is used to get a key in the underlying storage for a given object.
+	KeyFunc func(runtime.Object) (string, error)
+
+	// GetAttrsFunc is used to get object labels and fields.
+	GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
+
+	// TriggerPublisherFunc is used for optimizing the number of watchers that
+	// need to process an incoming event.
+	TriggerPublisherFunc TriggerPublisherFunc
+
+	// NewListFunc is a function that creates a new, empty object for storing a
+	// list of objects of type Type.
+	NewListFunc func() runtime.Object
+
+	Codec runtime.Codec
+}
+
+type watchersMap map[int]*cacheWatcher
+
+func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
+	wm[number] = w
+}
+
+func (wm watchersMap) deleteWatcher(number int) {
+	delete(wm, number)
+}
+
+func (wm watchersMap) terminateAll() {
+	for key, watcher := range wm {
+		delete(wm, key)
+		watcher.stop()
+	}
+}
+
+type indexedWatchers struct {
+	allWatchers   watchersMap
+	valueWatchers map[string]watchersMap
+}
+
+func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
+	if supported {
+		if _, ok := i.valueWatchers[value]; !ok {
+			i.valueWatchers[value] = watchersMap{}
+		}
+		i.valueWatchers[value].addWatcher(w, number)
+	} else {
+		i.allWatchers.addWatcher(w, number)
+	}
+}
+
+func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
+	if supported {
+		i.valueWatchers[value].deleteWatcher(number)
+		if len(i.valueWatchers[value]) == 0 {
+			delete(i.valueWatchers, value)
+		}
+	} else {
+		i.allWatchers.deleteWatcher(number)
+	}
+}
+
+func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
+	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
+		glog.Warningf("Terminating all watchers from cacher %v", objectType)
+	}
+	i.allWatchers.terminateAll()
+	for index, watchers := range i.valueWatchers {
+		watchers.terminateAll()
+		delete(i.valueWatchers, index)
+	}
+}
+
+type watchFilterFunc func(string, labels.Set, fields.Set) bool
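The indexedWatchers split above is the heart of the trigger optimization: watchers whose selector pins down the single supported trigger value are bucketed under that value, and everyone else lands in allWatchers. A minimal, self-contained sketch of the same routing idea (simplified stand-in types, not the actual Cacher wiring):

    package main

    import "fmt"

    // watcher stands in for *cacheWatcher and just records deliveries.
    type watcher struct{ name string }

    func (w *watcher) add(ev string) { fmt.Printf("%s got %q\n", w.name, ev) }

    // router mirrors indexedWatchers: one bucket per trigger value, plus a
    // catch-all bucket for watchers without a usable trigger value.
    type router struct {
        all     []*watcher
        byValue map[string][]*watcher
    }

    func (r *router) dispatch(ev string, triggerValues []string, supported bool) {
        // Every event goes to the unindexed watchers.
        for _, w := range r.all {
            w.add(ev)
        }
        if supported {
            // Wake only the buckets matching the event's trigger values.
            for _, v := range triggerValues {
                for _, w := range r.byValue[v] {
                    w.add(ev)
                }
            }
            return
        }
        // No trigger info: conservatively wake every bucket.
        for _, ws := range r.byValue {
            for _, w := range ws {
                w.add(ev)
            }
        }
    }

    func main() {
        r := &router{
            all:     []*watcher{{name: "w0"}},
            byValue: map[string][]*watcher{"node-1": {{name: "w1"}}, "node-2": {{name: "w2"}}},
        }
        r.dispatch("pod update on node-1", []string{"node-1"}, true) // reaches w0 and w1 only
    }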
+// Cacher is responsible for serving WATCH and LIST requests for a given
+// resource from its internal cache and updating its cache in the background
+// based on the underlying storage contents.
+// Cacher implements storage.Interface (although most of the calls are just
+// delegated to the underlying storage).
+type Cacher struct {
+	// HighWaterMarks for performance debugging.
+	// Important: Since HighWaterMark is using sync/atomic, it has to be at the
+	// top of the struct due to a bug on 32-bit platforms.
+	// See: https://golang.org/pkg/sync/atomic/ for more information
+	incomingHWM HighWaterMark
+	// Incoming events that should be dispatched to watchers.
+	incoming chan watchCacheEvent
+
+	sync.RWMutex
+
+	// Before accessing the cacher's cache, wait for the ready to be ok.
+	// This is necessary to prevent users from accessing structures that are
+	// uninitialized or are being repopulated right now.
+	// ready needs to be set to false when the cacher is paused or stopped.
+	// ready needs to be set to true when the cacher is ready to use after
+	// initialization.
+	ready *ready
+
+	// Underlying storage.Interface.
+	storage Interface
+
+	copier runtime.ObjectCopier
+
+	// Expected type of objects in the underlying cache.
+	objectType reflect.Type
+
+	// "sliding window" of recent changes of objects and the current state.
+	watchCache *watchCache
+	reflector  *cache.Reflector
+
+	// Versioner is used to handle resource versions.
+	versioner Versioner
+
+	// triggerFunc is used for optimizing the number of watchers that need to
+	// process an incoming event.
+	triggerFunc TriggerPublisherFunc
+	// watchers maps the values of the trigger function that watchers are
+	// interested in to the watchers themselves.
+	watcherIdx int
+	watchers   indexedWatchers
+
+	// Defines a time budget that can be spent on waiting for not-ready watchers
+	// while dispatching an event before shutting them down.
+	dispatchTimeoutBudget *timeBudget
+
+	// Handling graceful termination.
+	stopLock sync.RWMutex
+	stopped  bool
+	stopCh   chan struct{}
+	stopWg   sync.WaitGroup
+}
+
+// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and
+// LIST requests from its internal cache and updating its cache in the
+// background based on the given configuration.
+func NewCacherFromConfig(config CacherConfig) *Cacher {
+	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
+	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+
+	// Give this error when it is constructed rather than when you get the
+	// first watch item, because it's much easier to track down that way.
+	if obj, ok := config.Type.(runtime.Object); ok {
+		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
+			panic("storage codec doesn't seem to match given type: " + err.Error())
+		}
+	}
+
+	stopCh := make(chan struct{})
+	cacher := &Cacher{
+		ready:       newReady(),
+		storage:     config.Storage,
+		copier:      config.Copier,
+		objectType:  reflect.TypeOf(config.Type),
+		watchCache:  watchCache,
+		reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
+		versioner:   config.Versioner,
+		triggerFunc: config.TriggerPublisherFunc,
+		watcherIdx:  0,
+		watchers: indexedWatchers{
+			allWatchers:   make(map[int]*cacheWatcher),
+			valueWatchers: make(map[string]watchersMap),
+		},
+		// TODO: Figure out the correct value for the buffer size.
+		incoming:              make(chan watchCacheEvent, 100),
+		dispatchTimeoutBudget: newTimeBudget(stopCh),
+		// We need to (potentially) stop both:
+		// - wait.Until go-routine
+		// - reflector.ListAndWatch
+		// and there are no guarantees on the order that they will stop.
+		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
+		stopCh: stopCh,
+	}
+	watchCache.SetOnEvent(cacher.processEvent)
+	go cacher.dispatchEvents()
+
+	cacher.stopWg.Add(1)
+	go func() {
+		defer cacher.stopWg.Done()
+		wait.Until(
+			func() {
+				if !cacher.isStopped() {
+					cacher.startCaching(stopCh)
+				}
+			}, time.Second, stopCh,
+		)
+	}()
+	return cacher
+}
+
+func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
+	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
+	// It is safe to use the cache after a successful list until a disconnection.
+	// We start with usable (write) locked. The below OnReplace function will
+	// unlock it after a successful list. The below defer will then re-lock
+	// it when this function exits (always due to disconnection), only if
+	// we actually got a successful list. This cycle will repeat as needed.
+	successfulList := false
+	c.watchCache.SetOnReplace(func() {
+		successfulList = true
+		c.ready.set(true)
+	})
+	defer func() {
+		if successfulList {
+			c.ready.set(false)
+		}
+	}()
+
+	c.terminateAllWatchers()
+	// Note that since onReplace may not be called due to errors, we explicitly
+	// need to retry it on errors under lock.
+	// Also note that startCaching is called in a loop, so there's no need
+	// to have another loop here.
+	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
+		glog.Errorf("unexpected ListAndWatch error: %v", err)
+	}
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Versioner() Versioner {
+	return c.storage.Versioner()
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
+	return c.storage.Create(ctx, key, obj, out, ttl)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
+	return c.storage.Delete(ctx, key, out, preconditions)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
+	watchRV, err := ParseWatchResourceVersion(resourceVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	c.ready.wait()
+
+	// We explicitly use the thread-unsafe version and do locking ourselves to ensure that
+	// no new events will be processed in the meantime. The watchCache will be unlocked
+	// on return from this function.
+	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
+	// underlying watchCache is calling processEvent under its lock.
+	c.watchCache.RLock()
+	defer c.watchCache.RUnlock()
+	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+	if err != nil {
+		// To match the uncached watch implementation, once we have passed authn/authz/admission,
+		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+		// rather than a directly returned error.
+		return newErrWatcher(err), nil
+	}
+
+	triggerValue, triggerSupported := "", false
+	// TODO: Currently we assume that in a given Cacher object, any SelectionPredicate
+	// that is passed here is aware of exactly the same trigger (at most one).
+	// Thus, either 0 or 1 values will be returned.
+	if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
+		triggerValue, triggerSupported = matchValues[0].Value, true
+	}
+
+	// If there is a triggerFunc defined, but triggerSupported is false,
+	// we can't narrow the number of events significantly at this point.
+	//
+	// That said, currently triggerFunc is defined only for Pods and Nodes,
+	// and there is only a constant number of watchers for which triggerSupported
+	// is false (excluding those issued explicitly by users).
+	// Thus, to reduce the risk of those watchers blocking all watchers of a
+	// given resource in the system, we increase the sizes of buffers for them.
+	chanSize := 10
+	if c.triggerFunc != nil && !triggerSupported {
+		// TODO: We should tune this value and ideally make it dependent on the
+		// number of objects of a given type and/or their churn.
+ chanSize = 1000 + } + + c.Lock() + defer c.Unlock() + forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) + watcher := newCacheWatcher(c.copier, watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget) + + c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) + c.watcherIdx++ + return watcher, nil +} + +// Implements storage.Interface. +func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) { + return c.Watch(ctx, key, resourceVersion, pred) +} + +// Implements storage.Interface. +func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + getRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + + // Do not create a trace - it's not for free and there are tons + // of Get requests. We can add it if it will be really needed. + c.ready.wait() + + objVal, err := conversion.EnforcePtr(objPtr) + if err != nil { + return err + } + + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil) + if err != nil { + return err + } + + if exists { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + objVal.Set(reflect.ValueOf(elem.Object).Elem()) + } else { + objVal.Set(reflect.Zero(objVal.Type())) + if !ignoreNotFound { + return NewKeyNotFoundError(key, int64(readResourceVersion)) + } + } + return nil +} + +// Implements storage.Interface. +func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + listRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + + trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String())) + defer trace.LogIfLong(500 * time.Millisecond) + + c.ready.wait() + trace.Step("Ready") + + // List elements with at least 'listRV' from cache. 
+ listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + listVal, err := conversion.EnforcePtr(listPtr) + if err != nil || listVal.Kind() != reflect.Slice { + return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) + } + filter := filterFunction(key, pred) + + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) + if err != nil { + return err + } + trace.Step("Got from cache") + + if exists { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + if filter(elem.Key, elem.Object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) + } + } + if c.versioner != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { + return err + } + } + return nil +} + +// Implements storage.Interface. +func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + return c.storage.List(ctx, key, resourceVersion, pred, listObj) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + listRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + + trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String())) + defer trace.LogIfLong(500 * time.Millisecond) + + c.ready.wait() + trace.Step("Ready") + + // List elements with at least 'listRV' from cache. + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + listVal, err := conversion.EnforcePtr(listPtr) + if err != nil || listVal.Kind() != reflect.Slice { + return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) + } + filter := filterFunction(key, pred) + + objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) + if err != nil { + return err + } + trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs))) + if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() { + // Resize the slice appropriately, since we already know that none + // of the elements will be filtered out. + listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs))) + trace.Step("Resized result") + } + for _, obj := range objs { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + if filter(elem.Key, elem.Object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) + } + } + trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) + if c.versioner != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { + return err + } + } + return nil +} + +// Implements storage.Interface. +func (c *Cacher) GuaranteedUpdate( + ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, + preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error { + // Ignore the suggestion and try to pass down the current version of the object + // read from cache. 
+	if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
+		glog.Errorf("GetByKey returned error: %v", err)
+	} else if exists {
+		currObj, copyErr := c.copier.Copy(elem.(*storeElement).Object)
+		if copyErr == nil {
+			return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
+		}
+		glog.Errorf("couldn't copy object: %v", copyErr)
+	}
+	// If we couldn't get the object, fall back to no-suggestion.
+	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
+}
+
+func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
+	// TODO: Currently we assume that in a given Cacher object, its triggerFunc
+	// is aware of exactly the same trigger (at most one). Thus calling
+	// c.triggerFunc(<object>) can return only 0 or 1 values.
+	// That means that triggerValues itself may return up to 2 different values.
+	if c.triggerFunc == nil {
+		return nil, false
+	}
+	result := make([]string, 0, 2)
+	matchValues := c.triggerFunc(event.Object)
+	if len(matchValues) > 0 {
+		result = append(result, matchValues[0].Value)
+	}
+	if event.PrevObject == nil {
+		return result, len(result) > 0
+	}
+	prevMatchValues := c.triggerFunc(event.PrevObject)
+	if len(prevMatchValues) > 0 {
+		if len(result) == 0 || result[0] != prevMatchValues[0].Value {
+			result = append(result, prevMatchValues[0].Value)
+		}
+	}
+	return result, len(result) > 0
+}
+
+func (c *Cacher) processEvent(event *watchCacheEvent) {
+	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
+		// Monitor if this gets backed up, and how much.
+		glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
+	}
+	c.incoming <- *event
+}
+
+func (c *Cacher) dispatchEvents() {
+	for {
+		select {
+		case event, ok := <-c.incoming:
+			if !ok {
+				return
+			}
+			c.dispatchEvent(&event)
+		case <-c.stopCh:
+			return
+		}
+	}
+}
+
+func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
+	triggerValues, supported := c.triggerValues(event)
+
+	c.Lock()
+	defer c.Unlock()
+	// Iterate over "allWatchers" no matter what the trigger function is.
+	for _, watcher := range c.watchers.allWatchers {
+		watcher.add(event, c.dispatchTimeoutBudget)
+	}
+	if supported {
+		// Iterate over watchers interested in the given values of the trigger.
+		for _, triggerValue := range triggerValues {
+			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
+				watcher.add(event, c.dispatchTimeoutBudget)
+			}
+		}
+	} else {
+		// supported being false generally means that the trigger function
+		// is not defined (or not aware of any indexes). In that case,
+		// watcher filters should generally not generate any trigger values
+		// either, but a misconfiguration could. Thus we (paranoidly) keep
+		// this branch and deliver the event to every value watcher.
+
+		// Iterate over watchers interested in exact values, for all values.
+		for _, watchers := range c.watchers.valueWatchers {
+			for _, watcher := range watchers {
+				watcher.add(event, c.dispatchTimeoutBudget)
+			}
+		}
+	}
+}
+
+func (c *Cacher) terminateAllWatchers() {
+	c.Lock()
+	defer c.Unlock()
+	c.watchers.terminateAll(c.objectType)
+}
+
+func (c *Cacher) isStopped() bool {
+	c.stopLock.RLock()
+	defer c.stopLock.RUnlock()
+	return c.stopped
+}
+
+func (c *Cacher) Stop() {
+	c.stopLock.Lock()
+	c.stopped = true
+	c.stopLock.Unlock()
+	close(c.stopCh)
+	c.stopWg.Wait()
+}
+
+func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
+	return func(lock bool) {
+		if lock {
+			c.Lock()
+			defer c.Unlock()
+		}
+		// It's possible that the watcher is already not in the structure (e.g. in case of
+		// simultaneous Stop() and terminateAllWatchers()), but it doesn't break anything.
+		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
+	}
+}
+
+func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
+	f := SimpleFilter(p)
+	filterFunc := func(objKey string, obj runtime.Object) bool {
+		if !hasPathPrefix(objKey, key) {
+			return false
+		}
+		return f(obj)
+	}
+	return filterFunc
+}
+
+func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
+	filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
+		if !hasPathPrefix(objKey, key) {
+			return false
+		}
+		return p.MatchesLabelsAndFields(label, field)
+	}
+	return filterFunc
+}
+
+// LastSyncResourceVersion returns the resource version to which the underlying
+// cache is synced.
+func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
+	c.ready.wait()
+
+	resourceVersion := c.reflector.LastSyncResourceVersion()
+	if resourceVersion == "" {
+		return 0, nil
+	}
+
+	return strconv.ParseUint(resourceVersion, 10, 64)
+}
+
+// cacherListerWatcher wraps storage.Interface to expose cache.ListerWatcher.
+type cacherListerWatcher struct {
+	storage        Interface
+	resourcePrefix string
+	newListFunc    func() runtime.Object
+}
+
+func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+	return &cacherListerWatcher{
+		storage:        storage,
+		resourcePrefix: resourcePrefix,
+		newListFunc:    newListFunc,
+	}
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
+	list := lw.newListFunc()
+	if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
+		return nil, err
+	}
+	return list, nil
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
+	return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
+}
+
+// errWatcher implements watch.Interface to return a single error.
+type errWatcher struct {
+	result chan watch.Event
+}
+
+func newErrWatcher(err error) *errWatcher {
+	// Create an error event
+	errEvent := watch.Event{Type: watch.Error}
+	switch err := err.(type) {
+	case runtime.Object:
+		errEvent.Object = err
+	case *errors.StatusError:
+		errEvent.Object = &err.ErrStatus
+	default:
+		errEvent.Object = &metav1.Status{
+			Status:  metav1.StatusFailure,
+			Message: err.Error(),
+			Reason:  metav1.StatusReasonInternalError,
+			Code:    http.StatusInternalServerError,
+		}
+	}
+
+	// Create a watcher with room for a single event, populate it, and close the channel
+	watcher := &errWatcher{result: make(chan watch.Event, 1)}
+	watcher.result <- errEvent
+	close(watcher.result)
+
+	return watcher
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) ResultChan() <-chan watch.Event {
+	return c.result
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) Stop() {
+	// no-op
+}
+
+// cacheWatcher implements watch.Interface.
+type cacheWatcher struct {
+	sync.Mutex
+	copier  runtime.ObjectCopier
+	input   chan watchCacheEvent
+	result  chan watch.Event
+	done    chan struct{}
+	filter  watchFilterFunc
+	stopped bool
+	forget  func(bool)
+}
+
+func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
+	watcher := &cacheWatcher{
+		copier:  copier,
+		input:   make(chan watchCacheEvent, chanSize),
+		result:  make(chan watch.Event, chanSize),
+		done:    make(chan struct{}),
+		filter:  filter,
+		stopped: false,
+		forget:  forget,
+	}
+	go watcher.process(initEvents, resourceVersion)
+	return watcher
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) ResultChan() <-chan watch.Event {
+	return c.result
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) Stop() {
+	c.forget(true)
+	c.stop()
+}
+
+func (c *cacheWatcher) stop() {
+	c.Lock()
+	defer c.Unlock()
+	if !c.stopped {
+		c.stopped = true
+		close(c.done)
+		close(c.input)
+	}
+}
+
+var timerPool sync.Pool
+
+func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
+	// Try to send the event immediately, without blocking.
+	select {
+	case c.input <- *event:
+		return
+	default:
+	}
+
+	// OK, block sending, but only for up to <timeout>.
+	// cacheWatcher.add is called very often, so arrange
+	// to reuse timers instead of constantly allocating.
+	startTime := time.Now()
+	timeout := budget.takeAvailable()
+
+	t, ok := timerPool.Get().(*time.Timer)
+	if ok {
+		t.Reset(timeout)
+	} else {
+		t = time.NewTimer(timeout)
+	}
+	defer timerPool.Put(t)
+
+	select {
+	case c.input <- *event:
+		stopped := t.Stop()
+		if !stopped {
+			// Consume triggered (but not yet received) timer event
+			// so that future reuse does not get a spurious timeout.
+			<-t.C
+		}
+	case <-t.C:
+		// This means that we couldn't send event to that watcher.
+		// Since we don't want to block on it infinitely,
+		// we simply terminate it.
+		c.forget(false)
+		c.stop()
+	}
+
+	budget.returnUnused(timeout - time.Since(startTime))
+}
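The pooled-timer dance in add is worth calling out: timers are recycled through a sync.Pool, and a fired-but-unread timer must be drained before it goes back to the pool, or a later Reset could deliver a stale expiration. A standalone sketch of just that pattern (illustrative names, not part of this package):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    var pool sync.Pool

    // waitWithPooledTimer blocks for at most timeout, reusing timers across
    // calls instead of allocating a new one each time.
    func waitWithPooledTimer(ch <-chan int, timeout time.Duration) (int, bool) {
        t, ok := pool.Get().(*time.Timer)
        if ok {
            t.Reset(timeout)
        } else {
            t = time.NewTimer(timeout)
        }
        defer pool.Put(t)

        select {
        case v := <-ch:
            // Drain the timer if it already fired, so the next Reset
            // does not observe a stale expiration.
            if !t.Stop() {
                <-t.C
            }
            return v, true
        case <-t.C:
            return 0, false
        }
    }

    func main() {
        ch := make(chan int, 1)
        ch <- 42
        v, ok := waitWithPooledTimer(ch, 10*time.Millisecond)
        fmt.Println(v, ok) // 42 true
        _, ok = waitWithPooledTimer(ch, 10*time.Millisecond)
        fmt.Println(ok) // false: timed out
    }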
+// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
+func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
+	curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
+	oldObjPasses := false
+	if event.PrevObject != nil {
+		oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
+	}
+	if !curObjPasses && !oldObjPasses {
+		// Watcher is not interested in that object.
+		return
+	}
+
+	object, err := c.copier.Copy(event.Object)
+	if err != nil {
+		glog.Errorf("unexpected copy error: %v", err)
+		return
+	}
+	var watchEvent watch.Event
+	switch {
+	case curObjPasses && !oldObjPasses:
+		watchEvent = watch.Event{Type: watch.Added, Object: object}
+	case curObjPasses && oldObjPasses:
+		watchEvent = watch.Event{Type: watch.Modified, Object: object}
+	case !curObjPasses && oldObjPasses:
+		watchEvent = watch.Event{Type: watch.Deleted, Object: object}
+	}
+
+	// We need to ensure that if we put event X to the c.result, all
+	// previous events were already put into it before, no matter whether
+	// c.done is closed or not.
+	// Thus we cannot simply select on c.done and c.result, as that
+	// would give us non-determinism.
+	// At the same time, we don't want to block infinitely on putting
+	// to c.result, when c.done is already closed.
+
+	// This ensures that with c.done already closed, we at most once go
+	// into the next select after this. With that, no matter which
+	// statement we choose there, we will deliver only consecutive
+	// events.
+	select {
+	case <-c.done:
+		return
+	default:
+	}
+
+	select {
+	case c.result <- watchEvent:
+	case <-c.done:
+	}
+}
+
+func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
+	defer utilruntime.HandleCrash()
+
+	// Check how long we are processing initEvents.
+	// As long as these are not processed, we are not processing
+	// any incoming events, so if it takes long, we may actually
+	// block all watchers for some time.
+	// TODO: From the logs it seems that we sometimes see processing
+	// times of up to 1s, which is very long. However, this doesn't
+	// depend that much on the number of initEvents. E.g. from the
+	// 2000-node Kubemark run we have logs like this, e.g.:
+	// ... processing 13862 initEvents took 66.808689ms
+	// ... processing 14040 initEvents took 993.532539ms
+	// We should understand what is blocking us in those cases (e.g.
+	// is it lack of CPU, network, or something else) and potentially
+	// consider increasing the size of the result buffer in those cases.
+ const initProcessThreshold = 500 * time.Millisecond + startTime := time.Now() + for _, event := range initEvents { + c.sendWatchCacheEvent(event) + } + processingTime := time.Since(startTime) + if processingTime > initProcessThreshold { + objType := "" + if len(initEvents) > 0 { + objType = reflect.TypeOf(initEvents[0].Object).String() + } + glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime) + } + + defer close(c.result) + defer c.Stop() + for { + event, ok := <-c.input + if !ok { + return + } + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(&event) + } + } +} + +type ready struct { + ok bool + c *sync.Cond +} + +func newReady() *ready { + return &ready{c: sync.NewCond(&sync.Mutex{})} +} + +func (r *ready) wait() { + r.c.L.Lock() + for !r.ok { + r.c.Wait() + } + r.c.L.Unlock() +} + +func (r *ready) set(ok bool) { + r.c.L.Lock() + defer r.c.L.Unlock() + r.ok = ok + r.c.Broadcast() +} diff --git a/pkg/storage/cacher_whitebox_test.go b/pkg/storage/cacher_whitebox_test.go new file mode 100644 index 000000000..33fe54255 --- /dev/null +++ b/pkg/storage/cacher_whitebox_test.go @@ -0,0 +1,56 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/pkg/api" +) + +// verifies the cacheWatcher.process goroutine is properly cleaned up even if +// the writes to cacheWatcher.result channel is blocked. +func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { + var lock sync.RWMutex + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(bool) { + lock.Lock() + defer lock.Unlock() + count++ + } + initEvents := []*watchCacheEvent{ + {Object: &api.Pod{}}, + {Object: &api.Pod{}}, + } + // set the size of the buffer of w.result to 0, so that the writes to + // w.result is blocked. + w := newCacheWatcher(api.Scheme, 0, 0, initEvents, filter, forget) + w.Stop() + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) + } +} diff --git a/pkg/storage/doc.go b/pkg/storage/doc.go new file mode 100644 index 000000000..fbdd94468 --- /dev/null +++ b/pkg/storage/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package storage provides interfaces for database-related operations.
+package storage // import "k8s.io/apiserver/pkg/storage"
diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go
new file mode 100644
index 000000000..a4d134ac9
--- /dev/null
+++ b/pkg/storage/errors.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/util/validation/field"
+)
+
+const (
+	ErrCodeKeyNotFound int = iota + 1
+	ErrCodeKeyExists
+	ErrCodeResourceVersionConflicts
+	ErrCodeInvalidObj
+	ErrCodeUnreachable
+)
+
+var errCodeToMessage = map[int]string{
+	ErrCodeKeyNotFound:              "key not found",
+	ErrCodeKeyExists:                "key exists",
+	ErrCodeResourceVersionConflicts: "resource version conflicts",
+	ErrCodeInvalidObj:               "invalid object",
+	ErrCodeUnreachable:              "server unreachable",
+}
+
+func NewKeyNotFoundError(key string, rv int64) *StorageError {
+	return &StorageError{
+		Code:            ErrCodeKeyNotFound,
+		Key:             key,
+		ResourceVersion: rv,
+	}
+}
+
+func NewKeyExistsError(key string, rv int64) *StorageError {
+	return &StorageError{
+		Code:            ErrCodeKeyExists,
+		Key:             key,
+		ResourceVersion: rv,
+	}
+}
+
+func NewResourceVersionConflictsError(key string, rv int64) *StorageError {
+	return &StorageError{
+		Code:            ErrCodeResourceVersionConflicts,
+		Key:             key,
+		ResourceVersion: rv,
+	}
+}
+
+func NewUnreachableError(key string, rv int64) *StorageError {
+	return &StorageError{
+		Code:            ErrCodeUnreachable,
+		Key:             key,
+		ResourceVersion: rv,
+	}
+}
+
+func NewInvalidObjError(key, msg string) *StorageError {
+	return &StorageError{
+		Code:               ErrCodeInvalidObj,
+		Key:                key,
+		AdditionalErrorMsg: msg,
+	}
+}
+
+type StorageError struct {
+	Code               int
+	Key                string
+	ResourceVersion    int64
+	AdditionalErrorMsg string
+}
+
+func (e *StorageError) Error() string {
+	return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
+		errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
+}
+
+// IsNotFound returns true if and only if err is a "key not found" error.
+func IsNotFound(err error) bool {
+	return isErrCode(err, ErrCodeKeyNotFound)
+}
+
+// IsNodeExist returns true if and only if err is a "node already exists" error.
+func IsNodeExist(err error) bool {
+	return isErrCode(err, ErrCodeKeyExists)
+}
+
+// IsUnreachable returns true if and only if err indicates the server could not be reached.
+func IsUnreachable(err error) bool {
+	return isErrCode(err, ErrCodeUnreachable)
+}
+
+// IsConflict returns true if and only if err is a write conflict.
+func IsConflict(err error) bool {
+	return isErrCode(err, ErrCodeResourceVersionConflicts)
+}
+
+// IsInvalidObj returns true if and only if err is an invalid-object error.
+func IsInvalidObj(err error) bool {
+	return isErrCode(err, ErrCodeInvalidObj)
+}
+
+func isErrCode(err error, code int) bool {
+	if err == nil {
+		return false
+	}
+	if e, ok := err.(*StorageError); ok {
+		return e.Code == code
+	}
+	return false
+}
+
+// InvalidError is generated when an error caused by an invalid API object occurs
+// in the storage package.
+type InvalidError struct {
+	Errs field.ErrorList
+}
+
+func (e InvalidError) Error() string {
+	return e.Errs.ToAggregate().Error()
+}
+
+// IsInvalidError returns true if and only if err is an InvalidError.
+func IsInvalidError(err error) bool {
+	_, ok := err.(InvalidError)
+	return ok
+}
+
+func NewInvalidError(errors field.ErrorList) InvalidError {
+	return InvalidError{errors}
+}
+
+// InternalError is generated when an error occurs in the storage package, i.e.,
+// not from the underlying storage backend (e.g., etcd).
+type InternalError struct {
+	Reason string
+}
+
+func (e InternalError) Error() string {
+	return e.Reason
+}
+
+// IsInternalError returns true if and only if err is an InternalError.
+func IsInternalError(err error) bool {
+	_, ok := err.(InternalError)
+	return ok
+}
+
+func NewInternalError(reason string) InternalError {
+	return InternalError{reason}
+}
+
+func NewInternalErrorf(format string, a ...interface{}) InternalError {
+	// Expand the variadic args; fmt.Sprintf(format, a) would format the
+	// whole slice as a single argument.
+	return InternalError{fmt.Sprintf(format, a...)}
+}
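Callers are expected to branch on these error categories with the Is* predicates above rather than by matching message strings. A hypothetical caller-side helper (not part of this package), shown only to illustrate the intended pattern:

    // classify demonstrates how callers branch on storage error categories.
    func classify(err error) string {
        switch {
        case err == nil:
            return "ok"
        case IsNotFound(err):
            return "not found (safe to treat as deleted)"
        case IsConflict(err):
            return "resource version conflict (retry the update)"
        case IsUnreachable(err):
            return "backend unreachable (retryable)"
        case IsInvalidObj(err):
            return "invalid object (do not retry)"
        default:
            return "unexpected: " + err.Error()
        }
    }

    // classify(NewKeyNotFoundError("/pods/default/nginx", 42))
    // => "not found (safe to treat as deleted)"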
diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go
new file mode 100644
index 000000000..e8181c3e8
--- /dev/null
+++ b/pkg/storage/interfaces.go
@@ -0,0 +1,181 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"golang.org/x/net/context"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// Versioner abstracts setting and retrieving metadata fields from the database response
+// onto the object or list.
+type Versioner interface {
+	// UpdateObject sets storage metadata into an API object. Returns an error if the object
+	// cannot be updated correctly. May return nil if the requested object does not need metadata
+	// from database.
+	UpdateObject(obj runtime.Object, resourceVersion uint64) error
+	// UpdateList sets the resource version into an API list object. Returns an error if the object
+	// cannot be updated correctly. May return nil if the requested object does not need metadata
+	// from database.
+	UpdateList(obj runtime.Object, resourceVersion uint64) error
+	// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
+	// Should return an error if the specified object does not have a persistable version.
+	ObjectResourceVersion(obj runtime.Object) (uint64, error)
+}
+
+// ResponseMeta contains information about the database metadata that is associated with
+// an object. It abstracts the actual underlying objects to prevent coupling with concrete
+// database and to improve testability.
+type ResponseMeta struct {
+	// TTL is the time to live of the node that contained the returned object. It may be
+	// zero or negative in some cases (objects may be expired after the requested
+	// expiration time due to server lag).
+	TTL int64
+	// The resource version of the node that contained the returned object.
+	ResourceVersion uint64
+}
+
+// MatchValue defines a pair (<index name>, <value for that index>).
+type MatchValue struct {
+	IndexName string
+	Value     string
+}
+
+// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
+// (<index name>, <value for that index that the given object satisfies>) for all indexes
+// known to that function.
+type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
+
+// FilterFunc takes an API object and returns true if the object satisfies some requirements.
+// TODO: We will remove this type and use SelectionPredicate everywhere.
+type FilterFunc func(obj runtime.Object) bool
+
+// Everything accepts all objects.
+var Everything = SelectionPredicate{
+	Label: labels.Everything(),
+	Field: fields.Everything(),
+}
+
+// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
+// that is guaranteed to succeed.
+// See the comment for GuaranteedUpdate for more details.
+type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
+
+// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
+type Preconditions struct {
+	// Specifies the target UID.
+	// +optional
+	UID *types.UID `json:"uid,omitempty"`
+}
+
+// NewUIDPreconditions returns a Preconditions with UID set.
+func NewUIDPreconditions(uid string) *Preconditions {
+	u := types.UID(uid)
+	return &Preconditions{UID: &u}
+}
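A TriggerPublisherFunc is easiest to read from a concrete sketch. The following is illustrative only: the stand-in type and the "spec.nodeName" index mirror how pods are indexed elsewhere in Kubernetes, but this exact publisher is not part of the patch, and it assumes the k8s.io/apimachinery/pkg/runtime/schema import for ObjectKind:

    // exampleObj is a minimal stand-in satisfying runtime.Object.
    type exampleObj struct {
        nodeName string
    }

    func (o *exampleObj) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }

    // nodeNameTrigger publishes the (index name, value) pair for a
    // hypothetical "spec.nodeName" index.
    func nodeNameTrigger(obj runtime.Object) []MatchValue {
        o, ok := obj.(*exampleObj)
        if !ok {
            return nil
        }
        return []MatchValue{{IndexName: "spec.nodeName", Value: o.nodeName}}
    }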
+// Interface offers a common interface for object marshaling/unmarshaling operations and
+// hides all the storage-related operations behind it.
+type Interface interface {
+	// Versioner returns the Versioner associated with this interface.
+	Versioner() Versioner
+
+	// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
+	// in seconds (0 means forever). If no error is returned and out is not nil, out will be
+	// set to the read value from database.
+	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
+
+	// Delete removes the specified key and returns the value that existed at that spot.
+	// If key didn't exist, it will return a NotFound storage error.
+	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
+
+	// Watch begins watching the specified key. Events are decoded into API objects,
+	// and any items selected by 'p' are sent down to the returned watch.Interface.
+	// resourceVersion may be used to specify what version to begin watching,
+	// which should be the current resourceVersion, and no longer rv+1
+	// (e.g. reconnecting without missing any updates).
+	// If resource version is "0", this interface will get the current object at the
+	// given key and send it in an "ADDED" event, before the watch starts.
+	Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
+
+	// WatchList begins watching the specified key's items. Items are decoded into API
+	// objects and any items selected by 'p' are sent down to the returned watch.Interface.
+	// resourceVersion may be used to specify what version to begin watching,
+	// which should be the current resourceVersion, and no longer rv+1
+	// (e.g. reconnecting without missing any updates).
+	// If resource version is "0", this interface will list the current objects in the
+	// directory defined by key and send them in "ADDED" events, before the watch starts.
+	WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
+
+	// Get unmarshals the JSON found at key into objPtr. On a not found error, will either
+	// return a zero object of the requested type, or an error, depending on ignoreNotFound.
+	// Treats empty responses and nil response nodes exactly like a not found error.
+	// The returned contents may be delayed, but it is guaranteed that they will
+	// have at least 'resourceVersion'.
+	Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
+
+	// GetToList unmarshals the JSON found at key and unpacks it into a *List API object
+	// (an object that satisfies the runtime.IsList definition).
+	// The returned contents may be delayed, but it is guaranteed that they will
+	// have at least 'resourceVersion'.
+	GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
+
+	// List unmarshals the JSON objects found in the directory defined by key and unpacks
+	// them into a *List API object (an object that satisfies the runtime.IsList definition).
+	// The returned contents may be delayed, but it is guaranteed that they will
+	// have at least 'resourceVersion'.
+	List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
+
+	// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType'),
+	// retrying the update until success if there is a resource version conflict.
+	// Note that the object passed to tryUpdate may change across invocations of tryUpdate() if
+	// other writers are simultaneously updating it, so tryUpdate() needs to take into account
+	// the current contents of the object when deciding how the updated object should look.
+	// If the key doesn't exist, it will return a NotFound storage error if ignoreNotFound=false,
+	// or a zero value in the 'ptrToType' parameter otherwise.
+	// If the object to update has the same value as the previous one, it won't do any update,
+	// but will return the object in the 'ptrToType' parameter.
+	// 'suggestion' can contain zero or one element; if present, it is used as a hint about
+	// the current version of the object, to avoid a read operation from storage to get it.
+	//
+	// Example:
+	//
+	// s := /* implementation of Interface */
+	// err := s.GuaranteedUpdate(
+	//     "myKey", &MyType{}, true,
+	//     func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
+	//       // Before each invocation of the user defined function, "input" is reset to
+	//       // the current contents for "myKey" in database.
+	//       curr := input.(*MyType)  // Guaranteed to succeed.
+	//
+	//       // Make the modification
+	//       curr.Counter++
+	//
+	//       // Return the modified object - return an error to stop iterating. Return
+	//       // a uint64 to alter the TTL on the object, or nil to keep it the same value.
+	//       return curr, nil, nil
+	//    }
+	// })
+	GuaranteedUpdate(
+		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
+		preconditions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
+}
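Putting GuaranteedUpdate together with helpers defined elsewhere in this patch, a caller-side sketch: s is any Interface implementation, MyType, the key, and bumpCounter are hypothetical, and SimpleUpdate comes from util.go below.

    // bumpCounter increments a counter on a stored object, retrying on
    // resource version conflicts.
    func bumpCounter(ctx context.Context, s Interface, uid string) error {
        // Fail the update if the stored object's UID differs.
        precond := NewUIDPreconditions(uid)
        return s.GuaranteedUpdate(ctx, "/registry/mytype/default/foo", &MyType{}, false, precond,
            SimpleUpdate(func(input runtime.Object) (runtime.Object, error) {
                curr := input.(*MyType)
                curr.Counter++ // may run several times if there are conflicts
                return curr, nil
            }),
        )
    }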
diff --git a/pkg/storage/selection_predicate.go b/pkg/storage/selection_predicate.go
new file mode 100644
index 000000000..c4f79288d
--- /dev/null
+++ b/pkg/storage/selection_predicate.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+// AttrFunc returns label and field sets for List or Watch to match.
+// On any failure to parse the given object, it returns an error.
+type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error)
+
+// SelectionPredicate is used to represent the way to select objects from api storage.
+type SelectionPredicate struct {
+	Label       labels.Selector
+	Field       fields.Selector
+	GetAttrs    AttrFunc
+	IndexFields []string
+}
+
+// Matches returns true if the given object's labels and fields (as
+// returned by s.GetAttrs) match s.Label and s.Field. An error is
+// returned if s.GetAttrs fails.
+func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
+	if s.Label.Empty() && s.Field.Empty() {
+		return true, nil
+	}
+	labels, fields, err := s.GetAttrs(obj)
+	if err != nil {
+		return false, err
+	}
+	matched := s.Label.Matches(labels)
+	if matched && s.Field != nil {
+		matched = (matched && s.Field.Matches(fields))
+	}
+	return matched, nil
+}
+
+// MatchesLabelsAndFields returns true if the given labels and fields
+// match s.Label and s.Field.
+func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool {
+	if s.Label.Empty() && s.Field.Empty() {
+		return true
+	}
+	matched := s.Label.Matches(l)
+	if matched && s.Field != nil {
+		matched = (matched && s.Field.Matches(f))
+	}
+	return matched
+}
+
+// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
+// name.
+func (s *SelectionPredicate) MatchesSingle() (string, bool) {
+	// TODO: should be namespace.name
+	if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok {
+		return name, true
+	}
+	return "", false
+}
+
+// For any index defined by IndexFields, if a matcher can match only (a subset of)
+// objects that return <value> for a given index, a pair (<index name>, <value>)
+// will be returned.
+// TODO: Consider supporting also labels.
+func (s *SelectionPredicate) MatcherIndex() []MatchValue { + var result []MatchValue + for _, field := range s.IndexFields { + if value, ok := s.Field.RequiresExactMatch(field); ok { + result = append(result, MatchValue{IndexName: field, Value: value}) + } + } + return result +} diff --git a/pkg/storage/selection_predicate_test.go b/pkg/storage/selection_predicate_test.go new file mode 100644 index 000000000..d51666e21 --- /dev/null +++ b/pkg/storage/selection_predicate_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "errors" + "testing" + + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type Ignored struct { + ID string +} + +type IgnoredList struct { + Items []Ignored +} + +func (obj *Ignored) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind } +func (obj *IgnoredList) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind } + +func TestSelectionPredicate(t *testing.T) { + table := map[string]struct { + labelSelector, fieldSelector string + labels labels.Set + fields fields.Set + err error + shouldMatch bool + matchSingleKey string + }{ + "A": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: fields.Set{"uid": "12345"}, + shouldMatch: true, + }, + "B": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: fields.Set{}, + shouldMatch: false, + }, + "C": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{}, + fields: fields.Set{"uid": "12345"}, + shouldMatch: false, + }, + "D": { + fieldSelector: "metadata.name=12345", + labels: labels.Set{}, + fields: fields.Set{"metadata.name": "12345"}, + shouldMatch: true, + matchSingleKey: "12345", + }, + "error": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + err: errors.New("maybe this is a 'wrong object type' error"), + shouldMatch: false, + }, + } + + for name, item := range table { + parsedLabel, err := labels.Parse(item.labelSelector) + if err != nil { + panic(err) + } + parsedField, err := fields.ParseSelector(item.fieldSelector) + if err != nil { + panic(err) + } + sp := &SelectionPredicate{ + Label: parsedLabel, + Field: parsedField, + GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) { + return item.labels, item.fields, item.err + }, + } + got, err := sp.Matches(&Ignored{}) + if e, a := item.err, err; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + continue + } + if e, a := item.shouldMatch, got; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + } + if key := item.matchSingleKey; key != "" { + got, ok := sp.MatchesSingle() + if !ok { + t.Errorf("%v: expected single match", name) + } + if e, a := key, got; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + } + } + } +} diff --git a/pkg/storage/time_budget.go 
b/pkg/storage/time_budget.go
new file mode 100644
index 000000000..0febb794b
--- /dev/null
+++ b/pkg/storage/time_budget.go
@@ -0,0 +1,95 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	refreshPerSecond = 50 * time.Millisecond
+	maxBudget        = 250 * time.Millisecond
+)
+
+// timeBudget implements a budget of time that you can use and that is
+// periodically refreshed. The pattern to use it is:
+//   budget := newTimeBudget(...)
+//   ...
+//   timeout := budget.takeAvailable()
+//   // Now you can spend at most timeout on doing stuff
+//   ...
+//   // If you didn't use all of the timeout, return what you didn't use
+//   budget.returnUnused(<unused>)
+//
+// NOTE: It's not recommended to be used concurrently from multiple threads -
+// if the first user takes the whole timeout, the second one will get a 0 timeout
+// even though the first one may return something later.
+type timeBudget struct {
+	sync.Mutex
+	budget time.Duration
+
+	refresh   time.Duration
+	maxBudget time.Duration
+}
+
+func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
+	result := &timeBudget{
+		budget:    time.Duration(0),
+		refresh:   refreshPerSecond,
+		maxBudget: maxBudget,
+	}
+	go result.periodicallyRefresh(stopCh)
+	return result
+}
+
+func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			t.Lock()
+			if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
+				t.budget = t.maxBudget
+			}
+			t.Unlock()
+		case <-stopCh:
+			return
+		}
+	}
+}
+
+func (t *timeBudget) takeAvailable() time.Duration {
+	t.Lock()
+	defer t.Unlock()
+	result := t.budget
+	t.budget = time.Duration(0)
+	return result
+}
+
+func (t *timeBudget) returnUnused(unused time.Duration) {
+	t.Lock()
+	defer t.Unlock()
+	if unused < 0 {
+		// We used more than allowed.
+		return
+	}
+	if t.budget = t.budget + unused; t.budget > t.maxBudget {
+		t.budget = t.maxBudget
+	}
+}
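As a concrete illustration of the pattern in the comment above, here is how a bounded, budget-aware blocking send could look; it mirrors what cacheWatcher.add in cacher.go does with the dispatch budget (the helper itself is illustrative, not part of the patch):

    // sendWithBudget spends up to the available budget on a blocking send,
    // then returns whatever time was left unused.
    func sendWithBudget(ch chan<- int, v int, budget *timeBudget) bool {
        start := time.Now()
        timeout := budget.takeAvailable()
        defer func() { budget.returnUnused(timeout - time.Since(start)) }()

        t := time.NewTimer(timeout)
        defer t.Stop()
        select {
        case ch <- v:
            return true
        case <-t.C:
            return false // budget exhausted; the caller decides what to do
        }
    }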
diff --git a/pkg/storage/time_budget_test.go b/pkg/storage/time_budget_test.go
new file mode 100644
index 000000000..99ba19bd5
--- /dev/null
+++ b/pkg/storage/time_budget_test.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTimeBudget(t *testing.T) {
+	budget := &timeBudget{
+		budget:    time.Duration(0),
+		maxBudget: time.Duration(200),
+	}
+	if res := budget.takeAvailable(); res != time.Duration(0) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
+	}
+	budget.budget = time.Duration(100)
+	if res := budget.takeAvailable(); res != time.Duration(100) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
+	}
+	if res := budget.takeAvailable(); res != time.Duration(0) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
+	}
+	budget.returnUnused(time.Duration(50))
+	if res := budget.takeAvailable(); res != time.Duration(50) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(50), res)
+	}
+	budget.budget = time.Duration(100)
+	budget.returnUnused(-time.Duration(50))
+	if res := budget.takeAvailable(); res != time.Duration(100) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
+	}
+	// Test overflow: the budget is capped at maxBudget.
+	budget.returnUnused(time.Duration(500))
+	if res := budget.takeAvailable(); res != time.Duration(200) {
+		t.Errorf("Expected: %v, got: %v", time.Duration(200), res)
+	}
+}
diff --git a/pkg/storage/util.go b/pkg/storage/util.go
new file mode 100644
index 000000000..3e0b7211b
--- /dev/null
+++ b/pkg/storage/util.go
@@ -0,0 +1,161 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync/atomic"
+
+	"github.com/golang/glog"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/api/validation/path"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+)
+
+type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
+
+// SimpleUpdate converts a SimpleUpdateFunc into an UpdateFunc.
+func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
+	return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
+		out, err := fn(input)
+		return out, nil, err
+	}
+}
+
+// SimpleFilter converts a selection predicate into a FilterFunc.
+// It ignores any error from Matches().
+func SimpleFilter(p SelectionPredicate) FilterFunc {
+	return func(obj runtime.Object) bool {
+		matches, err := p.Matches(obj)
+		if err != nil {
+			glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
+			return false
+		}
+		return matches
+	}
+}
+
+func EverythingFunc(runtime.Object) bool {
+	return true
+}
+
+func NoTriggerFunc() []MatchValue {
+	return nil
+}
+
+func NoTriggerPublisher(runtime.Object) []MatchValue {
+	return nil
+}
+
+// ParseWatchResourceVersion takes a resource version argument and converts it to
+// the etcd version we should pass to helper.Watch(). Because resourceVersion is
+// an opaque value, the default watch behavior for a non-zero watch is to watch
+// the next value (if you pass "1", you will see updates from "2" onwards).
+func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
+	if resourceVersion == "" || resourceVersion == "0" {
+		return 0, nil
+	}
+	version, err := strconv.ParseUint(resourceVersion, 10, 64)
+	if err != nil {
+		return 0, NewInvalidError(field.ErrorList{
+			// Validation errors are supposed to return version-specific field
+			// paths, but this is probably close enough.
+			field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
+		})
+	}
+	return version, nil
+}
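A quick sketch of the parsing rules above (exampleParseWatch is a hypothetical same-package helper; IsInvalidError comes from this package's errors.go):

	func exampleParseWatch() {
		rv, _ := ParseWatchResourceVersion("") // "" and "0" both mean "no starting point"
		fmt.Println(rv)                        // 0
		rv, _ = ParseWatchResourceVersion("5") // otherwise a decimal uint64 is required
		fmt.Println(rv)                        // 5; the watch then reports "6" onwards
		_, err := ParseWatchResourceVersion("a")
		fmt.Println(IsInvalidError(err)) // true
	}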
+
+// ParseListResourceVersion takes a resource version argument and converts it to
+// the etcd version.
+func ParseListResourceVersion(resourceVersion string) (uint64, error) {
+	if resourceVersion == "" {
+		return 0, nil
+	}
+	version, err := strconv.ParseUint(resourceVersion, 10, 64)
+	return version, err
+}
+
+func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
+	meta, err := meta.Accessor(obj)
+	if err != nil {
+		return "", err
+	}
+	name := meta.GetName()
+	if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
+		return "", fmt.Errorf("invalid name: %v", msgs)
+	}
+	return prefix + "/" + meta.GetNamespace() + "/" + name, nil
+}
+
+func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
+	meta, err := meta.Accessor(obj)
+	if err != nil {
+		return "", err
+	}
+	name := meta.GetName()
+	if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
+		return "", fmt.Errorf("invalid name: %v", msgs)
+	}
+	return prefix + "/" + name, nil
+}
+
+// hasPathPrefix returns true if the string matches pathPrefix exactly, or if it
+// is prefixed with pathPrefix at a path segment boundary.
+func hasPathPrefix(s, pathPrefix string) bool {
+	// Short circuit if s doesn't contain the prefix at all.
+	if !strings.HasPrefix(s, pathPrefix) {
+		return false
+	}
+
+	pathPrefixLength := len(pathPrefix)
+
+	if len(s) == pathPrefixLength {
+		// Exact match.
+		return true
+	}
+	if strings.HasSuffix(pathPrefix, "/") {
+		// pathPrefix already ensured a path segment boundary.
+		return true
+	}
+	if s[pathPrefixLength:pathPrefixLength+1] == "/" {
+		// The next character in s is a path segment boundary.
+		// Check this instead of normalizing pathPrefix to avoid allocating on every call.
+		return true
+	}
+	return false
+}
+
+// HighWaterMark is a thread-safe object for tracking the maximum value seen
+// for some quantity.
+type HighWaterMark int64
+
+// Update returns true if and only if 'current' is the highest value ever seen.
+func (hwm *HighWaterMark) Update(current int64) bool {
+	for {
+		old := atomic.LoadInt64((*int64)(hwm))
+		if current <= old {
+			return false
+		}
+		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
+			return true
+		}
+	}
+}
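HighWaterMark's CAS retry loop is safe to call from many goroutines at once; a minimal concurrent-use sketch (exampleHighWaterMark is hypothetical, not part of this patch):

	func exampleHighWaterMark() int64 {
		var hwm HighWaterMark
		var wg sync.WaitGroup
		for _, v := range []int64{3, 9, 1, 9, 7} {
			wg.Add(1)
			go func(v int64) {
				defer wg.Done()
				hwm.Update(v) // lock-free compare-and-swap retry loop
			}(v)
		}
		wg.Wait()         // Wait() is a happens-before edge, so the plain read below is safe
		return int64(hwm) // 9: the maximum value ever observed
	}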
diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go
new file mode 100644
index 000000000..6eba9b499
--- /dev/null
+++ b/pkg/storage/util_test.go
@@ -0,0 +1,136 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+)
+
+func TestEtcdParseWatchResourceVersion(t *testing.T) {
+	testCases := []struct {
+		Version       string
+		ExpectVersion uint64
+		Err           bool
+	}{
+		{Version: "", ExpectVersion: 0},
+		{Version: "a", Err: true},
+		{Version: " ", Err: true},
+		{Version: "1", ExpectVersion: 1},
+		{Version: "10", ExpectVersion: 10},
+	}
+	for _, testCase := range testCases {
+		version, err := ParseWatchResourceVersion(testCase.Version)
+		switch {
+		case testCase.Err:
+			if err == nil {
+				t.Errorf("%s: unexpected non-error", testCase.Version)
+				continue
+			}
+			if !IsInvalidError(err) {
+				t.Errorf("%s: unexpected error: %v", testCase.Version, err)
+				continue
+			}
+		case !testCase.Err && err != nil:
+			t.Errorf("%s: unexpected error: %v", testCase.Version, err)
+			continue
+		}
+		if version != testCase.ExpectVersion {
+			t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
+		}
+	}
+}
+
+func TestHasPathPrefix(t *testing.T) {
+	validTestcases := []struct {
+		s      string
+		prefix string
+	}{
+		// Exact matches
+		{"", ""},
+		{"a", "a"},
+		{"a/", "a/"},
+		{"a/../", "a/../"},
+
+		// Path prefix matches
+		{"a/b", "a"},
+		{"a/b", "a/"},
+		{"中文/", "中文"},
+	}
+	for i, tc := range validTestcases {
+		if !hasPathPrefix(tc.s, tc.prefix) {
+			t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
+		}
+	}
+
+	invalidTestcases := []struct {
+		s      string
+		prefix string
+	}{
+		// Mismatch
+		{"a", "b"},
+
+		// Dir requirement
+		{"a", "a/"},
+
+		// Prefix mismatch
+		{"ns2", "ns"},
+		{"ns2", "ns/"},
+		{"中文文", "中文"},
+
+		// Ensure no normalization is applied
+		{"a/c/../b/", "a/b/"},
+		{"a/", "a/b/.."},
+	}
+	for i, tc := range invalidTestcases {
+		if hasPathPrefix(tc.s, tc.prefix) {
+			t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
+		}
+	}
+}
+
+func TestHighWaterMark(t *testing.T) {
+	var h HighWaterMark
+
+	for i := int64(10); i < 20; i++ {
+		if !h.Update(i) {
+			t.Errorf("unexpected false for %v", i)
+		}
+		if h.Update(i - 1) {
+			t.Errorf("unexpected true for %v", i-1)
+		}
+	}
+
+	m := int64(0)
+	wg := sync.WaitGroup{}
+	for i := 0; i < 300; i++ {
+		wg.Add(1)
+		v := rand.Int63()
+		go func(v int64) {
+			defer wg.Done()
+			h.Update(v)
+		}(v)
+		if v > m {
+			m = v
+		}
+	}
+	wg.Wait()
+	if m != int64(h) {
+		t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
+	}
+}
diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go
new file mode 100644
index 000000000..b2d8f4a82
--- /dev/null
+++ b/pkg/storage/watch_cache.go
@@ -0,0 +1,468 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"fmt"
+	"sort"
+	"strconv"
+	"sync"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	utiltrace "k8s.io/apiserver/pkg/util/trace"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/clock"
+)
+
+const (
+	// blockTimeout determines how long we're willing to block the request
+	// to wait for a given resource version to be propagated to the cache,
+	// before terminating the request and returning a Timeout error with a
+	// retry-after suggestion.
+	blockTimeout = 3 * time.Second
+)
+
+// watchCacheEvent is a single "watch event" that is sent to users of
+// watchCache. In addition to a typical "watch.Event" it contains
+// the previous value of the object to enable proper filtering in the
+// upper layers.
+type watchCacheEvent struct {
+	Type            watch.EventType
+	Object          runtime.Object
+	ObjLabels       labels.Set
+	ObjFields       fields.Set
+	PrevObject      runtime.Object
+	PrevObjLabels   labels.Set
+	PrevObjFields   fields.Set
+	Key             string
+	ResourceVersion uint64
+}
+
+// Computing a key of an object is generally non-trivial (it performs
+// e.g. validation underneath). To avoid computing it multiple times
+// (to serve the event in different List/Watch requests), the
+// underlying store keeps (key, object) pairs.
+type storeElement struct {
+	Key    string
+	Object runtime.Object
+}
+
+func storeElementKey(obj interface{}) (string, error) {
+	elem, ok := obj.(*storeElement)
+	if !ok {
+		return "", fmt.Errorf("not a storeElement: %v", obj)
+	}
+	return elem.Key, nil
+}
+
+// watchCacheElement is a single "watch event" stored in the cache.
+// It contains the resource version at which the event occurred and
+// the event itself.
+type watchCacheElement struct {
+	resourceVersion uint64
+	watchCacheEvent *watchCacheEvent
+}
+
+// watchCache implements a Store interface.
+// However, it depends on the elements implementing the runtime.Object
+// interface.
+//
+// watchCache is a "sliding window" (with a limited capacity) of objects
+// observed from a watch.
+type watchCache struct {
+	sync.RWMutex
+
+	// Condition on which lists are waiting for a fresh enough
+	// resource version.
+	cond *sync.Cond
+
+	// Maximum size of the history window.
+	capacity int
+
+	// keyFunc is used to get a key in the underlying storage for a given object.
+	keyFunc func(runtime.Object) (string, error)
+
+	// getAttrsFunc is used to get labels and fields of an object.
+	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
+
+	// cache is used as a cyclic buffer - its first element (with the smallest
+	// resourceVersion) is defined by startIndex, its last element is defined
+	// by endIndex (if the cache is full it will be startIndex + capacity).
+	// Both startIndex and endIndex can be greater than the buffer capacity -
+	// you should always apply modulo capacity to get an index in the cache array.
+	cache      []watchCacheElement
+	startIndex int
+	endIndex   int
+
+	// store will effectively support LIST operations from the "end of cache
+	// history" i.e. from the moment just after the newest cached watch event.
+	// It is necessary to effectively allow clients to start watching at now.
+	// NOTE: We assume that it is thread-safe.
+	store cache.Store
+
+	// ResourceVersion up to which the watchCache is propagated.
+	resourceVersion uint64
+
+	// This handler is run at the end of every successful Replace() method.
+	onReplace func()
+
+	// This handler is run at the end of every Add/Update/Delete method
+	// and additionally gets the previous value of the object.
+	onEvent func(*watchCacheEvent)
+
+	// for testing timeouts.
+	clock clock.Clock
+}
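The startIndex/endIndex arithmetic described in the comment above can be seen in isolation in this self-contained sketch (ringDemo is hypothetical and independent of watchCache):

	func ringDemo() []int {
		capacity := 3
		buf := make([]int, capacity)
		start, end := 0, 0
		for v := 1; v <= 5; v++ {
			if end == start+capacity {
				start++ // window is full: logically drop the oldest element
			}
			buf[end%capacity] = v // indices grow without bound; modulo maps into buf
			end++
		}
		window := make([]int, 0, end-start)
		for i := start; i < end; i++ {
			window = append(window, buf[i%capacity])
		}
		return window // [3 4 5]: only the last `capacity` values survive
	}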
+
+func newWatchCache(
+	capacity int,
+	keyFunc func(runtime.Object) (string, error),
+	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache {
+	wc := &watchCache{
+		capacity:        capacity,
+		keyFunc:         keyFunc,
+		getAttrsFunc:    getAttrsFunc,
+		cache:           make([]watchCacheElement, capacity),
+		startIndex:      0,
+		endIndex:        0,
+		store:           cache.NewStore(storeElementKey),
+		resourceVersion: 0,
+		clock:           clock.RealClock{},
+	}
+	wc.cond = sync.NewCond(wc.RLocker())
+	return wc
+}
+
+// Add takes runtime.Object as an argument.
+func (w *watchCache) Add(obj interface{}) error {
+	object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+	if err != nil {
+		return err
+	}
+	event := watch.Event{Type: watch.Added, Object: object}
+
+	f := func(elem *storeElement) error { return w.store.Add(elem) }
+	return w.processEvent(event, resourceVersion, f)
+}
+
+// Update takes runtime.Object as an argument.
+func (w *watchCache) Update(obj interface{}) error {
+	object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+	if err != nil {
+		return err
+	}
+	event := watch.Event{Type: watch.Modified, Object: object}
+
+	f := func(elem *storeElement) error { return w.store.Update(elem) }
+	return w.processEvent(event, resourceVersion, f)
+}
+
+// Delete takes runtime.Object as an argument.
+func (w *watchCache) Delete(obj interface{}) error {
+	object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+	if err != nil {
+		return err
+	}
+	event := watch.Event{Type: watch.Deleted, Object: object}
+
+	f := func(elem *storeElement) error { return w.store.Delete(elem) }
+	return w.processEvent(event, resourceVersion, f)
+}
+
+func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
+	object, ok := obj.(runtime.Object)
+	if !ok {
+		return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+	}
+	meta, err := meta.Accessor(object)
+	if err != nil {
+		return nil, 0, err
+	}
+	resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
+	if err != nil {
+		return nil, 0, err
+	}
+	return object, resourceVersion, nil
+}
+
+func parseResourceVersion(resourceVersion string) (uint64, error) {
+	if resourceVersion == "" {
+		return 0, nil
+	}
+	// Use a bitsize of 0, i.e. the size of int on this machine.
+	return strconv.ParseUint(resourceVersion, 10, 0)
+}
+
+func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
+	key, err := w.keyFunc(event.Object)
+	if err != nil {
+		return fmt.Errorf("couldn't compute key: %v", err)
+	}
+	elem := &storeElement{Key: key, Object: event.Object}
+
+	// TODO: We should consider moving this lock below, after the watchCacheEvent
+	// is created. In such a situation, the only problematic scenario is Replace()
+	// happening after getting the object from the store and before acquiring
+	// the lock. Maybe introduce another lock for this purpose.
+	w.Lock()
+	defer w.Unlock()
+	previous, exists, err := w.store.Get(elem)
+	if err != nil {
+		return err
+	}
+	objLabels, objFields, err := w.getAttrsFunc(event.Object)
+	if err != nil {
+		return err
+	}
+	var prevObject runtime.Object
+	var prevObjLabels labels.Set
+	var prevObjFields fields.Set
+	if exists {
+		prevObject = previous.(*storeElement).Object
+		prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject)
+		if err != nil {
+			return err
+		}
+	}
+	watchCacheEvent := &watchCacheEvent{
+		Type:            event.Type,
+		Object:          event.Object,
+		ObjLabels:       objLabels,
+		ObjFields:       objFields,
+		PrevObject:      prevObject,
+		PrevObjLabels:   prevObjLabels,
+		PrevObjFields:   prevObjFields,
+		Key:             key,
+		ResourceVersion: resourceVersion,
+	}
+	if w.onEvent != nil {
+		w.onEvent(watchCacheEvent)
+	}
+	w.updateCache(resourceVersion, watchCacheEvent)
+	w.resourceVersion = resourceVersion
+	w.cond.Broadcast()
+	return updateFunc(elem)
+}
+
+// Assumes that the lock is already held for write.
+func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
+	if w.endIndex == w.startIndex+w.capacity {
+		// Cache is full - remove the oldest element.
+		w.startIndex++
+	}
+	w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
+	w.endIndex++
+}
+
+// List returns a list of pointers to objects.
+func (w *watchCache) List() []interface{} {
+	return w.store.List()
+}
+
+// waitUntilFreshAndBlock waits until the cache is at least as fresh as the
+// given <resourceVersion>.
+// NOTE: This function acquires a lock and doesn't release it.
+// You HAVE TO explicitly call w.RUnlock() after this function.
+func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
+	startTime := w.clock.Now()
+	go func() {
+		// Wake us up when the time limit has expired. The docs
+		// promise that time.After (well, NewTimer, which it calls)
+		// will wait *at least* the duration given. Since this go
+		// routine starts sometime after we record the start time, and
+		// it will wake up the loop below sometime after the broadcast,
+		// we don't need to worry about waking it up before the time
+		// has expired accidentally.
+		<-w.clock.After(blockTimeout)
+		w.cond.Broadcast()
+	}()
+
+	w.RLock()
+	if trace != nil {
+		trace.Step("watchCache lock acquired")
+	}
+	for w.resourceVersion < resourceVersion {
+		if w.clock.Since(startTime) >= blockTimeout {
+			// Timeout with retry after 1 second.
+			return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
+		}
+		w.cond.Wait()
+	}
+	if trace != nil {
+		trace.Step("watchCache fresh enough")
+	}
+	return nil
+}
+
+// WaitUntilFreshAndList returns a list of pointers to objects.
+func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
+	err := w.waitUntilFreshAndBlock(resourceVersion, trace)
+	defer w.RUnlock()
+	if err != nil {
+		return nil, 0, err
+	}
+	return w.store.List(), w.resourceVersion, nil
+}
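The timeout handling in waitUntilFreshAndBlock combines a condition variable with a one-shot wake-up goroutine; here is a condensed sketch of just that pattern (waitFresh, mu, current and target are hypothetical names; cond is assumed to be built over mu.RLocker(), as in newWatchCache):

	func waitFresh(mu *sync.RWMutex, cond *sync.Cond, current *uint64, target uint64) error {
		start := time.Now()
		go func() {
			<-time.After(blockTimeout)
			cond.Broadcast() // wake the waiter even if no event ever arrives
		}()
		mu.RLock() // the caller must RUnlock, as with waitUntilFreshAndBlock
		for *current < target {
			if time.Since(start) >= blockTimeout {
				return fmt.Errorf("timed out waiting for resource version %d", target)
			}
			cond.Wait() // releases the read lock while asleep, reacquires on wake-up
		}
		return nil
	}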
+
+// WaitUntilFreshAndGet returns a pointer to a <storeElement> object.
+func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
+	err := w.waitUntilFreshAndBlock(resourceVersion, trace)
+	defer w.RUnlock()
+	if err != nil {
+		return nil, false, 0, err
+	}
+	value, exists, err := w.store.GetByKey(key)
+	return value, exists, w.resourceVersion, err
+}
+
+func (w *watchCache) ListKeys() []string {
+	return w.store.ListKeys()
+}
+
+// Get takes runtime.Object as a parameter. However, it returns a
+// pointer to a <storeElement>.
+func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
+	object, ok := obj.(runtime.Object)
+	if !ok {
+		return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+	}
+	key, err := w.keyFunc(object)
+	if err != nil {
+		return nil, false, fmt.Errorf("couldn't compute key: %v", err)
+	}
+
+	return w.store.Get(&storeElement{Key: key, Object: object})
+}
+
+// GetByKey returns a pointer to a <storeElement>.
+func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
+	return w.store.GetByKey(key)
+}
+
+// Replace takes a slice of runtime.Object as a parameter.
+func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
+	version, err := parseResourceVersion(resourceVersion)
+	if err != nil {
+		return err
+	}
+
+	toReplace := make([]interface{}, 0, len(objs))
+	for _, obj := range objs {
+		object, ok := obj.(runtime.Object)
+		if !ok {
+			return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
+		}
+		key, err := w.keyFunc(object)
+		if err != nil {
+			return fmt.Errorf("couldn't compute key: %v", err)
+		}
+		toReplace = append(toReplace, &storeElement{Key: key, Object: object})
+	}
+
+	w.Lock()
+	defer w.Unlock()
+
+	w.startIndex = 0
+	w.endIndex = 0
+	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
+		return err
+	}
+	w.resourceVersion = version
+	if w.onReplace != nil {
+		w.onReplace()
+	}
+	w.cond.Broadcast()
+	return nil
+}
+
+func (w *watchCache) SetOnReplace(onReplace func()) {
+	w.Lock()
+	defer w.Unlock()
+	w.onReplace = onReplace
+}
+
+func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
+	w.Lock()
+	defer w.Unlock()
+	w.onEvent = onEvent
+}
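A hypothetical sketch of how an owner (e.g. the Cacher) can subscribe to cache mutations through the two hooks registered above; exampleHookWiring and the channel size are illustrative only:

	func exampleHookWiring(wc *watchCache) <-chan *watchCacheEvent {
		events := make(chan *watchCacheEvent, 100)
		wc.SetOnEvent(func(ev *watchCacheEvent) {
			// Called from processEvent while the watchCache lock is held,
			// so hand the event off quickly.
			select {
			case events <- ev:
			default: // this toy sketch simply drops on overflow
			}
		})
		wc.SetOnReplace(func() {
			// The backing store was re-listed; watchers may need to restart.
		})
		return events
	}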
+
+func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
+	size := w.endIndex - w.startIndex
+	oldest := w.resourceVersion
+	if size > 0 {
+		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
+	}
+	if resourceVersion == 0 {
+		// resourceVersion = 0 means that we don't require any specific starting point
+		// and we would like to start watching from ~now.
+		// However, to keep backward compatibility, we additionally need to return the
+		// current state and only then start watching from that point.
+		//
+		// TODO: In v2 api, we should stop returning the current state - #13969.
+		allItems := w.store.List()
+		result := make([]*watchCacheEvent, len(allItems))
+		for i, item := range allItems {
+			elem, ok := item.(*storeElement)
+			if !ok {
+				return nil, fmt.Errorf("not a storeElement: %v", item)
+			}
+			objLabels, objFields, err := w.getAttrsFunc(elem.Object)
+			if err != nil {
+				return nil, err
+			}
+			result[i] = &watchCacheEvent{
+				Type:            watch.Added,
+				Object:          elem.Object,
+				ObjLabels:       objLabels,
+				ObjFields:       objFields,
+				Key:             elem.Key,
+				ResourceVersion: w.resourceVersion,
+			}
+		}
+		return result, nil
+	}
+	if resourceVersion < oldest-1 {
+		return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
+	}
+
+	// Binary search for the smallest index at which resourceVersion is greater
+	// than the given one.
+	f := func(i int) bool {
+		return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
+	}
+	first := sort.Search(size, f)
+	result := make([]*watchCacheEvent, size-first)
+	for i := 0; i < size-first; i++ {
+		result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
+	}
+	return result, nil
+}
+
+func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
+	w.RLock()
+	defer w.RUnlock()
+	return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
+}
+
+func (w *watchCache) Resync() error {
+	// Nothing to do
+	return nil
+}
diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go
new file mode 100644
index 000000000..4a82cf1e5
--- /dev/null
+++ b/pkg/storage/watch_cache_test.go
@@ -0,0 +1,367 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"strconv"
+	"testing"
+	"time"
+
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/pkg/api"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/clock"
+)
+
+func makeTestPod(name string, resourceVersion uint64) *api.Pod {
+	return &api.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:       "ns",
+			Name:            name,
+			ResourceVersion: strconv.FormatUint(resourceVersion, 10),
+		},
+	}
+}
+
+// newTestWatchCache just adds a fake clock.
+func newTestWatchCache(capacity int) *watchCache {
+	keyFunc := func(obj runtime.Object) (string, error) {
+		return NamespaceKeyFunc("prefix", obj)
+	}
+	getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
+		return nil, nil, nil
+	}
+	wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
+	wc.clock = clock.NewFakeClock(time.Now())
+	return wc
+}
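The sort.Search step in GetAllEventsSinceThreadUnsafe above finds the first event strictly newer than the requested version; the same idea on a plain sorted slice (firstNewerIndex is a hypothetical standalone helper):

	// firstNewerIndex returns the smallest index whose version is strictly
	// greater than rv; versions must be sorted in increasing order.
	func firstNewerIndex(versions []uint64, rv uint64) int {
		return sort.Search(len(versions), func(i int) bool {
			return versions[i] > rv
		})
	}

	// firstNewerIndex([]uint64{4, 5, 7, 9}, 5) == 2, so events 7 and 9 are returned.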
+
+func TestWatchCacheBasic(t *testing.T) {
+	store := newTestWatchCache(2)
+
+	// Test Add/Update/Delete.
+	pod1 := makeTestPod("pod", 1)
+	if err := store.Add(pod1); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if item, ok, _ := store.Get(pod1); !ok {
+		t.Errorf("didn't find pod")
+	} else {
+		if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) {
+			t.Errorf("expected %v, got %v", pod1, item)
+		}
+	}
+	pod2 := makeTestPod("pod", 2)
+	if err := store.Update(pod2); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if item, ok, _ := store.Get(pod2); !ok {
+		t.Errorf("didn't find pod")
+	} else {
+		if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) {
+			t.Errorf("expected %v, got %v", pod2, item)
+		}
+	}
+	pod3 := makeTestPod("pod", 3)
+	if err := store.Delete(pod3); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if _, ok, _ := store.Get(pod3); ok {
+		t.Errorf("found pod")
+	}
+
+	// Test List.
+	store.Add(makeTestPod("pod1", 4))
+	store.Add(makeTestPod("pod2", 5))
+	store.Add(makeTestPod("pod3", 6))
+	{
+		podNames := sets.String{}
+		for _, item := range store.List() {
+			podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
+		}
+		if !podNames.HasAll("pod1", "pod2", "pod3") {
+			t.Errorf("missing pods, found %v", podNames)
+		}
+		if len(podNames) != 3 {
+			t.Errorf("found missing/extra items")
+		}
+	}
+
+	// Test Replace.
+	store.Replace([]interface{}{
+		makeTestPod("pod4", 7),
+		makeTestPod("pod5", 8),
+	}, "8")
+	{
+		podNames := sets.String{}
+		for _, item := range store.List() {
+			podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
+		}
+		if !podNames.HasAll("pod4", "pod5") {
+			t.Errorf("missing pods, found %v", podNames)
+		}
+		if len(podNames) != 2 {
+			t.Errorf("found missing/extra items")
+		}
+	}
+}
+
+func TestEvents(t *testing.T) {
+	store := newTestWatchCache(5)
+
+	store.Add(makeTestPod("pod", 3))
+
+	// Test for Added event.
+	{
+		_, err := store.GetAllEventsSince(1)
+		if err == nil {
+			t.Errorf("expected error too old")
+		}
+		if _, ok := err.(*errors.StatusError); !ok {
+			t.Errorf("expected error to be of type StatusError")
+		}
+	}
+	{
+		result, err := store.GetAllEventsSince(2)
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if len(result) != 1 {
+			t.Fatalf("unexpected events: %v", result)
+		}
+		if result[0].Type != watch.Added {
+			t.Errorf("unexpected event type: %v", result[0].Type)
+		}
+		pod := makeTestPod("pod", uint64(3))
+		if !apiequality.Semantic.DeepEqual(pod, result[0].Object) {
+			t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
+		}
+		if result[0].PrevObject != nil {
+			t.Errorf("unexpected item: %v", result[0].PrevObject)
+		}
+	}
+
+	store.Update(makeTestPod("pod", 4))
+	store.Update(makeTestPod("pod", 5))
+
+	// Test with a not-yet-full cache.
+ { + _, err := store.GetAllEventsSince(1) + if err == nil { + t.Errorf("expected error too old") + } + } + { + result, err := store.GetAllEventsSince(3) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 2 { + t.Fatalf("unexpected events: %v", result) + } + for i := 0; i < 2; i++ { + if result[i].Type != watch.Modified { + t.Errorf("unexpected event type: %v", result[i].Type) + } + pod := makeTestPod("pod", uint64(i+4)) + if !apiequality.Semantic.DeepEqual(pod, result[i].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) + } + prevPod := makeTestPod("pod", uint64(i+3)) + if !apiequality.Semantic.DeepEqual(prevPod, result[i].PrevObject) { + t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) + } + } + } + + for i := 6; i < 10; i++ { + store.Update(makeTestPod("pod", uint64(i))) + } + + // Test with full cache - there should be elements from 5 to 9. + { + _, err := store.GetAllEventsSince(3) + if err == nil { + t.Errorf("expected error too old") + } + } + { + result, err := store.GetAllEventsSince(4) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 5 { + t.Fatalf("unexpected events: %v", result) + } + for i := 0; i < 5; i++ { + pod := makeTestPod("pod", uint64(i+5)) + if !apiequality.Semantic.DeepEqual(pod, result[i].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) + } + } + } + + // Test for delete event. + store.Delete(makeTestPod("pod", uint64(10))) + + { + result, err := store.GetAllEventsSince(9) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 1 { + t.Fatalf("unexpected events: %v", result) + } + if result[0].Type != watch.Deleted { + t.Errorf("unexpected event type: %v", result[0].Type) + } + pod := makeTestPod("pod", uint64(10)) + if !apiequality.Semantic.DeepEqual(pod, result[0].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) + } + prevPod := makeTestPod("pod", uint64(9)) + if !apiequality.Semantic.DeepEqual(prevPod, result[0].PrevObject) { + t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) + } + } +} + +func TestWaitUntilFreshAndList(t *testing.T) { + store := newTestWatchCache(3) + + // In background, update the store. + go func() { + store.Add(makeTestPod("foo", 2)) + store.Add(makeTestPod("bar", 5)) + }() + + list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + } + if len(list) != 2 { + t.Errorf("unexpected list returned: %#v", list) + } +} + +func TestWaitUntilFreshAndGet(t *testing.T) { + store := newTestWatchCache(3) + + // In background, update the store. 
+ go func() { + store.Add(makeTestPod("foo", 2)) + store.Add(makeTestPod("bar", 5)) + }() + + obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + } + if !exists { + t.Fatalf("no results returned: %#v", obj) + } + if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/bar", Object: makeTestPod("bar", 5)}, obj) { + t.Errorf("unexpected element returned: %#v", obj) + } +} + +func TestWaitUntilFreshAndListTimeout(t *testing.T) { + store := newTestWatchCache(3) + fc := store.clock.(*clock.FakeClock) + + // In background, step clock after the below call starts the timer. + go func() { + for !fc.HasWaiters() { + time.Sleep(time.Millisecond) + } + fc.Step(blockTimeout) + + // Add an object to make sure the test would + // eventually fail instead of just waiting + // forever. + time.Sleep(30 * time.Second) + store.Add(makeTestPod("bar", 5)) + }() + + _, _, err := store.WaitUntilFreshAndList(5, nil) + if err == nil { + t.Fatalf("unexpected lack of timeout error") + } +} + +type testLW struct { + ListFunc func(options metav1.ListOptions) (runtime.Object, error) + WatchFunc func(options metav1.ListOptions) (watch.Interface, error) +} + +func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) { + return t.ListFunc(options) +} +func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { + return t.WatchFunc(options) +} + +func TestReflectorForWatchCache(t *testing.T) { + store := newTestWatchCache(5) + + { + _, version, err := store.WaitUntilFreshAndList(0, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if version != 0 { + t.Errorf("unexpected resource version: %d", version) + } + } + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + fw := watch.NewFake() + go fw.Stop() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &api.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := cache.NewReflector(lw, &api.Pod{}, store, 0) + r.ListAndWatch(wait.NeverStop) + + { + _, version, err := store.WaitUntilFreshAndList(10, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if version != 10 { + t.Errorf("unexpected resource version: %d", version) + } + } +}