feat: support mcs with native svc name

Signed-off-by: jwcesign <jwcesign@gmail.com>
This commit is contained in:
jwcesign 2023-11-28 23:21:20 +08:00
parent 01b086d2e1
commit 5932615cba
18 changed files with 1147 additions and 64 deletions

View File

@@ -75,6 +75,20 @@ webhooks:
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
- name: multiclusterservice.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["networking.karmada.io"]
apiVersions: ["*"]
resources: ["multiclusterservices"]
scope: "Namespaced"
clientConfig:
url: https://karmada-webhook.karmada-system.svc:443/mutate-multiclusterservice
caBundle: {{caBundle}}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration

View File

@@ -46,6 +46,7 @@ import (
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/multiclusterservice"
"github.com/karmada-io/karmada/pkg/controllers/status"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
@@ -74,7 +75,7 @@ func NewAgentCommand(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{
Use: "karmada-agent",
Long: `The karmada-agent is the agent of member clusters. It can register a specific cluster to the Karmada control
plane and sync manifests from the Karmada control plane to the member cluster. In addition, it also syncs the status of member
cluster and manifests to the Karmada control plane.`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
@@ -127,6 +128,7 @@ func init() {
controllers["workStatus"] = startWorkStatusController
controllers["serviceExport"] = startServiceExportController
controllers["certRotation"] = startCertRotationController
controllers["endpointsliceCollect"] = startEndpointSliceCollectController
}
func run(ctx context.Context, opts *options.Options) error {
@@ -387,6 +389,25 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
return true, nil
}
func startEndpointSliceCollectController(ctx controllerscontext.Context) (enabled bool, err error) {
opts := ctx.Opts
endpointSliceCollectController := &multiclusterservice.EndpointSliceCollectController{
Client: ctx.Mgr.GetClient(),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectControllerOnAgent(opts.ClusterName),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
endpointSliceCollectController.RunWorkQueue()
if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}
func startCertRotationController(ctx controllerscontext.Context) (bool, error) {
certRotationController := &certificate.CertRotationController{
Client: ctx.Mgr.GetClient(),

View File

@@ -96,7 +96,7 @@ func NewControllerManagerCommand(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{
Use: "karmada-controller-manager",
Long: `The karmada-controller-manager runs various controllers.
The controllers watch Karmada objects and then talk to the underlying clusters' API servers
to create regular Kubernetes resources.`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
@@ -226,6 +226,8 @@ func init() {
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController
controllers["multiclusterservice"] = startMCSController
controllers["endpointsliceCollect"] = startEndpointSliceCollectController
controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController
}
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -461,6 +463,38 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
return true, nil
}
func startEndpointSliceCollectController(ctx controllerscontext.Context) (enabled bool, err error) {
opts := ctx.Opts
endpointSliceCollectController := &multiclusterservice.EndpointSliceCollectController{
Client: ctx.Mgr.GetClient(),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
endpointSliceCollectController.RunWorkQueue()
if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}
func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabled bool, err error) {
endpointSliceSyncController := &multiclusterservice.EndpointsliceDispatchController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
}
if err := endpointSliceSyncController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}
func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) {
endpointSliceController := &mcs.EndpointSliceController{
Client: ctx.Mgr.GetClient(),

View File

@@ -155,6 +155,7 @@ func Run(ctx context.Context, opts *options.Options) error {
hookServer.Register("/validate-resourceinterpretercustomization", &webhook.Admission{Handler: &resourceinterpretercustomization.ValidatingAdmission{Client: hookManager.GetClient(), Decoder: decoder}})
hookServer.Register("/validate-multiclusteringress", &webhook.Admission{Handler: &multiclusteringress.ValidatingAdmission{Decoder: decoder}})
hookServer.Register("/validate-multiclusterservice", &webhook.Admission{Handler: &multiclusterservice.ValidatingAdmission{Decoder: decoder}})
hookServer.Register("/mutate-multiclusterservice", &webhook.Admission{Handler: &multiclusterservice.MutatingAdmission{Decoder: decoder}})
hookServer.Register("/mutate-federatedhpa", &webhook.Admission{Handler: &federatedhpa.MutatingAdmission{Decoder: decoder}})
hookServer.Register("/validate-resourcedeletionprotection", &webhook.Admission{Handler: &resourcedeletionprotection.ValidatingAdmission{Decoder: decoder}})
hookServer.WebhookMux().Handle("/readyz/", http.StripPrefix("/readyz/", &healthz.Handler{}))

View File

@@ -32,6 +32,8 @@ const (
ResourceNamespaceScopedMultiClusterService = true
// MCSServiceAppliedConditionType indicates the condition type of mcs service applied.
MCSServiceAppliedConditionType = "ServiceApplied"
// EndpointSliceDispatched indicates whether the EndpointSlice is dispatched to consumption clusters
EndpointSliceDispatched string = "EndpointSliceDispatched"
)
// +genclient

View File

@@ -1,9 +1,12 @@
/*
Copyright 2023 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@@ -99,7 +99,7 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
return controllerruntime.Result{}, nil
}
if !isWorkContains(work.Spec.Workload.Manifests, serviceExportGVK) {
if !helper.IsWorkContains(work.Spec.Workload.Manifests, serviceExportGVK) {
return controllerruntime.Result{}, nil
}
@@ -123,23 +123,6 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
return c.buildResourceInformers(cluster)
}
// isWorkContains checks if the target resource exists in a work.spec.workload.manifests.
func isWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.GroupVersionKind) bool {
for index := range manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifests[index].Raw)
if err != nil {
klog.Errorf("Failed to unmarshal work manifests index %d, error is: %v", index, err)
continue
}
if targetResource == workload.GroupVersionKind() {
return true
}
}
return false
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c)

View File

@@ -0,0 +1,469 @@
/*
Copyright 2023 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiclusterservice
import (
"context"
"fmt"
"reflect"
"sync"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)
// EndpointSliceCollectControllerName is the controller name that will be used when reporting events.
const EndpointSliceCollectControllerName = "endpointslice-collect-controller"
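// EndpointSliceCollectController collects EndpointSlices from member clusters referenced by
// MultiClusterService works and reports them to the Karmada control plane as Work objects.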
type EndpointSliceCollectController struct {
client.Client
RESTMapper meta.RESTMapper
StopChan <-chan struct{}
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
// eventHandlers holds the handlers used to handle events reported from member clusters.
// Each entry takes the cluster name as the key and the handler instance as the value, e.g.
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
ClusterCacheSyncTimeout metav1.Duration
}
var (
endpointSliceGVR = discoveryv1.SchemeGroupVersion.WithResource("endpointslices")
multiClusterServiceGVK = networkingv1alpha1.SchemeGroupVersion.WithKind("MultiClusterService")
)
// Reconcile performs a full reconciliation for the object referred to by the Request.
func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String())
work := &workv1alpha1.Work{}
if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
if !helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) {
return controllerruntime.Result{}, nil
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}
if !work.DeletionTimestamp.IsZero() {
if err := c.cleanWorkWithMCSDelete(work); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
if err = c.buildResourceInformers(ctx, work, clusterName); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
if err = c.collectTargetEndpointSlice(work, clusterName); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c)
}
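// cleanWorkWithMCSDelete deletes the reported EndpointSlice works associated with the service of the
// deleted MultiClusterService work, then removes this controller's finalizer from that work.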
func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alpha1.Work) error {
workList := &workv1alpha1.WorkList{}
if err := c.List(context.TODO(), workList, &client.ListOptions{
Namespace: work.Namespace,
LabelSelector: labels.SelectorFromSet(labels.Set{
util.ServiceNameLabel: util.GetLabelValue(work.Labels, util.ServiceNameLabel),
util.ServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel),
}),
}); err != nil {
klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err)
return err
}
var errs []error
for _, work := range workList.Items {
if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) {
continue
}
if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil {
klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err)
errs = append(errs, err)
}
}
if err := utilerrors.NewAggregate(errs); err != nil {
return err
}
if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) {
if err := c.Client.Update(context.Background(), work); err != nil {
klog.Errorf("Failed to remove finalizer %s for work %s/%s: %v",
util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err)
return err
}
}
return nil
}
// RunWorkQueue initializes the worker and runs it; the worker will process resources asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.syncEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)
}
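// syncEndpointSlice processes a queued FederatedKey and delegates the handling to handleEndpointSliceEvent.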
func (c *EndpointSliceCollectController) syncEndpointSlice(key util.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to sync endpointslice as invalid key: %v", key)
return fmt.Errorf("invalid key")
}
klog.V(4).Infof("Begin to sync %s %s.", fedKey.Kind, fedKey.NamespaceKey())
if err := c.handleEndpointSliceEvent(fedKey); err != nil {
klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
fedKey.NamespaceKey(), err)
return err
}
return nil
}
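// buildResourceInformers ensures the EndpointSlice informer of the work's cluster is registered and started,
// and adds this controller's finalizer to the work so the collected works can be cleaned up on deletion.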
func (c *EndpointSliceCollectController) buildResourceInformers(ctx context.Context, work *workv1alpha1.Work, clusterName string) error {
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return err
}
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync endpointslice for cluster(%s) as cluster not ready.", cluster.Name)
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
if err := c.registerInformersAndStart(cluster); err != nil {
klog.Errorf("Failed to register informer for Cluster %s. Error: %v.", cluster.Name, err)
return err
}
if controllerutil.AddFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) {
if err := c.Client.Update(ctx, work); err != nil {
klog.Errorf("Failed to add finalizer %s for work %s/%s: %v", util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err)
return err
}
}
return nil
}
// registerInformersAndStart builds an informer manager for the cluster if one doesn't exist, then constructs
// informers for the target GVRs and starts them.
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
}
singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
}
gvrTargets := []schema.GroupVersionResource{
endpointSliceGVR,
}
allSynced := true
for _, gvr := range gvrTargets {
if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler(cluster.Name)) {
allSynced = false
singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name))
}
}
if allSynced {
return nil
}
c.InformerManager.Start(cluster.Name)
if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}
return nil
}
// getEventHandler returns the callback function that knows how to handle events from the member cluster.
func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cache.ResourceEventHandler {
if value, exists := c.eventHandlers.Load(clusterName); exists {
return value.(cache.ResourceEventHandler)
}
eventHandler := fedinformer.NewHandlerOnEvents(c.genHandlerAddFunc(clusterName), c.genHandlerUpdateFunc(clusterName),
c.genHandlerDeleteFunc(clusterName))
c.eventHandlers.Store(clusterName, eventHandler)
return eventHandler
}
func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
curObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}
func (c *EndpointSliceCollectController) genHandlerUpdateFunc(clusterName string) func(oldObj, newObj interface{}) {
return func(oldObj, newObj interface{}) {
curObj := newObj.(runtime.Object)
if !reflect.DeepEqual(oldObj, newObj) {
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}
}
func (c *EndpointSliceCollectController) genHandlerDeleteFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
obj = deleted.Obj
if obj == nil {
return
}
}
oldObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, oldObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}
// handleEndpointSliceEvent syncs EndpointSlice objects to the control plane according to EndpointSlice events.
// For EndpointSlice create or update events, it reports the EndpointSlice when the referencing service has been exported.
// For EndpointSlice delete events, it cleans up the previously reported EndpointSlice.
func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceKey keys.FederatedKey) error {
endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey)
if err != nil {
if apierrors.IsNotFound(err) {
return cleanupWorkWithEndpointSliceDelete(c.Client, endpointSliceKey)
}
return err
}
if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
return nil
}
workList := &workv1alpha1.WorkList{}
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
Namespace: names.GenerateExecutionSpaceName(endpointSliceKey.Cluster),
LabelSelector: labels.SelectorFromSet(labels.Set{
util.ServiceNamespaceLabel: endpointSliceKey.Namespace,
util.ServiceNameLabel: util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelServiceName),
})}); err != nil {
klog.Errorf("Failed to list workList reported by endpointSlice(%s/%s), error: %v", endpointSliceKey.Namespace, endpointSliceKey.Name, err)
return err
}
mcsExists := false
for _, work := range workList.Items {
if helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) {
mcsExists = true
break
}
}
if !mcsExists {
return nil
}
if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(endpointSliceKey.Cluster, endpointSliceObj); err != nil {
klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
endpointSliceKey.NamespaceKey(), err)
return err
}
return nil
}
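// collectTargetEndpointSlice lists the EndpointSlices of the target service from the member cluster's
// informer cache and reports each of them to the control plane.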
func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1alpha1.Work, clusterName string) error {
manager := c.InformerManager.GetSingleClusterManager(clusterName)
if manager == nil {
err := fmt.Errorf("failed to get informer manager for cluster %s", clusterName)
klog.Errorf("%v", err)
return err
}
svcNamespace := util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel)
svcName := util.GetLabelValue(work.Labels, util.ServiceNameLabel)
selector := labels.SelectorFromSet(labels.Set{
discoveryv1.LabelServiceName: svcName,
})
epsList, err := manager.Lister(discoveryv1.SchemeGroupVersion.WithResource("endpointslices")).ByNamespace(svcNamespace).List(selector)
if err != nil {
klog.Errorf("Failed to list EndpointSlice for Service(%s/%s) in cluster(%s), Error: %v", svcNamespace, svcName, clusterName, err)
return err
}
for _, epsObj := range epsList {
eps := &discoveryv1.EndpointSlice{}
if err = helper.ConvertToTypedObject(epsObj, eps); err != nil {
klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err)
return err
}
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
continue
}
epsUnstructured, err := helper.ToUnstructured(eps)
if err != nil {
klog.Errorf("Failed to convert EndpointSlice %s/%s to unstructured, error: %v", eps.GetNamespace(), eps.GetName(), err)
return err
}
if err := c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName, epsUnstructured); err != nil {
return err
}
}
return nil
}
// reportEndpointSliceWithEndpointSliceCreateOrUpdate reports the EndpointSlice when the referencing service has been exported.
func (c *EndpointSliceCollectController) reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName string, endpointSlice *unstructured.Unstructured) error {
if err := reportEndpointSlice(c.Client, endpointSlice, clusterName); err != nil {
return fmt.Errorf("failed to report EndpointSlice(%s/%s) from cluster(%s) to control-plane",
endpointSlice.GetNamespace(), endpointSlice.GetName(), clusterName)
}
return nil
}
// reportEndpointSlice reports EndpointSlice objects to the control plane.
func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructured, clusterName string) error {
executionSpace := names.GenerateExecutionSpaceName(clusterName)
workMeta := metav1.ObjectMeta{
// Karmada will synchronize this work to other cluster namespaces and add the cluster name to prevent conflicts.
Name: names.GenerateMCSWorkName(endpointSlice.GetKind(), endpointSlice.GetName(), endpointSlice.GetNamespace(), clusterName),
Namespace: executionSpace,
Labels: map[string]string{
util.ServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicates that the Work should not be propagated since it is a collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
},
}
if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
return nil
}
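// cleanupWorkWithEndpointSliceDelete deletes the work previously reported for an EndpointSlice that has
// been deleted in the member cluster.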
func cleanupWorkWithEndpointSliceDelete(c client.Client, endpointSliceKey keys.FederatedKey) error {
executionSpace := names.GenerateExecutionSpaceName(endpointSliceKey.Cluster)
workNamespaceKey := types.NamespacedName{
Namespace: executionSpace,
Name: names.GenerateMCSWorkName(endpointSliceKey.Kind, endpointSliceKey.Name, endpointSliceKey.Namespace, endpointSliceKey.Cluster),
}
work := &workv1alpha1.Work{}
if err := c.Get(context.TODO(), workNamespaceKey, work); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
klog.Errorf("Failed to get work(%s) in executionSpace(%s): %v", workNamespaceKey.String(), executionSpace, err)
return err
}
if err := c.Delete(context.TODO(), work); err != nil {
klog.Errorf("Failed to delete work(%s): %v", workNamespaceKey.String(), err)
return err
}
return nil
}

View File

@@ -0,0 +1,280 @@
/*
Copyright 2023 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiclusterservice
import (
"context"
"strconv"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)
// EndpointsliceDispatchControllerName is the controller name that will be used when reporting events.
const EndpointsliceDispatchControllerName = "endpointslice-dispatch-controller"
var (
endpointSliceGVK = discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice")
)
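// EndpointsliceDispatchController dispatches EndpointSlices collected from provision clusters to the
// consumption clusters of a MultiClusterService.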
type EndpointsliceDispatchController struct {
client.Client
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
InformerManager genericmanager.MultiClusterInformerManager
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String())
work := &workv1alpha1.Work{}
if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) {
return controllerruntime.Result{}, nil
}
if !work.DeletionTimestamp.IsZero() {
if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil {
klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
mcsName := util.GetLabelValue(work.Labels, util.ServiceNameLabel)
mcsNS := util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel)
mcs := &networkingv1alpha1.MultiClusterService{}
if err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcsNS, Name: mcsName}, mcs); err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("MultiClusterService %s/%s is not found", mcsNS, mcsName)
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
var err error
defer func() {
if err != nil {
_ = c.updateEndpointSliceSynced(mcs, metav1.ConditionFalse, "EndpointSliceSyncFailed", err.Error())
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonDispatchEndpointSliceFailed, err.Error())
return
}
_ = c.updateEndpointSliceSynced(mcs, metav1.ConditionTrue, "EndpointSliceSyncSucceed", "EndpointSlices are synced successfully")
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonDispatchEndpointSliceSucceed, "EndpointSlices are synced successfully")
}()
if err = c.syncEndpointSlice(ctx, work.DeepCopy(), mcs); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
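// updateEndpointSliceSynced updates the EndpointSliceDispatched condition in the MultiClusterService
// status, retrying on conflict.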
func (c *EndpointsliceDispatchController) updateEndpointSliceSynced(mcs *networkingv1alpha1.MultiClusterService, status metav1.ConditionStatus, reason, message string) error {
endpointSliceDispatched := metav1.Condition{
Type: networkingv1alpha1.EndpointSliceDispatched,
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
}
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
meta.SetStatusCondition(&mcs.Status.Conditions, endpointSliceDispatched)
updateErr := c.Status().Update(context.TODO(), mcs)
if updateErr == nil {
return nil
}
updated := &networkingv1alpha1.MultiClusterService{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil {
mcs = updated
} else {
klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err)
}
return updateErr
})
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime.Manager) error {
workPredicateFun := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != ""
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).Complete(c)
}
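// syncEndpointSlice dispatches the EndpointSlice carried by the work to every consumption cluster except
// its source cluster, creating or updating a work in each target execution namespace.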
func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error {
epsSourceCluster, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get EndpointSlice source cluster name for work %s/%s", work.Namespace, work.Name)
return err
}
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
if len(consumptionClusters) == 0 {
consumptionClusters, err = util.GetClusterSet(c.Client)
if err != nil {
klog.Errorf("Failed to get cluster set, error is: %v", err)
return err
}
}
for clusterName := range consumptionClusters {
if clusterName == epsSourceCluster {
continue
}
// This should not happen since a reported work always carries a manifest, but guard against it anyway.
if len(work.Spec.Workload.Manifests) == 0 {
continue
}
// There should be only one manifest in the work, let's use the first one.
manifest := work.Spec.Workload.Manifests[0]
unstructuredObj := &unstructured.Unstructured{}
if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil {
klog.Errorf("Failed to unmarshal work manifest, error is: %v", err)
return err
}
endpointSlice := &discoveryv1.EndpointSlice{}
if err := helper.ConvertToTypedObject(unstructuredObj, endpointSlice); err != nil {
klog.Errorf("Failed to convert unstructured object to typed object, error is: %v", err)
return err
}
// Use this name to avoid naming conflicts and locate the EPS source cluster.
endpointSlice.Name = epsSourceCluster + "-" + endpointSlice.Name
clusterNamespace := names.GenerateExecutionSpaceName(clusterName)
endpointSlice.Labels = map[string]string{
discoveryv1.LabelServiceName: mcs.Name,
workv1alpha1.WorkNamespaceLabel: clusterNamespace,
workv1alpha1.WorkNameLabel: work.Name,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
discoveryv1.LabelManagedBy: util.EndpointSliceControllerLabelValue,
}
endpointSlice.Annotations = map[string]string{
// These annotations identify the source cluster of the EndpointSlice and its generation there, so consumers can tell whether it is up to date.
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
util.EndPointSliceProvisionGenerationAnnotation: strconv.FormatInt(endpointSlice.Generation, 10),
}
workMeta := metav1.ObjectMeta{
Name: work.Name,
Namespace: clusterNamespace,
Finalizers: []string{util.ExecutionControllerFinalizer},
Annotations: map[string]string{
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
},
Labels: map[string]string{
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
},
}
unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
if err != nil {
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to sync EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), epsSourceCluster, clusterName, err)
return err
}
}
if controllerutil.AddFinalizer(work, util.MCSEndpointSliceDispatchControllerFinalizer) {
if err := c.Client.Update(ctx, work); err != nil {
klog.Errorf("Failed to add finalizer %s for work %s/%s:%v", util.MCSEndpointSliceDispatchControllerFinalizer, work.Namespace, work.Name, err)
return err
}
}
return nil
}
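// cleanupEndpointSliceFromConsumerClusters deletes the works previously dispatched to consumer clusters
// for this work's EndpointSlice, then removes this controller's finalizer from the work.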
func (c *EndpointsliceDispatchController) cleanupEndpointSliceFromConsumerClusters(ctx context.Context, work *workv1alpha1.Work) error {
// TBD: There may be a better way without listing all works.
workList := &workv1alpha1.WorkList{}
err := c.Client.List(ctx, workList)
if err != nil {
klog.Errorf("Failed to list works serror: %v", err)
return err
}
epsSourceCluster, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get EndpointSlice provision cluster name for work %s/%s", work.Namespace, work.Name)
return err
}
for _, item := range workList.Items {
if item.Name != work.Name || util.GetAnnotationValue(item.Annotations, util.EndpointSliceProvisionClusterAnnotation) != epsSourceCluster {
continue
}
if err := c.Client.Delete(ctx, item.DeepCopy()); err != nil {
return err
}
}
if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceDispatchControllerFinalizer) {
if err := c.Client.Update(ctx, work); err != nil {
klog.Errorf("Failed to remove %s finalizer for work %s/%s:%v", util.MCSEndpointSliceDispatchControllerFinalizer, work.Namespace, work.Name, err)
return err
}
}
return nil
}

View File

@@ -19,9 +19,7 @@ package multiclusterservice
import (
"context"
"fmt"
"strings"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -65,6 +63,10 @@ type MCSController struct {
RateLimiterOptions ratelimiterflag.Options
}
var (
serviceGVK = corev1.SchemeGroupVersion.WithKind("Service")
)
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -110,6 +112,12 @@ func (c *MCSController) handleMCSDelete(ctx context.Context, mcs *networkingv1al
return controllerruntime.Result{}, err
}
if err := c.deleteMultiClusterServiceWork(mcs, true); err != nil {
c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceWorkFailed,
fmt.Sprintf("failed to delete MultiClusterService work :%v", err))
return controllerruntime.Result{}, err
}
finalizersUpdated := controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer)
if finalizersUpdated {
err := c.Client.Update(ctx, mcs)
@@ -125,27 +133,31 @@ func (c *MCSController) handleMCSDelete(ctx context.Context, mcs *networkingv1al
}
func (c *MCSController) deleteServiceWork(mcs *networkingv1alpha1.MultiClusterService, retainClusters sets.Set[string]) error {
mcsID, err := c.getMultiClusterServiceID(mcs)
if err != nil {
klog.Errorf("Get MultiClusterService(%s/%s) ID error:%v", mcs.Namespace, mcs.Name, err)
return err
}
workList, err := helper.GetWorksByLabelsSet(c, labels.Set{networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID})
mcsID := util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel)
workList, err := helper.GetWorksByLabelsSet(c, labels.Set{
networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID,
})
if err != nil {
klog.ErrorS(err, "failed to get work", "namespace", mcs.Namespace, "name", mcs.Name)
return err
}
for index := range workList.Items {
clusterName := strings.TrimPrefix(workList.Items[index].Namespace, names.ExecutionSpacePrefix)
for _, work := range workList.Items {
if !helper.IsWorkContains(work.Spec.Workload.Manifests, serviceGVK) {
continue
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
continue
}
if retainClusters.Has(clusterName) {
continue
}
if err = c.Client.Delete(context.TODO(), &workList.Items[index]); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
workList.Items[index].Namespace, workList.Items[index].Name, err)
if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while deleting work(%s/%s) deletion timestamp: %s",
work.Namespace, work.Name, err)
return err
}
}
@@ -154,13 +166,58 @@ func (c *MCSController) deleteServiceWork(mcs *networkingv1alpha1.MultiClusterSe
return nil
}
func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.MultiClusterService, deleteAll bool) error {
mcsID := util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel)
workList, err := helper.GetWorksByLabelsSet(c, labels.Set{
networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID,
})
if err != nil {
klog.Errorf("Failed to list work by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}
for _, work := range workList.Items {
if !helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) {
continue
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
continue
}
if !deleteAll && provisionClusters.Has(clusterName) {
continue
}
if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
work.Namespace, work.Name, err)
return err
}
}
klog.V(4).InfoS("Success to delete MultiClusterService work", "namespace", mcs.Namespace, "name", mcs.Name)
return nil
}
func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
klog.V(4).InfoS("Begin to handle MultiClusterService create or update event",
"namespace", mcs.Namespace, "name", mcs.Name)
// 1. if mcs does not contain the CrossCluster type, delete the service and MCS works if needed
if !helper.MultiClusterServiceCrossClusterEnabled(mcs) {
return c.deleteServiceWork(mcs, sets.New[string]())
if err := c.deleteServiceWork(mcs, sets.New[string]()); err != nil {
return err
}
if err := c.deleteMultiClusterServiceWork(mcs, true); err != nil {
return err
}
// Nothing else to reconcile for a MultiClusterService without the CrossCluster type.
return nil
}
// 2. add finalizer if needed
@@ -173,7 +230,12 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
}
}
// 3. make sure service exist
// 3. Generate the MCS work
if err := c.ensureMultiClusterServiceWork(ctx, mcs); err != nil {
return err
}
// 4. make sure the service exists
svc := &corev1.Service{}
err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
if err != nil && !apierrors.IsNotFound(err) {
@@ -181,7 +243,7 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
return err
}
// 4. if service not exist, delete service work if needed
// 5. if the service does not exist, delete the service work if needed
if apierrors.IsNotFound(err) {
delErr := c.deleteServiceWork(mcs, sets.New[string]())
if delErr != nil {
@@ -192,13 +254,18 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
return err
}
// 5. if service exist, create or update corresponding work in clusters
// 6. if the service exists, create or update the corresponding works in clusters
syncClusters, err := c.syncSVCWorkToClusters(ctx, mcs, svc)
if err != nil {
return err
}
// 6. delete service work not in need sync clusters
// 7. delete MultiClusterService works not in provision clusters
if err = c.deleteMultiClusterServiceWork(mcs, false); err != nil {
return err
}
// 8. delete service works not in the needed sync clusters
if err = c.deleteServiceWork(mcs, syncClusters); err != nil {
return err
}
@@ -207,6 +274,42 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
return nil
}
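// ensureMultiClusterServiceWork creates or updates a work carrying the MultiClusterService itself in the
// execution namespace of every provision cluster.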
func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}
for clusterName := range provisionClusters {
workMeta := metav1.ObjectMeta{
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
Namespace: names.GenerateExecutionSpaceName(clusterName),
Labels: map[string]string{
// The permanent ID is injected by the mutating webhook, so we can just use it here
networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel),
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ServiceNamespaceLabel: mcs.Namespace,
util.ServiceNameLabel: mcs.Name,
},
}
mcsObj, err := helper.ToUnstructured(mcs)
if err != nil {
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
}
}
return nil
}
func (c *MCSController) syncSVCWorkToClusters(
ctx context.Context,
mcs *networkingv1alpha1.MultiClusterService,
@@ -235,20 +338,15 @@ func (c *MCSController) syncSVCWorkToClusters(
return syncClusters, err
}
mcsID, err := c.getMultiClusterServiceID(mcs)
if err != nil {
klog.Errorf("Get MultiClusterService(%s/%s) ID error:%v", mcs.Namespace, mcs.Name, err)
return syncClusters, err
}
var errs []error
for clusterName := range syncClusters {
workMeta := metav1.ObjectMeta{
Name: names.GenerateWorkName(svc.Kind, svc.Name, clusterName+"/"+svc.Namespace),
Name: names.GenerateMCSWorkName(svc.Kind, svc.Name, svc.Namespace, clusterName),
Namespace: names.GenerateExecutionSpaceName(clusterName),
Finalizers: []string{util.ExecutionControllerFinalizer},
Labels: map[string]string{
networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID,
// The permanent ID is injected by the mutating webhook, so we can just use it here
networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel),
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
},
}
@@ -294,19 +392,6 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ
})
}
func (c *MCSController) getMultiClusterServiceID(mcs *networkingv1alpha1.MultiClusterService) (string, error) {
id := util.GetLabelValue(mcs.GetLabels(), networkingv1alpha1.MultiClusterServicePermanentIDLabel)
if id == "" {
id = uuid.New().String()
mcs.Labels = util.DedupeAndMergeLabels(mcs.Labels, map[string]string{networkingv1alpha1.MultiClusterServicePermanentIDLabel: id})
if err := c.Client.Update(context.TODO(), mcs); err != nil {
return id, err
}
}
return id, nil
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
mcsPredicateFunc := predicate.Funcs{

View File

@@ -130,10 +130,14 @@
EventReasonSyncDerivedServiceFailed = "SyncDerivedServiceFailed"
)
// Define events for MultiClusterService objects and their associated resources.
// Define events for MultiClusterService objects with CrossCluster type.
const (
// EventReasonSyncServiceWorkFailed indicates that syncing the service work failed.
EventReasonSyncServiceWorkFailed string = "SyncServiceWorkFailed"
// EventReasonSyncServiceWorkSucceed indicates that syncing the service work succeeded.
EventReasonSyncServiceWorkSucceed string = "SyncServiceWorkSucceed"
// EventReasonDispatchEndpointSliceFailed indicates that dispatching the EndpointSlice failed.
EventReasonDispatchEndpointSliceFailed = "DispatchEndpointSliceFailed"
// EventReasonDispatchEndpointSliceSucceed indicates that dispatching the EndpointSlice succeeded.
EventReasonDispatchEndpointSliceSucceed = "DispatchEndpointSliceSucceed"
)

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
@@ -102,6 +103,19 @@ func GetCluster(hostClient client.Client, clusterName string) (*clusterv1alpha1.
return cluster, nil
}
// GetClusterSet returns the set of names of all Clusters in the control plane
func GetClusterSet(hostClient client.Client) (sets.Set[string], error) {
clusterList := &clusterv1alpha1.ClusterList{}
if err := hostClient.List(context.Background(), clusterList); err != nil {
return nil, err
}
clusterSet := sets.New[string]()
for _, cluster := range clusterList.Items {
clusterSet.Insert(cluster.Name)
}
return clusterSet, nil
}
// CreateClusterObject creates a cluster object in the karmada control plane
func CreateClusterObject(controlPlaneClient karmadaclientset.Interface, clusterObj *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
cluster, exist, err := GetClusterWithKarmadaClient(controlPlaneClient, clusterObj.Name)

View File

@@ -25,6 +25,12 @@ const (
// ServiceNameLabel is added to the work object, which is reported by the member cluster, to specify the service name associated with the EndpointSlice.
ServiceNameLabel = "endpointslice.karmada.io/name"
// EndpointSliceProvisionClusterAnnotation is added to an EndpointSlice to specify which cluster provides the EndpointSlice.
EndpointSliceProvisionClusterAnnotation = "endpointslice.karmada.io/provision-cluster"
// EndPointSliceProvisionGenerationAnnotation is added to an EndpointSlice to specify the generation of the EndpointSlice in the provision cluster.
EndPointSliceProvisionGenerationAnnotation = "endpointslice.karmada.io/endpointslice-generation"
// PropagationInstruction is used to mark a resource(like Work) propagation instruction.
// Valid values includes:
// - suppressed: indicates that the resource should not be propagated.
@@ -45,6 +51,9 @@ const (
// ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
ManagedByKarmadaLabelValue = "true"
// EndpointSliceControllerLabelValue indicates the EndpointSlice is controlled by karmada-mcs-endpointslice-controller
EndpointSliceControllerLabelValue = "karmada-mcs-endpointslice-controller"
// RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained. e.g:
// resourcetemplate.karmada.io/retain-replicas: true // with value `true` indicates retain
// resourcetemplate.karmada.io/retain-replicas: false // with value `false` and others, indicates not retain
@@ -86,6 +95,12 @@ const (
// before ResourceBinding itself is deleted.
BindingControllerFinalizer = "karmada.io/binding-controller"
// MCSEndpointSliceCollectControllerFinalizer is added to the MultiClusterService work to ensure the related Works collected from provision clusters are deleted
MCSEndpointSliceCollectControllerFinalizer = "karmada.io/mcs-endpointslice-collect-controller"
// MCSEndpointSliceDispatchControllerFinalizer is added to the EndpointSlice work to ensure the related Works dispatched to consumption clusters are deleted
MCSEndpointSliceDispatchControllerFinalizer = "karmada.io/mcs-endpointslice-dispatch-controller"
// ClusterResourceBindingControllerFinalizer is added to ClusterResourceBinding to ensure related Works are deleted
// before ClusterResourceBinding itself is deleted.
ClusterResourceBindingControllerFinalizer = "karmada.io/cluster-resource-binding-controller"

View File

@@ -23,12 +23,14 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)
// CreateOrUpdateEndpointSlice creates a EndpointSlice object if not exist, or updates if it already exists.
@@ -94,12 +96,25 @@ func DeleteEndpointSlice(c client.Client, selector labels.Set) error {
return errors.NewAggregate(errs)
}
// MultiClusterServiceCrossClusterEnabled checks whether the MultiClusterService contains the CrossCluster type.
func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiClusterService) bool {
for _, t := range mcs.Spec.Types {
if t == networkingv1alpha1.ExposureTypeCrossCluster {
for _, svcType := range mcs.Spec.Types {
if svcType == networkingv1alpha1.ExposureTypeCrossCluster {
return true
}
}
return false
}
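// GetProvisionClusters returns the clusters that provision the service; if ServiceProvisionClusters is
// empty, all clusters known to the control plane are returned.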
func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
if len(provisionClusters) == 0 {
var err error
provisionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
}
return provisionClusters, nil
}

View File

@@ -114,6 +114,40 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi
}
}
// NewPredicateForEndpointSliceCollectController generates an event filter function for the EndpointSliceCollectController running in karmada-controller-manager.
func NewPredicateForEndpointSliceCollectController(mgr controllerruntime.Manager) predicate.Funcs {
predFunc := func(eventType string, object client.Object) bool {
obj := object.(*workv1alpha1.Work)
clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
return false
}
clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return false
}
return clusterObj.Spec.SyncMode == clusterv1alpha1.Push
}
return predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return predFunc("create", createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return predFunc("update", updateEvent.ObjectNew) || predFunc("update", updateEvent.ObjectOld)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return predFunc("delete", deleteEvent.Object)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
}
// NewClusterPredicateOnAgent generates an event filter function with Cluster for karmada-agent.
func NewClusterPredicateOnAgent(clusterName string) predicate.Funcs {
return predicate.Funcs{
@@ -166,6 +200,34 @@ func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predic
}
}
// NewPredicateForEndpointSliceCollectControllerOnAgent generates an event filter function for the EndpointSliceCollectController running in karmada-agent.
func NewPredicateForEndpointSliceCollectControllerOnAgent(curClusterName string) predicate.Funcs {
predFunc := func(eventType string, object client.Object) bool {
obj := object.(*workv1alpha1.Work)
clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
return false
}
return clusterName == curClusterName
}
return predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return predFunc("create", createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return predFunc("update", updateEvent.ObjectNew) || predFunc("update", updateEvent.ObjectOld)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return predFunc("delete", deleteEvent.Object)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
}
// NewExecutionPredicateOnAgent generates the event filter function to skip events that the controllers are not interested in.
// Used by controllers:
// - execution controller working in agent

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -178,3 +179,20 @@ func GenEventRef(resource *unstructured.Unstructured) (*corev1.ObjectReference,
return ref, nil
}
// IsWorkContains checks whether the target resource kind exists in a work's spec.workload.manifests.
func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.GroupVersionKind) bool {
for index := range manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifests[index].Raw)
if err != nil {
klog.Errorf("Failed to unmarshal work manifests index %d, error is: %v", index, err)
continue
}
if targetResource == workload.GroupVersionKind() {
return true
}
}
return false
}

View File

@@ -89,6 +89,10 @@ func GenerateBindingReferenceKey(namespace, name string) string {
return rand.SafeEncodeString(fmt.Sprint(hash.Sum32()))
}
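// GenerateMCSWorkName generates the work name for MCS-related resources by hashing the cluster name
// together with the namespace, so same-named resources from different clusters don't conflict.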
func GenerateMCSWorkName(kind, name, namespace, cluster string) string {
return GenerateWorkName(kind, name, cluster+"/"+namespace)
}
// GenerateWorkName will generate work name by its name and the hash of its namespace, kind and name.
func GenerateWorkName(kind, name, namespace string) string {
// The name of resources, like 'Role'/'ClusterRole'/'RoleBinding'/'ClusterRoleBinding',

View File

@@ -0,0 +1,59 @@
/*
Copyright 2023 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiclusterservice
import (
"context"
"encoding/json"
"net/http"
"github.com/google/uuid"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)
// MutatingAdmission mutates API requests if necessary.
type MutatingAdmission struct {
Decoder *admission.Decoder
}
// Check if our MutatingAdmission implements the necessary interface
var _ admission.Handler = &MutatingAdmission{}
// Handle yields a response to an AdmissionRequest.
func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) admission.Response {
mcs := &networkingv1alpha1.MultiClusterService{}
err := a.Decoder.Decode(req, mcs)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel) == "" {
id := uuid.New().String()
mcs.Labels = util.DedupeAndMergeLabels(mcs.Labels, map[string]string{networkingv1alpha1.MultiClusterServicePermanentIDLabel: id})
}
marshaledBytes, err := json.Marshal(mcs)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}