From 83887f16116cbf9ecc34a32e4ad8b4d8e782348d Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Mon, 7 Jun 2021 15:28:06 +0200 Subject: [PATCH 1/5] Replace multiListerWatcher with independent listWatchers per namespace The multiListerWatcher is a composite object encapsulating multiple ListerWatchers and implements the ListerWatcher interface. With the current implementation, when an individual lister fails, the entire List operation fails. This causes no metrics to be shown when KSM has no permissions to a single namespace. In addition to this, the multiListerWatcher takes advantage of internal implementation details if the client-go library by modifiying and relying on the ResourceVersion metadata field. This introduces a bug where reconnecting to the API server will break the multiListerWatcher completely. This commit replaces the multiListerWatcher with individual ListerWatchers per each configured namespace, resolving both issues. Signed-off-by: fpetkovski --- internal/store/builder.go | 184 +++++++++++-------- pkg/builder/builder.go | 5 +- pkg/builder/types/interfaces.go | 8 +- pkg/listwatch/listwatch.go | 202 -------------------- pkg/listwatch/namespace_denylist.go | 184 ------------------- pkg/metrics_store/metrics_writer.go | 70 +++++++ pkg/metrics_store/metrics_writer_test.go | 224 +++++++++++++++++++++++ pkg/metricshandler/metrics_handler.go | 15 +- 8 files changed, 413 insertions(+), 479 deletions(-) delete mode 100644 pkg/listwatch/listwatch.go delete mode 100644 pkg/listwatch/namespace_denylist.go create mode 100644 pkg/metrics_store/metrics_writer.go create mode 100644 pkg/metrics_store/metrics_writer_test.go diff --git a/internal/store/builder.go b/internal/store/builder.go index c9e90ea3..53079f19 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -43,7 +43,6 @@ import ( "k8s.io/klog/v2" ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types" - "k8s.io/kube-state-metrics/v2/pkg/listwatch" generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" "k8s.io/kube-state-metrics/v2/pkg/options" @@ -155,58 +154,64 @@ func (b *Builder) WithAllowLabels(labels map[string][]string) { } // Build initializes and registers all enabled stores. -func (b *Builder) Build() []cache.Store { +// It returns metrics writers which can be used to write out +// metrics from the stores. +func (b *Builder) Build() []metricsstore.MetricsWriter { if b.allowDenyList == nil { panic("allowDenyList should not be nil") } - stores := []cache.Store{} - activeStoreNames := []string{} + var metricsWriters []metricsstore.MetricsWriter + var activeStoreNames []string for _, c := range b.enabledResources { constructor, ok := availableStores[c] if ok { - store := constructor(b) + stores := constructor(b) activeStoreNames = append(activeStoreNames, c) - stores = append(stores, store) + if len(stores) == 1 { + metricsWriters = append(metricsWriters, stores[0]) + } else { + metricsWriters = append(metricsWriters, metricsstore.NewMultiStoreMetricsWriter(stores)) + } } } klog.Infof("Active resources: %s", strings.Join(activeStoreNames, ",")) - return stores + return metricsWriters } -var availableStores = map[string]func(f *Builder) cache.Store{ - "certificatesigningrequests": func(b *Builder) cache.Store { return b.buildCsrStore() }, - "configmaps": func(b *Builder) cache.Store { return b.buildConfigMapStore() }, - "cronjobs": func(b *Builder) cache.Store { return b.buildCronJobStore() }, - "daemonsets": func(b *Builder) cache.Store { return b.buildDaemonSetStore() }, - "deployments": func(b *Builder) cache.Store { return b.buildDeploymentStore() }, - "endpoints": func(b *Builder) cache.Store { return b.buildEndpointsStore() }, - "horizontalpodautoscalers": func(b *Builder) cache.Store { return b.buildHPAStore() }, - "ingresses": func(b *Builder) cache.Store { return b.buildIngressStore() }, - "jobs": func(b *Builder) cache.Store { return b.buildJobStore() }, - "leases": func(b *Builder) cache.Store { return b.buildLeases() }, - "limitranges": func(b *Builder) cache.Store { return b.buildLimitRangeStore() }, - "mutatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildMutatingWebhookConfigurationStore() }, - "namespaces": func(b *Builder) cache.Store { return b.buildNamespaceStore() }, - "networkpolicies": func(b *Builder) cache.Store { return b.buildNetworkPolicyStore() }, - "nodes": func(b *Builder) cache.Store { return b.buildNodeStore() }, - "persistentvolumeclaims": func(b *Builder) cache.Store { return b.buildPersistentVolumeClaimStore() }, - "persistentvolumes": func(b *Builder) cache.Store { return b.buildPersistentVolumeStore() }, - "poddisruptionbudgets": func(b *Builder) cache.Store { return b.buildPodDisruptionBudgetStore() }, - "pods": func(b *Builder) cache.Store { return b.buildPodStore() }, - "replicasets": func(b *Builder) cache.Store { return b.buildReplicaSetStore() }, - "replicationcontrollers": func(b *Builder) cache.Store { return b.buildReplicationControllerStore() }, - "resourcequotas": func(b *Builder) cache.Store { return b.buildResourceQuotaStore() }, - "secrets": func(b *Builder) cache.Store { return b.buildSecretStore() }, - "services": func(b *Builder) cache.Store { return b.buildServiceStore() }, - "statefulsets": func(b *Builder) cache.Store { return b.buildStatefulSetStore() }, - "storageclasses": func(b *Builder) cache.Store { return b.buildStorageClassStore() }, - "validatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildValidatingWebhookConfigurationStore() }, - "volumeattachments": func(b *Builder) cache.Store { return b.buildVolumeAttachmentStore() }, - "verticalpodautoscalers": func(b *Builder) cache.Store { return b.buildVPAStore() }, +var availableStores = map[string]func(f *Builder) []*metricsstore.MetricsStore{ + "certificatesigningrequests": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCsrStore() }, + "configmaps": func(b *Builder) []*metricsstore.MetricsStore { return b.buildConfigMapStore() }, + "cronjobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCronJobStore() }, + "daemonsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDaemonSetStore() }, + "deployments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDeploymentStore() }, + "endpoints": func(b *Builder) []*metricsstore.MetricsStore { return b.buildEndpointsStore() }, + "horizontalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildHPAStore() }, + "ingresses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildIngressStore() }, + "jobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildJobStore() }, + "leases": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLeases() }, + "limitranges": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLimitRangeStore() }, + "mutatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildMutatingWebhookConfigurationStore() }, + "namespaces": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNamespaceStore() }, + "networkpolicies": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNetworkPolicyStore() }, + "nodes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNodeStore() }, + "persistentvolumeclaims": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeClaimStore() }, + "persistentvolumes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeStore() }, + "poddisruptionbudgets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodDisruptionBudgetStore() }, + "pods": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodStore() }, + "replicasets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicaSetStore() }, + "replicationcontrollers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicationControllerStore() }, + "resourcequotas": func(b *Builder) []*metricsstore.MetricsStore { return b.buildResourceQuotaStore() }, + "secrets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildSecretStore() }, + "services": func(b *Builder) []*metricsstore.MetricsStore { return b.buildServiceStore() }, + "statefulsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStatefulSetStore() }, + "storageclasses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStorageClassStore() }, + "validatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildValidatingWebhookConfigurationStore() }, + "volumeattachments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVolumeAttachmentStore() }, + "verticalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVPAStore() }, } func resourceExists(name string) bool { @@ -222,119 +227,119 @@ func availableResources() []string { return c } -func (b *Builder) buildConfigMapStore() cache.Store { +func (b *Builder) buildConfigMapStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch) } -func (b *Builder) buildCronJobStore() cache.Store { +func (b *Builder) buildCronJobStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch) } -func (b *Builder) buildDaemonSetStore() cache.Store { +func (b *Builder) buildDaemonSetStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch) } -func (b *Builder) buildDeploymentStore() cache.Store { +func (b *Builder) buildDeploymentStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch) } -func (b *Builder) buildEndpointsStore() cache.Store { +func (b *Builder) buildEndpointsStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch) } -func (b *Builder) buildHPAStore() cache.Store { +func (b *Builder) buildHPAStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch) } -func (b *Builder) buildIngressStore() cache.Store { +func (b *Builder) buildIngressStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch) } -func (b *Builder) buildJobStore() cache.Store { +func (b *Builder) buildJobStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch) } -func (b *Builder) buildLimitRangeStore() cache.Store { +func (b *Builder) buildLimitRangeStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch) } -func (b *Builder) buildMutatingWebhookConfigurationStore() cache.Store { +func (b *Builder) buildMutatingWebhookConfigurationStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch) } -func (b *Builder) buildNamespaceStore() cache.Store { +func (b *Builder) buildNamespaceStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch) } -func (b *Builder) buildNetworkPolicyStore() cache.Store { +func (b *Builder) buildNetworkPolicyStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch) } -func (b *Builder) buildNodeStore() cache.Store { +func (b *Builder) buildNodeStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch) } -func (b *Builder) buildPersistentVolumeClaimStore() cache.Store { +func (b *Builder) buildPersistentVolumeClaimStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch) } -func (b *Builder) buildPersistentVolumeStore() cache.Store { +func (b *Builder) buildPersistentVolumeStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch) } -func (b *Builder) buildPodDisruptionBudgetStore() cache.Store { +func (b *Builder) buildPodDisruptionBudgetStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch) } -func (b *Builder) buildReplicaSetStore() cache.Store { +func (b *Builder) buildReplicaSetStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch) } -func (b *Builder) buildReplicationControllerStore() cache.Store { +func (b *Builder) buildReplicationControllerStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch) } -func (b *Builder) buildResourceQuotaStore() cache.Store { +func (b *Builder) buildResourceQuotaStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch) } -func (b *Builder) buildSecretStore() cache.Store { +func (b *Builder) buildSecretStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch) } -func (b *Builder) buildServiceStore() cache.Store { +func (b *Builder) buildServiceStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch) } -func (b *Builder) buildStatefulSetStore() cache.Store { +func (b *Builder) buildStatefulSetStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch) } -func (b *Builder) buildStorageClassStore() cache.Store { +func (b *Builder) buildStorageClassStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch) } -func (b *Builder) buildPodStore() cache.Store { +func (b *Builder) buildPodStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch) } -func (b *Builder) buildCsrStore() cache.Store { +func (b *Builder) buildCsrStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch) } -func (b *Builder) buildValidatingWebhookConfigurationStore() cache.Store { +func (b *Builder) buildValidatingWebhookConfigurationStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch) } -func (b *Builder) buildVolumeAttachmentStore() cache.Store { +func (b *Builder) buildVolumeAttachmentStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch) } -func (b *Builder) buildVPAStore() cache.Store { +func (b *Builder) buildVPAStore() []*metricsstore.MetricsStore { return b.buildStoreFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient)) } -func (b *Builder) buildLeases() cache.Store { +func (b *Builder) buildLeases() []*metricsstore.MetricsStore { return b.buildStoreFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch) } @@ -342,30 +347,49 @@ func (b *Builder) buildStore( metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, -) cache.Store { +) []*metricsstore.MetricsStore { metricFamilies = generator.FilterMetricFamilies(b.allowDenyList, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) - store := metricsstore.NewMetricsStore( - familyHeaders, - composedMetricGenFuncs, - ) - b.reflectorPerNamespace(expectedType, store, listWatchFunc) + if isAllNamespaces(b.namespaces) { + store := metricsstore.NewMetricsStore( + familyHeaders, + composedMetricGenFuncs, + ) + listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll) + b.startReflector(expectedType, store, listWatcher) + return []*metricsstore.MetricsStore{store} + } - return store + stores := make([]*metricsstore.MetricsStore, 0, len(b.namespaces)) + for _, ns := range b.namespaces { + store := metricsstore.NewMetricsStore( + familyHeaders, + composedMetricGenFuncs, + ) + listWatcher := listWatchFunc(b.kubeClient, ns) + b.startReflector(expectedType, store, listWatcher) + stores = append(stores, store) + } + + return stores } -// reflectorPerNamespace creates a Kubernetes client-go reflector with the given -// listWatchFunc for each given namespace and registers it with the given store. -func (b *Builder) reflectorPerNamespace( +// startReflector starts a Kubernetes client-go reflector with the given +// listWatcher and registers it with the given store. +func (b *Builder) startReflector( expectedType interface{}, store cache.Store, - listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, + listWatcher cache.ListerWatcher, ) { - lwf := func(ns string) cache.ListerWatcher { return listWatchFunc(b.kubeClient, ns) } - lw := listwatch.MultiNamespaceListerWatcher(b.namespaces, nil, lwf) - instrumentedListWatch := watch.NewInstrumentedListerWatcher(lw, b.listWatchMetrics, reflect.TypeOf(expectedType).String()) + instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String()) reflector := cache.NewReflector(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, 0) go reflector.Run(b.ctx.Done()) } + +// isAllNamespaces checks if the given slice of namespaces +// contains only v1.NamespaceAll. +func isAllNamespaces(namespaces []string) bool { + return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll +} diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go index 365051da..db5f8b83 100644 --- a/pkg/builder/builder.go +++ b/pkg/builder/builder.go @@ -19,10 +19,11 @@ package builder import ( "context" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" + "github.com/prometheus/client_golang/prometheus" vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" internalstore "k8s.io/kube-state-metrics/v2/internal/store" ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types" @@ -100,6 +101,6 @@ func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc { } // Build initializes and registers all enabled stores. -func (b *Builder) Build() []cache.Store { +func (b *Builder) Build() []metricsstore.MetricsWriter { return b.internal.Build() } diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index c30f0adc..a19e0201 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -19,6 +19,8 @@ package types import ( "context" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" + "github.com/prometheus/client_golang/prometheus" vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" clientset "k8s.io/client-go/kubernetes" @@ -41,14 +43,14 @@ type BuilderInterface interface { WithGenerateStoreFunc(f BuildStoreFunc) WithAllowLabels(l map[string][]string) DefaultGenerateStoreFunc() BuildStoreFunc - Build() []cache.Store + Build() []metricsstore.MetricsWriter } -// BuildStoreFunc function signature that is use to returns a cache.Store +// BuildStoreFunc function signature that is use to returns a list of metricsstore.MetricsStore type BuildStoreFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, -) cache.Store +) []*metricsstore.MetricsStore // AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names type AllowDenyLister interface { diff --git a/pkg/listwatch/listwatch.go b/pkg/listwatch/listwatch.go deleted file mode 100644 index b6668121..00000000 --- a/pkg/listwatch/listwatch.go +++ /dev/null @@ -1,202 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package listwatch - -import ( - "fmt" - "strings" - "sync" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" -) - -// MultiNamespaceListerWatcher takes allowed and denied namespaces and a -// cache.ListerWatcher generator func and returns a single cache.ListerWatcher -// capable of operating on multiple namespaces. -// -// Allowed namespaces and denied namespaces are mutually exclusive. -// If allowed namespaces contain multiple items, the given denied namespaces have no effect. -// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string), -// the given denied namespaces are applied. -func MultiNamespaceListerWatcher(allowedNamespaces, deniedNamespaces []string, f func(string) cache.ListerWatcher) cache.ListerWatcher { - // If there is only one namespace then there is no need to create a - // multi lister watcher proxy. - if IsAllNamespaces(allowedNamespaces) { - return newDenylistListerWatcher(deniedNamespaces, f(allowedNamespaces[0])) - } - if len(allowedNamespaces) == 1 { - return f(allowedNamespaces[0]) - } - - var lws []cache.ListerWatcher - for _, n := range allowedNamespaces { - lws = append(lws, f(n)) - } - return multiListerWatcher(lws) -} - -// multiListerWatcher abstracts several cache.ListerWatchers, allowing them -// to be treated as a single cache.ListerWatcher. -type multiListerWatcher []cache.ListerWatcher - -// List implements the ListerWatcher interface. -// It combines the output of the List method of every ListerWatcher into -// a single result. -func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { - l := metav1.List{} - var resourceVersions []string - for _, lw := range mlw { - list, err := lw.List(options) - if err != nil { - return nil, err - } - items, err := meta.ExtractList(list) - if err != nil { - return nil, err - } - metaObj, err := meta.ListAccessor(list) - if err != nil { - return nil, err - } - for _, item := range items { - l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()}) - } - resourceVersions = append(resourceVersions, metaObj.GetResourceVersion()) - } - // Combine the resource versions so that the composite Watch method can - // distribute appropriate versions to each underlying Watch func. - l.ListMeta.ResourceVersion = strings.Join(resourceVersions, "/") - return &l, nil -} - -// Watch implements the ListerWatcher interface. -// It returns a watch.Interface that combines the output from the -// watch.Interface of every cache.ListerWatcher into a single result chan. -func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - resourceVersions := make([]string, len(mlw)) - // Allow resource versions to be "". - if options.ResourceVersion != "" { - rvs := make([]string, 0, len(mlw)) - if strings.Contains(options.ResourceVersion, "/") { - rvs = strings.Split(options.ResourceVersion, "/") - if len(rvs) != len(mlw) { - return nil, fmt.Errorf("expected resource version to have %d parts to match the number of ListerWatchers, actual: %d", len(mlw), len(rvs)) - } - } else { - // watch reconnected and resource version is the latest one from event.Object has no "/" - for i := 0; i < len(mlw); i++ { - rvs = append(rvs, options.ResourceVersion) - } - } - resourceVersions = rvs - } - return newMultiWatch(mlw, resourceVersions, options) -} - -// multiWatch abstracts multiple watch.Interface's, allowing them -// to be treated as a single watch.Interface. -type multiWatch struct { - result chan watch.Event - stopped chan struct{} - stoppers []func() -} - -// newMultiWatch returns a new multiWatch or an error if one of the underlying -// Watch funcs errored. The length of []cache.ListerWatcher and []string must -// match. -func newMultiWatch(lws []cache.ListerWatcher, resourceVersions []string, options metav1.ListOptions) (*multiWatch, error) { - var ( - result = make(chan watch.Event) - stopped = make(chan struct{}) - stoppers []func() - wg sync.WaitGroup - ) - - wg.Add(len(lws)) - - for i, lw := range lws { - o := options.DeepCopy() - o.ResourceVersion = resourceVersions[i] - w, err := lw.Watch(*o) - if err != nil { - return nil, err - } - - go func() { - defer wg.Done() - - for { - event, ok := <-w.ResultChan() - if !ok { - return - } - - select { - case result <- event: - case <-stopped: - return - } - } - }() - stoppers = append(stoppers, w.Stop) - } - - // result chan must be closed, - // once all event sender goroutines exited. - go func() { - wg.Wait() - close(result) - }() - - return &multiWatch{ - result: result, - stoppers: stoppers, - stopped: stopped, - }, nil -} - -// ResultChan implements the watch.Interface interface. -func (mw *multiWatch) ResultChan() <-chan watch.Event { - return mw.result -} - -// Stop implements the watch.Interface interface. -// It stops all of the underlying watch.Interfaces and closes the backing chan. -// Can safely be called more than once. -func (mw *multiWatch) Stop() { - select { - case <-mw.stopped: - // nothing to do, we are already stopped - default: - for _, stop := range mw.stoppers { - stop() - } - close(mw.stopped) - } - return -} - -// IsAllNamespaces checks if the given slice of namespaces -// contains only v1.NamespaceAll. -func IsAllNamespaces(namespaces []string) bool { - return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll -} diff --git a/pkg/listwatch/namespace_denylist.go b/pkg/listwatch/namespace_denylist.go deleted file mode 100644 index 957b962c..00000000 --- a/pkg/listwatch/namespace_denylist.go +++ /dev/null @@ -1,184 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package listwatch - -import ( - "fmt" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" -) - -// denylistListerWatcher implements cache.ListerWatcher -// which wraps a cache.ListerWatcher, -// filtering list results and watch events by denied namespaces. -type denylistListerWatcher struct { - denylist map[string]struct{} - next cache.ListerWatcher -} - -// newDenylistListerWatcher creates a cache.ListerWatcher -// wrapping the given next cache.ListerWatcher -// filtering lists and watch events by the given namespaces. -func newDenylistListerWatcher(namespaces []string, next cache.ListerWatcher) cache.ListerWatcher { - if len(namespaces) == 0 { - return next - } - - denylist := make(map[string]struct{}) - - for _, ns := range namespaces { - denylist[ns] = struct{}{} - } - - return &denylistListerWatcher{ - denylist: denylist, - next: next, - } -} - -// List lists the wrapped next listerwatcher List result, -// but filtering denied namespaces from the result. -func (w *denylistListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { - l := metav1.List{} - - list, err := w.next.List(options) - if err != nil { - klog.Errorf("error listing: %v", err) - return nil, err - } - - objs, err := meta.ExtractList(list) - if err != nil { - klog.Errorf("error extracting list: %v", err) - return nil, err - } - - metaObj, err := meta.ListAccessor(list) - if err != nil { - klog.Errorf("error getting list accessor: %v", err) - return nil, err - } - - for _, obj := range objs { - acc, err := meta.Accessor(obj) - if err != nil { - klog.Errorf("error getting meta accessor accessor for object %s: %v", fmt.Sprintf("%v", obj), err) - return nil, err - } - - if _, denied := w.denylist[getNamespace(acc)]; denied { - klog.V(8).Infof("denied %s", acc.GetSelfLink()) - continue - } - - klog.V(8).Infof("allowed %s", acc.GetSelfLink()) - - l.Items = append(l.Items, runtime.RawExtension{Object: obj.DeepCopyObject()}) - } - - l.ListMeta.ResourceVersion = metaObj.GetResourceVersion() - return &l, nil -} - -// Watch -func (w *denylistListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - nextWatch, err := w.next.Watch(options) - if err != nil { - return nil, err - } - - return newDenylistWatch(w.denylist, nextWatch), nil -} - -// newDenylistWatch creates a new watch.Interface, -// wrapping the given next watcher, -// and filtering watch events by the given namespaces. -// -// It starts a new goroutine until either -// a) the result channel of the wrapped next watcher is closed, or -// b) Stop() was invoked on the returned watcher. -func newDenylistWatch(denylist map[string]struct{}, next watch.Interface) watch.Interface { - var ( - result = make(chan watch.Event) - proxy = watch.NewProxyWatcher(result) - ) - - go func() { - defer func() { - klog.V(8).Info("stopped denylist watcher") - // According to watch.Interface the result channel is supposed to be called - // in case of error or if the listwach is closed, see [1]. - // - // [1] https://github.com/kubernetes/apimachinery/blob/533d101be9a6450773bb2829bef282b6b7c4ff6d/pkg/watch/watch.go#L34-L37 - close(result) - }() - - for { - select { - case event, ok := <-next.ResultChan(): - if !ok { - klog.V(8).Info("result channel closed") - return - } - - acc, err := meta.Accessor(event.Object) - if err != nil { - // ignore this event, it doesn't implement the metav1.Object interface, - // hence we cannot determine its namespace. - klog.V(6).Infof("unexpected object type in event (%T): %v", event.Object, event.Object) - continue - } - - if _, denied := denylist[getNamespace(acc)]; denied { - klog.V(8).Infof("denied %s", acc.GetSelfLink()) - continue - } - - klog.V(8).Infof("allowed %s", acc.GetSelfLink()) - - select { - case result <- event: - klog.V(8).Infof("dispatched %s", acc.GetSelfLink()) - case <-proxy.StopChan(): - next.Stop() - return - } - case <-proxy.StopChan(): - next.Stop() - return - } - } - }() - - return proxy -} - -// getNamespace returns the namespace of the given object. -// If the object is itself a namespace, it returns the object's -// name. -func getNamespace(obj metav1.Object) string { - if _, ok := obj.(*v1.Namespace); ok { - return obj.GetName() - } - return obj.GetNamespace() -} diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go new file mode 100644 index 00000000..965326b1 --- /dev/null +++ b/pkg/metrics_store/metrics_writer.go @@ -0,0 +1,70 @@ +/* +Copyright 2018 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricsstore + +import "io" + +// MetricsWriter is the interface that wraps the WriteAll method. +// WriteAll writes out bytes to the underlying writer. +type MetricsWriter interface { + WriteAll(w io.Writer) +} + +// MultiStoreMetricsWriter is a struct that holds multiple MetricsStore(s) and +// implements the MetricsWriter interface. +// It should be used with stores which have the same metric headers headers. +// +// MultiStoreMetricsWriter writes out metrics from the underlying stores so that +// metrics with the same name coming from different stores end up grouped together. +// It also ensures that the metric headers are only written out once. +type MultiStoreMetricsWriter struct { + stores []*MetricsStore +} + +// NewMultiStoreMetricsWriter creates a new MultiStoreMetricsWriter. +func NewMultiStoreMetricsWriter(stores []*MetricsStore) MetricsWriter { + return &MultiStoreMetricsWriter{ + stores: stores, + } +} + +// WriteAll writes out metrics from the underlying stores to the given writer. +// +// WriteAll writes metrics so that the ones with the same name +// are grouped together when written out. +func (m MultiStoreMetricsWriter) WriteAll(w io.Writer) { + if len(m.stores) == 0 { + return + } + + for _, s := range m.stores { + s.mutex.RLock() + defer func(s *MetricsStore) { + s.mutex.RUnlock() + }(s) + } + + for i, help := range m.stores[0].headers { + w.Write([]byte(help)) + w.Write([]byte{'\n'}) + for _, s := range m.stores { + for _, metricFamilies := range s.metrics { + w.Write(metricFamilies[i]) + } + } + } +} diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go new file mode 100644 index 00000000..5316c63d --- /dev/null +++ b/pkg/metrics_store/metrics_writer_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2018 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricsstore_test + +import ( + "strings" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/kube-state-metrics/v2/pkg/metric" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" +) + +func TestMultiNamespaceMetricsWriter_SingleStores_WriteAll(t *testing.T) { + genFunc := func(obj interface{}) []metric.FamilyInterface { + o, err := meta.Accessor(obj) + if err != nil { + t.Fatal(err) + } + + mf1 := metric.Family{ + Name: "kube_service_info_1", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + mf2 := metric.Family{ + Name: "kube_service_info_2", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + return []metric.FamilyInterface{&mf1, &mf2} + } + store := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + svcs := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a1", + Name: "service", + Namespace: "a", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a2", + Name: "service", + Namespace: "a", + }, + }, + } + for _, svc := range svcs { + if err := store.Add(&svc); err != nil { + t.Fatal(err) + } + } + + multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{store}) + w := strings.Builder{} + multiNsWriter.WriteAll(&w) + result := w.String() + + resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n") + if len(resultLines) != 6 { + t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 6) + } + if resultLines[0] != "Info 1 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services") + } + if resultLines[3] != "Info 2 about services" { + t.Fatalf("Invalid metrics header on line 3, got %s, want %s", resultLines[3], "Info 2 about services") + } + + expectedSeries := []string{ + `kube_service_info_1{namespace="a",uid="a1"} 1`, + `kube_service_info_1{namespace="a",uid="a2"} 1`, + `kube_service_info_2{namespace="a",uid="a1"} 1`, + `kube_service_info_2{namespace="a",uid="a2"} 1`, + } + + for _, series := range expectedSeries { + if !strings.Contains(result, series) { + t.Fatalf("Did not find expected series %s", series) + } + } +} + +func TestMultiNamespaceMetricsWriter_MultipleStores_WriteAll(t *testing.T) { + genFunc := func(obj interface{}) []metric.FamilyInterface { + o, err := meta.Accessor(obj) + if err != nil { + t.Fatal(err) + } + + mf1 := metric.Family{ + Name: "kube_service_info_1", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + mf2 := metric.Family{ + Name: "kube_service_info_2", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + return []metric.FamilyInterface{&mf1, &mf2} + } + s1 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + svcs1 := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a1", + Name: "service", + Namespace: "a", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a2", + Name: "service", + Namespace: "a", + }, + }, + } + for _, svc := range svcs1 { + if err := s1.Add(&svc); err != nil { + t.Fatal(err) + } + } + + svcs2 := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "b1", + Name: "service", + Namespace: "b", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "b2", + Name: "service", + Namespace: "b", + }, + }, + } + s2 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + for _, svc := range svcs2 { + if err := s2.Add(&svc); err != nil { + t.Fatal(err) + } + } + + multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{s1, s2}) + w := strings.Builder{} + multiNsWriter.WriteAll(&w) + result := w.String() + + resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n") + if len(resultLines) != 10 { + t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 10) + } + if resultLines[0] != "Info 1 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services") + } + if resultLines[5] != "Info 2 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[5], "Info 2 about services") + } + + expectedSeries := []string{ + `kube_service_info_1{namespace="a",uid="a1"} 1`, + `kube_service_info_1{namespace="a",uid="a2"} 1`, + `kube_service_info_1{namespace="b",uid="b1"} 1`, + `kube_service_info_1{namespace="b",uid="b2"} 1`, + `kube_service_info_2{namespace="a",uid="a1"} 1`, + `kube_service_info_2{namespace="a",uid="a2"} 1`, + `kube_service_info_2{namespace="b",uid="b1"} 1`, + `kube_service_info_2{namespace="b",uid="b2"} 1`, + } + + for _, series := range expectedSeries { + if !strings.Contains(result, series) { + t.Fatalf("Did not find expected series %s", series) + } + } +} diff --git a/pkg/metricshandler/metrics_handler.go b/pkg/metricshandler/metrics_handler.go index 0e87759b..9dd0cc26 100644 --- a/pkg/metricshandler/metrics_handler.go +++ b/pkg/metricshandler/metrics_handler.go @@ -48,9 +48,9 @@ type MetricsHandler struct { cancel func() - // mtx protects stores, curShard, and curTotalShards + // mtx protects metricsWriters, curShard, and curTotalShards mtx *sync.RWMutex - stores []cache.Store + metricsWriters []metricsstore.MetricsWriter curShard int32 curTotalShards int } @@ -81,7 +81,7 @@ func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, tot ctx, m.cancel = context.WithCancel(ctx) m.storeBuilder.WithSharding(shard, totalShards) m.storeBuilder.WithContext(ctx) - m.stores = m.storeBuilder.Build() + m.metricsWriters = m.storeBuilder.Build() m.curShard = shard m.curTotalShards = totalShards } @@ -174,8 +174,8 @@ func (m *MetricsHandler) Run(ctx context.Context) error { return ctx.Err() } -// ServeHTTP implements the http.Handler interface. It writes the metrics in -// its stores to the response body. +// ServeHTTP implements the http.Handler interface. It writes all generated +// metrics to the response body. func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -198,9 +198,8 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - for _, s := range m.stores { - ms := s.(*metricsstore.MetricsStore) - ms.WriteAll(writer) + for _, w := range m.metricsWriters { + w.WriteAll(writer) } // In case we gzipped the response, we have to close the writer. From acb7d1dd3cd6129fdf20857e29f5e43121a49195 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 25 Jun 2021 13:49:42 +0200 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Damien Grisonnet --- pkg/metrics_store/metrics_writer.go | 4 ++-- pkg/metrics_store/metrics_writer_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 965326b1..5d0362ae 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors All rights reserved. +Copyright 2021 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ type MetricsWriter interface { // MultiStoreMetricsWriter is a struct that holds multiple MetricsStore(s) and // implements the MetricsWriter interface. -// It should be used with stores which have the same metric headers headers. +// It should be used with stores which have the same metric headers. // // MultiStoreMetricsWriter writes out metrics from the underlying stores so that // metrics with the same name coming from different stores end up grouped together. diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index 5316c63d..b051bd39 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors All rights reserved. +Copyright 2021 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From a6eb25e3c6ee7447289323d03434d38eeba82b7e Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Fri, 25 Jun 2021 14:23:09 +0200 Subject: [PATCH 3/5] rename buildStore to buildStores --- internal/store/builder.go | 190 ++++++++++++++++---------------- main.go | 2 +- main_test.go | 10 +- pkg/builder/builder.go | 12 +- pkg/builder/types/interfaces.go | 8 +- 5 files changed, 111 insertions(+), 111 deletions(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index 53079f19..657b3cef 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -67,7 +67,7 @@ type Builder struct { shardingMetrics *sharding.Metrics shard int32 totalShards int - buildStoreFunc ksmtypes.BuildStoreFunc + buildStoresFunc ksmtypes.BuildStoresFunc allowLabelsList map[string][]string } @@ -136,14 +136,14 @@ func (b *Builder) WithAllowDenyList(l ksmtypes.AllowDenyLister) { b.allowDenyList = l } -// WithGenerateStoreFunc configures a custom generate store function -func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) { - b.buildStoreFunc = f +// WithGenerateStoresFunc configures a custom generate store function +func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) { + b.buildStoresFunc = f } -// DefaultGenerateStoreFunc returns default buildStore function -func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc { - return b.buildStore +// DefaultGenerateStoresFunc returns default buildStores function +func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc { + return b.buildStores } // WithAllowLabels configures which labels can be returned for metrics @@ -183,35 +183,35 @@ func (b *Builder) Build() []metricsstore.MetricsWriter { } var availableStores = map[string]func(f *Builder) []*metricsstore.MetricsStore{ - "certificatesigningrequests": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCsrStore() }, - "configmaps": func(b *Builder) []*metricsstore.MetricsStore { return b.buildConfigMapStore() }, - "cronjobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCronJobStore() }, - "daemonsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDaemonSetStore() }, - "deployments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDeploymentStore() }, - "endpoints": func(b *Builder) []*metricsstore.MetricsStore { return b.buildEndpointsStore() }, - "horizontalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildHPAStore() }, - "ingresses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildIngressStore() }, - "jobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildJobStore() }, - "leases": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLeases() }, - "limitranges": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLimitRangeStore() }, - "mutatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildMutatingWebhookConfigurationStore() }, - "namespaces": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNamespaceStore() }, - "networkpolicies": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNetworkPolicyStore() }, - "nodes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNodeStore() }, - "persistentvolumeclaims": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeClaimStore() }, - "persistentvolumes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeStore() }, - "poddisruptionbudgets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodDisruptionBudgetStore() }, - "pods": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodStore() }, - "replicasets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicaSetStore() }, - "replicationcontrollers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicationControllerStore() }, - "resourcequotas": func(b *Builder) []*metricsstore.MetricsStore { return b.buildResourceQuotaStore() }, - "secrets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildSecretStore() }, - "services": func(b *Builder) []*metricsstore.MetricsStore { return b.buildServiceStore() }, - "statefulsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStatefulSetStore() }, - "storageclasses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStorageClassStore() }, - "validatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildValidatingWebhookConfigurationStore() }, - "volumeattachments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVolumeAttachmentStore() }, - "verticalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVPAStore() }, + "certificatesigningrequests": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCsrStores() }, + "configmaps": func(b *Builder) []*metricsstore.MetricsStore { return b.buildConfigMapStores() }, + "cronjobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCronJobStores() }, + "daemonsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDaemonSetStores() }, + "deployments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDeploymentStores() }, + "endpoints": func(b *Builder) []*metricsstore.MetricsStore { return b.buildEndpointsStores() }, + "horizontalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildHPAStores() }, + "ingresses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildIngressStores() }, + "jobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildJobStores() }, + "leases": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLeasesStores() }, + "limitranges": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLimitRangeStores() }, + "mutatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildMutatingWebhookConfigurationStores() }, + "namespaces": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNamespaceStores() }, + "networkpolicies": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNetworkPolicyStores() }, + "nodes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNodeStores() }, + "persistentvolumeclaims": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeClaimStores() }, + "persistentvolumes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeStores() }, + "poddisruptionbudgets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodDisruptionBudgetStores() }, + "pods": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodStores() }, + "replicasets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicaSetStores() }, + "replicationcontrollers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicationControllerStores() }, + "resourcequotas": func(b *Builder) []*metricsstore.MetricsStore { return b.buildResourceQuotaStores() }, + "secrets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildSecretStores() }, + "services": func(b *Builder) []*metricsstore.MetricsStore { return b.buildServiceStores() }, + "statefulsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStatefulSetStores() }, + "storageclasses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStorageClassStores() }, + "validatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildValidatingWebhookConfigurationStores() }, + "volumeattachments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVolumeAttachmentStores() }, + "verticalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVPAStores() }, } func resourceExists(name string) bool { @@ -227,123 +227,123 @@ func availableResources() []string { return c } -func (b *Builder) buildConfigMapStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch) +func (b *Builder) buildConfigMapStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch) } -func (b *Builder) buildCronJobStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch) +func (b *Builder) buildCronJobStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch) } -func (b *Builder) buildDaemonSetStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch) +func (b *Builder) buildDaemonSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch) } -func (b *Builder) buildDeploymentStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch) +func (b *Builder) buildDeploymentStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch) } -func (b *Builder) buildEndpointsStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch) +func (b *Builder) buildEndpointsStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch) } -func (b *Builder) buildHPAStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch) +func (b *Builder) buildHPAStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch) } -func (b *Builder) buildIngressStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch) +func (b *Builder) buildIngressStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch) } -func (b *Builder) buildJobStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch) +func (b *Builder) buildJobStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch) } -func (b *Builder) buildLimitRangeStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch) +func (b *Builder) buildLimitRangeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch) } -func (b *Builder) buildMutatingWebhookConfigurationStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch) +func (b *Builder) buildMutatingWebhookConfigurationStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch) } -func (b *Builder) buildNamespaceStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch) +func (b *Builder) buildNamespaceStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch) } -func (b *Builder) buildNetworkPolicyStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch) +func (b *Builder) buildNetworkPolicyStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch) } -func (b *Builder) buildNodeStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch) +func (b *Builder) buildNodeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch) } -func (b *Builder) buildPersistentVolumeClaimStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch) +func (b *Builder) buildPersistentVolumeClaimStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch) } -func (b *Builder) buildPersistentVolumeStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch) +func (b *Builder) buildPersistentVolumeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch) } -func (b *Builder) buildPodDisruptionBudgetStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch) +func (b *Builder) buildPodDisruptionBudgetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch) } -func (b *Builder) buildReplicaSetStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch) +func (b *Builder) buildReplicaSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch) } -func (b *Builder) buildReplicationControllerStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch) +func (b *Builder) buildReplicationControllerStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch) } -func (b *Builder) buildResourceQuotaStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch) +func (b *Builder) buildResourceQuotaStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch) } -func (b *Builder) buildSecretStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch) +func (b *Builder) buildSecretStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch) } -func (b *Builder) buildServiceStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch) +func (b *Builder) buildServiceStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch) } -func (b *Builder) buildStatefulSetStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch) +func (b *Builder) buildStatefulSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch) } -func (b *Builder) buildStorageClassStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch) +func (b *Builder) buildStorageClassStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch) } -func (b *Builder) buildPodStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch) +func (b *Builder) buildPodStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch) } -func (b *Builder) buildCsrStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch) +func (b *Builder) buildCsrStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch) } -func (b *Builder) buildValidatingWebhookConfigurationStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch) +func (b *Builder) buildValidatingWebhookConfigurationStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch) } -func (b *Builder) buildVolumeAttachmentStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch) +func (b *Builder) buildVolumeAttachmentStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch) } -func (b *Builder) buildVPAStore() []*metricsstore.MetricsStore { - return b.buildStoreFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient)) +func (b *Builder) buildVPAStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient)) } -func (b *Builder) buildLeases() []*metricsstore.MetricsStore { - return b.buildStoreFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch) +func (b *Builder) buildLeasesStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch) } -func (b *Builder) buildStore( +func (b *Builder) buildStores( metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, diff --git a/main.go b/main.go index df7512fc..91d0ab21 100644 --- a/main.go +++ b/main.go @@ -139,7 +139,7 @@ func main() { storeBuilder.WithAllowDenyList(allowDenyList) - storeBuilder.WithGenerateStoreFunc(storeBuilder.DefaultGenerateStoreFunc()) + storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc()) proc.StartReaper() diff --git a/main_test.go b/main_test.go index 69086f8d..48dcab02 100644 --- a/main_test.go +++ b/main_test.go @@ -67,7 +67,7 @@ func BenchmarkKubeStateMetrics(b *testing.B) { builder.WithSharding(0, 1) builder.WithContext(ctx) builder.WithNamespaces(options.DefaultNamespaces) - builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc()) + builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc()) l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{}) if err != nil { @@ -132,7 +132,7 @@ func TestFullScrapeCycle(t *testing.T) { builder.WithEnabledResources(options.DefaultResources.AsSlice()) builder.WithKubeClient(kubeClient) builder.WithNamespaces(options.DefaultNamespaces) - builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc()) + builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc()) l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{}) if err != nil { @@ -412,7 +412,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { unshardedBuilder.WithNamespaces(options.DefaultNamespaces) unshardedBuilder.WithAllowDenyList(l) unshardedBuilder.WithAllowLabels(map[string][]string{}) - unshardedBuilder.WithGenerateStoreFunc(unshardedBuilder.DefaultGenerateStoreFunc()) + unshardedBuilder.WithGenerateStoresFunc(unshardedBuilder.DefaultGenerateStoresFunc()) unshardedHandler := metricshandler.New(&options.Options{}, kubeClient, unshardedBuilder, false) unshardedHandler.ConfigureSharding(ctx, 0, 1) @@ -425,7 +425,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { shardedBuilder1.WithNamespaces(options.DefaultNamespaces) shardedBuilder1.WithAllowDenyList(l) shardedBuilder1.WithAllowLabels(map[string][]string{}) - shardedBuilder1.WithGenerateStoreFunc(shardedBuilder1.DefaultGenerateStoreFunc()) + shardedBuilder1.WithGenerateStoresFunc(shardedBuilder1.DefaultGenerateStoresFunc()) shardedHandler1 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder1, false) shardedHandler1.ConfigureSharding(ctx, 0, 2) @@ -438,7 +438,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { shardedBuilder2.WithNamespaces(options.DefaultNamespaces) shardedBuilder2.WithAllowDenyList(l) shardedBuilder2.WithAllowLabels(map[string][]string{}) - shardedBuilder2.WithGenerateStoreFunc(shardedBuilder2.DefaultGenerateStoreFunc()) + shardedBuilder2.WithGenerateStoresFunc(shardedBuilder2.DefaultGenerateStoresFunc()) shardedHandler2 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder2, false) shardedHandler2.ConfigureSharding(ctx, 1, 2) diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go index db5f8b83..6a4bc29e 100644 --- a/pkg/builder/builder.go +++ b/pkg/builder/builder.go @@ -90,14 +90,14 @@ func (b *Builder) WithAllowLabels(l map[string][]string) { b.internal.WithAllowLabels(l) } -// WithGenerateStoreFunc configures a custom generate store function -func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) { - b.internal.WithGenerateStoreFunc(f) +// WithGenerateStoresFunc configures a custom generate store function +func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) { + b.internal.WithGenerateStoresFunc(f) } -// DefaultGenerateStoreFunc returns default buildStore function -func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc { - return b.internal.DefaultGenerateStoreFunc() +// DefaultGenerateStoresFunc returns default buildStore function +func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc { + return b.internal.DefaultGenerateStoresFunc() } // Build initializes and registers all enabled stores. diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index a19e0201..b6247d6f 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -40,14 +40,14 @@ type BuilderInterface interface { WithKubeClient(c clientset.Interface) WithVPAClient(c vpaclientset.Interface) WithAllowDenyList(l AllowDenyLister) - WithGenerateStoreFunc(f BuildStoreFunc) WithAllowLabels(l map[string][]string) - DefaultGenerateStoreFunc() BuildStoreFunc + WithGenerateStoresFunc(f BuildStoresFunc) + DefaultGenerateStoresFunc() BuildStoresFunc Build() []metricsstore.MetricsWriter } -// BuildStoreFunc function signature that is use to returns a list of metricsstore.MetricsStore -type BuildStoreFunc func(metricFamilies []generator.FamilyGenerator, +// BuildStoresFunc function signature that is use to returns a list of metricsstore.MetricsStore +type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, ) []*metricsstore.MetricsStore From 222f271486d01f013bd54cfd60c2edc33120865d Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Mon, 28 Jun 2021 10:22:49 +0200 Subject: [PATCH 4/5] Rename tests --- pkg/metrics_store/metrics_writer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index b051bd39..ff758a68 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -28,7 +28,7 @@ import ( metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" ) -func TestMultiNamespaceMetricsWriter_SingleStores_WriteAll(t *testing.T) { +func TestWriteAllWithSingleStore(t *testing.T) { genFunc := func(obj interface{}) []metric.FamilyInterface { o, err := meta.Accessor(obj) if err != nil { @@ -112,7 +112,7 @@ func TestMultiNamespaceMetricsWriter_SingleStores_WriteAll(t *testing.T) { } } -func TestMultiNamespaceMetricsWriter_MultipleStores_WriteAll(t *testing.T) { +func TestWriteAllWithMultipleStores(t *testing.T) { genFunc := func(obj interface{}) []metric.FamilyInterface { o, err := meta.Accessor(obj) if err != nil { From 06dc1347357024c0c62e7015675b23a94da5ba63 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 29 Jun 2021 14:31:21 +0200 Subject: [PATCH 5/5] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Manuel Rüger --- pkg/builder/types/interfaces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index b6247d6f..82d5352d 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -46,7 +46,7 @@ type BuilderInterface interface { Build() []metricsstore.MetricsWriter } -// BuildStoresFunc function signature that is use to returns a list of metricsstore.MetricsStore +// BuildStoresFunc function signature that is used to return a list of metricsstore.MetricsStore type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,