add hpa controller

Signed-off-by: Poor12 <shentiecheng@huawei.com>
This commit is contained in:
Poor12 2023-05-29 15:07:26 +08:00
parent 314b46ddd3
commit 3a1464828a
31 changed files with 3905 additions and 18 deletions

View File

@ -17658,6 +17658,11 @@
"kind": "DeleteOptions",
"version": "v2beta2"
},
{
"group": "autoscaling.karmada.io",
"kind": "DeleteOptions",
"version": "v1alpha1"
},
{
"group": "batch",
"kind": "DeleteOptions",
@ -18394,6 +18399,11 @@
"kind": "WatchEvent",
"version": "v2beta2"
},
{
"group": "autoscaling.karmada.io",
"kind": "WatchEvent",
"version": "v1alpha1"
},
{
"group": "batch",
"kind": "WatchEvent",

View File

@ -61,6 +61,20 @@ webhooks:
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 10
- name: autoscaling.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["autoscaling.karmada.io"]
apiVersions: ["*"]
resources: ["federatedhpas"]
scope: "Namespaced"
clientConfig:
url: https://karmada-webhook.karmada-system.svc:443/mutate-federatedhpa
caBundle: {{caBundle}}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 10
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration

View File

@ -1164,6 +1164,8 @@ spec:
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""

View File

@ -12,6 +12,7 @@ resources:
- bases/config.karmada.io_resourceinterpretercustomizations.yaml
- bases/config.karmada.io_resourceinterpreterwebhookconfigurations.yaml
- bases/networking.karmada.io_multiclusteringresses.yaml
- bases/autoscaling.karmada.io_federatedhpas.yaml
patchesStrategicMerge:
- patches/webhook_in_resourcebindings.yaml

View File

@ -65,6 +65,20 @@ webhooks:
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: autoscaling.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["autoscaling.karmada.io"]
apiVersions: ["*"]
resources: ["federatedhpas"]
scope: "Namespaced"
clientConfig:
url: https://{{ $name }}-webhook.{{ $namespace }}.svc:443/mutate-federatedhpa
{{- include "karmada.webhook.caBundle" . | nindent 6 }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration

View File

@ -17,6 +17,9 @@ import (
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
"k8s.io/metrics/pkg/client/custom_metrics"
"k8s.io/metrics/pkg/client/external_metrics"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
@ -35,6 +38,8 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/cluster"
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa"
metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
"github.com/karmada-io/karmada/pkg/controllers/hpa"
@ -198,6 +203,7 @@ func init() {
controllers["federatedResourceQuotaStatus"] = startFederatedResourceQuotaStatusController
controllers["gracefulEviction"] = startGracefulEvictionController
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
}
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@ -552,6 +558,38 @@ func startApplicationFailoverController(ctx controllerscontext.Context) (enabled
return true, nil
}
func startFederatedHorizontalPodAutoscalerController(ctx controllerscontext.Context) (enabled bool, err error) {
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(ctx.KubeClientSet.Discovery())
go custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter,
ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.StopChan)
metricsClient := metricsclient.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(ctx.Mgr.GetConfig()),
custom_metrics.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), apiVersionsGetter),
external_metrics.NewForConfigOrDie(ctx.Mgr.GetConfig()),
)
replicaCalculator := federatedhpa.NewReplicaCalculator(metricsClient,
ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerTolerance,
ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerInitialReadinessDelay.Duration)
federatedHPAController := federatedhpa.FederatedHPAController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedhpa.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
DownscaleStabilisationWindow: ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
HorizontalPodAutoscalerSyncPeroid: ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerSyncPeriod.Duration,
ReplicaCalc: replicaCalculator,
ClusterScaleClientSetFunc: util.NewClusterScaleClientSet,
TypedInformerManager: typedmanager.GetInstance(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err = federatedHPAController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}
// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
@ -640,9 +678,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
RateLimiterOptions: opts.RateLimiterOpts,
GracefulEvictionTimeout: opts.GracefulEvictionTimeout,
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
HPAControllerConfiguration: opts.HPAControllerConfiguration,
},
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
KubeClientSet: kubeClientSet,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,

View File

@ -11,6 +11,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
@ -123,8 +124,9 @@ type Options struct {
// removal since the workload(resource) has been moved to the graceful eviction tasks.
GracefulEvictionTimeout metav1.Duration
RateLimiterOpts ratelimiterflag.Options
ProfileOpts profileflag.Options
RateLimiterOpts ratelimiterflag.Options
ProfileOpts profileflag.Options
HPAControllerConfiguration config.HPAControllerConfiguration
// EnableClusterResourceModeling indicates if enable cluster resource modeling.
// The resource modeling might be used by the scheduler to make scheduling decisions
// in scenario of dynamic replica assignment based on cluster free resources.
@ -217,6 +219,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
o.RateLimiterOpts.AddFlags(flags)
o.ProfileOpts.AddFlags(flags)
o.HPAControllerConfiguration.AddFlags(flags)
features.FeatureGate.AddFlag(flags)
}

View File

@ -25,6 +25,7 @@ import (
"github.com/karmada-io/karmada/pkg/webhook/clusteroverridepolicy"
"github.com/karmada-io/karmada/pkg/webhook/clusterpropagationpolicy"
"github.com/karmada-io/karmada/pkg/webhook/configuration"
"github.com/karmada-io/karmada/pkg/webhook/federatedhpa"
"github.com/karmada-io/karmada/pkg/webhook/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/webhook/multiclusteringress"
"github.com/karmada-io/karmada/pkg/webhook/overridepolicy"
@ -129,6 +130,7 @@ func Run(ctx context.Context, opts *options.Options) error {
hookServer.Register("/validate-federatedresourcequota", &webhook.Admission{Handler: &federatedresourcequota.ValidatingAdmission{}})
hookServer.Register("/validate-resourceinterpretercustomization", &webhook.Admission{Handler: &resourceinterpretercustomization.ValidatingAdmission{Client: hookManager.GetClient()}})
hookServer.Register("/validate-multiclusteringress", &webhook.Admission{Handler: &multiclusteringress.ValidatingAdmission{}})
hookServer.Register("/mutate-federatedhpa", &webhook.Admission{Handler: &federatedhpa.MutatingAdmission{}})
hookServer.WebhookMux.Handle("/readyz/", http.StripPrefix("/readyz/", &healthz.Handler{}))
// blocks until the context is done.

View File

@ -66,6 +66,20 @@ webhooks:
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: autoscaling.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["autoscaling.karmada.io"]
apiVersions: ["*"]
resources: ["federatedhpas"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/mutate-federatedhpa
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
`
// KarmadaWebhookValidatingWebhookConfiguration is KarmadaWebhook ValidatingWebhookConfiguration manifest

View File

@ -7,6 +7,7 @@ import (
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=fhpa,categories={karmada-io}
// FederatedHPA is centralized HPA that can aggregate the metrics in multiple clusters.

View File

@ -7,9 +7,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@ -79,6 +81,8 @@ type Options struct {
CertRotationRemainingTimeThreshold float64
// KarmadaKubeconfigNamespace is the namespace of the secret containing karmada-agent certificate.
KarmadaKubeconfigNamespace string
// HPAControllerConfiguration is the config of federatedHPA-controller.
HPAControllerConfiguration config.HPAControllerConfiguration
}
// Context defines the context object for controller.
@ -88,6 +92,7 @@ type Context struct {
Opts Options
StopChan <-chan struct{}
DynamicClientSet dynamic.Interface
KubeClientSet clientset.Interface
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter

View File

@ -0,0 +1,64 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"time"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// HPAControllerConfiguration contains elements describing HPAController.
type HPAControllerConfiguration struct {
// horizontalPodAutoscalerSyncPeriod is the period for syncing the number of
// pods in horizontal pod autoscaler.
HorizontalPodAutoscalerSyncPeriod metav1.Duration
// horizontalPodAutoscalerUpscaleForbiddenWindow is a period after which next upscale allowed.
HorizontalPodAutoscalerUpscaleForbiddenWindow metav1.Duration
// horizontalPodAutoscalerDownscaleForbiddenWindow is a period after which next downscale allowed.
HorizontalPodAutoscalerDownscaleForbiddenWindow metav1.Duration
// HorizontalPodAutoscalerDowncaleStabilizationWindow is a period for which autoscaler will look
// backwards and not scale down below any recommendation it made during that period.
HorizontalPodAutoscalerDownscaleStabilizationWindow metav1.Duration
// horizontalPodAutoscalerTolerance is the tolerance for when
// resource usage suggests upscaling/downscaling
HorizontalPodAutoscalerTolerance float64
// HorizontalPodAutoscalerCPUInitializationPeriod is the period after pod start when CPU samples
// might be skipped.
HorizontalPodAutoscalerCPUInitializationPeriod metav1.Duration
// HorizontalPodAutoscalerInitialReadinessDelay is period after pod start during which readiness
// changes are treated as readiness being set for the first time. The only effect of this is that
// HPA will disregard CPU samples from unready pods that had last readiness change during that
// period.
HorizontalPodAutoscalerInitialReadinessDelay metav1.Duration
}
// AddFlags adds flags related to HPAController for controller manager to the specified FlagSet.
func (o *HPAControllerConfiguration) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.DurationVar(&o.HorizontalPodAutoscalerSyncPeriod.Duration, "horizontal-pod-autoscaler-sync-period", 15*time.Second, "The period for syncing the number of pods in horizontal pod autoscaler.")
fs.DurationVar(&o.HorizontalPodAutoscalerUpscaleForbiddenWindow.Duration, "horizontal-pod-autoscaler-upscale-delay", 3*time.Minute, "The period since last upscale, before another upscale can be performed in horizontal pod autoscaler.")
fs.DurationVar(&o.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, "horizontal-pod-autoscaler-downscale-stabilization", 5*time.Minute, "The period for which autoscaler will look backwards and not scale down below any recommendation it made during that period.")
fs.DurationVar(&o.HorizontalPodAutoscalerDownscaleForbiddenWindow.Duration, "horizontal-pod-autoscaler-downscale-delay", 5*time.Minute, "The period since last downscale, before another downscale can be performed in horizontal pod autoscaler.")
fs.Float64Var(&o.HorizontalPodAutoscalerTolerance, "horizontal-pod-autoscaler-tolerance", 0.1, "The minimum change (from 1.0) in the desired-to-actual metrics ratio for the horizontal pod autoscaler to consider scaling.")
fs.DurationVar(&o.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, "horizontal-pod-autoscaler-cpu-initialization-period", 5*time.Minute, "The period after pod start when CPU samples might be skipped.")
fs.DurationVar(&o.HorizontalPodAutoscalerInitialReadinessDelay.Duration, "horizontal-pod-autoscaler-initial-readiness-delay", 30*time.Second, "The period after pod start during which readiness changes will be treated as initial readiness.")
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,221 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"fmt"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
customapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
customclient "k8s.io/metrics/pkg/client/custom_metrics"
externalclient "k8s.io/metrics/pkg/client/external_metrics"
)
const (
metricServerDefaultMetricWindow = time.Minute
)
func NewRESTMetricsClient(resourceClient resourceclient.PodMetricsesGetter, customClient customclient.CustomMetricsClient, externalClient externalclient.ExternalMetricsClient) MetricsClient {
return &restMetricsClient{
&resourceMetricsClient{resourceClient},
&customMetricsClient{customClient},
&externalMetricsClient{externalClient},
}
}
// restMetricsClient is a client which supports fetching
// metrics from the resource metrics API, the
// custom metrics API and the external metrics API.
type restMetricsClient struct {
*resourceMetricsClient
*customMetricsClient
*externalMetricsClient
}
// resourceMetricsClient implements the resource-metrics-related parts of MetricsClient,
// using data from the resource metrics API.
type resourceMetricsClient struct {
client resourceclient.PodMetricsesGetter
}
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
func (c *resourceMetricsClient) GetResourceMetric(ctx context.Context, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error) {
metrics, err := c.client.PodMetricses(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err)
}
if len(metrics.Items) == 0 {
return nil, time.Time{}, fmt.Errorf("no metrics returned from resource metrics API")
}
var res PodMetricsInfo
if container != "" {
res, err = getContainerMetrics(metrics.Items, resource, container)
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get container metrics: %v", err)
}
} else {
res = getPodMetrics(metrics.Items, resource)
}
timestamp := metrics.Items[0].Timestamp.Time
return res, timestamp, nil
}
func getContainerMetrics(rawMetrics []metricsapi.PodMetrics, resource corev1.ResourceName, container string) (PodMetricsInfo, error) {
res := make(PodMetricsInfo, len(rawMetrics))
for _, m := range rawMetrics {
containerFound := false
for _, c := range m.Containers {
if c.Name == container {
containerFound = true
if val, resFound := c.Usage[resource]; resFound {
res[m.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: m.Window.Duration,
Value: val.MilliValue(),
}
}
break
}
}
if !containerFound {
return nil, fmt.Errorf("container %s not present in metrics for pod %s/%s", container, m.Namespace, m.Name)
}
}
return res, nil
}
func getPodMetrics(rawMetrics []metricsapi.PodMetrics, resource corev1.ResourceName) PodMetricsInfo {
res := make(PodMetricsInfo, len(rawMetrics))
for _, m := range rawMetrics {
podSum := int64(0)
missing := len(m.Containers) == 0
for _, c := range m.Containers {
resValue, found := c.Usage[resource]
if !found {
missing = true
klog.V(2).Infof("missing resource metric %v for %s/%s", resource, m.Namespace, m.Name)
break
}
podSum += resValue.MilliValue()
}
if !missing {
res[m.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: m.Window.Duration,
Value: podSum,
}
}
}
return res
}
// customMetricsClient implements the custom-metrics-related parts of MetricsClient,
// using data from the custom metrics API.
type customMetricsClient struct {
client customclient.CustomMetricsClient
}
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
func (c *customMetricsClient) GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error) {
metrics, err := c.client.NamespacedMetrics(namespace).GetForObjects(schema.GroupKind{Kind: "Pod"}, selector, metricName, metricSelector)
if err != nil {
return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from custom metrics API: %v", err)
}
if len(metrics.Items) == 0 {
return nil, time.Time{}, fmt.Errorf("no metrics returned from custom metrics API")
}
res := make(PodMetricsInfo, len(metrics.Items))
for _, m := range metrics.Items {
window := metricServerDefaultMetricWindow
if m.WindowSeconds != nil {
window = time.Duration(*m.WindowSeconds) * time.Second
}
res[m.DescribedObject.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: window,
Value: int64(m.Value.MilliValue()),
}
m.Value.MilliValue()
}
timestamp := metrics.Items[0].Timestamp.Time
return res, timestamp, nil
}
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
func (c *customMetricsClient) GetObjectMetric(metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error) {
gvk := schema.FromAPIVersionAndKind(objectRef.APIVersion, objectRef.Kind)
var metricValue *customapi.MetricValue
var err error
if gvk.Kind == "Namespace" && gvk.Group == "" {
// handle namespace separately
// NB: we ignore namespace name here, since CrossVersionObjectReference isn't
// supposed to allow you to escape your namespace
metricValue, err = c.client.RootScopedMetrics().GetForObject(gvk.GroupKind(), namespace, metricName, metricSelector)
} else {
metricValue, err = c.client.NamespacedMetrics(namespace).GetForObject(gvk.GroupKind(), objectRef.Name, metricName, metricSelector)
}
if err != nil {
return 0, time.Time{}, fmt.Errorf("unable to fetch metrics from custom metrics API: %v", err)
}
return metricValue.Value.MilliValue(), metricValue.Timestamp.Time, nil
}
// externalMetricsClient implements the external metrics related parts of MetricsClient,
// using data from the external metrics API.
type externalMetricsClient struct {
client externalclient.ExternalMetricsClient
}
// GetExternalMetric gets all the values of a given external metric
// that match the specified selector.
func (c *externalMetricsClient) GetExternalMetric(metricName, namespace string, selector labels.Selector) ([]int64, time.Time, error) {
metrics, err := c.client.NamespacedMetrics(namespace).List(metricName, selector)
if err != nil {
return []int64{}, time.Time{}, fmt.Errorf("unable to fetch metrics from external metrics API: %v", err)
}
if len(metrics.Items) == 0 {
return nil, time.Time{}, fmt.Errorf("no metrics returned from external metrics API")
}
res := make([]int64, 0)
for _, m := range metrics.Items {
res = append(res, m.Value.MilliValue())
}
timestamp := metrics.Items[0].Timestamp.Time
return res, timestamp, nil
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
// PodMetric contains pod metric value (the metric values are expected to be the metric as a milli-value)
type PodMetric struct {
Timestamp time.Time
Window time.Duration
Value int64
}
// PodMetricsInfo contains pod metrics as a map from pod names to PodMetricsInfo
type PodMetricsInfo map[string]PodMetric
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for the specified named container in all pods matching the specified selector in the given namespace and when
// the container is an empty string it returns the sum of all the container metrics.
GetResourceMetric(ctx context.Context, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
GetObjectMetric(metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)
// GetExternalMetric gets all the values of a given external metric
// that match the specified selector.
GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
)
// GetResourceUtilizationRatio takes in a set of metrics, a set of matching requests,
// and a target utilization percentage, and calculates the ratio of
// desired to actual utilization (returning that, the actual utilization, and the raw average value)
func GetResourceUtilizationRatio(metrics PodMetricsInfo, requests map[string]int64, targetUtilization int32) (utilizationRatio float64, currentUtilization int32, rawAverageValue int64, err error) {
metricsTotal := int64(0)
requestsTotal := int64(0)
numEntries := 0
for podName, metric := range metrics {
request, hasRequest := requests[podName]
if !hasRequest {
// we check for missing requests elsewhere, so assuming missing requests == extraneous metrics
continue
}
metricsTotal += metric.Value
requestsTotal += request
numEntries++
}
// if the set of requests is completely disjoint from the set of metrics,
// then we could have an issue where the requests total is zero
if requestsTotal == 0 {
return 0, 0, 0, fmt.Errorf("no metrics returned matched known pods")
}
currentUtilization = int32((metricsTotal * 100) / requestsTotal)
return float64(currentUtilization) / float64(targetUtilization), currentUtilization, metricsTotal / int64(numEntries), nil
}
// GetMetricUsageRatio takes in a set of metrics and a target utilization value,
// and calculates the ratio of desired to actual utilization
// (returning that and the actual utilization)
func GetMetricUsageRatio(metrics PodMetricsInfo, targetUsage int64) (utilizationRatio float64, currentUsage int64) {
metricsTotal := int64(0)
for _, metric := range metrics {
metricsTotal += metric.Value
}
currentUsage = metricsTotal / int64(len(metrics))
return float64(currentUsage) / float64(targetUsage), currentUsage
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
type resourceUtilizationRatioTestCase struct {
metrics PodMetricsInfo
requests map[string]int64
targetUtilization int32
expectedUtilizationRatio float64
expectedCurrentUtilization int32
expectedRawAverageValue int64
expectedErr error
}
func (tc *resourceUtilizationRatioTestCase) runTest(t *testing.T) {
actualUtilizationRatio, actualCurrentUtilization, actualRawAverageValue, actualErr := GetResourceUtilizationRatio(tc.metrics, tc.requests, tc.targetUtilization)
if tc.expectedErr != nil {
assert.Error(t, actualErr, "there should be an error getting the utilization ratio")
assert.Contains(t, fmt.Sprintf("%v", actualErr), fmt.Sprintf("%v", tc.expectedErr), "the error message should be as expected")
return
}
assert.NoError(t, actualErr, "there should be no error retrieving the utilization ratio")
assert.Equal(t, tc.expectedUtilizationRatio, actualUtilizationRatio, "the utilization ratios should be as expected")
assert.Equal(t, tc.expectedCurrentUtilization, actualCurrentUtilization, "the current utilization should be as expected")
assert.Equal(t, tc.expectedRawAverageValue, actualRawAverageValue, "the raw average value should be as expected")
}
type metricUsageRatioTestCase struct {
metrics PodMetricsInfo
targetUsage int64
expectedUsageRatio float64
expectedCurrentUsage int64
}
func (tc *metricUsageRatioTestCase) runTest(t *testing.T) {
actualUsageRatio, actualCurrentUsage := GetMetricUsageRatio(tc.metrics, tc.targetUsage)
assert.Equal(t, tc.expectedUsageRatio, actualUsageRatio, "the usage ratios should be as expected")
assert.Equal(t, tc.expectedCurrentUsage, actualCurrentUsage, "the current usage should be as expected")
}
func TestGetResourceUtilizationRatioBaseCase(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100,
},
targetUtilization: 50,
expectedUtilizationRatio: 1.26,
expectedCurrentUtilization: 63,
expectedRawAverageValue: 63,
expectedErr: nil,
}
tc.runTest(t)
}
func TestGetResourceUtilizationRatioIgnorePodsWithNoRequest(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76}, "test-pod-no-request": {Value: 100},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100,
},
targetUtilization: 50,
expectedUtilizationRatio: 1.26,
expectedCurrentUtilization: 63,
expectedRawAverageValue: 63,
expectedErr: nil,
}
tc.runTest(t)
}
func TestGetResourceUtilizationRatioExtraRequest(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100, "test-pod-extra-request": 500,
},
targetUtilization: 50,
expectedUtilizationRatio: 1.26,
expectedCurrentUtilization: 63,
expectedRawAverageValue: 63,
expectedErr: nil,
}
tc.runTest(t)
}
func TestGetResourceUtilizationRatioNoRequests(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{},
targetUtilization: 50,
expectedUtilizationRatio: 0,
expectedCurrentUtilization: 0,
expectedRawAverageValue: 0,
expectedErr: fmt.Errorf("no metrics returned matched known pods"),
}
tc.runTest(t)
}
func TestGetMetricUsageRatioBaseCase(t *testing.T) {
tc := metricUsageRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": {Value: 5000}, "test-pod-1": {Value: 10000},
},
targetUsage: 10000,
expectedUsageRatio: .75,
expectedCurrentUsage: 7500,
}
tc.runTest(t)
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// metrics packages contains metrics which are exposed from the HPA controller.
package monitor
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
// hpaControllerSubsystem - subsystem name used by HPA controller
hpaControllerSubsystem = "horizontal_pod_autoscaler_controller"
)
var (
reconciliationsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: hpaControllerSubsystem,
Name: "reconciliations_total",
Help: "Number of reconciliations of HPA controller. The label 'action' should be either 'scale_down', 'scale_up', or 'none'. Also, the label 'error' should be either 'spec', 'internal', or 'none'. Note that if both spec and internal errors happen during a reconciliation, the first one to occur is reported in `error` label.",
StabilityLevel: metrics.ALPHA,
}, []string{"action", "error"})
reconciliationsDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: hpaControllerSubsystem,
Name: "reconciliation_duration_seconds",
Help: "The time(seconds) that the HPA controller takes to reconcile once. The label 'action' should be either 'scale_down', 'scale_up', or 'none'. Also, the label 'error' should be either 'spec', 'internal', or 'none'. Note that if both spec and internal errors happen during a reconciliation, the first one to occur is reported in `error` label.",
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
StabilityLevel: metrics.ALPHA,
}, []string{"action", "error"})
metricComputationTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: hpaControllerSubsystem,
Name: "metric_computation_total",
Help: "Number of metric computations. The label 'action' should be either 'scale_down', 'scale_up', or 'none'. Also, the label 'error' should be either 'spec', 'internal', or 'none'. The label 'metric_type' corresponds to HPA.spec.metrics[*].type",
StabilityLevel: metrics.ALPHA,
}, []string{"action", "error", "metric_type"})
metricComputationDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: hpaControllerSubsystem,
Name: "metric_computation_duration_seconds",
Help: "The time(seconds) that the HPA controller takes to calculate one metric. The label 'action' should be either 'scale_down', 'scale_up', or 'none'. The label 'error' should be either 'spec', 'internal', or 'none'. The label 'metric_type' corresponds to HPA.spec.metrics[*].type",
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
StabilityLevel: metrics.ALPHA,
}, []string{"action", "error", "metric_type"})
metricsList = []metrics.Registerable{
reconciliationsTotal,
reconciliationsDuration,
metricComputationTotal,
metricComputationDuration,
}
)
var register sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
register.Do(func() {
registerMetrics(metricsList...)
})
}
// RegisterMetrics registers a list of metrics.
func registerMetrics(extraMetrics ...metrics.Registerable) {
for _, metric := range extraMetrics {
legacyregistry.MustRegister(metric)
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package monitor
import (
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
)
type ActionLabel string
type ErrorLabel string
const (
ActionLabelScaleUp ActionLabel = "scale_up"
ActionLabelScaleDown ActionLabel = "scale_down"
ActionLabelNone ActionLabel = "none"
// ErrorLabelSpec represents an error due to an invalid spec of HPA object.
ErrorLabelSpec ErrorLabel = "spec"
// ErrorLabelInternal represents an error from an internal computation or communication with other component.
ErrorLabelInternal ErrorLabel = "internal"
ErrorLabelNone ErrorLabel = "none"
)
// Monitor records some metrics so that people can monitor HPA controller.
type Monitor interface {
ObserveReconciliationResult(action ActionLabel, err ErrorLabel, duration time.Duration)
ObserveMetricComputationResult(action ActionLabel, err ErrorLabel, duration time.Duration, metricType autoscalingv2.MetricSourceType)
}
type monitor struct{}
func New() Monitor {
return &monitor{}
}
// ObserveReconciliationResult observes some metrics from a reconciliation result.
func (r *monitor) ObserveReconciliationResult(action ActionLabel, err ErrorLabel, duration time.Duration) {
reconciliationsTotal.WithLabelValues(string(action), string(err)).Inc()
reconciliationsDuration.WithLabelValues(string(action), string(err)).Observe(duration.Seconds())
}
// ObserveMetricComputationResult observes some metrics from a metric computation result.
func (r *monitor) ObserveMetricComputationResult(action ActionLabel, err ErrorLabel, duration time.Duration, metricType autoscalingv2.MetricSourceType) {
metricComputationTotal.WithLabelValues(string(action), string(err), string(metricType)).Inc()
metricComputationDuration.WithLabelValues(string(action), string(err), string(metricType)).Observe(duration.Seconds())
}

View File

@ -0,0 +1,281 @@
package federatedhpa
import (
"context"
"fmt"
"math"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
"github.com/karmada-io/karmada/pkg/util/helper"
)
// This file is basically lifted from https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/controller/podautoscaler/replica_calculator.go.
// The main difference is:
// 1. ReplicaCalculator no longer has podLister built in. PodList is calculated in the outer controller.
// 2. ReplicaCalculator needs to import a calibration value to calibrate the calculation results
// when they are determined by the global number of ready Pods or metrics.
// ReplicaCalculator bundles all needed information to calculate the target amount of replicas
type ReplicaCalculator struct {
metricsClient metricsclient.MetricsClient
tolerance float64
cpuInitializationPeriod time.Duration
delayOfInitialReadinessStatus time.Duration
}
// NewReplicaCalculator creates a new ReplicaCalculator and passes all necessary information to the new instance
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
return &ReplicaCalculator{
metricsClient: metricsClient,
tolerance: tolerance,
cpuInitializationPeriod: cpuInitializationPeriod,
delayOfInitialReadinessStatus: delayOfInitialReadinessStatus,
}
}
// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count
//
//nolint:gocyclo
func (c *ReplicaCalculator) GetResourceReplicas(ctx context.Context, currentReplicas int32, targetUtilization int32, resource corev1.ResourceName, namespace string, selector labels.Selector, container string, podList []*corev1.Pod, calibration float64) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
if err != nil {
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
itemsLen := len(podList)
if itemsLen == 0 {
return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods)
removeMetricsForPods(metrics, unreadyPods)
requests, err := calculatePodRequests(podList, container, resource)
if err != nil {
return 0, 0, 0, time.Time{}, err
}
if len(metrics) == 0 {
return 0, 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
}
usageRatio, utilization, rawUtilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, 0, 0, time.Time{}, err
}
scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !scaleUpWithUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= c.tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, rawUtilization, timestamp, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount) / calibration)), utilization, rawUtilization, timestamp, nil
}
if len(missingPods) > 0 {
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as using 100% (all) of the resource request
// or the utilization target for targets higher than 100%
fallbackUtilization := int64(max(100, targetUtilization))
for podName := range missingPods {
metrics[podName] = metricsclient.PodMetric{Value: requests[podName] * fallbackUtilization / 100}
}
} else if usageRatio > 1.0 {
// on a scale-up, treat missing pods as using 0% of the resource request
for podName := range missingPods {
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
}
if scaleUpWithUnready {
// on a scale-up, treat unready pods as using 0% of the resource request
for podName := range unreadyPods {
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
// re-run the utilization calculation with our new numbers
newUsageRatio, _, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, utilization, rawUtilization, time.Time{}, err
}
if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, utilization, rawUtilization, timestamp, nil
}
newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics)) / calibration))
if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
// return the current replicas if the change of metrics length would cause a change in scale direction
return currentReplicas, utilization, rawUtilization, timestamp, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return newReplicas, utilization, rawUtilization, timestamp, nil
}
// GetRawResourceReplicas calculates the desired replica count based on a target resource utilization (as a raw milli-value)
// for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetRawResourceReplicas(ctx context.Context, currentReplicas int32, targetUsage int64, resource corev1.ResourceName, namespace string, selector labels.Selector, container string, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, resource, podList, calibration)
return replicaCount, usage, timestamp, err
}
// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
//
//nolint:gocyclo
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUsage int64, resource corev1.ResourceName, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, err error) {
if len(podList) == 0 {
return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods)
removeMetricsForPods(metrics, unreadyPods)
if len(metrics) == 0 {
return 0, 0, fmt.Errorf("did not receive metrics for any ready pods")
}
usageRatio, usage := metricsclient.GetMetricUsageRatio(metrics, targetUsage)
scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !scaleUpWithUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= c.tolerance {
// return the current replicas if the change would be too small
return currentReplicas, usage, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount) / calibration)), usage, nil
}
if len(missingPods) > 0 {
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as using 100% of the resource request
for podName := range missingPods {
metrics[podName] = metricsclient.PodMetric{Value: targetUsage}
}
} else {
// on a scale-up, treat missing pods as using 0% of the resource request
for podName := range missingPods {
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
}
if scaleUpWithUnready {
// on a scale-up, treat unready pods as using 0% of the resource request
for podName := range unreadyPods {
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
// re-run the utilization calculation with our new numbers
newUsageRatio, _ := metricsclient.GetMetricUsageRatio(metrics, targetUsage)
if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, usage, nil
}
newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics)) / calibration))
if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
// return the current replicas if the change of metrics length would cause a change in scale direction
return currentReplicas, usage, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return newReplicas, usage, nil
}
func groupPods(pods []*corev1.Pod, metrics metricsclient.PodMetricsInfo, resource corev1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, unreadyPods, missingPods, ignoredPods sets.Set[string]) {
missingPods = sets.New[string]()
unreadyPods = sets.New[string]()
ignoredPods = sets.New[string]()
for _, pod := range pods {
if pod.DeletionTimestamp != nil || pod.Status.Phase == corev1.PodFailed {
ignoredPods.Insert(pod.Name)
continue
}
// Pending pods are ignored.
if pod.Status.Phase == corev1.PodPending {
unreadyPods.Insert(pod.Name)
continue
}
// Pods missing metrics.
metric, found := metrics[pod.Name]
if !found {
missingPods.Insert(pod.Name)
continue
}
// Unready pods are ignored.
if resource == corev1.ResourceCPU {
var unready bool
_, condition := helper.GetPodCondition(&pod.Status, corev1.PodReady)
if condition == nil || pod.Status.StartTime == nil {
unready = true
} else {
// Pod still within possible initialisation period.
if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) {
// Ignore sample if pod is unready or one window of metric wasn't collected since last state transition.
unready = condition.Status == corev1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window))
} else {
// Ignore metric if pod is unready and it has never been ready.
unready = condition.Status == corev1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
}
}
if unready {
unreadyPods.Insert(pod.Name)
continue
}
}
readyPodCount++
}
return
}
func calculatePodRequests(pods []*corev1.Pod, container string, resource corev1.ResourceName) (map[string]int64, error) {
requests := make(map[string]int64, len(pods))
for _, pod := range pods {
podSum := int64(0)
for _, c := range pod.Spec.Containers {
if container == "" || container == c.Name {
if containerRequest, ok := c.Resources.Requests[resource]; ok {
podSum += containerRequest.MilliValue()
} else {
return nil, fmt.Errorf("missing request for %s in container %s of Pod %s", resource, c.Name, pod.ObjectMeta.Name)
}
}
}
requests[pod.Name] = podSum
}
return requests, nil
}
func removeMetricsForPods(metrics metricsclient.PodMetricsInfo, pods sets.Set[string]) {
for _, pod := range pods.UnsortedList() {
delete(metrics, pod)
}
}

View File

@ -77,6 +77,20 @@ webhooks:
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: autoscaling.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["autoscaling.karmada.io"]
apiVersions: ["*"]
resources: ["federatedhpas"]
scope: "Namespaced"
clientConfig:
url: https://karmada-webhook.%[1]s.svc:443/mutate-federatedhpa
caBundle: %[2]s
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3`, systemNamespace, caBundle)
}

View File

@ -60,8 +60,10 @@ func PodTransformFunc(obj interface{}) (interface{}, error) {
aggregatedPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
DeletionTimestamp: pod.DeletionTimestamp,
},
Spec: corev1.PodSpec{
NodeName: pod.Spec.NodeName,
@ -70,7 +72,9 @@ func PodTransformFunc(obj interface{}) (interface{}, error) {
Overhead: pod.Spec.Overhead,
},
Status: corev1.PodStatus{
Phase: pod.Status.Phase,
Phase: pod.Status.Phase,
Conditions: pod.Status.Conditions,
StartTime: pod.Status.StartTime,
},
}
return aggregatedPod, nil

View File

@ -163,6 +163,7 @@ func TestNodeTransformFunc(t *testing.T) {
}
func TestPodTransformFunc(t *testing.T) {
timeNow := metav1.Now()
tests := []struct {
name string
obj interface{}
@ -181,12 +182,15 @@ func TestPodTransformFunc(t *testing.T) {
Manager: "whatever",
},
},
DeletionTimestamp: &timeNow,
},
},
want: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
DeletionTimestamp: &timeNow,
},
},
},
@ -210,6 +214,12 @@ func TestPodTransformFunc(t *testing.T) {
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
},
},
StartTime: &timeNow,
},
},
want: &corev1.Pod{
@ -230,6 +240,12 @@ func TestPodTransformFunc(t *testing.T) {
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
},
},
StartTime: &timeNow,
},
},
},

View File

@ -10,6 +10,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
@ -23,17 +24,18 @@ import (
var aggregatedScheme = runtime.NewScheme()
func init() {
utilruntime.Must(scheme.AddToScheme(aggregatedScheme)) // add Kubernetes schemes
utilruntime.Must(clusterv1alpha1.AddToScheme(aggregatedScheme)) // add cluster schemes
utilruntime.Must(configv1alpha1.AddToScheme(aggregatedScheme)) // add config v1alpha1 schemes
utilruntime.Must(networkingv1alpha1.AddToScheme(aggregatedScheme)) // add network v1alpha1 schemes
utilruntime.Must(policyv1alpha1.AddToScheme(aggregatedScheme)) // add propagation schemes
utilruntime.Must(workv1alpha1.AddToScheme(aggregatedScheme)) // add work v1alpha1 schemes
utilruntime.Must(workv1alpha2.AddToScheme(aggregatedScheme)) // add work v1alpha2 schemes
utilruntime.Must(searchv1alpha1.AddToScheme(aggregatedScheme)) // add search v1alpha1 schemes
utilruntime.Must(mcsv1alpha1.AddToScheme(aggregatedScheme)) // add mcs-api schemes
utilruntime.Must(clusterapiv1alpha4.AddToScheme(aggregatedScheme)) // add cluster-api v1alpha4 schemes
utilruntime.Must(clusterapiv1beta1.AddToScheme(aggregatedScheme)) // add cluster-api v1beta1 schemes
utilruntime.Must(scheme.AddToScheme(aggregatedScheme)) // add Kubernetes schemes
utilruntime.Must(clusterv1alpha1.AddToScheme(aggregatedScheme)) // add cluster schemes
utilruntime.Must(configv1alpha1.AddToScheme(aggregatedScheme)) // add config v1alpha1 schemes
utilruntime.Must(networkingv1alpha1.AddToScheme(aggregatedScheme)) // add network v1alpha1 schemes
utilruntime.Must(policyv1alpha1.AddToScheme(aggregatedScheme)) // add propagation schemes
utilruntime.Must(workv1alpha1.AddToScheme(aggregatedScheme)) // add work v1alpha1 schemes
utilruntime.Must(workv1alpha2.AddToScheme(aggregatedScheme)) // add work v1alpha2 schemes
utilruntime.Must(searchv1alpha1.AddToScheme(aggregatedScheme)) // add search v1alpha1 schemes
utilruntime.Must(mcsv1alpha1.AddToScheme(aggregatedScheme)) // add mcs-api schemes
utilruntime.Must(clusterapiv1alpha4.AddToScheme(aggregatedScheme)) // add cluster-api v1alpha4 schemes
utilruntime.Must(clusterapiv1beta1.AddToScheme(aggregatedScheme)) // add cluster-api v1beta1 schemes
utilruntime.Must(autoscalingv1alpha1.AddToScheme(aggregatedScheme)) // add autoscaling v1alpha1 schemes
}
// NewSchema returns a singleton schema set which aggregated Kubernetes's schemes and extended schemes.

View File

@ -9,6 +9,24 @@ import (
"github.com/karmada-io/karmada/pkg/util/lifted"
)
// IsPodReady returns true if a pod is ready; false otherwise.
func IsPodReady(pod *corev1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}
// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status corev1.PodStatus) bool {
condition := GetPodReadyCondition(status)
return condition != nil && condition.Status == corev1.ConditionTrue
}
// GetPodReadyCondition extracts the pod ready condition from the given status and returns that.
// Returns nil if the condition is not present.
func GetPodReadyCondition(status corev1.PodStatus) *corev1.PodCondition {
_, condition := GetPodCondition(&status, corev1.PodReady)
return condition
}
// GetPodCondition extracts the provided condition from the given status and returns that.
// Returns nil and -1 if the condition is not present, and the index of the located condition.
func GetPodCondition(status *corev1.PodStatus, conditionType corev1.PodConditionType) (int, *corev1.PodCondition) {

View File

@ -0,0 +1,122 @@
package lifted
import (
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1"
)
// This code is lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/apis/autoscaling/v2/defaults.go
// DefaultCPUUtilization is the default value for CPU utilization, provided no other
// metrics are present. This is here because it's used by both the v2beta1 defaulting
// logic, and the pseudo-defaulting done in v1 conversion.
const DefaultCPUUtilization = 80
var (
// These constants repeats previous HPA behavior
scaleUpLimitPercent int32 = 100
scaleUpLimitMinimumPods int32 = 4
scaleUpPeriod int32 = 15
scaleUpStabilizationSeconds int32
maxPolicy = autoscalingv2.MaxChangePolicySelect
defaultHPAScaleUpRules = autoscalingv2.HPAScalingRules{
StabilizationWindowSeconds: &scaleUpStabilizationSeconds,
SelectPolicy: &maxPolicy,
Policies: []autoscalingv2.HPAScalingPolicy{
{
Type: autoscalingv2.PodsScalingPolicy,
Value: scaleUpLimitMinimumPods,
PeriodSeconds: scaleUpPeriod,
},
{
Type: autoscalingv2.PercentScalingPolicy,
Value: scaleUpLimitPercent,
PeriodSeconds: scaleUpPeriod,
},
},
}
scaleDownPeriod int32 = 15
// Currently we can set the downscaleStabilizationWindow from the command line
// So we can not rewrite the command line option from here
scaleDownLimitPercent int32 = 100
defaultHPAScaleDownRules = autoscalingv2.HPAScalingRules{
StabilizationWindowSeconds: nil,
SelectPolicy: &maxPolicy,
Policies: []autoscalingv2.HPAScalingPolicy{
{
Type: autoscalingv2.PercentScalingPolicy,
Value: scaleDownLimitPercent,
PeriodSeconds: scaleDownPeriod,
},
},
}
)
func SetDefaultsFederatedHPA(obj *autoscalingv1alpha1.FederatedHPA) {
if obj.Spec.MinReplicas == nil {
obj.Spec.MinReplicas = pointer.Int32(1)
}
if len(obj.Spec.Metrics) == 0 {
utilizationDefaultVal := int32(DefaultCPUUtilization)
obj.Spec.Metrics = []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: corev1.ResourceCPU,
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.UtilizationMetricType,
AverageUtilization: &utilizationDefaultVal,
},
},
},
}
}
SetDefaultsHorizontalPodAutoscalerBehavior(obj)
}
// SetDefaultsHorizontalPodAutoscalerBehavior fills the behavior if it is not null
func SetDefaultsHorizontalPodAutoscalerBehavior(obj *autoscalingv1alpha1.FederatedHPA) {
// if behavior is specified, we should fill all the 'nil' values with the default ones
if obj.Spec.Behavior != nil {
obj.Spec.Behavior.ScaleUp = GenerateHPAScaleUpRules(obj.Spec.Behavior.ScaleUp)
obj.Spec.Behavior.ScaleDown = GenerateHPAScaleDownRules(obj.Spec.Behavior.ScaleDown)
}
}
// GenerateHPAScaleUpRules returns a fully-initialized HPAScalingRules value
// We guarantee that no pointer in the structure will have the 'nil' value
func GenerateHPAScaleUpRules(scalingRules *autoscalingv2.HPAScalingRules) *autoscalingv2.HPAScalingRules {
defaultScalingRules := defaultHPAScaleUpRules.DeepCopy()
return copyHPAScalingRules(scalingRules, defaultScalingRules)
}
// GenerateHPAScaleDownRules returns a fully-initialized HPAScalingRules value
// We guarantee that no pointer in the structure will have the 'nil' value
// EXCEPT StabilizationWindowSeconds, for reasoning check the comment for defaultHPAScaleDownRules
func GenerateHPAScaleDownRules(scalingRules *autoscalingv2.HPAScalingRules) *autoscalingv2.HPAScalingRules {
defaultScalingRules := defaultHPAScaleDownRules.DeepCopy()
return copyHPAScalingRules(scalingRules, defaultScalingRules)
}
// copyHPAScalingRules copies all non-`nil` fields in HPA constraint structure
func copyHPAScalingRules(from, to *autoscalingv2.HPAScalingRules) *autoscalingv2.HPAScalingRules {
if from == nil {
return to
}
if from.SelectPolicy != nil {
to.SelectPolicy = from.SelectPolicy
}
if from.StabilizationWindowSeconds != nil {
to.StabilizationWindowSeconds = from.StabilizationWindowSeconds
}
if from.Policies != nil {
to.Policies = from.Policies
}
return to
}

View File

@ -0,0 +1,217 @@
package lifted
import (
"testing"
"github.com/stretchr/testify/assert"
autoscalingv2 "k8s.io/api/autoscaling/v2"
utilpointer "k8s.io/utils/pointer"
)
// This code is lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/apis/autoscaling/v2/defaults_test.go
func TestGenerateScaleDownRules(t *testing.T) {
type TestCase struct {
rateDownPods int32
rateDownPodsPeriodSeconds int32
rateDownPercent int32
rateDownPercentPeriodSeconds int32
stabilizationSeconds *int32
selectPolicy *autoscalingv2.ScalingPolicySelect
expectedPolicies []autoscalingv2.HPAScalingPolicy
expectedStabilization *int32
expectedSelectPolicy string
annotation string
}
maxPolicy := autoscalingv2.MaxChangePolicySelect
minPolicy := autoscalingv2.MinChangePolicySelect
tests := []TestCase{
{
annotation: "Default values",
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PercentScalingPolicy, Value: 100, PeriodSeconds: 15},
},
expectedStabilization: nil,
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "All parameters are specified",
rateDownPods: 1,
rateDownPodsPeriodSeconds: 2,
rateDownPercent: 3,
rateDownPercentPeriodSeconds: 4,
stabilizationSeconds: utilpointer.Int32(25),
selectPolicy: &maxPolicy,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 1, PeriodSeconds: 2},
{Type: autoscalingv2.PercentScalingPolicy, Value: 3, PeriodSeconds: 4},
},
expectedStabilization: utilpointer.Int32(25),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "Percent policy is specified",
rateDownPercent: 1,
rateDownPercentPeriodSeconds: 2,
selectPolicy: &minPolicy,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PercentScalingPolicy, Value: 1, PeriodSeconds: 2},
},
expectedStabilization: nil,
expectedSelectPolicy: string(autoscalingv2.MinChangePolicySelect),
},
{
annotation: "Pods policy is specified",
rateDownPods: 3,
rateDownPodsPeriodSeconds: 4,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 3, PeriodSeconds: 4},
},
expectedStabilization: nil,
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
}
for _, tc := range tests {
t.Run(tc.annotation, func(t *testing.T) {
scaleDownRules := &autoscalingv2.HPAScalingRules{
StabilizationWindowSeconds: tc.stabilizationSeconds,
SelectPolicy: tc.selectPolicy,
}
if tc.rateDownPods != 0 || tc.rateDownPodsPeriodSeconds != 0 {
scaleDownRules.Policies = append(scaleDownRules.Policies, autoscalingv2.HPAScalingPolicy{
Type: autoscalingv2.PodsScalingPolicy, Value: tc.rateDownPods, PeriodSeconds: tc.rateDownPodsPeriodSeconds,
})
}
if tc.rateDownPercent != 0 || tc.rateDownPercentPeriodSeconds != 0 {
scaleDownRules.Policies = append(scaleDownRules.Policies, autoscalingv2.HPAScalingPolicy{
Type: autoscalingv2.PercentScalingPolicy, Value: tc.rateDownPercent, PeriodSeconds: tc.rateDownPercentPeriodSeconds,
})
}
down := GenerateHPAScaleDownRules(scaleDownRules)
assert.EqualValues(t, tc.expectedPolicies, down.Policies)
if tc.expectedStabilization != nil {
assert.Equal(t, *tc.expectedStabilization, *down.StabilizationWindowSeconds)
} else {
assert.Equal(t, tc.expectedStabilization, down.StabilizationWindowSeconds)
}
assert.Equal(t, autoscalingv2.ScalingPolicySelect(tc.expectedSelectPolicy), *down.SelectPolicy)
})
}
}
func TestGenerateScaleUpRules(t *testing.T) {
type TestCase struct {
rateUpPods int32
rateUpPodsPeriodSeconds int32
rateUpPercent int32
rateUpPercentPeriodSeconds int32
stabilizationSeconds *int32
selectPolicy *autoscalingv2.ScalingPolicySelect
expectedPolicies []autoscalingv2.HPAScalingPolicy
expectedStabilization *int32
expectedSelectPolicy string
annotation string
}
maxPolicy := autoscalingv2.MaxChangePolicySelect
minPolicy := autoscalingv2.MinChangePolicySelect
tests := []TestCase{
{
annotation: "Default values",
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 4, PeriodSeconds: 15},
{Type: autoscalingv2.PercentScalingPolicy, Value: 100, PeriodSeconds: 15},
},
expectedStabilization: utilpointer.Int32(0),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "All parameters are specified",
rateUpPods: 1,
rateUpPodsPeriodSeconds: 2,
rateUpPercent: 3,
rateUpPercentPeriodSeconds: 4,
stabilizationSeconds: utilpointer.Int32(25),
selectPolicy: &maxPolicy,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 1, PeriodSeconds: 2},
{Type: autoscalingv2.PercentScalingPolicy, Value: 3, PeriodSeconds: 4},
},
expectedStabilization: utilpointer.Int32(25),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "Pod policy is specified",
rateUpPods: 1,
rateUpPodsPeriodSeconds: 2,
selectPolicy: &minPolicy,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 1, PeriodSeconds: 2},
},
expectedStabilization: utilpointer.Int32(0),
expectedSelectPolicy: string(autoscalingv2.MinChangePolicySelect),
},
{
annotation: "Percent policy is specified",
rateUpPercent: 7,
rateUpPercentPeriodSeconds: 10,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PercentScalingPolicy, Value: 7, PeriodSeconds: 10},
},
expectedStabilization: utilpointer.Int32(0),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "Pod policy and stabilization window are specified",
rateUpPodsPeriodSeconds: 2,
stabilizationSeconds: utilpointer.Int32(25),
rateUpPods: 4,
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PodsScalingPolicy, Value: 4, PeriodSeconds: 2},
},
expectedStabilization: utilpointer.Int32(25),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
{
annotation: "Percent policy and stabilization window are specified",
rateUpPercent: 7,
rateUpPercentPeriodSeconds: 60,
stabilizationSeconds: utilpointer.Int32(25),
expectedPolicies: []autoscalingv2.HPAScalingPolicy{
{Type: autoscalingv2.PercentScalingPolicy, Value: 7, PeriodSeconds: 60},
},
expectedStabilization: utilpointer.Int32(25),
expectedSelectPolicy: string(autoscalingv2.MaxChangePolicySelect),
},
}
for _, tc := range tests {
t.Run(tc.annotation, func(t *testing.T) {
scaleUpRules := &autoscalingv2.HPAScalingRules{
StabilizationWindowSeconds: tc.stabilizationSeconds,
SelectPolicy: tc.selectPolicy,
}
if tc.rateUpPods != 0 || tc.rateUpPodsPeriodSeconds != 0 {
scaleUpRules.Policies = append(scaleUpRules.Policies, autoscalingv2.HPAScalingPolicy{
Type: autoscalingv2.PodsScalingPolicy, Value: tc.rateUpPods, PeriodSeconds: tc.rateUpPodsPeriodSeconds,
})
}
if tc.rateUpPercent != 0 || tc.rateUpPercentPeriodSeconds != 0 {
scaleUpRules.Policies = append(scaleUpRules.Policies, autoscalingv2.HPAScalingPolicy{
Type: autoscalingv2.PercentScalingPolicy, Value: tc.rateUpPercent, PeriodSeconds: tc.rateUpPercentPeriodSeconds,
})
}
up := GenerateHPAScaleUpRules(scaleUpRules)
assert.Equal(t, tc.expectedPolicies, up.Policies)
if tc.expectedStabilization != nil {
assert.Equal(t, *tc.expectedStabilization, *up.StabilizationWindowSeconds)
} else {
assert.Equal(t, tc.expectedStabilization, up.StabilizationWindowSeconds)
}
assert.Equal(t, autoscalingv2.ScalingPolicySelect(tc.expectedSelectPolicy), *up.SelectPolicy)
})
}
}

