move pkg/storage to apiserver

This commit is contained in:
deads2k 2017-01-31 11:47:19 -05:00
parent 8787eeb741
commit 55fd399ad4
14 changed files with 2913 additions and 0 deletions

41
pkg/storage/OWNERS Normal file
View File

@ -0,0 +1,41 @@
approvers:
- lavalamp
- liggitt
- timothysc
- wojtek-t
- xiang90
reviewers:
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- derekwaynecarr
- caesarxuchao
- mikedanese
- liggitt
- erictune
- davidopp
- pmorie
- luxas
- janetkuo
- roberthbailey
- ncdc
- timstclair
- timothysc
- soltysh
- dims
- madhusudancs
- hongchaodeng
- krousey
- fgrzadkowski
- xiang90
- mml
- ingvagabund
- resouer
- mbohlool
- pweil-
- lixiaobing10051267
- mqliang
- feihujiang
- rrati

958
pkg/storage/cacher.go Normal file
View File

@ -0,0 +1,958 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"net/http"
"reflect"
"strconv"
"sync"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
// CacherConfig contains the configuration for a given Cache.
type CacherConfig struct {
// Maximum size of the history cached in memory.
CacheCapacity int
// An underlying storage.Interface.
Storage Interface
// An underlying storage.Versioner.
Versioner Versioner
Copier runtime.ObjectCopier
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
Type interface{}
ResourcePrefix string
// KeyFunc is used to get a key in the underyling storage for a given object.
KeyFunc func(runtime.Object) (string, error)
// GetAttrsFunc is used to get object labels and fields.
GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
NewListFunc func() runtime.Object
Codec runtime.Codec
}
type watchersMap map[int]*cacheWatcher
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll() {
for key, watcher := range wm {
delete(wm, key)
watcher.stop()
}
}
type indexedWatchers struct {
allWatchers watchersMap
valueWatchers map[string]watchersMap
}
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
if supported {
if _, ok := i.valueWatchers[value]; !ok {
i.valueWatchers[value] = watchersMap{}
}
i.valueWatchers[value].addWatcher(w, number)
} else {
i.allWatchers.addWatcher(w, number)
}
}
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers.deleteWatcher(number)
}
}
func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
glog.Warningf("Terminating all watchers from cacher %v", objectType)
}
i.allWatchers.terminateAll()
for index, watchers := range i.valueWatchers {
watchers.terminateAll()
delete(i.valueWatchers, index)
}
}
type watchFilterFunc func(string, labels.Set, fields.Set) bool
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM HighWaterMark
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
sync.RWMutex
// Before accessing the cacher's cache, wait for the ready to be ok.
// This is necessary to prevent users from accessing structures that are
// uninitialized or are being repopulated right now.
// ready needs to be set to false when the cacher is paused or stopped.
// ready needs to be set to true when the cacher is ready to use after
// initialization.
ready *ready
// Underlying storage.Interface.
storage Interface
copier runtime.ObjectCopier
// Expected type of objects in the underlying cache.
objectType reflect.Type
// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
reflector *cache.Reflector
// Versioner is used to handle resource versions.
versioner Versioner
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
watchers indexedWatchers
// Defines a time budget that can be spend on waiting for not-ready watchers
// while dispatching event before shutting them down.
dispatchTimeoutBudget *timeBudget
// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
stopCh chan struct{}
stopWg sync.WaitGroup
}
// Create a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if obj, ok := config.Type.(runtime.Object); ok {
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
}
stopCh := make(chan struct{})
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
copier: config.Copier,
objectType: reflect.TypeOf(config.Type),
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
versioner: config.Versioner,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
dispatchTimeoutBudget: newTimeBudget(stopCh),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
}
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
)
}()
return cacher
}
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
// unlock it after a successful list. The below defer will then re-lock
// it when this function exits (always due to disconnection), only if
// we actually got a successful list. This cycle will repeat as needed.
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
})
defer func() {
if successfulList {
c.ready.set(false)
}
}()
c.terminateAllWatchers()
// Note that since onReplace may be not called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
glog.Errorf("unexpected ListAndWatch error: %v", err)
}
}
// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
return c.storage.Versioner()
}
// Implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
}
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
c.ready.wait()
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function.
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
// passed here is aware of exactly the same trigger (at most one).
// Thus, either 0 or 1 values will be returned.
if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
triggerValue, triggerSupported = matchValues[0].Value, true
}
// If there is triggerFunc defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently triggerFunc is defined only for Pods and Nodes,
// and there is only constant number of watchers for which triggerSupported
// is false (excluding those issues explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.triggerFunc != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(c.copier, watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
return watcher, nil
}
// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred)
}
// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
getRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
c.ready.wait()
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
return err
}
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
if err != nil {
return err
}
if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
objVal.Set(reflect.ValueOf(elem.Object).Elem())
} else {
objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound {
return NewKeyNotFoundError(key, int64(readResourceVersion))
}
}
return nil
}
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, pred)
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return err
}
trace.Step("Got from cache")
if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
return err
}
}
return nil
}
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, pred)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
if err != nil {
return err
}
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
trace.Step("Resized result")
}
for _, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
return err
}
}
return nil
}
// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
glog.Errorf("GetByKey returned error: %v", err)
} else if exists {
currObj, copyErr := c.copier.Copy(elem.(*storeElement).Object)
if copyErr == nil {
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
}
glog.Errorf("couldn't copy object: %v", copyErr)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling:
// c.triggerFunc(<some object>)
// can return only 0 or 1 values.
// That means, that triggerValues itself may return up to 2 different values.
if c.triggerFunc == nil {
return nil, false
}
result := make([]string, 0, 2)
matchValues := c.triggerFunc(event.Object)
if len(matchValues) > 0 {
result = append(result, matchValues[0].Value)
}
if event.PrevObject == nil {
return result, len(result) > 0
}
prevMatchValues := c.triggerFunc(event.PrevObject)
if len(prevMatchValues) > 0 {
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
result = append(result, prevMatchValues[0].Value)
}
}
return result, len(result) > 0
}
func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
}
c.incoming <- *event
}
func (c *Cacher) dispatchEvents() {
for {
select {
case event, ok := <-c.incoming:
if !ok {
return
}
c.dispatchEvent(&event)
case <-c.stopCh:
return
}
}
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
c.Lock()
defer c.Unlock()
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
watcher.add(event, c.dispatchTimeoutBudget)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event, c.dispatchTimeoutBudget)
}
}
} else {
// supported equal to false generally means that trigger function
// is not defined (or not aware of any indexes). In this case,
// watchers filters should generally also don't generate any
// trigger values, but can cause problems in case of some
// misconfiguration. Thus we paranoidly leave this branch.
// Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers {
watcher.add(event, c.dispatchTimeoutBudget)
}
}
}
}
func (c *Cacher) terminateAllWatchers() {
c.Lock()
defer c.Unlock()
c.watchers.terminateAll(c.objectType)
}
func (c *Cacher) isStopped() bool {
c.stopLock.RLock()
defer c.stopLock.RUnlock()
return c.stopped
}
func (c *Cacher) Stop() {
c.stopLock.Lock()
c.stopped = true
c.stopLock.Unlock()
close(c.stopCh)
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
}
// It's possible that the watcher is already not in the structure (e.g. in case of
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
}
}
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
f := SimpleFilter(p)
filterFunc := func(objKey string, obj runtime.Object) bool {
if !hasPathPrefix(objKey, key) {
return false
}
return f(obj)
}
return filterFunc
}
func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
if !hasPathPrefix(objKey, key) {
return false
}
return p.MatchesLabelsAndFields(label, field)
}
return filterFunc
}
// Returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
resourceVersion := c.reflector.LastSyncResourceVersion()
if resourceVersion == "" {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage Interface
resourcePrefix string
newListFunc func() runtime.Object
}
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}
// cacherWatch implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
}
func newErrWatcher(err error) *errWatcher {
// Create an error event
errEvent := watch.Event{Type: watch.Error}
switch err := err.(type) {
case runtime.Object:
errEvent.Object = err
case *errors.StatusError:
errEvent.Object = &err.ErrStatus
default:
errEvent.Object = &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Reason: metav1.StatusReasonInternalError,
Code: http.StatusInternalServerError,
}
}
// Create a watcher with room for a single event, populate it, and close the channel
watcher := &errWatcher{result: make(chan watch.Event, 1)}
watcher.result <- errEvent
close(watcher.result)
return watcher
}
// Implements watch.Interface.
func (c *errWatcher) ResultChan() <-chan watch.Event {
return c.result
}
// Implements watch.Interface.
func (c *errWatcher) Stop() {
// no-op
}
// cacherWatch implements watch.Interface
type cacheWatcher struct {
sync.Mutex
copier runtime.ObjectCopier
input chan watchCacheEvent
result chan watch.Event
done chan struct{}
filter watchFilterFunc
stopped bool
forget func(bool)
}
func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
copier: copier,
input: make(chan watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
}
go watcher.process(initEvents, resourceVersion)
return watcher
}
// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
return c.result
}
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget(true)
c.stop()
}
func (c *cacheWatcher) stop() {
c.Lock()
defer c.Unlock()
if !c.stopped {
c.stopped = true
close(c.done)
close(c.input)
}
}
var timerPool sync.Pool
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
// Try to send the event immediately, without blocking.
select {
case c.input <- *event:
return
default:
}
// OK, block sending, but only for up to <timeout>.
// cacheWatcher.add is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
timeout := budget.takeAvailable()
t, ok := timerPool.Get().(*time.Timer)
if ok {
t.Reset(timeout)
} else {
t = time.NewTimer(timeout)
}
defer timerPool.Put(t)
select {
case c.input <- *event:
stopped := t.Stop()
if !stopped {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-t.C
}
case <-t.C:
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
c.forget(false)
c.stop()
}
budget.returnUnused(timeout - time.Since(startTime))
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return
}
object, err := c.copier.Copy(event.Object)
if err != nil {
glog.Errorf("unexpected copy error: %v", err)
return
}
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
watchEvent = watch.Event{Type: watch.Added, Object: object}
case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: object}
case !curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
}
// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is close or not.
// Thus we cannot simply select from c.done and c.result and this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
// This ensures that with c.done already close, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <-c.done:
return
default:
}
select {
case c.result <- watchEvent:
case <-c.done:
}
}
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
// any incoming events, so if it takes long, we may actually
// block all watchers for some time.
// TODO: From the logs it seems that there happens processing
// times even up to 1s which is very long. However, this doesn't
// depend that much on the number of initEvents. E.g. from the
// 2000-node Kubemark run we have logs like this, e.g.:
// ... processing 13862 initEvents took 66.808689ms
// ... processing 14040 initEvents took 993.532539ms
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or sth else) and potentially
// consider increase size of result buffer in those cases.
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
objType := "<null>"
if len(initEvents) > 0 {
objType = reflect.TypeOf(initEvents[0].Object).String()
}
glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
}
defer close(c.result)
defer c.Stop()
for {
event, ok := <-c.input
if !ok {
return
}
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(&event)
}
}
}
type ready struct {
ok bool
c *sync.Cond
}
func newReady() *ready {
return &ready{c: sync.NewCond(&sync.Mutex{})}
}
func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}

