From e36f8069aa3218fd64d2e23ededbb2bf2781f8a9 Mon Sep 17 00:00:00 2001 From: Marek Grabowski Date: Mon, 12 Feb 2018 15:58:57 +0000 Subject: [PATCH] Add a metric exposing number of objects per type Kubernetes-commit: f6e9ebffa2df10f7792fbea0a0fbe5ab8e388a26 --- pkg/registry/generic/options.go | 3 +++ pkg/registry/generic/registry/store.go | 36 +++++++++++++++++++++++++- pkg/server/options/etcd.go | 9 ++++++- pkg/storage/cacher.go | 4 +++ pkg/storage/etcd/etcd_helper.go | 4 +++ pkg/storage/etcd/metrics/metrics.go | 12 +++++++++ pkg/storage/etcd3/BUILD | 2 ++ pkg/storage/etcd3/store.go | 9 +++++++ pkg/storage/interfaces.go | 3 +++ pkg/storage/storagebackend/config.go | 3 +++ 10 files changed, 83 insertions(+), 2 deletions(-) diff --git a/pkg/registry/generic/options.go b/pkg/registry/generic/options.go index 2f5c850ff..af651371f 100644 --- a/pkg/registry/generic/options.go +++ b/pkg/registry/generic/options.go @@ -17,6 +17,8 @@ limitations under the License. package generic import ( + "time" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -30,6 +32,7 @@ type RESTOptions struct { EnableGarbageCollection bool DeleteCollectionWorkers int ResourcePrefix string + CountMetricPollPeriod time.Duration } // Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. 
tests)
diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go
index 0907f0154..d7e449c25 100644
--- a/pkg/registry/generic/registry/store.go
+++ b/pkg/registry/generic/registry/store.go
@@ -36,12 +36,14 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/generic"
 	"k8s.io/apiserver/pkg/registry/rest"
 	"k8s.io/apiserver/pkg/storage"
 	storeerr "k8s.io/apiserver/pkg/storage/errors"
+	"k8s.io/apiserver/pkg/storage/etcd/metrics"
 
 	"github.com/golang/glog"
 )
@@ -181,7 +183,10 @@ var _ rest.Exporter = &Store{}
 var _ rest.TableConvertor = &Store{}
 var _ GenericStore = &Store{}
 
-const OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
+const (
+	OptimisticLockErrorMsg        = "the object has been modified; please apply your changes to the latest version and try again"
+	resourceCountPollPeriodJitter = 1.2
+)
 
 // NamespaceKeyRootFunc is the default function for constructing storage paths
 // to resource directories enforcing namespace rules.
@@ -1363,11 +1368,43 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
 			attrFunc,
 			triggerFunc,
 		)
+
+		if opts.CountMetricPollPeriod > 0 {
+			stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
+			previousDestroy := e.DestroyFunc
+			e.DestroyFunc = func() {
+				stopFunc()
+				if previousDestroy != nil {
+					previousDestroy()
+				}
+			}
+		}
 	}
 
 	return nil
 }
 
+// startObservingCount launches a goroutine that repeatedly asks e.Storage for the
+// number of entries under the store's key root and publishes the answer through
+// metrics.UpdateObjectCount (-1 when the count fails). The returned function stops it.
+func (e *Store) startObservingCount(period time.Duration) func() {
+	keyRoot := e.KeyRootFunc(genericapirequest.NewContext())
+	resource := e.DefaultQualifiedResource.String()
+	glog.V(2).Infof("Monitoring %v count at /%v", resource, keyRoot)
+	done := make(chan struct{})
+	poll := func() {
+		n, err := e.Storage.Count(keyRoot)
+		if err != nil {
+			glog.V(5).Infof("Failed to update storage count metric: %v", err)
+			metrics.UpdateObjectCount(resource, -1)
+			return
+		}
+		metrics.UpdateObjectCount(resource, n)
+	}
+	go wait.JitterUntil(poll, period, resourceCountPollPeriodJitter, true, done)
+	return func() { close(done) }
+}
+
 func (e *Store) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
 	if e.TableConvertor != nil {
 		return e.TableConvertor.ConvertToTable(ctx, object, tableOptions)
diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go
index e401193a7..5b8f1b9bb 100644
--- a/pkg/server/options/etcd.go
+++ b/pkg/server/options/etcd.go
@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/spf13/pflag"
 
@@ -64,7 +65,7 @@ var storageTypes = sets.NewString(
 )
 
 func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
-	return &EtcdOptions{
+	options := &EtcdOptions{
 		StorageConfig:           *backendConfig,
 		DefaultStorageMediaType: "application/json",
 		DeleteCollectionWorkers: 1,
@@ -72,6 +73,8 @@ func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
 		EnableWatchCache:        true,
 		DefaultWatchCacheSize:   100,
 	}
+	options.StorageConfig.CountMetricPollPeriod = time.Minute
+	return options
 }
 
 func (s *EtcdOptions) Validate() []error {
@@ -155,6 +158,8 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
 
 	fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval,
 		"The interval of compaction requests.
If 0, the compaction request from apiserver is disabled.")
+	fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
+		"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
 }
 
 func (s *EtcdOptions) ApplyTo(c *server.Config) error {
@@ -197,6 +202,7 @@ func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource)
 		EnableGarbageCollection: f.Options.EnableGarbageCollection,
 		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
 		ResourcePrefix:          resource.Group + "/" + resource.Resource,
+		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
 	}
 	if f.Options.EnableWatchCache {
 		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
@@ -229,6 +235,7 @@ func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
 		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
 		EnableGarbageCollection: f.Options.EnableGarbageCollection,
 		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
+		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
 	}
 	if f.Options.EnableWatchCache {
 		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go
index f1ea90e91..9ffbbc5e9 100644
--- a/pkg/storage/cacher.go
+++ b/pkg/storage/cacher.go
@@ -555,6 +555,11 @@ func (c *Cacher) GuaranteedUpdate(
 	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
 }
 
+// Count forwards pathPrefix to c.storage.Count and returns its result unchanged.
+func (c *Cacher) Count(pathPrefix string) (int64, error) {
+	return c.storage.Count(pathPrefix)
+}
+
 func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
 	// TODO: Currently we assume that in a given Cacher object, its
 	// is aware of exactly the same trigger (at most one).
Thus calling:
diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go
index 7b11ec4c9..510b293c7 100644
--- a/pkg/storage/etcd/etcd_helper.go
+++ b/pkg/storage/etcd/etcd_helper.go
@@ -586,6 +586,11 @@ func (h *etcdHelper) GuaranteedUpdate(
 	}
 }
 
