From 573a8d6d05699df72aeefa2b573b5e786bba4cf0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 22 Jun 2023 11:56:09 +0200 Subject: [PATCH] Improve apiserver storage size metric to allow its graduation Change name to make it compliant with prometheus guidelines. Calculate it on demand instead of periodically to comply with prometheus standards. Replace "endpoint" with "server" label to make it semantically consistent with storage factory Kubernetes-commit: 7a63997c8a1a9ba14f2bdc478fdf33cf88f48d80 --- pkg/server/options/etcd.go | 25 +++++++ pkg/storage/etcd3/metrics/metrics.go | 69 +++++++++++++++++-- pkg/storage/storagebackend/factory/etcd3.go | 63 +++++++++++------ pkg/storage/storagebackend/factory/factory.go | 14 +++- 4 files changed, 144 insertions(+), 27 deletions(-) diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go index 5ad5e0277..57e9c1a9f 100644 --- a/pkg/server/options/etcd.go +++ b/pkg/server/options/etcd.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/server/options/encryptionconfig" encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller" serverstorage "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" storagevalue "k8s.io/apiserver/pkg/storage/value" @@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac return err } + metrics.SetStorageMonitorGetter(monitorGetter(factory)) + c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers) return nil } +func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) { + return func() (monitors []metrics.Monitor, err error) { + defer func() { + if err != nil { + for _, m := range monitors { + m.Close() + } + } + }() + + var m metrics.Monitor + for _, cfg := range factory.Configs() { + m, 
err = storagefactory.CreateMonitor(cfg) + if err != nil { + return nil, err + } + monitors = append(monitors, m) + } + return monitors, nil + } +} + func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter { if resourceTransformers != nil { factory = &transformerStorageFactory{ diff --git a/pkg/storage/etcd3/metrics/metrics.go b/pkg/storage/etcd3/metrics/metrics.go index 78c9a5983..a8eda9d22 100644 --- a/pkg/storage/etcd3/metrics/metrics.go +++ b/pkg/storage/etcd3/metrics/metrics.go @@ -17,11 +17,14 @@ limitations under the License. package metrics import ( + "context" + "fmt" "sync" "time" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/klog/v2" ) /* @@ -73,13 +76,16 @@ var ( ) dbTotalSize = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ - Subsystem: "apiserver", - Name: "storage_db_total_size_in_bytes", - Help: "Total size of the storage database file physically allocated in bytes.", - StabilityLevel: compbasemetrics.ALPHA, + Subsystem: "apiserver", + Name: "storage_db_total_size_in_bytes", + Help: "Total size of the storage database file physically allocated in bytes.", + StabilityLevel: compbasemetrics.ALPHA, + DeprecatedVersion: "1.28.0", }, []string{"endpoint"}, ) + storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "") + storageMonitor = &monitorCollector{} etcdEventsReceivedCounts = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Subsystem: "apiserver", @@ -160,6 +166,7 @@ func Register() { legacyregistry.MustRegister(etcdRequestErrorCounts) legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(dbTotalSize) + legacyregistry.CustomMustRegister(storageMonitor) legacyregistry.MustRegister(etcdBookmarkCounts) 
legacyregistry.MustRegister(etcdLeaseObjectCounts) legacyregistry.MustRegister(listStorageCount) @@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 { } // UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric. +// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes func UpdateEtcdDbSize(ep string, size int64) { dbTotalSize.WithLabelValues(ep).Set(float64(size)) } +// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats. +func SetStorageMonitorGetter(getter func() ([]Monitor, error)) { + storageMonitor.monitorGetter = getter +} + // UpdateLeaseObjectCount sets the etcd_lease_object_counts metric. func UpdateLeaseObjectCount(count int64) { // Currently we only store one previous lease, since all the events have the same ttl. @@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald)) listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned)) } + +type Monitor interface { + Monitor(ctx context.Context) (StorageMetrics, error) + Close() error +} + +type StorageMetrics struct { + Size int64 +} + +type monitorCollector struct { + compbasemetrics.BaseStableCollector + + monitorGetter func() ([]Monitor, error) +} + +// DescribeWithStability implements compbasemetrics.StableCollector +func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) { + ch <- storageSizeDescription +} + +// CollectWithStability implements compbasemetrics.StableCollector +func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) { + monitors, err := c.monitorGetter() + if err != nil { + return + } + + for i, m := range monitors { + server := fmt.Sprintf("etcd-%d", i) + + klog.V(4).InfoS("Start collecting storage metrics", "server", server) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + metrics, err 
:= m.Monitor(ctx) + cancel() + m.Close() + if err != nil { + klog.InfoS("Failed to get storage metrics", "server", server, "err", err) + continue + } + + metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server) + if err != nil { + klog.ErrorS(err, "Failed to create metric", "server", server) + } + ch <- metric + } +} diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index 64bcabadb..5736abf63 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "log" + "math/rand" "net" "net/url" "os" @@ -37,6 +38,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "google.golang.org/grpc" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -52,7 +54,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics/legacyregistry" tracing "k8s.io/component-base/tracing" - "k8s.io/klog/v2" ) const ( @@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan // retry in a loop in the background until we successfully create the client, storing the client or error encountered lock := sync.RWMutex{} - var prober *etcd3Prober + var prober *etcd3ProberMonitor clientErr := fmt.Errorf("etcd client connection not yet established") go wait.PollUntil(time.Second, func() (bool, error) { - newProber, err := newETCD3Prober(c) + newProber, err := newETCD3ProberMonitor(c) lock.Lock() defer lock.Unlock() // Ensure that server is already not shutting down. 
@@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan }, nil } -func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) { +func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) { client, err := newETCD3Client(c.Transport) if err != nil { return nil, err } - return &etcd3Prober{ - client: client, - prefix: c.Prefix, + return &etcd3ProberMonitor{ + client: client, + prefix: c.Prefix, + endpoints: c.Transport.ServerList, }, nil } -type etcd3Prober struct { - prefix string +type etcd3ProberMonitor struct { + prefix string + endpoints []string mux sync.RWMutex client *clientv3.Client closed bool } -func (p *etcd3Prober) Close() error { - p.mux.Lock() - defer p.mux.Unlock() - if !p.closed { - p.closed = true - return p.client.Close() +func (t *etcd3ProberMonitor) Close() error { + t.mux.Lock() + defer t.mux.Unlock() + if !t.closed { + t.closed = true + return t.client.Close() } - return fmt.Errorf("prober was closed") + return fmt.Errorf("closed") } -func (p *etcd3Prober) Probe(ctx context.Context) error { - p.mux.RLock() - defer p.mux.RUnlock() - if p.closed { - return fmt.Errorf("prober was closed") +func (t *etcd3ProberMonitor) Probe(ctx context.Context) error { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return fmt.Errorf("closed") } // See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118 - _, err := p.client.Get(ctx, path.Join("/", p.prefix, "health")) + _, err := t.client.Get(ctx, path.Join("/", t.prefix, "health")) if err != nil { return fmt.Errorf("error getting data from etcd: %w", err) } return nil } +func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return metrics.StorageMetrics{}, fmt.Errorf("closed") + } + status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)]) + if err 
!= nil { + return metrics.StorageMetrics{}, err + } + return metrics.StorageMetrics{ + Size: status.DbSize, + }, nil +} + var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, @@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint. +// Deprecated: Will be replaced with newETCD3ProberMonitor func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) { if interval == 0 { return func() {}, nil diff --git a/pkg/storage/storagebackend/factory/factory.go b/pkg/storage/storagebackend/factory/factory.go index c8cdd19b9..1a60c9290 100644 --- a/pkg/storage/storagebackend/factory/factory.go +++ b/pkg/storage/storagebackend/factory/factory.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) { case storagebackend.StorageTypeETCD2: return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3Prober(c) + return newETCD3ProberMonitor(c) + default: + return nil, fmt.Errorf("unknown storage type: %s", c.Type) + } +} + +func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) { + switch c.Type { + case storagebackend.StorageTypeETCD2: + return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) + case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: + return newETCD3ProberMonitor(c) default: return nil, fmt.Errorf("unknown storage type: %s", c.Type) }