CA: extend ClusterSnapshot interface with predicate-checking methods
To handle DRA properly, scheduling predicates will need to be run whenever Pods are scheduled in the snapshot. PredicateChecker always needs a ClusterSnapshot to work, and ClusterSnapshot scheduling methods need to run the predicates first. So it makes most sense to have PredicateChecker be a dependency for ClusterSnapshot implementations, and move the PredicateChecker methods to ClusterSnapshot. This commit mirrors PredicateChecker methods in ClusterSnapshot (with the exception of FitsAnyNode which isn't used anywhere and is trivial to do via FitsAnyNodeMatching). Further commits will remove the PredicateChecker interface and move the implementation under clustersnapshot. Dummy methods are added to current ClusterSnapshot implementations to get the tests to pass. Further commits will actually implement them. PredicateError is refactored into a broader SchedulingError so that the ClusterSnapshot methods can return a single error that the callers can use to distinguish between a failing predicate and other, unexpected errors.
This commit is contained in:
parent
a35f830f1d
commit
ce185226d1
|
@ -586,7 +586,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
|
|||
eg.Schedulable = true
|
||||
eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
|
||||
} else {
|
||||
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
|
||||
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err)
|
||||
if podCount := len(eg.Pods); podCount > 1 {
|
||||
klog.V(2).Infof("%d other pods similar to %s can't be scheduled on %s", podCount-1, samplePod.Name, nodeGroup.Id())
|
||||
}
|
||||
|
|
|
@ -31,6 +31,26 @@ type BasicClusterSnapshot struct {
|
|||
data []*internalBasicSnapshotData
|
||||
}
|
||||
|
||||
func (snapshot *BasicClusterSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) SchedulingError {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *BasicClusterSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (matchingNode string, err SchedulingError) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *BasicClusterSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) SchedulingError {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *BasicClusterSnapshot) UnschedulePod(namespace string, podName string, nodeName string) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
type internalBasicSnapshotData struct {
|
||||
nodeInfoMap map[string]*schedulerframework.NodeInfo
|
||||
pvcNamespacePodMap map[string]map[string]bool
|
||||
|
|
|
@ -28,19 +28,48 @@ import (
|
|||
// ClusterSnapshot is abstraction of cluster state used for predicate simulations.
|
||||
// It exposes mutation methods and can be viewed as scheduler's SharedLister.
|
||||
type ClusterSnapshot interface {
|
||||
ClusterSnapshotStore
|
||||
|
||||
// SchedulePod tries to schedule the given Pod on the Node with the given name inside the snapshot,
|
||||
// checking scheduling predicates. The pod is only scheduled if the predicates pass. If the pod is scheduled,
|
||||
// all relevant DRA objects are modified to reflect that. Returns nil if the pod got scheduled, and a non-nil
|
||||
// error explaining why not otherwise. The error Type() can be checked against SchedulingInternalError to distinguish
|
||||
// failing predicates from unexpected errors.
|
||||
SchedulePod(pod *apiv1.Pod, nodeName string) SchedulingError
|
||||
// SchedulePodOnAnyNodeMatching tries to schedule the given Pod on any Node for which nodeMatches returns
|
||||
// true. Scheduling predicates are checked, and the pod is scheduled only if there is a matching Node with passing
|
||||
// predicates. If the pod is scheduled, all relevant DRA objects are modified to reflect that, and the name of the
|
||||
// Node its scheduled on and nil are returned. If the pod can't be scheduled on any Node, an empty string and a non-nil
|
||||
// error explaining why are returned. The error Type() can be checked against SchedulingInternalError to distinguish
|
||||
// failing predicates from unexpected errors.
|
||||
SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (matchingNode string, err SchedulingError)
|
||||
// UnschedulePod removes the given Pod from the given Node inside the snapshot, and modifies all relevant DRA objects
|
||||
// to reflect the removal. The pod can then be scheduled on another Node in the snapshot using the Schedule methods.
|
||||
UnschedulePod(namespace string, podName string, nodeName string) error
|
||||
|
||||
// CheckPredicates runs scheduler predicates to check if the given Pod would be able to schedule on the Node with the given
|
||||
// name. Returns nil if predicates pass, or a non-nil error specifying why they didn't otherwise. The error Type() can be
|
||||
// checked against SchedulingInternalError to distinguish failing predicates from unexpected errors. Doesn't mutate the snapshot.
|
||||
CheckPredicates(pod *apiv1.Pod, nodeName string) SchedulingError
|
||||
}
|
||||
|
||||
// ClusterSnapshotStore is the "low-level" part of ClusterSnapshot, responsible for storing the snapshot state and mutating it directly,
|
||||
// without going through scheduler predicates. ClusterSnapshotStore shouldn't be directly used outside the clustersnapshot pkg, its methods
|
||||
// should be accessed via ClusterSnapshot.
|
||||
type ClusterSnapshotStore interface {
|
||||
schedulerframework.SharedLister
|
||||
|
||||
// SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
|
||||
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
|
||||
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
|
||||
|
||||
// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot.
|
||||
// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot without checking scheduler predicates.
|
||||
ForceAddPod(pod *apiv1.Pod, nodeName string) error
|
||||
// ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot.
|
||||
ForceRemovePod(namespace string, podName string, nodeName string) error
|
||||
|
||||
// AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as
|
||||
// any DRA objects passed along them.
|
||||
// AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added,
|
||||
// as well as any DRA objects passed along them.
|
||||
AddNodeInfo(nodeInfo *framework.NodeInfo) error
|
||||
// RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as
|
||||
// any DRA objects owned by them.
|
||||
|
|
|
@ -46,6 +46,26 @@ type DeltaClusterSnapshot struct {
|
|||
data *internalDeltaSnapshotData
|
||||
}
|
||||
|
||||
func (snapshot *DeltaClusterSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) SchedulingError {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *DeltaClusterSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (matchingNode string, err SchedulingError) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *DeltaClusterSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) SchedulingError {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (snapshot *DeltaClusterSnapshot) UnschedulePod(namespace string, podName string, nodeName string) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
type deltaSnapshotNodeLister DeltaClusterSnapshot
|
||||
type deltaSnapshotStorageLister DeltaClusterSnapshot
|
||||
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clustersnapshot
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// SchedulingErrorType represents different possible schedulingError types.
|
||||
type SchedulingErrorType int
|
||||
|
||||
const (
|
||||
// SchedulingInternalError denotes internal unexpected error while trying to schedule a pod
|
||||
SchedulingInternalError SchedulingErrorType = iota
|
||||
// FailingPredicateError means that a pod couldn't be scheduled on a particular node because of a failing scheduler predicate
|
||||
FailingPredicateError
|
||||
// NoNodesPassingPredicatesFoundError means that a pod couldn't be scheduled on any Node because of failing scheduler predicates
|
||||
NoNodesPassingPredicatesFoundError
|
||||
)
|
||||
|
||||
// SchedulingError represents an error encountered while trying to schedule a Pod inside ClusterSnapshot.
|
||||
// An interface is exported instead of the concrete type to avoid the dreaded https://go.dev/doc/faq#nil_error.
|
||||
type SchedulingError interface {
|
||||
error
|
||||
|
||||
// Type can be used to distinguish between different SchedulingError types.
|
||||
Type() SchedulingErrorType
|
||||
// Reasons provides a list of human-readable reasons explaining the error.
|
||||
Reasons() []string
|
||||
|
||||
// FailingPredicateName returns the name of the predicate that failed. Only applicable to the FailingPredicateError type.
|
||||
FailingPredicateName() string
|
||||
// FailingPredicateReasons returns a list of human-readable reasons explaining why the predicate failed. Only applicable to the FailingPredicateError type.
|
||||
FailingPredicateReasons() []string
|
||||
}
|
||||
|
||||
type schedulingError struct {
|
||||
errorType SchedulingErrorType
|
||||
pod *apiv1.Pod
|
||||
|
||||
// Only applicable to SchedulingInternalError:
|
||||
internalErrorMsg string
|
||||
|
||||
// Only applicable to FailingPredicateError:
|
||||
failingPredicateName string
|
||||
failingPredicateReasons []string
|
||||
failingPredicateUnexpectedErrMsg string
|
||||
// debugInfo contains additional info that predicate doesn't include,
|
||||
// but may be useful for debugging (e.g. taints on node blocking scale-up)
|
||||
failingPredicateDebugInfo string
|
||||
}
|
||||
|
||||
// Type returns if error was internal of names predicate failure.
|
||||
func (se *schedulingError) Type() SchedulingErrorType {
|
||||
return se.errorType
|
||||
}
|
||||
|
||||
// Error satisfies the builtin error interface.
|
||||
func (se *schedulingError) Error() string {
|
||||
msg := ""
|
||||
|
||||
switch se.errorType {
|
||||
case SchedulingInternalError:
|
||||
msg = fmt.Sprintf("unexpected error: %s", se.internalErrorMsg)
|
||||
case FailingPredicateError:
|
||||
details := []string{
|
||||
fmt.Sprintf("predicateReasons=[%s]", strings.Join(se.FailingPredicateReasons(), ", ")),
|
||||
}
|
||||
if se.failingPredicateDebugInfo != "" {
|
||||
details = append(details, fmt.Sprintf("debugInfo=%s", se.failingPredicateDebugInfo))
|
||||
}
|
||||
if se.failingPredicateUnexpectedErrMsg != "" {
|
||||
details = append(details, fmt.Sprintf("unexpectedError=%s", se.failingPredicateUnexpectedErrMsg))
|
||||
}
|
||||
msg = fmt.Sprintf("predicate %q didn't pass (%s)", se.FailingPredicateName(), strings.Join(details, "; "))
|
||||
case NoNodesPassingPredicatesFoundError:
|
||||
msg = fmt.Sprintf("couldn't find a matching Node with passing predicates")
|
||||
default:
|
||||
msg = fmt.Sprintf("SchedulingErrorType type %q unknown - this shouldn't happen", se.errorType)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("can't schedule pod %s/%s: %s", se.pod.Namespace, se.pod.Name, msg)
|
||||
}
|
||||
|
||||
// Reasons returns a list of human-readable reasons for the error.
|
||||
func (se *schedulingError) Reasons() []string {
|
||||
switch se.errorType {
|
||||
case FailingPredicateError:
|
||||
return se.FailingPredicateReasons()
|
||||
default:
|
||||
return []string{se.Error()}
|
||||
}
|
||||
}
|
||||
|
||||
// FailingPredicateName returns the name of the predicate which failed.
|
||||
func (se *schedulingError) FailingPredicateName() string {
|
||||
return se.failingPredicateName
|
||||
}
|
||||
|
||||
// FailingPredicateReasons returns the failure reasons from the failed predicate as a slice of strings.
|
||||
func (se *schedulingError) FailingPredicateReasons() []string {
|
||||
return se.failingPredicateReasons
|
||||
}
|
||||
|
||||
// NewSchedulingInternalError creates a new schedulingError with SchedulingInternalError type.
|
||||
func NewSchedulingInternalError(pod *apiv1.Pod, errMsg string) SchedulingError {
|
||||
return &schedulingError{
|
||||
errorType: SchedulingInternalError,
|
||||
pod: pod,
|
||||
internalErrorMsg: errMsg,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFailingPredicateError creates a new schedulingError with FailingPredicateError type.
|
||||
func NewFailingPredicateError(pod *apiv1.Pod, predicateName string, predicateReasons []string, unexpectedErrMsg string, debugInfo string) SchedulingError {
|
||||
return &schedulingError{
|
||||
errorType: FailingPredicateError,
|
||||
pod: pod,
|
||||
failingPredicateName: predicateName,
|
||||
failingPredicateReasons: predicateReasons,
|
||||
failingPredicateUnexpectedErrMsg: unexpectedErrMsg,
|
||||
failingPredicateDebugInfo: debugInfo,
|
||||
}
|
||||
}
|
||||
|
||||
// NewNoNodesPassingPredicatesFoundError creates a new schedulingError with NoNodesPassingPredicatesFoundError type.
|
||||
func NewNoNodesPassingPredicatesFoundError(pod *apiv1.Pod) SchedulingError {
|
||||
return &schedulingError{
|
||||
errorType: NoNodesPassingPredicatesFoundError,
|
||||
pod: pod,
|
||||
}
|
||||
}
|
|
@ -1,107 +0,0 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package predicatechecker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// PredicateErrorType is type of predicate error
|
||||
type PredicateErrorType int
|
||||
|
||||
const (
|
||||
// NotSchedulablePredicateError means that one of the filters returned that pod does not fit a node
|
||||
NotSchedulablePredicateError PredicateErrorType = iota
|
||||
// InternalPredicateError denotes internal unexpected error while calling PredicateChecker
|
||||
InternalPredicateError
|
||||
)
|
||||
|
||||
// PredicateError is a structure representing error returned from predicate checking simulation.
|
||||
type PredicateError struct {
|
||||
errorType PredicateErrorType
|
||||
predicateName string
|
||||
errorMessage string
|
||||
reasons []string
|
||||
// debugInfo contains additional info that predicate doesn't include,
|
||||
// but may be useful for debugging (e.g. taints on node blocking scale-up)
|
||||
debugInfo func() string
|
||||
}
|
||||
|
||||
// ErrorType returns if error was internal of names predicate failure.
|
||||
func (pe *PredicateError) ErrorType() PredicateErrorType {
|
||||
return pe.errorType
|
||||
}
|
||||
|
||||
// PredicateName return name of predicate which failed.
|
||||
func (pe *PredicateError) PredicateName() string {
|
||||
return pe.predicateName
|
||||
}
|
||||
|
||||
// Message returns error message.
|
||||
func (pe *PredicateError) Message() string {
|
||||
if pe.errorMessage == "" {
|
||||
return "unknown error"
|
||||
}
|
||||
return pe.errorMessage
|
||||
}
|
||||
|
||||
// VerboseMessage generates verbose error message. Building verbose message may be expensive so number of calls should be
|
||||
// limited.
|
||||
func (pe *PredicateError) VerboseMessage() string {
|
||||
return fmt.Sprintf(
|
||||
"%s; predicateName=%s; reasons: %s; debugInfo=%s",
|
||||
pe.Message(),
|
||||
pe.predicateName,
|
||||
strings.Join(pe.reasons, ", "),
|
||||
pe.debugInfo())
|
||||
}
|
||||
|
||||
// Reasons returns failure reasons from failed predicate as a slice of strings.
|
||||
func (pe *PredicateError) Reasons() []string {
|
||||
return pe.reasons
|
||||
}
|
||||
|
||||
// NewPredicateError creates a new predicate error from error and reasons.
|
||||
func NewPredicateError(
|
||||
errorType PredicateErrorType,
|
||||
predicateName string,
|
||||
errorMessage string,
|
||||
reasons []string,
|
||||
debugInfo func() string,
|
||||
) *PredicateError {
|
||||
return &PredicateError{
|
||||
errorType: errorType,
|
||||
predicateName: predicateName,
|
||||
errorMessage: errorMessage,
|
||||
reasons: reasons,
|
||||
debugInfo: debugInfo,
|
||||
}
|
||||
}
|
||||
|
||||
// GenericPredicateError return a generic instance of PredicateError to be used in context where predicate name is not
|
||||
// know.
|
||||
func GenericPredicateError() *PredicateError {
|
||||
return &PredicateError{
|
||||
errorType: NotSchedulablePredicateError,
|
||||
errorMessage: "generic predicate failure",
|
||||
}
|
||||
}
|
||||
|
||||
func emptyString() string {
|
||||
return ""
|
||||
}
|
|
@ -25,7 +25,7 @@ import (
|
|||
|
||||
// PredicateChecker checks whether all required predicates pass for given Pod and Node.
|
||||
type PredicateChecker interface {
|
||||
FitsAnyNode(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod) (string, error)
|
||||
FitsAnyNodeMatching(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, error)
|
||||
CheckPredicates(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeName string) *PredicateError
|
||||
FitsAnyNode(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod) (string, clustersnapshot.SchedulingError)
|
||||
FitsAnyNodeMatching(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError)
|
||||
CheckPredicates(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError
|
||||
}
|
||||
|
|
|
@ -19,13 +19,14 @@ package predicatechecker
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
klog "k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2"
|
||||
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
)
|
||||
|
||||
|
@ -44,16 +45,16 @@ func NewSchedulerBasedPredicateChecker(fwHandle *framework.Handle) *SchedulerBas
|
|||
}
|
||||
|
||||
// FitsAnyNode checks if the given pod can be placed on any of the given nodes.
|
||||
func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod) (string, error) {
|
||||
func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod) (string, clustersnapshot.SchedulingError) {
|
||||
return p.FitsAnyNodeMatching(clusterSnapshot, pod, func(*framework.NodeInfo) bool {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// FitsAnyNodeMatching checks if the given pod can be placed on any of the given nodes matching the provided function.
|
||||
func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, error) {
|
||||
func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
|
||||
if clusterSnapshot == nil {
|
||||
return "", fmt.Errorf("ClusterSnapshot not provided")
|
||||
return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
|
||||
}
|
||||
|
||||
nodeInfosList, err := clusterSnapshot.ListNodeInfos()
|
||||
|
@ -63,7 +64,7 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu
|
|||
// Scheduler requires interface returning error, but no implementation
|
||||
// of ClusterSnapshot ever does it.
|
||||
klog.Errorf("Error obtaining nodeInfos from schedulerLister")
|
||||
return "", fmt.Errorf("error obtaining nodeInfos from schedulerLister")
|
||||
return "", clustersnapshot.NewSchedulingInternalError(pod, "error obtaining nodeInfos from schedulerLister")
|
||||
}
|
||||
|
||||
p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
|
||||
|
@ -72,7 +73,7 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu
|
|||
state := schedulerframework.NewCycleState()
|
||||
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
|
||||
if !preFilterStatus.IsSuccess() {
|
||||
return "", fmt.Errorf("error running pre filter plugins for pod %s; %s", pod.Name, preFilterStatus.Message())
|
||||
return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
|
||||
}
|
||||
|
||||
for i := range nodeInfosList {
|
||||
|
@ -96,18 +97,17 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu
|
|||
return nodeInfo.Node().Name, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("cannot put pod %s on any node", pod.Name)
|
||||
return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
|
||||
}
|
||||
|
||||
// CheckPredicates checks if the given pod can be placed on the given node.
|
||||
func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeName string) *PredicateError {
|
||||
func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
|
||||
if clusterSnapshot == nil {
|
||||
return NewPredicateError(InternalPredicateError, "", "ClusterSnapshot not provided", nil, emptyString)
|
||||
return clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
|
||||
}
|
||||
nodeInfo, err := clusterSnapshot.GetNodeInfo(nodeName)
|
||||
if err != nil {
|
||||
errorMessage := fmt.Sprintf("Error obtaining NodeInfo for name %s; %v", nodeName, err)
|
||||
return NewPredicateError(InternalPredicateError, "", errorMessage, nil, emptyString)
|
||||
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
|
||||
}
|
||||
|
||||
p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
|
||||
|
@ -116,47 +116,31 @@ func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot cluster
|
|||
state := schedulerframework.NewCycleState()
|
||||
_, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
|
||||
if !preFilterStatus.IsSuccess() {
|
||||
return NewPredicateError(
|
||||
InternalPredicateError,
|
||||
"",
|
||||
preFilterStatus.Message(),
|
||||
preFilterStatus.Reasons(),
|
||||
emptyString)
|
||||
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
|
||||
}
|
||||
|
||||
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
|
||||
|
||||
if !filterStatus.IsSuccess() {
|
||||
filterName := filterStatus.Plugin()
|
||||
filterMessage := filterStatus.Message()
|
||||
filterReasons := filterStatus.Reasons()
|
||||
if filterStatus.IsRejected() {
|
||||
return NewPredicateError(
|
||||
NotSchedulablePredicateError,
|
||||
filterName,
|
||||
filterMessage,
|
||||
filterReasons,
|
||||
p.buildDebugInfo(filterName, nodeInfo))
|
||||
unexpectedErrMsg := ""
|
||||
if !filterStatus.IsRejected() {
|
||||
unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
|
||||
}
|
||||
return NewPredicateError(
|
||||
InternalPredicateError,
|
||||
filterName,
|
||||
filterMessage,
|
||||
filterReasons,
|
||||
p.buildDebugInfo(filterName, nodeInfo))
|
||||
return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SchedulerBasedPredicateChecker) buildDebugInfo(filterName string, nodeInfo *framework.NodeInfo) func() string {
|
||||
func (p *SchedulerBasedPredicateChecker) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string {
|
||||
infoParts := []string{fmt.Sprintf("nodeName: %q", nodeInfo.Node().Name)}
|
||||
|
||||
switch filterName {
|
||||
case "TaintToleration":
|
||||
taints := nodeInfo.Node().Spec.Taints
|
||||
return func() string {
|
||||
return fmt.Sprintf("taints on node: %#v", taints)
|
||||
}
|
||||
default:
|
||||
return emptyString
|
||||
infoParts = append(infoParts, fmt.Sprintf("nodeTaints: %#v", nodeInfo.Node().Spec.Taints))
|
||||
}
|
||||
|
||||
return strings.Join(infoParts, ", ")
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import (
|
|||
testconfig "k8s.io/autoscaler/cluster-autoscaler/config/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
|
||||
scheduler "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
||||
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
|
@ -151,9 +151,11 @@ func TestCheckPredicate(t *testing.T) {
|
|||
predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name)
|
||||
if tt.expectError {
|
||||
assert.NotNil(t, predicateError)
|
||||
assert.Equal(t, NotSchedulablePredicateError, predicateError.ErrorType())
|
||||
assert.Equal(t, "Insufficient cpu", predicateError.Message())
|
||||
assert.Contains(t, predicateError.VerboseMessage(), "Insufficient cpu; predicateName=NodeResourcesFit")
|
||||
assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
|
||||
assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
|
||||
assert.Equal(t, []string{"Insufficient cpu"}, predicateError.FailingPredicateReasons())
|
||||
assert.Contains(t, predicateError.Error(), "NodeResourcesFit")
|
||||
assert.Contains(t, predicateError.Error(), "Insufficient cpu")
|
||||
} else {
|
||||
assert.Nil(t, predicateError)
|
||||
}
|
||||
|
@ -291,8 +293,9 @@ func TestDebugInfo(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
predicateErr := defaultPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1")
|
||||
assert.NotNil(t, predicateErr)
|
||||
assert.Equal(t, "node(s) had untolerated taint {SomeTaint: WhyNot?}", predicateErr.Message())
|
||||
assert.Contains(t, predicateErr.VerboseMessage(), "RandomTaint")
|
||||
assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
|
||||
assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
|
||||
assert.Contains(t, predicateErr.Error(), "RandomTaint")
|
||||
|
||||
// with custom predicate checker
|
||||
|
||||
|
|
Loading…
Reference in New Issue