View File

@ -0,0 +1,56 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api"
)
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
// the writes to cacheWatcher.result channel is blocked.
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
var lock sync.RWMutex
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
forget := func(bool) {
lock.Lock()
defer lock.Unlock()
count++
}
initEvents := []*watchCacheEvent{
{Object: &api.Pod{}},
{Object: &api.Pod{}},
}
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w := newCacheWatcher(api.Scheme, 0, 0, initEvents, filter, forget)
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
defer lock.RUnlock()
return count == 2, nil
}); err != nil {
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
}
}

18
pkg/storage/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Interfaces for database-related operations.
package storage // import "k8s.io/apiserver/pkg/storage"

170
pkg/storage/errors.go Normal file
View File

@ -0,0 +1,170 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"k8s.io/apimachinery/pkg/util/validation/field"
)
const (
ErrCodeKeyNotFound int = iota + 1
ErrCodeKeyExists
ErrCodeResourceVersionConflicts
ErrCodeInvalidObj
ErrCodeUnreachable
)
var errCodeToMessage = map[int]string{
ErrCodeKeyNotFound: "key not found",
ErrCodeKeyExists: "key exists",
ErrCodeResourceVersionConflicts: "resource version conflicts",
ErrCodeInvalidObj: "invalid object",
ErrCodeUnreachable: "server unreachable",
}
func NewKeyNotFoundError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeKeyNotFound,
Key: key,
ResourceVersion: rv,
}
}
func NewKeyExistsError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeKeyExists,
Key: key,
ResourceVersion: rv,
}
}
func NewResourceVersionConflictsError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeResourceVersionConflicts,
Key: key,
ResourceVersion: rv,
}
}
func NewUnreachableError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeUnreachable,
Key: key,
ResourceVersion: rv,
}
}
func NewInvalidObjError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeInvalidObj,
Key: key,
AdditionalErrorMsg: msg,
}
}
type StorageError struct {
Code int
Key string
ResourceVersion int64
AdditionalErrorMsg string
}
func (e *StorageError) Error() string {
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
}
// IsNotFound returns true if and only if err is "key" not found error.
func IsNotFound(err error) bool {
return isErrCode(err, ErrCodeKeyNotFound)
}
// IsNodeExist returns true if and only if err is an node already exist error.
func IsNodeExist(err error) bool {
return isErrCode(err, ErrCodeKeyExists)
}
// IsUnreachable returns true if and only if err indicates the server could not be reached.
func IsUnreachable(err error) bool {
return isErrCode(err, ErrCodeUnreachable)
}
// IsConflict returns true if and only if err is a write conflict.
func IsConflict(err error) bool {
return isErrCode(err, ErrCodeResourceVersionConflicts)
}
// IsInvalidObj returns true if and only if err is invalid error
func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
}
func isErrCode(err error, code int) bool {
if err == nil {
return false
}
if e, ok := err.(*StorageError); ok {
return e.Code == code
}
return false
}
// InvalidError is generated when an error caused by invalid API object occurs
// in the storage package.
type InvalidError struct {
Errs field.ErrorList
}
func (e InvalidError) Error() string {
return e.Errs.ToAggregate().Error()
}
// IsInvalidError returns true if and only if err is an InvalidError.
func IsInvalidError(err error) bool {
_, ok := err.(InvalidError)
return ok
}
func NewInvalidError(errors field.ErrorList) InvalidError {
return InvalidError{errors}
}
// InternalError is generated when an error occurs in the storage package, i.e.,
// not from the underlying storage backend (e.g., etcd).
type InternalError struct {
Reason string
}
func (e InternalError) Error() string {
return e.Reason
}
// IsInternalError returns true if and only if err is an InternalError.
func IsInternalError(err error) bool {
_, ok := err.(InternalError)
return ok
}
func NewInternalError(reason string) InternalError {
return InternalError{reason}
}
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a)}
}

181
pkg/storage/interfaces.go Normal file
View File

@ -0,0 +1,181 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
)
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list.
type Versioner interface {
// UpdateObject sets storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from database.
UpdateObject(obj runtime.Object, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from database.
UpdateList(obj runtime.Object, resourceVersion uint64) error
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
// Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error)
}
// ResponseMeta contains information about the database metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with concrete
// database and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// FilterFunc takes an API object and returns true if the object satisfies some requirements.
// TODO: We will remove this type and use SelectionPredicate everywhere.
type FilterFunc func(obj runtime.Object) bool
// Everything accepts all objects.
var Everything = SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
}
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
type Preconditions struct {
// Specifies the target UID.
// +optional
UID *types.UID `json:"uid,omitempty"`
}
// NewUIDPreconditions returns a Preconditions with UID set.
func NewUIDPreconditions(uid string) *Preconditions {
u := types.UID(uid)
return &Preconditions{UID: &u}
}
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
// Returns Versioner associated with this interface.
Versioner() Versioner
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
// or zero value in 'ptrToType' parameter otherwise.
// If the object to update has the same value as previous, it won't do any update
// but will return the object in 'ptrToType' parameter.
// If 'suggestion' can contain zero or one element - in such case this can be used as
// a suggestion about the current version of the object to avoid read operation from
// storage to get it.
//
// Example:
//
// s := /* implementation of Interface */
// err := s.GuaranteedUpdate(
// "myKey", &MyType{}, true,
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each incovation of the user defined function, "input" is reset to
// // current contents for "myKey" in database.
// curr := input.(*MyType) // Guaranteed to succeed.
//
// // Make the modification
// curr.Counter++
//
// // Return the modified object - return an error to stop iterating. Return
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// }
// })
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
// AttrFunc returns label and field sets for List or Watch to match.
// In any failure to parse given object, it returns error.
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error)
// SelectionPredicate is used to represent the way to select objects from api storage.
type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexFields []string
}
// Matches returns true if the given object's labels and fields (as
// returned by s.GetAttrs) match s.Label and s.Field. An error is
// returned if s.GetAttrs fails.
func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
if s.Label.Empty() && s.Field.Empty() {
return true, nil
}
labels, fields, err := s.GetAttrs(obj)
if err != nil {
return false, err
}
matched := s.Label.Matches(labels)
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(fields))
}
return matched, nil
}
// MatchesLabelsAndFields returns true if the given labels and fields
// match s.Label and s.Field.
func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool {
if s.Label.Empty() && s.Field.Empty() {
return true
}
matched := s.Label.Matches(l)
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(f))
}
return matched
}
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
// TODO: should be namespace.name
if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok {
return name, true
}
return "", false
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
}
}
return result
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"errors"
"testing"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type Ignored struct {
ID string
}
type IgnoredList struct {
Items []Ignored
}
func (obj *Ignored) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
func (obj *IgnoredList) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
func TestSelectionPredicate(t *testing.T) {
table := map[string]struct {
labelSelector, fieldSelector string
labels labels.Set
fields fields.Set
err error
shouldMatch bool
matchSingleKey string
}{
"A": {
labelSelector: "name=foo",
fieldSelector: "uid=12345",
labels: labels.Set{"name": "foo"},
fields: fields.Set{"uid": "12345"},
shouldMatch: true,
},
"B": {
labelSelector: "name=foo",
fieldSelector: "uid=12345",
labels: labels.Set{"name": "foo"},
fields: fields.Set{},
shouldMatch: false,
},
"C": {
labelSelector: "name=foo",
fieldSelector: "uid=12345",
labels: labels.Set{},
fields: fields.Set{"uid": "12345"},
shouldMatch: false,
},
"D": {
fieldSelector: "metadata.name=12345",
labels: labels.Set{},
fields: fields.Set{"metadata.name": "12345"},
shouldMatch: true,
matchSingleKey: "12345",
},
"error": {
labelSelector: "name=foo",
fieldSelector: "uid=12345",
err: errors.New("maybe this is a 'wrong object type' error"),
shouldMatch: false,
},
}
for name, item := range table {
parsedLabel, err := labels.Parse(item.labelSelector)
if err != nil {
panic(err)
}
parsedField, err := fields.ParseSelector(item.fieldSelector)
if err != nil {
panic(err)
}
sp := &SelectionPredicate{
Label: parsedLabel,
Field: parsedField,
GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) {
return item.labels, item.fields, item.err
},
}
got, err := sp.Matches(&Ignored{})
if e, a := item.err, err; e != a {
t.Errorf("%v: expected %v, got %v", name, e, a)
continue
}
if e, a := item.shouldMatch, got; e != a {
t.Errorf("%v: expected %v, got %v", name, e, a)
}
if key := item.matchSingleKey; key != "" {
got, ok := sp.MatchesSingle()
if !ok {
t.Errorf("%v: expected single match", name)
}
if e, a := key, got; e != a {
t.Errorf("%v: expected %v, got %v", name, e, a)
}
}
}
}

View File

@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"sync"
"time"
)
const (
refreshPerSecond = 50 * time.Millisecond
maxBudget = 250 * time.Millisecond
)
// timeBudget implements a budget of time that you can use and is
// periodically being refreshed. The pattern to use it is:
// budget := newTimeBudget(...)
// ...
// timeout := budget.takeAvailable()
// // Now you can spend at most timeout on doing stuff
// ...
// // If you didn't use all timeout, return what you didn't use
// budget.returnUnused(<unused part of timeout>)
//
// NOTE: It's not recommended to be used concurrently from multiple threads -
// if first user takes the whole timeout, the second one will get 0 timeout
// even though the first one may return something later.
type timeBudget struct {
sync.Mutex
budget time.Duration
refresh time.Duration
maxBudget time.Duration
}
func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
result := &timeBudget{
budget: time.Duration(0),
refresh: refreshPerSecond,
maxBudget: maxBudget,
}
go result.periodicallyRefresh(stopCh)
return result
}
func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.Lock()
if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
t.budget = t.maxBudget
}
t.Unlock()
case <-stopCh:
return
}
}
}
func (t *timeBudget) takeAvailable() time.Duration {
t.Lock()
defer t.Unlock()
result := t.budget
t.budget = time.Duration(0)
return result
}
func (t *timeBudget) returnUnused(unused time.Duration) {
t.Lock()
defer t.Unlock()
if unused < 0 {
// We used more than allowed.
return
}
if t.budget = t.budget + unused; t.budget > t.maxBudget {
t.budget = t.maxBudget
}
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"testing"
"time"
)
func TestTimeBudget(t *testing.T) {
budget := &timeBudget{
budget: time.Duration(0),
maxBudget: time.Duration(200),
}
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
}
budget.budget = time.Duration(100)
if res := budget.takeAvailable(); res != time.Duration(100) {
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
}
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
}
budget.returnUnused(time.Duration(50))
if res := budget.takeAvailable(); res != time.Duration(50) {
t.Errorf("Expected: %v, got: %v", time.Duration(50), res)
}
budget.budget = time.Duration(100)
budget.returnUnused(-time.Duration(50))
if res := budget.takeAvailable(); res != time.Duration(100) {
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
}
// test overflow.
budget.returnUnused(time.Duration(500))
if res := budget.takeAvailable(); res != time.Duration(200) {
t.Errorf("Expected: %v, got: %v", time.Duration(200), res)
}
}

161
pkg/storage/util.go Normal file
View File

@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"strconv"
"strings"
"sync/atomic"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
out, err := fn(input)
return out, nil, err
}
}
// SimpleFilter converts a selection predicate into a FilterFunc.
// It ignores any error from Matches().
func SimpleFilter(p SelectionPredicate) FilterFunc {
return func(obj runtime.Object) bool {
matches, err := p.Matches(obj)
if err != nil {
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
return false
}
return matches
}
}
func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// ParseListResourceVersion takes a resource version argument and converts it to
// the etcd version.
func ParseListResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
return version, err
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := meta.GetName()
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", fmt.Errorf("invalid name: %v", msgs)
}
return prefix + "/" + meta.GetNamespace() + "/" + name, nil
}
func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := meta.GetName()
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", fmt.Errorf("invalid name: %v", msgs)
}
return prefix + "/" + name, nil
}
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

136
pkg/storage/util_test.go Normal file
View File

@ -0,0 +1,136 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"math/rand"
"sync"
"testing"
)
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 1},
{Version: "10", ExpectVersion: 10},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !IsInvalidError(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Update(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Update(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Update(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}

468
pkg/storage/watch_cache.go Normal file
View File

@ -0,0 +1,468 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"sort"
"strconv"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/clock"
)
const (
// blockTimeout determines how long we're willing to block the request
// to wait for a given resource version to be propagated to cache,
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
)
// watchCacheEvent is a single "watch event" that is send to users of
// watchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
ObjLabels labels.Set
ObjFields fields.Set
PrevObject runtime.Object
PrevObjLabels labels.Set
PrevObjFields fields.Set
Key string
ResourceVersion uint64
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). To avoid computing it multiple times
// (to serve the event in different List/Watch requests), in the
// underlying store we are keeping pair (key, object).
type storeElement struct {
Key string
Object runtime.Object
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
// watchCache is a "sliding window" (with a limited capacity) of objects
// observed from a watch.
type watchCache struct {
sync.RWMutex
// Condition on which lists are waiting for the fresh enough
// resource version.
cond *sync.Cond
// Maximum size of history window.
capacity int
// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)
// getAttrsFunc is used to get labels and fields of an object.
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache []watchCacheElement
startIndex int
endIndex int
// store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Store
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
// This handler is run at the end of every successful Replace() method.
onReplace func()
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(*watchCacheEvent)
// for testing timeouts.
clock clock.Clock
}
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
clock: clock.RealClock{},
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
}
// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Added, Object: object}
f := func(elem *storeElement) error { return w.store.Add(elem) }
return w.processEvent(event, resourceVersion, f)
}
// Update takes runtime.Object as an argument.
func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Modified, Object: object}
f := func(elem *storeElement) error { return w.store.Update(elem) }
return w.processEvent(event, resourceVersion, f)
}
// Delete takes runtime.Object as an argument.
func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Deleted, Object: object}
f := func(elem *storeElement) error { return w.store.Delete(elem) }
return w.processEvent(event, resourceVersion, f)
}
func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
meta, err := meta.Accessor(object)
if err != nil {
return nil, 0, err
}
resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
if err != nil {
return nil, 0, err
}
return object, resourceVersion, nil
}
func parseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
// Use bitsize being the size of int on the machine.
return strconv.ParseUint(resourceVersion, 10, 0)
}
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
elem := &storeElement{Key: key, Object: event.Object}
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
objLabels, objFields, err := w.getAttrsFunc(event.Object)
if err != nil {
return err
}
var prevObject runtime.Object
var prevObjLabels labels.Set
var prevObjFields fields.Set
if exists {
prevObject = previous.(*storeElement).Object
prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject)
if err != nil {
return err
}
}
watchCacheEvent := &watchCacheEvent{
Type: event.Type,
Object: event.Object,
ObjLabels: objLabels,
ObjFields: objFields,
PrevObject: prevObject,
PrevObjLabels: prevObjLabels,
PrevObjFields: prevObjFields,
Key: key,
ResourceVersion: resourceVersion,
}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(elem)
}
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.endIndex++
}
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()
}
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
// promise that time.After (well, NewTimer, which it calls)
// will wait *at least* the duration given. Since this go
// routine starts sometime after we record the start time, and
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(blockTimeout)
w.cond.Broadcast()
}()
w.RLock()
if trace != nil {
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
}
w.cond.Wait()
}
if trace != nil {
trace.Step("watchCache fresh enough")
}
return nil
}
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, 0, err
}
return w.store.List(), w.resourceVersion, nil
}
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
}
value, exists, err := w.store.GetByKey(key)
return value, exists, w.resourceVersion, err
}
func (w *watchCache) ListKeys() []string {
return w.store.ListKeys()
}
// Get takes runtime.Object as a parameter. However, it returns
// pointer to <storeElement>.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
}
return w.store.Get(&storeElement{Key: key, Object: object})
}
// GetByKey returns pointer to <storeElement>.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
return w.store.GetByKey(key)
}
// Replace takes slice of runtime.Object as a paramater.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion)
if err != nil {
return err
}
toReplace := make([]interface{}, 0, len(objs))
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
toReplace = append(toReplace, &storeElement{Key: key, Object: object})
}
w.Lock()
defer w.Unlock()
w.startIndex = 0
w.endIndex = 0
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
w.resourceVersion = version
if w.onReplace != nil {
w.onReplace()
}
w.cond.Broadcast()
return nil
}
func (w *watchCache) SetOnReplace(onReplace func()) {
w.Lock()
defer w.Unlock()
w.onReplace = onReplace
}
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
w.Lock()
defer w.Unlock()
w.onEvent = onEvent
}
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex
oldest := w.resourceVersion
if size > 0 {
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
}
if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point
// and we would like to start watching from ~now.
// However, to keep backward compatibility, we additionally need to return the
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
allItems := w.store.List()
result := make([]*watchCacheEvent, len(allItems))
for i, item := range allItems {
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
objLabels, objFields, err := w.getAttrsFunc(elem.Object)
if err != nil {
return nil, err
}
result[i] = &watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
ObjLabels: objLabels,
ObjFields: objFields,
Key: elem.Key,
ResourceVersion: w.resourceVersion,
}
}
return result, nil
}
if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
}
// Binary search the smallest index at which resourceVersion is greater than the given one.
f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
}
first := sort.Search(size, f)
result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
}
return result, nil
}
func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
w.RLock()
defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
}
func (w *watchCache) Resync() error {
// Nothing to do
return nil
}

View File

@ -0,0 +1,367 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"strconv"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/clock"
)
func makeTestPod(name string, resourceVersion uint64) *api.Pod {
return &api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: name,
ResourceVersion: strconv.FormatUint(resourceVersion, 10),
},
}
}
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int) *watchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return NamespaceKeyFunc("prefix", obj)
}
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
return nil, nil, nil
}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
wc.clock = clock.NewFakeClock(time.Now())
return wc
}
func TestWatchCacheBasic(t *testing.T) {
store := newTestWatchCache(2)
// Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1)
if err := store.Add(pod1); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item, ok, _ := store.Get(pod1); !ok {
t.Errorf("didn't find pod")
} else {
if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) {
t.Errorf("expected %v, got %v", pod1, item)
}
}
pod2 := makeTestPod("pod", 2)
if err := store.Update(pod2); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item, ok, _ := store.Get(pod2); !ok {
t.Errorf("didn't find pod")
} else {
if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) {
t.Errorf("expected %v, got %v", pod1, item)
}
}
pod3 := makeTestPod("pod", 3)
if err := store.Delete(pod3); err != nil {
t.Errorf("unexpected error: %v", err)
}
if _, ok, _ := store.Get(pod3); ok {
t.Errorf("found pod")
}
// Test List.
store.Add(makeTestPod("pod1", 4))
store.Add(makeTestPod("pod2", 5))
store.Add(makeTestPod("pod3", 6))
{
podNames := sets.String{}
for _, item := range store.List() {
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
}
if !podNames.HasAll("pod1", "pod2", "pod3") {
t.Errorf("missing pods, found %v", podNames)
}
if len(podNames) != 3 {
t.Errorf("found missing/extra items")
}
}
// Test Replace.
store.Replace([]interface{}{
makeTestPod("pod4", 7),
makeTestPod("pod5", 8),
}, "8")
{
podNames := sets.String{}
for _, item := range store.List() {
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
}
if !podNames.HasAll("pod4", "pod5") {
t.Errorf("missing pods, found %v", podNames)
}
if len(podNames) != 2 {
t.Errorf("found missing/extra items")
}
}
}
func TestEvents(t *testing.T) {
store := newTestWatchCache(5)
store.Add(makeTestPod("pod", 3))
// Test for Added event.
{
_, err := store.GetAllEventsSince(1)
if err == nil {
t.Errorf("expected error too old")
}
if _, ok := err.(*errors.StatusError); !ok {
t.Errorf("expected error to be of type StatusError")
}
}
{
result, err := store.GetAllEventsSince(2)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 1 {
t.Fatalf("unexpected events: %v", result)
}
if result[0].Type != watch.Added {
t.Errorf("unexpected event type: %v", result[0].Type)
}
pod := makeTestPod("pod", uint64(3))
if !apiequality.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
}
if result[0].PrevObject != nil {
t.Errorf("unexpected item: %v", result[0].PrevObject)
}
}
store.Update(makeTestPod("pod", 4))
store.Update(makeTestPod("pod", 5))
// Test with not full cache.
{
_, err := store.GetAllEventsSince(1)
if err == nil {
t.Errorf("expected error too old")
}
}
{
result, err := store.GetAllEventsSince(3)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 2 {
t.Fatalf("unexpected events: %v", result)
}
for i := 0; i < 2; i++ {
if result[i].Type != watch.Modified {
t.Errorf("unexpected event type: %v", result[i].Type)
}
pod := makeTestPod("pod", uint64(i+4))
if !apiequality.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
}
prevPod := makeTestPod("pod", uint64(i+3))
if !apiequality.Semantic.DeepEqual(prevPod, result[i].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod)
}
}
}
for i := 6; i < 10; i++ {
store.Update(makeTestPod("pod", uint64(i)))
}
// Test with full cache - there should be elements from 5 to 9.
{
_, err := store.GetAllEventsSince(3)
if err == nil {
t.Errorf("expected error too old")
}
}
{
result, err := store.GetAllEventsSince(4)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 5 {
t.Fatalf("unexpected events: %v", result)
}
for i := 0; i < 5; i++ {
pod := makeTestPod("pod", uint64(i+5))
if !apiequality.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
}
}
}
// Test for delete event.
store.Delete(makeTestPod("pod", uint64(10)))
{
result, err := store.GetAllEventsSince(9)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 1 {
t.Fatalf("unexpected events: %v", result)
}
if result[0].Type != watch.Deleted {
t.Errorf("unexpected event type: %v", result[0].Type)
}
pod := makeTestPod("pod", uint64(10))
if !apiequality.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
}
prevPod := makeTestPod("pod", uint64(9))
if !apiequality.Semantic.DeepEqual(prevPod, result[0].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod)
}
}
}
func TestWaitUntilFreshAndList(t *testing.T) {
store := newTestWatchCache(3)
// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if len(list) != 2 {
t.Errorf("unexpected list returned: %#v", list)
}
}
func TestWaitUntilFreshAndGet(t *testing.T) {
store := newTestWatchCache(3)
// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()
obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if !exists {
t.Fatalf("no results returned: %#v", obj)
}
if !apiequality.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/bar", Object: makeTestPod("bar", 5)}, obj) {
t.Errorf("unexpected element returned: %#v", obj)
}
}
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store := newTestWatchCache(3)
fc := store.clock.(*clock.FakeClock)
// In background, step clock after the below call starts the timer.
go func() {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
fc.Step(blockTimeout)
// Add an object to make sure the test would
// eventually fail instead of just waiting
// forever.
time.Sleep(30 * time.Second)
store.Add(makeTestPod("bar", 5))
}()
_, _, err := store.WaitUntilFreshAndList(5, nil)
if err == nil {
t.Fatalf("unexpected lack of timeout error")
}
}
type testLW struct {
ListFunc func(options metav1.ListOptions) (runtime.Object, error)
WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) {
return t.ListFunc(options)
}
func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options)
}
func TestReflectorForWatchCache(t *testing.T) {
store := newTestWatchCache(5)
{
_, version, err := store.WaitUntilFreshAndList(0, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go fw.Stop()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &api.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := cache.NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(wait.NeverStop)
{
_, version, err := store.WaitUntilFreshAndList(10, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
}
}