+// Count always returns an error: counting keys is not implemented for the etcd2 helper.
+func (*etcdHelper) Count(pathPrefix string) (int64, error) {
+	return 0, fmt.Errorf("count is unimplemented for etcd2")
+}
+
 // etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
 // their Node.ModifiedIndex, which is unique across all types.
 // All implementations must be thread-safe.
diff --git a/pkg/storage/etcd/metrics/metrics.go b/pkg/storage/etcd/metrics/metrics.go
index 12c11eaf9..96385f6e6 100644
--- a/pkg/storage/etcd/metrics/metrics.go
+++ b/pkg/storage/etcd/metrics/metrics.go
@@ -59,6 +59,13 @@ var (
 		},
 		[]string{"operation", "type"},
 	)
+	objectCounts = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "etcd_object_counts",
+			Help: "Number of stored objects at the time of last check split by kind.",
+		},
+		[]string{"resource"},
+	)
 )
 
 var registerMetrics sync.Once
@@ -73,9 +80,15 @@ func Register() {
 		prometheus.MustRegister(cacheAddLatency)
 		prometheus.MustRegister(cacheGetLatency)
 		prometheus.MustRegister(etcdRequestLatenciesSummary)
+		prometheus.MustRegister(objectCounts)
 	})
 }
 
+// UpdateObjectCount sets the etcd_object_counts gauge whose "resource" label equals resourcePrefix to count.
+func UpdateObjectCount(resourcePrefix string, count int64) {
+	objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
+}
+
 func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
 	etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
 }
diff --git a/pkg/storage/etcd3/BUILD b/pkg/storage/etcd3/BUILD
index 29f789950..782551112 100644
--- a/pkg/storage/etcd3/BUILD
+++ b/pkg/storage/etcd3/BUILD
@@ -60,9 +60,11 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
     ],
diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go
index f5bf03e5b..c83965b09 100644
--- a/pkg/storage/etcd3/store.go
+++ b/pkg/storage/etcd3/store.go
@@ -412,6 +412,23 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
 	return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
 }
 
+// Count returns the number of keys stored below key, which is joined onto
+// s.pathPrefix and treated as a directory-like prefix.
+func (s *store) Count(key string) (int64, error) {
+	key = path.Join(s.pathPrefix, key)
+	// path.Join drops any trailing "/". Put it back so that only children of
+	// the directory match: a bare prefix "/a" would also count a sibling such
+	// as "/ab", whereas "/a/" matches "/a/b" only.
+	if len(key) == 0 || key[len(key)-1] != '/' {
+		key += "/"
+	}
+	getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
+	if err != nil {
+		return 0, err
+	}
+	return getResp.Count, nil
+}
+
 // continueToken is a simple structured object for encoding the state of a continue token.
 // TODO: if we change the version of the encoded from, we can't start encoding the new version
 // until all other servers are upgraded (i.e. we need to support rolling schema)
diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go
index e93265792..6241533be 100644
--- a/pkg/storage/interfaces.go
+++ b/pkg/storage/interfaces.go
@@ -195,4 +195,7 @@ type Interface interface {
 	GuaranteedUpdate(
 		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
 		precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
+
+	// Count returns number of different entries under the key (generally being path prefix).
+ Count(key string) (int64, error) } diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index b725985f6..8d7ecf37c 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -62,6 +62,9 @@ type Config struct { // CompactionInterval is an interval of requesting compaction from apiserver. // If the value is 0, no compaction will be issued. CompactionInterval time.Duration + + // CountMetricPollPeriod specifies how often should count metric be updated + CountMetricPollPeriod time.Duration } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {