Add ProvisioningRequestPodsFilter processor (#6386)
* Introduce ProvisioningRequestPodsFilter processor * Review
This commit is contained in:
		
							parent
							
								
									f4891e3602
								
							
						
					
					
						commit
						d29ffd03b9
					
				| 
						 | 
				
			
			@ -281,6 +281,8 @@ type AutoscalingOptions struct {
 | 
			
		|||
	DynamicNodeDeleteDelayAfterTaintEnabled bool
 | 
			
		||||
	// BypassedSchedulers are used to specify which schedulers to bypass their processing
 | 
			
		||||
	BypassedSchedulers map[string]bool
 | 
			
		||||
	// ProvisioningRequestEnabled tells if CA processes ProvisioningRequest.
 | 
			
		||||
	ProvisioningRequestEnabled bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// KubeClientOptions specify options for kube client
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,44 +17,18 @@ limitations under the License.
 | 
			
		|||
package podlistprocessor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	apiv1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/context"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type defaultPodListProcessor struct {
 | 
			
		||||
	processors []pods.PodListProcessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDefaultPodListProcessor returns a default implementation of the pod list
 | 
			
		||||
// processor, which wraps and sequentially runs other sub-processors.
 | 
			
		||||
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
 | 
			
		||||
	return &defaultPodListProcessor{
 | 
			
		||||
		processors: []pods.PodListProcessor{
 | 
			
		||||
			NewClearTPURequestsPodListProcessor(),
 | 
			
		||||
			NewFilterOutExpendablePodListProcessor(),
 | 
			
		||||
			NewCurrentlyDrainedNodesPodListProcessor(),
 | 
			
		||||
			NewFilterOutSchedulablePodListProcessor(predicateChecker),
 | 
			
		||||
			NewFilterOutDaemonSetPodListProcessor(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Process runs sub-processors sequentially
 | 
			
		||||
func (p *defaultPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	for _, processor := range p.processors {
 | 
			
		||||
		unschedulablePods, err = processor.Process(ctx, unschedulablePods)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return unschedulablePods, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *defaultPodListProcessor) CleanUp() {
 | 
			
		||||
	for _, processor := range p.processors {
 | 
			
		||||
		processor.CleanUp()
 | 
			
		||||
	}
 | 
			
		||||
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *pods.CombinedPodListProcessor {
 | 
			
		||||
	return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
 | 
			
		||||
		NewClearTPURequestsPodListProcessor(),
 | 
			
		||||
		NewFilterOutExpendablePodListProcessor(),
 | 
			
		||||
		NewCurrentlyDrainedNodesPodListProcessor(),
 | 
			
		||||
		NewFilterOutSchedulablePodListProcessor(predicateChecker),
 | 
			
		||||
		NewFilterOutDaemonSetPodListProcessor(),
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -51,6 +51,7 @@ import (
 | 
			
		|||
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates"
 | 
			
		||||
| 
						 | 
				
			
			@ -250,6 +251,7 @@ var (
 | 
			
		|||
			"--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
 | 
			
		||||
			"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
 | 
			
		||||
			"Eg. flag usage:  '10000:20,1000:100,0:60'")
 | 
			
		||||
	provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func isFlagPassed(name string) bool {
 | 
			
		||||
| 
						 | 
				
			
			@ -420,6 +422,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 | 
			
		|||
		},
 | 
			
		||||
		DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
 | 
			
		||||
		BypassedSchedulers:                      scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
 | 
			
		||||
		ProvisioningRequestEnabled:              *provisioningRequestsEnabled,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -475,7 +478,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 | 
			
		|||
 | 
			
		||||
	opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
 | 
			
		||||
	opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
 | 
			
		||||
	opts.Processors.PodListProcessor = podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
 | 
			
		||||
	podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
 | 
			
		||||
	if autoscalingOptions.ProvisioningRequestEnabled {
 | 
			
		||||
		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 | 
			
		||||
	}
 | 
			
		||||
	opts.Processors.PodListProcessor = podListProcessor
 | 
			
		||||
	scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
 | 
			
		||||
	if autoscalingOptions.ParallelDrain {
 | 
			
		||||
		sdCandidatesSorting := previouscandidates.NewPreviousCandidates()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -48,3 +48,37 @@ func (p *NoOpPodListProcessor) Process(
 | 
			
		|||
// CleanUp cleans up the processor's internal structures.
 | 
			
		||||
func (p *NoOpPodListProcessor) CleanUp() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CombinedPodListProcessor is a list of PodListProcessors
 | 
			
		||||
type CombinedPodListProcessor struct {
 | 
			
		||||
	processors []PodListProcessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCombinedPodListProcessor construct CombinedPodListProcessor.
 | 
			
		||||
func NewCombinedPodListProcessor(processors []PodListProcessor) *CombinedPodListProcessor {
 | 
			
		||||
	return &CombinedPodListProcessor{processors}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddProcessor append processor to the list.
 | 
			
		||||
func (p *CombinedPodListProcessor) AddProcessor(processor PodListProcessor) {
 | 
			
		||||
	p.processors = append(p.processors, processor)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Process runs sub-processors sequentially
 | 
			
		||||
func (p *CombinedPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	for _, processor := range p.processors {
 | 
			
		||||
		unschedulablePods, err = processor.Process(ctx, unschedulablePods)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return unschedulablePods, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CleanUp cleans up the processor's internal structures.
 | 
			
		||||
func (p *CombinedPodListProcessor) CleanUp() {
 | 
			
		||||
	for _, processor := range p.processors {
 | 
			
		||||
		processor.CleanUp()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,106 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package provreq
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apiv1 "k8s.io/api/core/v1"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/context"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/utils/klogx"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	provisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
 | 
			
		||||
	maxProvReqEvent                     = 50
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// EventManager is an interface for handling events for provisioning request.
 | 
			
		||||
type EventManager interface {
 | 
			
		||||
	LogIgnoredInScaleUpEvent(context *context.AutoscalingContext, now time.Time, pod *apiv1.Pod, prName string)
 | 
			
		||||
	Reset()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type defaultEventManager struct {
 | 
			
		||||
	loggedEvents int
 | 
			
		||||
	limit        int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDefautlEventManager return basic event manager.
 | 
			
		||||
func NewDefautlEventManager() *defaultEventManager {
 | 
			
		||||
	return &defaultEventManager{limit: maxProvReqEvent}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LogIgnoredInScaleUpEvent adds event about ignored scale up for unscheduled pod, that consumes Provisioning Request.
 | 
			
		||||
func (e *defaultEventManager) LogIgnoredInScaleUpEvent(context *context.AutoscalingContext, now time.Time, pod *apiv1.Pod, prName string) {
 | 
			
		||||
	message := fmt.Sprintf("Unschedulable pod didn't trigger scale-up, because it's consuming ProvisioningRequest %s/%s", pod.Namespace, prName)
 | 
			
		||||
	if e.loggedEvents < e.limit {
 | 
			
		||||
		context.Recorder.Event(pod, apiv1.EventTypeNormal, "", message)
 | 
			
		||||
		e.loggedEvents++
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Reset resets event manager internal structure. It will be called once before handling all pods.
 | 
			
		||||
func (e *defaultEventManager) Reset() {
 | 
			
		||||
	e.loggedEvents = 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequestPodsFilter filter out pods that consumes Provisioning Request
 | 
			
		||||
type ProvisioningRequestPodsFilter struct {
 | 
			
		||||
	eventManager EventManager
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Process filters out all pods that are consuming a Provisioning Request from unschedulable pods list.
 | 
			
		||||
func (p *ProvisioningRequestPodsFilter) Process(
 | 
			
		||||
	context *context.AutoscalingContext,
 | 
			
		||||
	unschedulablePods []*apiv1.Pod,
 | 
			
		||||
) ([]*apiv1.Pod, error) {
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	p.eventManager.Reset()
 | 
			
		||||
	loggingQuota := klogx.PodsLoggingQuota()
 | 
			
		||||
	result := make([]*apiv1.Pod, 0, len(unschedulablePods))
 | 
			
		||||
	for _, pod := range unschedulablePods {
 | 
			
		||||
		prName, found := provisioningRequestName(pod)
 | 
			
		||||
		if !found {
 | 
			
		||||
			result = append(result, pod)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		klogx.V(1).UpTo(loggingQuota).Infof("Ignoring unschedulable pod %s/%s as it consumes ProvisioningRequest: %s/%s", pod.Namespace, pod.Name, pod.Namespace, prName)
 | 
			
		||||
		p.eventManager.LogIgnoredInScaleUpEvent(context, now, pod, prName)
 | 
			
		||||
	}
 | 
			
		||||
	klogx.V(1).Over(loggingQuota).Infof("There are also %v other pods which were ignored", -loggingQuota.Left())
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CleanUp cleans up the processor's internal structures.
 | 
			
		||||
func (p *ProvisioningRequestPodsFilter) CleanUp() {}
 | 
			
		||||
 | 
			
		||||
// NewProvisioningRequestPodsFilter creates a ProvisioningRequest filter processor.
 | 
			
		||||
func NewProvisioningRequestPodsFilter(e EventManager) pods.PodListProcessor {
 | 
			
		||||
	return &ProvisioningRequestPodsFilter{e}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func provisioningRequestName(pod *v1.Pod) (string, bool) {
 | 
			
		||||
	if pod == nil || pod.Annotations == nil {
 | 
			
		||||
		return "", false
 | 
			
		||||
	}
 | 
			
		||||
	provReqName, found := pod.Annotations[provisioningRequestPodAnnotationKey]
 | 
			
		||||
	return provReqName, found
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,117 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package provreq
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	apiv1 "k8s.io/api/core/v1"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/context"
 | 
			
		||||
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestProvisioningRequestPodsFilter(t *testing.T) {
 | 
			
		||||
	prPod1 := BuildTestPod("pr-pod-1", 500, 10)
 | 
			
		||||
	prPod1.Annotations[provisioningRequestPodAnnotationKey] = "pr-class"
 | 
			
		||||
 | 
			
		||||
	prPod2 := BuildTestPod("pr-pod-2", 500, 10)
 | 
			
		||||
	prPod2.Annotations[provisioningRequestPodAnnotationKey] = "pr-class-2"
 | 
			
		||||
 | 
			
		||||
	pod1 := BuildTestPod("pod-1", 500, 10)
 | 
			
		||||
	pod2 := BuildTestPod("pod-2", 500, 10)
 | 
			
		||||
 | 
			
		||||
	testCases := map[string]struct {
 | 
			
		||||
		unschedulableCandidates []*apiv1.Pod
 | 
			
		||||
		expectedUnscheduledPods []*apiv1.Pod
 | 
			
		||||
	}{
 | 
			
		||||
		"ProvisioningRequest consumer is filtered out": {
 | 
			
		||||
			unschedulableCandidates: []*v1.Pod{prPod1, pod1},
 | 
			
		||||
			expectedUnscheduledPods: []*v1.Pod{pod1},
 | 
			
		||||
		},
 | 
			
		||||
		"Different ProvisioningRequest consumers are filtered out": {
 | 
			
		||||
			unschedulableCandidates: []*v1.Pod{prPod1, prPod2, pod1},
 | 
			
		||||
			expectedUnscheduledPods: []*v1.Pod{pod1},
 | 
			
		||||
		},
 | 
			
		||||
		"No pod is filtered": {
 | 
			
		||||
			unschedulableCandidates: []*v1.Pod{pod1, pod2},
 | 
			
		||||
			expectedUnscheduledPods: []*v1.Pod{pod1, pod2},
 | 
			
		||||
		},
 | 
			
		||||
		"Empty unschedulable pods list": {
 | 
			
		||||
			unschedulableCandidates: []*v1.Pod{},
 | 
			
		||||
			expectedUnscheduledPods: []*v1.Pod{},
 | 
			
		||||
		},
 | 
			
		||||
		"All ProvisioningRequest consumers are filtered out": {
 | 
			
		||||
			unschedulableCandidates: []*v1.Pod{prPod1, prPod2},
 | 
			
		||||
			expectedUnscheduledPods: []*v1.Pod{},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, test := range testCases {
 | 
			
		||||
		eventRecorder := record.NewFakeRecorder(10)
 | 
			
		||||
		ctx := &context.AutoscalingContext{AutoscalingKubeClients: context.AutoscalingKubeClients{Recorder: eventRecorder}}
 | 
			
		||||
		filter := NewProvisioningRequestPodsFilter(NewDefautlEventManager())
 | 
			
		||||
		got, _ := filter.Process(ctx, test.unschedulableCandidates)
 | 
			
		||||
		assert.ElementsMatch(t, got, test.expectedUnscheduledPods)
 | 
			
		||||
		if len(test.expectedUnscheduledPods) < len(test.expectedUnscheduledPods) {
 | 
			
		||||
			select {
 | 
			
		||||
			case event := <-eventRecorder.Events:
 | 
			
		||||
				assert.Contains(t, event, "Unschedulable pod didn't trigger scale-up, because it's consuming ProvisioningRequest default/pr-class")
 | 
			
		||||
			case <-time.After(1 * time.Second):
 | 
			
		||||
				t.Errorf("Timeout waiting for event")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestEventManager(t *testing.T) {
 | 
			
		||||
	eventLimit := 5
 | 
			
		||||
	eventManager := &defaultEventManager{limit: eventLimit}
 | 
			
		||||
	prFilter := NewProvisioningRequestPodsFilter(eventManager)
 | 
			
		||||
	eventRecorder := record.NewFakeRecorder(10)
 | 
			
		||||
	ctx := &context.AutoscalingContext{AutoscalingKubeClients: context.AutoscalingKubeClients{Recorder: eventRecorder}}
 | 
			
		||||
	unscheduledPods := []*v1.Pod{BuildTestPod("pod", 500, 10)}
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < 10; i++ {
 | 
			
		||||
		prPod := BuildTestPod(fmt.Sprintf("pr-pod-%d", i), 10, 10)
 | 
			
		||||
		prPod.Annotations[provisioningRequestPodAnnotationKey] = "pr-class"
 | 
			
		||||
		unscheduledPods = append(unscheduledPods, prPod)
 | 
			
		||||
	}
 | 
			
		||||
	got, err := prFilter.Process(ctx, unscheduledPods)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	if len(got) != 1 {
 | 
			
		||||
		t.Errorf("Want 1 unschedulable pod, got: %v", got)
 | 
			
		||||
	}
 | 
			
		||||
	assert.Equal(t, eventManager.loggedEvents, eventLimit)
 | 
			
		||||
	for i := 0; i < eventLimit; i++ {
 | 
			
		||||
		select {
 | 
			
		||||
		case event := <-eventRecorder.Events:
 | 
			
		||||
			assert.Contains(t, event, "Unschedulable pod didn't trigger scale-up, because it's consuming ProvisioningRequest default/pr-class")
 | 
			
		||||
		case <-time.After(1 * time.Second):
 | 
			
		||||
			t.Errorf("Timeout waiting for event")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	select {
 | 
			
		||||
	case <-eventRecorder.Events:
 | 
			
		||||
		t.Errorf("Receive event after reaching event limit")
 | 
			
		||||
	case <-time.After(1 * time.Millisecond):
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 | 
			
		|||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package v1beta1client
 | 
			
		||||
package provreqclient
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
| 
						 | 
				
			
			@ -28,6 +28,7 @@ import (
 | 
			
		|||
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/clientset/versioned"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/informers/externalversions"
 | 
			
		||||
	listers "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/listers/autoscaling.x-k8s.io/v1beta1"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	"k8s.io/client-go/kubernetes"
 | 
			
		||||
	v1 "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
| 
						 | 
				
			
			@ -40,15 +41,15 @@ const (
 | 
			
		|||
	provisioningRequestClientCallTimeout = 4 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD.
 | 
			
		||||
type ProvisioningRequestClient struct {
 | 
			
		||||
// ProvisioningRequestClientV1beta1 represents client for v1beta1 ProvReq CRD.
 | 
			
		||||
type ProvisioningRequestClientV1beta1 struct {
 | 
			
		||||
	client         versioned.Interface
 | 
			
		||||
	provReqLister  listers.ProvisioningRequestLister
 | 
			
		||||
	podTemplLister v1.PodTemplateLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewProvisioningRequestClient configures and returns a provisioningRequestClient.
 | 
			
		||||
func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) {
 | 
			
		||||
func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClientV1beta1, error) {
 | 
			
		||||
	prClient, err := newPRClient(kubeConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err)
 | 
			
		||||
| 
						 | 
				
			
			@ -69,7 +70,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest
 | 
			
		|||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &ProvisioningRequestClient{
 | 
			
		||||
	return &ProvisioningRequestClientV1beta1{
 | 
			
		||||
		client:         prClient,
 | 
			
		||||
		provReqLister:  provReqLister,
 | 
			
		||||
		podTemplLister: podTemplLister,
 | 
			
		||||
| 
						 | 
				
			
			@ -77,21 +78,37 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequest gets a specific ProvisioningRequest CR.
 | 
			
		||||
func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*v1beta1.ProvisioningRequest, error) {
 | 
			
		||||
	return c.provReqLister.ProvisioningRequests(namespace).Get(name)
 | 
			
		||||
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
 | 
			
		||||
	v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	podTemplates, err := c.FetchPodTemplates(v1Beta1PR)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("while fetching pod templates for Get Provisioning Request %s/%s got error: %v", namespace, name, err)
 | 
			
		||||
	}
 | 
			
		||||
	return provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequests gets all ProvisioningRequest CRs.
 | 
			
		||||
func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*v1beta1.ProvisioningRequest, error) {
 | 
			
		||||
	provisioningRequests, err := c.provReqLister.List(labels.Everything())
 | 
			
		||||
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
 | 
			
		||||
	v1Beta1PRs, err := c.provReqLister.List(labels.Everything())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("error fetching provisioningRequests: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	return provisioningRequests, nil
 | 
			
		||||
	prs := make([]*provreqwrapper.ProvisioningRequest, 0, len(v1Beta1PRs))
 | 
			
		||||
	for _, v1Beta1PR := range v1Beta1PRs {
 | 
			
		||||
		podTemplates, errPodTemplates := c.FetchPodTemplates(v1Beta1PR)
 | 
			
		||||
		if errPodTemplates != nil {
 | 
			
		||||
			return nil, fmt.Errorf("while fetching pod templates for List Provisioning Request %s/%s got error: %v", v1Beta1PR.Namespace, v1Beta1PR.Name, errPodTemplates)
 | 
			
		||||
		}
 | 
			
		||||
		prs = append(prs, provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates))
 | 
			
		||||
	}
 | 
			
		||||
	return prs, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request.
 | 
			
		||||
func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
 | 
			
		||||
func (c *ProvisioningRequestClientV1beta1) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
 | 
			
		||||
	podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets))
 | 
			
		||||
	for _, podSpec := range pr.Spec.PodSets {
 | 
			
		||||
		podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name)
 | 
			
		||||
| 
						 | 
				
			
			@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 | 
			
		|||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package v1beta1client
 | 
			
		||||
package provreqclient
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
| 
						 | 
				
			
			@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 | 
			
		|||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package v1beta1client
 | 
			
		||||
package provreqclient
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
| 
						 | 
				
			
			@ -35,7 +35,7 @@ import (
 | 
			
		|||
)
 | 
			
		||||
 | 
			
		||||
// NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests.
 | 
			
		||||
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClient, *FakeProvisioningRequestForceClient) {
 | 
			
		||||
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClientV1beta1, *FakeProvisioningRequestForceClient) {
 | 
			
		||||
	t.Helper()
 | 
			
		||||
	provReqClient := fake.NewSimpleClientset()
 | 
			
		||||
	podTemplClient := fake_kubernetes.NewSimpleClientset()
 | 
			
		||||
| 
						 | 
				
			
			@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return &ProvisioningRequestClient{
 | 
			
		||||
	return &ProvisioningRequestClientV1beta1{
 | 
			
		||||
			client:         provReqClient,
 | 
			
		||||
			provReqLister:  provReqLister,
 | 
			
		||||
			podTemplLister: podTemplLister,
 | 
			
		||||
| 
						 | 
				
			
			@ -1,72 +0,0 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package provreqservice
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
 | 
			
		||||
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/service/v1beta1client"
 | 
			
		||||
	"k8s.io/client-go/rest"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequestService represents the service that is able to list,
 | 
			
		||||
// access and delete different Provisioning Requests.
 | 
			
		||||
type ProvisioningRequestService struct {
 | 
			
		||||
	provReqV1Beta1Client *v1beta1client.ProvisioningRequestClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewProvisioningRequestService returns new service for interacting with ProvisioningRequests.
 | 
			
		||||
func NewProvisioningRequestService(kubeConfig *rest.Config) (*ProvisioningRequestService, error) {
 | 
			
		||||
	v1Beta1Client, err := v1beta1client.NewProvisioningRequestClient(kubeConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return &ProvisioningRequestService{
 | 
			
		||||
		provReqV1Beta1Client: v1Beta1Client,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequest gets a specific ProvisioningRequest CR.
 | 
			
		||||
func (s *ProvisioningRequestService) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
 | 
			
		||||
	v1Beta1PR, err := s.provReqV1Beta1Client.ProvisioningRequest(namespace, name)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		podTemplates, errPodTemplates := s.provReqV1Beta1Client.FetchPodTemplates(v1Beta1PR)
 | 
			
		||||
		if errPodTemplates != nil {
 | 
			
		||||
			return nil, fmt.Errorf("while fetching pod templates for Get Provisioning Request %s/%s got error: %v", namespace, name, errPodTemplates)
 | 
			
		||||
		}
 | 
			
		||||
		return provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates), nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProvisioningRequests gets all Queued ProvisioningRequest CRs.
 | 
			
		||||
func (s *ProvisioningRequestService) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
 | 
			
		||||
	v1Beta1PRs, err := s.provReqV1Beta1Client.ProvisioningRequests()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	prs := make([]*provreqwrapper.ProvisioningRequest, 0, len(v1Beta1PRs))
 | 
			
		||||
	for _, v1Beta1PR := range v1Beta1PRs {
 | 
			
		||||
		podTemplates, errPodTemplates := s.provReqV1Beta1Client.FetchPodTemplates(v1Beta1PR)
 | 
			
		||||
		if errPodTemplates != nil {
 | 
			
		||||
			return nil, fmt.Errorf("while fetching pod templates for List Provisioning Request %s/%s got error: %v", v1Beta1PR.Namespace, v1Beta1PR.Name, errPodTemplates)
 | 
			
		||||
		}
 | 
			
		||||
		prs = append(prs, provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates))
 | 
			
		||||
	}
 | 
			
		||||
	return prs, nil
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue