Merge pull request #494 from XiShanYongYe-Chang/mcs-controller

Collect endpointslices from executionNamespace to serviceexport namespace
This commit is contained in:
karmada-bot 2021-07-07 11:24:25 +08:00 committed by GitHub
commit 7a4a5e5f62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 218 additions and 4 deletions

View File

@ -138,7 +138,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
serviceExportController := &mcs.ServiceExportController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
RESTMapper: mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: stopChan,

View File

@ -240,7 +240,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
serviceExportController := &mcs.ServiceExportController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
RESTMapper: mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: stopChan,
@ -252,4 +252,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
if err := serviceExportController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup ServiceExport controller: %v", err)
}
endpointSliceController := &mcs.EndpointSliceController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
}
if err := endpointSliceController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup EndpointSlice controller: %v", err)
}
}

View File

@ -0,0 +1,120 @@
package mcs
import (
"context"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)
// EndpointSliceControllerName is the controller name that will be used when reporting events.
const EndpointSliceControllerName = "endpointslice-controller"
// EndpointSliceController is to collect EndpointSlice which reported by member cluster from executionNamespace to serviceexport namespace.
type EndpointSliceController struct {
client.Client
EventRecorder record.EventRecorder
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
func (c *EndpointSliceController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling Work %s.", req.NamespacedName.String())
work := &workv1alpha1.Work{}
if err := c.Client.Get(context.TODO(), req.NamespacedName, work); err != nil {
if errors.IsNotFound(err) {
// Cleanup derived EndpointSlices after work has been removed.
return helper.DeleteEndpointSlice(c.Client, labels.Set{
util.WorkNamespaceLabel: req.Namespace,
util.WorkNameLabel: req.Name,
})
}
return controllerruntime.Result{Requeue: true}, err
}
if !work.DeletionTimestamp.IsZero() {
return controllerruntime.Result{}, nil
}
return c.collectEndpointSliceFromWork(work)
}
// SetupWithManager creates a controller and register to controller manager.
func (c *EndpointSliceController) SetupWithManager(mgr controllerruntime.Manager) error {
serviceImportPredicateFun := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != ""
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(serviceImportPredicateFun).Complete(c)
}
func (c *EndpointSliceController) collectEndpointSliceFromWork(work *workv1alpha1.Work) (controllerruntime.Result, error) {
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}
for _, manifest := range work.Spec.Workload.Manifests {
unstructObj := &unstructured.Unstructured{}
if err := unstructObj.UnmarshalJSON(manifest.Raw); err != nil {
klog.Errorf("Failed to unmarshal workload, error is: %v", err)
return controllerruntime.Result{Requeue: true}, err
}
endpointSlice := &discoveryv1beta1.EndpointSlice{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), endpointSlice); err != nil {
klog.Errorf("failed to convert unstructured to typed object: %v", err)
return controllerruntime.Result{Requeue: true}, err
}
desiredEndpointSlice := deriveEndpointSlice(endpointSlice, clusterName)
desiredEndpointSlice.Labels = map[string]string{
util.WorkNamespaceLabel: work.Namespace,
util.WorkNameLabel: work.Name,
discoveryv1beta1.LabelServiceName: names.GenerateDerivedServiceName(work.Labels[util.ServiceNameLabel]),
}
if err = helper.CreateOrUpdateEndpointSlice(c.Client, desiredEndpointSlice); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
}
return controllerruntime.Result{}, nil
}
func deriveEndpointSlice(original *discoveryv1beta1.EndpointSlice, migratedFrom string) *discoveryv1beta1.EndpointSlice {
endpointSlice := original.DeepCopy()
endpointSlice.ObjectMeta = metav1.ObjectMeta{
Namespace: original.Namespace,
Name: names.GenerateEndpointSliceName(original.GetName(), migratedFrom),
}
return endpointSlice
}

View File

@ -34,8 +34,8 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
)
// ControllerName is the controller name that will be used when reporting events.
const ControllerName = "service-export-controller"
// ServiceExportControllerName is the controller name that will be used when reporting events.
const ServiceExportControllerName = "service-export-controller"
// ServiceExportController is to sync ServiceExport and report EndpointSlices of exported service to control-plane.
type ServiceExportController struct {

70
pkg/util/helper/mcs.go Normal file
View File

@ -0,0 +1,70 @@
package helper
import (
"context"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// CreateOrUpdateEndpointSlice creates a EndpointSlice object if not exist, or updates if it already exist.
func CreateOrUpdateEndpointSlice(client client.Client, endpointSlice *discoveryv1beta1.EndpointSlice) error {
runtimeObject := endpointSlice.DeepCopy()
operationResult, err := controllerutil.CreateOrUpdate(context.TODO(), client, runtimeObject, func() error {
runtimeObject.AddressType = endpointSlice.AddressType
runtimeObject.Endpoints = endpointSlice.Endpoints
runtimeObject.Labels = endpointSlice.Labels
runtimeObject.Ports = endpointSlice.Ports
return nil
})
if err != nil {
klog.Errorf("Failed to create/update EndpointSlice %s/%s. Error: %v", endpointSlice.GetNamespace(), endpointSlice.GetName(), err)
return err
}
if operationResult == controllerutil.OperationResultCreated {
klog.V(2).Infof("Create EndpointSlice %s/%s successfully.", endpointSlice.GetNamespace(), endpointSlice.GetName())
} else if operationResult == controllerutil.OperationResultUpdated {
klog.V(2).Infof("Update EndpointSlice %s/%s successfully.", endpointSlice.GetNamespace(), endpointSlice.GetName())
} else {
klog.V(2).Infof("EndpointSlice %s/%s is up to date.", endpointSlice.GetNamespace(), endpointSlice.GetName())
}
return nil
}
// GetEndpointSlices returns a EndpointSliceList by labels
func GetEndpointSlices(c client.Client, ls labels.Set) (*discoveryv1beta1.EndpointSliceList, error) {
endpointSlices := &discoveryv1beta1.EndpointSliceList{}
listOpt := &client.ListOptions{LabelSelector: labels.SelectorFromSet(ls)}
return endpointSlices, c.List(context.TODO(), endpointSlices, listOpt)
}
// DeleteEndpointSlice will delete all EndpointSlice objects by labels.
func DeleteEndpointSlice(c client.Client, selector labels.Set) (controllerruntime.Result, error) {
endpointSliceList, err := GetEndpointSlices(c, selector)
if err != nil {
klog.Errorf("Failed to get endpointslices by label %v: %v", selector, err)
return controllerruntime.Result{Requeue: true}, err
}
var errs []error
for index, work := range endpointSliceList.Items {
if err := c.Delete(context.TODO(), &endpointSliceList.Items[index]); err != nil {
klog.Errorf("Failed to delete endpointslice(%s/%s): %v", work.Namespace, work.Name, err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs)
}
return controllerruntime.Result{}, nil
}

View File

@ -22,6 +22,12 @@ const (
// executionSpacePrefix is the prefix of execution space
const executionSpacePrefix = "karmada-es-"
// endpointSlicePrefix is the prefix of collected EndpointSlice from member clusters.
const endpointSlicePrefix = "imported"
// endpointSlicePrefix is the prefix of service derived from ServiceImport.
const derivedServicePrefix = "derived"
// GenerateExecutionSpaceName generates execution space name for the given member cluster
func GenerateExecutionSpaceName(clusterName string) (string, error) {
if clusterName == "" {
@ -61,3 +67,13 @@ func GenerateServiceAccountName(clusterName string) string {
func GenerateRoleName(serviceAccountName string) string {
return fmt.Sprintf("karmada-controller-manager:%s", serviceAccountName)
}
// GenerateEndpointSliceName generates the name of collected EndpointSlice.
func GenerateEndpointSliceName(endpointSliceName string, cluster string) string {
return fmt.Sprintf("%s-%s-%s", endpointSlicePrefix, cluster, endpointSliceName)
}
// GenerateDerivedServiceName generates the service name derived from ServiceImport.
func GenerateDerivedServiceName(serviceName string) string {
return fmt.Sprintf("%s-%s", derivedServicePrefix, serviceName)
}