Merge pull request #6880 from yaroslava-serdiuk/provreq-scale-down
BookCapacity for ProvisioningRequest pods
commit 68a757c191
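In short: capacity booking for ProvisioningRequests moves out of the ProvisioningRequest scale-up orchestrator and into provReqProcessor, which now also acts as a pod list processor. On every autoscaler iteration it injects the in-memory pods of ProvisioningRequests whose capacity should stay booked into the shared cluster snapshot, so the reserved capacity is treated as occupied by the rest of the loop, including the scale-down logic (hence the provreq-scale-down branch name). To plug into that pipeline the processor gains Process and CleanUp methods; the pod list processor contract they satisfy is assumed here to have roughly the following shape, inferred from the signatures in the hunks below rather than copied from the processors package:

package pods // sketch only: assumed name and location of the contract

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// PodListProcessor is the interface provReqProcessor now satisfies (assumed shape).
type PodListProcessor interface {
	// Process may filter or augment the unschedulable pod list; provReqProcessor
	// uses it as a per-iteration hook that books capacity in the cluster snapshot
	// and returns the pod list unchanged.
	Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error)
	// CleanUp is called on teardown.
	CleanUp()
}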
@@ -532,7 +532,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)

	preScaleUp := func() time.Time {
		scaleUpStart := time.Now()
		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
@@ -512,7 +512,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
		scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

		opts.ScaleUpOrchestrator = scaleUpOrchestrator
-		provreqProcesor := provreq.NewProvReqProcessor(client)
+		provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
		if err != nil {
			return nil, err
		}
@@ -522,6 +522,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
			return nil, err
		}
		podListProcessor.AddProcessor(injector)
+		podListProcessor.AddProcessor(provreqProcesor)
	}
	opts.Processors.PodListProcessor = podListProcessor
	scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
@@ -17,16 +17,24 @@ limitations under the License.
package provreq

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
@@ -36,15 +44,20 @@ const (
	defaultMaxUpdated = 20
)

+type injector interface {
+	TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error)
+}
+
type provReqProcessor struct {
	now        func() time.Time
	maxUpdated int
	client     *provreqclient.ProvisioningRequestClient
+	injector   injector
}

// NewProvReqProcessor return ProvisioningRequestProcessor.
-func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
-	return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
+func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor {
+	return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)}
}

// Refresh implements loop.Observer interface and will be run at the start
@@ -56,15 +69,14 @@ func (p *provReqProcessor) Refresh() {
		klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
		return
	}
-	p.Process(provReqs)
+	p.refresh(provReqs)
}

-// Process iterates over ProvisioningRequests and apply:
+// refresh iterates over ProvisioningRequests and apply:
// -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired.
// -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
-func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
+func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningRequest) {
	expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
	failedProvReq := []*provreqwrapper.ProvisioningRequest{}
	for _, provReq := range provReqs {
@@ -108,5 +120,50 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques
	}
}

-// Cleanup cleans up internal state.
+// CleanUp cleans up internal state
func (p *provReqProcessor) CleanUp() {}
+
+// Process implements PodListProcessor.Process() and inject fake pods to the cluster snapshoot for Provisioned ProvReqs in order to
+// reserve capacity from ScaleDown.
+func (p *provReqProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+	err := p.bookCapacity(context)
+	if err != nil {
+		klog.Warningf("Failed to book capacity for ProvisioningRequests: %s", err)
+	}
+	return unschedulablePods, nil
+}
+
+// bookCapacity schedule fake pods for ProvisioningRequest that should have reserved capacity
+// in the cluster.
+func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
+	provReqs, err := p.client.ProvisioningRequests()
+	if err != nil {
+		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
+	}
+	podsToCreate := []*apiv1.Pod{}
+	for _, provReq := range provReqs {
+		if !conditions.ShouldCapacityBeBooked(provReq) {
+			continue
+		}
+		pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
+		if err != nil {
+			// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
+			// If there is an error, mark PR as invalid, because we won't be able to book capacity
+			// for it anyway.
+			conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
+			if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
+				klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
+			}
+			continue
+		}
+		podsToCreate = append(podsToCreate, pods...)
+	}
+	if len(podsToCreate) == 0 {
+		return nil
+	}
+	// Scheduling the pods to reserve capacity for provisioning request.
+	if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
+		return err
+	}
+	return nil
+}
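bookCapacity above delegates the "should this request still hold capacity?" decision to conditions.ShouldCapacityBeBooked, which this diff does not show. The TestBookCapacity table below pins down its observable behaviour; a simplified restatement of that rule, given here as an illustration only (the real helper lives in the provisioningrequest/conditions package and may check more, e.g. reservation timestamps), would be:

// Illustration of the booking rule that TestBookCapacity (below) encodes for
// conditions.ShouldCapacityBeBooked; not the actual implementation.
func shouldBookCapacitySketch(provReq *provreqwrapper.ProvisioningRequest) bool {
	class := provReq.Spec.ProvisioningClassName
	if class != v1beta1.ProvisioningClassCheckCapacity && class != v1beta1.ProvisioningClassBestEffortAtomicScaleUp {
		return false // unsupported ProvisioningClass: never booked
	}
	conds := provReq.Status.Conditions
	// Capacity is held only while the request is Provisioned and the booking has
	// neither expired nor failed.
	return apimeta.IsStatusConditionTrue(conds, v1beta1.Provisioned) &&
		!apimeta.IsStatusConditionTrue(conds, v1beta1.BookingExpired) &&
		!apimeta.IsStatusConditionTrue(conds, v1beta1.Failed)
}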
@@ -17,19 +17,26 @@ limitations under the License.
package provreq

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"

	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
)

-func TestProcess(t *testing.T) {
+func TestRefresh(t *testing.T) {
	now := time.Now()
	dayAgo := now.Add(-1 * 24 * time.Hour)
	weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute)
@@ -146,8 +153,8 @@ func TestProcess(t *testing.T) {
		additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
		additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
		additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
-		processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
-		processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
+		processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
+		processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
		assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
		if len(test.conditions) == len(test.wantConditions) {
			assert.ElementsMatch(t, []metav1.Condition{
@@ -164,3 +171,78 @@ func TestProcess(t *testing.T) {
		}
	}
}
+
+type fakeInjector struct {
+	pods []*apiv1.Pod
+}
+
+func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) {
+	f.pods = pods
+	return nil, 0, nil
+}
+
+func TestBookCapacity(t *testing.T) {
+	testCases := []struct {
+		name             string
+		conditions       []string
+		provReq          *provreqwrapper.ProvisioningRequest
+		capacityIsBooked bool
+	}{
+		{
+			name:             "ProvReq is new, check-capacity class",
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
+			capacityIsBooked: false,
+		},
+		{
+			name:             "ProvReq is Failed, best-effort-atomic class",
+			conditions:       []string{v1beta1.Failed},
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
+			capacityIsBooked: false,
+		},
+		{
+			name:             "ProvReq is Provisioned, unknown class",
+			conditions:       []string{v1beta1.Provisioned},
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"),
+			capacityIsBooked: false,
+		},
+		{
+			name:             "ProvReq is Provisioned, capacity should be booked, check-capacity class",
+			conditions:       []string{v1beta1.Provisioned},
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
+			capacityIsBooked: true,
+		},
+		{
+			name:             "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class",
+			conditions:       []string{v1beta1.Provisioned},
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
+			capacityIsBooked: true,
+		},
+		{
+			name:             "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class",
+			conditions:       []string{v1beta1.Provisioned, v1beta1.BookingExpired},
+			provReq:          provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
+			capacityIsBooked: false,
+		},
+	}
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			test := test
+			injector := &fakeInjector{pods: []*apiv1.Pod{}}
+			for _, condition := range test.conditions {
+				conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now())
+			}
+
+			processor := &provReqProcessor{
+				now:        func() time.Time { return time.Now() },
+				client:     provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq),
+				maxUpdated: 20,
+				injector:   injector,
+			}
+			ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil)
+			processor.bookCapacity(&ctx)
+			if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) {
+				t.Fail()
+			}
+		})
+	}
+}
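For completeness, the exported PodListProcessor entry point can be driven the same way the table test above drives bookCapacity. A hypothetical test in the same package (not part of this change, reusing fakeInjector and the helpers already imported) might look like:

// Hypothetical example, not part of the diff: Process books capacity as a side
// effect and returns the unschedulable pod list unchanged.
func TestProcessBooksCapacityExample(t *testing.T) {
	provReq := provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
	conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, "", "", metav1.Now())

	injector := &fakeInjector{}
	processor := &provReqProcessor{
		now:        time.Now,
		maxUpdated: defaultMaxUpdated,
		client:     provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, provReq),
		injector:   injector,
	}

	ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil)
	unschedulable, err := processor.Process(&ctx, []*apiv1.Pod{})
	assert.NoError(t, err)
	assert.Empty(t, unschedulable)
	assert.NotEmpty(t, injector.pods) // fake pods for the Provisioned request were injected
}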
@@ -21,19 +21,14 @@ import (

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/klog/v2"

	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(

	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()
-	o.bookCapacity()

	// unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
	for _, provClass := range o.provisioningClasses {
@@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
	return nil, nil
}
-
-func (o *provReqOrchestrator) bookCapacity() error {
-	provReqs, err := o.client.ProvisioningRequests()
-	if err != nil {
-		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
-	}
-	podsToCreate := []*apiv1.Pod{}
-	for _, provReq := range provReqs {
-		if conditions.ShouldCapacityBeBooked(provReq) {
-			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
-			if err != nil {
-				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
-				// If there is an error, mark PR as invalid, because we won't be able to book capacity
-				// for it anyway.
-				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
-				if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
-					klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
-				}
-				continue
-			}
-			podsToCreate = append(podsToCreate, pods...)
-		}
-	}
-	if len(podsToCreate) == 0 {
-		return nil
-	}
-	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
-	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
-		klog.Warningf("Error during capacity booking: %v", err)
-	}
-	return nil
-}
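The removed orchestrator method above is the same logic that now lives in provReqProcessor.bookCapacity earlier in this diff; the guard is flattened from a wrapping if into an early continue, and the error from TrySchedulePods is now returned to the caller (where Process logs it as a warning) instead of only being logged here. Roughly, the restructuring of the per-request guard is:

// Before (orchestrator, removed above): nested guard, scheduling error only logged.
if conditions.ShouldCapacityBeBooked(provReq) {
	// build fake pods for the request and collect them ...
}

// After (processor, added earlier in this diff): early continue, scheduling error returned.
if !conditions.ShouldCapacityBeBooked(provReq) {
	continue
}
// build fake pods for the request and collect them ...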
@@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) {
			scaleUpResult: status.ScaleUpNotNeeded,
		},
		{
-			name:             "capacity in the cluster is booked",
-			provReqs:         []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq},
+			name:             "capacity is there, check-capacity class",
+			provReqs:         []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq},
			provReqToScaleUp: newCheckCapacityMemProvReq,
-			scaleUpResult:    status.ScaleUpNoOptionsAvailable,
+			scaleUpResult:    status.ScaleUpSuccessful,
		},
		{
			name: "unsupported ProvisioningRequest is ignored",
@@ -211,12 +211,6 @@ func TestScaleUp(t *testing.T) {
			provReqToScaleUp: atomicScaleUpProvReq,
			scaleUpResult:    status.ScaleUpNotNeeded,
		},
-		{
-			name:             "some capacity is pre-booked, large atomic scale-up request doesn't fit",
-			provReqs:         []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq},
-			provReqToScaleUp: largeAtomicScaleUpProvReq,
-			scaleUpResult:    status.ScaleUpNoOptionsAvailable,
-		},
		{
			name:     "capacity is there, large atomic scale-up request doesn't require scale-up",
			provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},