View File

@ -0,0 +1,380 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package selectors
import (
"fmt"
"strings"
"sync"
pkglabels "k8s.io/apimachinery/pkg/labels"
)
// BiMultimap is an efficient, bi-directional mapping of object
// keys. Associations are created by putting keys with a selector.
type BiMultimap struct {
mux sync.RWMutex
// Objects.
labeledObjects map[Key]*labeledObject
selectingObjects map[Key]*selectingObject
// Associations.
labeledBySelecting map[selectorKey]*labeledObjects
selectingByLabeled map[labelsKey]*selectingObjects
}
// NewBiMultimap creates a map.
func NewBiMultimap() *BiMultimap {
return &BiMultimap{
labeledObjects: make(map[Key]*labeledObject),
selectingObjects: make(map[Key]*selectingObject),
labeledBySelecting: make(map[selectorKey]*labeledObjects),
selectingByLabeled: make(map[labelsKey]*selectingObjects),
}
}
// Key is a tuple of name and namespace.
type Key struct {
Name string
Namespace string
}
// Parse turns a string in the format namespace/name into a Key.
func Parse(s string) (key Key) {
ns := strings.SplitN(s, "/", 2)
if len(ns) == 2 {
key.Namespace = ns[0]
key.Name = ns[1]
} else {
key.Name = ns[0]
}
return key
}
func (k Key) String() string {
return fmt.Sprintf("%v/%v", k.Namespace, k.Name)
}
type selectorKey struct {
key string
namespace string
}
type selectingObject struct {
key Key
selector pkglabels.Selector
// selectorKey is a stable serialization of selector for
// association caching.
selectorKey selectorKey
}
type selectingObjects struct {
objects map[Key]*selectingObject
refCount int
}
type labelsKey struct {
key string
namespace string
}
type labeledObject struct {
key Key
labels map[string]string
// labelsKey is a stable serialization of labels for association
// caching.
labelsKey labelsKey
}
type labeledObjects struct {
objects map[Key]*labeledObject
refCount int
}
// Put inserts or updates an object and the incoming associations
// based on the object labels.
func (m *BiMultimap) Put(key Key, labels map[string]string) {
m.mux.Lock()
defer m.mux.Unlock()
labelsKey := labelsKey{
key: pkglabels.Set(labels).String(),
namespace: key.Namespace,
}
if l, ok := m.labeledObjects[key]; ok {
// Update labeled object.
if labelsKey == l.labelsKey {
// No change to labels.
return
}
// Delete before readding.
m.delete(key)
}
// Add labeled object.
labels = copyLabels(labels)
labeledObject := &labeledObject{
key: key,
labels: labels,
labelsKey: labelsKey,
}
m.labeledObjects[key] = labeledObject
// Add associations.
if _, ok := m.selectingByLabeled[labelsKey]; !ok {
// Cache miss. Scan selecting objects.
selecting := &selectingObjects{
objects: make(map[Key]*selectingObject),
}
set := pkglabels.Set(labels)
for _, s := range m.selectingObjects {
if s.key.Namespace != key.Namespace {
continue
}
if s.selector.Matches(set) {
selecting.objects[s.key] = s
}
}
// Associate selecting with labeled.
m.selectingByLabeled[labelsKey] = selecting
}
selecting := m.selectingByLabeled[labelsKey]
selecting.refCount++
for _, sObject := range selecting.objects {
// Associate labeled with selecting.
labeled := m.labeledBySelecting[sObject.selectorKey]
labeled.objects[labeledObject.key] = labeledObject
}
}
// Delete removes a labeled object and incoming associations.
func (m *BiMultimap) Delete(key Key) {
m.mux.Lock()
defer m.mux.Unlock()
m.delete(key)
}
func (m *BiMultimap) delete(key Key) {
if _, ok := m.labeledObjects[key]; !ok {
// Does not exist.
return
}
labeledObject := m.labeledObjects[key]
labelsKey := labeledObject.labelsKey
defer delete(m.labeledObjects, key)
if _, ok := m.selectingByLabeled[labelsKey]; !ok {
// No associations.
return
}
// Remove associations.
for _, selectingObject := range m.selectingByLabeled[labelsKey].objects {
selectorKey := selectingObject.selectorKey
// Delete selectingObject to labeledObject association.
delete(m.labeledBySelecting[selectorKey].objects, key)
}
m.selectingByLabeled[labelsKey].refCount--
// Garbage collect labeledObject to selectingObject associations.
if m.selectingByLabeled[labelsKey].refCount == 0 {
delete(m.selectingByLabeled, labelsKey)
}
}
// Exists returns true if the labeled object is present in the map.
func (m *BiMultimap) Exists(key Key) bool {
m.mux.Lock()
defer m.mux.Unlock()
_, exists := m.labeledObjects[key]
return exists
}
// PutSelector inserts or updates an object with a selector. Associations
// are created or updated based on the selector.
func (m *BiMultimap) PutSelector(key Key, selector pkglabels.Selector) {
m.mux.Lock()
defer m.mux.Unlock()
selectorKey := selectorKey{
key: selector.String(),
namespace: key.Namespace,
}
if s, ok := m.selectingObjects[key]; ok {
// Update selecting object.
if selectorKey == s.selectorKey {
// No change to selector.
return
}
// Delete before readding.
m.deleteSelector(key)
}
// Add selecting object.
selectingObject := &selectingObject{
key: key,
selector: selector,
selectorKey: selectorKey,
}
m.selectingObjects[key] = selectingObject
// Add associations.
if _, ok := m.labeledBySelecting[selectorKey]; !ok {
// Cache miss. Scan labeled objects.
labeled := &labeledObjects{
objects: make(map[Key]*labeledObject),
}
for _, l := range m.labeledObjects {
if l.key.Namespace != key.Namespace {
continue
}
set := pkglabels.Set(l.labels)
if selector.Matches(set) {
labeled.objects[l.key] = l
}
}
// Associate labeled with selecting.
m.labeledBySelecting[selectorKey] = labeled
}
labeled := m.labeledBySelecting[selectorKey]
labeled.refCount++
for _, labeledObject := range labeled.objects {
// Associate selecting with labeled.
selecting := m.selectingByLabeled[labeledObject.labelsKey]
selecting.objects[selectingObject.key] = selectingObject
}
}
// DeleteSelector deletes a selecting object and associations created by its
// selector.
func (m *BiMultimap) DeleteSelector(key Key) {
m.mux.Lock()
defer m.mux.Unlock()
m.deleteSelector(key)
}
func (m *BiMultimap) deleteSelector(key Key) {
if _, ok := m.selectingObjects[key]; !ok {
// Does not exist.
return
}
selectingObject := m.selectingObjects[key]
selectorKey := selectingObject.selectorKey
defer delete(m.selectingObjects, key)
if _, ok := m.labeledBySelecting[selectorKey]; !ok {
// No associations.
return
}
// Remove associations.
for _, labeledObject := range m.labeledBySelecting[selectorKey].objects {
labelsKey := labeledObject.labelsKey
// Delete labeledObject to selectingObject association.
delete(m.selectingByLabeled[labelsKey].objects, key)
}
m.labeledBySelecting[selectorKey].refCount--
// Garbage collect selectingObjects to labeledObject associations.
if m.labeledBySelecting[selectorKey].refCount == 0 {
delete(m.labeledBySelecting, selectorKey)
}
}
// SelectorExists returns true if the selecting object is present in the map.
func (m *BiMultimap) SelectorExists(key Key) bool {
m.mux.Lock()
defer m.mux.Unlock()
_, exists := m.selectingObjects[key]
return exists
}
// KeepOnly retains only the specified labeled objects and deletes the
// rest. Like calling Delete for all keys not specified.
func (m *BiMultimap) KeepOnly(keys []Key) {
m.mux.Lock()
defer m.mux.Unlock()
keyMap := make(map[Key]bool)
for _, k := range keys {
keyMap[k] = true
}
for k := range m.labeledObjects {
if !keyMap[k] {
m.delete(k)
}
}
}
// KeepOnlySelectors retains only the specified selecting objects and
// deletes the rest. Like calling DeleteSelector for all keys not
// specified.
func (m *BiMultimap) KeepOnlySelectors(keys []Key) {
m.mux.Lock()
defer m.mux.Unlock()
keyMap := make(map[Key]bool)
for _, k := range keys {
keyMap[k] = true
}
for k := range m.selectingObjects {
if !keyMap[k] {
m.deleteSelector(k)
}
}
}
// Select finds objects associated with a selecting object. If the
// given key was found in the map `ok` will be true. Otherwise false.
func (m *BiMultimap) Select(key Key) (keys []Key, ok bool) {
m.mux.RLock()
defer m.mux.RUnlock()
selectingObject, ok := m.selectingObjects[key]
if !ok {
// Does not exist.
return nil, false
}
keys = make([]Key, 0)
if labeled, ok := m.labeledBySelecting[selectingObject.selectorKey]; ok {
for _, labeledObject := range labeled.objects {
keys = append(keys, labeledObject.key)
}
}
return keys, true
}
// ReverseSelect finds objects selecting the given object. If the
// given key was found in the map `ok` will be true. Otherwise false.
func (m *BiMultimap) ReverseSelect(key Key) (keys []Key, ok bool) {
m.mux.RLock()
defer m.mux.RUnlock()
labeledObject, ok := m.labeledObjects[key]
if !ok {
// Does not exist.
return []Key{}, false
}
keys = make([]Key, 0)
if selecting, ok := m.selectingByLabeled[labeledObject.labelsKey]; ok {
for _, selectingObject := range selecting.objects {
keys = append(keys, selectingObject.key)
}
}
return keys, true
}
func copyLabels(labels map[string]string) map[string]string {
l := make(map[string]string)
for k, v := range labels {
l[k] = v
}
return l
}

View File

@ -0,0 +1,641 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package selectors
import (
"fmt"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
pkglabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)
func TestAssociations(t *testing.T) {
cases := []struct {
name string
ops []operation
want []expectation
testAllPermutations bool
}{{
name: "single association",
ops: []operation{
putSelectingObject(key("hpa"), selector("a", "1")),
putLabeledObject(key("pod"), labels("a", "1")),
},
want: []expectation{
forwardSelect(key("hpa"), key("pod")),
reverseSelect(key("pod"), key("hpa")),
},
testAllPermutations: true,
}, {
name: "multiple associations from a selecting object",
ops: []operation{
putSelectingObject(key("hpa"), selector("a", "1")),
putLabeledObject(key("pod-1"), labels("a", "1")),
putLabeledObject(key("pod-2"), labels("a", "1")),
},
want: []expectation{
forwardSelect(key("hpa"), key("pod-1"), key("pod-2")),
reverseSelect(key("pod-1"), key("hpa")),
reverseSelect(key("pod-2"), key("hpa")),
},
testAllPermutations: true,
}, {
name: "multiple associations to a labeled object",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putSelectingObject(key("hpa-2"), selector("a", "1")),
putLabeledObject(key("pod"), labels("a", "1")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod")),
forwardSelect(key("hpa-2"), key("pod")),
reverseSelect(key("pod"), key("hpa-1"), key("hpa-2")),
},
testAllPermutations: true,
}, {
name: "disjoint association sets",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putSelectingObject(key("hpa-2"), selector("a", "2")),
putLabeledObject(key("pod-1"), labels("a", "1")),
putLabeledObject(key("pod-2"), labels("a", "2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod-1")),
forwardSelect(key("hpa-2"), key("pod-2")),
reverseSelect(key("pod-1"), key("hpa-1")),
reverseSelect(key("pod-2"), key("hpa-2")),
},
testAllPermutations: true,
}, {
name: "separate label cache paths",
ops: []operation{
putSelectingObject(key("hpa"), selector("a", "1")),
putLabeledObject(key("pod-1"), labels("a", "1", "b", "2")),
putLabeledObject(key("pod-2"), labels("a", "1", "b", "3")),
},
want: []expectation{
forwardSelect(key("hpa"), key("pod-1"), key("pod-2")),
reverseSelect(key("pod-1"), key("hpa")),
reverseSelect(key("pod-2"), key("hpa")),
},
testAllPermutations: true,
}, {
name: "separate selector cache paths",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putSelectingObject(key("hpa-2"), selector("b", "2")),
putLabeledObject(key("pod"), labels("a", "1", "b", "2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod")),
forwardSelect(key("hpa-2"), key("pod")),
reverseSelect(key("pod"), key("hpa-1"), key("hpa-2")),
},
testAllPermutations: true,
}, {
name: "selection in different namespaces",
ops: []operation{
putLabeledObject(key("pod-1", "namespace-1"), labels("a", "1")),
putLabeledObject(key("pod-1", "namespace-2"), labels("a", "1")),
putSelectingObject(key("hpa-1", "namespace-2"), selector("a", "1")),
},
want: []expectation{
forwardSelect(key("hpa-1", "namespace-1")), // selects nothing
forwardSelect(key("hpa-1", "namespace-2"), key("pod-1", "namespace-2")),
reverseSelect(key("pod-1", "namespace-1")), // selects nothing
reverseSelect(key("pod-1", "namespace-2"), key("hpa-1", "namespace-2")),
},
testAllPermutations: true,
}, {
name: "update labeled objects",
ops: []operation{
putLabeledObject(key("pod-1"), labels("a", "1")),
putSelectingObject(key("hpa-1"), selector("a", "2")),
putLabeledObject(key("pod-1"), labels("a", "2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod-1")),
reverseSelect(key("pod-1"), key("hpa-1")),
},
}, {
name: "update selecting objects",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putLabeledObject(key("pod-1"), labels("a", "2")),
putSelectingObject(key("hpa-1"), selector("a", "2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod-1")),
reverseSelect(key("pod-1"), key("hpa-1")),
},
}, {
name: "keep only labeled objects",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putLabeledObject(key("pod-1"), labels("a", "1")),
putLabeledObject(key("pod-2"), labels("a", "1")),
putLabeledObject(key("pod-3"), labels("a", "1")),
keepOnly(key("pod-1"), key("pod-2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod-1"), key("pod-2")),
reverseSelect(key("pod-1"), key("hpa-1")),
reverseSelect(key("pod-2"), key("hpa-1")),
},
}, {
name: "keep only selecting objects",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putSelectingObject(key("hpa-2"), selector("a", "1")),
putSelectingObject(key("hpa-3"), selector("a", "1")),
putLabeledObject(key("pod-1"), labels("a", "1")),
keepOnlySelectors(key("hpa-1"), key("hpa-2")),
},
want: []expectation{
forwardSelect(key("hpa-1"), key("pod-1")),
forwardSelect(key("hpa-2"), key("pod-1")),
reverseSelect(key("pod-1"), key("hpa-1"), key("hpa-2")),
},
}, {
name: "put multiple associations and delete all",
ops: []operation{
putSelectingObject(key("hpa-1"), selector("a", "1")),
putSelectingObject(key("hpa-2"), selector("a", "1")),
putSelectingObject(key("hpa-3"), selector("a", "2")),
putSelectingObject(key("hpa-4"), selector("b", "1")),
putLabeledObject(key("pod-1"), labels("a", "1")),
putLabeledObject(key("pod-2"), labels("a", "2")),
putLabeledObject(key("pod-3"), labels("a", "1", "b", "1")),
putLabeledObject(key("pod-4"), labels("a", "2", "b", "1")),
putLabeledObject(key("pod-5"), labels("b", "1")),
putLabeledObject(key("pod-6"), labels("b", "2")),
deleteSelecting(key("hpa-1")),
deleteSelecting(key("hpa-2")),
deleteSelecting(key("hpa-3")),
deleteSelecting(key("hpa-4")),
deleteLabeled(key("pod-1")),
deleteLabeled(key("pod-2")),
deleteLabeled(key("pod-3")),
deleteLabeled(key("pod-4")),
deleteLabeled(key("pod-5")),
deleteLabeled(key("pod-6")),
},
want: []expectation{
emptyMap,
},
}, {
name: "fuzz testing",
ops: []operation{
randomOperations(10000),
deleteAll,
},
want: []expectation{
emptyMap,
},
}}
for _, tc := range cases {
var permutations [][]int
if tc.testAllPermutations {
// Run test case with all permutations of operations.
permutations = indexPermutations(len(tc.ops))
} else {
// Unless test is order dependent (e.g. includes
// deletes) or just too big.
var p []int
for i := 0; i < len(tc.ops); i++ {
p = append(p, i)
}
permutations = [][]int{p}
}
for _, permutation := range permutations {
name := tc.name + fmt.Sprintf(" permutation %v", permutation)
t.Run(name, func(t *testing.T) {
multimap := NewBiMultimap()
for i := range permutation {
tc.ops[i](multimap)
// Run consistency check after every operation.
err := consistencyCheck(multimap)
if err != nil {
t.Fatalf(err.Error())
}
}
for _, expect := range tc.want {
err := expect(multimap)
if err != nil {
t.Errorf("%v %v", tc.name, err)
}
}
})
}
}
}
func TestEfficientAssociation(t *testing.T) {
useOnceSelector := useOnce(selector("a", "1"))
m := NewBiMultimap()
m.PutSelector(key("hpa-1"), useOnceSelector)
m.Put(key("pod-1"), labels("a", "1"))
// Selector is used only during full scan. Second Put will use
// cached association or explode.
m.Put(key("pod-2"), labels("a", "1"))
err := forwardSelect(key("hpa-1"), key("pod-1"), key("pod-2"))(m)
if err != nil {
t.Errorf(err.Error())
}
}
func TestUseOnceSelector(t *testing.T) {
useOnceSelector := useOnce(selector("a", "1"))
labels := pkglabels.Set(labels("a", "1"))
// Use once.
useOnceSelector.Matches(labels)
// Use twice.
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected panic when using selector twice.")
}
}()
useOnceSelector.Matches(labels)
}
func TestObjectsExist(t *testing.T) {
m := NewBiMultimap()
// Nothing exists in the empty map.
assert.False(t, m.Exists(key("pod-1")))
assert.False(t, m.SelectorExists(key("hpa-1")))
// Adding entries.
m.PutSelector(key("hpa-1"), useOnce(selector("a", "1")))
m.Put(key("pod-1"), labels("a", "1"))
// Entries exist.
assert.True(t, m.Exists(key("pod-1")))
assert.True(t, m.SelectorExists(key("hpa-1")))
// Removing the entries.
m.DeleteSelector(key("hpa-1"))
m.Delete(key("pod-1"))
// They don't exist anymore.
assert.False(t, m.Exists(key("pod-1")))
assert.False(t, m.SelectorExists(key("hpa-1")))
}
type useOnceSelector struct {
used bool
selector pkglabels.Selector
}
func useOnce(s pkglabels.Selector) pkglabels.Selector {
return &useOnceSelector{
selector: s,
}
}
func (u *useOnceSelector) Matches(l pkglabels.Labels) bool {
if u.used {
panic("useOnceSelector used more than once")
}
u.used = true
return u.selector.Matches(l)
}
func (u *useOnceSelector) Empty() bool {
return u.selector.Empty()
}
func (u *useOnceSelector) String() string {
return u.selector.String()
}
func (u *useOnceSelector) Add(r ...pkglabels.Requirement) pkglabels.Selector {
u.selector = u.selector.Add(r...)
return u
}
func (u *useOnceSelector) Requirements() (pkglabels.Requirements, bool) {
return u.selector.Requirements()
}
func (u *useOnceSelector) DeepCopySelector() pkglabels.Selector {
u.selector = u.selector.DeepCopySelector()
return u
}
func (u *useOnceSelector) RequiresExactMatch(label string) (value string, found bool) {
v, f := u.selector.RequiresExactMatch(label)
return v, f
}
func indexPermutations(size int) [][]int {
var permute func([]int, []int) [][]int
permute = func(placed, remaining []int) (permutations [][]int) {
if len(remaining) == 0 {
return [][]int{placed}
}
for i, v := range remaining {
r := append([]int(nil), remaining...) // copy remaining
r = append(r[:i], r[i+1:]...) // delete placed index
p := permute(append(placed, v), r) // place index and permute
permutations = append(permutations, p...)
}
return
}
var remaining []int
for i := 0; i < size; i++ {
remaining = append(remaining, i)
}
return permute(nil, remaining)
}
type operation func(*BiMultimap)
func putLabeledObject(key Key, labels map[string]string) operation {
return func(m *BiMultimap) {
m.Put(key, labels)
}
}
func putSelectingObject(key Key, selector pkglabels.Selector) operation {
return func(m *BiMultimap) {
m.PutSelector(key, selector)
}
}
func deleteLabeled(key Key) operation {
return func(m *BiMultimap) {
m.Delete(key)
}
}
func deleteSelecting(key Key) operation {
return func(m *BiMultimap) {
m.DeleteSelector(key)
}
}
func deleteAll(m *BiMultimap) {
for key := range m.labeledObjects {
m.Delete(key)
}
for key := range m.selectingObjects {
m.DeleteSelector(key)
}
}
func keepOnly(keys ...Key) operation {
return func(m *BiMultimap) {
m.KeepOnly(keys)
}
}
func keepOnlySelectors(keys ...Key) operation {
return func(m *BiMultimap) {
m.KeepOnlySelectors(keys)
}
}
func randomOperations(times int) operation {
pods := []Key{
key("pod-1"),
key("pod-2"),
key("pod-3"),
key("pod-4"),
key("pod-5"),
key("pod-6"),
}
randomPod := func() Key {
return pods[rand.Intn(len(pods))]
}
labels := []map[string]string{
labels("a", "1"),
labels("a", "2"),
labels("b", "1"),
labels("b", "2"),
labels("a", "1", "b", "1"),
labels("a", "2", "b", "2"),
labels("a", "3"),
labels("c", "1"),
}
randomLabels := func() map[string]string {
return labels[rand.Intn(len(labels))]
}
hpas := []Key{
key("hpa-1"),
key("hpa-2"),
key("hpa-3"),
}
randomHpa := func() Key {
return hpas[rand.Intn(len(hpas))]
}
selectors := []pkglabels.Selector{
selector("a", "1"),
selector("b", "1"),
selector("a", "1", "b", "1"),
selector("c", "2"),
}
randomSelector := func() pkglabels.Selector {
return selectors[rand.Intn(len(selectors))]
}
randomOp := func(m *BiMultimap) {
switch rand.Intn(4) {
case 0:
m.Put(randomPod(), randomLabels())
case 1:
m.PutSelector(randomHpa(), randomSelector())
case 2:
m.Delete(randomPod())
case 3:
m.DeleteSelector(randomHpa())
}
}
return func(m *BiMultimap) {
for i := 0; i < times; i++ {
randomOp(m)
}
}
}
type expectation func(*BiMultimap) error
func forwardSelect(key Key, want ...Key) expectation {
return func(m *BiMultimap) error {
got, _ := m.Select(key)
if !unorderedEqual(want, got) {
return fmt.Errorf("forward select %v wanted %v. got %v.", key, want, got)
}
return nil
}
}
func reverseSelect(key Key, want ...Key) expectation {
return func(m *BiMultimap) error {
got, _ := m.ReverseSelect(key)
if !unorderedEqual(want, got) {
return fmt.Errorf("reverse select %v wanted %v. got %v.", key, want, got)
}
return nil
}
}
func emptyMap(m *BiMultimap) error {
if len(m.labeledObjects) != 0 {
return fmt.Errorf("Found %v labeledObjects. Wanted none.", len(m.labeledObjects))
}
if len(m.selectingObjects) != 0 {
return fmt.Errorf("Found %v selectingObjects. Wanted none.", len(m.selectingObjects))
}
if len(m.labeledBySelecting) != 0 {
return fmt.Errorf("Found %v cached labeledBySelecting associations. Wanted none.", len(m.labeledBySelecting))
}
if len(m.selectingByLabeled) != 0 {
return fmt.Errorf("Found %v cached selectingByLabeled associations. Wanted none.", len(m.selectingByLabeled))
}
return nil
}
func consistencyCheck(m *BiMultimap) error {
emptyKey := Key{}
emptyLabelsKey := labelsKey{}
emptySelectorKey := selectorKey{}
for k, v := range m.labeledObjects {
if v == nil {
return fmt.Errorf("Found nil labeled object for key %q", k)
}
if k == emptyKey {
return fmt.Errorf("Found empty key for labeled object %+v", v)
}
}
for k, v := range m.selectingObjects {
if v == nil {
return fmt.Errorf("Found nil selecting object for key %q", k)
}
if k == emptyKey {
return fmt.Errorf("Found empty key for selecting object %+v", v)
}
}
for k, v := range m.labeledBySelecting {
if v == nil {
return fmt.Errorf("Found nil labeledBySelecting entry for key %q", k)
}
if k == emptySelectorKey {
return fmt.Errorf("Found empty key for labeledBySelecting object %+v", v)
}
for k2, v2 := range v.objects {
if v2 == nil {
return fmt.Errorf("Found nil object in labeledBySelecting under keys %q and %q", k, k2)
}
if k2 == emptyKey {
return fmt.Errorf("Found empty key for object in labeledBySelecting under key %+v", k)
}
}
if v.refCount < 1 {
return fmt.Errorf("Found labeledBySelecting entry with no references (orphaned) under key %q", k)
}
}
for k, v := range m.selectingByLabeled {
if v == nil {
return fmt.Errorf("Found nil selectingByLabeled entry for key %q", k)
}
if k == emptyLabelsKey {
return fmt.Errorf("Found empty key for selectingByLabeled object %+v", v)
}
for k2, v2 := range v.objects {
if v2 == nil {
return fmt.Errorf("Found nil object in selectingByLabeled under keys %q and %q", k, k2)
}
if k2 == emptyKey {
return fmt.Errorf("Found empty key for object in selectingByLabeled under key %+v", k)
}
}
if v.refCount < 1 {
return fmt.Errorf("Found selectingByLabeled entry with no references (orphaned) under key %q", k)
}
}
return nil
}
func key(s string, ss ...string) Key {
if len(ss) > 1 {
panic("Key requires 1 or 2 parts.")
}
k := Key{
Name: s,
}
if len(ss) >= 1 {
k.Namespace = ss[0]
}
return k
}
func labels(ls ...string) map[string]string {
if len(ls)%2 != 0 {
panic("labels requires pairs of strings.")
}
ss := make(map[string]string)
for i := 0; i < len(ls); i += 2 {
ss[ls[i]] = ls[i+1]
}
return ss
}
func selector(ss ...string) pkglabels.Selector {
if len(ss)%2 != 0 {
panic("selector requires pairs of strings.")
}
s := pkglabels.NewSelector()
for i := 0; i < len(ss); i += 2 {
r, err := pkglabels.NewRequirement(ss[i], selection.Equals, []string{ss[i+1]})
if err != nil {
panic(err)
}
s = s.Add(*r)
}
return s
}
func unorderedEqual(as, bs []Key) bool {
if len(as) != len(bs) {
return false
}
aMap := make(map[Key]int)
for _, a := range as {
aMap[a]++
}
bMap := make(map[Key]int)
for _, b := range bs {
bMap[b]++
}
if len(aMap) != len(bMap) {
return false
}
for a, count := range aMap {
if bMap[a] != count {
return false
}
}
return true
}

