diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go index 4cf68fb8f..358776a79 100644 --- a/pkg/server/options/etcd.go +++ b/pkg/server/options/etcd.go @@ -176,6 +176,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+ "Frequency of polling etcd for number of resources per type. 0 disables the metric collection.") + + fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval, + "The interval of requests to poll etcd and update metric. 0 disables the metric collection") } func (s *EtcdOptions) ApplyTo(c *server.Config) error { diff --git a/pkg/storage/etcd3/metrics/metrics.go b/pkg/storage/etcd3/metrics/metrics.go index 8dd6462b0..a0de7e18c 100644 --- a/pkg/storage/etcd3/metrics/metrics.go +++ b/pkg/storage/etcd3/metrics/metrics.go @@ -49,6 +49,14 @@ var ( }, []string{"resource"}, ) + dbTotalSize = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Name: "etcd_db_total_size_in_bytes", + Help: "Total size of the etcd database file physically allocated in bytes.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"endpoint"}, + ) ) var registerMetrics sync.Once @@ -59,6 +67,7 @@ func Register() { registerMetrics.Do(func() { legacyregistry.MustRegister(etcdRequestLatency) legacyregistry.MustRegister(objectCounts) + legacyregistry.MustRegister(dbTotalSize) }) } @@ -81,3 +90,8 @@ func Reset() { func sinceInSeconds(start time.Time) float64 { return time.Since(start).Seconds() } + +// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric. 
+func UpdateEtcdDbSize(ep string, size int64) { + dbTotalSize.WithLabelValues(ep).Set(float64(size)) +} diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index cbf50b211..5dc0bbfb3 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -28,7 +28,8 @@ const ( StorageTypeUnset = "" StorageTypeETCD3 = "etcd3" - DefaultCompactInterval = 5 * time.Minute + DefaultCompactInterval = 5 * time.Minute + DefaultDBMetricPollInterval = 30 * time.Second ) // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. @@ -71,13 +72,16 @@ type Config struct { CompactionInterval time.Duration // CountMetricPollPeriod specifies how often should count metric be updated CountMetricPollPeriod time.Duration + // DBMetricPollInterval specifies how often should storage backend metric be updated. + DBMetricPollInterval time.Duration } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { return &Config{ - Paging: true, - Prefix: prefix, - Codec: codec, - CompactionInterval: DefaultCompactInterval, + Paging: true, + Prefix: prefix, + Codec: codec, + CompactionInterval: DefaultCompactInterval, + DBMetricPollInterval: DefaultDBMetricPollInterval, } } diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index 81a24825b..97578c3d9 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -36,20 +36,26 @@ import ( "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/value" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/klog" ) -// The short keepalive timeout and interval have been chosen to aggressively -// detect a failed etcd server without introducing much 
overhead. -const keepaliveTime = 30 * time.Second -const keepaliveTimeout = 10 * time.Second +const ( + // The short keepalive timeout and interval have been chosen to aggressively + // detect a failed etcd server without introducing much overhead. + keepaliveTime = 30 * time.Second + keepaliveTimeout = 10 * time.Second -// dialTimeout is the timeout for failing to establish a connection. -// It is set to 20 seconds as times shorter than that will cause TLS connections to fail -// on heavily loaded arm64 CPUs (issue #64649) -const dialTimeout = 20 * time.Second + // dialTimeout is the timeout for failing to establish a connection. + // It is set to 20 seconds as times shorter than that will cause TLS connections to fail + // on heavily loaded arm64 CPUs (issue #64649) + dialTimeout = 20 * time.Second + + dbMetricsMonitorJitter = 0.5 +) func init() { // grpcprom auto-registers (via an init function) their client metrics, since we are opting out of @@ -57,6 +63,7 @@ func init() { // we need to explicitly register these metrics to our global registry here. // For reference: https://github.com/kubernetes/kubernetes/pull/81387 legacyregistry.RawMustRegister(grpcprom.DefaultClientMetrics) + dbMetricsMonitors = make(map[string]struct{}) } func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) { @@ -153,16 +160,20 @@ type runningCompactor struct { } var ( - lock sync.Mutex - compactors = map[string]*runningCompactor{} + // compactorsMu guards access to compactors map + compactorsMu sync.Mutex + compactors = map[string]*runningCompactor{} + // dbMetricsMonitorsMu guards access to dbMetricsMonitors map + dbMetricsMonitorsMu sync.Mutex + dbMetricsMonitors map[string]struct{} ) // startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the // compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called, // the compactor is stopped. 
func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) { - lock.Lock() - defer lock.Unlock() + compactorsMu.Lock() + defer compactorsMu.Unlock() key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile} if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { @@ -193,8 +204,8 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration compactors[key].refs++ return func() { - lock.Lock() - defer lock.Unlock() + compactorsMu.Lock() + defer compactorsMu.Unlock() compactor := compactors[key] compactor.refs-- @@ -218,6 +229,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e return nil, nil, err } + stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval) + if err != nil { + return nil, nil, err + } + var once sync.Once destroyFunc := func() { // we know that storage destroy funcs are called multiple times (due to reuse in subresources). @@ -225,6 +241,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e // TODO: fix duplicated storage destroy calls higher level once.Do(func() { stopCompactor() + stopDBSizeMonitor() client.Close() }) } @@ -234,3 +251,36 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e } return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil } + +// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the +// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint. 
+//
+// An endpoint that is already present in dbMetricsMonitors is skipped; otherwise it is
+// registered there and a polling goroutine is started for it using the given client.
+// The returned func cancels the goroutines started by this call and removes the endpoints
+// this call registered, so that a later call can start monitoring them again. Calling the
+// returned func more than once is safe. A non-positive interval disables monitoring and
+// yields a no-op func. The returned error is always nil.
+//
+// NOTE(review): callers that skipped an endpoint because it was already registered do not
+// take over polling when the registering caller stops; only a subsequent call does.
+func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
+	// Zero is the documented "off" value; a negative duration is treated the same way
+	// rather than being handed to the polling loop as its period.
+	if interval <= 0 {
+		return func() {}, nil
+	}
+	dbMetricsMonitorsMu.Lock()
+	defer dbMetricsMonitorsMu.Unlock()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// started lists the endpoints registered by this call, so the stop func can release
+	// exactly those entries and nothing registered by other callers.
+	var started []string
+	for _, ep := range client.Endpoints() {
+		if _, found := dbMetricsMonitors[ep]; found {
+			continue
+		}
+		dbMetricsMonitors[ep] = struct{}{}
+		started = append(started, ep)
+		// Copy the loop variable so each goroutine polls its own endpoint.
+		endpoint := ep
+		klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval)
+		go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
+			epStatus, err := client.Maintenance.Status(ctx, endpoint)
+			if err != nil {
+				klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err)
+				// -1 marks the size as unknown for this endpoint.
+				metrics.UpdateEtcdDbSize(endpoint, -1)
+				return
+			}
+			metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize)
+		}, interval, dbMetricsMonitorJitter, true)
+	}
+
+	return func() {
+		cancel()
+
+		// Release the endpoints claimed above; without this the registry would keep
+		// reporting them as monitored after their goroutines have been cancelled.
+		dbMetricsMonitorsMu.Lock()
+		defer dbMetricsMonitorsMu.Unlock()
+		for _, ep := range started {
+			delete(dbMetricsMonitors, ep)
+		}
+		// Clearing the list makes repeated calls no-ops, so a second call cannot
+		// remove an entry that another caller registered in the meantime.
+		started = nil
+	}, nil
+}