Add force Daemon Sets option

This commit is contained in:
Bartłomiej Wróblewski 2023-01-24 17:02:11 +00:00
parent 4d94120223
commit b608278386
11 changed files with 253 additions and 196 deletions

View File

@ -533,7 +533,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResul
}
context.ExpanderStrategy = expander
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -694,7 +694,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@ -736,7 +736,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)
nodes := []*apiv1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@ -804,7 +804,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -873,7 +873,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{t, 0}
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
@ -927,7 +927,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{t, 2}
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
@ -978,7 +978,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

View File

@ -68,7 +68,7 @@ func TestDeltaForNode(t *testing.T) {
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
delta, err := rm.DeltaForNode(&ctx, nodeInfos[ng.Name], group)
@ -109,7 +109,7 @@ func TestResourcesLeft(t *testing.T) {
ng := testCase.nodeGroupConfig
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
left, err := rm.ResourcesLeft(&ctx, nodeInfos, nodes)
@ -160,7 +160,7 @@ func TestApplyResourcesLimits(t *testing.T) {
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
newCount, err := rm.ApplyResourcesLimits(&ctx, testCase.newNodeCount, testCase.resourcesLeft, nodeInfos[testCase.nodeGroupConfig.Name], group)
@ -225,7 +225,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
assert.NoError(t, err)
nodes := []*corev1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)

View File

@ -146,7 +146,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),

View File

@ -220,6 +220,7 @@ var (
maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.")
maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.")
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
)
func createAutoscalingOptions() config.AutoscalingOptions {
@ -399,7 +400,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}
opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
opts.Processors.PodListProcessor = podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
if autoscalingOptions.ParallelDrain {
sdProcessor := nodes.NewScaleDownCandidatesSortingProcessor()
@ -418,7 +419,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
nodeInfoComparatorBuilder = nodegroupset.CreateAwsNodeInfoComparator
} else if autoscalingOptions.CloudProviderName == cloudprovider.GceProviderName {
nodeInfoComparatorBuilder = nodegroupset.CreateGceNodeInfoComparator
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAnnotationNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAnnotationNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
}
nodeInfoComparator = nodeInfoComparatorBuilder(autoscalingOptions.BalancingExtraIgnoredLabels, autoscalingOptions.NodeGroupSetRatios)
}

View File

@ -33,9 +33,9 @@ type AnnotationNodeInfoProvider struct {
}
// NewAnnotationNodeInfoProvider returns AnnotationNodeInfoProvider.
func NewAnnotationNodeInfoProvider(t *time.Duration) *AnnotationNodeInfoProvider {
func NewAnnotationNodeInfoProvider(t *time.Duration, forceDaemonSets bool) *AnnotationNodeInfoProvider {
return &AnnotationNodeInfoProvider{
mixedTemplateNodeInfoProvider: NewMixedTemplateNodeInfoProvider(t),
mixedTemplateNodeInfoProvider: NewMixedTemplateNodeInfoProvider(t, forceDaemonSets),
}
}
@ -45,7 +45,7 @@ func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, no
if err != nil {
return nil, err
}
// Add annotatios to the NodeInfo to use later in expander.
// Add annotations to the NodeInfo to use later in expander.
nodeGroups := ctx.CloudProvider.NodeGroups()
for _, ng := range nodeGroups {
if nodeInfo, ok := nodeInfos[ng.Id()]; ok {

View File

@ -44,20 +44,22 @@ type cacheItem struct {
// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]cacheItem
ttl time.Duration
nodeInfoCache map[string]cacheItem
ttl time.Duration
forceDaemonSets bool
}
// NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building
// NodeInfos from real-world nodes when available, otherwise from node groups templates.
func NewMixedTemplateNodeInfoProvider(t *time.Duration) *MixedTemplateNodeInfoProvider {
func NewMixedTemplateNodeInfoProvider(t *time.Duration, forceDaemonSets bool) *MixedTemplateNodeInfoProvider {
ttl := maxCacheExpireTime
if t != nil {
ttl = *t
}
return &MixedTemplateNodeInfoProvider{
nodeInfoCache: make(map[string]cacheItem),
ttl: ttl,
nodeInfoCache: make(map[string]cacheItem),
ttl: ttl,
forceDaemonSets: forceDaemonSets,
}
}
@ -93,10 +95,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
id := nodeGroup.Id()
if _, found := result[id]; !found {
// Build nodeInfo.
nodeInfo, err := simulator.BuildNodeInfoForNode(node, podsForNodes)
if err != nil {
return false, "", err
}
nodeInfo, err := simulator.BuildNodeInfoForNode(node, podsForNodes[node.Name], daemonsets, p.forceDaemonSets)
sanitizedNodeInfo, err := utils.SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
if err != nil {
return false, "", err

View File

@ -85,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
@ -112,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
@ -171,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
ListerRegistry: registry,
},
}
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl, false)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
@ -263,7 +263,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
tni := schedulerframework.NewNodeInfo()
tni.SetNode(tn)
// Cache expire time is set.
niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl)
niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl, false)
niProcessor1.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
@ -277,7 +277,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
assert.Equal(t, 1, len(niProcessor1.nodeInfoCache))
// Cache expire time isn't set.
niProcessor2 := NewMixedTemplateNodeInfoProvider(nil)
niProcessor2 := NewMixedTemplateNodeInfoProvider(nil, false)
niProcessor2.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},

View File

@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface {
}
// NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider.
func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider(time)
func NewDefaultTemplateNodeInfoProvider(time *time.Duration, forceDaemonSets bool) TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider(time, forceDaemonSets)
}

View File

@ -86,7 +86,7 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
}
}

View File

@ -17,46 +17,56 @@ limitations under the License.
package simulator
import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
)
// getRequiredPodsForNode returns a list of pods that would appear on the node if the
// node was just created (like daemonset and manifest-run pods). It reuses kubectl
// drain command to get the list.
func getRequiredPodsForNode(nodename string, podsForNodes map[string][]*apiv1.Pod) ([]*apiv1.Pod, errors.AutoscalerError) {
allPods := podsForNodes[nodename]
return filterRequiredPodsForNode(allPods), nil
}
// BuildNodeInfoForNode build a NodeInfo structure for the given node as if the node was just created.
func BuildNodeInfoForNode(node *apiv1.Node, podsForNodes map[string][]*apiv1.Pod) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
requiredPods, err := getRequiredPodsForNode(node.Name, podsForNodes)
if err != nil {
return nil, err
}
result := schedulerframework.NewNodeInfo(requiredPods...)
result.SetNode(node)
return result, nil
func BuildNodeInfoForNode(node *apiv1.Node, scheduledPods []*apiv1.Pod, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(node)
return addExpectedPods(nodeInfo, scheduledPods, daemonsets, forceDaemonSets)
}
func filterRequiredPodsForNode(allPods []*apiv1.Pod) []*apiv1.Pod {
var selectedPods []*apiv1.Pod
for id, pod := range allPods {
// Ignore pod in deletion phase
func addExpectedPods(nodeInfo *schedulerframework.NodeInfo, scheduledPods []*apiv1.Pod, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
runningDS := make(map[types.UID]bool)
for _, pod := range scheduledPods {
// Ignore scheduled pods in deletion phase
if pod.DeletionTimestamp != nil {
continue
}
// Add scheduled mirror and DS pods
if pod_util.IsMirrorPod(pod) || pod_util.IsDaemonSetPod(pod) {
selectedPods = append(selectedPods, allPods[id])
nodeInfo.AddPod(pod)
}
// Mark DS pods as running
controllerRef := metav1.GetControllerOf(pod)
if controllerRef != nil && controllerRef.Kind == "DaemonSet" {
runningDS[controllerRef.UID] = true
}
}
return selectedPods
// Add all pending DS pods if force scheduling DS
if forceDaemonSets {
var pendingDS []*appsv1.DaemonSet
for _, ds := range daemonsets {
if !runningDS[ds.UID] {
pendingDS = append(pendingDS, ds)
}
}
daemonPods, err := daemonset.GetDaemonSetPodsForNode(nodeInfo, pendingDS)
if err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
for _, pod := range daemonPods {
nodeInfo.AddPod(pod)
}
}
return nodeInfo, nil
}

View File

@ -17,176 +17,223 @@ limitations under the License.
package simulator
import (
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/types"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/controller/daemon"
)
func TestRequiredPodsForNode(t *testing.T) {
nodeName1 := "node1"
nodeName2 := "node2"
pod1 := &apiv1.Pod{
func TestBuildNodeInfoForNode(t *testing.T) {
ds1 := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
SelfLink: "pod1",
},
Spec: apiv1.PodSpec{
NodeName: nodeName1,
},
}
// Manifest pod.
pod2 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "kube-system",
SelfLink: "pod2",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName1,
},
}
pod3 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "kube-system",
SelfLink: "pod2",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName2,
Name: "ds1",
Namespace: "ds1-namespace",
UID: types.UID("ds1"),
},
}
podsForNodes := map[string][]*apiv1.Pod{nodeName1: {pod1, pod2}, nodeName2: {pod3}}
pods, err := getRequiredPodsForNode(nodeName1, podsForNodes)
assert.NoError(t, err)
assert.Equal(t, 1, len(pods))
assert.Equal(t, "pod2", pods[0].Name)
}
func Test_filterRequiredPodsForNode(t *testing.T) {
nodeName := "node1"
pod1 := &apiv1.Pod{
ds2 := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
SelfLink: "pod1",
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}
// Manifest pod.
mirrorPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mirrorPod",
Namespace: "kube-system",
SelfLink: "mirrorPod",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}
now := metav1.NewTime(time.Now())
podDeleted := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDeleted",
SelfLink: "podDeleted",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
DeletionTimestamp: &now,
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
Name: "ds2",
Namespace: "ds2-namespace",
UID: types.UID("ds2"),
},
}
podDaemonset := &apiv1.Pod{
ds3 := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDaemonset",
SelfLink: "podDaemonset",
OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", ""),
Annotations: map[string]string{
types.ConfigSourceAnnotationKey: types.FileSource,
},
Name: "ds3",
Namespace: "ds3-namespace",
UID: types.UID("ds3"),
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
Spec: appsv1.DaemonSetSpec{
Template: apiv1.PodTemplateSpec{
Spec: apiv1.PodSpec{
NodeSelector: map[string]string{"key": "value"},
},
},
},
}
podDaemonsetAnnotation := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDaemonset2",
SelfLink: "podDaemonset2",
OwnerReferences: GenerateOwnerReferences("ds2", "CustomDaemonset", "crd/v1", ""),
Annotations: map[string]string{
pod_util.DaemonSetPodAnnotationKey: "true",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}
testCases := []struct {
name string
node *apiv1.Node
pods []*apiv1.Pod
daemonSets []*appsv1.DaemonSet
forceDS bool
tests := []struct {
name string
inputPods []*apiv1.Pod
want []*apiv1.Pod
wantPods []*apiv1.Pod
wantError bool
}{
{
name: "nil input pod list",
inputPods: nil,
want: []*apiv1.Pod{},
name: "node without any pods",
node: test.BuildTestNode("n", 1000, 10),
},
{
name: "should return only mirrorPod",
inputPods: []*apiv1.Pod{pod1, mirrorPod},
want: []*apiv1.Pod{mirrorPod},
name: "node with non-DS/mirror pods",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
test.BuildScheduledTestPod("p1", 100, 1, "n"),
test.BuildScheduledTestPod("p2", 100, 1, "n"),
},
},
{
name: "should ignore podDeleted",
inputPods: []*apiv1.Pod{pod1, mirrorPod, podDeleted},
want: []*apiv1.Pod{mirrorPod},
name: "node with a mirror pod",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")),
},
wantPods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")),
},
},
{
name: "should return daemonset pod",
inputPods: []*apiv1.Pod{pod1, podDaemonset},
want: []*apiv1.Pod{podDaemonset},
name: "node with a deleted mirror pod",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")),
setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p2", 100, 1, "n"))),
},
wantPods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")),
},
},
{
name: "should return daemonset pods with",
inputPods: []*apiv1.Pod{pod1, podDaemonset, podDaemonsetAnnotation},
want: []*apiv1.Pod{podDaemonset, podDaemonsetAnnotation},
name: "node with DS pods [forceDS=false, no daemon sets]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
wantPods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
},
},
{
name: "node with DS pods [forceDS=false, some daemon sets]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3},
wantPods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
},
},
{
name: "node with a DS pod [forceDS=true, no daemon sets]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
wantPods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
},
forceDS: true,
},
{
name: "node with a DS pod [forceDS=true, some daemon sets]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3},
forceDS: true,
wantPods: []*apiv1.Pod{
buildDSPod(ds1, "n"),
buildDSPod(ds2, "n"),
},
},
{
name: "everything together [forceDS=false]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
test.BuildScheduledTestPod("p1", 100, 1, "n"),
test.BuildScheduledTestPod("p2", 100, 1, "n"),
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")),
setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p4", 100, 1, "n"))),
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3},
wantPods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")),
buildDSPod(ds1, "n"),
},
},
{
name: "everything together [forceDS=true]",
node: test.BuildTestNode("n", 1000, 10),
pods: []*apiv1.Pod{
test.BuildScheduledTestPod("p1", 100, 1, "n"),
test.BuildScheduledTestPod("p2", 100, 1, "n"),
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")),
setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p4", 100, 1, "n"))),
buildDSPod(ds1, "n"),
setDeletionTimestamp(buildDSPod(ds2, "n")),
},
daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3},
forceDS: true,
wantPods: []*apiv1.Pod{
test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")),
buildDSPod(ds1, "n"),
buildDSPod(ds2, "n"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := filterRequiredPodsForNode(tt.inputPods); !apiequality.Semantic.DeepEqual(got, tt.want) {
t.Errorf("filterRequiredPodsForNode() = %v, want %v", got, tt.want)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
nodeInfo, err := BuildNodeInfoForNode(tc.node, tc.pods, tc.daemonSets, tc.forceDS)
if tc.wantError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, nodeInfo.Node(), tc.node)
// clean pod metadata for comparison purposes
var wantPods, pods []*apiv1.Pod
for _, pod := range tc.wantPods {
wantPods = append(wantPods, cleanPodMetadata(pod))
}
for _, podInfo := range nodeInfo.Pods {
pods = append(pods, cleanPodMetadata(podInfo.Pod))
}
assert.ElementsMatch(t, tc.wantPods, pods)
}
})
}
}
func cleanPodMetadata(pod *apiv1.Pod) *apiv1.Pod {
pod.Name = strings.Split(pod.Name, "-")[0]
pod.OwnerReferences = nil
return pod
}
func buildDSPod(ds *appsv1.DaemonSet, nodeName string) *apiv1.Pod {
pod := daemon.NewPod(ds, nodeName)
pod.Name = ds.Name
ptrVal := true
pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
{Kind: "DaemonSet", UID: ds.UID, Controller: &ptrVal},
}
return pod
}
func setDeletionTimestamp(pod *apiv1.Pod) *apiv1.Pod {
now := metav1.NewTime(time.Now())
pod.DeletionTimestamp = &now
return pod
}