Fix multicluster headless svc mirroring (#8474)

* Fix multicluster headless svc mirroring

Fixes #8469

** Note this needs to be back-ported into 2.11.x **

When labeling a headless service for exporting, the service-mirror controller was ignoring the change because `isExportedEndpoints()` was checked on the _old_ version of the EP update (which doesn't have the label).

This change checks on both the old and new version of the updated EP (and so I had to move some of the stuff out of `isExportedEndpoints()` into the calling sites, to avoid dupped log entries when calling it).

In order to test this, I removed the `mirror.linkerd.io/exported` label from the nginx SS and instead label it in the test after the SS persists.

* Consolidated isExportedEndpoints() and isExportedService() into isExported(), which uses the label configured in the Link
This commit is contained in:
Alejandro Pedraza 2022-05-12 15:14:36 -05:00 committed by GitHub
parent c4a85aeac8
commit 1d0b9a659d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 37 deletions

View File

@ -626,22 +626,13 @@ func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Cont
return nil
}
func (rcsw *RemoteClusterServiceWatcher) isExportedService(service *corev1.Service) bool {
selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
if err != nil {
rcsw.log.Errorf("Invalid service selector: %s", err)
return false
}
return selector.Matches(labels.Set(service.Labels))
}
// this method is common to both CREATE and UPDATE because if we have been
// offline for some time due to a crash a CREATE for a service that we have
// observed before is simply a case of UPDATE
func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error {
localName := rcsw.mirroredResourceName(service.Name)
if rcsw.isExportedService(service) {
if rcsw.isExported(service.Labels) {
localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName)
if err != nil {
if kerrors.IsNotFound(err) {
@ -698,7 +689,7 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() (*corev1.ServiceLis
}
func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) {
if rcsw.isExportedService(service) {
if rcsw.isExported(service.Labels) {
rcsw.eventsQueue.Add(&RemoteServiceDeleted{
Name: service.Name,
Namespace: service.Namespace,
@ -811,7 +802,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
}
rcsw.eventsQueue.Add(&OnDeleteCalled{service})
},
UpdateFunc: func(old, new interface{}) {
UpdateFunc: func(_, new interface{}) {
rcsw.eventsQueue.Add(&OnUpdateCalled{new.(*corev1.Service)})
},
},
@ -825,7 +816,18 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return
}
if !isExportedEndpoints(obj, rcsw.log) || !isHeadlessEndpoints(obj, rcsw.log) {
ep, ok := obj.(*corev1.Endpoints)
if !ok {
rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
return
}
if !rcsw.isExported(ep.Labels) {
rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector)
return
}
if !isHeadlessEndpoints(ep, rcsw.log) {
return
}
@ -837,11 +839,24 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return
}
if !isExportedEndpoints(old, rcsw.log) {
epOld, ok := old.(*corev1.Endpoints)
if !ok {
rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epOld)
return
}
rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{new.(*corev1.Endpoints)})
epNew, ok := new.(*corev1.Endpoints)
if !ok {
rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epNew)
return
}
if !rcsw.isExported(epOld.Labels) && !rcsw.isExported(epNew.Labels) {
rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector)
return
}
rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew})
},
},
)
@ -1113,7 +1128,7 @@ func (rcsw *RemoteClusterServiceWatcher) createMirrorEndpoints(ctx context.Conte
rcsw.updateReadiness(endpoints)
_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("Failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
return fmt.Errorf("failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
}
return nil
}
@ -1126,7 +1141,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Conte
rcsw.updateReadiness(endpoints)
_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("Failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
return fmt.Errorf("failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
}
return err
}
@ -1141,17 +1156,11 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo
}
}
func isExportedEndpoints(obj interface{}, log *logging.Entry) bool {
ep, ok := obj.(*corev1.Endpoints)
if !ok {
log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool {
selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
if err != nil {
rcsw.log.Errorf("Invalid selector: %s", err)
return false
}
if _, found := ep.Labels[consts.DefaultExportedServiceSelector]; !found {
log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector)
return false
}
return true
return selector.Matches(labels.Set(l))
}

View File

@ -417,13 +417,7 @@ func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Ent
// isHeadlessEndpoints checks if an endpoints object belongs to a
// headless service.
func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool {
ep, ok := obj.(*corev1.Endpoints)
if !ok {
log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
return false
}
func isHeadlessEndpoints(ep *corev1.Endpoints, log *logging.Entry) bool {
if _, found := ep.Labels[corev1.IsHeadlessService]; !found {
// Not an Endpoints object for a headless service? Then we likely don't want
// to update anything.

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/version"
"github.com/linkerd/linkerd2/testutil"
)
@ -276,6 +277,11 @@ func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) {
}
})
_, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+ns, "label", "svc", "nginx-statefulset-svc", k8s.DefaultExportedServiceSelector+"=true")
if err != nil {
testutil.AnnotatedFatal(t, "failed to label nginx-statefulset-svc service", err)
}
dgCmd := []string{"--context=" + targetCtx, "diagnostics", "proxy-metrics", "--namespace",
"linkerd-multicluster", "deploy/linkerd-gateway"}
t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) {

View File

@ -3,8 +3,6 @@ kind: Service
metadata:
name: nginx-statefulset-svc
namespace: linkerd-multicluster-statefulset
labels:
mirror.linkerd.io/exported: "true"
spec:
clusterIP: None
clusterIPs: