Merge pull request #1499 from fpetkovski/fix-multiwatcher

Replace multiListerWatcher with independent listWatchers per namespace
This commit is contained in:
Kubernetes Prow Robot 2021-07-14 05:32:26 -07:00 committed by GitHub
commit 1d61fc1461
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 465 additions and 531 deletions

View File

@ -43,7 +43,6 @@ import (
"k8s.io/klog/v2"
ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types"
"k8s.io/kube-state-metrics/v2/pkg/listwatch"
generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
"k8s.io/kube-state-metrics/v2/pkg/options"
@ -68,7 +67,7 @@ type Builder struct {
shardingMetrics *sharding.Metrics
shard int32
totalShards int
buildStoreFunc ksmtypes.BuildStoreFunc
buildStoresFunc ksmtypes.BuildStoresFunc
allowLabelsList map[string][]string
}
@ -137,14 +136,14 @@ func (b *Builder) WithAllowDenyList(l ksmtypes.AllowDenyLister) {
b.allowDenyList = l
}
// WithGenerateStoreFunc configures a custom generate store function
func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) {
b.buildStoreFunc = f
// WithGenerateStoresFunc configures a custom generate store function
func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) {
b.buildStoresFunc = f
}
// DefaultGenerateStoreFunc returns default buildStore function
func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc {
return b.buildStore
// DefaultGenerateStoresFunc returns default buildStores function
func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc {
return b.buildStores
}
// WithAllowLabels configures which labels can be returned for metrics
@ -155,58 +154,64 @@ func (b *Builder) WithAllowLabels(labels map[string][]string) {
}
// Build initializes and registers all enabled stores.
func (b *Builder) Build() []cache.Store {
// It returns metrics writers which can be used to write out
// metrics from the stores.
func (b *Builder) Build() []metricsstore.MetricsWriter {
if b.allowDenyList == nil {
panic("allowDenyList should not be nil")
}
stores := []cache.Store{}
activeStoreNames := []string{}
var metricsWriters []metricsstore.MetricsWriter
var activeStoreNames []string
for _, c := range b.enabledResources {
constructor, ok := availableStores[c]
if ok {
store := constructor(b)
stores := constructor(b)
activeStoreNames = append(activeStoreNames, c)
stores = append(stores, store)
if len(stores) == 1 {
metricsWriters = append(metricsWriters, stores[0])
} else {
metricsWriters = append(metricsWriters, metricsstore.NewMultiStoreMetricsWriter(stores))
}
}
}
klog.Infof("Active resources: %s", strings.Join(activeStoreNames, ","))
return stores
return metricsWriters
}
var availableStores = map[string]func(f *Builder) cache.Store{
"certificatesigningrequests": func(b *Builder) cache.Store { return b.buildCsrStore() },
"configmaps": func(b *Builder) cache.Store { return b.buildConfigMapStore() },
"cronjobs": func(b *Builder) cache.Store { return b.buildCronJobStore() },
"daemonsets": func(b *Builder) cache.Store { return b.buildDaemonSetStore() },
"deployments": func(b *Builder) cache.Store { return b.buildDeploymentStore() },
"endpoints": func(b *Builder) cache.Store { return b.buildEndpointsStore() },
"horizontalpodautoscalers": func(b *Builder) cache.Store { return b.buildHPAStore() },
"ingresses": func(b *Builder) cache.Store { return b.buildIngressStore() },
"jobs": func(b *Builder) cache.Store { return b.buildJobStore() },
"leases": func(b *Builder) cache.Store { return b.buildLeases() },
"limitranges": func(b *Builder) cache.Store { return b.buildLimitRangeStore() },
"mutatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildMutatingWebhookConfigurationStore() },
"namespaces": func(b *Builder) cache.Store { return b.buildNamespaceStore() },
"networkpolicies": func(b *Builder) cache.Store { return b.buildNetworkPolicyStore() },
"nodes": func(b *Builder) cache.Store { return b.buildNodeStore() },
"persistentvolumeclaims": func(b *Builder) cache.Store { return b.buildPersistentVolumeClaimStore() },
"persistentvolumes": func(b *Builder) cache.Store { return b.buildPersistentVolumeStore() },
"poddisruptionbudgets": func(b *Builder) cache.Store { return b.buildPodDisruptionBudgetStore() },
"pods": func(b *Builder) cache.Store { return b.buildPodStore() },
"replicasets": func(b *Builder) cache.Store { return b.buildReplicaSetStore() },
"replicationcontrollers": func(b *Builder) cache.Store { return b.buildReplicationControllerStore() },
"resourcequotas": func(b *Builder) cache.Store { return b.buildResourceQuotaStore() },
"secrets": func(b *Builder) cache.Store { return b.buildSecretStore() },
"services": func(b *Builder) cache.Store { return b.buildServiceStore() },
"statefulsets": func(b *Builder) cache.Store { return b.buildStatefulSetStore() },
"storageclasses": func(b *Builder) cache.Store { return b.buildStorageClassStore() },
"validatingwebhookconfigurations": func(b *Builder) cache.Store { return b.buildValidatingWebhookConfigurationStore() },
"volumeattachments": func(b *Builder) cache.Store { return b.buildVolumeAttachmentStore() },
"verticalpodautoscalers": func(b *Builder) cache.Store { return b.buildVPAStore() },
var availableStores = map[string]func(f *Builder) []*metricsstore.MetricsStore{
"certificatesigningrequests": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCsrStores() },
"configmaps": func(b *Builder) []*metricsstore.MetricsStore { return b.buildConfigMapStores() },
"cronjobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildCronJobStores() },
"daemonsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDaemonSetStores() },
"deployments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildDeploymentStores() },
"endpoints": func(b *Builder) []*metricsstore.MetricsStore { return b.buildEndpointsStores() },
"horizontalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildHPAStores() },
"ingresses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildIngressStores() },
"jobs": func(b *Builder) []*metricsstore.MetricsStore { return b.buildJobStores() },
"leases": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLeasesStores() },
"limitranges": func(b *Builder) []*metricsstore.MetricsStore { return b.buildLimitRangeStores() },
"mutatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildMutatingWebhookConfigurationStores() },
"namespaces": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNamespaceStores() },
"networkpolicies": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNetworkPolicyStores() },
"nodes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildNodeStores() },
"persistentvolumeclaims": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeClaimStores() },
"persistentvolumes": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPersistentVolumeStores() },
"poddisruptionbudgets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodDisruptionBudgetStores() },
"pods": func(b *Builder) []*metricsstore.MetricsStore { return b.buildPodStores() },
"replicasets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicaSetStores() },
"replicationcontrollers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildReplicationControllerStores() },
"resourcequotas": func(b *Builder) []*metricsstore.MetricsStore { return b.buildResourceQuotaStores() },
"secrets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildSecretStores() },
"services": func(b *Builder) []*metricsstore.MetricsStore { return b.buildServiceStores() },
"statefulsets": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStatefulSetStores() },
"storageclasses": func(b *Builder) []*metricsstore.MetricsStore { return b.buildStorageClassStores() },
"validatingwebhookconfigurations": func(b *Builder) []*metricsstore.MetricsStore { return b.buildValidatingWebhookConfigurationStores() },
"volumeattachments": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVolumeAttachmentStores() },
"verticalpodautoscalers": func(b *Builder) []*metricsstore.MetricsStore { return b.buildVPAStores() },
}
func resourceExists(name string) bool {
@ -222,150 +227,169 @@ func availableResources() []string {
return c
}
func (b *Builder) buildConfigMapStore() cache.Store {
return b.buildStoreFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch)
func (b *Builder) buildConfigMapStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch)
}
func (b *Builder) buildCronJobStore() cache.Store {
return b.buildStoreFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch)
func (b *Builder) buildCronJobStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch)
}
func (b *Builder) buildDaemonSetStore() cache.Store {
return b.buildStoreFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch)
func (b *Builder) buildDaemonSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch)
}
func (b *Builder) buildDeploymentStore() cache.Store {
return b.buildStoreFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch)
func (b *Builder) buildDeploymentStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch)
}
func (b *Builder) buildEndpointsStore() cache.Store {
return b.buildStoreFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch)
func (b *Builder) buildEndpointsStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch)
}
func (b *Builder) buildHPAStore() cache.Store {
return b.buildStoreFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch)
func (b *Builder) buildHPAStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch)
}
func (b *Builder) buildIngressStore() cache.Store {
return b.buildStoreFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch)
func (b *Builder) buildIngressStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch)
}
func (b *Builder) buildJobStore() cache.Store {
return b.buildStoreFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch)
func (b *Builder) buildJobStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch)
}
func (b *Builder) buildLimitRangeStore() cache.Store {
return b.buildStoreFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch)
func (b *Builder) buildLimitRangeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch)
}
func (b *Builder) buildMutatingWebhookConfigurationStore() cache.Store {
return b.buildStoreFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch)
func (b *Builder) buildMutatingWebhookConfigurationStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch)
}
func (b *Builder) buildNamespaceStore() cache.Store {
return b.buildStoreFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch)
func (b *Builder) buildNamespaceStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch)
}
func (b *Builder) buildNetworkPolicyStore() cache.Store {
return b.buildStoreFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch)
func (b *Builder) buildNetworkPolicyStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch)
}
func (b *Builder) buildNodeStore() cache.Store {
return b.buildStoreFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch)
func (b *Builder) buildNodeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch)
}
func (b *Builder) buildPersistentVolumeClaimStore() cache.Store {
return b.buildStoreFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch)
func (b *Builder) buildPersistentVolumeClaimStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch)
}
func (b *Builder) buildPersistentVolumeStore() cache.Store {
return b.buildStoreFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch)
func (b *Builder) buildPersistentVolumeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch)
}
func (b *Builder) buildPodDisruptionBudgetStore() cache.Store {
return b.buildStoreFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch)
func (b *Builder) buildPodDisruptionBudgetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch)
}
func (b *Builder) buildReplicaSetStore() cache.Store {
return b.buildStoreFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch)
func (b *Builder) buildReplicaSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch)
}
func (b *Builder) buildReplicationControllerStore() cache.Store {
return b.buildStoreFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch)
func (b *Builder) buildReplicationControllerStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch)
}
func (b *Builder) buildResourceQuotaStore() cache.Store {
return b.buildStoreFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch)
func (b *Builder) buildResourceQuotaStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch)
}
func (b *Builder) buildSecretStore() cache.Store {
return b.buildStoreFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch)
func (b *Builder) buildSecretStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch)
}
func (b *Builder) buildServiceStore() cache.Store {
return b.buildStoreFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch)
func (b *Builder) buildServiceStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch)
}
func (b *Builder) buildStatefulSetStore() cache.Store {
return b.buildStoreFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch)
func (b *Builder) buildStatefulSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch)
}
func (b *Builder) buildStorageClassStore() cache.Store {
return b.buildStoreFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch)
func (b *Builder) buildStorageClassStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch)
}
func (b *Builder) buildPodStore() cache.Store {
return b.buildStoreFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch)
func (b *Builder) buildPodStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch)
}
func (b *Builder) buildCsrStore() cache.Store {
return b.buildStoreFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch)
func (b *Builder) buildCsrStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch)
}
func (b *Builder) buildValidatingWebhookConfigurationStore() cache.Store {
return b.buildStoreFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch)
func (b *Builder) buildValidatingWebhookConfigurationStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch)
}
func (b *Builder) buildVolumeAttachmentStore() cache.Store {
return b.buildStoreFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch)
func (b *Builder) buildVolumeAttachmentStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch)
}
func (b *Builder) buildVPAStore() cache.Store {
return b.buildStoreFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient))
func (b *Builder) buildVPAStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient))
}
func (b *Builder) buildLeases() cache.Store {
return b.buildStoreFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch)
func (b *Builder) buildLeasesStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch)
}
func (b *Builder) buildStore(
func (b *Builder) buildStores(
metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
) cache.Store {
) []*metricsstore.MetricsStore {
metricFamilies = generator.FilterMetricFamilies(b.allowDenyList, metricFamilies)
composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies)
familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies)
store := metricsstore.NewMetricsStore(
familyHeaders,
composedMetricGenFuncs,
)
b.reflectorPerNamespace(expectedType, store, listWatchFunc)
if isAllNamespaces(b.namespaces) {
store := metricsstore.NewMetricsStore(
familyHeaders,
composedMetricGenFuncs,
)
listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll)
b.startReflector(expectedType, store, listWatcher)
return []*metricsstore.MetricsStore{store}
}
return store
stores := make([]*metricsstore.MetricsStore, 0, len(b.namespaces))
for _, ns := range b.namespaces {
store := metricsstore.NewMetricsStore(
familyHeaders,
composedMetricGenFuncs,
)
listWatcher := listWatchFunc(b.kubeClient, ns)
b.startReflector(expectedType, store, listWatcher)
stores = append(stores, store)
}
return stores
}
// reflectorPerNamespace creates a Kubernetes client-go reflector with the given
// listWatchFunc for each given namespace and registers it with the given store.
func (b *Builder) reflectorPerNamespace(
// startReflector starts a Kubernetes client-go reflector with the given
// listWatcher and registers it with the given store.
func (b *Builder) startReflector(
expectedType interface{},
store cache.Store,
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
listWatcher cache.ListerWatcher,
) {
lwf := func(ns string) cache.ListerWatcher { return listWatchFunc(b.kubeClient, ns) }
lw := listwatch.MultiNamespaceListerWatcher(b.namespaces, nil, lwf)
instrumentedListWatch := watch.NewInstrumentedListerWatcher(lw, b.listWatchMetrics, reflect.TypeOf(expectedType).String())
instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String())
reflector := cache.NewReflector(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, 0)
go reflector.Run(b.ctx.Done())
}
// isAllNamespaces checks if the given slice of namespaces
// contains only v1.NamespaceAll.
func isAllNamespaces(namespaces []string) bool {
return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll
}

View File

@ -140,7 +140,7 @@ func main() {
storeBuilder.WithAllowDenyList(allowDenyList)
storeBuilder.WithGenerateStoreFunc(storeBuilder.DefaultGenerateStoreFunc())
storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc())
proc.StartReaper()

View File

@ -67,7 +67,7 @@ func BenchmarkKubeStateMetrics(b *testing.B) {
builder.WithSharding(0, 1)
builder.WithContext(ctx)
builder.WithNamespaces(options.DefaultNamespaces)
builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc())
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc())
l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{})
if err != nil {
@ -132,7 +132,7 @@ func TestFullScrapeCycle(t *testing.T) {
builder.WithEnabledResources(options.DefaultResources.AsSlice())
builder.WithKubeClient(kubeClient)
builder.WithNamespaces(options.DefaultNamespaces)
builder.WithGenerateStoreFunc(builder.DefaultGenerateStoreFunc())
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc())
l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{})
if err != nil {
@ -412,7 +412,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
unshardedBuilder.WithNamespaces(options.DefaultNamespaces)
unshardedBuilder.WithAllowDenyList(l)
unshardedBuilder.WithAllowLabels(map[string][]string{})
unshardedBuilder.WithGenerateStoreFunc(unshardedBuilder.DefaultGenerateStoreFunc())
unshardedBuilder.WithGenerateStoresFunc(unshardedBuilder.DefaultGenerateStoresFunc())
unshardedHandler := metricshandler.New(&options.Options{}, kubeClient, unshardedBuilder, false)
unshardedHandler.ConfigureSharding(ctx, 0, 1)
@ -425,7 +425,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
shardedBuilder1.WithNamespaces(options.DefaultNamespaces)
shardedBuilder1.WithAllowDenyList(l)
shardedBuilder1.WithAllowLabels(map[string][]string{})
shardedBuilder1.WithGenerateStoreFunc(shardedBuilder1.DefaultGenerateStoreFunc())
shardedBuilder1.WithGenerateStoresFunc(shardedBuilder1.DefaultGenerateStoresFunc())
shardedHandler1 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder1, false)
shardedHandler1.ConfigureSharding(ctx, 0, 2)
@ -438,7 +438,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
shardedBuilder2.WithNamespaces(options.DefaultNamespaces)
shardedBuilder2.WithAllowDenyList(l)
shardedBuilder2.WithAllowLabels(map[string][]string{})
shardedBuilder2.WithGenerateStoreFunc(shardedBuilder2.DefaultGenerateStoreFunc())
shardedBuilder2.WithGenerateStoresFunc(shardedBuilder2.DefaultGenerateStoresFunc())
shardedHandler2 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder2, false)
shardedHandler2.ConfigureSharding(ctx, 1, 2)

View File

@ -19,10 +19,11 @@ package builder
import (
"context"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
"github.com/prometheus/client_golang/prometheus"
vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
internalstore "k8s.io/kube-state-metrics/v2/internal/store"
ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types"
@ -89,17 +90,17 @@ func (b *Builder) WithAllowLabels(l map[string][]string) {
b.internal.WithAllowLabels(l)
}
// WithGenerateStoreFunc configures a custom generate store function
func (b *Builder) WithGenerateStoreFunc(f ksmtypes.BuildStoreFunc) {
b.internal.WithGenerateStoreFunc(f)
// WithGenerateStoresFunc configures a custom generate store function
func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) {
b.internal.WithGenerateStoresFunc(f)
}
// DefaultGenerateStoreFunc returns default buildStore function
func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc {
return b.internal.DefaultGenerateStoreFunc()
// DefaultGenerateStoresFunc returns default buildStore function
func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc {
return b.internal.DefaultGenerateStoresFunc()
}
// Build initializes and registers all enabled stores.
func (b *Builder) Build() []cache.Store {
func (b *Builder) Build() []metricsstore.MetricsWriter {
return b.internal.Build()
}

View File

@ -19,6 +19,8 @@ package types
import (
"context"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
"github.com/prometheus/client_golang/prometheus"
vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
clientset "k8s.io/client-go/kubernetes"
@ -38,17 +40,17 @@ type BuilderInterface interface {
WithKubeClient(c clientset.Interface)
WithVPAClient(c vpaclientset.Interface)
WithAllowDenyList(l AllowDenyLister)
WithGenerateStoreFunc(f BuildStoreFunc)
WithAllowLabels(l map[string][]string)
DefaultGenerateStoreFunc() BuildStoreFunc
Build() []cache.Store
WithGenerateStoresFunc(f BuildStoresFunc)
DefaultGenerateStoresFunc() BuildStoresFunc
Build() []metricsstore.MetricsWriter
}
// BuildStoreFunc function signature that is use to returns a cache.Store
type BuildStoreFunc func(metricFamilies []generator.FamilyGenerator,
// BuildStoresFunc function signature that is used to return a list of metricsstore.MetricsStore
type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
) cache.Store
) []*metricsstore.MetricsStore
// AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names
type AllowDenyLister interface {

View File

@ -1,202 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package listwatch
import (
"fmt"
"strings"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// MultiNamespaceListerWatcher takes allowed and denied namespaces and a
// cache.ListerWatcher generator func and returns a single cache.ListerWatcher
// capable of operating on multiple namespaces.
//
// Allowed namespaces and denied namespaces are mutually exclusive.
// If allowed namespaces contain multiple items, the given denied namespaces have no effect.
// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string),
// the given denied namespaces are applied.
func MultiNamespaceListerWatcher(allowedNamespaces, deniedNamespaces []string, f func(string) cache.ListerWatcher) cache.ListerWatcher {
// If there is only one namespace then there is no need to create a
// multi lister watcher proxy.
if IsAllNamespaces(allowedNamespaces) {
return newDenylistListerWatcher(deniedNamespaces, f(allowedNamespaces[0]))
}
if len(allowedNamespaces) == 1 {
return f(allowedNamespaces[0])
}
var lws []cache.ListerWatcher
for _, n := range allowedNamespaces {
lws = append(lws, f(n))
}
return multiListerWatcher(lws)
}
// multiListerWatcher abstracts several cache.ListerWatchers, allowing them
// to be treated as a single cache.ListerWatcher.
type multiListerWatcher []cache.ListerWatcher
// List implements the ListerWatcher interface.
// It combines the output of the List method of every ListerWatcher into
// a single result.
func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
l := metav1.List{}
var resourceVersions []string
for _, lw := range mlw {
list, err := lw.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
for _, item := range items {
l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()})
}
resourceVersions = append(resourceVersions, metaObj.GetResourceVersion())
}
// Combine the resource versions so that the composite Watch method can
// distribute appropriate versions to each underlying Watch func.
l.ListMeta.ResourceVersion = strings.Join(resourceVersions, "/")
return &l, nil
}
// Watch implements the ListerWatcher interface.
// It returns a watch.Interface that combines the output from the
// watch.Interface of every cache.ListerWatcher into a single result chan.
func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
resourceVersions := make([]string, len(mlw))
// Allow resource versions to be "".
if options.ResourceVersion != "" {
rvs := make([]string, 0, len(mlw))
if strings.Contains(options.ResourceVersion, "/") {
rvs = strings.Split(options.ResourceVersion, "/")
if len(rvs) != len(mlw) {
return nil, fmt.Errorf("expected resource version to have %d parts to match the number of ListerWatchers actual: %d", len(mlw), len(rvs))
}
} else {
// watch reconnected and resource version is the latest one from event.Object has no "/"
for i := 0; i < len(mlw); i++ {
rvs = append(rvs, options.ResourceVersion)
}
}
resourceVersions = rvs
}
return newMultiWatch(mlw, resourceVersions, options)
}
// multiWatch abstracts multiple watch.Interface's, allowing them
// to be treated as a single watch.Interface.
type multiWatch struct {
result chan watch.Event
stopped chan struct{}
stoppers []func()
}
// newMultiWatch returns a new multiWatch or an error if one of the underlying
// Watch funcs errored. The length of []cache.ListerWatcher and []string must
// match.
func newMultiWatch(lws []cache.ListerWatcher, resourceVersions []string, options metav1.ListOptions) (*multiWatch, error) {
var (
result = make(chan watch.Event)
stopped = make(chan struct{})
stoppers []func()
wg sync.WaitGroup
)
wg.Add(len(lws))
for i, lw := range lws {
o := options.DeepCopy()
o.ResourceVersion = resourceVersions[i]
w, err := lw.Watch(*o)
if err != nil {
return nil, err
}
go func() {
defer wg.Done()
for {
event, ok := <-w.ResultChan()
if !ok {
return
}
select {
case result <- event:
case <-stopped:
return
}
}
}()
stoppers = append(stoppers, w.Stop)
}
// result chan must be closed,
// once all event sender goroutines exited.
go func() {
wg.Wait()
close(result)
}()
return &multiWatch{
result: result,
stoppers: stoppers,
stopped: stopped,
}, nil
}
// ResultChan implements the watch.Interface interface.
func (mw *multiWatch) ResultChan() <-chan watch.Event {
return mw.result
}
// Stop implements the watch.Interface interface.
// It stops all of the underlying watch.Interfaces and closes the backing chan.
// Can safely be called more than once.
func (mw *multiWatch) Stop() {
select {
case <-mw.stopped:
// nothing to do, we are already stopped
default:
for _, stop := range mw.stoppers {
stop()
}
close(mw.stopped)
}
return
}
// IsAllNamespaces checks if the given slice of namespaces
// contains only v1.NamespaceAll.
func IsAllNamespaces(namespaces []string) bool {
return len(namespaces) == 1 && namespaces[0] == v1.NamespaceAll
}

View File

@ -1,184 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package listwatch
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// denylistListerWatcher implements cache.ListerWatcher
// which wraps a cache.ListerWatcher,
// filtering list results and watch events by denied namespaces.
type denylistListerWatcher struct {
denylist map[string]struct{}
next cache.ListerWatcher
}
// newDenylistListerWatcher creates a cache.ListerWatcher
// wrapping the given next cache.ListerWatcher
// filtering lists and watch events by the given namespaces.
func newDenylistListerWatcher(namespaces []string, next cache.ListerWatcher) cache.ListerWatcher {
if len(namespaces) == 0 {
return next
}
denylist := make(map[string]struct{})
for _, ns := range namespaces {
denylist[ns] = struct{}{}
}
return &denylistListerWatcher{
denylist: denylist,
next: next,
}
}
// List lists the wrapped next listerwatcher List result,
// but filtering denied namespaces from the result.
func (w *denylistListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
l := metav1.List{}
list, err := w.next.List(options)
if err != nil {
klog.Errorf("error listing: %v", err)
return nil, err
}
objs, err := meta.ExtractList(list)
if err != nil {
klog.Errorf("error extracting list: %v", err)
return nil, err
}
metaObj, err := meta.ListAccessor(list)
if err != nil {
klog.Errorf("error getting list accessor: %v", err)
return nil, err
}
for _, obj := range objs {
acc, err := meta.Accessor(obj)
if err != nil {
klog.Errorf("error getting meta accessor accessor for object %s: %v", fmt.Sprintf("%v", obj), err)
return nil, err
}
if _, denied := w.denylist[getNamespace(acc)]; denied {
klog.V(8).Infof("denied %s", acc.GetSelfLink())
continue
}
klog.V(8).Infof("allowed %s", acc.GetSelfLink())
l.Items = append(l.Items, runtime.RawExtension{Object: obj.DeepCopyObject()})
}
l.ListMeta.ResourceVersion = metaObj.GetResourceVersion()
return &l, nil
}
// Watch
func (w *denylistListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
nextWatch, err := w.next.Watch(options)
if err != nil {
return nil, err
}
return newDenylistWatch(w.denylist, nextWatch), nil
}
// newDenylistWatch creates a new watch.Interface,
// wrapping the given next watcher,
// and filtering watch events by the given namespaces.
//
// It starts a new goroutine until either
// a) the result channel of the wrapped next watcher is closed, or
// b) Stop() was invoked on the returned watcher.
func newDenylistWatch(denylist map[string]struct{}, next watch.Interface) watch.Interface {
var (
result = make(chan watch.Event)
proxy = watch.NewProxyWatcher(result)
)
go func() {
defer func() {
klog.V(8).Info("stopped denylist watcher")
// According to watch.Interface the result channel is supposed to be called
// in case of error or if the listwach is closed, see [1].
//
// [1] https://github.com/kubernetes/apimachinery/blob/533d101be9a6450773bb2829bef282b6b7c4ff6d/pkg/watch/watch.go#L34-L37
close(result)
}()
for {
select {
case event, ok := <-next.ResultChan():
if !ok {
klog.V(8).Info("result channel closed")
return
}
acc, err := meta.Accessor(event.Object)
if err != nil {
// ignore this event, it doesn't implement the metav1.Object interface,
// hence we cannot determine its namespace.
klog.V(6).Infof("unexpected object type in event (%T): %v", event.Object, event.Object)
continue
}
if _, denied := denylist[getNamespace(acc)]; denied {
klog.V(8).Infof("denied %s", acc.GetSelfLink())
continue
}
klog.V(8).Infof("allowed %s", acc.GetSelfLink())
select {
case result <- event:
klog.V(8).Infof("dispatched %s", acc.GetSelfLink())
case <-proxy.StopChan():
next.Stop()
return
}
case <-proxy.StopChan():
next.Stop()
return
}
}
}()
return proxy
}
// getNamespace returns the namespace of the given object.
// If the object is itself a namespace, it returns the object's
// name.
func getNamespace(obj metav1.Object) string {
if _, ok := obj.(*v1.Namespace); ok {
return obj.GetName()
}
return obj.GetNamespace()
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricsstore
import "io"
// MetricsWriter is the interface that wraps the WriteAll method.
// WriteAll writes out bytes to the underlying writer.
type MetricsWriter interface {
WriteAll(w io.Writer)
}
// MultiStoreMetricsWriter is a struct that holds multiple MetricsStore(s) and
// implements the MetricsWriter interface.
// It should be used with stores which have the same metric headers.
//
// MultiStoreMetricsWriter writes out metrics from the underlying stores so that
// metrics with the same name coming from different stores end up grouped together.
// It also ensures that the metric headers are only written out once.
type MultiStoreMetricsWriter struct {
stores []*MetricsStore
}
// NewMultiStoreMetricsWriter creates a new MultiStoreMetricsWriter.
func NewMultiStoreMetricsWriter(stores []*MetricsStore) MetricsWriter {
return &MultiStoreMetricsWriter{
stores: stores,
}
}
// WriteAll writes out metrics from the underlying stores to the given writer.
//
// WriteAll writes metrics so that the ones with the same name
// are grouped together when written out.
func (m MultiStoreMetricsWriter) WriteAll(w io.Writer) {
if len(m.stores) == 0 {
return
}
for _, s := range m.stores {
s.mutex.RLock()
defer func(s *MetricsStore) {
s.mutex.RUnlock()
}(s)
}
for i, help := range m.stores[0].headers {
w.Write([]byte(help))
w.Write([]byte{'\n'})
for _, s := range m.stores {
for _, metricFamilies := range s.metrics {
w.Write(metricFamilies[i])
}
}
}
}

View File

@ -0,0 +1,224 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricsstore_test
import (
"strings"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kube-state-metrics/v2/pkg/metric"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
)
func TestWriteAllWithSingleStore(t *testing.T) {
genFunc := func(obj interface{}) []metric.FamilyInterface {
o, err := meta.Accessor(obj)
if err != nil {
t.Fatal(err)
}
mf1 := metric.Family{
Name: "kube_service_info_1",
Metrics: []*metric.Metric{
{
LabelKeys: []string{"namespace", "uid"},
LabelValues: []string{o.GetNamespace(), string(o.GetUID())},
Value: float64(1),
},
},
}
mf2 := metric.Family{
Name: "kube_service_info_2",
Metrics: []*metric.Metric{
{
LabelKeys: []string{"namespace", "uid"},
LabelValues: []string{o.GetNamespace(), string(o.GetUID())},
Value: float64(1),
},
},
}
return []metric.FamilyInterface{&mf1, &mf2}
}
store := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc)
svcs := []v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
UID: "a1",
Name: "service",
Namespace: "a",
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "a2",
Name: "service",
Namespace: "a",
},
},
}
for _, svc := range svcs {
if err := store.Add(&svc); err != nil {
t.Fatal(err)
}
}
multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{store})
w := strings.Builder{}
multiNsWriter.WriteAll(&w)
result := w.String()
resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n")
if len(resultLines) != 6 {
t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 6)
}
if resultLines[0] != "Info 1 about services" {
t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services")
}
if resultLines[3] != "Info 2 about services" {
t.Fatalf("Invalid metrics header on line 3, got %s, want %s", resultLines[3], "Info 2 about services")
}
expectedSeries := []string{
`kube_service_info_1{namespace="a",uid="a1"} 1`,
`kube_service_info_1{namespace="a",uid="a2"} 1`,
`kube_service_info_2{namespace="a",uid="a1"} 1`,
`kube_service_info_2{namespace="a",uid="a2"} 1`,
}
for _, series := range expectedSeries {
if !strings.Contains(result, series) {
t.Fatalf("Did not find expected series %s", series)
}
}
}
func TestWriteAllWithMultipleStores(t *testing.T) {
genFunc := func(obj interface{}) []metric.FamilyInterface {
o, err := meta.Accessor(obj)
if err != nil {
t.Fatal(err)
}
mf1 := metric.Family{
Name: "kube_service_info_1",
Metrics: []*metric.Metric{
{
LabelKeys: []string{"namespace", "uid"},
LabelValues: []string{o.GetNamespace(), string(o.GetUID())},
Value: float64(1),
},
},
}
mf2 := metric.Family{
Name: "kube_service_info_2",
Metrics: []*metric.Metric{
{
LabelKeys: []string{"namespace", "uid"},
LabelValues: []string{o.GetNamespace(), string(o.GetUID())},
Value: float64(1),
},
},
}
return []metric.FamilyInterface{&mf1, &mf2}
}
s1 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc)
svcs1 := []v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
UID: "a1",
Name: "service",
Namespace: "a",
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "a2",
Name: "service",
Namespace: "a",
},
},
}
for _, svc := range svcs1 {
if err := s1.Add(&svc); err != nil {
t.Fatal(err)
}
}
svcs2 := []v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
UID: "b1",
Name: "service",
Namespace: "b",
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "b2",
Name: "service",
Namespace: "b",
},
},
}
s2 := metricsstore.NewMetricsStore([]string{"Info 1 about services", "Info 2 about services"}, genFunc)
for _, svc := range svcs2 {
if err := s2.Add(&svc); err != nil {
t.Fatal(err)
}
}
multiNsWriter := metricsstore.NewMultiStoreMetricsWriter([]*metricsstore.MetricsStore{s1, s2})
w := strings.Builder{}
multiNsWriter.WriteAll(&w)
result := w.String()
resultLines := strings.Split(strings.TrimRight(result, "\n"), "\n")
if len(resultLines) != 10 {
t.Fatalf("Invalid number of series, got %d, want %d", len(resultLines), 10)
}
if resultLines[0] != "Info 1 about services" {
t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[0], "Info 1 about services")
}
if resultLines[5] != "Info 2 about services" {
t.Fatalf("Invalid metrics header on line 0, got %s, want %s", resultLines[5], "Info 2 about services")
}
expectedSeries := []string{
`kube_service_info_1{namespace="a",uid="a1"} 1`,
`kube_service_info_1{namespace="a",uid="a2"} 1`,
`kube_service_info_1{namespace="b",uid="b1"} 1`,
`kube_service_info_1{namespace="b",uid="b2"} 1`,
`kube_service_info_2{namespace="a",uid="a1"} 1`,
`kube_service_info_2{namespace="a",uid="a2"} 1`,
`kube_service_info_2{namespace="b",uid="b1"} 1`,
`kube_service_info_2{namespace="b",uid="b2"} 1`,
}
for _, series := range expectedSeries {
if !strings.Contains(result, series) {
t.Fatalf("Did not find expected series %s", series)
}
}
}

View File

@ -48,9 +48,9 @@ type MetricsHandler struct {
cancel func()
// mtx protects stores, curShard, and curTotalShards
// mtx protects metricsWriters, curShard, and curTotalShards
mtx *sync.RWMutex
stores []cache.Store
metricsWriters []metricsstore.MetricsWriter
curShard int32
curTotalShards int
}
@ -81,7 +81,7 @@ func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, tot
ctx, m.cancel = context.WithCancel(ctx)
m.storeBuilder.WithSharding(shard, totalShards)
m.storeBuilder.WithContext(ctx)
m.stores = m.storeBuilder.Build()
m.metricsWriters = m.storeBuilder.Build()
m.curShard = shard
m.curTotalShards = totalShards
}
@ -174,8 +174,8 @@ func (m *MetricsHandler) Run(ctx context.Context) error {
return ctx.Err()
}
// ServeHTTP implements the http.Handler interface. It writes the metrics in
// its stores to the response body.
// ServeHTTP implements the http.Handler interface. It writes all generated
// metrics to the response body.
func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -198,9 +198,8 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
for _, s := range m.stores {
ms := s.(*metricsstore.MetricsStore)
ms.WriteAll(writer)
for _, w := range m.metricsWriters {
w.WriteAll(writer)
}
// In case we gzipped the response, we have to close the writer.