From cd04b94bb9df7ec3aadcc6a488ffd3d1a7f55dda Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 5 May 2020 08:57:05 +0300 Subject: [PATCH] Probe manager events emission tests (#4312) Probe manager events emission tests Signed-off-by: Zahari Dichev --- .../cmd/service-mirror/cluster_watcher.go | 18 +- .../cluster_watcher_mirroring_test.go | 573 +++++++++ .../cluster_watcher_probe_events_test.go | 214 ++++ .../service-mirror/cluster_watcher_test.go | 1120 ----------------- .../cluster_watcher_test_util.go | 732 ++++++++++- controller/cmd/service-mirror/metrics.go | 31 +- .../cmd/service-mirror/probe_manager.go | 16 +- controller/cmd/service-mirror/probe_worker.go | 1 + 8 files changed, 1562 insertions(+), 1143 deletions(-) create mode 100644 controller/cmd/service-mirror/cluster_watcher_mirroring_test.go create mode 100644 controller/cmd/service-mirror/cluster_watcher_probe_events_test.go delete mode 100644 controller/cmd/service-mirror/cluster_watcher_test.go diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go index f4fc0be70..392f64425 100644 --- a/controller/cmd/service-mirror/cluster_watcher.go +++ b/controller/cmd/service-mirror/cluster_watcher.go @@ -433,21 +433,22 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ev *RemoteSe } gatewaySpec, err := rcsw.resolveGateway(&ev.gatewayData) + copiedEndpoints := ev.localEndpoints.DeepCopy() if err == nil { - ev.localEndpoints.Subsets = []corev1.EndpointSubset{ + copiedEndpoints.Subsets = []corev1.EndpointSubset{ { Addresses: gatewaySpec.addresses, Ports: rcsw.getEndpointsPorts(ev.remoteUpdate, int32(gatewaySpec.incomingPort)), }, } - ev.localEndpoints.Labels[consts.RemoteGatewayNameLabel] = ev.gatewayData.Name - ev.localEndpoints.Labels[consts.RemoteGatewayNsLabel] = ev.gatewayData.Namespace + copiedEndpoints.Labels[consts.RemoteGatewayNameLabel] = ev.gatewayData.Name + copiedEndpoints.Labels[consts.RemoteGatewayNsLabel] = ev.gatewayData.Namespace if gatewaySpec.identity != "" { - ev.localEndpoints.Annotations[consts.RemoteGatewayIdentity] = gatewaySpec.identity + copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = gatewaySpec.identity } else { - delete(ev.localEndpoints.Annotations, consts.RemoteGatewayIdentity) + delete(copiedEndpoints.Annotations, consts.RemoteGatewayIdentity) } if gatewayChanged { @@ -463,7 +464,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ev *RemoteSe ev.localEndpoints.Subsets = nil } - if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localEndpoints.Namespace).Update(ev.localEndpoints); err != nil { + if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(copiedEndpoints.Namespace).Update(copiedEndpoints); err != nil { return RetryableError{[]error{err}} } @@ -576,6 +577,11 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ev *RemoteSe } func (rcsw *RemoteClusterServiceWatcher) handleRemoteGatewayDeleted(ev *RemoteGatewayDeleted) error { + rcsw.probeEventsSink.send(&GatewayDeleted{ + gatewayName: ev.gatewayData.Name, + gatewayNs: ev.gatewayData.Namespace, + clusterName: rcsw.clusterName, + }) affectedEndpoints, err := rcsw.endpointsForGateway(&ev.gatewayData) if err != nil { // if we cannot find the endpoints, we can give up diff --git a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go new file mode 100644 index 000000000..9762c5d1d --- /dev/null +++ b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go @@ -0,0 +1,573 @@ +package servicemirror + +import ( + "fmt" + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +type NoOpProbeEventSink struct{} + +func (s *NoOpProbeEventSink) send(event interface{}) {} + +type mirroringTestCase struct { + description string + environment *testEnvironment + expectedLocalServices []*corev1.Service + expectedLocalEndpoints []*corev1.Endpoints + expectedEventsInQueue []interface{} +} + +func (tc *mirroringTestCase) run(t *testing.T) { + t.Run(tc.description, func(t *testing.T) { + + q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + localAPI, err := tc.environment.runEnvironment(&NoOpProbeEventSink{}, q) + if err != nil { + t.Fatal(err) + } + + if tc.expectedLocalServices == nil { + // ensure the are no local services + services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(services.Items) > 0 { + t.Fatalf("Was expecting no local services but instead found %v", services.Items) + + } + } else { + for _, expected := range tc.expectedLocalServices { + actual, err := localAPI.Client.CoreV1().Services(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Could not find mirrored service with name %s", expected.Name) + } + + if err := diffServices(expected, actual); err != nil { + t.Fatal(err) + } + } + } + + if tc.expectedLocalEndpoints == nil { + // ensure the are no local endpoints + endpoints, err := localAPI.Client.CoreV1().Endpoints(corev1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(endpoints.Items) > 0 { + t.Fatalf("Was expecting no local endpoints but instead found %d", len(endpoints.Items)) + + } + } else { + for _, expected := range tc.expectedLocalEndpoints { + actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Could not find endpoints with name %s", expected.Name) + } + + if err := diffEndpoints(expected, actual); err != nil { + t.Fatal(err) + } + } + } + + expectedNumEvents := len(tc.expectedEventsInQueue) + actualNumEvents := q.Len() + + if expectedNumEvents != actualNumEvents { + t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents) + } + + for _, ev := range tc.expectedEventsInQueue { + evInQueue, _ := q.Get() + if !reflect.DeepEqual(ev, evInQueue) { + t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue) + } + } + }) +} + +func TestRemoteServiceCreatedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "create service and endpoints when gateway cannot be resolved", + environment: serviceCreateWithMissingGateway, + expectedLocalServices: []*corev1.Service{ + mirroredService("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "111", "", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "", "", nil), + }, + }, + { + description: "create service and endpoints without subsets when gateway spec is wrong", + environment: createServiceWrongGatewaySpec, + expectedLocalServices: []*corev1.Service{ + mirroredService("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "111", "", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "", "", nil), + }, + }, + { + description: "create service and endpoints when gateway can be resolved", + environment: createServiceOkeGatewaySpec, + expectedLocalServices: []*corev1.Service{ + mirroredService( + "service-one-remote", + "ns1", + "existing-gateway", + "existing-namespace", + "111", + "222", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteServiceDeletedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "deletes locally mirrored service", + environment: deleteMirroredService, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteServiceUpdatedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "update to new gateway", + environment: updateServiceToNewGateway, + expectedLocalServices: []*corev1.Service{ + mirroredService( + "test-service-remote", + "test-namespace", + "gateway-new", + "gateway-ns", + "currentServiceResVersion", + "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway-new", "gateway-ns", "0.0.0.0", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 999, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 999, + Protocol: "TCP", + }, + }), + }, + }, + { + description: "updates service ports on both service and endpoints", + environment: updateServiceWithChangedPorts, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + }, + + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port3", + Port: 888, + Protocol: "TCP", + }, + }), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteGatewayUpdatedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "endpoints ports are updated on gateway change", + environment: remoteGatewayUpdated, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 999, + Protocol: "TCP", + }}), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 999, + Protocol: "TCP", + }}), + }, + }, + + { + description: "endpoints addresses are updated on gateway change", + environment: gatewayAddressChanged, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, + }, + + { + description: "identity is updated on gateway change", + environment: gatewayIdentityChanged, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "new-identity", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "new-identity", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} +func TestRemoteGatewayDeletedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "removes endpoint subsets when gateway is deleted", + environment: gatewayDeleted, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), + endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestClusterUnregisteredMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "unregisters cluster and cleans up all mirrored resources", + environment: clusterUnregistered, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestGcOrphanedServicesMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "deletes mirrored resources that are no longer present on the remote cluster", + environment: gcTriggered, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), + }, + + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-1-remote", "test-namespace", "", "", "", "", nil), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { + + testType := "ADD" + if !isAdd { + testType = "UPDATE" + } + + return []mirroringTestCase{ + { + description: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType), + environment: onAddOrUpdateExportedSvc(isAdd), + expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ + service: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil), + gatewayData: &gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }}, + }, + { + description: fmt.Sprintf("enqueue a ConsiderGatewayUpdateDispatch event if this is clearly not a mirrorable service (%s)", testType), + environment: onAddOrUpdateNonExportedSvc(isAdd), + expectedEventsInQueue: []interface{}{&ConsiderGatewayUpdateDispatch{ + maybeGateway: remoteService("test-service", "test-namespace", "", "", "resVersion", nil), + }}, + }, + { + description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType), + environment: onAddOrUpdateRemoteServiceUpdated(isAdd), + expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{ + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + gatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }}, + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + }, + { + description: fmt.Sprintf("not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType), + environment: onAddOrUpdateSameResVersion(isAdd), + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + }, + { + description: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType), + environment: serviceNotExportedAnymore(isAdd), + expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{ + Name: "test-service", + Namespace: "test-namespace", + }}, + + expectedLocalServices: []*corev1.Service{ + mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + }, + } +} + +func TestOnAdd(t *testing.T) { + for _, tt := range onAddOrUpdateTestCases(true) { + tc := tt // pin + tc.run(t) + } +} + +func TestOnUpdate(t *testing.T) { + for _, tt := range onAddOrUpdateTestCases(false) { + tc := tt // pin + tc.run(t) + } +} + +func TestOnDelete(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service", + environment: onDeleteWithGatewayMetadata, + expectedEventsInQueue: []interface{}{ + &RemoteServiceDeleted{ + Name: "test-service", + Namespace: "test-namespace", + GatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + }, + { + description: "enqueues a RemoteGatewayDeleted because there is no gateway metadata present on the service", + environment: onDeleteNoGatewayMetadata, + expectedEventsInQueue: []interface{}{ + &RemoteGatewayDeleted{ + gatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "test-namespace", + }, + }, + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} diff --git a/controller/cmd/service-mirror/cluster_watcher_probe_events_test.go b/controller/cmd/service-mirror/cluster_watcher_probe_events_test.go new file mode 100644 index 000000000..2c201c1b1 --- /dev/null +++ b/controller/cmd/service-mirror/cluster_watcher_probe_events_test.go @@ -0,0 +1,214 @@ +package servicemirror + +import ( + "fmt" + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" +) + +type BufferingProbeEventSink struct { + events []interface{} +} + +func (s *BufferingProbeEventSink) send(event interface{}) { + s.events = append(s.events, event) +} + +type probeEventsTestCase struct { + description string + environment *testEnvironment + expectedEventsSentToProbeManager []interface{} +} + +func (tc *probeEventsTestCase) run(t *testing.T) { + t.Run(tc.description, func(t *testing.T) { + q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + probeEventsSink := &BufferingProbeEventSink{} + _, err := tc.environment.runEnvironment(probeEventsSink, q) + if err != nil { + t.Fatal(err) + } + + expectedNumEvents := len(tc.expectedEventsSentToProbeManager) + actualNumEvents := len(probeEventsSink.events) + + if expectedNumEvents != actualNumEvents { + t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents) + } + + for i, ev := range tc.expectedEventsSentToProbeManager { + evInQueue := probeEventsSink.events[i] + if !reflect.DeepEqual(ev, evInQueue) { + t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue) + } + } + }) +} + +func gatewaySpec(name, namespace, clustername, address, resVersion, identity string, incomingPort, probePort uint32, probePath string, probePeriod uint32) GatewaySpec { + return GatewaySpec{ + gatewayName: name, + gatewayNamespace: namespace, + clusterName: clustername, + addresses: []v1.EndpointAddress{ + { + IP: address, + }, + }, + incomingPort: incomingPort, + resourceVersion: resVersion, + identity: identity, + ProbeConfig: &ProbeConfig{ + path: probePath, + port: probePort, + periodInSeconds: probePeriod, + }, + } +} + +func TestRemoteServiceCreatedProbeEvents(t *testing.T) { + for _, tt := range []probeEventsTestCase{ + { + description: "do not send event if gateway cannot be resolved", + environment: serviceCreateWithMissingGateway, + expectedEventsSentToProbeManager: []interface{}{}, + }, + { + description: "do not send event if gateway has wrong spec", + environment: createServiceWrongGatewaySpec, + expectedEventsSentToProbeManager: []interface{}{}, + }, + { + description: "create service and endpoints when gateway can be resolved", + environment: createServiceOkeGatewaySpec, + expectedEventsSentToProbeManager: []interface{}{ + &MirroredServicePaired{ + serviceName: fmt.Sprintf("service-one-%s", clusterName), + serviceNamespace: "ns1", + GatewaySpec: gatewaySpec("existing-gateway", "existing-namespace", clusterName, "192.0.2.127", "222", "gateway-identity", 888, defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteServiceDeletedProbeEvents(t *testing.T) { + for _, tt := range []probeEventsTestCase{ + { + description: "send a service unpaired event", + environment: deleteMirroredService, + expectedEventsSentToProbeManager: []interface{}{ + &MirroredServiceUnpaired{ + serviceName: fmt.Sprintf("test-service-remote-to-delete-%s", clusterName), + serviceNamespace: "test-namespace-to-delete", + gatewayName: "gateway", + gatewayNs: "gateway-ns", + clusterName: clusterName, + }, + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteServiceUpdatedProbeEvents(t *testing.T) { + for _, tt := range []probeEventsTestCase{ + { + description: "unpairs from old and pairs to new gateway", + environment: updateServiceToNewGateway, + expectedEventsSentToProbeManager: []interface{}{ + &MirroredServiceUnpaired{ + serviceName: fmt.Sprintf("test-service-%s", clusterName), + serviceNamespace: "test-namespace", + gatewayName: "gateway", + gatewayNs: "gateway-ns", + clusterName: clusterName, + }, + &MirroredServicePaired{ + serviceName: fmt.Sprintf("test-service-%s", clusterName), + serviceNamespace: "test-namespace", + GatewaySpec: gatewaySpec("gateway-new", "gateway-ns", clusterName, "0.0.0.0", "currentGatewayResVersion", "", 999, defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + }, + }, + { + description: "does not send event when gateway assignment does not change", + environment: updateServiceWithChangedPorts, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteGatewayUpdatedProbeEvents(t *testing.T) { + for _, tt := range []probeEventsTestCase{ + { + description: "sends gateway updated when endpoints ports", + environment: remoteGatewayUpdated, + expectedEventsSentToProbeManager: []interface{}{ + &GatewayUpdated{ + GatewaySpec: gatewaySpec("gateway", "gateway-ns", clusterName, "0.0.0.0", "currentGatewayResVersion", "", 999, defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + }, + }, + + { + description: "sends gateway updated when address changes", + environment: gatewayAddressChanged, + expectedEventsSentToProbeManager: []interface{}{ + &GatewayUpdated{ + GatewaySpec: gatewaySpec("gateway", "gateway-ns", "some-cluster", "0.0.0.1", "currentGatewayResVersion", "", 888, 1, "/p", 222), + }, + }, + }, + { + description: "sends gateway updated when identity changes", + environment: gatewayIdentityChanged, + expectedEventsSentToProbeManager: []interface{}{ + &GatewayUpdated{ + GatewaySpec: gatewaySpec("gateway", "gateway-ns", clusterName, "0.0.0.0", "currentGatewayResVersion", "new-identity", 888, defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + }, + }, + { + description: "sends gateway updated when probe changes", + environment: gatewayProbeConfigChanged, + expectedEventsSentToProbeManager: []interface{}{ + &GatewayUpdated{ + GatewaySpec: gatewaySpec("gateway", "gateway-ns", clusterName, "0.0.0.0", "currentGatewayResVersion", "identity", 888, defaultProbePort, "/new-path", defaultProbePeriod), + }, + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + +func TestRemoteGatewayDeletedProbeEvents(t *testing.T) { + for _, tt := range []probeEventsTestCase{ + { + description: "sends a gateway deleted event to the probe manager", + environment: gatewayDeleted, + expectedEventsSentToProbeManager: []interface{}{ + &GatewayDeleted{ + gatewayName: "gateway", + gatewayNs: "gateway-ns", + clusterName: clusterName, + }, + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} diff --git a/controller/cmd/service-mirror/cluster_watcher_test.go b/controller/cmd/service-mirror/cluster_watcher_test.go deleted file mode 100644 index 1637258e2..000000000 --- a/controller/cmd/service-mirror/cluster_watcher_test.go +++ /dev/null @@ -1,1120 +0,0 @@ -package servicemirror - -import ( - "fmt" - "reflect" - "testing" - - "github.com/linkerd/linkerd2/controller/k8s" - logging "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/util/workqueue" -) - -const ( - clusterName = "remote" - clusterDomain = "cluster.local" -) - -type testCase struct { - testDescription string - events []interface{} - remoteResources []string - localResources []string - expectedLocalServices []*corev1.Service - expectedLocalEndpoints []*corev1.Endpoints - expectedEventsInQueue []interface{} -} - -type NoOpProbeEventSink struct{} - -func (s *NoOpProbeEventSink) send(event interface{}) {} - -func runTestCase(tc *testCase, t *testing.T) { - t.Run(tc.testDescription, func(t *testing.T) { - remoteAPI, err := k8s.NewFakeAPI(tc.remoteResources...) - if err != nil { - t.Fatal(err) - } - - localAPI, err := k8s.NewFakeAPI(tc.localResources...) - if err != nil { - t.Fatal(err) - } - - remoteAPI.Sync(nil) - localAPI.Sync(nil) - - q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - - watcher := RemoteClusterServiceWatcher{ - clusterName: clusterName, - clusterDomain: clusterDomain, - remoteAPIClient: remoteAPI, - localAPIClient: localAPI, - stopper: nil, - log: logging.WithFields(logging.Fields{"cluster": clusterName}), - eventsQueue: q, - requeueLimit: 0, - probeEventsSink: &NoOpProbeEventSink{}, - } - - for _, ev := range tc.events { - q.Add(ev) - } - - for range tc.events { - watcher.processNextEvent() - } - - localAPI.Sync(nil) - remoteAPI.Sync(nil) - - if tc.expectedLocalServices == nil { - // ensure the are no local services - services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(metav1.ListOptions{}) - if err != nil { - t.Fatal(err) - } - if len(services.Items) > 0 { - t.Fatalf("Was expecting no local services but instead found %v", services.Items) - - } - } else { - for _, expected := range tc.expectedLocalServices { - actual, err := localAPI.Client.CoreV1().Services(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Could not find mirrored service with name %s", expected.Name) - } - - if err := diffServices(expected, actual); err != nil { - t.Fatal(err) - } - } - } - - if tc.expectedLocalEndpoints == nil { - // ensure the are no local endpoints - endpoints, err := localAPI.Client.CoreV1().Endpoints(corev1.NamespaceAll).List(metav1.ListOptions{}) - if err != nil { - t.Fatal(err) - } - if len(endpoints.Items) > 0 { - t.Fatalf("Was expecting no local endpoints but instead found %d", len(endpoints.Items)) - - } - } else { - for _, expected := range tc.expectedLocalEndpoints { - actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(expected.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Could not find endpoints with name %s", expected.Name) - } - - if err := diffEndpoints(expected, actual); err != nil { - t.Fatal(err) - } - } - } - - expectedNumEvents := len(tc.expectedEventsInQueue) - actualNumEvents := watcher.eventsQueue.Len() - - if expectedNumEvents != actualNumEvents { - t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents) - } - - for _, ev := range tc.expectedEventsInQueue { - evInQueue, _ := watcher.eventsQueue.Get() - if !reflect.DeepEqual(ev, evInQueue) { - t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue) - } - } - }) -} - -func TestRemoteServiceCreated(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "create service and endpoints when gateway cannot be resolved", - events: []interface{}{ - &RemoteServiceCreated{ - service: remoteService("service-one", "ns1", "missing-gateway", "missing-namespace", "111", nil), - gatewayData: &gatewayMetadata{ - Name: "missing-gateway", - Namespace: "missing-namespace", - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "111", "", nil), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("service-one-remote", "ns1", "missing-gateway", "missing-namespace", "", "", nil), - }, - }, - { - testDescription: "create service and endpoints without subsets when gateway spec is wrong", - events: []interface{}{ - &RemoteServiceCreated{ - service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", - "111", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 555, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 666, - }, - }), - - gatewayData: &gatewayMetadata{ - Name: "existing-gateway", - Namespace: "existing-namespace", - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "111", "", - []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 555, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 666, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "", "", nil), - }, - remoteResources: []string{ - gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port-wrong", 888, "", t), - }, - }, - { - testDescription: "create service and endpoints when gateway can be resolved", - events: []interface{}{ - &RemoteServiceCreated{ - - service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", "111", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 555, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 666, - }, - }), - gatewayData: &gatewayMetadata{ - Name: "existing-gateway", - Namespace: "existing-namespace", - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService( - "service-one-remote", - "ns1", - "existing-gateway", - "existing-namespace", - "111", - "222", - []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 555, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 666, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("service-one-remote", "ns1", "existing-gateway", "existing-namespace", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - }), - }, - remoteResources: []string{ - gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port", 888, "gateway-identity", t), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestRemoteServiceDeleted(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "deletes locally mirrored service", - events: []interface{}{ - &RemoteServiceDeleted{ - Name: "test-service-remote-to-delete", - Namespace: "test-namespace-to-delete", - GatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }, - }, - - localResources: []string{ - mirroredServiceAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", "", nil, t), - endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", "gateway-identity", nil, t), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestRemoteServiceUpdated(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "update to new gateway", - events: []interface{}{ - &RemoteServiceUpdated{ - remoteUpdate: remoteService("test-service", "test-namespace", "gateway-new", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - }), - localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - }), - localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - }), - gatewayData: gatewayMetadata{ - Name: "gateway-new", - Namespace: "gateway-ns", - }, - }, - }, - remoteResources: []string{ - gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "incoming-port", 999, "", t), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - }, t), - endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - }, t), - }, - expectedLocalServices: []*corev1.Service{ - mirroredService( - "test-service-remote", - "test-namespace", - "gateway-new", - "gateway-ns", - "currentServiceResVersion", - "currentGatewayResVersion", - []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port3", - Protocol: "TCP", - Port: 333, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-remote", "test-namespace", "gateway-new", "gateway-ns", "0.0.0.0", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 999, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 999, - Protocol: "TCP", - }, - }), - }, - }, - { - testDescription: "updates service ports on both service and endpoints", - events: []interface{}{ - &RemoteServiceUpdated{ - remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port3", - Protocol: "TCP", - Port: 333, - }, - }), - localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - }), - localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - }), - gatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }, - }, - remoteResources: []string{ - gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "incoming-port", 888, "", t), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 222, - }, - { - Name: "port3", - Protocol: "TCP", - Port: 333, - }, - }, t), - endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port2", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port3", - Port: 888, - Protocol: "TCP", - }, - }, t), - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", "currentGatewayResVersion", - []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 111, - }, - { - Name: "port3", - Protocol: "TCP", - Port: 333, - }, - }), - }, - - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ - { - Name: "port1", - Port: 888, - Protocol: "TCP", - }, - { - Name: "port3", - Port: 888, - Protocol: "TCP", - }, - }), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestRemoteGatewayUpdated(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "endpoints ports are updated on gateway change", - events: []interface{}{ - &RemoteGatewayUpdated{ - - gatewaySpec: GatewaySpec{ - gatewayName: "gateway", - gatewayNamespace: "gateway-ns", - clusterName: "remote", - addresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, - incomingPort: 999, - resourceVersion: "currentGatewayResVersion", - ProbeConfig: nil, - }, - affectedServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - }, - }, - - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 999, - Protocol: "TCP", - }}), - endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 999, - Protocol: "TCP", - }}), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}, t), - }, - }, - - { - testDescription: "endpoints addresses are updated on gateway change", - events: []interface{}{ - &RemoteGatewayUpdated{ - gatewaySpec: GatewaySpec{ - gatewayName: "gateway", - gatewayNamespace: "gateway-ns", - clusterName: "remote", - addresses: []corev1.EndpointAddress{{IP: "0.0.0.1"}}, - incomingPort: 888, - resourceVersion: "currentGatewayResVersion", - }, - affectedServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}), - endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.1", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}), - }, - - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}, t), - }, - }, - - { - testDescription: "identity is updated on gateway change", - events: []interface{}{ - &RemoteGatewayUpdated{ - gatewaySpec: GatewaySpec{ - gatewayName: "gateway", - gatewayNamespace: "gateway-ns", - clusterName: "", - addresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, - incomingPort: 888, - resourceVersion: "currentGatewayResVersion", - identity: "new-identity", - ProbeConfig: nil, - }, - affectedServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "currentGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "new-identity", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}), - endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "new-identity", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}), - }, - - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}, t), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} -func TestRemoteGatewayDeleted(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "removes endpoint subsets when gateway is deleted", - events: []interface{}{ - &RemoteGatewayDeleted{ - gatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }, - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }), - - mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), - endpoints("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", - []corev1.ServicePort{ - { - Name: "svc-1-port", - Protocol: "TCP", - Port: 8081, - }, - }, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-1-port", - Port: 888, - Protocol: "TCP", - }}, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ - { - Name: "svc-2-port", - Protocol: "TCP", - Port: 8082, - }, - }, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", - []corev1.EndpointPort{ - { - Name: "svc-2-port", - Port: 888, - Protocol: "TCP", - }}, t), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestClusterUnregistered(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "unregisters cluster and cleans up all mirrored resources", - events: []interface{}{ - &ClusterUnregistered{}, - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestGcOrphanedServices(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "deletes mirrored resources that are no longer present on the remote cluster", - events: []interface{}{ - &OprhanedServicesGcTriggered{}, - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil, t), - endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil, t), - mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), - endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil, t), - }, - remoteResources: []string{ - remoteServiceAsYaml("test-service-1", "test-namespace", "gateway", "gateway-ns", "", nil, t), - }, - - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), - }, - - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-1-remote", "test-namespace", "", "", "", "", nil), - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} - -// the following tests ensure that onAdd, onUpdate and onDelete result in -// queueing more specific events to be processed - -func onAddOrUpdateEvent(isAdd bool, svc *corev1.Service) interface{} { - if isAdd { - return &OnAddCalled{svc: svc} - } - return &OnUpdateCalled{svc: svc} -} - -func onAddOrUpdateTestCases(t *testing.T, isAdd bool) []testCase { - - testType := "ADD" - if !isAdd { - testType = "UPDATE" - } - - return []testCase{ - { - testDescription: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType), - events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil)), - }, - expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ - service: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil), - gatewayData: &gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }}, - }, - { - testDescription: fmt.Sprintf("enqueue a ConsiderGatewayUpdateDispatch event if this is clearly not a mirrorable service (%s)", testType), - events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "", "resVersion", nil)), - }, - expectedEventsInQueue: []interface{}{&ConsiderGatewayUpdateDispatch{ - maybeGateway: remoteService("test-service", "test-namespace", "", "", "resVersion", nil), - }}, - }, - { - testDescription: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType), - events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil, t), - endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil, t), - }, - expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{ - localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), - localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), - remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), - gatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }}, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), - }, - }, - { - testDescription: fmt.Sprintf("not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType), - events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil, t), - endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil, t), - }, - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), - }, - }, - { - testDescription: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType), - events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "gateway-ns", "currentResVersion", nil)), - }, - localResources: []string{ - mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil, t), - endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil, t), - }, - expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{ - Name: "test-service", - Namespace: "test-namespace", - }}, - - expectedLocalServices: []*corev1.Service{ - mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), - }, - expectedLocalEndpoints: []*corev1.Endpoints{ - endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), - }, - }, - } -} - -func TestOnAdd(t *testing.T) { - for _, tt := range onAddOrUpdateTestCases(t, true) { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestOnUpdate(t *testing.T) { - for _, tt := range onAddOrUpdateTestCases(t, false) { - tc := tt // pin - runTestCase(&tc, t) - } -} - -func TestOnDelete(t *testing.T) { - for _, tt := range []testCase{ - { - testDescription: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service", - events: []interface{}{ - &OnDeleteCalled{ - svc: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), - }, - }, - - expectedEventsInQueue: []interface{}{ - &RemoteServiceDeleted{ - Name: "test-service", - Namespace: "test-namespace", - GatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "gateway-ns", - }, - }, - }, - }, - { - testDescription: "enqueues a RemoteGatewayDeleted because there is no gateway metadata present on the service", - events: []interface{}{ - &OnDeleteCalled{ - svc: remoteService("gateway", "test-namespace", "", "", "currentResVersion", nil), - }, - }, - expectedEventsInQueue: []interface{}{ - &RemoteGatewayDeleted{ - gatewayData: gatewayMetadata{ - Name: "gateway", - Namespace: "test-namespace", - }, - }, - }, - }, - } { - tc := tt // pin - runTestCase(&tc, t) - } -} diff --git a/controller/cmd/service-mirror/cluster_watcher_test_util.go b/controller/cmd/service-mirror/cluster_watcher_test_util.go index 17869dbcb..5c3ef5556 100644 --- a/controller/cmd/service-mirror/cluster_watcher_test_util.go +++ b/controller/cmd/service-mirror/cluster_watcher_test_util.go @@ -2,16 +2,721 @@ package servicemirror import ( "fmt" + "log" "reflect" "strings" - "testing" "github.com/ghodss/yaml" + "github.com/linkerd/linkerd2/controller/k8s" consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" ) +const ( + clusterName = "remote" + clusterDomain = "cluster.local" + defaultProbePath = "/probe" + defaultProbePort = 12345 + defaultProbePeriod = 60 +) + +type testEnvironment struct { + events []interface{} + remoteResources []string + localResources []string +} + +func (te *testEnvironment) runEnvironment(probeEventSync ProbeEventSink, watcherQueue workqueue.RateLimitingInterface) (*k8s.API, error) { + remoteAPI, err := k8s.NewFakeAPI(te.remoteResources...) + if err != nil { + return nil, err + } + + localAPI, err := k8s.NewFakeAPI(te.localResources...) + if err != nil { + return nil, err + } + + remoteAPI.Sync(nil) + localAPI.Sync(nil) + + watcher := RemoteClusterServiceWatcher{ + clusterName: clusterName, + clusterDomain: clusterDomain, + remoteAPIClient: remoteAPI, + localAPIClient: localAPI, + stopper: nil, + log: logging.WithFields(logging.Fields{"cluster": clusterName}), + eventsQueue: watcherQueue, + requeueLimit: 0, + probeEventsSink: probeEventSync, + } + + for _, ev := range te.events { + watcherQueue.Add(ev) + } + + for range te.events { + watcher.processNextEvent() + } + + localAPI.Sync(nil) + remoteAPI.Sync(nil) + + return localAPI, nil +} + +var serviceCreateWithMissingGateway = &testEnvironment{ + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "missing-gateway", "missing-namespace", "111", nil), + gatewayData: &gatewayMetadata{ + Name: "missing-gateway", + Namespace: "missing-namespace", + }, + }, + }, +} + +var createServiceWrongGatewaySpec = &testEnvironment{ + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", + "111", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + + gatewayData: &gatewayMetadata{ + Name: "existing-gateway", + Namespace: "existing-namespace", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port-wrong", 888, "", 111, "/path", 666), + }, +} + +var createServiceOkeGatewaySpec = &testEnvironment{ + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", "111", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + gatewayData: &gatewayMetadata{ + Name: "existing-gateway", + Namespace: "existing-namespace", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "incoming-port", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), + }, +} + +var deleteMirroredService = &testEnvironment{ + events: []interface{}{ + &RemoteServiceDeleted{ + Name: "test-service-remote-to-delete", + Namespace: "test-namespace-to-delete", + GatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", "", nil), + endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "", "", "gateway-identity", nil), + }, +} + +var updateServiceToNewGateway = &testEnvironment{ + events: []interface{}{ + &RemoteServiceUpdated{ + remoteUpdate: remoteService("test-service", "test-namespace", "gateway-new", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + gatewayData: gatewayMetadata{ + Name: "gateway-new", + Namespace: "gateway-ns", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "incoming-port", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + }, +} + +var updateServiceWithChangedPorts = &testEnvironment{ + events: []interface{}{ + &RemoteServiceUpdated{ + remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + localService: mirroredService("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastServiceResVersion", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + }), + localEndpoints: endpoints("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + gatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + remoteResources: []string{ + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "incoming-port", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 111, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 222, + }, + { + Name: "port3", + Protocol: "TCP", + Port: 333, + }, + }), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "192.0.2.127", "", []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port3", + Port: 888, + Protocol: "TCP", + }, + }), + }, +} + +var remoteGatewayUpdated = &testEnvironment{ + events: []interface{}{ + &RemoteGatewayUpdated{ + gatewaySpec: GatewaySpec{ + gatewayName: "gateway", + gatewayNamespace: "gateway-ns", + clusterName: "remote", + addresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, + incomingPort: 999, + resourceVersion: "currentGatewayResVersion", + ProbeConfig: &ProbeConfig{ + path: defaultProbePath, + port: defaultProbePort, + periodInSeconds: defaultProbePeriod, + }, + }, + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, +} + +var gatewayAddressChanged = &testEnvironment{ + events: []interface{}{ + &RemoteGatewayUpdated{ + gatewaySpec: GatewaySpec{ + gatewayName: "gateway", + gatewayNamespace: "gateway-ns", + clusterName: "some-cluster", + addresses: []corev1.EndpointAddress{{IP: "0.0.0.1"}}, + incomingPort: 888, + resourceVersion: "currentGatewayResVersion", + ProbeConfig: &ProbeConfig{ + path: "/p", + port: 1, + periodInSeconds: 222, + }, + }, + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, +} + +var gatewayIdentityChanged = &testEnvironment{ + events: []interface{}{ + &RemoteGatewayUpdated{ + gatewaySpec: GatewaySpec{ + gatewayName: "gateway", + gatewayNamespace: "gateway-ns", + clusterName: clusterName, + addresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, + incomingPort: 888, + resourceVersion: "currentGatewayResVersion", + identity: "new-identity", + ProbeConfig: &ProbeConfig{ + path: defaultProbePath, + port: defaultProbePort, + periodInSeconds: defaultProbePeriod, + }, + }, + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + mirroredService("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, +} + +var gatewayProbeConfigChanged = &testEnvironment{ + events: []interface{}{ + &RemoteGatewayUpdated{ + gatewaySpec: GatewaySpec{ + gatewayName: "gateway", + gatewayNamespace: "gateway-ns", + clusterName: clusterName, + addresses: []corev1.EndpointAddress{{IP: "0.0.0.0"}}, + incomingPort: 888, + resourceVersion: "currentGatewayResVersion", + identity: "identity", + ProbeConfig: &ProbeConfig{ + path: "/new-path", + port: defaultProbePort, + periodInSeconds: defaultProbePeriod, + }, + }, + affectedServices: []*corev1.Service{ + mirroredService("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + }, +} + +var gatewayDeleted = &testEnvironment{ + events: []interface{}{ + &RemoteGatewayDeleted{ + gatewayData: gatewayMetadata{ + Name: "gateway", + Namespace: "gateway-ns", + }, + }, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", + []corev1.ServicePort{ + { + Name: "svc-1-port", + Protocol: "TCP", + Port: 8081, + }, + }), + endpointsAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-1-port", + Port: 888, + Protocol: "TCP", + }}), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "", "pastGatewayResVersion", []corev1.ServicePort{ + { + Name: "svc-2-port", + Protocol: "TCP", + Port: 8082, + }, + }), + endpointsAsYaml("test-service-2-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", + []corev1.EndpointPort{ + { + Name: "svc-2-port", + Port: 888, + Protocol: "TCP", + }}), + }, +} + +var clusterUnregistered = &testEnvironment{ + events: []interface{}{ + &ClusterUnregistered{}, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil), + endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil), + endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil), + }, +} + +var gcTriggered = &testEnvironment{ + events: []interface{}{ + &OprhanedServicesGcTriggered{}, + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-1-remote", "test-namespace", "gateway", "gateway-ns", "", "", nil), + endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", "", "", nil), + mirroredServiceAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil), + endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", "", "", nil), + }, + remoteResources: []string{ + remoteServiceAsYaml("test-service-1", "test-namespace", "gateway", "gateway-ns", "", nil), + }, +} + +func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { + return &testEnvironment{ + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil)), + }, + } + +} + +func onAddOrUpdateNonExportedSvc(isAdd bool) *testEnvironment { + return &testEnvironment{ + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "", "resVersion", nil)), + }, + } + +} + +func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { + return &testEnvironment{ + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "pastResourceVersion", "gatewayResVersion", nil), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + } +} + +func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { + return &testEnvironment{ + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + } +} + +func serviceNotExportedAnymore(isAdd bool) *testEnvironment { + return &testEnvironment{ + events: []interface{}{ + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "gateway-ns", "currentResVersion", nil)), + }, + localResources: []string{ + mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "currentResVersion", "gatewayResVersion", nil), + endpointsAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "0.0.0.0", "", nil), + }, + } +} + +var onDeleteWithGatewayMetadata = &testEnvironment{ + events: []interface{}{ + &OnDeleteCalled{ + svc: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + }, + }, +} + +var onDeleteNoGatewayMetadata = &testEnvironment{ + events: []interface{}{ + &OnDeleteCalled{ + svc: remoteService("gateway", "test-namespace", "", "", "currentResVersion", nil), + }, + }, +} + +// the following tests ensure that onAdd, onUpdate and onDelete result in +// queueing more specific events to be processed + +func onAddOrUpdateEvent(isAdd bool, svc *corev1.Service) interface{} { + if isAdd { + return &OnAddCalled{svc: svc} + } + return &OnUpdateCalled{svc: svc} +} + func diffServices(expected, actual *corev1.Service) error { if expected.Name != actual.Name { return fmt.Errorf("was expecting service with name %s but was %s", expected.Name, actual.Name) @@ -80,12 +785,12 @@ func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, port } } -func remoteServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort, t *testing.T) string { +func remoteServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort) string { svc := remoteService(name, namespace, gtwName, gtwNs, resourceVersion, ports) bytes, err := yaml.Marshal(svc) if err != nil { - t.Fatal(err) + log.Fatal(err) } return string(bytes) } @@ -121,17 +826,17 @@ func mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayRe } } -func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort, t *testing.T) string { +func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort) string { svc := mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion, ports) bytes, err := yaml.Marshal(svc) if err != nil { - t.Fatal(err) + log.Fatal(err) } return string(bytes) } -func gateway(name, namespace, resourceVersion, ip, portName string, port int32, identity string) *corev1.Service { +func gateway(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int, probePath string, probePeriod int) *corev1.Service { svc := corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -142,7 +847,10 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, Namespace: namespace, ResourceVersion: resourceVersion, Annotations: map[string]string{ - consts.GatewayIdentity: identity, + consts.GatewayIdentity: identity, + consts.GatewayProbePath: probePath, + consts.GatewayProbePeriod: fmt.Sprint(probePeriod), + consts.GatewayProbePort: fmt.Sprint(probePort), }, }, Spec: corev1.ServiceSpec{ @@ -162,12 +870,12 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, return &svc } -func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, identity string, t *testing.T) string { - gtw := gateway(name, namespace, resourceVersion, ip, portName, port, identity) +func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int, probePath string, probePeriod int) string { + gtw := gateway(name, namespace, resourceVersion, ip, portName, port, identity, probePort, probePath, probePeriod) bytes, err := yaml.Marshal(gtw) if err != nil { - t.Fatal(err) + log.Fatal(err) } return string(bytes) } @@ -215,12 +923,12 @@ func endpoints(name, namespace, gtwName, gtwNs, gatewayIP string, gatewayIdentit return endpoints } -func endpointsAsYaml(name, namespace, gtwName, gtwNs, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort, t *testing.T) string { +func endpointsAsYaml(name, namespace, gtwName, gtwNs, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string { ep := endpoints(name, namespace, gtwName, gtwNs, gatewayIP, gatewayIdentity, ports) bytes, err := yaml.Marshal(ep) if err != nil { - t.Fatal(err) + log.Fatal(err) } return string(bytes) } diff --git a/controller/cmd/service-mirror/metrics.go b/controller/cmd/service-mirror/metrics.go index ec2fdf31f..bb7612603 100644 --- a/controller/cmd/service-mirror/metrics.go +++ b/controller/cmd/service-mirror/metrics.go @@ -3,6 +3,7 @@ package servicemirror import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + logging "github.com/sirupsen/logrus" ) const ( @@ -23,10 +24,11 @@ type probeMetricVecs struct { } type probeMetrics struct { - services prometheus.Gauge - alive prometheus.Gauge - latencies prometheus.Observer - probes *prometheus.CounterVec + services prometheus.Gauge + alive prometheus.Gauge + latencies prometheus.Observer + probes *prometheus.CounterVec + unregister func() } func newProbeMetricVecs() probeMetricVecs { @@ -112,5 +114,26 @@ func (mv probeMetricVecs) newWorkerMetrics(gatewayNamespace, gatewayName, remote alive: mv.alive.With(labels), latencies: mv.latencies.With(labels), probes: curriedProbes, + unregister: func() { + mv.unregister(gatewayNamespace, gatewayName, remoteClusterName) + }, }, nil } + +func (mv probeMetricVecs) unregister(gatewayNamespace, gatewayName, remoteClusterName string) { + labels := prometheus.Labels{ + gatewayNameLabel: gatewayName, + gatewayNamespaceLabel: gatewayNamespace, + gatewayClusterName: remoteClusterName, + } + + if !mv.services.Delete(labels) { + logging.Warnf("unable to delete num_mirrored_services metric with labels %s", labels) + } + if !mv.alive.Delete(labels) { + logging.Warnf("unable to delete gateway_alive metric with labels %s", labels) + } + if !mv.latencies.Delete(labels) { + logging.Warnf("unable to delete gateway_probe_latency_ms metric with labels %s", labels) + } +} diff --git a/controller/cmd/service-mirror/probe_manager.go b/controller/cmd/service-mirror/probe_manager.go index 0febad05d..19d054dd9 100644 --- a/controller/cmd/service-mirror/probe_manager.go +++ b/controller/cmd/service-mirror/probe_manager.go @@ -40,6 +40,13 @@ type MirroredServicePaired struct { GatewaySpec } +// GatewayDeleted is emitted when a gateway is deleted +type GatewayDeleted struct { + gatewayName string + gatewayNs string + clusterName string +} + // ClusterNotRegistered is is emitted when the cluster is not monitored anymore type ClusterNotRegistered struct { clusterName string @@ -94,6 +101,8 @@ func (m *ProbeManager) handleEvent(ev interface{}) { m.handleMirroredServiceUnpaired(ev) case *GatewayUpdated: m.handleGatewayUpdated(ev) + case *GatewayDeleted: + m.handleGatewayDeleted(ev) case *ClusterNotRegistered: m.handleClusterNotRegistered(ev) default: @@ -189,6 +198,11 @@ func (m *ProbeManager) stopProbe(key string) { } } +func (m *ProbeManager) handleGatewayDeleted(event *GatewayDeleted) { + probeKey := probeKey(event.gatewayNs, event.gatewayName, event.clusterName) + m.stopProbe(probeKey) +} + func (m *ProbeManager) handleClusterNotRegistered(event *ClusterNotRegistered) { matchLabels := map[string]string{ consts.MirroredResourceLabel: "true", @@ -202,7 +216,7 @@ func (m *ProbeManager) handleClusterNotRegistered(event *ClusterNotRegistered) { stopped := make(map[string]bool) for _, svc := range services { - probeKey := probeKey(svc.Labels[consts.RemoteGatewayNameLabel], svc.Labels[consts.RemoteGatewayNameLabel], event.clusterName) + probeKey := probeKey(svc.Labels[consts.RemoteGatewayNsLabel], svc.Labels[consts.RemoteGatewayNameLabel], event.clusterName) if _, ok := stopped[probeKey]; !ok { m.stopProbe(probeKey) diff --git a/controller/cmd/service-mirror/probe_worker.go b/controller/cmd/service-mirror/probe_worker.go index b5ffb0bd3..c48f75113 100644 --- a/controller/cmd/service-mirror/probe_worker.go +++ b/controller/cmd/service-mirror/probe_worker.go @@ -77,6 +77,7 @@ func (pw *ProbeWorker) UpdateProbeSpec(spec *probeSpec) { // Stop this probe worker func (pw *ProbeWorker) Stop() { + pw.metrics.unregister() pw.log.Debug("Stopping probe worker") close(pw.stopCh) }