diff --git a/internal/store/builder.go b/internal/store/builder.go index c9e90ea3..657b3cef 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -43,7 +43,6 @@ import ( "k8s.io/klog/v2" ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types" - "k8s.io/kube-state-metrics/v2/pkg/listwatch" generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" "k8s.io/kube-state-metrics/v2/pkg/options" @@ -68,7 +67,7 @@ type Builder struct { shardingMetrics *sharding.Metrics shard int32 totalShards int - buildStoreFunc ksmtypes.BuildStoreFunc + buildStoresFunc ksmtypes.BuildStoresFunc allowLabelsList map[string][]string } @@ -137,14 +136,14 @@ func (b *Builder) WithAllowDenyList(l ksmtypes.AllowDenyLister) { b.allowDenyList = l } -// WithGenerateStoreFunc configures a custom generate store function -func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) { - b.buildStoreFunc = f +// WithGenerateStoresFunc configures a custom generate store function +func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) { + b.buildStoresFunc = f } -// DefaultGenerateStoreFunc returns default buildStore function -func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc { - return b.buildStore +// DefaultGenerateStoresFunc returns default buildStores function +func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc { + return b.buildStores } // WithAllowLabels configures which labels can be returned for metrics @@ -155,58 +154,64 @@ func (b *Builder) WithAllowLabels(labels map[string][]string) { } // Build initializes and registers all enabled stores. -func (b *Builder) Build() []cache.Store { +// It returns metrics writers which can be used to write out +// metrics from the stores. +func (b *Builder) Build() []metricsstore.MetricsWriter { if b.allowDenyList == nil { panic("allowDenyList should not be nil") } - stores := []cache.Store{} - activeStoreNames := []string{} + var metricsWriters []metricsstore.MetricsWriter + var activeStoreNames []string for _, c := range b.enabledResources { constructor, ok := availableStores[c] if ok { - store := constructor(b) + stores := constructor(b) activeStoreNames = append(activeStoreNames, c) - stores = append(stores, store) + if len(stores) == 1 { + metricsWriters = append(metricsWriters, stores[0]) + } else { + metricsWriters = append(metricsWriters, metricsstore.NewMultiStoreMetricsWriter(stores)) + } } } klog.Infof("Active resources: %s", strings.Join(activeStoreNames, ",")) - return stores + return metricsWriters } -var availableStores = map[string]func(f *Builder) cache.Store{ - "certificatesigningrequests": func(b *Builder) cache.Store { return b.buildCsrStore() }, - "configmaps": func(b *Builder) cache.Store { return b.buildConfigMapStore() }, - "cronjobs": func(b *Builder) cache.Store { return b.buildCronJobStore() }, - "daemonsets": func(b *Builder) cache.Store { return b.buildDaemonSetStore() }, - "deployments": func(b *Builder) cache.Store { return b.buildDeploymentStore() }, - "endpoints": func(b *Builder) cache.Store { return b.buildEndpointsStore() }, - "horizontalpodautoscalers": func(b *Builder) cache.Store { return b.buildHPAStore() }, - "ingresses": func(b *Builder) cache.Store { return b.buildIngressStore() }, - "jobs": func(b *Builder) cache.Store { return b.buildJobStore() }, - "leases": func(b *Builder) cache.Store { return b.buildLeases() }, - "limitranges": func(b *Builder) cache.Store { return b.buildLimitRangeStore() }, - "mutatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildMutatingWebhookConfigurationStore() }, - "namespaces": func(b *Builder) cache.Store { return b.buildNamespaceStore() }, - "networkpolicies": func(b *Builder) cache.Store { return b.buildNetworkPolicyStore() }, - "nodes": func(b *Builder) cache.Store { return b.buildNodeStore() }, - "persistentvolumeclaims": func(b *Builder) cache.Store { return b.buildPersistentVolumeClaimStore() }, - "persistentvolumes": func(b *Builder) cache.Store { return b.buildPersistentVolumeStore() }, - "poddisruptionbudgets": func(b *Builder) cache.Store { return b.buildPodDisruptionBudgetStore() }, - "pods": func(b *Builder) cache.Store { return b.buildPodStore() }, - "replicasets": func(b *Builder) cache.Store { return b.buildReplicaSetStore() }, - "replicationcontrollers": func(b *Builder) cache.Store { return b.buildReplicationControllerStore() }, - "resourcequotas": func(b *Builder) cache.Store { return b.buildResourceQuotaStore() }, - "secrets": func(b *Builder) cache.Store { return b.buildSecretStore() }, - "services": func(b *Builder) cache.Store { return b.buildServiceStore() }, - "statefulsets": func(b *Builder) cache.Store { return b.buildStatefulSetStore() }, - "storageclasses": func(b *Builder) cache.Store { return b.buildStorageClassStore() }, - "validatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildValidatingWebhookConfigurationStore() }, - "volumeattachments": func(b *Builder) cache.Store { return b.buildVolumeAttachmentStore() }, - "verticalpodautoscalers": func(b *Builder) cache.Store { return b.buildVPAStore() }, +var availableStores = map[string]func(f *Builder) []*metricsstore.MetricsStore{ + "certificatesigningrequests": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCsrStores() }, + "configmaps": func(b *Builder) []*metricsstore.MetricsStore { return b.buildConfigMapStores() }, + "cronjobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCronJobStores() }, + "daemonsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDaemonSetStores() }, + "deployments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDeploymentStores() }, + "endpoints": func(b *Builder) []*metricsstore.MetricsStore { return b.buildEndpointsStores() }, + "horizontalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildHPAStores() }, + "ingresses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildIngressStores() }, + "jobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildJobStores() }, + "leases": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLeasesStores() }, + "limitranges": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLimitRangeStores() }, + "mutatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildMutatingWebhookConfigurationStores() }, + "namespaces": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNamespaceStores() }, + "networkpolicies": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNetworkPolicyStores() }, + "nodes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNodeStores() }, + "persistentvolumeclaims": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeClaimStores() }, + "persistentvolumes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeStores() }, + "poddisruptionbudgets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodDisruptionBudgetStores() }, + "pods": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodStores() }, + "replicasets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicaSetStores() }, + "replicationcontrollers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicationControllerStores() }, + "resourcequotas": func(b *Builder) []*metricsstore.MetricsStore { return b.buildResourceQuotaStores() }, + "secrets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildSecretStores() }, + "services": func(b *Builder) []*metricsstore.MetricsStore { return b.buildServiceStores() }, + "statefulsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStatefulSetStores() }, + "storageclasses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStorageClassStores() }, + "validatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildValidatingWebhookConfigurationStores() }, + "volumeattachments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVolumeAttachmentStores() }, + "verticalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVPAStores() }, } func resourceExists(name string) bool { @@ -222,150 +227,169 @@ func availableResources() []string { return c } -func (b *Builder) buildConfigMapStore() cache.Store { - return b.buildStoreFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch) +func (b *Builder) buildConfigMapStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch) } -func (b *Builder) buildCronJobStore() cache.Store { - return b.buildStoreFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch) +func (b *Builder) buildCronJobStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch) } -func (b *Builder) buildDaemonSetStore() cache.Store { - return b.buildStoreFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch) +func (b *Builder) buildDaemonSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch) } -func (b *Builder) buildDeploymentStore() cache.Store { - return b.buildStoreFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch) +func (b *Builder) buildDeploymentStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch) } -func (b *Builder) buildEndpointsStore() cache.Store { - return b.buildStoreFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch) +func (b *Builder) buildEndpointsStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch) } -func (b *Builder) buildHPAStore() cache.Store { - return b.buildStoreFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch) +func (b *Builder) buildHPAStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch) } -func (b *Builder) buildIngressStore() cache.Store { - return b.buildStoreFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch) +func (b *Builder) buildIngressStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch) } -func (b *Builder) buildJobStore() cache.Store { - return b.buildStoreFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch) +func (b *Builder) buildJobStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch) } -func (b *Builder) buildLimitRangeStore() cache.Store { - return b.buildStoreFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch) +func (b *Builder) buildLimitRangeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch) } -func (b *Builder) buildMutatingWebhookConfigurationStore() cache.Store { - return b.buildStoreFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch) +func (b *Builder) buildMutatingWebhookConfigurationStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch) } -func (b *Builder) buildNamespaceStore() cache.Store { - return b.buildStoreFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch) +func (b *Builder) buildNamespaceStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch) } -func (b *Builder) buildNetworkPolicyStore() cache.Store { - return b.buildStoreFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch) +func (b *Builder) buildNetworkPolicyStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch) } -func (b *Builder) buildNodeStore() cache.Store { - return b.buildStoreFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch) +func (b *Builder) buildNodeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch) } -func (b *Builder) buildPersistentVolumeClaimStore() cache.Store { - return b.buildStoreFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch) +func (b *Builder) buildPersistentVolumeClaimStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch) } -func (b *Builder) buildPersistentVolumeStore() cache.Store { - return b.buildStoreFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch) +func (b *Builder) buildPersistentVolumeStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch) } -func (b *Builder) buildPodDisruptionBudgetStore() cache.Store { - return b.buildStoreFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch) +func (b *Builder) buildPodDisruptionBudgetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch) } -func (b *Builder) buildReplicaSetStore() cache.Store { - return b.buildStoreFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch) +func (b *Builder) buildReplicaSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch) } -func (b *Builder) buildReplicationControllerStore() cache.Store { - return b.buildStoreFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch) +func (b *Builder) buildReplicationControllerStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch) } -func (b *Builder) buildResourceQuotaStore() cache.Store { - return b.buildStoreFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch) +func (b *Builder) buildResourceQuotaStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch) } -func (b *Builder) buildSecretStore() cache.Store { - return b.buildStoreFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch) +func (b *Builder) buildSecretStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch) } -func (b *Builder) buildServiceStore() cache.Store { - return b.buildStoreFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch) +func (b *Builder) buildServiceStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch) } -func (b *Builder) buildStatefulSetStore() cache.Store { - return b.buildStoreFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch) +func (b *Builder) buildStatefulSetStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch) } -func (b *Builder) buildStorageClassStore() cache.Store { - return b.buildStoreFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch) +func (b *Builder) buildStorageClassStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch) } -func (b *Builder) buildPodStore() cache.Store { - return b.buildStoreFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch) +func (b *Builder) buildPodStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch) } -func (b *Builder) buildCsrStore() cache.Store { - return b.buildStoreFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch) +func (b *Builder) buildCsrStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch) } -func (b *Builder) buildValidatingWebhookConfigurationStore() cache.Store { - return b.buildStoreFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch) +func (b *Builder) buildValidatingWebhookConfigurationStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch) } -func (b *Builder) buildVolumeAttachmentStore() cache.Store { - return b.buildStoreFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch) +func (b *Builder) buildVolumeAttachmentStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch) } -func (b *Builder) buildVPAStore() cache.Store { - return b.buildStoreFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient)) +func (b *Builder) buildVPAStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient)) } -func (b *Builder) buildLeases() cache.Store { - return b.buildStoreFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch) +func (b *Builder) buildLeasesStores() []*metricsstore.MetricsStore { + return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch) } -func (b *Builder) buildStore( +func (b *Builder) buildStores( metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, -) cache.Store { +) []*metricsstore.MetricsStore { metricFamilies = generator.FilterMetricFamilies(b.allowDenyList, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) - store := metricsstore.NewMetricsStore( - familyHeaders, - composedMetricGenFuncs, - ) - b.reflectorPerNamespace(expectedType, store, listWatchFunc) + if isAllNamespaces(b.namespaces) { + store := metricsstore.NewMetricsStore( + familyHeaders, + composedMetricGenFuncs, + ) + listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll) + b.startReflector(expectedType, store, listWatcher) + return []*metricsstore.MetricsStore{store} + } - return store + stores := make([]*metricsstore.MetricsStore, 0, len(b.namespaces)) + for _, ns := range b.namespaces { + store := metricsstore.NewMetricsStore( + familyHeaders, + composedMetricGenFuncs, + ) + listWatcher := listWatchFunc(b.kubeClient, ns) + b.startReflector(expectedType, store, listWatcher) + stores = append(stores, store) + } + + return stores } -// reflectorPerNamespace creates a Kubernetes client-go reflector with the given -// listWatchFunc for each given namespace and registers it with the given store. -func (b *Builder) reflectorPerNamespace( +// startReflector starts a Kubernetes client-go reflector with the given +// listWatcher and registers it with the given store. +func (b *Builder) startReflector( expectedType interface{}, store cache.Store, - listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, + listWatcher cache.ListerWatcher, ) { - lwf := func(ns string) cache.ListerWatcher { return listWatchFunc(b.kubeClient, ns) } - lw := listwatch.MultiNamespaceListerWatcher(b.namespaces, nil, lwf) - instrumentedListWatch := watch.NewInstrumentedListerWatcher(lw, b.listWatchMetrics, reflect.TypeOf(expectedType).String()) + instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String()) reflector := cache.NewReflector(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, 0) go reflector.Run(b.ctx.Done()) } + +// isAllNamespaces checks if the given slice of namespaces +// contains only v1.NamespaceAll. +func isAllNamespaces(namespaces []string) bool { + return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll +} diff --git a/main.go b/main.go index a66934b9..40964a64 100644 --- a/main.go +++ b/main.go @@ -140,7 +140,7 @@ func main() { storeBuilder.WithAllowDenyList(allowDenyList) - storeBuilder.WithGenerateStoreFunc(storeBuilder.DefaultGenerateStoreFunc()) + storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc()) proc.StartReaper() diff --git a/main_test.go b/main_test.go index 69086f8d..48dcab02 100644 --- a/main_test.go +++ b/main_test.go @@ -67,7 +67,7 @@ func BenchmarkKubeStateMetrics(b *testing.B) { builder.WithSharding(0, 1) builder.WithContext(ctx) builder.WithNamespaces(options.DefaultNamespaces) - builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc()) + builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc()) l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{}) if err != nil { @@ -132,7 +132,7 @@ func TestFullScrapeCycle(t *testing.T) { builder.WithEnabledResources(options.DefaultResources.AsSlice()) builder.WithKubeClient(kubeClient) builder.WithNamespaces(options.DefaultNamespaces) - builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc()) + builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc()) l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{}) if err != nil { @@ -412,7 +412,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { unshardedBuilder.WithNamespaces(options.DefaultNamespaces) unshardedBuilder.WithAllowDenyList(l) unshardedBuilder.WithAllowLabels(map[string][]string{}) - unshardedBuilder.WithGenerateStoreFunc(unshardedBuilder.DefaultGenerateStoreFunc()) + unshardedBuilder.WithGenerateStoresFunc(unshardedBuilder.DefaultGenerateStoresFunc()) unshardedHandler := metricshandler.New(&options.Options{}, kubeClient, unshardedBuilder, false) unshardedHandler.ConfigureSharding(ctx, 0, 1) @@ -425,7 +425,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { shardedBuilder1.WithNamespaces(options.DefaultNamespaces) shardedBuilder1.WithAllowDenyList(l) shardedBuilder1.WithAllowLabels(map[string][]string{}) - shardedBuilder1.WithGenerateStoreFunc(shardedBuilder1.DefaultGenerateStoreFunc()) + shardedBuilder1.WithGenerateStoresFunc(shardedBuilder1.DefaultGenerateStoresFunc()) shardedHandler1 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder1, false) shardedHandler1.ConfigureSharding(ctx, 0, 2) @@ -438,7 +438,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) { shardedBuilder2.WithNamespaces(options.DefaultNamespaces) shardedBuilder2.WithAllowDenyList(l) shardedBuilder2.WithAllowLabels(map[string][]string{}) - shardedBuilder2.WithGenerateStoreFunc(shardedBuilder2.DefaultGenerateStoreFunc()) + shardedBuilder2.WithGenerateStoresFunc(shardedBuilder2.DefaultGenerateStoresFunc()) shardedHandler2 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder2, false) shardedHandler2.ConfigureSharding(ctx, 1, 2) diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go index 365051da..6a4bc29e 100644 --- a/pkg/builder/builder.go +++ b/pkg/builder/builder.go @@ -19,10 +19,11 @@ package builder import ( "context" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" + "github.com/prometheus/client_golang/prometheus" vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" internalstore "k8s.io/kube-state-metrics/v2/internal/store" ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types" @@ -89,17 +90,17 @@ func (b *Builder) WithAllowLabels(l map[string][]string) { b.internal.WithAllowLabels(l) } -// WithGenerateStoreFunc configures a custom generate store function -func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) { - b.internal.WithGenerateStoreFunc(f) +// WithGenerateStoresFunc configures a custom generate store function +func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) { + b.internal.WithGenerateStoresFunc(f) } -// DefaultGenerateStoreFunc returns default buildStore function -func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc { - return b.internal.DefaultGenerateStoreFunc() +// DefaultGenerateStoresFunc returns default buildStore function +func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc { + return b.internal.DefaultGenerateStoresFunc() } // Build initializes and registers all enabled stores. -func (b *Builder) Build() []cache.Store { +func (b *Builder) Build() []metricsstore.MetricsWriter { return b.internal.Build() } diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index c30f0adc..82d5352d 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -19,6 +19,8 @@ package types import ( "context" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" + "github.com/prometheus/client_golang/prometheus" vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" clientset "k8s.io/client-go/kubernetes" @@ -38,17 +40,17 @@ type BuilderInterface interface { WithKubeClient(c clientset.Interface) WithVPAClient(c vpaclientset.Interface) WithAllowDenyList(l AllowDenyLister) - WithGenerateStoreFunc(f BuildStoreFunc) WithAllowLabels(l map[string][]string) - DefaultGenerateStoreFunc() BuildStoreFunc - Build() []cache.Store + WithGenerateStoresFunc(f BuildStoresFunc) + DefaultGenerateStoresFunc() BuildStoresFunc + Build() []metricsstore.MetricsWriter } -// BuildStoreFunc function signature that is use to returns a cache.Store -type BuildStoreFunc func(metricFamilies []generator.FamilyGenerator, +// BuildStoresFunc function signature that is used to return a list of metricsstore.MetricsStore +type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher, -) cache.Store +) []*metricsstore.MetricsStore // AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names type AllowDenyLister interface { diff --git a/pkg/listwatch/listwatch.go b/pkg/listwatch/listwatch.go deleted file mode 100644 index b6668121..00000000 --- a/pkg/listwatch/listwatch.go +++ /dev/null @@ -1,202 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package listwatch - -import ( - "fmt" - "strings" - "sync" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" -) - -// MultiNamespaceListerWatcher takes allowed and denied namespaces and a -// cache.ListerWatcher generator func and returns a single cache.ListerWatcher -// capable of operating on multiple namespaces. -// -// Allowed namespaces and denied namespaces are mutually exclusive. -// If allowed namespaces contain multiple items, the given denied namespaces have no effect. -// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string), -// the given denied namespaces are applied. -func MultiNamespaceListerWatcher(allowedNamespaces, deniedNamespaces []string, f func(string) cache.ListerWatcher) cache.ListerWatcher { - // If there is only one namespace then there is no need to create a - // multi lister watcher proxy. - if IsAllNamespaces(allowedNamespaces) { - return newDenylistListerWatcher(deniedNamespaces, f(allowedNamespaces[0])) - } - if len(allowedNamespaces) == 1 { - return f(allowedNamespaces[0]) - } - - var lws []cache.ListerWatcher - for _, n := range allowedNamespaces { - lws = append(lws, f(n)) - } - return multiListerWatcher(lws) -} - -// multiListerWatcher abstracts several cache.ListerWatchers, allowing them -// to be treated as a single cache.ListerWatcher. -type multiListerWatcher []cache.ListerWatcher - -// List implements the ListerWatcher interface. -// It combines the output of the List method of every ListerWatcher into -// a single result. -func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { - l := metav1.List{} - var resourceVersions []string - for _, lw := range mlw { - list, err := lw.List(options) - if err != nil { - return nil, err - } - items, err := meta.ExtractList(list) - if err != nil { - return nil, err - } - metaObj, err := meta.ListAccessor(list) - if err != nil { - return nil, err - } - for _, item := range items { - l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()}) - } - resourceVersions = append(resourceVersions, metaObj.GetResourceVersion()) - } - // Combine the resource versions so that the composite Watch method can - // distribute appropriate versions to each underlying Watch func. - l.ListMeta.ResourceVersion = strings.Join(resourceVersions, "/") - return &l, nil -} - -// Watch implements the ListerWatcher interface. -// It returns a watch.Interface that combines the output from the -// watch.Interface of every cache.ListerWatcher into a single result chan. -func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - resourceVersions := make([]string, len(mlw)) - // Allow resource versions to be "". - if options.ResourceVersion != "" { - rvs := make([]string, 0, len(mlw)) - if strings.Contains(options.ResourceVersion, "/") { - rvs = strings.Split(options.ResourceVersion, "/") - if len(rvs) != len(mlw) { - return nil, fmt.Errorf("expected resource version to have %d parts to match the number of ListerWatchers, actual: %d", len(mlw), len(rvs)) - } - } else { - // watch reconnected and resource version is the latest one from event.Object has no "/" - for i := 0; i < len(mlw); i++ { - rvs = append(rvs, options.ResourceVersion) - } - } - resourceVersions = rvs - } - return newMultiWatch(mlw, resourceVersions, options) -} - -// multiWatch abstracts multiple watch.Interface's, allowing them -// to be treated as a single watch.Interface. -type multiWatch struct { - result chan watch.Event - stopped chan struct{} - stoppers []func() -} - -// newMultiWatch returns a new multiWatch or an error if one of the underlying -// Watch funcs errored. The length of []cache.ListerWatcher and []string must -// match. -func newMultiWatch(lws []cache.ListerWatcher, resourceVersions []string, options metav1.ListOptions) (*multiWatch, error) { - var ( - result = make(chan watch.Event) - stopped = make(chan struct{}) - stoppers []func() - wg sync.WaitGroup - ) - - wg.Add(len(lws)) - - for i, lw := range lws { - o := options.DeepCopy() - o.ResourceVersion = resourceVersions[i] - w, err := lw.Watch(*o) - if err != nil { - return nil, err - } - - go func() { - defer wg.Done() - - for { - event, ok := <-w.ResultChan() - if !ok { - return - } - - select { - case result <- event: - case <-stopped: - return - } - } - }() - stoppers = append(stoppers, w.Stop) - } - - // result chan must be closed, - // once all event sender goroutines exited. - go func() { - wg.Wait() - close(result) - }() - - return &multiWatch{ - result: result, - stoppers: stoppers, - stopped: stopped, - }, nil -} - -// ResultChan implements the watch.Interface interface. -func (mw *multiWatch) ResultChan() <-chan watch.Event { - return mw.result -} - -// Stop implements the watch.Interface interface. -// It stops all of the underlying watch.Interfaces and closes the backing chan. -// Can safely be called more than once. -func (mw *multiWatch) Stop() { - select { - case <-mw.stopped: - // nothing to do, we are already stopped - default: - for _, stop := range mw.stoppers { - stop() - } - close(mw.stopped) - } - return -} - -// IsAllNamespaces checks if the given slice of namespaces -// contains only v1.NamespaceAll. -func IsAllNamespaces(namespaces []string) bool { - return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll -} diff --git a/pkg/listwatch/namespace_denylist.go b/pkg/listwatch/namespace_denylist.go deleted file mode 100644 index 957b962c..00000000 --- a/pkg/listwatch/namespace_denylist.go +++ /dev/null @@ -1,184 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package listwatch - -import ( - "fmt" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" -) - -// denylistListerWatcher implements cache.ListerWatcher -// which wraps a cache.ListerWatcher, -// filtering list results and watch events by denied namespaces. -type denylistListerWatcher struct { - denylist map[string]struct{} - next cache.ListerWatcher -} - -// newDenylistListerWatcher creates a cache.ListerWatcher -// wrapping the given next cache.ListerWatcher -// filtering lists and watch events by the given namespaces. -func newDenylistListerWatcher(namespaces []string, next cache.ListerWatcher) cache.ListerWatcher { - if len(namespaces) == 0 { - return next - } - - denylist := make(map[string]struct{}) - - for _, ns := range namespaces { - denylist[ns] = struct{}{} - } - - return &denylistListerWatcher{ - denylist: denylist, - next: next, - } -} - -// List lists the wrapped next listerwatcher List result, -// but filtering denied namespaces from the result. -func (w *denylistListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { - l := metav1.List{} - - list, err := w.next.List(options) - if err != nil { - klog.Errorf("error listing: %v", err) - return nil, err - } - - objs, err := meta.ExtractList(list) - if err != nil { - klog.Errorf("error extracting list: %v", err) - return nil, err - } - - metaObj, err := meta.ListAccessor(list) - if err != nil { - klog.Errorf("error getting list accessor: %v", err) - return nil, err - } - - for _, obj := range objs { - acc, err := meta.Accessor(obj) - if err != nil { - klog.Errorf("error getting meta accessor accessor for object %s: %v", fmt.Sprintf("%v", obj), err) - return nil, err - } - - if _, denied := w.denylist[getNamespace(acc)]; denied { - klog.V(8).Infof("denied %s", acc.GetSelfLink()) - continue - } - - klog.V(8).Infof("allowed %s", acc.GetSelfLink()) - - l.Items = append(l.Items, runtime.RawExtension{Object: obj.DeepCopyObject()}) - } - - l.ListMeta.ResourceVersion = metaObj.GetResourceVersion() - return &l, nil -} - -// Watch -func (w *denylistListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - nextWatch, err := w.next.Watch(options) - if err != nil { - return nil, err - } - - return newDenylistWatch(w.denylist, nextWatch), nil -} - -// newDenylistWatch creates a new watch.Interface, -// wrapping the given next watcher, -// and filtering watch events by the given namespaces. -// -// It starts a new goroutine until either -// a) the result channel of the wrapped next watcher is closed, or -// b) Stop() was invoked on the returned watcher. -func newDenylistWatch(denylist map[string]struct{}, next watch.Interface) watch.Interface { - var ( - result = make(chan watch.Event) - proxy = watch.NewProxyWatcher(result) - ) - - go func() { - defer func() { - klog.V(8).Info("stopped denylist watcher") - // According to watch.Interface the result channel is supposed to be called - // in case of error or if the listwach is closed, see [1]. - // - // [1] https://github.com/kubernetes/apimachinery/blob/533d101be9a6450773bb2829bef282b6b7c4ff6d/pkg/watch/watch.go#L34-L37 - close(result) - }() - - for { - select { - case event, ok := <-next.ResultChan(): - if !ok { - klog.V(8).Info("result channel closed") - return - } - - acc, err := meta.Accessor(event.Object) - if err != nil { - // ignore this event, it doesn't implement the metav1.Object interface, - // hence we cannot determine its namespace. - klog.V(6).Infof("unexpected object type in event (%T): %v", event.Object, event.Object) - continue - } - - if _, denied := denylist[getNamespace(acc)]; denied { - klog.V(8).Infof("denied %s", acc.GetSelfLink()) - continue - } - - klog.V(8).Infof("allowed %s", acc.GetSelfLink()) - - select { - case result <- event: - klog.V(8).Infof("dispatched %s", acc.GetSelfLink()) - case <-proxy.StopChan(): - next.Stop() - return - } - case <-proxy.StopChan(): - next.Stop() - return - } - } - }() - - return proxy -} - -// getNamespace returns the namespace of the given object. -// If the object is itself a namespace, it returns the object's -// name. -func getNamespace(obj metav1.Object) string { - if _, ok := obj.(*v1.Namespace); ok { - return obj.GetName() - } - return obj.GetNamespace() -} diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go new file mode 100644 index 00000000..5d0362ae --- /dev/null +++ b/pkg/metrics_store/metrics_writer.go @@ -0,0 +1,70 @@ +/* +Copyright 2021 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricsstore + +import "io" + +// MetricsWriter is the interface that wraps the WriteAll method. +// WriteAll writes out bytes to the underlying writer. +type MetricsWriter interface { + WriteAll(w io.Writer) +} + +// MultiStoreMetricsWriter is a struct that holds multiple MetricsStore(s) and +// implements the MetricsWriter interface. +// It should be used with stores which have the same metric headers. +// +// MultiStoreMetricsWriter writes out metrics from the underlying stores so that +// metrics with the same name coming from different stores end up grouped together. +// It also ensures that the metric headers are only written out once. +type MultiStoreMetricsWriter struct { + stores []*MetricsStore +} + +// NewMultiStoreMetricsWriter creates a new MultiStoreMetricsWriter. +func NewMultiStoreMetricsWriter(stores []*MetricsStore) MetricsWriter { + return &MultiStoreMetricsWriter{ + stores: stores, + } +} + +// WriteAll writes out metrics from the underlying stores to the given writer. +// +// WriteAll writes metrics so that the ones with the same name +// are grouped together when written out. +func (m MultiStoreMetricsWriter) WriteAll(w io.Writer) { + if len(m.stores) == 0 { + return + } + + for _, s := range m.stores { + s.mutex.RLock() + defer func(s *MetricsStore) { + s.mutex.RUnlock() + }(s) + } + + for i, help := range m.stores[0].headers { + w.Write([]byte(help)) + w.Write([]byte{'\n'}) + for _, s := range m.stores { + for _, metricFamilies := range s.metrics { + w.Write(metricFamilies[i]) + } + } + } +} diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go new file mode 100644 index 00000000..ff758a68 --- /dev/null +++ b/pkg/metrics_store/metrics_writer_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2021 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricsstore_test + +import ( + "strings" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/kube-state-metrics/v2/pkg/metric" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" +) + +func TestWriteAllWithSingleStore(t *testing.T) { + genFunc := func(obj interface{}) []metric.FamilyInterface { + o, err := meta.Accessor(obj) + if err != nil { + t.Fatal(err) + } + + mf1 := metric.Family{ + Name: "kube_service_info_1", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + mf2 := metric.Family{ + Name: "kube_service_info_2", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + return []metric.FamilyInterface{&mf1, &mf2} + } + store := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + svcs := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a1", + Name: "service", + Namespace: "a", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a2", + Name: "service", + Namespace: "a", + }, + }, + } + for _, svc := range svcs { + if err := store.Add(&svc); err != nil { + t.Fatal(err) + } + } + + multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{store}) + w := strings.Builder{} + multiNsWriter.WriteAll(&w) + result := w.String() + + resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n") + if len(resultLines) != 6 { + t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 6) + } + if resultLines[0] != "Info 1 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services") + } + if resultLines[3] != "Info 2 about services" { + t.Fatalf("Invalid metrics header on line 3, got %s, want %s", resultLines[3], "Info 2 about services") + } + + expectedSeries := []string{ + `kube_service_info_1{namespace="a",uid="a1"} 1`, + `kube_service_info_1{namespace="a",uid="a2"} 1`, + `kube_service_info_2{namespace="a",uid="a1"} 1`, + `kube_service_info_2{namespace="a",uid="a2"} 1`, + } + + for _, series := range expectedSeries { + if !strings.Contains(result, series) { + t.Fatalf("Did not find expected series %s", series) + } + } +} + +func TestWriteAllWithMultipleStores(t *testing.T) { + genFunc := func(obj interface{}) []metric.FamilyInterface { + o, err := meta.Accessor(obj) + if err != nil { + t.Fatal(err) + } + + mf1 := metric.Family{ + Name: "kube_service_info_1", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + mf2 := metric.Family{ + Name: "kube_service_info_2", + Metrics: []*metric.Metric{ + { + LabelKeys: []string{"namespace", "uid"}, + LabelValues: []string{o.GetNamespace(), string(o.GetUID())}, + Value: float64(1), + }, + }, + } + + return []metric.FamilyInterface{&mf1, &mf2} + } + s1 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + svcs1 := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a1", + Name: "service", + Namespace: "a", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "a2", + Name: "service", + Namespace: "a", + }, + }, + } + for _, svc := range svcs1 { + if err := s1.Add(&svc); err != nil { + t.Fatal(err) + } + } + + svcs2 := []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "b1", + Name: "service", + Namespace: "b", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + UID: "b2", + Name: "service", + Namespace: "b", + }, + }, + } + s2 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc) + for _, svc := range svcs2 { + if err := s2.Add(&svc); err != nil { + t.Fatal(err) + } + } + + multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{s1, s2}) + w := strings.Builder{} + multiNsWriter.WriteAll(&w) + result := w.String() + + resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n") + if len(resultLines) != 10 { + t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 10) + } + if resultLines[0] != "Info 1 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services") + } + if resultLines[5] != "Info 2 about services" { + t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[5], "Info 2 about services") + } + + expectedSeries := []string{ + `kube_service_info_1{namespace="a",uid="a1"} 1`, + `kube_service_info_1{namespace="a",uid="a2"} 1`, + `kube_service_info_1{namespace="b",uid="b1"} 1`, + `kube_service_info_1{namespace="b",uid="b2"} 1`, + `kube_service_info_2{namespace="a",uid="a1"} 1`, + `kube_service_info_2{namespace="a",uid="a2"} 1`, + `kube_service_info_2{namespace="b",uid="b1"} 1`, + `kube_service_info_2{namespace="b",uid="b2"} 1`, + } + + for _, series := range expectedSeries { + if !strings.Contains(result, series) { + t.Fatalf("Did not find expected series %s", series) + } + } +} diff --git a/pkg/metricshandler/metrics_handler.go b/pkg/metricshandler/metrics_handler.go index 0e87759b..9dd0cc26 100644 --- a/pkg/metricshandler/metrics_handler.go +++ b/pkg/metricshandler/metrics_handler.go @@ -48,9 +48,9 @@ type MetricsHandler struct { cancel func() - // mtx protects stores, curShard, and curTotalShards + // mtx protects metricsWriters, curShard, and curTotalShards mtx *sync.RWMutex - stores []cache.Store + metricsWriters []metricsstore.MetricsWriter curShard int32 curTotalShards int } @@ -81,7 +81,7 @@ func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, tot ctx, m.cancel = context.WithCancel(ctx) m.storeBuilder.WithSharding(shard, totalShards) m.storeBuilder.WithContext(ctx) - m.stores = m.storeBuilder.Build() + m.metricsWriters = m.storeBuilder.Build() m.curShard = shard m.curTotalShards = totalShards } @@ -174,8 +174,8 @@ func (m *MetricsHandler) Run(ctx context.Context) error { return ctx.Err() } -// ServeHTTP implements the http.Handler interface. It writes the metrics in -// its stores to the response body. +// ServeHTTP implements the http.Handler interface. It writes all generated +// metrics to the response body. func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -198,9 +198,8 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - for _, s := range m.stores { - ms := s.(*metricsstore.MetricsStore) - ms.WriteAll(writer) + for _, w := range m.metricsWriters { + w.WriteAll(writer) } // In case we gzipped the response, we have to close the writer.