From 1d0b9a659dea5fc722b4c1d848a23a09d0acb6ab Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Thu, 12 May 2022 15:14:36 -0500 Subject: [PATCH] Fix multicluster headless svc mirroring (#8474) * Fix multicluster headless svc mirroring Fixes #8469 ** Note this needs to be back-ported into 2.11.x ** When labeling a headless service for exporting, the service-mirror controller was ignoring the change because `isExportedEndpoints()` was checked on the _old_ version of the EP update (which doesn't have the label). This change checks on both the old and new version of the updated EP (and so I had to move some of the stuff out of `isExportedEndpoints()` into the calling sites, to avoid dupped log entries when calling it). In order to test this, I removed the `mirror.linkerd.io/exported` label from the nginx SS and instead label it in the test after the SS persists. * Consolidated isExportedEndpoints() and isExportedService() into isExported(), which uses the label configured in the Link --- .../service-mirror/cluster_watcher.go | 65 +++++++++++-------- .../cluster_watcher_headless.go | 8 +-- .../multicluster-traffic/mc_traffic_test.go | 6 ++ .../testdata/nginx-ss.yml | 2 - 4 files changed, 44 insertions(+), 37 deletions(-) diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 3da1dc8d9..8dd607bd4 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -626,22 +626,13 @@ func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Cont return nil } -func (rcsw *RemoteClusterServiceWatcher) isExportedService(service *corev1.Service) bool { - selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector) - if err != nil { - rcsw.log.Errorf("Invalid service selector: %s", err) - return false - } - return selector.Matches(labels.Set(service.Labels)) -} - // this method is common to both CREATE and UPDATE because if we have been // offline for some time due to a crash a CREATE for a service that we have // observed before is simply a case of UPDATE func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error { localName := rcsw.mirroredResourceName(service.Name) - if rcsw.isExportedService(service) { + if rcsw.isExported(service.Labels) { localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName) if err != nil { if kerrors.IsNotFound(err) { @@ -698,7 +689,7 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() (*corev1.ServiceLis } func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) { - if rcsw.isExportedService(service) { + if rcsw.isExported(service.Labels) { rcsw.eventsQueue.Add(&RemoteServiceDeleted{ Name: service.Name, Namespace: service.Namespace, @@ -811,7 +802,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { } rcsw.eventsQueue.Add(&OnDeleteCalled{service}) }, - UpdateFunc: func(old, new interface{}) { + UpdateFunc: func(_, new interface{}) { rcsw.eventsQueue.Add(&OnUpdateCalled{new.(*corev1.Service)}) }, }, @@ -825,7 +816,18 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return } - if !isExportedEndpoints(obj, rcsw.log) || !isHeadlessEndpoints(obj, rcsw.log) { + ep, ok := obj.(*corev1.Endpoints) + if !ok { + rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) + return + } + + if !rcsw.isExported(ep.Labels) { + rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector) + return + } + + if !isHeadlessEndpoints(ep, rcsw.log) { return } @@ -837,11 +839,24 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return } - if !isExportedEndpoints(old, rcsw.log) { + epOld, ok := old.(*corev1.Endpoints) + if !ok { + rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epOld) return } - rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{new.(*corev1.Endpoints)}) + epNew, ok := new.(*corev1.Endpoints) + if !ok { + rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epNew) + return + } + + if !rcsw.isExported(epOld.Labels) && !rcsw.isExported(epNew.Labels) { + rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector) + return + } + + rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew}) }, }, ) @@ -1113,7 +1128,7 @@ func (rcsw *RemoteClusterServiceWatcher) createMirrorEndpoints(ctx context.Conte rcsw.updateReadiness(endpoints) _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("Failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err) + return fmt.Errorf("failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err) } return nil } @@ -1126,7 +1141,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Conte rcsw.updateReadiness(endpoints) _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("Failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err) + return fmt.Errorf("failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err) } return err } @@ -1141,17 +1156,11 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo } } -func isExportedEndpoints(obj interface{}, log *logging.Entry) bool { - ep, ok := obj.(*corev1.Endpoints) - if !ok { - log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) +func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool { + selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector) + if err != nil { + rcsw.log.Errorf("Invalid selector: %s", err) return false } - - if _, found := ep.Labels[consts.DefaultExportedServiceSelector]; !found { - log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector) - return false - } - - return true + return selector.Matches(labels.Set(l)) } diff --git a/multicluster/service-mirror/cluster_watcher_headless.go b/multicluster/service-mirror/cluster_watcher_headless.go index caa85da45..a4a392c53 100644 --- a/multicluster/service-mirror/cluster_watcher_headless.go +++ b/multicluster/service-mirror/cluster_watcher_headless.go @@ -417,13 +417,7 @@ func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Ent // isHeadlessEndpoints checks if an endpoints object belongs to a // headless service. -func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool { - ep, ok := obj.(*corev1.Endpoints) - if !ok { - log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) - return false - } - +func isHeadlessEndpoints(ep *corev1.Endpoints, log *logging.Entry) bool { if _, found := ep.Labels[corev1.IsHeadlessService]; !found { // Not an Endpoints object for a headless service? Then we likely don't want // to update anything. diff --git a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go index 2ff65dcee..7b21d2d5f 100644 --- a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go +++ b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/linkerd/linkerd2/pkg/healthcheck" + "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/version" "github.com/linkerd/linkerd2/testutil" ) @@ -276,6 +277,11 @@ func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) { } }) + _, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+ns, "label", "svc", "nginx-statefulset-svc", k8s.DefaultExportedServiceSelector+"=true") + if err != nil { + testutil.AnnotatedFatal(t, "failed to label nginx-statefulset-svc service", err) + } + dgCmd := []string{"--context=" + targetCtx, "diagnostics", "proxy-metrics", "--namespace", "linkerd-multicluster", "deploy/linkerd-gateway"} t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) { diff --git a/test/integration/multicluster/multicluster-traffic/testdata/nginx-ss.yml b/test/integration/multicluster/multicluster-traffic/testdata/nginx-ss.yml index d189c3e85..cdb70029c 100644 --- a/test/integration/multicluster/multicluster-traffic/testdata/nginx-ss.yml +++ b/test/integration/multicluster/multicluster-traffic/testdata/nginx-ss.yml @@ -3,8 +3,6 @@ kind: Service metadata: name: nginx-statefulset-svc namespace: linkerd-multicluster-statefulset - labels: - mirror.linkerd.io/exported: "true" spec: clusterIP: None clusterIPs: