Add a metric exposing etcd database size

Kubernetes-commit: 922ec728de9248657f026eb6cfb8fdaeb11049ac
jingyih 2020-03-16 07:55:38 -07:00 committed by Kubernetes Publisher
parent 744e4e6817
commit 9303178e27
4 changed files with 90 additions and 19 deletions

@@ -176,6 +176,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
 		"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
+
+	fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval,
+		"The interval of requests to poll etcd and update metric. 0 disables the metric collection")
 }
 
 func (s *EtcdOptions) ApplyTo(c *server.Config) error {
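
For readers unfamiliar with pflag: DurationVar binds a duration-valued flag directly to a struct field, using the field's current value as the flag default. A minimal, self-contained sketch of the same binding (the struct here is a stand-in, not the real StorageConfig):

package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

// storageConfig is a stand-in for the real StorageConfig.
type storageConfig struct {
	DBMetricPollInterval time.Duration
}

func main() {
	// Seed the field with its default; DurationVar echoes it back as the flag default.
	cfg := storageConfig{DBMetricPollInterval: 30 * time.Second}

	fs := pflag.NewFlagSet("example", pflag.ExitOnError)
	fs.DurationVar(&cfg.DBMetricPollInterval, "etcd-db-metric-poll-interval", cfg.DBMetricPollInterval,
		"The interval of requests to poll etcd and update metric. 0 disables the metric collection")

	// Simulate a server invocation overriding the default.
	_ = fs.Parse([]string{"--etcd-db-metric-poll-interval=1m"})
	fmt.Println(cfg.DBMetricPollInterval) // 1m0s
}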

@@ -49,6 +49,14 @@ var (
 		},
 		[]string{"resource"},
 	)
+	dbTotalSize = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Name:           "etcd_db_total_size_in_bytes",
+			Help:           "Total size of the etcd database file physically allocated in bytes.",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{"endpoint"},
+	)
 )
 
 var registerMetrics sync.Once
@@ -59,6 +67,7 @@ func Register() {
 	registerMetrics.Do(func() {
 		legacyregistry.MustRegister(etcdRequestLatency)
 		legacyregistry.MustRegister(objectCounts)
+		legacyregistry.MustRegister(dbTotalSize)
 	})
 }
@@ -81,3 +90,8 @@ func Reset() {
 func sinceInSeconds(start time.Time) float64 {
 	return time.Since(start).Seconds()
 }
+
+// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
+func UpdateEtcdDbSize(ep string, size int64) {
+	dbTotalSize.WithLabelValues(ep).Set(float64(size))
+}
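
The gauge is a vector keyed by endpoint, so each etcd server gets its own time series. A hedged sketch of the full lifecycle (register, set, scrape) as a standalone program rather than inside the real metrics package; the endpoint URL and size value are made up:

package main

import (
	"fmt"

	compbasemetrics "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

var dbTotalSize = compbasemetrics.NewGaugeVec(
	&compbasemetrics.GaugeOpts{
		Name:           "etcd_db_total_size_in_bytes",
		Help:           "Total size of the etcd database file physically allocated in bytes.",
		StabilityLevel: compbasemetrics.ALPHA,
	},
	[]string{"endpoint"},
)

func main() {
	legacyregistry.MustRegister(dbTotalSize)

	// One series per etcd endpoint; the value here is illustrative.
	dbTotalSize.WithLabelValues("https://127.0.0.1:2379").Set(4 * 1024 * 1024)

	// Read back through the same gatherer the apiserver's /metrics handler uses.
	families, err := legacyregistry.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	for _, f := range families {
		if f.GetName() == "etcd_db_total_size_in_bytes" {
			fmt.Println(f.String())
		}
	}
}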

@@ -28,7 +28,8 @@ const (
 	StorageTypeUnset = ""
 	StorageTypeETCD3 = "etcd3"
 
 	DefaultCompactInterval = 5 * time.Minute
+	DefaultDBMetricPollInterval = 30 * time.Second
 )
 
 // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
@@ -71,13 +72,16 @@ type Config struct {
 	CompactionInterval time.Duration
 	// CountMetricPollPeriod specifies how often should count metric be updated
 	CountMetricPollPeriod time.Duration
+	// DBMetricPollInterval specifies how often should storage backend metric be updated.
+	DBMetricPollInterval time.Duration
 }
 
 func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
 	return &Config{
 		Paging:             true,
 		Prefix:             prefix,
 		Codec:              codec,
 		CompactionInterval: DefaultCompactInterval,
+		DBMetricPollInterval: DefaultDBMetricPollInterval,
 	}
 }
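
The interplay between default and flag is worth spelling out: NewDefaultConfig seeds DBMetricPollInterval with 30s, the flag can override it, and a zero value disables collection entirely (the guard lives in the factory shown in the last file). A small sketch under those assumptions, with stand-in names:

package main

import (
	"fmt"
	"time"
)

// Mirrors DefaultDBMetricPollInterval from the config above.
const defaultDBMetricPollInterval = 30 * time.Second

type config struct {
	DBMetricPollInterval time.Duration
}

// startMonitor mimics the zero-disables guard; the real function also
// returns an error and spawns polling goroutines.
func startMonitor(interval time.Duration) (stop func(), started bool) {
	if interval == 0 {
		return func() {}, false // metric collection disabled
	}
	return func() { /* cancel the polling goroutines */ }, true
}

func main() {
	c := config{DBMetricPollInterval: defaultDBMetricPollInterval}
	_, started := startMonitor(c.DBMetricPollInterval)
	fmt.Println("monitor started:", started) // true

	c.DBMetricPollInterval = 0 // e.g. --etcd-db-metric-poll-interval=0
	_, started = startMonitor(c.DBMetricPollInterval)
	fmt.Println("monitor started:", started) // false
}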

@@ -36,20 +36,26 @@ import (
 	"k8s.io/apiserver/pkg/server/egressselector"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/etcd3"
+	"k8s.io/apiserver/pkg/storage/etcd3/metrics"
 	"k8s.io/apiserver/pkg/storage/storagebackend"
 	"k8s.io/apiserver/pkg/storage/value"
 	"k8s.io/component-base/metrics/legacyregistry"
+	"k8s.io/klog"
 )
 
-// The short keepalive timeout and interval have been chosen to aggressively
-// detect a failed etcd server without introducing much overhead.
-const keepaliveTime = 30 * time.Second
-const keepaliveTimeout = 10 * time.Second
-
-// dialTimeout is the timeout for failing to establish a connection.
-// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
-// on heavily loaded arm64 CPUs (issue #64649)
-const dialTimeout = 20 * time.Second
+const (
+	// The short keepalive timeout and interval have been chosen to aggressively
+	// detect a failed etcd server without introducing much overhead.
+	keepaliveTime    = 30 * time.Second
+	keepaliveTimeout = 10 * time.Second
+
+	// dialTimeout is the timeout for failing to establish a connection.
+	// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
+	// on heavily loaded arm64 CPUs (issue #64649)
+	dialTimeout = 20 * time.Second
+
+	dbMetricsMonitorJitter = 0.5
+)
 
 func init() {
 	// grpcprom auto-registers (via an init function) their client metrics, since we are opting out of
@@ -57,6 +63,7 @@ func init() {
 	// we need to explicitly register these metrics to our global registry here.
 	// For reference: https://github.com/kubernetes/kubernetes/pull/81387
 	legacyregistry.RawMustRegister(grpcprom.DefaultClientMetrics)
+	dbMetricsMonitors = make(map[string]struct{})
 }
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) { func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
@@ -153,16 +160,20 @@ type runningCompactor struct {
 }
 
 var (
-	lock       sync.Mutex
-	compactors = map[string]*runningCompactor{}
+	// compactorsMu guards access to compactors map
+	compactorsMu sync.Mutex
+	compactors   = map[string]*runningCompactor{}
+	// dbMetricsMonitorsMu guards access to dbMetricsMonitors map
+	dbMetricsMonitorsMu sync.Mutex
+	dbMetricsMonitors   map[string]struct{}
 )
 
 // startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the
 // compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called,
 // the compactor is stopped.
 func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
-	lock.Lock()
-	defer lock.Unlock()
+	compactorsMu.Lock()
+	defer compactorsMu.Unlock()
 
 	key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
 	if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
@@ -193,8 +204,8 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
 	compactors[key].refs++
 
 	return func() {
-		lock.Lock()
-		defer lock.Unlock()
+		compactorsMu.Lock()
+		defer compactorsMu.Unlock()
 
 		compactor := compactors[key]
 		compactor.refs--
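
The rename from lock to compactorsMu is purely for clarity now that a second mutex exists. The surrounding pattern — one shared background worker per transport key, released through a refcounted destroy func — generalizes beyond compaction; a hedged sketch with illustrative names, not the real implementation:

package sharedworker

import "sync"

type worker struct {
	refs int
	stop func()
}

var (
	workersMu sync.Mutex
	workers   = map[string]*worker{}
)

// StartOnce starts at most one shared worker per key and returns a release
// func. The worker is stopped only when every release func has been called.
func StartOnce(key string, start func() (stop func())) func() {
	workersMu.Lock()
	defer workersMu.Unlock()

	w, ok := workers[key]
	if !ok {
		w = &worker{stop: start()}
		workers[key] = w
	}
	w.refs++

	return func() {
		workersMu.Lock()
		defer workersMu.Unlock()
		w.refs--
		if w.refs == 0 {
			w.stop()
			delete(workers, key)
		}
	}
}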
@@ -218,6 +229,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
 		return nil, nil, err
 	}
 
+	stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
+	if err != nil {
+		return nil, nil, err
+	}
+
 	var once sync.Once
 	destroyFunc := func() {
 		// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
@@ -225,6 +241,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
 		// TODO: fix duplicated storage destroy calls higher level
 		once.Do(func() {
 			stopCompactor()
+			stopDBSizeMonitor()
 			client.Close()
 		})
 	}
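
As the TODO notes, destroy funcs can be invoked more than once, so teardown is wrapped in sync.Once to make it idempotent. Extracted as a standalone idiom (names illustrative):

package teardown

import "sync"

// NewDestroyFunc bundles several cleanup steps into a single func that is
// safe to call repeatedly; only the first call runs the steps.
func NewDestroyFunc(steps ...func()) func() {
	var once sync.Once
	return func() {
		once.Do(func() {
			for _, step := range steps {
				step()
			}
		})
	}
}

In the factory above the equivalent steps are stopCompactor, stopDBSizeMonitor, and client.Close, in that order.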
@@ -234,3 +251,36 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
 	}
 	return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
 }
+
+// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
+// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
+func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
+	if interval == 0 {
+		return func() {}, nil
+	}
+	dbMetricsMonitorsMu.Lock()
+	defer dbMetricsMonitorsMu.Unlock()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	for _, ep := range client.Endpoints() {
+		if _, found := dbMetricsMonitors[ep]; found {
+			continue
+		}
+		dbMetricsMonitors[ep] = struct{}{}
+		endpoint := ep
+		klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval)
+		go wait.JitterUntilWithContext(ctx, func(context.Context) {
+			epStatus, err := client.Maintenance.Status(ctx, endpoint)
+			if err != nil {
+				klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err)
+				metrics.UpdateEtcdDbSize(endpoint, -1)
+			} else {
+				metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize)
+			}
+		}, interval, dbMetricsMonitorJitter, true)
+	}
+
+	return func() {
+		cancel()
+	}, nil
+}
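
Two details of the monitor are easy to miss: the jitter factor of 0.5 spreads each poll across a window of interval to 1.5x interval, so monitors for many endpoints do not fire in lockstep, and on a failed probe the gauge is set to -1, which keeps a broken endpoint distinguishable from a genuinely small database. A runnable sketch of the same loop shape, using the real k8s.io/apimachinery wait API but a stand-in probe instead of client.Maintenance.Status:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Runs the func immediately, then every 1s to 1.5s (jitter factor 0.5);
	// sliding=true means the next period is measured from when f returns.
	go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
		// Stand-in for the etcd status probe; a real caller would set the
		// gauge to the reported DbSize, or to -1 on error.
		fmt.Println("poll endpoint, update gauge")
	}, time.Second, 0.5, true)

	time.Sleep(3 * time.Second)
	cancel() // the stop func returned by startDBSizeMonitorPerEndpoint does exactly this
}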