Allow watch cache to be disabled per type
Currently setting watch cache size for a given resource does not disable the watch cache. This commit adds a new `default-watch-cache-size` flag to map to the existing field, and refactors how watch cache sizes are calculated to bring all of the code into one place. It also adds debug logging to startup to allow us to verify watch cache enablement in production. Kubernetes-commit: fc2d201e155296f311ae0a9278b00dcae2d68708
This commit is contained in:
parent
94baeb40da
commit
644d9a8cf1
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||
package registry
|
||||
|
||||
import (
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
@ -26,11 +28,10 @@ import (
|
|||
)
|
||||
|
||||
// Creates a cacher based given storageConfig.
|
||||
func StorageWithCacher(defaultCapacity int) generic.StorageDecorator {
|
||||
func StorageWithCacher(capacity int) generic.StorageDecorator {
|
||||
return func(
|
||||
copier runtime.ObjectCopier,
|
||||
storageConfig *storagebackend.Config,
|
||||
requestedSize *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
|
@ -38,15 +39,13 @@ func StorageWithCacher(defaultCapacity int) generic.StorageDecorator {
|
|||
getAttrsFunc storage.AttrFunc,
|
||||
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
||||
|
||||
capacity := defaultCapacity
|
||||
if requestedSize != nil && *requestedSize == 0 {
|
||||
panic("StorageWithCacher must not be called with zero cache size")
|
||||
}
|
||||
if requestedSize != nil {
|
||||
capacity = *requestedSize
|
||||
}
|
||||
|
||||
s, d := generic.NewRawStorage(storageConfig)
|
||||
if capacity == 0 {
|
||||
glog.V(5).Infof("Storage caching is disabled for %T", objectType)
|
||||
return s, d
|
||||
}
|
||||
glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
|
||||
|
||||
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
|
||||
// Currently it has two layers of same storage interface -- cacher and low level kv.
|
||||
cacherConfig := storage.CacherConfig{
|
||||
|
|
|
@ -175,10 +175,6 @@ type Store struct {
|
|||
Storage storage.Interface
|
||||
// Called to cleanup clients used by the underlying Storage; optional.
|
||||
DestroyFunc func()
|
||||
// Maximum size of the watch history cached in memory, in number of entries.
|
||||
// This value is ignored if Storage is non-nil. Nil is replaced with a default value.
|
||||
// A zero integer will disable caching.
|
||||
WatchCacheSize *int
|
||||
}
|
||||
|
||||
// Note: the rest.StandardStorage interface aggregates the common REST verbs
|
||||
|
@ -1337,7 +1333,6 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
|||
e.Storage, e.DestroyFunc = opts.Decorator(
|
||||
e.Copier,
|
||||
opts.StorageConfig,
|
||||
e.WatchCacheSize,
|
||||
e.NewFunc(),
|
||||
prefix,
|
||||
keyFunc,
|
||||
|
|
|
@ -26,11 +26,9 @@ import (
|
|||
|
||||
// StorageDecorator is a function signature for producing a storage.Interface
|
||||
// and an associated DestroyFunc from given parameters.
|
||||
// A zero capacity means to disable caching, nil means to use a default.
|
||||
type StorageDecorator func(
|
||||
copier runtime.ObjectCopier,
|
||||
config *storagebackend.Config,
|
||||
capacity *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
|
@ -43,7 +41,6 @@ type StorageDecorator func(
|
|||
func UndecoratedStorage(
|
||||
copier runtime.ObjectCopier,
|
||||
config *storagebackend.Config,
|
||||
capacity *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
|
|
|
@ -19,6 +19,8 @@ package options
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
|
@ -51,6 +53,8 @@ type EtcdOptions struct {
|
|||
EnableWatchCache bool
|
||||
// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
|
||||
DefaultWatchCacheSize int
|
||||
// WatchCacheSizes represents override to a given resource
|
||||
WatchCacheSizes []string
|
||||
}
|
||||
|
||||
var storageTypes = sets.NewString(
|
||||
|
@ -107,10 +111,17 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||
"Enables the generic garbage collector. MUST be synced with the corresponding flag "+
|
||||
"of the kube-controller-manager.")
|
||||
|
||||
// TODO: enable cache in integration tests.
|
||||
fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache,
|
||||
"Enable watch caching in the apiserver")
|
||||
|
||||
fs.IntVar(&s.DefaultWatchCacheSize, "default-watch-cache-size", s.DefaultWatchCacheSize,
|
||||
"Default watch cache size. If zero, watch cache will be disabled for resources that do not have a default watch size set.")
|
||||
|
||||
fs.StringSliceVar(&s.WatchCacheSizes, "watch-cache-sizes", s.WatchCacheSizes, ""+
|
||||
"List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. "+
|
||||
"The individual override format: resource#size, where size is a number. It takes effect "+
|
||||
"when watch-cache is enabled.")
|
||||
|
||||
fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type,
|
||||
"The storage backend for persistence. Options: 'etcd3' (default), 'etcd2'.")
|
||||
|
||||
|
@ -181,7 +192,15 @@ func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource)
|
|||
ResourcePrefix: resource.Group + "/" + resource.Resource,
|
||||
}
|
||||
if f.Options.EnableWatchCache {
|
||||
ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize)
|
||||
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
|
||||
if err != nil {
|
||||
return generic.RESTOptions{}, err
|
||||
}
|
||||
cacheSize, ok := sizes[resource]
|
||||
if !ok {
|
||||
cacheSize = f.Options.DefaultWatchCacheSize
|
||||
}
|
||||
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
@ -205,8 +224,52 @@ func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
|
|||
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
|
||||
}
|
||||
if f.Options.EnableWatchCache {
|
||||
ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize)
|
||||
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
|
||||
if err != nil {
|
||||
return generic.RESTOptions{}, err
|
||||
}
|
||||
cacheSize, ok := sizes[resource]
|
||||
if !ok {
|
||||
cacheSize = f.Options.DefaultWatchCacheSize
|
||||
}
|
||||
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// ParseWatchCacheSizes turns a list of cache size values into a map of group resources
|
||||
// to requested sizes.
|
||||
func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
|
||||
watchCacheSizes := make(map[schema.GroupResource]int)
|
||||
for _, c := range cacheSizes {
|
||||
tokens := strings.Split(c, "#")
|
||||
if len(tokens) != 2 {
|
||||
return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
|
||||
}
|
||||
|
||||
size, err := strconv.Atoi(tokens[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
|
||||
}
|
||||
if size < 0 {
|
||||
return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
|
||||
}
|
||||
|
||||
watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
|
||||
}
|
||||
return watchCacheSizes, nil
|
||||
}
|
||||
|
||||
// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
|
||||
func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
|
||||
var cacheSizes []string
|
||||
|
||||
for resource, size := range watchCacheSizes {
|
||||
if size < 0 {
|
||||
return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
|
||||
}
|
||||
cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
|
||||
}
|
||||
return cacheSizes, nil
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ type ServerRunOptions struct {
|
|||
RequestTimeout time.Duration
|
||||
MinRequestTimeout int
|
||||
TargetRAMMB int
|
||||
WatchCacheSizes []string
|
||||
}
|
||||
|
||||
func NewServerRunOptions() *ServerRunOptions {
|
||||
|
@ -151,10 +150,5 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
|
|||
"handler, which picks a randomized value above this number as the connection timeout, "+
|
||||
"to spread out load.")
|
||||
|
||||
fs.StringSliceVar(&s.WatchCacheSizes, "watch-cache-sizes", s.WatchCacheSizes, ""+
|
||||
"List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. "+
|
||||
"The individual override format: resource#size, where size is a number. It takes effect "+
|
||||
"when watch-cache is enabled.")
|
||||
|
||||
utilfeature.DefaultFeatureGate.AddFlag(fs)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue