Merge pull request #5810 from mohamedawnallah/unitTestKarmadaSearchController

pkg/search: unit test controller
This commit is contained in:
karmada-bot 2024-11-15 17:37:53 +08:00 committed by GitHub
commit 3acc14c0f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 772 additions and 2 deletions

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
@ -349,6 +350,13 @@ func (c *Controller) getRegistryBackendHandler(cluster string, matchedRegistries
return handler, nil
}
var controlPlaneClientBuilder = func(restConfig *rest.Config) client.Client {
return gclient.NewForConfigOrDie(restConfig)
}
var clusterDynamicClientBuilder = func(cluster string, controlPlaneClient client.Client) (*util.DynamicClusterClient, error) {
return util.NewClusterDynamicClientSet(cluster, controlPlaneClient)
}
// doCacheCluster processes the resourceRegistry object
// TODO: update status
func (c *Controller) doCacheCluster(cluster string) error {
@ -386,9 +394,9 @@ func (c *Controller) doCacheCluster(cluster string) error {
// STEP2: added/updated cluster, builds an informer manager for a specific cluster.
if !c.InformerManager.IsManagerExist(cluster) {
klog.Info("Try to build informer manager for cluster ", cluster)
controlPlaneClient := gclient.NewForConfigOrDie(c.restConfig)
controlPlaneClient := controlPlaneClientBuilder(c.restConfig)
clusterDynamicClient, err := util.NewClusterDynamicClientSet(cluster, controlPlaneClient)
clusterDynamicClient, err := clusterDynamicClientBuilder(cluster, controlPlaneClient)
if err != nil {
return err
}

View File

@ -0,0 +1,762 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package search
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
versioned "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
fakekarmadaclient "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"github.com/karmada-io/karmada/pkg/search/backendstore"
"github.com/karmada-io/karmada/pkg/util"
)
func TestNewKarmadaSearchController(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
factory informerfactory.SharedInformerFactory
restMapper meta.RESTMapper
client versioned.Interface
prep func(*informerfactory.SharedInformerFactory, versioned.Interface) error
wantErr bool
errMsg string
}{
{
name: "NewKarmadaSearchController",
restConfig: &rest.Config{},
restMapper: meta.NewDefaultRESTMapper(nil),
client: fakekarmadaclient.NewSimpleClientset(),
factory: informerfactory.NewSharedInformerFactory(fakekarmadaclient.NewSimpleClientset(), 0),
prep: func(*informerfactory.SharedInformerFactory, versioned.Interface) error { return nil },
wantErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if err := test.prep(&test.factory, test.client); err != nil {
t.Fatalf("failed to prep test environment before creating new controller, got: %v", err)
}
_, err := NewController(test.restConfig, test.factory, test.restMapper)
if err == nil && test.wantErr {
t.Fatal("expected an error, but got none")
}
if err != nil && !test.wantErr {
t.Errorf("unexpected error, got: %v", err)
}
if err != nil && test.wantErr && !strings.Contains(err.Error(), test.errMsg) {
t.Errorf("expected error message %s to be in %s", test.errMsg, err.Error())
}
})
}
}
func TestAddClusterEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller) error
}{
{
name: "AddAllEventHandlers_TriggerAddClusterEvent_ClusterAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error {
var (
clusterName, resourceVersion = "test-cluster", "1000"
apiEndpoint, labels = "10.0.0.1", map[string]string{}
)
// Wait a bit to allow addCluster
// background thread to complete its execution.
if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
func TestUpdateClusterEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller) error
}{
{
name: "AddAllEventHandlers_TriggerUpdateClusterEvent_UpdatedClusterAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error {
var (
clusterName, resourceVersion, updatedResourceVerison = "test-cluster", "1000", "1001"
apiEndpoint, oldLabels, newLabels = "10.0.0.1", map[string]string{"status": "old"}, map[string]string{"status": "new"}
)
// Wait a bit to allow addCluster
// background thread to complete its execution.
if err := upsertCluster(clientConnector, oldLabels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow updateCluster
// background thread to complete its execution.
if err := upsertCluster(clientConnector, newLabels, apiEndpoint, clusterName, updatedResourceVerison); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
func TestDeleteClusterEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
controlPlaneClient client.WithWatch
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error
}{
{
name: "AddAllEventHandlers_TriggerDeleteClusterEvent_DeletedClusterAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
controlPlaneClient: fake.NewFakeClient(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error {
var (
registryName, clusterName = "test-registry", "test-cluster"
resourceVersion, apiEndpoint = "1000", "10.0.0.1"
labels = map[string]string{}
resourceSelectors = []searchv1alpha1.ResourceSelector{
{
APIVersion: "apps/v1",
Kind: "Deployment",
},
}
)
if err := clusterv1alpha1.Install(scheme.Scheme); err != nil {
return fmt.Errorf("failed to install scheme: %w", err)
}
if err := upsertClusterControllerRuntime(controlPlaneClient, labels, clusterName, apiEndpoint); err != nil {
return err
}
controlPlaneClientBuilder = func(*rest.Config) client.Client {
return controlPlaneClient
}
clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme),
ClusterName: clusterName,
}, nil
}
// Wait a bit to allow for addCluster background
// thread to complete its execution.
if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow for addResourceRegistry background
// thread to complete its execution.
if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow for deleteCluster on the controller
// background thread to complete its execution.
if err := deleteCluster(clientConnector, clusterName); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Verify no backend store for this deleted cluster.
if backend := backendstore.GetBackend(clusterName); backend != nil {
return fmt.Errorf("expected backend store for cluster %s to be deleted, but got: %v", clusterName, backend)
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
func TestAddResourceRegistryEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
controlPlaneClient client.WithWatch
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error
}{
{
name: "AddAllEventHandlers_TriggerAddResourceRegistryEvent_ResourceRegistryAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
controlPlaneClient: fake.NewFakeClient(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error {
var (
registryName, clusterName = "test-registry", "test-cluster"
resourceVersion, apiEndpoint = "1000", "10.0.0.1"
labels = map[string]string{}
resourceSelectors = []searchv1alpha1.ResourceSelector{
{
APIVersion: "apps/v1",
Kind: "Deployment",
},
}
)
controlPlaneClientBuilder = func(*rest.Config) client.Client {
return controlPlaneClient
}
clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme),
ClusterName: clusterName,
}, nil
}
// Wait a bit to allow addCluster background
// thread to complete its execution.
if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow addResourceRegistry background
// thread to complete its execution.
if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
func TestUpdateResourceRegistryEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller) error
}{
{
name: "AddAllEventHandlers_TriggerUpdateResourceRegistryEvent_UpdatedResourceRegistryAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error {
var (
registryName, clusterName, resourceVersion = "test-registry", "test-cluster", "1000"
apiEndpoint, labels = "10.0.0.1", map[string]string{}
resourceSelectors = []searchv1alpha1.ResourceSelector{
{
APIVersion: "apps/v1",
Kind: "Deployment",
},
}
resourceSelectorsUpdated = []searchv1alpha1.ResourceSelector{
{
APIVersion: "apps/v1",
Kind: "Deployment",
},
{
APIVersion: "v1",
Kind: "Pod",
},
}
)
// Wait a bit to allow addCluster background thread
// to complete its execution.
if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow addResourceRegistry background thread
// to complete its execution.
if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow updateResourceRegistry background thread
// to complete its execution.
if err := upsertResourceRegistry(clientConnector, resourceSelectorsUpdated, registryName, resourceVersion, []string{clusterName}); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
func TestDeleteResourceRegistryEventHandler(t *testing.T) {
tests := []struct {
name string
restConfig *rest.Config
client *fakekarmadaclient.Clientset
controlPlaneClient client.WithWatch
restMapper meta.RESTMapper
stopCh chan struct{}
prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error)
verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error
}{
{
name: "AddAllEventHandlers_TriggerDeleteResourceRegistryEvent_DeletedResourceRegistryAddedToWorkQueue",
restConfig: &rest.Config{},
client: fakekarmadaclient.NewSimpleClientset(),
controlPlaneClient: fake.NewFakeClient(),
restMapper: meta.NewDefaultRESTMapper(nil),
stopCh: make(chan struct{}),
prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) {
factory := informerfactory.NewSharedInformerFactory(clientConnector, 0)
controller, err := createController(restConfig, factory, restMapper)
return controller, factory, err
},
verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error {
var (
registryName, clusterName = "test-registry", "test-cluster"
resourceVersion, apiEndpoint = "1000", "10.0.0.1"
labels = map[string]string{}
resourceSelectors = []searchv1alpha1.ResourceSelector{
{
APIVersion: "apps/v1",
Kind: "Deployment",
},
}
)
controlPlaneClientBuilder = func(*rest.Config) client.Client {
return controlPlaneClient
}
clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme),
ClusterName: clusterName,
}, nil
}
// Wait a bit to allow addCluster
// background thread to complete its execution.
if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow addResourceRegistry
// background thread to complete its execution.
if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
// Wait a bit to allow deleteResourceRegistry
// background thread to complete its execution.
if err := deleteResourceRegistry(clientConnector, registryName); err != nil {
return err
}
time.Sleep(time.Millisecond * 250)
if err := cacheNextWrapper(controller); err != nil {
return err
}
return nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper)
if err != nil {
t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err)
}
// Add event handlers and start the informer to watch for changes.
controller.addAllEventHandlers()
informer.Start(test.stopCh)
defer close(test.stopCh)
if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil {
t.Errorf("failed to verify controller, got: %v", err)
}
})
}
}
// createController initializes a new Controller instance using the provided
// Kubernetes REST configuration, shared informer factory, and REST mapper.
// It returns the created Controller or an error if initialization fails.
func createController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, restMapper meta.RESTMapper) (*Controller, error) {
newController, err := NewController(restConfig, factory, restMapper)
if err != nil {
return nil, fmt.Errorf("failed to create new controller, got: %v", err)
}
return newController, nil
}
// upsertCluster creates or updates a Cluster resource in the Kubernetes API using the provided
// client, labels, API endpoint, cluster name, and resource version.
func upsertCluster(client *fakekarmadaclient.Clientset, labels map[string]string, apiEndpoint, clusterName, resourceVersion string) error {
cluster := &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
ResourceVersion: resourceVersion,
Labels: labels,
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: apiEndpoint,
},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
},
}
// Try to create the Cluster.
_, err := client.ClusterV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{})
if err == nil {
// Successfully created the Cluster.
return nil
}
// If the Cluster already exists, update it.
if apierrors.IsAlreadyExists(err) {
_, updateErr := client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if updateErr != nil {
return fmt.Errorf("failed to update cluster: %v", updateErr)
}
return nil
}
// Return any other errors encountered.
return err
}
// upsertClusterControllerRuntime creates or updates a Cluster resource using the controller-runtime
// client. The function takes labels, API endpoint, and cluster name to define the Cluster.
func upsertClusterControllerRuntime(controlPlaneClient client.Client, labels map[string]string, clusterName, apiEndpoint string) error {
cluster := &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Labels: labels,
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: apiEndpoint,
},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
},
}
// Try to create the Cluster.
err := controlPlaneClient.Create(context.TODO(), cluster)
if err == nil {
// Successfully created the Cluster.
return nil
}
// If the Cluster already exists, update it.
if apierrors.IsAlreadyExists(err) {
if updateErr := controlPlaneClient.Update(context.TODO(), cluster); updateErr != nil {
return fmt.Errorf("failed to update cluster: %v", updateErr)
}
return nil
}
return err
}
// upsertResourceRegistry creates or updates a ResourceRegistry resource in the Kubernetes API.
// It uses the provided client, resource selectors, registry name, resource version, and target cluster names.
func upsertResourceRegistry(client *fakekarmadaclient.Clientset, resourceSelectors []searchv1alpha1.ResourceSelector, registryName, resourceVersion string, clusterNames []string) error {
resourceRegistry := &searchv1alpha1.ResourceRegistry{
ObjectMeta: metav1.ObjectMeta{
Name: registryName,
ResourceVersion: resourceVersion,
},
Spec: searchv1alpha1.ResourceRegistrySpec{
TargetCluster: policyv1alpha1.ClusterAffinity{
ClusterNames: clusterNames,
},
ResourceSelectors: resourceSelectors,
},
}
// Try to create the ResourceRegistry.
_, err := client.SearchV1alpha1().ResourceRegistries().Create(context.TODO(), resourceRegistry, metav1.CreateOptions{})
if err == nil {
// Successfully created the ResourceRegistry.
return nil
}
// If the ResourceRegistry already exists, update it.
if apierrors.IsAlreadyExists(err) {
_, updateErr := client.SearchV1alpha1().ResourceRegistries().Update(context.TODO(), resourceRegistry, metav1.UpdateOptions{})
if updateErr != nil {
return fmt.Errorf("failed to update ResourceRegistry: %v", updateErr)
}
return nil
}
// Return any other errors encountered.
return err
}
// deleteCluster deletes a Cluster resource by name from the Kubernetes API using the provided client.
func deleteCluster(client *fakekarmadaclient.Clientset, clusterName string) error {
if err := client.ClusterV1alpha1().Clusters().Delete(context.TODO(), clusterName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to delete cluster, got: %v", err)
}
return nil
}
// deleteResourceRegistry deletes a ResourceRegistry resource by name from the Kubernetes API
// using the provided client.
func deleteResourceRegistry(client *fakekarmadaclient.Clientset, resourceRegistryName string) error {
if err := client.SearchV1alpha1().ResourceRegistries().Delete(context.TODO(), resourceRegistryName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to delete resource registry, got: %v", err)
}
return nil
}
// cacheNextWrapper calls the cacheNext method on the provided Controller instance.
// If the cacheNext method fails, it returns an error.
func cacheNextWrapper(controller *Controller) error {
if !controller.cacheNext() {
return errors.New("failed to cache next object")
}
return nil
}