Merge pull request #4552 from jayantjain93/debugging-snapshot

Adding support for Debugging Snapshot
Kubernetes Prow Robot 2021-12-30 05:20:50 -08:00 committed by GitHub
commit 28eaacd631
13 changed files with 760 additions and 50 deletions
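To try the feature end to end, start cluster-autoscaler with the new --debugging-snapshot-enabled=true flag and issue a GET against the /snapshotz endpoint registered in main.go below; the handler blocks until the next autoscaler loop flushes the collected data. A minimal client sketch in Go (the localhost:8085 address is an assumption, not part of this diff):

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Blocks until the next RunOnce loop collects data and flushes the snapshot.
    resp, err := http.Get("http://localhost:8085/snapshotz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    // The body is the JSON-marshalled DebuggingSnapshotImpl:
    // NodeList, UnscheduledPodsCanBeScheduled, StartTimestamp, EndTimestamp, Error.
    fmt.Println(resp.StatusCode, string(body))
}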

View File

@ -20,6 +20,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
@ -50,6 +51,8 @@ type AutoscalingContext struct {
EstimatorBuilder estimator.EstimatorBuilder
// ProcessorCallbacks is interface defining extra callback methods which can be called by processors used in extension points.
ProcessorCallbacks processor_callbacks.ProcessorCallbacks
// DebuggingSnapshotter is the interface for capturing the debugging snapshot
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}
// AutoscalingKubeClients contains all Kubernetes API clients,
@ -93,7 +96,8 @@ func NewAutoscalingContext(
cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy,
estimatorBuilder estimator.EstimatorBuilder,
processorCallbacks processor_callbacks.ProcessorCallbacks) *AutoscalingContext {
processorCallbacks processor_callbacks.ProcessorCallbacks,
debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *AutoscalingContext {
return &AutoscalingContext{
AutoscalingOptions: options,
CloudProvider: cloudProvider,
@ -103,6 +107,7 @@ func NewAutoscalingContext(
ExpanderStrategy: expanderStrategy,
EstimatorBuilder: estimatorBuilder,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
@ -48,6 +49,7 @@ type AutoscalerOptions struct {
EstimatorBuilder estimator.EstimatorBuilder
Processors *ca_processors.AutoscalingProcessors
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}
// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@ -76,7 +78,8 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
opts.Backoff), nil
opts.Backoff,
opts.DebuggingSnapshotter), nil
}
// Initialize default options if not provided.

View File

@ -78,6 +78,12 @@ func (p *filterOutSchedulablePodListProcessor) Process(
if len(unschedulablePodsToHelp) != len(unschedulablePods) {
klog.V(2).Info("Schedulable pods present")
context.ProcessorCallbacks.DisableScaleDownForLoop()
if context.DebuggingSnapshotter.IsDataCollectionAllowed() {
schedulablePods := findSchedulablePods(unschedulablePods, unschedulablePodsToHelp)
context.DebuggingSnapshotter.SetUnscheduledPodsCanBeScheduled(schedulablePods)
}
} else {
klog.V(4).Info("No schedulable pods")
}
@ -179,3 +185,17 @@ func moreImportantPod(pod1, pod2 *apiv1.Pod) bool {
p2 := corev1helpers.PodPriority(pod2)
return p1 > p2
}
func findSchedulablePods(allUnschedulablePods, podsStillUnschedulable []*apiv1.Pod) []*apiv1.Pod {
podsStillUnschedulableMap := make(map[*apiv1.Pod]struct{}, len(podsStillUnschedulable))
for _, x := range podsStillUnschedulable {
podsStillUnschedulableMap[x] = struct{}{}
}
var schedulablePods []*apiv1.Pod
for _, x := range allUnschedulablePods {
if _, found := podsStillUnschedulableMap[x]; !found {
schedulablePods = append(schedulablePods, x)
}
}
return schedulablePods
}
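findSchedulablePods computes a set difference keyed on *apiv1.Pod pointer identity, which is safe here because both slices come from the same pod listing and therefore share the same Pod objects. A hypothetical sketch of the behaviour:

// Hypothetical illustration; p1..p3 stand in for pods from one listing.
p1, p2, p3 := &apiv1.Pod{}, &apiv1.Pod{}, &apiv1.Pod{}
allUnschedulable := []*apiv1.Pod{p1, p2, p3}
stillUnschedulable := []*apiv1.Pod{p2}
schedulable := findSchedulablePods(allUnschedulable, stillUnschedulable) // [p1, p3]
_ = schedulable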

View File

@ -136,7 +136,7 @@ func TestFindUnneededNodes(t *testing.T) {
},
UnremovableNodeRecheckTimeout: 5 * time.Minute,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -252,7 +252,7 @@ func TestFindUnneededGPUNodes(t *testing.T) {
},
UnremovableNodeRecheckTimeout: 5 * time.Minute,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -359,7 +359,7 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) {
}
for tn, tc := range cases {
t.Run(tn, func(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, NewTestProcessors(), clusterStateRegistry)
@ -434,7 +434,7 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) {
ScaleDownUtilizationThreshold: 0.35,
},
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -491,7 +491,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
ScaleDownCandidatesPoolRatio: 1,
ScaleDownCandidatesPoolMinCount: 1000,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -566,7 +566,7 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
ScaleDownCandidatesPoolRatio: 1.0,
ScaleDownCandidatesPoolMinCount: 1000,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -616,7 +616,7 @@ func TestFindUnneededNodePool(t *testing.T) {
ScaleDownCandidatesPoolRatio: 0.1,
ScaleDownCandidatesPoolMinCount: 10,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -760,7 +760,7 @@ func TestDeleteNode(t *testing.T) {
// build context
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -1146,7 +1146,7 @@ func TestScaleDown(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
@ -1331,7 +1331,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
provider.AddNode("ng1", n1)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
if scenario.nodeInfoSuccess {
@ -1554,7 +1554,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
assert.NotNil(t, provider)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -1643,7 +1643,7 @@ func TestNoScaleDownUnready(t *testing.T) {
MaxGracefulTerminationSec: 60,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
@ -1756,7 +1756,7 @@ func TestScaleDownNoMove(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
@ -2006,7 +2006,7 @@ func TestSoftTaint(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -2127,7 +2127,7 @@ func TestSoftTaintTimeLimit(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())

View File

@ -21,6 +21,8 @@ import (
"reflect"
"testing"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testcloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
@ -155,7 +157,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
func NewScaleTestAutoscalingContext(
options config.AutoscalingOptions, fakeClient kube_client.Interface,
listers kube_util.ListerRegistry, provider cloudprovider.CloudProvider,
processorCallbacks processor_callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) {
processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (context.AutoscalingContext, error) {
// Not enough buffer space causes the test to hang without printing any logs.
// This is not useful.
fakeRecorder := kube_record.NewFakeRecorder(100)
@ -170,6 +172,9 @@ func NewScaleTestAutoscalingContext(
if err != nil {
return context.AutoscalingContext{}, err
}
if debuggingSnapshotter == nil {
debuggingSnapshotter = debuggingsnapshot.NewDebuggingSnapshotter(false)
}
clusterSnapshot := simulator.NewBasicClusterSnapshot()
return context.AutoscalingContext{
AutoscalingOptions: options,
@ -179,12 +184,13 @@ func NewScaleTestAutoscalingContext(
LogRecorder: fakeLogRecorder,
ListerRegistry: listers,
},
CloudProvider: provider,
PredicateChecker: predicateChecker,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: random.NewStrategy(),
EstimatorBuilder: estimatorBuilder,
ProcessorCallbacks: processorCallbacks,
CloudProvider: provider,
PredicateChecker: predicateChecker,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: random.NewStrategy(),
EstimatorBuilder: estimatorBuilder,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
}, nil
}
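Tests that pass nil for the new parameter get a disabled snapshotter via NewDebuggingSnapshotter(false); a test that wants to exercise the feature can pass its own instance instead. A sketch, assuming options, provider and t come from the surrounding test:

snapshotter := debuggingsnapshot.NewDebuggingSnapshotter(true)
ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, snapshotter)
assert.NoError(t, err)
// The returned context carries the same instance, so later assertions can be made against it.
_ = ctx.DebuggingSnapshotter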

View File

@ -518,7 +518,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
assert.NotNil(t, provider)
// Create context with non-random expander strategy.
context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
expander := reportingStrategy{
@ -684,7 +684,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
@ -724,7 +724,7 @@ func TestScaleUpNoHelp(t *testing.T) {
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1}
@ -790,7 +790,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
@ -851,7 +851,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
assert.NoError(t, err)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -904,7 +904,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
assert.NoError(t, err)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -980,7 +980,7 @@ func TestCheckScaleUpDeltaWithinLimits(t *testing.T) {
func TestAuthError(t *testing.T) {
metrics.RegisterAll(false)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil, nil)
assert.NoError(t, err)
nodeGroup := &mockprovider.NodeGroup{}

View File

@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@ -119,7 +120,8 @@ func NewStaticAutoscaler(
cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy,
estimatorBuilder estimator.EstimatorBuilder,
backoff backoff.Backoff) *StaticAutoscaler {
backoff backoff.Backoff,
debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *StaticAutoscaler {
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
autoscalingContext := context.NewAutoscalingContext(
@ -130,7 +132,8 @@ func NewStaticAutoscaler(
cloudProvider,
expanderStrategy,
estimatorBuilder,
processorCallbacks)
processorCallbacks,
debuggingSnapshotter)
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@ -220,6 +223,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.cleanUpIfRequired()
a.processorCallbacks.reset()
a.clusterStateRegistry.PeriodicCleanup()
a.DebuggingSnapshotter.StartDataCollection()
defer a.DebuggingSnapshotter.Flush()
unschedulablePodLister := a.UnschedulablePodLister()
scheduledPodLister := a.ScheduledPodLister()
@ -409,6 +414,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}
}
l, err := a.ClusterSnapshot.NodeInfos().List()
if err != nil {
klog.Errorf("Unable to fetch NodeInfo List for Debugging Snapshot, %v", err)
} else {
a.AutoscalingContext.DebuggingSnapshotter.SetNodeGroupInfo(l)
}
unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
@ -715,6 +727,7 @@ func (a *StaticAutoscaler) filterOutYoungPods(allUnschedulablePods []*apiv1.Pod,
// ExitCleanUp performs all necessary clean-ups when the autoscaler's exiting.
func (a *StaticAutoscaler) ExitCleanUp() {
a.processors.CleanUp()
a.DebuggingSnapshotter.Cleanup()
if !a.AutoscalingContext.WriteStatusConfigMap {
return

View File

@ -185,7 +185,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -376,7 +376,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -511,7 +511,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -658,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -786,7 +786,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -882,7 +882,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@ -944,7 +944,7 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{

View File

@ -0,0 +1,150 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debuggingsnapshot
import (
"encoding/json"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// NodeInfo captures a single nodeInfo entity, i.e. the Node spec and all the pods on that node.
type NodeInfo struct {
Node *v1.Node `json:"Node"`
Pods []*framework.PodInfo `json:"Pods"`
}
// DebuggingSnapshot is the interface that any debugging snapshot implementation must
// satisfy, including custom implementations used by DebuggingSnapshotter
type DebuggingSnapshot interface {
// SetNodeGroupInfo is a setter to capture all the NodeInfo
SetNodeGroupInfo([]*framework.NodeInfo)
// SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled
// but can be scheduled, i.e. pods which aren't triggering a scale-up
SetUnscheduledPodsCanBeScheduled([]*v1.Pod)
// SetErrorMessage sets the error message in the snapshot
SetErrorMessage(string)
// SetEndTimestamp sets the timestamp in the snapshot,
// when all the data collection is finished
SetEndTimestamp(time.Time)
// SetStartTimestamp sets the timestamp in the snapshot,
// when all the data collection is started
SetStartTimestamp(time.Time)
// GetOutputBytes returns the output state of the Snapshot, along with a bool specifying
// whether the snapshot has an error message set
GetOutputBytes() ([]byte, bool)
// Cleanup clears the internal data object of the snapshot, readying it for the next request
Cleanup()
}
// DebuggingSnapshotImpl is the struct used to collect all the data to be output.
// Please add all new output fields to this struct; keeping everything in a single
// object makes the data easier to encode/decode
type DebuggingSnapshotImpl struct {
NodeInfo []*NodeInfo `json:"NodeList"`
UnscheduledPodsCanBeScheduled []*v1.Pod `json:"UnscheduledPodsCanBeScheduled"`
Error string `json:"Error,omitempty"`
StartTimestamp time.Time `json:"StartTimestamp"`
EndTimestamp time.Time `json:"EndTimestamp"`
}
// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled
func (s *DebuggingSnapshotImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) {
if podList == nil {
return
}
s.UnscheduledPodsCanBeScheduled = nil
for _, pod := range podList {
s.UnscheduledPodsCanBeScheduled = append(s.UnscheduledPodsCanBeScheduled, pod)
}
}
// SetNodeGroupInfo is the setter for Node Group Info
// All filtering/prettifying of data should be done here.
func (s *DebuggingSnapshotImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) {
if nodeInfos == nil {
return
}
var NodeInfoList []*NodeInfo
for _, n := range nodeInfos {
nClone := n.Clone()
node := nClone.Node()
nodeInfo := &NodeInfo{
Node: node,
Pods: nClone.Pods,
}
NodeInfoList = append(NodeInfoList, nodeInfo)
}
s.NodeInfo = NodeInfoList
}
// SetEndTimestamp is the setter for end timestamp
func (s *DebuggingSnapshotImpl) SetEndTimestamp(t time.Time) {
s.EndTimestamp = t
}
// SetStartTimestamp is the setter for the start timestamp
func (s *DebuggingSnapshotImpl) SetStartTimestamp(t time.Time) {
s.StartTimestamp = t
}
// GetOutputBytes returns the output state of the Snapshot, along with a bool specifying
// whether the snapshot has an error message set
func (s *DebuggingSnapshotImpl) GetOutputBytes() ([]byte, bool) {
errMsgSet := false
if s.Error != "" {
klog.Errorf("Debugging snapshot found with error message set when GetOutputBytes() is called. - ", s.Error)
errMsgSet = true
}
klog.Infof("Debugging snapshot flush ready")
marshalOutput, err := json.Marshal(s)
// if the snapshot couldn't be marshalled, create a fresh object that carries only
// the error message and the timestamps, and return that instead
if err != nil {
klog.Errorf("Unable to json marshal the debugging snapshot: %v", err)
errorSnapshot := DebuggingSnapshotImpl{}
errorSnapshot.SetErrorMessage("Unable to marshal the snapshot, " + err.Error())
errorSnapshot.SetEndTimestamp(s.EndTimestamp)
errorSnapshot.SetStartTimestamp(s.StartTimestamp)
errorMarshal, err1 := json.Marshal(errorSnapshot)
klog.Errorf("Unable to marshal a new Debugging Snapshot Impl, with just a error message: %v", err1)
return errorMarshal, true
}
return marshalOutput, errMsgSet
}
// SetErrorMessage sets the error message in the snapshot
func (s *DebuggingSnapshotImpl) SetErrorMessage(error string) {
s.Error = error
}
// Cleanup cleans up all the data in the snapshot without changing the
// pointer reference
func (s *DebuggingSnapshotImpl) Cleanup() {
*s = DebuggingSnapshotImpl{}
}
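Note that the bool returned by GetOutputBytes is not a Go error: it reports whether the snapshot carries an error message, which the snapshotter maps to an HTTP 500. A minimal sketch of direct use, assuming nodeInfos ([]*framework.NodeInfo) was gathered elsewhere:

s := &DebuggingSnapshotImpl{}
s.SetStartTimestamp(time.Now().In(time.UTC))
s.SetNodeGroupInfo(nodeInfos) // node infos from the cluster snapshot
s.SetEndTimestamp(time.Now().In(time.UTC))

body, hasErrMsg := s.GetOutputBytes() // hasErrMsg is true only if the Error field was set
if hasErrMsg {
    // The caller (DebuggingSnapshotterImpl) treats body as an error payload.
}
_ = body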

View File

@ -0,0 +1,106 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debuggingsnapshot
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
func TestBasicSetterWorkflow(t *testing.T) {
snapshot := &DebuggingSnapshotImpl{}
pod := []*framework.PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Pod1",
},
Spec: v1.PodSpec{
NodeName: "testNode",
},
},
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
},
}
nodeInfo := &framework.NodeInfo{
Pods: pod,
Requested: &framework.Resource{},
NonZeroRequested: &framework.Resource{},
Allocatable: &framework.Resource{},
Generation: 0,
}
var nodeGroups []*framework.NodeInfo
nodeGroups = append(nodeGroups, nodeInfo)
nodeGroups[0].SetNode(node)
timestamp := time.Now().In(time.UTC)
snapshot.SetNodeGroupInfo(nodeGroups)
snapshot.SetEndTimestamp(timestamp)
op, err := snapshot.GetOutputBytes()
assert.False(t, err)
type JSONList = []interface{}
type JSONMap = map[string]interface{}
var String = "test"
var parsed map[string]interface{}
er := json.Unmarshal(op, &parsed)
assert.NoError(t, er)
assert.IsType(t, JSONMap{}, parsed)
assert.IsType(t, JSONList{}, parsed["NodeList"])
assert.Greater(t, len(parsed["NodeList"].([]interface{})), 0)
assert.IsType(t, JSONMap{}, parsed["NodeList"].([]interface{})[0])
pNodeInfo := parsed["NodeList"].([]interface{})[0].(map[string]interface{})
assert.IsType(t, JSONMap{}, pNodeInfo["Node"].(map[string]interface{}))
pNode := pNodeInfo["Node"].(map[string]interface{})
assert.IsType(t, JSONMap{}, pNode["metadata"].(map[string]interface{}))
pNodeObjectMeta := pNode["metadata"].(map[string]interface{})
assert.IsType(t, String, pNodeObjectMeta["name"])
pNodeName := pNodeObjectMeta["name"].(string)
assert.Equal(t, pNodeName, "testNode")
assert.IsType(t, JSONList{}, pNodeInfo["Pods"])
assert.Greater(t, len(pNodeInfo["Pods"].([]interface{})), 0)
assert.IsType(t, JSONMap{}, pNodeInfo["Pods"].([]interface{})[0])
pPodInfo := pNodeInfo["Pods"].([]interface{})[0].(map[string]interface{})
assert.IsType(t, JSONMap{}, pPodInfo["Pod"])
pPod := pPodInfo["Pod"].(map[string]interface{})
assert.IsType(t, JSONMap{}, pPod["metadata"])
pPodMeta := pPod["metadata"].(map[string]interface{})
assert.IsType(t, String, pPodMeta["name"])
pPodName := pPodMeta["name"].(string)
assert.Equal(t, pPodName, "Pod1")
}
func TestEmptyDataNoError(t *testing.T) {
snapshot := &DebuggingSnapshotImpl{}
op, err := snapshot.GetOutputBytes()
assert.False(t, err)
assert.NotNil(t, op)
}

View File

@ -0,0 +1,238 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debuggingsnapshot
import (
"context"
"net/http"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// DebuggingSnapshotterState is the type for the debugging snapshot State machine
// The states guide the workflow of the snapshot.
type DebuggingSnapshotterState int
// The DebuggingSnapshotterState values below help navigate the different workflows of the snapshot capture.
const (
// SNAPSHOTTER_DISABLED is when debuggingSnapshot is disabled on the cluster and no action can be taken
SNAPSHOTTER_DISABLED DebuggingSnapshotterState = iota + 1
// LISTENING is set when the snapshotter is enabled on the cluster and is ready to listen for a
// snapshot request. ResponseHandler waits in this state for an incoming request
LISTENING
// TRIGGER_ENABLED is set by ResponseHandler when a valid snapshot request is received;
// it states that a snapshot request needs to be processed
TRIGGER_ENABLED
// START_DATA_COLLECTION is used to synchronise the collection of data.
// Since the trigger arrives asynchronously, data collection could otherwise start mid-loop
// and produce incomplete data. Setter methods therefore wait for START_DATA_COLLECTION,
// which is set at the start of the next loop after the trigger is received
START_DATA_COLLECTION
// DATA_COLLECTED is set by the setter funcs (which also perform the data collection).
// It lets Flush know that at least some data was collected and that no error State
// prevented data collection
DATA_COLLECTED
)
// DebuggingSnapshotterImpl is the impl for DebuggingSnapshotter
type DebuggingSnapshotterImpl struct {
// State captures the internal state of the snapshotter
State *DebuggingSnapshotterState
// DebuggingSnapshot is the data bean for the snapshot
DebuggingSnapshot DebuggingSnapshot
// Mutex synchronises access to the snapshotter's methods and State in the critical section
Mutex *sync.Mutex
// Trigger is the channel the Response Handler waits on to know
// when there is snapshot data ready to be flushed back to the caller
Trigger chan struct{}
// CancelRequest is the cancel function for the snapshot request. It is used to
// terminate any ongoing request when CA is shutting down
CancelRequest context.CancelFunc
}
// DebuggingSnapshotter is the interface for debugging snapshot
type DebuggingSnapshotter interface {
// StartDataCollection will check the State(s) and enable data
// collection for the loop if applicable
StartDataCollection()
// SetNodeGroupInfo is a setter to capture all the NodeInfo
SetNodeGroupInfo([]*framework.NodeInfo)
// SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled
// but can be scheduled, i.e. pods which aren't triggering a scale-up
SetUnscheduledPodsCanBeScheduled([]*v1.Pod)
// ResponseHandler is the http response handler to manage incoming requests
ResponseHandler(http.ResponseWriter, *http.Request)
// IsDataCollectionAllowed checks the internal State of the snapshotter
// to find if data can be collected. This can be used before preprocessing
// for the snapshot
IsDataCollectionAllowed() bool
// Flush triggers the flushing of the snapshot
Flush()
// Cleanup clears the internal data of the snapshotter, readying it for the next request
Cleanup()
}
// NewDebuggingSnapshotter returns a new instance of DebuggingSnapshotter
func NewDebuggingSnapshotter(isDebuggerEnabled bool) DebuggingSnapshotter {
state := SNAPSHOTTER_DISABLED
if isDebuggerEnabled {
klog.Infof("Debugging Snapshot is enabled")
state = LISTENING
}
return &DebuggingSnapshotterImpl{
State: &state,
Mutex: &sync.Mutex{},
DebuggingSnapshot: &DebuggingSnapshotImpl{},
Trigger: make(chan struct{}, 1),
}
}
// ResponseHandler is the impl for request handler
func (d *DebuggingSnapshotterImpl) ResponseHandler(w http.ResponseWriter, r *http.Request) {
d.Mutex.Lock()
// checks if the handler is in the correct State to accept a new snapshot request
if *d.State != LISTENING {
defer d.Mutex.Unlock()
klog.Errorf("Debugging Snapshot is currently being processed. Another snapshot can't be processed")
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte("Another debugging snapshot request is being processed. Concurrent requests not supported"))
return
}
ctx, cancel := context.WithCancel(r.Context())
d.CancelRequest = cancel
klog.Infof("Received a new snapshot, that is accepted")
// set the State to trigger enabled, to allow workflow to collect data
*d.State = TRIGGER_ENABLED
d.Mutex.Unlock()
select {
case <-d.Trigger:
d.Mutex.Lock()
d.DebuggingSnapshot.SetEndTimestamp(time.Now().In(time.UTC))
body, isErrorMessage := d.DebuggingSnapshot.GetOutputBytes()
if isErrorMessage {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
w.Write(body)
// reset the debugging State to receive a new snapshot request
*d.State = LISTENING
d.CancelRequest = nil
d.DebuggingSnapshot.Cleanup()
d.Mutex.Unlock()
case <-ctx.Done():
d.Mutex.Lock()
klog.Infof("Received terminate trigger, aborting ongoing snapshot request")
w.WriteHeader(http.StatusServiceUnavailable)
d.DebuggingSnapshot.Cleanup()
*d.State = LISTENING
d.CancelRequest = nil
select {
case <-d.Trigger:
default:
}
d.Mutex.Unlock()
}
}
// IsDataCollectionAllowed encapsulates the check for whether data collection is currently active.
// It should be used by setters and by any function contingent on the data collection State
// before doing extra processing.
// e.g. if you want to pre-process some cloud-provider state for the snapshot,
// check this func in the loop before doing that extra processing
func (d *DebuggingSnapshotterImpl) IsDataCollectionAllowed() bool {
d.Mutex.Lock()
defer d.Mutex.Unlock()
return *d.State == DATA_COLLECTED || *d.State == START_DATA_COLLECTION
}
// StartDataCollection changes the State to start data collection once the trigger
// has been enabled. It is called at the start of the run loop for consistency,
// as the trigger can arrive mid-loop and would otherwise lead to partial data collection
func (d *DebuggingSnapshotterImpl) StartDataCollection() {
d.Mutex.Lock()
defer d.Mutex.Unlock()
if *d.State == TRIGGER_ENABLED {
*d.State = START_DATA_COLLECTION
klog.Infof("Trigger Enabled for Debugging Snapshot, starting data collection")
d.DebuggingSnapshot.SetStartTimestamp(time.Now().In(time.UTC))
}
}
// Flush is the impl for DebuggingSnapshotter.Flush
// It checks if any data has been collected or data collection failed
func (d *DebuggingSnapshotterImpl) Flush() {
d.Mutex.Lock()
defer d.Mutex.Unlock()
// If data collection was started but no data was collected, this is surfaced
// as an error so the handler can flush it and reset to the pre-trigger State
if *d.State == START_DATA_COLLECTION {
klog.Errorf("No data was collected for the snapshot in this loop. So no snapshot can be generated.")
d.DebuggingSnapshot.SetErrorMessage("Unable to collect any data")
d.Trigger <- struct{}{}
return
}
if *d.State == DATA_COLLECTED {
d.Trigger <- struct{}{}
}
}
// SetNodeGroupInfo is the setter for Node Group Info
// All filtering/prettifying of data should be done here.
func (d *DebuggingSnapshotterImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) {
if !d.IsDataCollectionAllowed() {
return
}
d.Mutex.Lock()
defer d.Mutex.Unlock()
klog.Infof("NodeGroupInfo is being set for the debugging snapshot")
d.DebuggingSnapshot.SetNodeGroupInfo(nodeInfos)
*d.State = DATA_COLLECTED
}
// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled
func (d *DebuggingSnapshotterImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) {
if !d.IsDataCollectionAllowed() {
return
}
d.Mutex.Lock()
defer d.Mutex.Unlock()
klog.Infof("UnscheduledPodsCanBeScheduled is being set for the debugging snapshot")
d.DebuggingSnapshot.SetUnscheduledPodsCanBeScheduled(podList)
*d.State = DATA_COLLECTED
}
// Cleanup cancels any ongoing snapshot request, e.g. when CA is shutting down
func (d *DebuggingSnapshotterImpl) Cleanup() {
if d.CancelRequest != nil {
d.CancelRequest()
}
}
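The intended state flow is LISTENING -> TRIGGER_ENABLED (request received) -> START_DATA_COLLECTION (next loop) -> DATA_COLLECTED (setters) -> flush -> LISTENING. A compressed sketch of that lifecycle, mirroring the tests below (w, r and nodeInfos are assumed to exist):

snapshotter := NewDebuggingSnapshotter(true) // state: LISTENING
go snapshotter.ResponseHandler(w, r)         // LISTENING -> TRIGGER_ENABLED, then parks on Trigger
snapshotter.StartDataCollection()            // TRIGGER_ENABLED -> START_DATA_COLLECTION
snapshotter.SetNodeGroupInfo(nodeInfos)      // START_DATA_COLLECTION -> DATA_COLLECTED
snapshotter.Flush()                          // signals Trigger; the handler writes the JSON and resets to LISTENING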

View File

@ -0,0 +1,160 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debuggingsnapshot
import (
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
func TestBasicSnapshotRequest(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
snapshotter := NewDebuggingSnapshotter(true)
pod := []*framework.PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Pod1",
},
Spec: v1.PodSpec{
NodeName: "testNode",
},
},
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
},
}
nodeInfo := &framework.NodeInfo{
Pods: pod,
Requested: &framework.Resource{},
NonZeroRequested: &framework.Resource{},
Allocatable: &framework.Resource{},
Generation: 0,
}
var nodeGroups []*framework.NodeInfo
nodeGroups = append(nodeGroups, nodeInfo)
nodeGroups[0].SetNode(node)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
go func() {
snapshotter.ResponseHandler(w, req)
wg.Done()
}()
for !snapshotter.IsDataCollectionAllowed() {
snapshotter.StartDataCollection()
}
snapshotter.SetNodeGroupInfo(nodeGroups)
snapshotter.Flush()
wg.Wait()
resp := w.Result()
assert.Equal(t, http.StatusOK, w.Code)
assert.Greater(t, int64(0), resp.ContentLength)
}
func TestFlushWithoutData(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
snapshotter := NewDebuggingSnapshotter(true)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
go func() {
snapshotter.ResponseHandler(w, req)
wg.Done()
}()
for !snapshotter.IsDataCollectionAllowed() {
snapshotter.StartDataCollection()
}
snapshotter.Flush()
wg.Wait()
resp := w.Result()
assert.Equal(t, http.StatusInternalServerError, w.Code)
assert.Greater(t, int64(0), resp.ContentLength)
}
func TestRequestTerminationOnShutdown(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
snapshotter := NewDebuggingSnapshotter(true)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
go func() {
snapshotter.ResponseHandler(w, req)
wg.Done()
}()
for !snapshotter.IsDataCollectionAllowed() {
snapshotter.StartDataCollection()
}
go snapshotter.Cleanup()
wg.Wait()
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
}
func TestRejectParallelRequest(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
snapshotter := NewDebuggingSnapshotter(true)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
go func() {
snapshotter.ResponseHandler(w, req)
wg.Done()
}()
for !snapshotter.IsDataCollectionAllowed() {
snapshotter.StartDataCollection()
}
w1 := httptest.NewRecorder()
req1 := httptest.NewRequest(http.MethodGet, "/", nil)
snapshotter.ResponseHandler(w1, req1)
assert.Equal(t, http.StatusTooManyRequests, w1.Code)
snapshotter.SetNodeGroupInfo(nil)
snapshotter.Flush()
wg.Wait()
assert.Equal(t, http.StatusOK, w.Code)
}

View File

@ -29,6 +29,8 @@ import (
"syscall"
"time"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -181,7 +183,8 @@ var (
daemonSetEvictionForOccupiedNodes = flag.Bool("daemonset-eviction-for-occupied-nodes", true, "DaemonSet pods will be gracefully terminated from non-empty nodes")
userAgent = flag.String("user-agent", "cluster-autoscaler", "User agent used for HTTP calls.")
emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the cluster-autoscaler debugging snapshot feature is enabled")
)
func createAutoscalingOptions() config.AutoscalingOptions {
@ -304,17 +307,18 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
}()
}
func buildAutoscaler() (core.Autoscaler, error) {
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
// Create basic config from flags.
autoscalingOptions := createAutoscalingOptions()
kubeClient := createKubeClient(getKubeConfig())
eventsKubeClient := createKubeClient(getKubeConfig())
opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: simulator.NewDeltaClusterSnapshot(),
KubeClient: kubeClient,
EventsKubeClient: eventsKubeClient,
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: simulator.NewDeltaClusterSnapshot(),
KubeClient: kubeClient,
EventsKubeClient: eventsKubeClient,
DebuggingSnapshotter: debuggingSnapshotter,
}
opts.Processors = ca_processors.DefaultProcessors()
@ -345,10 +349,10 @@ func buildAutoscaler() (core.Autoscaler, error) {
return core.NewAutoscaler(opts)
}
func run(healthCheck *metrics.HealthCheck) {
func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)
autoscaler, err := buildAutoscaler()
autoscaler, err := buildAutoscaler(debuggingSnapshotter)
if err != nil {
klog.Fatalf("Failed to create autoscaler: %v", err)
}
@ -400,12 +404,17 @@ func main() {
klog.V(1).Infof("Cluster Autoscaler %s", version.ClusterAutoscalerVersion)
debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(*debuggingSnapshotEnabled)
go func() {
pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
defaultMetricsHandler(w, req)
})
if *debuggingSnapshotEnabled {
pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler)
}
pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP)
if *enableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
@ -415,7 +424,7 @@ func main() {
}()
if !leaderElection.LeaderElect {
run(healthCheck)
run(healthCheck, debuggingSnapshotter)
} else {
id, err := os.Hostname()
if err != nil {
@ -455,7 +464,7 @@ func main() {
OnStartedLeading: func(_ ctx.Context) {
// Since we are committing a suicide after losing
// mastership, we can safely ignore the argument.
run(healthCheck)
run(healthCheck, debuggingSnapshotter)
},
OnStoppedLeading: func() {
klog.Fatalf("lost master")