From 72fc94b03c1869f483c9b53f87228ad6460c36ff Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 4 Mar 2020 20:17:21 +0200 Subject: [PATCH] Service mirroring tests (#4115) Unit tests that exercise most of the code in cluster_watcher.go. Essentially the whole cluster mirroring machinary can be tought of as a function that takes remote cluster state, local cluster state, and modification events and as a result it either modifies local cluster state or issues new events onto the queue. This is what these tests are trying to model. I think this covers a lot of the logic there. Any suggestions for other edge cases are welcome. Signed-off-by: Zahari Dichev --- .../cmd/service-mirror/cluster_watcher.go | 109 +- .../service-mirror/cluster_watcher_test.go | 1003 +++++++++++++++++ .../cluster_watcher_test_util.go | 214 ++++ 3 files changed, 1278 insertions(+), 48 deletions(-) create mode 100644 controller/cmd/service-mirror/cluster_watcher_test.go create mode 100644 controller/cmd/service-mirror/cluster_watcher_test_util.go diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go index 323564f77..782be2b0f 100644 --- a/controller/cmd/service-mirror/cluster_watcher.go +++ b/controller/cmd/service-mirror/cluster_watcher.go @@ -39,9 +39,8 @@ type ( // RemoteServiceCreated is generated whenever a remote service is created Observing // this event means that the service in question is not mirrored atm RemoteServiceCreated struct { - service *corev1.Service - gatewayData *gatewayMetadata - newResourceVersion string + service *corev1.Service + gatewayData *gatewayMetadata } // RemoteServiceUpdated is generated when we see something about an already @@ -65,8 +64,7 @@ type ( // RemoteGatewayDeleted is observed when a service that is a gateway to at least // one already mirrored service is deleted RemoteGatewayDeleted struct { - gatewayData *gatewayMetadata - affectedEndpoints []*corev1.Endpoints + gatewayData *gatewayMetadata } // RemoteGatewayUpdated happens when a service that is a gateway to at least @@ -376,14 +374,23 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources() error { func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ev *RemoteServiceDeleted) error { localServiceName := rcsw.mirroredResourceName(ev.Name) rcsw.log.Debugf("Deleting mirrored service %s/%s and its corresponding Endpoints", ev.Namespace, localServiceName) + var errors []error if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil { - if kerrors.IsNotFound(err) { - return nil + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err)) } - // we can try deleting it again - return RetryableError{[]error{fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err)}} - } + + if err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("could not delete Endpoints: %s/%s: %s", ev.Namespace, localServiceName, err)) + } + } + + if len(errors) > 0 { + return RetryableError{errors} + } + rcsw.log.Debugf("Successfully deleted Service: %s/%s", ev.Namespace, localServiceName) return nil } @@ -402,6 +409,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ev *RemoteSe Ports: rcsw.getEndpointsPorts(ev.remoteUpdate, gatewayPort), }, } + + ev.localEndpoints.Labels[consts.RemoteGatewayNameLabel] = ev.gatewayData.Name + ev.localEndpoints.Labels[consts.RemoteGatewayNsLabel] = ev.gatewayData.Namespace + } else { rcsw.log.Warnf("Could not resolve gateway for %s: %s, nulling endpoints", serviceInfo, err) ev.localEndpoints.Subsets = nil @@ -617,11 +628,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleConsiderGatewayUpdateDispatch(eve rcsw.log.Warnf("Gateway [%s/%s] is not a compliant gateway anymore, dispatching GatewayDeleted event: %s", event.maybeGateway.Namespace, event.maybeGateway.Name, err) // in case something changed about this gateway and it is not really a gateway anymore, // simply dispatch deletion event so all endpoints are nulled - endpoints, err := rcsw.endpointsForGateway(gatewayMeta) - if err != nil { - return RetryableError{[]error{err}} - } - rcsw.eventsQueue.AddRateLimited(&RemoteGatewayDeleted{gatewayMeta, endpoints}) + rcsw.eventsQueue.AddRateLimited(&RemoteGatewayDeleted{gatewayMeta}) } else { affectedServices, err := rcsw.affectedMirroredServicesForGatewayUpdate(gtwMetadata, event.maybeGateway.ResourceVersion) if err != nil { @@ -662,9 +669,8 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S // we are not mirroring but has gateway data, so we need // to create it rcsw.eventsQueue.Add(&RemoteServiceCreated{ - service: service, - gatewayData: gtwData, - newResourceVersion: service.ResourceVersion, + service: service, + gatewayData: gtwData, }) } else { // at this point we know that we do not have such a service @@ -769,41 +775,48 @@ func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) } } +func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{}, error) { + event, done := rcsw.eventsQueue.Get() + var err error + switch ev := event.(type) { + case *OnAddCalled: + err = rcsw.createOrUpdateService(ev.svc) + case *OnUpdateCalled: + err = rcsw.createOrUpdateService(ev.svc) + case *OnDeleteCalled: + rcsw.handleOnDelete(ev.svc) + case *RemoteServiceCreated: + err = rcsw.handleRemoteServiceCreated(ev) + case *RemoteServiceUpdated: + err = rcsw.handleRemoteServiceUpdated(ev) + case *RemoteServiceDeleted: + err = rcsw.handleRemoteServiceDeleted(ev) + case *RemoteGatewayUpdated: + err = rcsw.handleRemoteGatewayUpdated(ev) + case *RemoteGatewayDeleted: + err = rcsw.handleRemoteGatewayDeleted(ev) + case *ConsiderGatewayUpdateDispatch: + err = rcsw.handleConsiderGatewayUpdateDispatch(ev) + case *ClusterUnregistered: + err = rcsw.cleanupMirroredResources() + case *OprhanedServicesGcTriggered: + err = rcsw.cleanupOrphanedServices() + default: + if ev != nil || !done { // we get a nil in case we are shutting down... + rcsw.log.Warnf("Received unknown event: %v", ev) + } + } + + return done, event, err + +} + // the main processing loop in which we handle more domain specific events // and deal with retries func (rcsw *RemoteClusterServiceWatcher) processEvents() { for { - event, done := rcsw.eventsQueue.Get() - var err error - switch ev := event.(type) { - case *OnAddCalled: - err = rcsw.createOrUpdateService(ev.svc) - case *OnUpdateCalled: - err = rcsw.createOrUpdateService(ev.svc) - case *OnDeleteCalled: - rcsw.handleOnDelete(ev.svc) - case *RemoteServiceCreated: - err = rcsw.handleRemoteServiceCreated(ev) - case *RemoteServiceUpdated: - err = rcsw.handleRemoteServiceUpdated(ev) - case *RemoteServiceDeleted: - err = rcsw.handleRemoteServiceDeleted(ev) - case *RemoteGatewayUpdated: - err = rcsw.handleRemoteGatewayUpdated(ev) - case *RemoteGatewayDeleted: - err = rcsw.handleRemoteGatewayDeleted(ev) - case *ConsiderGatewayUpdateDispatch: - err = rcsw.handleConsiderGatewayUpdateDispatch(ev) - case *ClusterUnregistered: - err = rcsw.cleanupMirroredResources() - case *OprhanedServicesGcTriggered: - err = rcsw.cleanupOrphanedServices() - default: - if ev != nil || !done { // we get a nil in case we are shutting down... - rcsw.log.Warnf("Received unknown event: %v", ev) - } - } + done, event, err := rcsw.processNextEvent() // the logic here is that there might have been an API // connectivity glitch or something. So its not a bad idea to requeue // the event and try again up to a number of limits, just to ensure diff --git a/controller/cmd/service-mirror/cluster_watcher_test.go b/controller/cmd/service-mirror/cluster_watcher_test.go new file mode 100644 index 000000000..eb8980aa1 --- /dev/null +++ b/controller/cmd/service-mirror/cluster_watcher_test.go @@ -0,0 +1,1003 @@ +package servicemirror + +import ( + "fmt" + "reflect" + "testing" + + "github.com/linkerd/linkerd2/controller/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +const ( + clusterName = "remote" + clusterDomain = "cluster.local" +) + +type testCase struct { + testDescription string + events []interface{} + remoteResources []string + localResources []string + expectedLocalServices []*corev1.Service + expectedLocalEndpoints []*corev1.Endpoints + expectedEventsInQueue []interface{} +} + +func runTestCase(tc *testCase, t *testing.T) { + t.Run(tc.testDescription, func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI(tc.remoteResources...) + if err != nil { + t.Fatal(err) + } + + localAPI, err := k8s.NewFakeAPI(tc.localResources...) + if err != nil { + t.Fatal(err) + } + + remoteAPI.Sync(nil) + localAPI.Sync(nil) + + q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + watcher := RemoteClusterServiceWatcher{ + clusterName: clusterName, + clusterDomain: clusterDomain, + remoteAPIClient: remoteAPI, + localAPIClient: localAPI, + stopper: nil, + log: logging.WithFields(logging.Fields{"cluster": clusterName}), + eventsQueue: q, + requeueLimit: 0, + } + + for _, ev := range tc.events { + q.Add(ev) + } + + for range tc.events { + watcher.processNextEvent() + } + + localAPI.Sync(nil) + remoteAPI.Sync(nil) + + if tc.expectedLocalServices == nil { + // ensure the are no local services + services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(services.Items) > 0 { + t.Fatalf("Was expecting no local services but instead found %v", services.Items) + + } + } else { + for _, expected := range tc.expectedLocalServices { + actual, err := localAPI.Client.CoreV1().Services(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Could not find mirrored service with name %s", expected.Name) + } + + if err := diffServices(expected, actual); err != nil { + t.Fatal(err) + } + } + } + + if tc.expectedLocalEndpoints == nil { + // ensure the are no local endpoints + endpoints, err := localAPI.Client.CoreV1().Endpoints(corev1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(endpoints.Items) > 0 { + t.Fatalf("Was expecting no local endpoints but instead found %d", len(endpoints.Items)) + + } + } else { + for _, expected := range tc.expectedLocalEndpoints { + actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Could not find endpoints with name %s", expected.Name) + } + + if err := diffEndpoints(expected, actual); err != nil { + t.Fatal(err) + } + } + } + + expectedNumEvents := len(tc.expectedEventsInQueue) + actualNumEvents := watcher.eventsQueue.Len() + + if expectedNumEvents != actualNumEvents { + t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents) + } + + for _, ev := range tc.expectedEventsInQueue { + evInQueue, _ := watcher.eventsQueue.Get() + if !reflect.DeepEqual(ev, evInQueue) { + t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue) + } + } + }) +} + +func TestRemoteServiceCreated(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "create service and endpoints when gateway cannot be resolved", + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "missing-gateway", "missing-namespace", "111", nil), + gatewayData: &gatewayMetadata{ + Name: "missing-gateway", + Namespace: "missing-namespace", + }, + }, + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "111", "", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "", nil), + }, + }, + { + testDescription: "create service and endpoints without subsets when gateway spec is wrong", + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", + "111", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + + gatewayData: &gatewayMetadata{ + Name: "existing-gateway", + Namespace: "existing-namespace", + }, + }, + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "111", "", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "", nil), + }, + remoteResources: []string{ + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port-wrong", 888, t), + }, + }, + { + testDescription: "create service and endpoints when gateway can be resolved", + events: []interface{}{ + &RemoteServiceCreated{ + + service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", "111", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + gatewayData: &gatewayMetadata{ + Name: "existing-gateway", + Namespace: "existing-namespace", + }, + }, + }, + expectedLocalServices: []*corev1.Service{ + mirroredService( + "service-one-remote", + "ns1", + "existing-gateway", + "existing-namespace", + "111", + "222", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + }, + remoteResources: []string{ + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port", 888, t), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestRemoteServiceDeleted(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "deletes locally mirrored service", + events: []interface{}{ + &RemoteServiceDeleted{ + Name: "test-service-remote-to-delete", + Namespace: "test-namespace-to-delete", + }, + }, + + localResources: []string{ + mirroredServiceAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", "", nil, t), + endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", nil, t), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestRemoteServiceUpdated(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "update to new gateway", + events: []interface{}{ + &RemoteServiceUpdated{ + remoteUpdate: remoteService("test-service", "test-namespace", "gateway-new", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + gatewayData: &gatewayMetadata{ + Name: "gateway-new", + Namespace: "gateway-ns", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "incoming-port", 999, t), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }, t), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }, t), + }, + expectedLocalServices: []*corev1.Service{ + mirroredService( + "test-service-remote", + "test-namespace", + "gateway-new", + "gateway-ns", + "currentServiceResVersion", + "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway-new", "gateway-ns", "0.0.0.0", []corev1.EndpointPort{ + { + Name: "port1", + Port: 999, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 999, + Protocol: "TCP", + }, + }), + }, + }, + { + testDescription: "updates service ports on both service and endpoints", + events: []interface{}{ + &RemoteServiceUpdated{ + remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "incoming-port", 888, t), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }, t), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port3", + Port: 888, + Protocol: "TCP", + }, + }, t), + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + }, + + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port3", + Port: 888, + Protocol: "TCP", + }, + }), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestRemoteGatewayUpdated(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "endpoints ports are updated on gateway change", + events: []interface{}{ + &RemoteGatewayUpdated{ + newPort: 999, + newEndpointAddresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + newResourceVersion: "currentGatewayResVersion", + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + }, + }, + + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 999, + Protocol: "TCP", + }}), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 999, + Protocol: "TCP", + }}), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }, t), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}, t), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }, t), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}, t), + }, + }, + + { + testDescription: "endpoints addresses are updated on gateway change", + events: []interface{}{ + &RemoteGatewayUpdated{ + newPort: 888, + newEndpointAddresses: []corev1.EndpointAddress{{IP: "0.0.0.1"}}, + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + newResourceVersion: "currentGatewayResVersion", + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + }, + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, + + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }, t), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}, t), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }, t), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}, t), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} +func TestRemoteGatewayDeleted(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "removes endpoint subsets when gateway is deleted", + events: []interface{}{ + &RemoteGatewayDeleted{ + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", nil), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", nil), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }, t), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}, t), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }, t), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}, t), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestClusterUnregistered(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "unregisters cluster and cleans up all mirrored resources", + events: []interface{}{ + &ClusterUnregistered{}, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil, t), + endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", nil, t), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), + endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", nil, t), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestGcOrphanedServices(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "deletes mirrored resources that are no longer present on the remote cluster", + events: []interface{}{ + &OprhanedServicesGcTriggered{}, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil, t), + endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", nil, t), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), + endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", nil, t), + }, + remoteResources: []string{ + remoteServiceAsYaml("test-service-1", "test-namespace", "gateway", "gateway-ns", "", nil, t), + }, + + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), + }, + + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "", "", "", nil), + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} + +// the following tests ensure that onAdd, onUpdate and onDelete result in +// queueing more specific events to be processed + +func onAddOrUpdateEvent(isAdd bool, svc *corev1.Service) interface{} { + if isAdd { + return &OnAddCalled{svc: svc} + } + return &OnUpdateCalled{svc: svc} +} + +func onAddOrUpdateTestCases(t *testing.T, isAdd bool) []testCase { + + testType := "ADD" + if !isAdd { + testType = "UPDATE" + } + + return []testCase{ + { + testDescription: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType), + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil)), + }, + expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ + service: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil), + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }}, + }, + { + testDescription: fmt.Sprintf("enqueue a ConsiderGatewayUpdateDispatch event if this is clearly not a mirrorable service (%s)", testType), + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "", "resVersion", nil)), + }, + expectedEventsInQueue: []interface{}{&ConsiderGatewayUpdateDispatch{ + maybeGateway: remoteService("test-service", "test-namespace", "", "", "resVersion", nil), + }}, + }, + { + testDescription: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType), + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil, t), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil, t), + }, + expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{ + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil), + remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }}, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil), + }, + }, + { + testDescription: fmt.Sprintf("not not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType), + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil, t), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil, t), + }, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil), + }, + }, + { + testDescription: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType), + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil, t), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil, t), + }, + expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{ + Name: "test-service", + Namespace: "test-namespace", + }}, + + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", nil), + }, + }, + } +} + +func TestOnAdd(t *testing.T) { + for _, tt := range onAddOrUpdateTestCases(t, true) { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestOnUpdate(t *testing.T) { + for _, tt := range onAddOrUpdateTestCases(t, false) { + tc := tt // pin + runTestCase(&tc, t) + } +} + +func TestOnDelete(t *testing.T) { + for _, tt := range []testCase{ + { + testDescription: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service", + events: []interface{}{ + &OnDeleteCalled{ + svc: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + }, + }, + + expectedEventsInQueue: []interface{}{ + &RemoteServiceDeleted{ + Name: "test-service", + Namespace: "test-namespace", + }, + }, + }, + { + testDescription: "enqueues a RemoteGatewayDeleted because there is no gateway metadata present on the service", + events: []interface{}{ + &OnDeleteCalled{ + svc: remoteService("gateway", "test-namespace", "", "", "currentResVersion", nil), + }, + }, + expectedEventsInQueue: []interface{}{ + &RemoteGatewayDeleted{ + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "test-namespace", + }, + }, + }, + }, + } { + tc := tt // pin + runTestCase(&tc, t) + } +} diff --git a/controller/cmd/service-mirror/cluster_watcher_test_util.go b/controller/cmd/service-mirror/cluster_watcher_test_util.go new file mode 100644 index 000000000..05d73a4a5 --- /dev/null +++ b/controller/cmd/service-mirror/cluster_watcher_test_util.go @@ -0,0 +1,214 @@ +package servicemirror + +import ( + "fmt" + "reflect" + "strings" + "testing" + + "github.com/ghodss/yaml" + consts "github.com/linkerd/linkerd2/pkg/k8s" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func diffServices(expected, actual *corev1.Service) error { + if expected.Name != actual.Name { + return fmt.Errorf("was expecting service with name %s but was %s", expected.Name, actual.Name) + } + + if expected.Namespace != actual.Namespace { + return fmt.Errorf("was expecting service with namespace %s but was %s", expected.Namespace, actual.Namespace) + } + + if !reflect.DeepEqual(expected.Annotations, actual.Annotations) { + return fmt.Errorf("was expecting service with annotations %v but got %v", expected.Annotations, actual.Annotations) + } + + if !reflect.DeepEqual(expected.Labels, actual.Labels) { + return fmt.Errorf("was expecting service with labels %v but got %v", expected.Labels, actual.Labels) + } + + return nil +} + +func diffEndpoints(expected, actual *corev1.Endpoints) error { + if expected.Name != actual.Name { + return fmt.Errorf("was expecting endpoints with name %s but was %s", expected.Name, actual.Name) + } + + if expected.Namespace != actual.Namespace { + return fmt.Errorf("was expecting endpoints with namespace %s but was %s", expected.Namespace, actual.Namespace) + } + + if !reflect.DeepEqual(expected.Annotations, actual.Annotations) { + return fmt.Errorf("was expecting endpoints with annotations %v but got %v", expected.Annotations, actual.Annotations) + } + + if !reflect.DeepEqual(expected.Labels, actual.Labels) { + return fmt.Errorf("was expecting endpoints with labels %v but got %v", expected.Labels, actual.Labels) + } + + if !reflect.DeepEqual(expected.Subsets, actual.Subsets) { + return fmt.Errorf("was expecting endpoints with subsets %v but got %v", expected.Subsets, actual.Subsets) + } + + return nil +} + +func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { + annotations := make(map[string]string) + if gtwName != "" && gtwNs != "" { + annotations[consts.GatewayNameAnnotation] = gtwName + annotations[consts.GatewayNsAnnotation] = gtwNs + } + + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Annotations: annotations, + }, + Spec: corev1.ServiceSpec{ + Ports: ports, + }, + } +} + +func remoteServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort, t *testing.T) string { + svc := remoteService(name, namespace, gtwName, gtwNs, resourceVersion, ports) + + bytes, err := yaml.Marshal(svc) + if err != nil { + t.Fatal(err) + } + return string(bytes) +} + +func mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort) *corev1.Service { + annotations := make(map[string]string) + annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion + annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace) + + if gatewayResourceVersion != "" { + annotations[consts.RemoteGatewayResourceVersionAnnotation] = gatewayResourceVersion + + } + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + consts.RemoteClusterNameLabel: "remote", + consts.MirroredResourceLabel: "true", + consts.RemoteGatewayNameLabel: gtwName, + consts.RemoteGatewayNsLabel: gtwNs, + }, + Annotations: annotations, + }, + Spec: corev1.ServiceSpec{ + Ports: ports, + }, + } +} + +func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort, t *testing.T) string { + svc := mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion, ports) + + bytes, err := yaml.Marshal(svc) + if err != nil { + t.Fatal(err) + } + return string(bytes) +} + +func gateway(name, namespace, resourceVersion, ip, portName string, port int32) *corev1.Service { + svc := corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: portName, + Protocol: "TCP", + Port: port, + }, + }, + }, + } + + if ip != "" { + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip}) + } + return &svc +} + +func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, t *testing.T) string { + gtw := gateway(name, namespace, resourceVersion, ip, portName, port) + + bytes, err := yaml.Marshal(gtw) + if err != nil { + t.Fatal(err) + } + return string(bytes) +} + +func endpoints(name, namespace, gtwName, gtwNs, gatewayIP string, ports []corev1.EndpointPort) *corev1.Endpoints { + var subsets []corev1.EndpointSubset + if gatewayIP != "" { + subsets = []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: gatewayIP, + }, + }, + Ports: ports, + }, + } + } + + return &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + consts.RemoteClusterNameLabel: "remote", + consts.MirroredResourceLabel: "true", + consts.RemoteGatewayNameLabel: gtwName, + consts.RemoteGatewayNsLabel: gtwNs, + }, + }, + Subsets: subsets, + } +} + +func endpointsAsYaml(name, namespace, gtwName, gtwNs, gatewayIP string, ports []corev1.EndpointPort, t *testing.T) string { + ep := endpoints(name, namespace, gtwName, gtwNs, gatewayIP, ports) + + bytes, err := yaml.Marshal(ep) + if err != nil { + t.Fatal(err) + } + return string(bytes) +}