Improve apiserver storage size metric to allow it's graduation
Change name to make it compliant with prometheus guidelines. Calculate it on demand instead of periodic to comply with prometheus standards. Replace "endpoint" with "server" label to make it semantically consistent with storage factory Kubernetes-commit: 7a63997c8a1a9ba14f2bdc478fdf33cf88f48d80
This commit is contained in:
parent
7f9444fbee
commit
573a8d6d05
|
@ -36,6 +36,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagevalue "k8s.io/apiserver/pkg/storage/value"
|
||||
|
@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
|||
return err
|
||||
}
|
||||
|
||||
metrics.SetStorageMonitorGetter(monitorGetter(factory))
|
||||
|
||||
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
|
||||
return nil
|
||||
}
|
||||
|
||||
func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) {
|
||||
return func() (monitors []metrics.Monitor, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
for _, m := range monitors {
|
||||
m.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var m metrics.Monitor
|
||||
for _, cfg := range factory.Configs() {
|
||||
m, err = storagefactory.CreateMonitor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
monitors = append(monitors, m)
|
||||
}
|
||||
return monitors, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
|
||||
if resourceTransformers != nil {
|
||||
factory = &transformerStorageFactory{
|
||||
|
|
|
@ -17,11 +17,14 @@ limitations under the License.
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -73,13 +76,16 @@ var (
|
|||
)
|
||||
dbTotalSize = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
DeprecatedVersion: "1.28.0",
|
||||
},
|
||||
[]string{"endpoint"},
|
||||
)
|
||||
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "")
|
||||
storageMonitor = &monitorCollector{}
|
||||
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Subsystem: "apiserver",
|
||||
|
@ -160,6 +166,7 @@ func Register() {
|
|||
legacyregistry.MustRegister(etcdRequestErrorCounts)
|
||||
legacyregistry.MustRegister(objectCounts)
|
||||
legacyregistry.MustRegister(dbTotalSize)
|
||||
legacyregistry.CustomMustRegister(storageMonitor)
|
||||
legacyregistry.MustRegister(etcdBookmarkCounts)
|
||||
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||
legacyregistry.MustRegister(listStorageCount)
|
||||
|
@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 {
|
|||
}
|
||||
|
||||
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
|
||||
// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes
|
||||
func UpdateEtcdDbSize(ep string, size int64) {
|
||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||
}
|
||||
|
||||
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
|
||||
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
|
||||
storageMonitor.monitorGetter = getter
|
||||
}
|
||||
|
||||
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||
func UpdateLeaseObjectCount(count int64) {
|
||||
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||
|
@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
|
|||
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
|
||||
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
|
||||
}
|
||||
|
||||
type Monitor interface {
|
||||
Monitor(ctx context.Context) (StorageMetrics, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type StorageMetrics struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
type monitorCollector struct {
|
||||
compbasemetrics.BaseStableCollector
|
||||
|
||||
monitorGetter func() ([]Monitor, error)
|
||||
}
|
||||
|
||||
// DescribeWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
|
||||
ch <- storageSizeDescription
|
||||
}
|
||||
|
||||
// CollectWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
|
||||
monitors, err := c.monitorGetter()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, m := range monitors {
|
||||
server := fmt.Sprintf("etcd-%d", i)
|
||||
|
||||
klog.V(4).InfoS("Start collecting storage metrics", "server", server)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
metrics, err := m.Monitor(ctx)
|
||||
cancel()
|
||||
m.Close()
|
||||
if err != nil {
|
||||
klog.InfoS("Failed to get storage metrics", "server", server, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to create metric", "server", server)
|
||||
}
|
||||
ch <- metric
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -37,6 +38,7 @@ import (
|
|||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
|
@ -52,7 +54,6 @@ import (
|
|||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||
|
||||
lock := sync.RWMutex{}
|
||||
var prober *etcd3Prober
|
||||
var prober *etcd3ProberMonitor
|
||||
clientErr := fmt.Errorf("etcd client connection not yet established")
|
||||
|
||||
go wait.PollUntil(time.Second, func() (bool, error) {
|
||||
newProber, err := newETCD3Prober(c)
|
||||
newProber, err := newETCD3ProberMonitor(c)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
// Ensure that server is already not shutting down.
|
||||
|
@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
|
||||
func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) {
|
||||
client, err := newETCD3Client(c.Transport)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &etcd3Prober{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
return &etcd3ProberMonitor{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
endpoints: c.Transport.ServerList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type etcd3Prober struct {
|
||||
prefix string
|
||||
type etcd3ProberMonitor struct {
|
||||
prefix string
|
||||
endpoints []string
|
||||
|
||||
mux sync.RWMutex
|
||||
client *clientv3.Client
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Close() error {
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
if !p.closed {
|
||||
p.closed = true
|
||||
return p.client.Close()
|
||||
func (t *etcd3ProberMonitor) Close() error {
|
||||
t.mux.Lock()
|
||||
defer t.mux.Unlock()
|
||||
if !t.closed {
|
||||
t.closed = true
|
||||
return t.client.Close()
|
||||
}
|
||||
return fmt.Errorf("prober was closed")
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Probe(ctx context.Context) error {
|
||||
p.mux.RLock()
|
||||
defer p.mux.RUnlock()
|
||||
if p.closed {
|
||||
return fmt.Errorf("prober was closed")
|
||||
func (t *etcd3ProberMonitor) Probe(ctx context.Context) error {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
||||
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
|
||||
_, err := t.client.Get(ctx, path.Join("/", t.prefix, "health"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting data from etcd: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return metrics.StorageMetrics{}, fmt.Errorf("closed")
|
||||
}
|
||||
status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)])
|
||||
if err != nil {
|
||||
return metrics.StorageMetrics{}, err
|
||||
}
|
||||
return metrics.StorageMetrics{
|
||||
Size: status.DbSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
|
@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
|
|||
|
||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
|
||||
// Deprecated: Will be replaced with newETCD3ProberMonitor
|
||||
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
|
||||
if interval == 0 {
|
||||
return func() {}, nil
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
|
@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
|
|||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3Prober(c)
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
|
||||
switch c.Type {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue