Switch VPA checkpoint to use a lister

This commit is contained in:
Adrian Moisey 2025-07-17 08:07:19 +02:00
parent 9bc422016f
commit aae2a010f1
No known key found for this signature in database
GPG Key ID: 41AE4AE32747C7CF
5 changed files with 101 additions and 28 deletions

View File

@ -81,6 +81,7 @@ type ClusterStateFeederFactory struct {
KubeClient kube_client.Interface
MetricsClient metrics.MetricsClient
VpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
VpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
VpaLister vpa_lister.VerticalPodAutoscalerLister
PodLister v1lister.PodLister
OOMObserver oom.Observer
@ -99,6 +100,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
metricsClient: m.MetricsClient,
oomChan: m.OOMObserver.GetObservedOomsChannel(),
vpaCheckpointClient: m.VpaCheckpointClient,
vpaCheckpointLister: m.VpaCheckpointLister,
vpaLister: m.VpaLister,
clusterState: m.ClusterState,
specClient: spec.NewSpecClient(m.PodLister),
@ -206,6 +208,7 @@ type clusterStateFeeder struct {
metricsClient metrics.MetricsClient
oomChan <-chan oom.OomInfo
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
vpaLister vpa_lister.VerticalPodAutoscalerLister
clusterState model.ClusterState
selectorFetcher target.VpaTargetSelectorFetcher
@ -267,25 +270,29 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
klog.V(3).InfoS("Initializing VPA from checkpoints")
feeder.LoadVPAs(ctx)
klog.V(3).InfoS("Fetching VPA checkpoints")
checkpointList, err := feeder.vpaCheckpointLister.List(labels.Everything())
if err != nil {
klog.ErrorS(err, "Cannot list VPA checkpoints")
}
namespaces := make(map[string]bool)
for _, v := range feeder.clusterState.VPAs() {
namespaces[v.ID.Namespace] = true
}
for namespace := range namespaces {
klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace)
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
if feeder.shouldIgnoreNamespace(namespace) {
klog.V(3).InfoS("Skipping loading VPA Checkpoints from namespace.", "namespace", namespace, "vpaObjectNamespace", feeder.vpaObjectNamespace, "ignoredNamespaces", feeder.ignoredNamespaces)
continue
}
for _, checkpoint := range checkpointList.Items {
for _, checkpoint := range checkpointList {
klog.V(3).InfoS("Loading checkpoint for VPA", "checkpoint", klog.KRef(checkpoint.Namespace, checkpoint.Spec.VPAObjectName), "container", checkpoint.Spec.ContainerName)
err = feeder.setVpaCheckpoint(&checkpoint)
err = feeder.setVpaCheckpoint(checkpoint)
if err != nil {
klog.ErrorS(err, "Error while loading checkpoint")
}
}
}
}
@ -345,11 +352,12 @@ func (feeder *clusterStateFeeder) shouldIgnoreNamespace(namespace string) bool {
func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(ctx context.Context, namespace string, allVPAKeys map[model.VpaID]bool) error {
var err error
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
checkpointList, err := feeder.vpaCheckpointLister.VerticalPodAutoscalerCheckpoints(namespace).List(labels.Everything())
if err != nil {
return err
}
for _, checkpoint := range checkpointList.Items {
for _, checkpoint := range checkpointList {
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
if !allVPAKeys[vpaID] {
if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(ctx, checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil {

View File

@ -859,11 +859,12 @@ func TestFilterVPAsIgnoreNamespaces(t *testing.T) {
func TestCanCleanupCheckpoints(t *testing.T) {
_, tctx := ktesting.NewTestContext(t)
client := fake.NewSimpleClientset()
namespace := "testNamespace"
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{})
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
assert.NoError(t, err)
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace("testNamespace").WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace(namespace).WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
Kind: kind,
Name: name1,
APIVersion: apiVersion,
@ -878,22 +879,19 @@ func TestCanCleanupCheckpoints(t *testing.T) {
vpaLister := &test.VerticalPodAutoscalerListerMock{}
vpaLister.On("List").Return(vpas, nil)
checkpoints := &vpa_types.VerticalPodAutoscalerCheckpointList{
Items: []vpa_types.VerticalPodAutoscalerCheckpoint{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testNamespace",
Name: "nonExistentVPA",
},
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
VPAObjectName: "nonExistentVPA",
},
},
vpaCheckPoint := &vpa_types.VerticalPodAutoscalerCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "nonExistentVPA",
},
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
VPAObjectName: "nonExistentVPA",
},
}
vpacheckpoints := []*vpa_types.VerticalPodAutoscalerCheckpoint{vpaCheckPoint}
for _, vpa := range vpas {
checkpoints.Items = append(checkpoints.Items, vpa_types.VerticalPodAutoscalerCheckpoint{
vpacheckpoints = append(vpacheckpoints, &vpa_types.VerticalPodAutoscalerCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Namespace: vpa.Namespace,
Name: vpa.Name,
@ -904,23 +902,29 @@ func TestCanCleanupCheckpoints(t *testing.T) {
})
}
// Create a mock checkpoint client to track deletions
checkpointClient := &fakeautoscalingv1.FakeAutoscalingV1{Fake: &core.Fake{}}
checkpointClient.AddReactor("list", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
return true, checkpoints, nil
})
// Track deleted checkpoints
deletedCheckpoints := []string{}
checkpointClient.AddReactor("delete", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
deleteAction := action.(core.DeleteAction)
deletedCheckpoints = append(deletedCheckpoints, deleteAction.GetName())
return true, nil, nil
})
// Create namespace lister mock that will return the checkpoint list
checkpointNamespaceLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
checkpointNamespaceLister.On("List").Return(vpacheckpoints, nil)
// Create main checkpoint mock that will return the namespace lister
checkpointLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
checkpointLister.On("VerticalPodAutoscalerCheckpoints", namespace).Return(checkpointNamespaceLister)
feeder := clusterStateFeeder{
coreClient: client.CoreV1(),
vpaLister: vpaLister,
vpaCheckpointClient: checkpointClient,
vpaCheckpointLister: checkpointLister,
clusterState: model.NewClusterState(testGcPeriod),
recommenderName: "default",
}

View File

@ -297,6 +297,7 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"),
VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
ClusterState: clusterState,
SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory),
MemorySaveMode: *memorySaver,

View File

@ -200,6 +200,36 @@ func (m *VerticalPodAutoscalerListerMock) Get(name string) (*vpa_types.VerticalP
return nil, fmt.Errorf("unimplemented")
}
// VerticalPodAutoscalerCheckPointListerMock is a mock of VerticalPodAutoscalerCheckPointLister
type VerticalPodAutoscalerCheckPointListerMock struct {
mock.Mock
}
// List is a mock implementation of VerticalPodAutoscalerLister.List
func (m *VerticalPodAutoscalerCheckPointListerMock) List(selector labels.Selector) (ret []*vpa_types.VerticalPodAutoscalerCheckpoint, err error) {
args := m.Called()
var returnArg []*vpa_types.VerticalPodAutoscalerCheckpoint
if args.Get(0) != nil {
returnArg = args.Get(0).([]*vpa_types.VerticalPodAutoscalerCheckpoint)
}
return returnArg, args.Error(1)
}
// VerticalPodAutoscalerCheckpoints is a mock implementation of returning a lister for namespace.
func (m *VerticalPodAutoscalerCheckPointListerMock) VerticalPodAutoscalerCheckpoints(namespace string) vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister {
args := m.Called(namespace)
var returnArg vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister
if args.Get(0) != nil {
returnArg = args.Get(0).(vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister)
}
return returnArg
}
// Get is not implemented for this mock
func (m *VerticalPodAutoscalerCheckPointListerMock) Get(name string) (*vpa_types.VerticalPodAutoscalerCheckpoint, error) {
return nil, fmt.Errorf("unimplemented")
}
// VerticalPodAutoscalerV1Beta1ListerMock is a mock of VerticalPodAutoscalerLister or
// VerticalPodAutoscalerNamespaceLister - the crucial List method is the same.
type VerticalPodAutoscalerV1Beta1ListerMock struct {

View File

@ -107,6 +107,36 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct
return vpaLister
}
// NewVpaCheckpointLister returns VerticalPodAutoscalerCheckpointLister configured to fetch all VPACheckpoint objects from namespace,
// set namespace to k8sapiv1.NamespaceAll to select all namespaces.
// The method blocks until vpaCheckpointLister is initially populated.
func NewVpaCheckpointLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerCheckpointLister {
vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalercheckpoints", namespace, fields.Everything())
informerOptions := cache.InformerOptions{
ObjectType: &vpa_types.VerticalPodAutoscalerCheckpoint{},
ListerWatcher: vpaListWatch,
Handler: &cache.ResourceEventHandlerFuncs{},
ResyncPeriod: 1 * time.Hour,
Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
}
store, controller := cache.NewInformerWithOptions(informerOptions)
indexer, ok := store.(cache.Indexer)
if !ok {
klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
vpaCheckpointLister := vpa_lister.NewVerticalPodAutoscalerCheckpointLister(indexer)
go controller.Run(stopChannel)
if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) {
klog.ErrorS(nil, "Failed to sync VPA checkpoint cache during initialization")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
} else {
klog.InfoS("Initial VPA checkpoint synced successfully")
}
return vpaCheckpointLister
}
// PodMatchesVPA returns true iff the vpaWithSelector matches the Pod.
func PodMatchesVPA(pod *core.Pod, vpaWithSelector *VpaWithSelector) bool {
return PodLabelsMatchVPA(pod.Namespace, labels.Set(pod.GetLabels()), vpaWithSelector.Vpa.Namespace, vpaWithSelector.Selector)