View File

@ -11,9 +11,11 @@ import (
"k8s.io/client-go/dynamic"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
@ -30,6 +32,13 @@ type DynamicClusterClient struct {
ClusterName string
}
// ClusterScaleClient stands for a cluster ClientSet with scale client for the given member cluster
type ClusterScaleClient struct {
KubeClient *kubeclientset.Clientset
ScaleClient scale.ScalesGetter
ClusterName string
}
// Config holds the common attributes that can be passed to a Kubernetes client on
// initialization.
@ -44,6 +53,35 @@ type ClientOption struct {
Burst int
}
// NewClusterScaleClientSet returns a ClusterScaleClient for the given member cluster.
func NewClusterScaleClientSet(clusterName string, client client.Client) (*ClusterScaleClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
if err != nil {
return nil, err
}
var clusterScaleClientSet = ClusterScaleClient{ClusterName: clusterName}
if clusterConfig != nil {
hpaClient := kubeclientset.NewForConfigOrDie(clusterConfig)
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
mapper, err := apiutil.NewDiscoveryRESTMapper(clusterConfig)
if err != nil {
return nil, err
}
scaleClient, err := scale.NewForConfig(clusterConfig, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, err
}
clusterScaleClientSet.KubeClient = hpaClient
clusterScaleClientSet.ScaleClient = scaleClient
}
return &clusterScaleClientSet, nil
}
// NewClusterClientSet returns a ClusterClient for the given member cluster.
func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))

View File

@ -0,0 +1,50 @@
package federatedhpa
import (
"context"
"encoding/json"
"net/http"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1"
"github.com/karmada-io/karmada/pkg/util/lifted"
)
// MutatingAdmission mutates API request if necessary.
type MutatingAdmission struct {
decoder *admission.Decoder
}
// Check if our MutatingAdmission implements necessary interface
var _ admission.Handler = &MutatingAdmission{}
var _ admission.DecoderInjector = &MutatingAdmission{}
// Handle yields a response to an AdmissionRequest.
func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) admission.Response {
federatedHPA := &autoscalingv1alpha1.FederatedHPA{}
err := a.decoder.Decode(req, federatedHPA)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
klog.V(2).Infof("Mutating federatedHPA(%s/%s) for request: %s", federatedHPA.Namespace, federatedHPA.Name, req.Operation)
lifted.SetDefaultsFederatedHPA(federatedHPA)
marshaledBytes, err := json.Marshal(federatedHPA)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}
// InjectDecoder implements admission.DecoderInjector interface.
// A decoder will be automatically injected.
func (a *MutatingAdmission) InjectDecoder(d *admission.Decoder) error {
a.decoder = d
return nil
}