Add Capacity Buffer controller logic

This commit is contained in:
Omran 2025-09-11 10:50:42 +00:00
parent 7b9cb8c8ba
commit fe61e262fd
No known key found for this signature in database
15 changed files with 1068 additions and 12 deletions

View File

@ -97,9 +97,9 @@ type ResourceList map[ResourceName]resource.Quantity
// CapacityBufferSpec defines the desired state of CapacityBuffer.
type CapacityBufferSpec struct {
// ProvisioningStrategy defines how the buffer is utilized.
// "active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
// +kubebuilder:validation:Enum=active-capacity
// +kubebuilder:default="active-capacity"
// "buffer.x-k8s.io/active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
// +kubebuilder:validation:Enum=buffer.x-k8s.io/active-capacity
// +kubebuilder:default="buffer.x-k8s.io/active-capacity"
// +optional
ProvisioningStrategy *string `json:"provisioningStrategy,omitempty" protobuf:"bytes,1,opt,name=provisioningStrategy"`
@ -123,24 +123,18 @@ type CapacityBufferSpec struct {
// If neither `replicas` nor `percentage` is set, as many chunks as fit within
// defined resource limits (if any) will be created. If both are set, the maximum
// of the two will be used.
// This field is mutually exclusive with `percentage` when `scalableRef` is set.
// +optional
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:ExclusiveMinimum=false
// +kubebuilder:validation:Xor=replicas,percentage
Replicas *int32 `json:"replicas,omitempty" protobuf:"varint,4,opt,name=replicas"`
// Percentage defines the desired buffer capacity as a percentage of the
// `scalableRef`'s current replicas. This is only applicable if `scalableRef` is set.
// The absolute number of replicas is calculated from the percentage by rounding up to a minimum of 1.
// For example, if `scalableRef` has 10 replicas and `percentage` is 20, 2 buffer chunks will be created.
// This field is mutually exclusive with `replicas`.
// +optional
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:ExclusiveMaximum=false
// +kubebuilder:validation:ExclusiveMinimum=false
// +kubebuilder:validation:Xor=replicas,percentage
Percentage *int32 `json:"percentage,omitempty" protobuf:"varint,5,opt,name=percentage"`
// Limits, if specified, will limit the number of chunks created for this buffer

View File

@ -100,12 +100,12 @@ spec:
- name
type: object
provisioningStrategy:
default: active-capacity
default: buffer.x-k8s.io/active-capacity
description: |-
ProvisioningStrategy defines how the buffer is utilized.
"active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
"buffer.x-k8s.io/active-capacity" is the default strategy, where the buffer actively scales up the cluster by creating placeholder pods.
enum:
- active-capacity
- buffer.x-k8s.io/active-capacity
type: string
replicas:
description: |-

View File

@ -0,0 +1,49 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"context"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
client "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// Constants to use in Capacity Buffers objects
const (
ActiveProvisioningStrategy = "buffer.x-k8s.io/active-capacity"
ReadyForProvisioningCondition = "ReadyForProvisioning"
ProvisioningCondition = "Provisioning"
ConditionTrue = "True"
ConditionFalse = "False"
DefaultNamespace = "default"
)
// CreatePodTemplate creates a pod template object by calling API server
func CreatePodTemplate(client *kubernetes.Clientset, podTemplate *corev1.PodTemplate) (*corev1.PodTemplate, error) {
return client.CoreV1().PodTemplates(DefaultNamespace).Create(context.TODO(), podTemplate, metav1.CreateOptions{})
}
// UpdateBufferStatus updates the passed buffer object with its defined status
func UpdateBufferStatus(buffersClient client.Interface, buffer *v1.CapacityBuffer) error {
_, err := buffersClient.AutoscalingV1().CapacityBuffers(DefaultNamespace).UpdateStatus(context.TODO(), buffer, metav1.UpdateOptions{})
return err
}

View File

@ -0,0 +1,143 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"time"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/labels"
buffersclient "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/listers/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
common "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
client "k8s.io/client-go/kubernetes"
)
const loopInterval = time.Second * 5
// BufferController performs updates on Buffers and convert them to pods to be injected
type BufferController interface {
// Run to run the reconciliation loop frequently every x seconds
Run(stopCh <-chan struct{})
}
type bufferController struct {
buffersLister v1.CapacityBufferLister
strategyFilter filters.Filter
statusFilter filters.Filter
translator translators.Translator
updater updater.StatusUpdater
loopInterval time.Duration
}
// NewBufferController creates new bufferController object
func NewBufferController(
buffersLister v1.CapacityBufferLister,
strategyFilter filters.Filter,
statusFilter filters.Filter,
translator translators.Translator,
updater updater.StatusUpdater,
loopInterval time.Duration,
) BufferController {
return &bufferController{
buffersLister: buffersLister,
strategyFilter: strategyFilter,
statusFilter: statusFilter,
translator: translator,
updater: updater,
loopInterval: loopInterval,
}
}
// NewDefaultBufferController creates bufferController with default configs
func NewDefaultBufferController(
listerRegistry kubernetes.ListerRegistry,
capacityBufferClinet buffersclient.Clientset,
nodeBufferListener v1.CapacityBufferLister,
kubeClient client.Clientset,
) BufferController {
return &bufferController{
buffersLister: nodeBufferListener,
// Accepting empty string as it represents nil value for ProvisioningStrategy
strategyFilter: filters.NewStrategyFilter([]string{common.ActiveProvisioningStrategy, ""}),
statusFilter: filters.NewStatusFilter(map[string]string{
common.ReadyForProvisioningCondition: common.ConditionTrue,
common.ProvisioningCondition: common.ConditionTrue,
}),
translator: translators.NewCombinedTranslator(
[]translators.Translator{
translators.NewPodTemplateBufferTranslator(),
},
),
updater: *updater.NewStatusUpdater(&capacityBufferClinet),
loopInterval: loopInterval,
}
}
// Run to run the controller reconcile loop
func (c *bufferController) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case <-time.After(c.loopInterval):
c.reconcile()
}
}
}
// Reconcile represents single iteration in the main-loop of Updater
func (c *bufferController) reconcile() {
// List all capacity buffers objects
buffers, err := c.buffersLister.List(labels.Everything())
if err != nil {
klog.Errorf("Capacity buffer controller failed to list buffers with error: %v", err.Error())
return
}
klog.V(2).Infof("Capacity buffer controller listed [%v] buffers", len(buffers))
// Filter the desired provisioning strategy
filteredBuffers, _ := c.strategyFilter.Filter(buffers)
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers strategy filter", len(filteredBuffers))
// Filter the desired status
toBeTranslatedBuffers, _ := c.statusFilter.Filter(filteredBuffers)
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers status filter", len(filteredBuffers))
// Extract pod specs and number of replicas from filtered buffers
errors := c.translator.Translate(toBeTranslatedBuffers)
logErrors(errors)
// Update buffer status by calling API server
errors = c.updater.Update(toBeTranslatedBuffers)
logErrors(errors)
}
func logErrors(errors []error) {
for _, error := range errors {
klog.Errorf("Capacity buffer controller error: %v", error.Error())
}
}

View File

@ -0,0 +1,60 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
)
// Filter filters CapacityBuffer based on some criteria.
type Filter interface {
Filter(buffers []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer)
CleanUp()
}
// combinedFilter is a list of Filter
type combinedFilter struct {
filters []Filter
}
// NewCombinedFilter construct combinedFilter.
func NewCombinedFilter(filters []Filter) *combinedFilter {
return &combinedFilter{filters}
}
// AddFilter append a filter to the list.
func (f *combinedFilter) AddFilter(filter Filter) {
f.filters = append(f.filters, filter)
}
// Filter runs sub-filters sequentially
func (f *combinedFilter) Filter(buffers []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer) {
var totalFilteredOutBuffers []*v1.CapacityBuffer
for _, buffersFilter := range f.filters {
updatedBuffersList, filteredOutBuffers := buffersFilter.Filter(buffers)
buffers = updatedBuffersList
totalFilteredOutBuffers = append(totalFilteredOutBuffers, filteredOutBuffers...)
}
return buffers, totalFilteredOutBuffers
}
// CleanUp cleans up the filter's internal structures.
func (f *combinedFilter) CleanUp() {
for _, filter := range f.filters {
filter.CleanUp()
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
)
// statusFilter filters out buffers with the defined conditions
type statusFilter struct {
conditions map[string]string
}
// NewStatusFilter creates an instance of statusFilter that filters out the buffers with condition in passed conditions.
func NewStatusFilter(conditions map[string]string) *statusFilter {
return &statusFilter{
conditions: conditions,
}
}
// Filter filters the passed buffers based on buffer status conditions
func (f *statusFilter) Filter(buffersToFilter []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer) {
var buffers []*v1.CapacityBuffer
var filteredOutBuffers []*v1.CapacityBuffer
for _, buffer := range buffersToFilter {
if !f.hasCondition(buffer) {
buffers = append(buffers, buffer)
} else {
filteredOutBuffers = append(filteredOutBuffers, buffer)
}
}
return buffers, filteredOutBuffers
}
func (f *statusFilter) hasCondition(buffer *v1.CapacityBuffer) bool {
bufferConditions := buffer.Status.Conditions
for _, condition := range bufferConditions {
if val, found := f.conditions[condition.Type]; found && val == string(condition.Status) {
return true
}
}
return false
}
// CleanUp cleans up the filter's internal structures.
func (f *statusFilter) CleanUp() {
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/testutil"
)
func TestStatusFilter(t *testing.T) {
tests := []struct {
name string
conditions map[string]string
buffers []*v1.CapacityBuffer
expectedFilteredBuffers []*v1.CapacityBuffer
expectedFilteredOutBuffers []*v1.CapacityBuffer
}{
{
name: "Empty conditions, filter none",
conditions: map[string]string{},
buffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil),
},
expectedFilteredBuffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil),
},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{},
},
{
name: "Some condition, filter one",
conditions: map[string]string{common.ReadyForProvisioningCondition: common.ConditionTrue},
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, testutil.GetConditionReady()),
},
expectedFilteredBuffers: []*v1.CapacityBuffer{},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, testutil.GetConditionReady()),
},
},
{
name: "Some condition, filter one in and one out",
conditions: map[string]string{common.ReadyForProvisioningCondition: common.ConditionTrue},
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, testutil.GetConditionReady()),
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.AnotherPodTemplateRefName}, nil, nil, nil, testutil.GetConditionNotReady()),
},
expectedFilteredBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.AnotherPodTemplateRefName}, nil, nil, nil, testutil.GetConditionNotReady()),
},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, testutil.GetConditionReady()),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
statusFilter := NewStatusFilter(test.conditions)
filtered, filteredOut := statusFilter.Filter(test.buffers)
assert.ElementsMatch(t, test.expectedFilteredBuffers, filtered)
assert.ElementsMatch(t, test.expectedFilteredOutBuffers, filteredOut)
})
}
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
)
// strategyFilter filters out buffers with provisioning strategies not defined in strategiesToUse
// and defaults nil values of provisioningStrategy to empty string
type strategyFilter struct {
strategiesToUse map[string]bool
}
// NewStrategyFilter creates an instance of strategyFilter.
func NewStrategyFilter(strategiesToUse []string) *strategyFilter {
strategiesToUseMap := map[string]bool{}
for _, strategy := range strategiesToUse {
strategiesToUseMap[strategy] = true
}
return &strategyFilter{
strategiesToUse: strategiesToUseMap,
}
}
// Filter filters out buffers with provisioning strategies not defined in strategiesToUseMap
func (f *strategyFilter) Filter(buffers []*v1.CapacityBuffer) ([]*v1.CapacityBuffer, []*v1.CapacityBuffer) {
var filteredBuffers []*v1.CapacityBuffer
var filteredOutBuffers []*v1.CapacityBuffer
for _, buffer := range buffers {
if f.isAllowedProvisioningStrategy(buffer) {
filteredBuffers = append(filteredBuffers, buffer)
} else {
filteredOutBuffers = append(filteredOutBuffers, buffer)
}
}
return filteredBuffers, filteredOutBuffers
}
func (f *strategyFilter) isAllowedProvisioningStrategy(buffer *v1.CapacityBuffer) bool {
provisioningStrategy := ""
if buffer.Spec.ProvisioningStrategy != nil {
provisioningStrategy = *buffer.Spec.ProvisioningStrategy
}
if useStrategy, found := f.strategiesToUse[provisioningStrategy]; found && useStrategy {
return true
}
return false
}
// CleanUp cleans up the filter's internal structures.
func (f *strategyFilter) CleanUp() {
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/testutil"
)
func TestStrategyFilter(t *testing.T) {
someRandomStrategy := "someStrategy"
tests := []struct {
name string
buffers []*v1.CapacityBuffer
strategiesToConsider []string
expectedFilteredBuffers []*v1.CapacityBuffer
expectedFilteredOutBuffers []*v1.CapacityBuffer
}{
{
name: "Single buffer with accepted strategy",
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
strategiesToConsider: []string{testutil.ProvisioningStrategy},
expectedFilteredBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{},
},
{
name: "Nil strategy defaulting to empty",
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(nil, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
strategiesToConsider: []string{""},
expectedFilteredBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(nil, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{},
},
{
name: "Single buffer with rejected strategy",
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&someRandomStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
strategiesToConsider: []string{testutil.ProvisioningStrategy},
expectedFilteredBuffers: []*v1.CapacityBuffer{},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&someRandomStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
},
{
name: "Multiple buffers different strategies",
buffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&someRandomStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
testutil.GetBuffer(nil, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
strategiesToConsider: []string{testutil.ProvisioningStrategy, ""},
expectedFilteredBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&testutil.ProvisioningStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
testutil.GetBuffer(nil, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
expectedFilteredOutBuffers: []*v1.CapacityBuffer{
testutil.GetBuffer(&someRandomStrategy, &v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, nil, nil, nil, nil),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
strategyFilter := NewStrategyFilter(test.strategiesToConsider)
filtered, filteredOut := strategyFilter.Filter(test.buffers)
assert.ElementsMatch(t, test.expectedFilteredBuffers, filtered)
assert.ElementsMatch(t, test.expectedFilteredOutBuffers, filteredOut)
})
}
}

View File

@ -0,0 +1,109 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testutil
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
)
// To use their pointers in creating testing capacity buffer objects
var (
ProvisioningStrategy = common.ActiveProvisioningStrategy
SomeNumberOfReplicas = int32(3)
AnotherNumberOfReplicas = int32(5)
SomePodTemplateRefName = "some-pod-template"
AnotherPodTemplateRefName = "another-pod-template"
)
// SanitizeBuffersStatus returns a list of the status objects of the passed buffers after sanitizing them for testing comparison
func SanitizeBuffersStatus(buffers []*v1.CapacityBuffer) []*v1.CapacityBufferStatus {
resultedStatus := []*v1.CapacityBufferStatus{}
for _, buffer := range buffers {
for i := range buffer.Status.Conditions {
buffer.Status.Conditions[i].LastTransitionTime = metav1.Time{}
buffer.Status.Conditions[i].Message = ""
}
resultedStatus = append(resultedStatus, &buffer.Status)
}
return resultedStatus
}
// GetPodTemplateRefBuffer returns a buffer with podTemplateRef with the passed attributes and empty status, should be used for testing purposes only
func GetPodTemplateRefBuffer(podTemplateRef *v1.LocalObjectRef, replicas *int32) *v1.CapacityBuffer {
return &v1.CapacityBuffer{
Spec: v1.CapacityBufferSpec{
ProvisioningStrategy: &ProvisioningStrategy,
PodTemplateRef: podTemplateRef,
ScalableRef: nil,
Replicas: replicas,
Percentage: nil,
Limits: nil,
},
Status: *GetBufferStatus(nil, nil, nil),
}
}
// GetBuffer returns a capacity buffer with the passed attributes, should be used for testing purposes only
func GetBuffer(strategy *string, podTemplateRef *v1.LocalObjectRef, replicas *int32, podTempRef *v1.LocalObjectRef, statusReplicas *int32, conditions []metav1.Condition) *v1.CapacityBuffer {
return &v1.CapacityBuffer{
Spec: v1.CapacityBufferSpec{
ProvisioningStrategy: strategy,
PodTemplateRef: podTemplateRef,
ScalableRef: nil,
Replicas: replicas,
Percentage: nil,
Limits: nil,
},
Status: *GetBufferStatus(podTempRef, statusReplicas, conditions),
}
}
// GetBufferStatus returns a buffer status with the passed attributes, should be used for testing purposes only
func GetBufferStatus(podTempRef *v1.LocalObjectRef, replicas *int32, conditions []metav1.Condition) *v1.CapacityBufferStatus {
return &v1.CapacityBufferStatus{
PodTemplateRef: podTempRef,
Replicas: replicas,
PodTemplateGeneration: nil,
Conditions: conditions,
}
}
// GetConditionReady returns a list of conditions with a condition ready and empty message, should be used for testing purposes only
func GetConditionReady() []metav1.Condition {
readyCondition := metav1.Condition{
Type: common.ReadyForProvisioningCondition,
Status: common.ConditionTrue,
Message: "",
Reason: "atrtibutesSetSuccessfully",
LastTransitionTime: metav1.Time{},
}
return []metav1.Condition{readyCondition}
}
// GetConditionNotReady returns a list of conditions with a condition not ready and empty message, should be used for testing purposes only
func GetConditionNotReady() []metav1.Condition {
notReadyCondition := metav1.Condition{
Type: common.ReadyForProvisioningCondition,
Status: common.ConditionFalse,
Message: "",
Reason: "error",
LastTransitionTime: metav1.Time{},
}
return []metav1.Condition{notReadyCondition}
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package translator
import (
"fmt"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
)
// podTemplateBufferTranslator translates podTemplateRef buffers specs to fill their status.
type podTemplateBufferTranslator struct {
}
// NewPodTemplateBufferTranslator creates an instance of podTemplateBufferTranslator.
func NewPodTemplateBufferTranslator() *podTemplateBufferTranslator {
return &podTemplateBufferTranslator{}
}
// Translate translates buffers processors into pod capacity.
func (t *podTemplateBufferTranslator) Translate(buffers []*v1.CapacityBuffer) []error {
errors := []error{}
for _, buffer := range buffers {
if isPodTemplateBasedBuffer(buffer) {
podTemplateRef, numberOfPods, err := t.translate(buffer)
if err != nil {
setBufferAsNotReadyForProvisioning(buffer, err.Error())
errors = append(errors, err)
} else {
setBufferAsReadyForProvisioning(buffer, podTemplateRef.Name, numberOfPods)
}
}
}
return errors
}
func (t *podTemplateBufferTranslator) translate(buffer *v1.CapacityBuffer) (*v1.LocalObjectRef, int32, error) {
// Fixed Replicas will be used if both Replicas and Percent are defined
if buffer.Spec.Replicas != nil {
return buffer.Spec.PodTemplateRef, max(1, int32(*buffer.Spec.Replicas)), nil
}
return nil, 0, fmt.Errorf("Failed to translate buffer %v, Replicas should have a value when PodTemplateRef is set", buffer.Name)
}
func isPodTemplateBasedBuffer(buffer *v1.CapacityBuffer) bool {
return buffer.Spec.PodTemplateRef != nil
}
// CleanUp cleans up the translator's internal structures.
func (t *podTemplateBufferTranslator) CleanUp() {
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package translator
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/testutil"
)
func TestPodTemplateBufferTranslator(t *testing.T) {
podTemplateBufferTranslator := NewPodTemplateBufferTranslator()
tests := []struct {
name string
buffers []*v1.CapacityBuffer
expectedStatus []*v1.CapacityBufferStatus
expectedNumberOfErrors int
}{
{
name: "Test 1 buffer with pod template ref",
buffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas),
},
expectedStatus: []*v1.CapacityBufferStatus{
testutil.GetBufferStatus(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas, testutil.GetConditionReady()),
},
expectedNumberOfErrors: 0,
},
{
name: "Test 2 buffers with pod template ref",
buffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas),
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.AnotherPodTemplateRefName}, &testutil.AnotherNumberOfReplicas),
},
expectedStatus: []*v1.CapacityBufferStatus{
testutil.GetBufferStatus(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas, testutil.GetConditionReady()),
testutil.GetBufferStatus(&v1.LocalObjectRef{Name: testutil.AnotherPodTemplateRefName}, &testutil.AnotherNumberOfReplicas, testutil.GetConditionReady()),
},
expectedNumberOfErrors: 0,
},
{
name: "Test 2 buffers, one with no replicas",
buffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas),
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.AnotherPodTemplateRefName}, nil),
},
expectedStatus: []*v1.CapacityBufferStatus{
testutil.GetBufferStatus(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas, testutil.GetConditionReady()),
testutil.GetBufferStatus(nil, nil, testutil.GetConditionNotReady()),
},
expectedNumberOfErrors: 1,
},
{
name: "Test 2 buffers, one with no pod template ref",
buffers: []*v1.CapacityBuffer{
testutil.GetPodTemplateRefBuffer(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas),
testutil.GetPodTemplateRefBuffer(nil, &testutil.AnotherNumberOfReplicas),
},
expectedStatus: []*v1.CapacityBufferStatus{
testutil.GetBufferStatus(&v1.LocalObjectRef{Name: testutil.SomePodTemplateRefName}, &testutil.SomeNumberOfReplicas, testutil.GetConditionReady()),
testutil.GetBufferStatus(nil, nil, nil),
},
expectedNumberOfErrors: 0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
errors := podTemplateBufferTranslator.Translate(test.buffers)
assert.Equal(t, len(errors), test.expectedNumberOfErrors)
assert.ElementsMatch(t, test.expectedStatus, testutil.SanitizeBuffersStatus(test.buffers))
})
}
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package translator
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
)
// Translator translates the passed buffers to pod template and number of replicas
type Translator interface {
Translate(buffers []*v1.CapacityBuffer) []error
CleanUp()
}
// combinedTranslator is a list of Translator
type combinedTranslator struct {
translators []Translator
}
// NewCombinedTranslator construct combinedTranslator.
func NewCombinedTranslator(Translators []Translator) *combinedTranslator {
return &combinedTranslator{Translators}
}
// AddTranslator append translator to the list.
func (b *combinedTranslator) AddTranslator(translator Translator) {
b.translators = append(b.translators, translator)
}
// Translate runs sub-translate sequentially, in case more than one translator acted on same buffer
// last translator overrides the others
func (b *combinedTranslator) Translate(buffers []*v1.CapacityBuffer) []error {
var errors []error
for _, translator := range b.translators {
bufferErrors := translator.Translate(buffers)
errors = append(errors, bufferErrors...)
}
return errors
}
// CleanUp cleans up the translator's internal structures.
func (b *combinedTranslator) CleanUp() {
for _, translator := range b.translators {
translator.CleanUp()
}
}
func setBufferAsReadyForProvisioning(buffer *v1.CapacityBuffer, podTemplateName string, replicas int32) {
buffer.Status.PodTemplateRef = &v1.LocalObjectRef{
Name: podTemplateName,
}
buffer.Status.Replicas = &replicas
buffer.Status.PodTemplateGeneration = nil
readyCondition := metav1.Condition{
Type: common.ReadyForProvisioningCondition,
Status: common.ConditionTrue,
Message: "ready",
Reason: "atrtibutesSetSuccessfully",
LastTransitionTime: metav1.Time{Time: time.Now()},
}
buffer.Status.Conditions = []metav1.Condition{readyCondition}
}
func setBufferAsNotReadyForProvisioning(buffer *v1.CapacityBuffer, errorMessage string) {
buffer.Status.PodTemplateRef = nil
buffer.Status.Replicas = nil
buffer.Status.PodTemplateGeneration = nil
notReadyCondition := metav1.Condition{
Type: common.ReadyForProvisioningCondition,
Status: common.ConditionFalse,
Message: errorMessage,
Reason: "error",
LastTransitionTime: metav1.Time{Time: time.Now()},
}
buffer.Status.Conditions = []metav1.Condition{notReadyCondition}
}

View File

@ -0,0 +1,51 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package updater
import (
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
client "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
common "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
)
// StatusUpdater updates the buffer status bassed
type StatusUpdater struct {
client client.Interface
}
// NewStatusUpdater creates an instance of StatusUpdater.
func NewStatusUpdater(client client.Interface) *StatusUpdater {
return &StatusUpdater{
client: client,
}
}
// Update updates the buffer status with pod capacity
func (u *StatusUpdater) Update(buffers []*v1.CapacityBuffer) []error {
var errors []error
for _, buffer := range buffers {
err := common.UpdateBufferStatus(u.client, buffer)
if err != nil {
errors = append(errors, err)
}
}
return errors
}
// CleanUp cleans up the updater's internal structures.
func (u *StatusUpdater) CleanUp() {
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package updater
import (
"testing"
ctesting "k8s.io/client-go/testing"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
fakeclientset "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned/fake"
)
func TestStatusUpdater(t *testing.T) {
exitingBuffer := &v1.CapacityBuffer{
ObjectMeta: metav1.ObjectMeta{
Name: "buffer1",
Namespace: "default",
},
Spec: v1.CapacityBufferSpec{},
}
notExistingBuffer := &v1.CapacityBuffer{
ObjectMeta: metav1.ObjectMeta{
Name: "buffer2",
Namespace: "default",
},
Spec: v1.CapacityBufferSpec{},
}
fakeClient := fakeclientset.NewSimpleClientset(exitingBuffer)
tests := []struct {
name string
buffers []*v1.CapacityBuffer
expectedNumberOfCalls int
expectedNumberOfErrors int
}{
{
name: "Update one buffer",
buffers: []*v1.CapacityBuffer{
exitingBuffer,
},
expectedNumberOfCalls: 1,
expectedNumberOfErrors: 0,
},
{
name: "Update one buffer not existing",
buffers: []*v1.CapacityBuffer{
notExistingBuffer,
},
expectedNumberOfCalls: 1,
expectedNumberOfErrors: 1,
},
{
name: "Update multiple buffers",
buffers: []*v1.CapacityBuffer{
exitingBuffer,
notExistingBuffer,
},
expectedNumberOfCalls: 2,
expectedNumberOfErrors: 1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updateCallsCount := 0
fakeClient.Fake.PrependReactor("update", "capacitybuffers",
func(action ctesting.Action) (handled bool, ret runtime.Object, err error) {
updateCallsCount++
return false, nil, nil
},
)
buffersUpdater := NewStatusUpdater(fakeClient)
errors := buffersUpdater.Update(test.buffers)
assert.Equal(t, test.expectedNumberOfErrors, len(errors))
assert.Equal(t, test.expectedNumberOfCalls, updateCallsCount)
})
}
}