Add a metric exposing number of objects per type

Kubernetes-commit: f6e9ebffa2df10f7792fbea0a0fbe5ab8e388a26
This commit is contained in:
Marek Grabowski 2018-02-12 15:58:57 +00:00 committed by Kubernetes Publisher
parent faaaac8e85
commit e36f8069aa
10 changed files with 83 additions and 2 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package generic package generic
import ( import (
"time"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
@ -30,6 +32,7 @@ type RESTOptions struct {
EnableGarbageCollection bool EnableGarbageCollection bool
DeleteCollectionWorkers int DeleteCollectionWorkers int
ResourcePrefix string ResourcePrefix string
CountMetricPollPeriod time.Duration
} }
// Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. tests) // Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. tests)

View File

@ -36,12 +36,14 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors" storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -181,7 +183,10 @@ var _ rest.Exporter = &Store{}
var _ rest.TableConvertor = &Store{} var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{} var _ GenericStore = &Store{}
const OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again" const (
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
resourceCountPollPeriodJitter = 1.2
)
// NamespaceKeyRootFunc is the default function for constructing storage paths // NamespaceKeyRootFunc is the default function for constructing storage paths
// to resource directories enforcing namespace rules. // to resource directories enforcing namespace rules.
@ -1363,11 +1368,40 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
attrFunc, attrFunc,
triggerFunc, triggerFunc,
) )
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
}
}
} }
return nil return nil
} }
// startObservingCount launches a background goroutine that periodically asks
// the storage for the number of entries under this store's key root and
// publishes the result through metrics.UpdateObjectCount, labelled with the
// store's default qualified resource. A failed poll is recorded as -1.
//
// period is the base polling interval handed to wait.JitterUntil together with
// resourceCountPollPeriodJitter. The returned function closes the stop channel
// and thereby ends the polling; it must be called at most once, because
// closing an already-closed channel panics.
func (e *Store) startObservingCount(period time.Duration) func() {
	keyRoot := e.KeyRootFunc(genericapirequest.NewContext())
	resource := e.DefaultQualifiedResource.String()
	glog.V(2).Infof("Monitoring %v count at <storage-prefix>/%v", resource, keyRoot)

	// publishCount performs a single poll and records its outcome.
	publishCount := func() {
		n, err := e.Storage.Count(keyRoot)
		if err != nil {
			glog.V(5).Infof("Failed to update storage count metric: %v", err)
			metrics.UpdateObjectCount(resource, -1)
			return
		}
		metrics.UpdateObjectCount(resource, n)
	}

	done := make(chan struct{})
	go wait.JitterUntil(publishCount, period, resourceCountPollPeriodJitter, true, done)
	return func() { close(done) }
}
func (e *Store) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { func (e *Store) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
if e.TableConvertor != nil { if e.TableConvertor != nil {
return e.TableConvertor.ConvertToTable(ctx, object, tableOptions) return e.TableConvertor.ConvertToTable(ctx, object, tableOptions)

View File

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -64,7 +65,7 @@ var storageTypes = sets.NewString(
) )
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions { func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
return &EtcdOptions{ options := &EtcdOptions{
StorageConfig: *backendConfig, StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json", DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1, DeleteCollectionWorkers: 1,
@ -72,6 +73,8 @@ func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
EnableWatchCache: true, EnableWatchCache: true,
DefaultWatchCacheSize: 100, DefaultWatchCacheSize: 100,
} }
options.StorageConfig.CountMetricPollPeriod = time.Minute
return options
} }
func (s *EtcdOptions) Validate() []error { func (s *EtcdOptions) Validate() []error {
@ -155,6 +158,8 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval, fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval,
"The interval of compaction requests. If 0, the compaction request from apiserver is disabled.") "The interval of compaction requests. If 0, the compaction request from apiserver is disabled.")
fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
} }
func (s *EtcdOptions) ApplyTo(c *server.Config) error { func (s *EtcdOptions) ApplyTo(c *server.Config) error {
@ -197,6 +202,7 @@ func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource)
EnableGarbageCollection: f.Options.EnableGarbageCollection, EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource, ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
} }
if f.Options.EnableWatchCache { if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
@ -229,6 +235,7 @@ func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection, EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
} }
if f.Options.EnableWatchCache { if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)

View File

@ -555,6 +555,10 @@ func (c *Cacher) GuaranteedUpdate(
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate) return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
} }
// Count forwards the request directly to the underlying storage and returns
// its result unchanged.
func (c *Cacher) Count(pathPrefix string) (count int64, err error) {
	count, err = c.storage.Count(pathPrefix)
	return count, err
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc> // TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling: // is aware of exactly the same trigger (at most one). Thus calling:

View File

@ -586,6 +586,10 @@ func (h *etcdHelper) GuaranteedUpdate(
} }
} }
// Count is not supported by the etcd2 helper: it ignores pathPrefix and always
// returns a zero count together with a non-nil error.
func (*etcdHelper) Count(pathPrefix string) (int64, error) {
	// Error strings are lowercase and carry no trailing punctuation so they
	// compose cleanly when wrapped or logged by callers.
	return 0, fmt.Errorf("count is unimplemented for etcd2")
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by // etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types. // their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe. // All implementations must be thread-safe.

View File

@ -59,6 +59,13 @@ var (
}, },
[]string{"operation", "type"}, []string{"operation", "type"},
) )
objectCounts = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "etcd_object_counts",
Help: "Number of stored objects at the time of last check split by kind.",
},
[]string{"resource"},
)
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -73,9 +80,14 @@ func Register() {
prometheus.MustRegister(cacheAddLatency) prometheus.MustRegister(cacheAddLatency)
prometheus.MustRegister(cacheGetLatency) prometheus.MustRegister(cacheGetLatency)
prometheus.MustRegister(etcdRequestLatenciesSummary) prometheus.MustRegister(etcdRequestLatenciesSummary)
prometheus.MustRegister(objectCounts)
}) })
} }
// UpdateObjectCount sets the objectCounts gauge for the given label value to
// count. resourcePrefix is used verbatim as the value of the gauge's single
// "resource" label.
func UpdateObjectCount(resourcePrefix string, count int64) {
	gauge := objectCounts.WithLabelValues(resourcePrefix)
	gauge.Set(float64(count))
}
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) { func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond)) etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
} }

View File

@ -60,9 +60,11 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
], ],

View File

@ -412,6 +412,15 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "") return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
} }
// Count returns the number of keys stored under key, which is resolved
// relative to the store's path prefix. Only the count is requested from etcd;
// no values are transferred.
func (s *store) Count(key string) (int64, error) {
	key = path.Join(s.pathPrefix, key)
	// path.Join strips any trailing slash, so a bare prefix range over "/a"
	// would also match sibling keys such as "/ab". Terminating the key with
	// "/" restricts the range to entries that are actually beneath it.
	if n := len(key); n > 0 && key[n-1] != '/' {
		key += "/"
	}

	getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
	if err != nil {
		return 0, err
	}
	return getResp.Count, nil
}
// continueToken is a simple structured object for encoding the state of a continue token. // continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version // TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema) // until all other servers are upgraded (i.e. we need to support rolling schema)

View File

@ -195,4 +195,7 @@ type Interface interface {
GuaranteedUpdate( GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
// Count returns the number of distinct entries under the key (generally a path prefix).
Count(key string) (int64, error)
} }

View File

@ -62,6 +62,9 @@ type Config struct {
// CompactionInterval is an interval of requesting compaction from apiserver. // CompactionInterval is an interval of requesting compaction from apiserver.
// If the value is 0, no compaction will be issued. // If the value is 0, no compaction will be issued.
CompactionInterval time.Duration CompactionInterval time.Duration
// CountMetricPollPeriod specifies how often the count metric should be updated.
CountMetricPollPeriod time.Duration
} }
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {