Merge pull request #118812 from serathius/storage-metric
Improve apiserver storage size metric Kubernetes-commit: 2ec4e14bfa0cec1f22919ea862c45b1501187e20
This commit is contained in:
commit
3cebba9887
|
@ -36,6 +36,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagevalue "k8s.io/apiserver/pkg/storage/value"
|
||||
|
@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
|||
return err
|
||||
}
|
||||
|
||||
metrics.SetStorageMonitorGetter(monitorGetter(factory))
|
||||
|
||||
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
|
||||
return nil
|
||||
}
|
||||
|
||||
func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) {
|
||||
return func() (monitors []metrics.Monitor, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
for _, m := range monitors {
|
||||
m.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var m metrics.Monitor
|
||||
for _, cfg := range factory.Configs() {
|
||||
m, err = storagefactory.CreateMonitor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
monitors = append(monitors, m)
|
||||
}
|
||||
return monitors, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
|
||||
if resourceTransformers != nil {
|
||||
factory = &transformerStorageFactory{
|
||||
|
|
|
@ -17,11 +17,14 @@ limitations under the License.
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -73,13 +76,16 @@ var (
|
|||
)
|
||||
dbTotalSize = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
DeprecatedVersion: "1.28.0",
|
||||
},
|
||||
[]string{"endpoint"},
|
||||
)
|
||||
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "")
|
||||
storageMonitor = &monitorCollector{}
|
||||
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Subsystem: "apiserver",
|
||||
|
@ -160,6 +166,7 @@ func Register() {
|
|||
legacyregistry.MustRegister(etcdRequestErrorCounts)
|
||||
legacyregistry.MustRegister(objectCounts)
|
||||
legacyregistry.MustRegister(dbTotalSize)
|
||||
legacyregistry.CustomMustRegister(storageMonitor)
|
||||
legacyregistry.MustRegister(etcdBookmarkCounts)
|
||||
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||
legacyregistry.MustRegister(listStorageCount)
|
||||
|
@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 {
|
|||
}
|
||||
|
||||
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
|
||||
// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes
|
||||
func UpdateEtcdDbSize(ep string, size int64) {
|
||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||
}
|
||||
|
||||
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
|
||||
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
|
||||
storageMonitor.monitorGetter = getter
|
||||
}
|
||||
|
||||
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||
func UpdateLeaseObjectCount(count int64) {
|
||||
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||
|
@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
|
|||
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
|
||||
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
|
||||
}
|
||||
|
||||
type Monitor interface {
|
||||
Monitor(ctx context.Context) (StorageMetrics, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type StorageMetrics struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
type monitorCollector struct {
|
||||
compbasemetrics.BaseStableCollector
|
||||
|
||||
monitorGetter func() ([]Monitor, error)
|
||||
}
|
||||
|
||||
// DescribeWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
|
||||
ch <- storageSizeDescription
|
||||
}
|
||||
|
||||
// CollectWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
|
||||
monitors, err := c.monitorGetter()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, m := range monitors {
|
||||
server := fmt.Sprintf("etcd-%d", i)
|
||||
|
||||
klog.V(4).InfoS("Start collecting storage metrics", "server", server)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
metrics, err := m.Monitor(ctx)
|
||||
cancel()
|
||||
m.Close()
|
||||
if err != nil {
|
||||
klog.InfoS("Failed to get storage metrics", "server", server, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to create metric", "server", server)
|
||||
}
|
||||
ch <- metric
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -37,6 +38,7 @@ import (
|
|||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
|
@ -52,7 +54,6 @@ import (
|
|||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||
|
||||
lock := sync.RWMutex{}
|
||||
var prober *etcd3Prober
|
||||
var prober *etcd3ProberMonitor
|
||||
clientErr := fmt.Errorf("etcd client connection not yet established")
|
||||
|
||||
go wait.PollUntil(time.Second, func() (bool, error) {
|
||||
newProber, err := newETCD3Prober(c)
|
||||
newProber, err := newETCD3ProberMonitor(c)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
// Ensure that server is already not shutting down.
|
||||
|
@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
|
||||
func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) {
|
||||
client, err := newETCD3Client(c.Transport)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &etcd3Prober{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
return &etcd3ProberMonitor{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
endpoints: c.Transport.ServerList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type etcd3Prober struct {
|
||||
prefix string
|
||||
type etcd3ProberMonitor struct {
|
||||
prefix string
|
||||
endpoints []string
|
||||
|
||||
mux sync.RWMutex
|
||||
client *clientv3.Client
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Close() error {
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
if !p.closed {
|
||||
p.closed = true
|
||||
return p.client.Close()
|
||||
func (t *etcd3ProberMonitor) Close() error {
|
||||
t.mux.Lock()
|
||||
defer t.mux.Unlock()
|
||||
if !t.closed {
|
||||
t.closed = true
|
||||
return t.client.Close()
|
||||
}
|
||||
return fmt.Errorf("prober was closed")
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Probe(ctx context.Context) error {
|
||||
p.mux.RLock()
|
||||
defer p.mux.RUnlock()
|
||||
if p.closed {
|
||||
return fmt.Errorf("prober was closed")
|
||||
func (t *etcd3ProberMonitor) Probe(ctx context.Context) error {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
||||
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
|
||||
_, err := t.client.Get(ctx, path.Join("/", t.prefix, "health"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting data from etcd: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return metrics.StorageMetrics{}, fmt.Errorf("closed")
|
||||
}
|
||||
status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)])
|
||||
if err != nil {
|
||||
return metrics.StorageMetrics{}, err
|
||||
}
|
||||
return metrics.StorageMetrics{
|
||||
Size: status.DbSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
|
@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
|
|||
|
||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
|
||||
// Deprecated: Will be replaced with newETCD3ProberMonitor
|
||||
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
|
||||
if interval == 0 {
|
||||
return func() {}, nil
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
|
@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
|
|||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3Prober(c)
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
|
||||
switch c.Type {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue