From e97b51b803efe686f27742a2831454bc9e2c6f1c Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Wed, 26 Mar 2025 13:08:09 -0700 Subject: [PATCH] feat(mutlicluster): Add support for excluding labels and annotations from federated and mirror services (#13802) Depends on https://github.com/linkerd/linkerd2/pull/13801 Adds support for excluding certain labels and annotations from being copied onto mirror and federated services. This makes use of the `excludedLabels` and `excludedAnnoations` fields in the Link resource. These fields take a list of strings which may be literal label/annotation names or they may be group globs of the form `/*` which will match all labels/annotations beginning with `/`. Any matching labels or annotations will not be copied. We also add corresponding flags to the `mc link` command: `--excluded-labels` and `--excluded-annotations` for setting these fields on the Link resource. --- controller/gen/apis/link/v1alpha3/types.go | 2 +- .../link/v1alpha3/zz_generated.deepcopy.go | 6 +- .../templates/local-service-mirror.yaml | 2 + .../charts/linkerd-multicluster/values.yaml | 8 + multicluster/cmd/link.go | 8 + multicluster/cmd/service-mirror/main.go | 52 ++++-- .../cmd/testdata/install_default.golden | 2 + multicluster/cmd/testdata/install_ha.golden | 2 + multicluster/cmd/testdata/install_psp.golden | 2 + .../service-mirror/cluster_watcher.go | 173 +++++++++++------- .../cluster_watcher_mirroring_test.go | 12 +- .../cluster_watcher_test_util.go | 26 --- .../service-mirror/events_formatting.go | 35 +--- test/integration/multicluster/install_test.go | 7 +- .../multicluster-traffic/federated_test.go | 86 ++++++++- .../multicluster-traffic/mc_traffic_test.go | 95 +++++++++- 16 files changed, 357 insertions(+), 161 deletions(-) diff --git a/controller/gen/apis/link/v1alpha3/types.go b/controller/gen/apis/link/v1alpha3/types.go index 223ee8f5c..f4a4d2b51 100644 --- a/controller/gen/apis/link/v1alpha3/types.go +++ b/controller/gen/apis/link/v1alpha3/types.go @@ -91,7 +91,7 @@ type LinkCondition struct { // +optional Message string `json:"message"` // LocalRef is a reference to the local mirror or federated service. - LocalRef ObjectRef `json:"localRef,omitempty"` + LocalRef *ObjectRef `json:"localRef,omitempty"` } type ObjectRef struct { diff --git a/controller/gen/apis/link/v1alpha3/zz_generated.deepcopy.go b/controller/gen/apis/link/v1alpha3/zz_generated.deepcopy.go index 36a4dcb37..640a48016 100644 --- a/controller/gen/apis/link/v1alpha3/zz_generated.deepcopy.go +++ b/controller/gen/apis/link/v1alpha3/zz_generated.deepcopy.go @@ -59,7 +59,11 @@ func (in *LinkCondition) DeepCopyInto(out *LinkCondition) { *out = *in in.LastProbeTime.DeepCopyInto(&out.LastProbeTime) in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) - out.LocalRef = in.LocalRef + if in.LocalRef != nil { + in, out := &in.LocalRef, &out.LocalRef + *out = new(ObjectRef) + **out = **in + } return } diff --git a/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml b/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml index 9723c0e79..f37cdef7c 100644 --- a/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/local-service-mirror.yaml @@ -102,6 +102,8 @@ spec: - -enable-pprof={{.Values.localServiceMirror.enablePprof | default false}} - -local-mirror - -federated-service-selector={{.Values.localServiceMirror.federatedServiceSelector}} + - -excluded-labels={{.Values.localServiceMirror.excludedLabels}} + - -excluded-annotations={{.Values.localServiceMirror.excludedAnnotations}} {{- if or .Values.localServiceMirror.additionalEnv .Values.localServiceMirror.experimentalEnv }} env: {{- with .Values.localServiceMirror.additionalEnv }} diff --git a/multicluster/charts/linkerd-multicluster/values.yaml b/multicluster/charts/linkerd-multicluster/values.yaml index 3f9d6886f..ced9f4012 100644 --- a/multicluster/charts/linkerd-multicluster/values.yaml +++ b/multicluster/charts/linkerd-multicluster/values.yaml @@ -126,6 +126,14 @@ localServiceMirror: # -- Label selector for federated service members in the local cluster. federatedServiceSelector: "mirror.linkerd.io/federated=member" + # -- Labels that should not be copied from the local service to the mirror + # service. + excludedLabels: "" + + # -- Annotations that should not be copied from the local service to the + # mirror service. + excludedAnnotations: "" + # -- Number of local service mirror replicas to run replicas: 1 diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index 3b512e6f0..270233d67 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -58,6 +58,8 @@ type ( federatedServiceSelector string gatewayAddresses string gatewayPort uint32 + excludedAnnotations []string + excludedLabels []string ha bool enableGateway bool enableServiceMirror bool @@ -262,6 +264,8 @@ A full list of configurable values can be found at https://github.com/linkerd/li ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName), RemoteDiscoverySelector: remoteDiscoverySelector, FederatedServiceSelector: federatedServiceSelector, + ExcludedAnnotations: opts.excludedAnnotations, + ExcludedLabels: opts.excludedLabels, }, } @@ -400,6 +404,8 @@ A full list of configurable values can be found at https://github.com/linkerd/li cmd.Flags().StringVar(&opts.federatedServiceSelector, "federated-service-selector", opts.federatedServiceSelector, "Selector (label query) for federated service members in the target cluster") cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)") cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer") + cmd.Flags().StringSliceVar(&opts.excludedAnnotations, "excluded-annotations", opts.excludedAnnotations, "Annotations to exclude when mirroring services") + cmd.Flags().StringSliceVar(&opts.excludedLabels, "excluded-labels", opts.excludedLabels, "Labels to exclude when mirroring services") cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)") cmd.Flags().BoolVar(&opts.enableGateway, "gateway", opts.enableGateway, "If false, allows a link to be created against a cluster that does not have a gateway service") cmd.Flags().BoolVar(&opts.enableServiceMirror, "service-mirror", opts.enableServiceMirror, "If false, only outputs link manifest and credentials secrets") @@ -508,6 +514,8 @@ func newLinkOptionsWithDefault() (*linkOptions, error) { federatedServiceSelector: fmt.Sprintf("%s=%s", k8s.DefaultFederatedServiceSelector, "member"), gatewayAddresses: "", gatewayPort: 0, + excludedAnnotations: []string{}, + excludedLabels: []string{}, ha: false, enableGateway: true, enableServiceMirror: true, diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index a39911102..7b995fdfa 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "time" @@ -55,6 +56,8 @@ func Main(args []string) { enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server") localMirror := cmd.Bool("local-mirror", false, "watch the local cluster for federated service members") federatedServiceSelector := cmd.String("federated-service-selector", k8s.DefaultFederatedServiceSelector, "Selector (label query) for federated service members in the local cluster") + exludedAnnotations := cmd.String("excluded-annotations", "", "Annotations to exclude when mirroring services") + excludedLabels := cmd.String("excluded-labels", "", "Labels to exclude when mirroring services") probeSvc := cmd.String("probe-service", "", "Name of the target cluster probe service") flags.ConfigureAndParse(cmd, args) @@ -122,7 +125,35 @@ func Main(args []string) { if *localMirror { run = func(ctx context.Context) { - err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, *federatedServiceSelector) + federatedLabelSelector, err := metav1.ParseToLabelSelector(*federatedServiceSelector) + if err != nil { + log.Fatalf("failed to parse federated service selector: %s", err) + } + + excludedLabelList := []string{} + if *excludedLabels != "" { + excludedLabelList = strings.Split(*excludedLabels, ",") + } + excludedAnnotationList := []string{} + if *exludedAnnotations != "" { + excludedAnnotationList = strings.Split(*exludedAnnotations, ",") + } + + link := v1alpha3.Link{ + ObjectMeta: metav1.ObjectMeta{ + Name: "local", + Namespace: *namespace, + }, + Spec: v1alpha3.LinkSpec{ + TargetClusterName: "", + Selector: nil, + RemoteDiscoverySelector: nil, + FederatedServiceSelector: federatedLabelSelector, + ExcludedAnnotations: excludedAnnotationList, + ExcludedLabels: excludedLabelList, + }, + } + err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, link) if err != nil { log.Fatalf("Failed to start local cluster watcher: %s", err) } @@ -360,25 +391,8 @@ func startLocalClusterWatcher( repairPeriod time.Duration, enableHeadlessSvc bool, enableNamespaceCreation bool, - federatedServiceSelector string, + link v1alpha3.Link, ) error { - federatedLabelSelector, err := metav1.ParseToLabelSelector(federatedServiceSelector) - if err != nil { - return fmt.Errorf("failed to parse federated service selector: %w", err) - } - - link := v1alpha3.Link{ - ObjectMeta: metav1.ObjectMeta{ - Name: "local", - Namespace: namespace, - }, - Spec: v1alpha3.LinkSpec{ - TargetClusterName: "", - Selector: nil, - RemoteDiscoverySelector: nil, - FederatedServiceSelector: federatedLabelSelector, - }, - } cw, err := servicemirror.NewRemoteClusterServiceWatcher( ctx, namespace, diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 77d92b281..0033c02c2 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -1358,6 +1358,8 @@ spec: - -enable-pprof=false - -local-mirror - -federated-service-selector=mirror.linkerd.io/federated=member + - -excluded-labels= + - -excluded-annotations= image: cr.l5d.io/linkerd/controller:linkerdVersionValue name: service-mirror securityContext: diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index 05804c210..8cbce1719 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -1453,6 +1453,8 @@ spec: - -enable-pprof=false - -local-mirror - -federated-service-selector=mirror.linkerd.io/federated=member + - -excluded-labels= + - -excluded-annotations= image: cr.l5d.io/linkerd/controller:linkerdVersionValue name: service-mirror securityContext: diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index 09e8c9855..3315f5121 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -1392,6 +1392,8 @@ spec: - -enable-pprof=false - -local-mirror - -federated-service-selector=mirror.linkerd.io/federated=member + - -excluded-labels= + - -excluded-annotations= image: cr.l5d.io/linkerd/controller:linkerdVersionValue name: service-mirror securityContext: diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index b05cfdd4e..52609ab54 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -91,15 +91,12 @@ type ( // reconcile. Most importantly we need to keep track of exposed ports // and gateway association changes. RemoteExportedServiceUpdated struct { - localService *corev1.Service - localEndpoints *corev1.Endpoints - remoteUpdate *corev1.Service + remoteUpdate *corev1.Service } // RemoteServiceJoinedFederatedService is generated when a remote server // joins a federated service and the local federated service already exists. RemoteServiceJoinsFederatedService struct { - localService *corev1.Service remoteUpdate *corev1.Service } @@ -259,6 +256,23 @@ func (rcsw *RemoteClusterServiceWatcher) originalResourceName(mirroredName strin return strings.TrimSuffix(mirroredName, fmt.Sprintf("-%s", rcsw.link.Spec.TargetClusterName)) } +func (rcsw *RemoteClusterServiceWatcher) isLabelExlucded(label string) bool { + if strings.HasPrefix(label, consts.SvcMirrorPrefix) { + return true + } + for _, excludedLabel := range rcsw.link.Spec.ExcludedLabels { + if strings.HasSuffix(excludedLabel, "/*") { + trimmed := strings.TrimSuffix(excludedLabel, "/*") + if strings.HasPrefix(label, trimmed) { + return true + } + } else if label == excludedLabel { + return true + } + } + return false +} + // Provides labels for mirrored or federatedservice. // Copies all labels from the remote service to local service (except labels // with the "SvcMirrorPrefix"). @@ -268,7 +282,7 @@ func (rcsw *RemoteClusterServiceWatcher) getCommonServiceLabels(remoteService *c } for key, value := range remoteService.ObjectMeta.Labels { - if strings.HasPrefix(key, consts.SvcMirrorPrefix) { + if rcsw.isLabelExlucded(key) { continue } labels[key] = value @@ -310,13 +324,30 @@ func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceLabels(remoteService return labels } +func (rcsw *RemoteClusterServiceWatcher) isAnnotationExlucded(annotation string) bool { + // Topology aware hints are not multicluster aware. + if annotation == "service.kubernetes.io/topology-aware-hints" || annotation == "service.kubernetes.io/topology-mode" { + return true + } + for _, excludedAnnotation := range rcsw.link.Spec.ExcludedAnnotations { + if strings.HasSuffix(excludedAnnotation, "/*") { + trimmed := strings.TrimSuffix(excludedAnnotation, "/*") + if strings.HasPrefix(annotation, trimmed) { + return true + } + } else if annotation == excludedAnnotation { + return true + } + } + return false +} + // Provides annotations for mirror or federated services func (rcsw *RemoteClusterServiceWatcher) getCommonServiceAnnotations(remoteService *corev1.Service) map[string]string { annotations := map[string]string{} for key, value := range remoteService.ObjectMeta.Annotations { - // Topology aware hints are not multicluster aware. - if key == "service.kubernetes.io/topology-aware-hints" || key == "service.kubernetes.io/topology-mode" { + if rcsw.isAnnotationExlucded(key) { continue } annotations[key] = value @@ -624,24 +655,46 @@ func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceLeave(ctx context // Updates a locally mirrored service. There might have been some pretty fundamental changes such as // new gateway being assigned or additional ports exposed. This method takes care of that. func (rcsw *RemoteClusterServiceWatcher) handleRemoteExportedServiceUpdated(ctx context.Context, ev *RemoteExportedServiceUpdated) error { - rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name) + rcsw.log.Infof("Updating mirror service for %s/%s", ev.remoteUpdate.Namespace, ev.remoteUpdate.Name) + + mirrorName := rcsw.mirrorServiceName(ev.remoteUpdate.Name) + localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.remoteUpdate.Namespace).Get(mirrorName) + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.log.Infof("Mirror service %s/%s not found, re-creating", ev.remoteUpdate.Namespace, mirrorName) + rcsw.eventsQueue.Add(&RemoteServiceExported{ + service: ev.remoteUpdate, + }) + return nil + } + return RetryableError{[]error{err}} + } + + localEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(localService.Namespace).Get(mirrorName) + if err != nil { + if kerrors.IsNotFound(err) { + localEndpoints = nil + } else { + return RetryableError{[]error{err}} + } + } if rcsw.isRemoteDiscovery(ev.remoteUpdate.Labels) { // The service is mirrored in remote discovery mode and any local // endpoints for it should be deleted if they exist. - if ev.localEndpoints != nil { - err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{}) + if localEndpoints != nil { + err := rcsw.localAPIClient.Client.CoreV1().Endpoints(localService.Namespace).Delete(ctx, localService.Name, metav1.DeleteOptions{}) if err != nil { rcsw.updateLinkMirrorStatus( ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to delete mirror endpoints: %s", err), nil), ) return RetryableError{[]error{ - fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err), + fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", localService.Namespace, localService.Name, err), }} } } - } else if ev.localEndpoints == nil { + } else if localEndpoints == nil { // The service is mirrored in gateway mode and gateway endpoints should // be created for it. err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate) @@ -664,7 +717,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteExportedServiceUpdated(ctx return err } - copiedEndpoints := ev.localEndpoints.DeepCopy() + copiedEndpoints := localEndpoints.DeepCopy() ports, err := rcsw.getEndpointsPorts(ev.remoteUpdate) if err != nil { return err @@ -691,11 +744,11 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteExportedServiceUpdated(ctx } } - ev.localService.Labels = rcsw.getMirrorServiceLabels(ev.remoteUpdate) - ev.localService.Annotations = rcsw.getMirrorServiceAnnotations(ev.remoteUpdate) - ev.localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports) + localService.Labels = rcsw.getMirrorServiceLabels(ev.remoteUpdate) + localService.Annotations = rcsw.getMirrorServiceAnnotations(ev.remoteUpdate) + localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports) - if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil { + if _, err := rcsw.localAPIClient.Client.CoreV1().Services(localService.Namespace).Update(ctx, localService, metav1.UpdateOptions{}); err != nil { rcsw.updateLinkMirrorStatus( ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to update mirror service: %s", err), nil), @@ -704,15 +757,32 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteExportedServiceUpdated(ctx } rcsw.updateLinkMirrorStatus( ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), - mirrorStatusCondition(true, reasonMirrored, "", ev.localService), + mirrorStatusCondition(true, reasonMirrored, "", localService), ) return nil } // Updates a federated service to include the remote service as a member. func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context.Context, ev *RemoteServiceJoinsFederatedService) error { - rcsw.log.Infof("Updating federated service %s/%s", ev.localService.Namespace, ev.localService.Name) - previous := ev.localService.DeepCopy() + federatedName := rcsw.federatedServiceName(ev.remoteUpdate.Name) + rcsw.log.Infof("Updating federated service %s/%s", ev.remoteUpdate.Namespace, federatedName) + + localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.remoteUpdate.Namespace).Get(federatedName) + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.eventsQueue.Add(&CreateFederatedService{ + service: ev.remoteUpdate, + }) + return nil + } + rcsw.updateLinkFederatedStatus( + ev.remoteUpdate.Name, ev.remoteUpdate.Namespace, + mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to get federated service: %s", err), nil), + ) + return RetryableError{[]error{err}} + } + + previous := localService.DeepCopy() if ev.remoteUpdate.Spec.ClusterIP == corev1.ClusterIPNone { rcsw.updateLinkFederatedStatus( @@ -728,14 +798,14 @@ func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context. if rcsw.link.Spec.TargetClusterName == "" { // Always treat the local cluster as the oldest. isOldest = true - } else if _, localDiscoveryExists := ev.localService.Annotations[consts.LocalDiscoveryAnnotation]; localDiscoveryExists { + } else if _, localDiscoveryExists := localService.Annotations[consts.LocalDiscoveryAnnotation]; localDiscoveryExists { // The local cluster is older than us. isOldest = false } else { // The local cluster is not a member of the federated service. Therefore, // we check the remote discovery to see if we are the oldest. isOldest = true - members := strings.Split(ev.localService.Annotations[consts.RemoteDiscoveryAnnotation], ",") + members := strings.Split(localService.Annotations[consts.RemoteDiscoveryAnnotation], ",") for _, member := range members { target := strings.Split(member, "@") if len(target) != 2 { @@ -759,32 +829,32 @@ func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context. if isOldest { preservedAnnotations := map[string]string{} for _, k := range []string{consts.RemoteDiscoveryAnnotation, consts.LocalDiscoveryAnnotation} { - if v, ok := ev.localService.Annotations[k]; ok { + if v, ok := localService.Annotations[k]; ok { preservedAnnotations[k] = v } } - ev.localService.Labels = rcsw.getFederatedServiceLabels(ev.remoteUpdate) - ev.localService.Annotations = rcsw.getFederatedServiceAnnotations(ev.remoteUpdate) + localService.Labels = rcsw.getFederatedServiceLabels(ev.remoteUpdate) + localService.Annotations = rcsw.getFederatedServiceAnnotations(ev.remoteUpdate) for k, v := range preservedAnnotations { - ev.localService.Annotations[k] = v + localService.Annotations[k] = v } - ev.localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports) + localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports) } if rcsw.link.Spec.TargetClusterName == "" { // Local discovery - ev.localService.Annotations[consts.LocalDiscoveryAnnotation] = ev.remoteUpdate.Name + localService.Annotations[consts.LocalDiscoveryAnnotation] = ev.remoteUpdate.Name } else { // Remote discovery remoteTarget := fmt.Sprintf("%s@%s", ev.remoteUpdate.Name, rcsw.link.Spec.TargetClusterName) - if !remoteDiscoveryContains(ev.localService.Annotations[consts.RemoteDiscoveryAnnotation], remoteTarget) { - if ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] == "" { - ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] = remoteTarget + if !remoteDiscoveryContains(localService.Annotations[consts.RemoteDiscoveryAnnotation], remoteTarget) { + if localService.Annotations[consts.RemoteDiscoveryAnnotation] == "" { + localService.Annotations[consts.RemoteDiscoveryAnnotation] = remoteTarget } else { - ev.localService.Annotations[consts.RemoteDiscoveryAnnotation] = fmt.Sprintf( + localService.Annotations[consts.RemoteDiscoveryAnnotation] = fmt.Sprintf( "%s,%s", - ev.localService.Annotations[consts.RemoteDiscoveryAnnotation], + localService.Annotations[consts.RemoteDiscoveryAnnotation], remoteTarget, ) } @@ -792,11 +862,11 @@ func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context. } // Don't update the service if it has not changed. - if reflect.DeepEqual(previous, ev.localService) { + if reflect.DeepEqual(previous, localService) { return nil } - if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil { + if _, err := rcsw.localAPIClient.Client.CoreV1().Services(localService.Namespace).Update(ctx, localService, metav1.UpdateOptions{}); err != nil { rcsw.updateLinkFederatedStatus( ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to update federated service: %s", err), nil), @@ -805,7 +875,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleFederatedServiceJoin(ctx context. } rcsw.updateLinkFederatedStatus( ev.remoteUpdate.GetName(), ev.remoteUpdate.GetNamespace(), - mirrorStatusCondition(true, reasonMirrored, "", ev.localService), + mirrorStatusCondition(true, reasonMirrored, "", localService), ) return nil } @@ -1137,18 +1207,8 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S // if we have the local service present, we need to issue an update lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation] if ok && lastMirroredRemoteVersion != service.ResourceVersion { - endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(mirrorName) - if err != nil { - if kerrors.IsNotFound(err) { - endpoints = nil - } else { - return RetryableError{[]error{err}} - } - } rcsw.eventsQueue.Add(&RemoteExportedServiceUpdated{ - localService: localService, - localEndpoints: endpoints, - remoteUpdate: service, + remoteUpdate: service, }) } } else { @@ -1172,23 +1232,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S if rcsw.isFederatedServiceMember(service.Labels) { // The desired state is that the local federated service should exist. - localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(federatedName) - if err != nil { - if kerrors.IsNotFound(err) { - rcsw.eventsQueue.Add(&CreateFederatedService{ - service: service, - }) - return nil - } - rcsw.updateLinkFederatedStatus( - service.Name, service.Namespace, - mirrorStatusCondition(false, reasonError, fmt.Sprintf("Failed to get federated service: %s", err), nil), - ) - return RetryableError{[]error{err}} - } - // if we have the local service present, we need to issue an update rcsw.eventsQueue.Add(&RemoteServiceJoinsFederatedService{ - localService: localService, remoteUpdate: service, }) } else { @@ -1809,6 +1853,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateLinkMirrorStatus(remoteName, name link, err := rcsw.linksAPIClient.L5dClient.LinkV1alpha3().Links(rcsw.link.GetNamespace()).Get(context.Background(), rcsw.link.Name, metav1.GetOptions{}) if err != nil { rcsw.log.Errorf("Failed to get link %s/%s: %s", rcsw.link.Namespace, rcsw.link.Name, err) + return } link.Status.MirrorServices = updateServiceStatus(remoteName, namespace, condition, link.Status.MirrorServices) rcsw.patchLinkStatus(link.Status) @@ -1920,7 +1965,7 @@ func mirrorStatusCondition(success bool, reason string, message string, localRef Type: "Mirrored", } if localRef != nil { - condition.LocalRef = v1alpha3.ObjectRef{ + condition.LocalRef = &v1alpha3.ObjectRef{ Name: localRef.Name, Namespace: localRef.Namespace, Kind: "Service", diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go index 2fb5dc942..58ed875c9 100644 --- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go +++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/go-test/deep" "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha3" @@ -596,8 +597,6 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { // update svc-remote; the gateway is still not alive though so we expect // the Endpoints of svc-remote to still have no ready addresses. events.Add(&RemoteExportedServiceUpdated{ - localService: remoteService("svc-remote", "ns", "2", nil, nil), - localEndpoints: endpoints, remoteUpdate: remoteService("svc", "ns", "2", map[string]string{ consts.DefaultExportedServiceSelector: "true", "new-label": "hi", @@ -609,6 +608,13 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { }, }), }) + + // Processing the RemoteExportedServiceUpdated involves reading from the + // localAPI informer cache. Since this cache is updated asyncronously, we + // pause briefly here to give a chance for updates to the localAPI to be + // reflected in the cache. + time.Sleep(100 * time.Millisecond) + for events.Len() > 0 { watcher.processNextEvent(context.Background()) } @@ -1017,8 +1023,6 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType), environment: onAddOrUpdateRemoteServiceUpdated(isAdd), expectedEventsInQueue: []interface{}{&RemoteExportedServiceUpdated{ - localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil, nil), - localEndpoints: endpoints("test-service-remote", "test-namespace", nil, "0.0.0.0", "", nil), remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ consts.DefaultExportedServiceSelector: "true", }, nil), diff --git a/multicluster/service-mirror/cluster_watcher_test_util.go b/multicluster/service-mirror/cluster_watcher_test_util.go index d85480184..810887980 100644 --- a/multicluster/service-mirror/cluster_watcher_test_util.go +++ b/multicluster/service-mirror/cluster_watcher_test_util.go @@ -218,7 +218,6 @@ func joinFederatedService() *testEnvironment { return &testEnvironment{ events: []interface{}{ &RemoteServiceJoinsFederatedService{ - localService: fedSvc, remoteUpdate: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultFederatedServiceSelector: "member", }, []corev1.ServicePort{ @@ -351,7 +350,6 @@ func joinLocalFederatedService() *testEnvironment { return &testEnvironment{ events: []interface{}{ &RemoteServiceJoinsFederatedService{ - localService: fedSvc, remoteUpdate: remoteService("service-one", "ns1", "111", map[string]string{ consts.DefaultFederatedServiceSelector: "member", }, []corev1.ServicePort{ @@ -554,30 +552,6 @@ var updateServiceWithChangedPorts = &testEnvironment{ Port: 333, }, }), - localService: mirrorService("test-service-remote", "test-namespace", "pastServiceResVersion", nil, []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - }), - localEndpoints: endpoints("test-service-remote", "test-namespace", nil, "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - }), }, }, remoteResources: []string{ diff --git a/multicluster/service-mirror/events_formatting.go b/multicluster/service-mirror/events_formatting.go index e0a3eb3da..8eb73c387 100644 --- a/multicluster/service-mirror/events_formatting.go +++ b/multicluster/service-mirror/events_formatting.go @@ -8,15 +8,6 @@ import ( corev1 "k8s.io/api/core/v1" ) -func formatAddresses(addresses []corev1.EndpointAddress) string { - var addrs []string - - for _, a := range addresses { - addrs = append(addrs, a.IP) - } - return fmt.Sprintf("[%s]", strings.Join(addrs, ",")) -} - func formatMetadata(meta map[string]string) string { var metadata []string @@ -28,15 +19,6 @@ func formatMetadata(meta map[string]string) string { return fmt.Sprintf("[%s]", strings.Join(metadata, ",")) } -func formatPorts(ports []corev1.EndpointPort) string { - var formattedPorts []string - - for _, p := range ports { - formattedPorts = append(formattedPorts, fmt.Sprintf("Port: {name: %s, port: %d}", p.Name, p.Port)) - } - return fmt.Sprintf("[%s]", strings.Join(formattedPorts, ",")) -} - func formatService(svc *corev1.Service) string { if svc == nil { return "Service: nil" @@ -44,26 +26,13 @@ func formatService(svc *corev1.Service) string { return fmt.Sprintf("Service: {name: %s, namespace: %s, annotations: [%s], labels [%s]}", svc.Name, svc.Namespace, formatMetadata(svc.Annotations), formatMetadata(svc.Labels)) } -func formatEndpoints(endpoints *corev1.Endpoints) string { - if endpoints == nil { - return "Endpoints: nil" - } - var subsets []string - - for _, ss := range endpoints.Subsets { - subsets = append(subsets, fmt.Sprintf("%s:%s", formatAddresses(ss.Addresses), formatPorts(ss.Ports))) - } - - return fmt.Sprintf("Endpoints: {name: %s, namespace: %s, annotations: [%s], labels: [%s], subsets: [%s]}", endpoints.Name, endpoints.Namespace, formatMetadata(endpoints.Annotations), formatMetadata(endpoints.Labels), strings.Join(subsets, ",")) -} - // Events for cluster watcher func (rsc RemoteServiceExported) String() string { return fmt.Sprintf("RemoteServiceExported: {service: %s}", formatService(rsc.service)) } func (rsu RemoteExportedServiceUpdated) String() string { - return fmt.Sprintf("RemoteExportedServiceUpdated: {localService: %s, localEndpoints: %s, remoteUpdate: %s}", formatService(rsu.localService), formatEndpoints(rsu.localEndpoints), formatService(rsu.remoteUpdate)) + return fmt.Sprintf("RemoteExportedServiceUpdated: {remoteUpdate: %s}", formatService(rsu.remoteUpdate)) } func (rsd RemoteServiceUnexported) String() string { @@ -75,7 +44,7 @@ func (cfs CreateFederatedService) String() string { } func (jfs RemoteServiceJoinsFederatedService) String() string { - return fmt.Sprintf("RemoteServiceJoinsFederatedService: {localService: %s, remoteUpdate: %s}", formatService(jfs.localService), formatService(jfs.remoteUpdate)) + return fmt.Sprintf("RemoteServiceJoinsFederatedService: {remoteUpdate: %s}", formatService(jfs.remoteUpdate)) } func (lfs RemoteServiceLeavesFederatedService) String() string { diff --git a/test/integration/multicluster/install_test.go b/test/integration/multicluster/install_test.go index 7114e729f..7dd7c3cd0 100644 --- a/test/integration/multicluster/install_test.go +++ b/test/integration/multicluster/install_test.go @@ -145,7 +145,10 @@ func TestInstallMulticluster(t *testing.T) { for k, ctx := range contexts { var out string var err error - args := []string{"--context=" + ctx, "multicluster", "install"} + args := []string{"--context=" + ctx, "multicluster", "install", + "--set", "localServiceMirror.excludedAnnotations=evil.linkerd/*\\,evil", + "--set", "localServiceMirror.excludedLabels=evil.linkerd/*\\,evil", + } // Source context should be installed without a gateway if k == testutil.SourceContextKey { @@ -222,6 +225,8 @@ func TestLinkClusters(t *testing.T) { "--cluster-name", linkName, "--api-server-address", fmt.Sprintf("https://%s:6443", lbIP), "multicluster", "link", + "--excluded-annotations", "evil.linkerd/*,evil", + "--excluded-labels", "evil.linkerd/*,evil", } if TestHelper.GetMulticlusterManageControllers() { linkCmd = append( diff --git a/test/integration/multicluster/multicluster-traffic/federated_test.go b/test/integration/multicluster/multicluster-traffic/federated_test.go index cc2a87a5e..91676bf2d 100644 --- a/test/integration/multicluster/multicluster-traffic/federated_test.go +++ b/test/integration/multicluster/multicluster-traffic/federated_test.go @@ -52,26 +52,56 @@ func TestFederatedService(t *testing.T) { testutil.AnnotatedFatalf(t, "failed to install emojivoto", "failed to install emojivoto: %s\n%s", err, out) } - // Label the service to join the federated service. + // Label the service to join the federated service and add + // labels to the service so we can ensure they are copied + // correctly to the federated service. timeout := time.Minute err = testutil.RetryFor(timeout, func() error { - out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "label", "service/web-svc", "mirror.linkerd.io/federated=member") - return err + for _, label := range []string{ + "mirror.linkerd.io/federated=member", + "evil.linkerd/a=b", + "evil=yes", + "good.linkerd/c=d", + "good=yes", + } { + out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "label", "service/web-svc", label) + if err != nil { + return err + } + } + out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "label", "service/web-svc", "test-context="+ctx) + if err != nil { + return err + } + return nil }) if err != nil { testutil.AnnotatedFatalf(t, "failed to label web-svc", "%s\n%s", err, out) } - // Add metadata to each service so we can ensure it is copied - // correctly to the federated service. - out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "annotate", "service/web-svc", "test-context="+ctx) + // Add annotations to the service so we can ensure they are + // copied correctly to the federated service. + err = testutil.RetryFor(timeout, func() error { + for _, annotation := range []string{ + "evil.linkerd/a=b", + "evil=yes", + "good.linkerd/c=d", + "good=yes", + } { + out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "annotate", "service/web-svc", annotation) + if err != nil { + return err + } + } + out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "annotate", "service/web-svc", "test-context="+ctx) + if err != nil { + return err + } + return nil + }) if err != nil { testutil.AnnotatedFatalf(t, "failed to annotate web-svc", "%s\n%s", err, out) } - out, err = TestHelper.KubectlWithContext("", ctx, "--namespace", ns, "label", "service/web-svc", "test-context="+ctx) - if err != nil { - testutil.AnnotatedFatalf(t, "failed to label web-svc", "%s\n%s", err, out) - } } }) @@ -149,6 +179,42 @@ func TestFederatedService(t *testing.T) { } }) + t.Run("Check if federated service has correct metadata", func(t *testing.T) { + timeout := time.Minute + err := testutil.RetryFor(timeout, func() error { + err := CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "evil\\.linkerd/a", "") // Should be excluded. + if err != nil { + return err + } + err = CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "good", "yes") // Should be included. + if err != nil { + return err + } + err = CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "good\\.linkerd/c", "d") // Should be included. + if err != nil { + return err + } + + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "evil", "") // Should be excluded. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "evil\\.linkerd/a", "") // Should be excluded. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "good", "yes") // Should be included. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-federated", "good\\.linkerd/c", "d") // Should be included. + return err + }) + if err != nil { + testutil.AnnotatedFatalf(t, "incorrect service metadata", "incorrect service metadata: %s", err) + } + }) + for _, ctx := range contexts { err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.KubectlWithContext("", diff --git a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go index 3a2fbd0dc..6d37c7f37 100644 --- a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go +++ b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go @@ -189,12 +189,81 @@ func TestTargetTraffic(t *testing.T) { timeout := time.Minute err = testutil.RetryFor(timeout, func() error { - out, err = TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "label", "service/web-svc", "mirror.linkerd.io/exported=true") - return err + for _, label := range []string{ + "mirror.linkerd.io/exported=true", + "evil.linkerd/a=b", + "evil=yes", + "good.linkerd/c=d", + "good=yes", + } { + out, err = TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "label", "service/web-svc", label) + if err != nil { + return err + } + } + return nil }) if err != nil { testutil.AnnotatedFatalf(t, "failed to label web-svc", "%s\n%s", err, out) } + + err = testutil.RetryFor(timeout, func() error { + for _, annotation := range []string{ + "evil.linkerd/a=b", + "evil=yes", + "good.linkerd/c=d", + "good=yes", + } { + out, err = TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "annotate", "service/web-svc", annotation) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + testutil.AnnotatedFatalf(t, "failed to annotate web-svc", "%s\n%s", err, out) + } + }) + + t.Run("Check if mirror service has correct metadata", func(t *testing.T) { + timeout := time.Minute + err := testutil.RetryFor(timeout, func() error { + err := CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-target", "good", "yes") // Should be included. + if err != nil { + return err + } + err = CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-target", "good\\.linkerd/c", "d") // Should be included. + if err != nil { + return err + } + err = CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-target", "evil", "") // Should be excluded. + if err != nil { + return err + } + err = CheckAnnotation(contexts[testutil.SourceContextKey], ns, "web-svc-target", "evil\\.linkerd/a", "") // Should be excluded. + if err != nil { + return err + } + + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-target", "good", "yes") // Should be included. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-target", "good\\.linkerd/c", "d") // Should be included. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-target", "evil", "") // Should be excluded. + if err != nil { + return err + } + err = CheckLabel(contexts[testutil.SourceContextKey], ns, "web-svc-target", "evil\\.linkerd/a", "") // Should be excluded. + return err + }) + if err != nil { + testutil.AnnotatedFatalf(t, "incorrect service metadata", "incorrect service metadata: %s", err) + } }) t.Run("Wait until target workloads are ready", func(t *testing.T) { @@ -353,3 +422,25 @@ func TestTargetResourcesAreCleaned(t *testing.T) { "failed to delete %s namespace: %s", "linkerd-nginx-gateway-deploy", err) } } + +func CheckAnnotation(context, ns, svc, annotation, expected string) error { + out, err := TestHelper.KubectlWithContext("", context, "--namespace", ns, "get", "service", svc, fmt.Sprintf("-ojsonpath={.metadata.annotations.%s}", annotation)) + if err != nil { + return fmt.Errorf("Failed to get annotation %s on service %s: %w", annotation, svc, err) + } + if out != expected { + return fmt.Errorf("Expected annotation %s to be %s, got %s", annotation, expected, out) + } + return nil +} + +func CheckLabel(context, ns, svc, label, expected string) error { + out, err := TestHelper.KubectlWithContext("", context, "--namespace", ns, "get", "service", svc, fmt.Sprintf("-ojsonpath={.metadata.labels.%s}", label)) + if err != nil { + return fmt.Errorf("Failed to get label %s on service %s: %w", label, svc, err) + } + if out != expected { + return fmt.Errorf("Expected label %s to be %s, got %s", label, expected, out) + } + return nil +}