// karmada/pkg/controllers/multiclusterservice/endpointslice_collect_contr...

/*
Copyright 2023 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiclusterservice

import (
"context"
"fmt"
"reflect"
"sync"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)

// EndpointSliceCollectControllerName is the controller name that will be used when reporting events.
const EndpointSliceCollectControllerName = "endpointslice-collect-controller"

// EndpointSliceCollectController collects EndpointSlices from member clusters and reports them to the control plane.
type EndpointSliceCollectController struct {
client.Client
RESTMapper meta.RESTMapper
StopChan <-chan struct{}
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
// eventHandlers holds the handlers used to handle events reported from member clusters.
// It is keyed by cluster name, with the handler instance as the value, e.g.
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
worker util.AsyncWorker // worker asynchronously processes resources from the rate-limiting queue.
ClusterCacheSyncTimeout metav1.Duration
}
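
// A hedged construction sketch (the field values below are illustrative
// assumptions, not required configuration). The controller is typically
// populated with the manager's client and REST mapper, the shared
// multi-cluster informer manager, and a factory for per-cluster dynamic
// clients, e.g.:
//
//	c := &EndpointSliceCollectController{
//		Client:                      mgr.GetClient(),
//		RESTMapper:                  mgr.GetRESTMapper(),
//		InformerManager:             genericmanager.GetInstance(),
//		StopChan:                    stopChan,
//		WorkerNumber:                3,
//		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
//		ClusterCacheSyncTimeout:     metav1.Duration{Duration: 30 * time.Second},
//	}
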
var (
endpointSliceGVR = discoveryv1.SchemeGroupVersion.WithResource("endpointslices")
multiClusterServiceGVK = networkingv1alpha1.SchemeGroupVersion.WithKind("MultiClusterService")
)

// Reconcile performs a full reconciliation for the object referred to by the Request.
func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String())
work := &workv1alpha1.Work{}
if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
if !helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) {
return controllerruntime.Result{}, nil
}
if !work.DeletionTimestamp.IsZero() {
// The provider clusters' EndpointSlices will be deleted by the mcs_controller, so just ignore the event.
return controllerruntime.Result{}, nil
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}
if err = c.buildResourceInformers(clusterName); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
if err = c.collectTargetEndpointSlice(work, clusterName); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}

// SetupWithManager creates a controller and registers it with the controller manager.
func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c)
}

// RunWorkQueue initializes the worker and runs it; the worker processes resources asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.collectEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)
}
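
// A minimal wiring sketch (hypothetical caller code; mgr and c are assumed to
// be a controller-runtime Manager and a populated controller): register the
// Reconciler first, then start the work queue so that keys enqueued by the
// member-cluster event handlers are consumed:
//
//	if err := c.SetupWithManager(mgr); err != nil {
//		return err
//	}
//	c.RunWorkQueue()
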
func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to collect endpointslice as invalid key: %v", key)
return fmt.Errorf("invalid key")
}
klog.V(4).Infof("Begin to collect %s %s.", fedKey.Kind, fedKey.NamespaceKey())
if err := c.handleEndpointSliceEvent(fedKey); err != nil {
klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
fedKey.NamespaceKey(), err)
return err
}
return nil
}
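
// For illustration only (a hedged sketch of how keys enter the queue): the
// event handlers below wrap every member-cluster object into a FederatedKey
// before enqueuing it, e.g.
//
//	key, err := keys.FederatedKeyFunc("member1", endpointSliceObj)
//	if err == nil {
//		c.worker.Add(key)
//	}
//
// so collectEndpointSlice can later resolve the object from that cluster's
// informer cache via GetObjectFromCache.
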
func (c *EndpointSliceCollectController) buildResourceInformers(clusterName string) error {
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return err
}
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop collect endpointslice for cluster(%s) as cluster not ready.", cluster.Name)
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
if err := c.registerInformersAndStart(cluster); err != nil {
klog.Errorf("Failed to register informer for Cluster %s. Error: %v.", cluster.Name, err)
return err
}
return nil
}

// registerInformersAndStart builds an informer manager for the cluster if one doesn't exist,
// then constructs informers for the target GVRs and starts them.
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
}
singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
}
gvrTargets := []schema.GroupVersionResource{
endpointSliceGVR,
}
allSynced := true
for _, gvr := range gvrTargets {
if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler(cluster.Name)) {
allSynced = false
singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name))
}
}
if allSynced {
return nil
}
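// Start (or restart) the informer manager and wait for the target caches to
// sync; on failure, stop the manager so a subsequent reconcile can rebuild it
// from scratch.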
c.InformerManager.Start(cluster.Name)
if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}
return nil
}

// getEventHandler returns a callback function that knows how to handle events from the member cluster.
func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cache.ResourceEventHandler {
if value, exists := c.eventHandlers.Load(clusterName); exists {
return value.(cache.ResourceEventHandler)
}
eventHandler := fedinformer.NewHandlerOnEvents(c.genHandlerAddFunc(clusterName), c.genHandlerUpdateFunc(clusterName),
c.genHandlerDeleteFunc(clusterName))
c.eventHandlers.Store(clusterName, eventHandler)
return eventHandler
}

func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
curObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}

func (c *EndpointSliceCollectController) genHandlerUpdateFunc(clusterName string) func(oldObj, newObj interface{}) {
return func(oldObj, newObj interface{}) {
curObj := newObj.(runtime.Object)
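// Skip no-op updates: periodic informer resyncs deliver oldObj == newObj,
// and the DeepEqual guard filters those so only real changes are enqueued.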
if !reflect.DeepEqual(oldObj, newObj) {
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}
}

func (c *EndpointSliceCollectController) genHandlerDeleteFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
obj = deleted.Obj
if obj == nil {
return
}
}
oldObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, oldObj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
}
}

// handleEndpointSliceEvent syncs EndpointSlice objects to the control plane according to the EndpointSlice event.
// For an EndpointSlice create or update event, it reports the EndpointSlice when the referencing service has been exported.
// For an EndpointSlice delete event, it cleans up the previously reported EndpointSlice.
func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceKey keys.FederatedKey) error {
endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey)
if err != nil {
if apierrors.IsNotFound(err) {
return cleanupWorkWithEndpointSliceDelete(c.Client, endpointSliceKey)
}
return err
}
if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
return nil
}
workList := &workv1alpha1.WorkList{}
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
Namespace: names.GenerateExecutionSpaceName(endpointSliceKey.Cluster),
LabelSelector: labels.SelectorFromSet(labels.Set{
util.MultiClusterServiceNamespaceLabel: endpointSliceKey.Namespace,
util.MultiClusterServiceNameLabel: util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelServiceName),
})}); err != nil {
klog.Errorf("Failed to list workList reported by endpointSlice(%s/%s), error: %v", endpointSliceKey.Namespace, endpointSliceKey.Name, err)
return err
}
mcsExists := false
for _, work := range workList.Items {
if helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) {
mcsExists = true
break
}
}
if !mcsExists {
return nil
}
if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(endpointSliceKey.Cluster, endpointSliceObj); err != nil {
klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
endpointSliceKey.NamespaceKey(), err)
return err
}
return nil
}
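
// For illustration (a hedged sketch; the label values are hypothetical): a
// Work created for a MultiClusterService that exports Service "default/foo"
// is expected to carry
//
//	util.MultiClusterServiceNamespaceLabel: "default"
//	util.MultiClusterServiceNameLabel:      "foo"
//
// which is exactly what the List call in handleEndpointSliceEvent selects on,
// so only EndpointSlices backing an exported Service are reported.
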
func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1alpha1.Work, clusterName string) error {
manager := c.InformerManager.GetSingleClusterManager(clusterName)
if manager == nil {
err := fmt.Errorf("failed to get informer manager for cluster %s", clusterName)
klog.Errorf("%v", err)
return err
}
svcNamespace := util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel)
svcName := util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel)
selector := labels.SelectorFromSet(labels.Set{
discoveryv1.LabelServiceName: svcName,
})
epsList, err := manager.Lister(endpointSliceGVR).ByNamespace(svcNamespace).List(selector)
if err != nil {
klog.Errorf("Failed to list EndpointSlice for Service(%s/%s) in cluster(%s), Error: %v", svcNamespace, svcName, clusterName, err)
return err
}
for _, epsObj := range epsList {
eps := &discoveryv1.EndpointSlice{}
if err = helper.ConvertToTypedObject(epsObj, eps); err != nil {
klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err)
return err
}
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
continue
}
epsUnstructured, err := helper.ToUnstructured(eps)
if err != nil {
klog.Errorf("Failed to convert EndpointSlice %s/%s to unstructured, error: %v", eps.GetNamespace(), eps.GetName(), err)
return err
}
if err := c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName, epsUnstructured); err != nil {
return err
}
}
return nil
}

// reportEndpointSliceWithEndpointSliceCreateOrUpdate reports the EndpointSlice when the referencing service has been exported.
func (c *EndpointSliceCollectController) reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName string, endpointSlice *unstructured.Unstructured) error {
if err := reportEndpointSlice(c.Client, endpointSlice, clusterName); err != nil {
return fmt.Errorf("failed to report EndpointSlice(%s/%s) from cluster(%s) to control-plane",
endpointSlice.GetNamespace(), endpointSlice.GetName(), clusterName)
}
return nil
}

// reportEndpointSlice reports EndpointSlice objects to the control plane.
func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructured, clusterName string) error {
executionSpace := names.GenerateExecutionSpaceName(clusterName)
workMeta := metav1.ObjectMeta{
// Karmada will synchronize this work to other cluster namespaces and add the cluster name to prevent conflicts.
Name: names.GenerateWorkName(endpointSlice.GetKind(), endpointSlice.GetName(), endpointSlice.GetNamespace()),
Namespace: executionSpace,
Labels: map[string]string{
util.MultiClusterServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.MultiClusterServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicates the Work should not be propagated since it is a collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
},
}
if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
return nil
}
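
// For illustration (a hedged sketch; names are produced by the helpers shown
// above and the literal values here are assumptions): reporting EndpointSlice
// "default/foo-abc12" collected from cluster "member1" yields a Work roughly
// like
//
//	Name:      names.GenerateWorkName("EndpointSlice", "foo-abc12", "default")
//	Namespace: names.GenerateExecutionSpaceName("member1") // e.g. "karmada-es-member1"
//	Labels: map[string]string{
//		util.MultiClusterServiceNamespaceLabel: "default",
//		util.MultiClusterServiceNameLabel:      "foo",
//		util.PropagationInstruction:            util.PropagationInstructionSuppressed,
//		util.ManagedByKarmadaLabel:             util.ManagedByKarmadaLabelValue,
//	}
//
// with the EndpointSlice itself embedded as the Work's manifest.

// cleanupWorkWithEndpointSliceDelete deletes the Work previously reported for
// an EndpointSlice that has been removed from the member cluster.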
func cleanupWorkWithEndpointSliceDelete(c client.Client, endpointSliceKey keys.FederatedKey) error {
executionSpace := names.GenerateExecutionSpaceName(endpointSliceKey.Cluster)
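// The Work name must mirror the names.GenerateWorkName(kind, name, namespace)
// call in reportEndpointSlice so that this delete targets exactly the Work
// the earlier report created for this EndpointSlice.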
workNamespaceKey := types.NamespacedName{
Namespace: executionSpace,
Name: names.GenerateWorkName(endpointSliceKey.Kind, endpointSliceKey.Name, endpointSliceKey.Namespace),
}
work := &workv1alpha1.Work{}
if err := c.Get(context.TODO(), workNamespaceKey, work); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
klog.Errorf("Failed to get work(%s) in executionSpace(%s): %v", workNamespaceKey.String(), executionSpace, err)
return err
}
if err := c.Delete(context.TODO(), work); err != nil {
klog.Errorf("Failed to delete work(%s): %v", workNamespaceKey.String(), err)
return err
}
return nil
}