Adding support for Debugging Snapshot
parent 4b8c3937df
commit 729038ff2d
@@ -20,6 +20,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
@@ -50,6 +51,8 @@ type AutoscalingContext struct {
 	EstimatorBuilder estimator.EstimatorBuilder
 	// ProcessorCallbacks is interface defining extra callback methods which can be called by processors used in extension points.
 	ProcessorCallbacks processor_callbacks.ProcessorCallbacks
+	// DebuggingSnapshotter is the interface for capturing the debugging snapshot
+	DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 }
 
 // AutoscalingKubeClients contains all Kubernetes API clients,
@@ -93,7 +96,8 @@ func NewAutoscalingContext(
 	cloudProvider cloudprovider.CloudProvider,
 	expanderStrategy expander.Strategy,
 	estimatorBuilder estimator.EstimatorBuilder,
-	processorCallbacks processor_callbacks.ProcessorCallbacks) *AutoscalingContext {
+	processorCallbacks processor_callbacks.ProcessorCallbacks,
+	debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *AutoscalingContext {
 	return &AutoscalingContext{
 		AutoscalingOptions: options,
 		CloudProvider:      cloudProvider,
@@ -103,6 +107,7 @@ func NewAutoscalingContext(
 		ExpanderStrategy:     expanderStrategy,
 		EstimatorBuilder:     estimatorBuilder,
 		ProcessorCallbacks:   processorCallbacks,
+		DebuggingSnapshotter: debuggingSnapshotter,
 	}
 }
 
@@ -25,6 +25,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
@@ -48,6 +49,7 @@ type AutoscalerOptions struct {
 	EstimatorBuilder     estimator.EstimatorBuilder
 	Processors           *ca_processors.AutoscalingProcessors
 	Backoff              backoff.Backoff
+	DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 }
 
 // Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -76,7 +78,8 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
 		opts.CloudProvider,
 		opts.ExpanderStrategy,
 		opts.EstimatorBuilder,
-		opts.Backoff), nil
+		opts.Backoff,
+		opts.DebuggingSnapshotter), nil
 }
 
 // Initialize default options if not provided.
@@ -78,6 +78,12 @@ func (p *filterOutSchedulablePodListProcessor) Process(
 	if len(unschedulablePodsToHelp) != len(unschedulablePods) {
 		klog.V(2).Info("Schedulable pods present")
 		context.ProcessorCallbacks.DisableScaleDownForLoop()
+
+		if context.DebuggingSnapshotter.IsDataCollectionAllowed() {
+			schedulablePods := findSchedulablePods(unschedulablePods, unschedulablePodsToHelp)
+			context.DebuggingSnapshotter.SetUnscheduledPodsCanBeScheduled(schedulablePods)
+		}
+
 	} else {
 		klog.V(4).Info("No schedulable pods")
 	}
@@ -179,3 +185,17 @@ func moreImportantPod(pod1, pod2 *apiv1.Pod) bool {
 	p2 := corev1helpers.PodPriority(pod2)
 	return p1 > p2
 }
+
+func findSchedulablePods(allUnschedulablePods, podsStillUnschedulable []*apiv1.Pod) []*apiv1.Pod {
+	podsStillUnschedulableMap := make(map[*apiv1.Pod]struct{}, len(podsStillUnschedulable))
+	for _, x := range podsStillUnschedulable {
+		podsStillUnschedulableMap[x] = struct{}{}
+	}
+	var schedulablePods []*apiv1.Pod
+	for _, x := range allUnschedulablePods {
+		if _, found := podsStillUnschedulableMap[x]; !found {
+			schedulablePods = append(schedulablePods, x)
+		}
+	}
+	return schedulablePods
+}
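For context (not part of the diff): findSchedulablePods above is a pointer-identity set difference, so every pod that was unschedulable at the start of Process but is missing from the still-unschedulable list is handed to the snapshotter. A rough sketch with hypothetical pod variables:

    all := []*apiv1.Pod{podA, podB, podC}
    still := []*apiv1.Pod{podB}
    schedulable := findSchedulablePods(all, still) // yields podA and podC
    // These are what SetUnscheduledPodsCanBeScheduled receives when data collection is allowed.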
@@ -136,7 +136,7 @@ func TestFindUnneededNodes(t *testing.T) {
 		},
 		UnremovableNodeRecheckTimeout: 5 * time.Minute,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -252,7 +252,7 @@ func TestFindUnneededGPUNodes(t *testing.T) {
 		},
 		UnremovableNodeRecheckTimeout: 5 * time.Minute,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -359,7 +359,7 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) {
 	}
 	for tn, tc := range cases {
 		t.Run(tn, func(t *testing.T) {
-			context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil)
+			context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil, nil)
 			assert.NoError(t, err)
 			clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
 			sd := NewScaleDown(&context, NewTestProcessors(), clusterStateRegistry)
@@ -434,7 +434,7 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) {
 			ScaleDownUtilizationThreshold: 0.35,
 		},
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -491,7 +491,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
 		ScaleDownCandidatesPoolRatio:    1,
 		ScaleDownCandidatesPoolMinCount: 1000,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -566,7 +566,7 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
 		ScaleDownCandidatesPoolRatio:    1.0,
 		ScaleDownCandidatesPoolMinCount: 1000,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -616,7 +616,7 @@ func TestFindUnneededNodePool(t *testing.T) {
 		ScaleDownCandidatesPoolRatio:    0.1,
 		ScaleDownCandidatesPoolMinCount: 10,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -760,7 +760,7 @@ func TestDeleteNode(t *testing.T) {
 
 			// build context
 			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
-			context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil)
+			context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil, nil)
 			assert.NoError(t, err)
 
 			clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -1146,7 +1146,7 @@ func TestScaleDown(t *testing.T) {
 	assert.NoError(t, err)
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
 
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 	nodes := []*apiv1.Node{n1, n2}
 
@@ -1331,7 +1331,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
 			provider.AddNode("ng1", n1)
 			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
 
-			context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+			context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 			assert.NoError(t, err)
 
 			if scenario.nodeInfoSuccess {
@@ -1554,7 +1554,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
 	assert.NotNil(t, provider)
 
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
-	context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -1643,7 +1643,7 @@ func TestNoScaleDownUnready(t *testing.T) {
 		MaxGracefulTerminationSec: 60,
 	}
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
 	nodes := []*apiv1.Node{n1, n2}
@@ -1756,7 +1756,7 @@ func TestScaleDownNoMove(t *testing.T) {
 	assert.NoError(t, err)
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
 
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
 	nodes := []*apiv1.Node{n1, n2}
@@ -2006,7 +2006,7 @@ func TestSoftTaint(t *testing.T) {
 	assert.NoError(t, err)
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
 
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -2127,7 +2127,7 @@ func TestSoftTaintTimeLimit(t *testing.T) {
 	assert.NoError(t, err)
 	registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
 
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"testing"
 
+	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
+
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	testcloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
@@ -155,7 +157,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
 func NewScaleTestAutoscalingContext(
 	options config.AutoscalingOptions, fakeClient kube_client.Interface,
 	listers kube_util.ListerRegistry, provider cloudprovider.CloudProvider,
-	processorCallbacks processor_callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) {
+	processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (context.AutoscalingContext, error) {
 	// Not enough buffer space causes the test to hang without printing any logs.
 	// This is not useful.
 	fakeRecorder := kube_record.NewFakeRecorder(100)
@@ -170,6 +172,9 @@ func NewScaleTestAutoscalingContext(
 	if err != nil {
 		return context.AutoscalingContext{}, err
 	}
+	if debuggingSnapshotter == nil {
+		debuggingSnapshotter = debuggingsnapshot.NewDebuggingSnapshotter(false)
+	}
 	clusterSnapshot := simulator.NewBasicClusterSnapshot()
 	return context.AutoscalingContext{
 		AutoscalingOptions: options,
@@ -179,12 +184,13 @@ func NewScaleTestAutoscalingContext(
 			LogRecorder:    fakeLogRecorder,
 			ListerRegistry: listers,
 		},
-		CloudProvider:      provider,
-		PredicateChecker:   predicateChecker,
-		ClusterSnapshot:    clusterSnapshot,
-		ExpanderStrategy:   random.NewStrategy(),
-		EstimatorBuilder:   estimatorBuilder,
-		ProcessorCallbacks: processorCallbacks,
+		CloudProvider:        provider,
+		PredicateChecker:     predicateChecker,
+		ClusterSnapshot:      clusterSnapshot,
+		ExpanderStrategy:     random.NewStrategy(),
+		EstimatorBuilder:     estimatorBuilder,
+		ProcessorCallbacks:   processorCallbacks,
+		DebuggingSnapshotter: debuggingSnapshotter,
 	}, nil
 }
 
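Existing tests keep passing nil for the new parameter; the helper then substitutes a disabled snapshotter, as shown above. A test that actually wants snapshot data collected could pass an enabled one instead, roughly like this (a sketch reusing the options/provider fixtures already present in these tests):

    snapshotter := debuggingsnapshot.NewDebuggingSnapshotter(true)
    context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, snapshotter)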
@@ -518,7 +518,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
 	assert.NotNil(t, provider)
 
 	// Create context with non-random expander strategy.
-	context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	expander := reportingStrategy{
@@ -684,7 +684,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
 		MaxCoresTotal:  config.DefaultMaxClusterCores,
 		MaxMemoryTotal: config.DefaultMaxClusterMemory,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	nodes := []*apiv1.Node{n1, n2}
@@ -724,7 +724,7 @@ func TestScaleUpNoHelp(t *testing.T) {
 		MaxCoresTotal:  config.DefaultMaxClusterCores,
 		MaxMemoryTotal: config.DefaultMaxClusterMemory,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	nodes := []*apiv1.Node{n1}
@@ -790,7 +790,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
 		MaxCoresTotal:  config.DefaultMaxClusterCores,
 		MaxMemoryTotal: config.DefaultMaxClusterMemory,
 	}
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
@@ -851,7 +851,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
 	}
 	podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
 	listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -904,7 +904,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
 	}
 	podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
 	listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
-	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil)
+	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -980,7 +980,7 @@ func TestCheckScaleUpDeltaWithinLimits(t *testing.T) {
 
 func TestAuthError(t *testing.T) {
 	metrics.RegisterAll(false)
-	context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil)
+	context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil, nil)
 	assert.NoError(t, err)
 
 	nodeGroup := &mockprovider.NodeGroup{}
@@ -23,6 +23,7 @@ import (
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -119,7 +120,8 @@ func NewStaticAutoscaler(
 	cloudProvider cloudprovider.CloudProvider,
 	expanderStrategy expander.Strategy,
 	estimatorBuilder estimator.EstimatorBuilder,
-	backoff backoff.Backoff) *StaticAutoscaler {
+	backoff backoff.Backoff,
+	debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *StaticAutoscaler {
 
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 	autoscalingContext := context.NewAutoscalingContext(
@@ -130,7 +132,8 @@ func NewStaticAutoscaler(
 		cloudProvider,
 		expanderStrategy,
 		estimatorBuilder,
-		processorCallbacks)
+		processorCallbacks,
+		debuggingSnapshotter)
 
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@@ -220,6 +223,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	a.cleanUpIfRequired()
 	a.processorCallbacks.reset()
 	a.clusterStateRegistry.PeriodicCleanup()
+	a.DebuggingSnapshotter.StartDataCollection()
+	defer a.DebuggingSnapshotter.Flush()
 
 	unschedulablePodLister := a.UnschedulablePodLister()
 	scheduledPodLister := a.ScheduledPodLister()
@@ -409,6 +414,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		}
 	}
 
+	l, err := a.ClusterSnapshot.NodeInfos().List()
+	if err != nil {
+		klog.Errorf("Unable to fetch NodeInfo List for Debugging Snapshot, %v", err)
+	} else {
+		a.AutoscalingContext.DebuggingSnapshotter.SetNodeGroupInfo(l)
+	}
+
 	unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
 
 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
@@ -715,6 +727,7 @@ func (a *StaticAutoscaler) filterOutYoungPods(allUnschedulablePods []*apiv1.Pod,
 // ExitCleanUp performs all necessary clean-ups when the autoscaler's exiting.
 func (a *StaticAutoscaler) ExitCleanUp() {
 	a.processors.CleanUp()
+	a.DebuggingSnapshotter.Cleanup()
 
 	if !a.AutoscalingContext.WriteStatusConfigMap {
 		return
@@ -185,7 +185,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -376,7 +376,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -511,7 +511,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -658,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -786,7 +786,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -882,7 +882,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock,
@@ -944,7 +944,7 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
-	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks)
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
 	assert.NoError(t, err)
 
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
@@ -0,0 +1,150 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package debuggingsnapshot
+
+import (
+	"encoding/json"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// NodeInfo captures a single entity of nodeInfo, i.e. node specs and all the pods on that node.
+type NodeInfo struct {
+	Node *v1.Node             `json:"Node"`
+	Pods []*framework.PodInfo `json:"Pods"`
+}
+
+// DebuggingSnapshot is the interface used to define any debugging snapshot
+// implementation, incl. any custom impl. to be used by DebuggingSnapshotter
+type DebuggingSnapshot interface {
+	// SetNodeGroupInfo is a setter to capture all the NodeInfo
+	SetNodeGroupInfo([]*framework.NodeInfo)
+	// SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled
+	// but can be scheduled, i.e. pods which aren't triggering a scale-up
+	SetUnscheduledPodsCanBeScheduled([]*v1.Pod)
+	// SetErrorMessage sets the error message in the snapshot
+	SetErrorMessage(string)
+	// SetEndTimestamp sets the timestamp in the snapshot,
+	// when all the data collection is finished
+	SetEndTimestamp(time.Time)
+	// SetStartTimestamp sets the timestamp in the snapshot,
+	// when all the data collection is started
+	SetStartTimestamp(time.Time)
+	// GetOutputBytes returns the output state of the snapshot, with a bool specifying
+	// whether the snapshot has the error message set
+	GetOutputBytes() ([]byte, bool)
+	// Cleanup clears the internal data obj of the snapshot, readying it for the next request
+	Cleanup()
+}
+
+// DebuggingSnapshotImpl is the struct used to collect all the data to be output.
+// Please add all new output fields in this struct. This is to make the data
+// encoding/decoding easier, as it is the single object going into the encoder
+type DebuggingSnapshotImpl struct {
+	NodeInfo                      []*NodeInfo `json:"NodeList"`
+	UnscheduledPodsCanBeScheduled []*v1.Pod   `json:"UnscheduledPodsCanBeScheduled"`
+	Error                         string      `json:"Error,omitempty"`
+	StartTimestamp                time.Time   `json:"StartTimestamp"`
+	EndTimestamp                  time.Time   `json:"EndTimestamp"`
+}
+
+// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled
+func (s *DebuggingSnapshotImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) {
+	if podList == nil {
+		return
+	}
+
+	s.UnscheduledPodsCanBeScheduled = nil
+	for _, pod := range podList {
+		s.UnscheduledPodsCanBeScheduled = append(s.UnscheduledPodsCanBeScheduled, pod)
+	}
+}
+
+// SetNodeGroupInfo is the setter for Node Group Info
+// All filtering/prettifying of data should be done here.
+func (s *DebuggingSnapshotImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) {
+	if nodeInfos == nil {
+		return
+	}
+
+	var NodeInfoList []*NodeInfo
+
+	for _, n := range nodeInfos {
+		nClone := n.Clone()
+		node := nClone.Node()
+
+		nodeInfo := &NodeInfo{
+			Node: node,
+			Pods: nClone.Pods,
+		}
+
+		NodeInfoList = append(NodeInfoList, nodeInfo)
+	}
+	s.NodeInfo = NodeInfoList
+}
+
+// SetEndTimestamp is the setter for the end timestamp
+func (s *DebuggingSnapshotImpl) SetEndTimestamp(t time.Time) {
+	s.EndTimestamp = t
+}
+
+// SetStartTimestamp is the setter for the start timestamp
+func (s *DebuggingSnapshotImpl) SetStartTimestamp(t time.Time) {
+	s.StartTimestamp = t
+}
+
+// GetOutputBytes returns the output state of the snapshot, with a bool specifying
+// whether the snapshot has the error message set
+func (s *DebuggingSnapshotImpl) GetOutputBytes() ([]byte, bool) {
+	errMsgSet := false
+	if s.Error != "" {
+		klog.Errorf("Debugging snapshot found with error message set when GetOutputBytes() is called: %v", s.Error)
+		errMsgSet = true
+	}
+
+	klog.Infof("Debugging snapshot flush ready")
+	marshalOutput, err := json.Marshal(s)
+
+	// this error captures if the snapshot couldn't be marshalled, hence we create a new object
+	// and return the error message
+	if err != nil {
+		klog.Errorf("Unable to json marshal the debugging snapshot: %v", err)
+		errorSnapshot := DebuggingSnapshotImpl{}
+		errorSnapshot.SetErrorMessage("Unable to marshal the snapshot, " + err.Error())
+		errorSnapshot.SetEndTimestamp(s.EndTimestamp)
+		errorSnapshot.SetStartTimestamp(s.StartTimestamp)
+		errorMarshal, err1 := json.Marshal(errorSnapshot)
+		klog.Errorf("Unable to marshal a new Debugging Snapshot Impl, with just an error message: %v", err1)
+		return errorMarshal, true
+	}
+
+	return marshalOutput, errMsgSet
+}
+
+// SetErrorMessage sets the error message in the snapshot
+func (s *DebuggingSnapshotImpl) SetErrorMessage(error string) {
+	s.Error = error
+}
+
+// Cleanup cleans up all the data in the snapshot without changing the
+// pointer reference
+func (s *DebuggingSnapshotImpl) Cleanup() {
+	*s = DebuggingSnapshotImpl{}
+}
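A minimal sketch of how the snapshot bean above is meant to be populated and serialized, independent of the HTTP handler (not part of the diff; variable names are illustrative):

    snapshot := &debuggingsnapshot.DebuggingSnapshotImpl{}
    snapshot.SetStartTimestamp(time.Now().In(time.UTC))
    snapshot.SetNodeGroupInfo(nodeInfos) // []*framework.NodeInfo, e.g. from the cluster snapshot
    snapshot.SetEndTimestamp(time.Now().In(time.UTC))
    body, hadError := snapshot.GetOutputBytes() // JSON bytes; hadError reports whether Error was set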
@@ -0,0 +1,106 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package debuggingsnapshot
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+func TestBasicSetterWorkflow(t *testing.T) {
+	snapshot := &DebuggingSnapshotImpl{}
+	pod := []*framework.PodInfo{
+		{
+			Pod: &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "Pod1",
+				},
+				Spec: v1.PodSpec{
+					NodeName: "testNode",
+				},
+			},
+		},
+	}
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "testNode",
+		},
+	}
+
+	nodeInfo := &framework.NodeInfo{
+		Pods:             pod,
+		Requested:        &framework.Resource{},
+		NonZeroRequested: &framework.Resource{},
+		Allocatable:      &framework.Resource{},
+		Generation:       0,
+	}
+
+	var nodeGroups []*framework.NodeInfo
+	nodeGroups = append(nodeGroups, nodeInfo)
+	nodeGroups[0].SetNode(node)
+	timestamp := time.Now().In(time.UTC)
+	snapshot.SetNodeGroupInfo(nodeGroups)
+	snapshot.SetEndTimestamp(timestamp)
+	op, err := snapshot.GetOutputBytes()
+	assert.False(t, err)
+
+	type JSONList = []interface{}
+	type JSONMap = map[string]interface{}
+	var String = "test"
+
+	var parsed map[string]interface{}
+	er := json.Unmarshal(op, &parsed)
+	assert.NoError(t, er)
+	assert.IsType(t, JSONMap{}, parsed)
+	assert.IsType(t, JSONList{}, parsed["NodeList"])
+	assert.Greater(t, len(parsed["NodeList"].([]interface{})), 0)
+	assert.IsType(t, JSONMap{}, parsed["NodeList"].([]interface{})[0])
+	pNodeInfo := parsed["NodeList"].([]interface{})[0].(map[string]interface{})
+	assert.IsType(t, JSONMap{}, pNodeInfo["Node"].(map[string]interface{}))
+	pNode := pNodeInfo["Node"].(map[string]interface{})
+	assert.IsType(t, JSONMap{}, pNode["metadata"].(map[string]interface{}))
+	pNodeObjectMeta := pNode["metadata"].(map[string]interface{})
+	assert.IsType(t, String, pNodeObjectMeta["name"])
+	pNodeName := pNodeObjectMeta["name"].(string)
+	assert.Equal(t, pNodeName, "testNode")
+
+	assert.IsType(t, JSONList{}, pNodeInfo["Pods"])
+	assert.Greater(t, len(pNodeInfo["Pods"].([]interface{})), 0)
+	assert.IsType(t, JSONMap{}, pNodeInfo["Pods"].([]interface{})[0])
+	pPodInfo := pNodeInfo["Pods"].([]interface{})[0].(map[string]interface{})
+	assert.IsType(t, JSONMap{}, pPodInfo["Pod"])
+	pPod := pPodInfo["Pod"].(map[string]interface{})
+	assert.IsType(t, JSONMap{}, pPod["metadata"])
+	pPodMeta := pPod["metadata"].(map[string]interface{})
+	assert.IsType(t, String, pPodMeta["name"])
+	pPodName := pPodMeta["name"].(string)
+	assert.Equal(t, pPodName, "Pod1")
+
+}
+
+func TestEmptyDataNoError(t *testing.T) {
+	snapshot := &DebuggingSnapshotImpl{}
+	op, err := snapshot.GetOutputBytes()
+	assert.False(t, err)
+	assert.NotNil(t, op)
+}
@@ -0,0 +1,238 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package debuggingsnapshot
+
+import (
+	"context"
+	"net/http"
+	"sync"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// DebuggingSnapshotterState is the type for the debugging snapshot State machine
+// The states guide the workflow of the snapshot.
+type DebuggingSnapshotterState int
+
+// DebuggingSnapshotterState helps navigate the different workflows of the snapshot capture.
+const (
+	// SNAPSHOTTER_DISABLED is when debuggingSnapshot is disabled on the cluster and no action can be taken
+	SNAPSHOTTER_DISABLED DebuggingSnapshotterState = iota + 1
+	// LISTENING is set when snapshotter is enabled on the cluster and is ready to listen to a
+	// snapshot request. Used by ResponseHandler to wait on for a request
+	LISTENING
+	// TRIGGER_ENABLED is set by ResponseHandler if a valid snapshot request is received;
+	// it states that a snapshot request needs to be processed
+	TRIGGER_ENABLED
+	// START_DATA_COLLECTION is used to synchronise the collection of data.
+	// Since the trigger is an asynchronous process, data collection could be started mid-loop
+	// leading to incomplete data. So setter methods wait for START_DATA_COLLECTION before collecting data,
+	// which is set at the start of the next loop after receiving the trigger
+	START_DATA_COLLECTION
+	// DATA_COLLECTED is set by the setter funcs (also used by setter funcs for data collection)
+	// This is set to let Flush know that at least some data was collected and there isn't
+	// an error State leading to no data collection
+	DATA_COLLECTED
+)
+
+// DebuggingSnapshotterImpl is the impl for DebuggingSnapshotter
+type DebuggingSnapshotterImpl struct {
+	// State captures the internal state of the snapshotter
+	State *DebuggingSnapshotterState
+	// DebuggingSnapshot is the data bean for the snapshot
+	DebuggingSnapshot DebuggingSnapshot
+	// Mutex synchronises access to the methods/states in the critical section
+	Mutex *sync.Mutex
+	// Trigger is the channel on which the Response Handler waits to know
+	// when there is data to be flushed back to the channel from the Snapshot
+	Trigger chan struct{}
+	// CancelRequest is the cancel function for the snapshot request. It is used to
+	// terminate any ongoing request when CA is shutting down
+	CancelRequest context.CancelFunc
+}
+
+// DebuggingSnapshotter is the interface for debugging snapshot
+type DebuggingSnapshotter interface {
+
+	// StartDataCollection will check the State(s) and enable data
+	// collection for the loop if applicable
+	StartDataCollection()
+	// SetNodeGroupInfo is a setter to capture all the NodeInfo
+	SetNodeGroupInfo([]*framework.NodeInfo)
+	// SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled
+	// but can be scheduled, i.e. pods which aren't triggering a scale-up
+	SetUnscheduledPodsCanBeScheduled([]*v1.Pod)
+	// ResponseHandler is the http response handler to manage incoming requests
+	ResponseHandler(http.ResponseWriter, *http.Request)
+	// IsDataCollectionAllowed checks the internal State of the snapshotter
+	// to find if data can be collected. This can be used before preprocessing
+	// for the snapshot
+	IsDataCollectionAllowed() bool
+	// Flush triggers the flushing of the snapshot
+	Flush()
+	// Cleanup clears the internal data beans of the snapshot, readying it for the next request
+	Cleanup()
+}
+
+// NewDebuggingSnapshotter returns a new instance of DebuggingSnapshotter
+func NewDebuggingSnapshotter(isDebuggerEnabled bool) DebuggingSnapshotter {
+	state := SNAPSHOTTER_DISABLED
+	if isDebuggerEnabled {
+		klog.Infof("Debugging Snapshot is enabled")
+		state = LISTENING
+	}
+	return &DebuggingSnapshotterImpl{
+		State:             &state,
+		Mutex:             &sync.Mutex{},
+		DebuggingSnapshot: &DebuggingSnapshotImpl{},
+		Trigger:           make(chan struct{}, 1),
+	}
+}
+
+// ResponseHandler is the impl for request handler
+func (d *DebuggingSnapshotterImpl) ResponseHandler(w http.ResponseWriter, r *http.Request) {
+
+	d.Mutex.Lock()
+	// checks if the handler is in the correct State to accept a new snapshot request
+	if *d.State != LISTENING {
+		defer d.Mutex.Unlock()
+		klog.Errorf("Debugging Snapshot is currently being processed. Another snapshot can't be processed")
+		w.WriteHeader(http.StatusTooManyRequests)
+		w.Write([]byte("Another debugging snapshot request is being processed. Concurrent requests not supported"))
+		return
+	}
+
+	ctx, cancel := context.WithCancel(r.Context())
+	d.CancelRequest = cancel
+
+	klog.Infof("Received a new snapshot, that is accepted")
+	// set the State to trigger enabled, to allow workflow to collect data
+	*d.State = TRIGGER_ENABLED
+	d.Mutex.Unlock()
+
+	select {
+	case <-d.Trigger:
+		d.Mutex.Lock()
+		d.DebuggingSnapshot.SetEndTimestamp(time.Now().In(time.UTC))
+		body, isErrorMessage := d.DebuggingSnapshot.GetOutputBytes()
+		if isErrorMessage {
+			w.WriteHeader(http.StatusInternalServerError)
+		} else {
+			w.WriteHeader(http.StatusOK)
+		}
+		w.Write(body)
+
+		// reset the debugging State to receive a new snapshot request
+		*d.State = LISTENING
+		d.CancelRequest = nil
+		d.DebuggingSnapshot.Cleanup()
+
+		d.Mutex.Unlock()
+	case <-ctx.Done():
+		d.Mutex.Lock()
+		klog.Infof("Received terminate trigger, aborting ongoing snapshot request")
+		w.WriteHeader(http.StatusServiceUnavailable)
+
+		d.DebuggingSnapshot.Cleanup()
+		*d.State = LISTENING
+		d.CancelRequest = nil
+		select {
+		case <-d.Trigger:
+		default:
+		}
+		d.Mutex.Unlock()
+	}
+}
+
+// IsDataCollectionAllowed encapsulates the check to know if data collection is currently active
+// This should be used by setters and by any function that is contingent on data collection State
+// before doing extra processing.
+// e.g. If you want to pre-process a particular State in cloud-provider for snapshot
+// you should check this func in the loop before doing that extra processing
+func (d *DebuggingSnapshotterImpl) IsDataCollectionAllowed() bool {
+	d.Mutex.Lock()
+	defer d.Mutex.Unlock()
+	return *d.State == DATA_COLLECTED || *d.State == START_DATA_COLLECTION
+}
+
+// StartDataCollection changes the State when the trigger has been enabled
+// to start data collection. To be done at the start of the runLoop to allow for consistency,
+// as the trigger can be called mid-loop leading to partial data collection
+func (d *DebuggingSnapshotterImpl) StartDataCollection() {
+	d.Mutex.Lock()
+	defer d.Mutex.Unlock()
+	if *d.State == TRIGGER_ENABLED {
+		*d.State = START_DATA_COLLECTION
+		klog.Infof("Trigger Enabled for Debugging Snapshot, starting data collection")
+		d.DebuggingSnapshot.SetStartTimestamp(time.Now().In(time.UTC))
+	}
+}
+
+// Flush is the impl for DebuggingSnapshotter.Flush
+// It checks if any data has been collected or data collection failed
+func (d *DebuggingSnapshotterImpl) Flush() {
+	d.Mutex.Lock()
+	defer d.Mutex.Unlock()
+
+	// Case where Data Collection was started but no data was collected, needs to
+	// be stated as an error and reset to pre-trigger State
+	if *d.State == START_DATA_COLLECTION {
+		klog.Errorf("No data was collected for the snapshot in this loop. So no snapshot can be generated.")
+		d.DebuggingSnapshot.SetErrorMessage("Unable to collect any data")
+		d.Trigger <- struct{}{}
+		return
+	}
+
+	if *d.State == DATA_COLLECTED {
+		d.Trigger <- struct{}{}
+	}
+}
+
+// SetNodeGroupInfo is the setter for Node Group Info
+// All filtering/prettifying of data should be done here.
+func (d *DebuggingSnapshotterImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) {
+	if !d.IsDataCollectionAllowed() {
+		return
+	}
+	d.Mutex.Lock()
+	defer d.Mutex.Unlock()
+	klog.Infof("NodeGroupInfo is being set for the debugging snapshot")
+	d.DebuggingSnapshot.SetNodeGroupInfo(nodeInfos)
+	*d.State = DATA_COLLECTED
+}
+
+// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled
+func (d *DebuggingSnapshotterImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) {
+	if !d.IsDataCollectionAllowed() {
+		return
+	}
+	d.Mutex.Lock()
+	defer d.Mutex.Unlock()
+	klog.Infof("UnscheduledPodsCanBeScheduled is being set for the debugging snapshot")
+	d.DebuggingSnapshot.SetUnscheduledPodsCanBeScheduled(podList)
+	*d.State = DATA_COLLECTED
+}
+
+// Cleanup terminates any ongoing snapshot request when CA is shutting down
+func (d *DebuggingSnapshotterImpl) Cleanup() {
+	if d.CancelRequest != nil {
+		d.CancelRequest()
+	}
+}
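Putting the state machine together, the intended interaction between the HTTP handler and the autoscaler loop looks roughly like this (a sketch, not part of the diff):

    // Handler side: a request moves the state from LISTENING to TRIGGER_ENABLED,
    // then blocks on the Trigger channel until the loop flushes.
    //
    // Loop side (see the RunOnce/ExitCleanUp changes above):
    snapshotter.StartDataCollection()       // TRIGGER_ENABLED -> START_DATA_COLLECTION
    defer snapshotter.Flush()               // wakes the waiting handler once data (or an error) is ready
    // ... rest of the loop ...
    snapshotter.SetNodeGroupInfo(nodeInfos) // marks the state DATA_COLLECTED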
@@ -0,0 +1,160 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package debuggingsnapshot
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+func TestBasicSnapshotRequest(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	snapshotter := NewDebuggingSnapshotter(true)
+
+	pod := []*framework.PodInfo{
+		{
+			Pod: &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "Pod1",
+				},
+				Spec: v1.PodSpec{
+					NodeName: "testNode",
+				},
+			},
+		},
+	}
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "testNode",
+		},
+	}
+
+	nodeInfo := &framework.NodeInfo{
+		Pods:             pod,
+		Requested:        &framework.Resource{},
+		NonZeroRequested: &framework.Resource{},
+		Allocatable:      &framework.Resource{},
+		Generation:       0,
+	}
+
+	var nodeGroups []*framework.NodeInfo
+	nodeGroups = append(nodeGroups, nodeInfo)
+	nodeGroups[0].SetNode(node)
+
+	req := httptest.NewRequest(http.MethodGet, "/", nil)
+	w := httptest.NewRecorder()
+
+	go func() {
+		snapshotter.ResponseHandler(w, req)
+		wg.Done()
+	}()
+
+	for !snapshotter.IsDataCollectionAllowed() {
+		snapshotter.StartDataCollection()
+	}
+	snapshotter.SetNodeGroupInfo(nodeGroups)
+	snapshotter.Flush()
+
+	wg.Wait()
+	resp := w.Result()
+	assert.Equal(t, http.StatusOK, w.Code)
+	assert.Greater(t, int64(0), resp.ContentLength)
+}
+
+func TestFlushWithoutData(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	snapshotter := NewDebuggingSnapshotter(true)
+
+	req := httptest.NewRequest(http.MethodGet, "/", nil)
+	w := httptest.NewRecorder()
+
+	go func() {
+		snapshotter.ResponseHandler(w, req)
+		wg.Done()
+	}()
+
+	for !snapshotter.IsDataCollectionAllowed() {
+		snapshotter.StartDataCollection()
+	}
+	snapshotter.Flush()
+
+	wg.Wait()
+	resp := w.Result()
+	assert.Equal(t, http.StatusInternalServerError, w.Code)
+	assert.Greater(t, int64(0), resp.ContentLength)
+}
+
+func TestRequestTerminationOnShutdown(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	snapshotter := NewDebuggingSnapshotter(true)
+
+	req := httptest.NewRequest(http.MethodGet, "/", nil)
+	w := httptest.NewRecorder()
+
+	go func() {
+		snapshotter.ResponseHandler(w, req)
+		wg.Done()
+	}()
+
+	for !snapshotter.IsDataCollectionAllowed() {
+		snapshotter.StartDataCollection()
+	}
+
+	go snapshotter.Cleanup()
+	wg.Wait()
+
+	assert.Equal(t, http.StatusServiceUnavailable, w.Code)
+}
+
+func TestRejectParallelRequest(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	snapshotter := NewDebuggingSnapshotter(true)
+
+	req := httptest.NewRequest(http.MethodGet, "/", nil)
+	w := httptest.NewRecorder()
+
+	go func() {
+		snapshotter.ResponseHandler(w, req)
+		wg.Done()
+	}()
+
+	for !snapshotter.IsDataCollectionAllowed() {
+		snapshotter.StartDataCollection()
+	}
+
+	w1 := httptest.NewRecorder()
+	req1 := httptest.NewRequest(http.MethodGet, "/", nil)
+	snapshotter.ResponseHandler(w1, req1)
+	assert.Equal(t, http.StatusTooManyRequests, w1.Code)
+
+	snapshotter.SetNodeGroupInfo(nil)
+	snapshotter.Flush()
+	wg.Wait()
+
+	assert.Equal(t, http.StatusOK, w.Code)
+}
@@ -29,6 +29,8 @@ import (
 	"syscall"
 	"time"
 
+	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
+
 	"github.com/spf13/pflag"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -181,7 +183,8 @@ var (
 	daemonSetEvictionForOccupiedNodes = flag.Bool("daemonset-eviction-for-occupied-nodes", true, "DaemonSet pods will be gracefully terminated from non-empty nodes")
 	userAgent = flag.String("user-agent", "cluster-autoscaler", "User agent used for HTTP calls.")
 
-	emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
+	emitPerNodeGroupMetrics  = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
+	debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled")
 )
 
 func createAutoscalingOptions() config.AutoscalingOptions {
@@ -304,17 +307,18 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler() (core.Autoscaler, error) {
+func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
 	// Create basic config from flags.
 	autoscalingOptions := createAutoscalingOptions()
 	kubeClient := createKubeClient(getKubeConfig())
 	eventsKubeClient := createKubeClient(getKubeConfig())
 
 	opts := core.AutoscalerOptions{
-		AutoscalingOptions: autoscalingOptions,
-		ClusterSnapshot:    simulator.NewDeltaClusterSnapshot(),
-		KubeClient:         kubeClient,
-		EventsKubeClient:   eventsKubeClient,
+		AutoscalingOptions:   autoscalingOptions,
+		ClusterSnapshot:      simulator.NewDeltaClusterSnapshot(),
+		KubeClient:           kubeClient,
+		EventsKubeClient:     eventsKubeClient,
+		DebuggingSnapshotter: debuggingSnapshotter,
 	}
 
 	opts.Processors = ca_processors.DefaultProcessors()
@@ -345,10 +349,10 @@ func buildAutoscaler() (core.Autoscaler, error) {
 	return core.NewAutoscaler(opts)
 }
 
-func run(healthCheck *metrics.HealthCheck) {
+func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
 
-	autoscaler, err := buildAutoscaler()
+	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -400,12 +404,17 @@ func main() {
 
 	klog.V(1).Infof("Cluster Autoscaler %s", version.ClusterAutoscalerVersion)
 
+	debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(*debuggingSnapshotEnabled)
+
 	go func() {
 		pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
 		defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
 		pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
 			defaultMetricsHandler(w, req)
 		})
+		if *debuggingSnapshotEnabled {
+			pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler)
+		}
 		pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP)
 		if *enableProfiling {
 			routes.Profiling{}.Install(pathRecorderMux)
@@ -415,7 +424,7 @@ func main() {
 	}()
 
 	if !leaderElection.LeaderElect {
-		run(healthCheck)
+		run(healthCheck, debuggingSnapshotter)
 	} else {
 		id, err := os.Hostname()
 		if err != nil {
@@ -455,7 +464,7 @@ func main() {
 			OnStartedLeading: func(_ ctx.Context) {
 				// Since we are committing a suicide after losing
 				// mastership, we can safely ignore the argument.
-				run(healthCheck)
+				run(healthCheck, debuggingSnapshotter)
 			},
 			OnStoppedLeading: func() {
 				klog.Fatalf("lost master")
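With --debugging-snapshot-enabled set, the /snapshotz endpoint is served on the same mux as /metrics and /health-check, i.e. on the port configured via --address (:8085 unless overridden). A sketch of fetching a snapshot, not part of the diff:

    // Assumes the default --address value; adjust host/port for your deployment.
    resp, err := http.Get("http://localhost:8085/snapshotz")
    // The response body is the JSON produced by DebuggingSnapshotImpl.GetOutputBytes.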