diff --git a/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml b/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml index bd34e08ea..39038c8ee 100644 --- a/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml @@ -16,12 +16,15 @@ metadata: {{ include "partials.annotations.created-by" $ }} rules: - apiGroups: [""] - resources: ["services"] + resources: ["services", "endpoints"] verbs: ["list", "get", "watch"] - apiGroups: [""] resources: ["configmaps"] verbs: ["get"] - resourceNames: ["linkerd-config"] + resourceNames: ["linkerd-config"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch"] --- apiVersion: v1 kind: ServiceAccount diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index d5918e0e4..3c2e62834 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -39,6 +39,7 @@ func Main(args []string) { metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on") namespace := cmd.String("namespace", "", "namespace containing Link and credentials Secret") repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution") + enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring") flags.ConfigureAndParse(cmd, args) linkName := cmd.Arg(0) @@ -113,7 +114,7 @@ main: if err != nil { log.Errorf("Failed to load remote cluster credentials: %s", err) } - err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics) + err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc) if err != nil { // failed to restart cluster watcher; give a bit of slack // and restart the link watch to give it another try @@ -162,6 +163,7 @@ func restartClusterWatcher( requeueLimit int, repairPeriod time.Duration, metrics servicemirror.ProbeMetricVecs, + enableHeadlessSvc bool, ) error { if clusterWatcher != nil { clusterWatcher.Stop(false) @@ -183,6 +185,7 @@ func restartClusterWatcher( &link, requeueLimit, repairPeriod, + enableHeadlessSvc, ) if err != nil { return fmt.Errorf("Unable to create cluster watcher: %s", err) diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index f1971a660..3c4b0845c 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -13,14 +13,20 @@ import ( "github.com/prometheus/client_golang/prometheus" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) +const eventTypeSkipped = "ServiceMirroringSkipped" + type ( // RemoteClusterServiceWatcher is a watcher instantiated for every cluster that is being watched // Its main job is to listen to events coming from the remote cluster and react accordingly, keeping @@ -30,15 +36,17 @@ type ( // it can be requeued up to N times, to ensure that the failure is not due to some temporary network // problems or general glitch in the Matrix. RemoteClusterServiceWatcher struct { - serviceMirrorNamespace string - link *multicluster.Link - remoteAPIClient *k8s.API - localAPIClient *k8s.API - stopper chan struct{} - log *logging.Entry - eventsQueue workqueue.RateLimitingInterface - requeueLimit int - repairPeriod time.Duration + serviceMirrorNamespace string + link *multicluster.Link + remoteAPIClient *k8s.API + localAPIClient *k8s.API + stopper chan struct{} + recorder record.EventRecorder + log *logging.Entry + eventsQueue workqueue.RateLimitingInterface + requeueLimit int + repairPeriod time.Duration + headlessServicesEnabled bool } // RemoteServiceCreated is generated whenever a remote service is created Observing @@ -89,12 +97,23 @@ type ( svc *corev1.Service } + // OnAddEndpointsCalled is issued when the onAdd function of the Endpoints + // shared informer is called + OnAddEndpointsCalled struct { + ep *corev1.Endpoints + } + // OnUpdateCalled is issued when the onUpdate function of the // shared informer is called OnUpdateCalled struct { svc *corev1.Service } + // OnUpdateEndpointsCalled is issued when the onUpdate function of the + // shared Endpoints informer is called + OnUpdateEndpointsCalled struct { + ep *corev1.Endpoints + } // OnDeleteCalled is issued when the onDelete function of the // shared informer is called OnDeleteCalled struct { @@ -126,9 +145,9 @@ func NewRemoteClusterServiceWatcher( link *multicluster.Link, requeueLimit int, repairPeriod time.Duration, - + enableHeadlessSvc bool, ) (*RemoteClusterServiceWatcher, error) { - remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, k8s.Svc) + remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, k8s.Svc, k8s.Endpoint) if err != nil { return nil, fmt.Errorf("cannot initialize api for target cluster %s: %s", clusterName, err) } @@ -137,6 +156,15 @@ func NewRemoteClusterServiceWatcher( return nil, fmt.Errorf("cannot connect to api for target cluster %s: %s", clusterName, err) } + // Create k8s event recorder + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{ + Interface: remoteAPI.Client.CoreV1().Events(""), + }) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{ + Component: fmt.Sprintf("linkerd-service-mirror-%s", clusterName), + }) + stopper := make(chan struct{}) return &RemoteClusterServiceWatcher{ serviceMirrorNamespace: serviceMirrorNamespace, @@ -144,13 +172,15 @@ func NewRemoteClusterServiceWatcher( remoteAPIClient: remoteAPI, localAPIClient: localAPI, stopper: stopper, + recorder: recorder, log: logging.WithFields(logging.Fields{ "cluster": clusterName, "apiAddress": cfg.Host, }), - eventsQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - requeueLimit: requeueLimit, - repairPeriod: repairPeriod, + eventsQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + requeueLimit: requeueLimit, + repairPeriod: repairPeriod, + headlessServicesEnabled: enableHeadlessSvc, }, nil } @@ -174,10 +204,12 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteSer consts.RemoteResourceVersionAnnotation: remoteService.ResourceVersion, // needed to detect real changes consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain), } + value, ok := remoteService.GetAnnotations()[consts.ProxyOpaquePortsAnnotation] if ok { annotations[consts.ProxyOpaquePortsAnnotation] = value } + return annotations } @@ -289,7 +321,7 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Co if kerrors.IsNotFound(err) { continue } - errors = append(errors, fmt.Errorf("Could not delete service %s/%s: %s", svc.Namespace, svc.Name, err)) + errors = append(errors, fmt.Errorf("Could not delete service %s/%s: %s", svc.Namespace, svc.Name, err)) } else { rcsw.log.Infof("Deleted service %s/%s", svc.Namespace, svc.Name) } @@ -297,7 +329,7 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Co endpoints, err := rcsw.localAPIClient.Endpoint().Lister().List(labels.Set(matchLabels).AsSelector()) if err != nil { - innerErr := fmt.Errorf("could not retrieve Endpoints that need cleaning up: %s", err) + innerErr := fmt.Errorf("could not retrieve endpoints that need cleaning up: %s", err) if kerrors.IsNotFound(err) { return innerErr } @@ -309,9 +341,9 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Co if kerrors.IsNotFound(err) { continue } - errors = append(errors, fmt.Errorf("Could not delete Endpoints %s/%s: %s", endpoint.Namespace, endpoint.Name, err)) + errors = append(errors, fmt.Errorf("Could not delete endpoints %s/%s: %s", endpoint.Namespace, endpoint.Name, err)) } else { - rcsw.log.Infof("Deleted Endpoints %s/%s", endpoint.Namespace, endpoint.Name) + rcsw.log.Infof("Deleted endpoints %s/%s", endpoint.Namespace, endpoint.Name) } } @@ -324,11 +356,43 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Co // Deletes a locally mirrored service as it is not present on the remote cluster anymore func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.Context, ev *RemoteServiceDeleted) error { localServiceName := rcsw.mirroredResourceName(ev.Name) - rcsw.log.Infof("Deleting mirrored service %s/%s", ev.Namespace, localServiceName) + localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.Namespace).Get(localServiceName) var errors []error + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.log.Debugf("Failed to delete mirror service %s/%s: %v", ev.Namespace, ev.Name, err) + return nil + } + errors = append(errors, fmt.Errorf("could not fetch service %s/%s: %s", ev.Namespace, localServiceName, err)) + } + + // If the mirror service is headless, also delete its endpoint mirror + // services. + if localService.Spec.ClusterIP == corev1.ClusterIPNone { + matchLabels := map[string]string{ + consts.MirroredHeadlessSvcNameLabel: localServiceName, + } + endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) + if err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("could not fetch endpoint mirrors for mirror service %s/%s: %s", ev.Namespace, localServiceName, err)) + } + } + + for _, endpointMirror := range endpointMirrorServices { + err = rcsw.localAPIClient.Client.CoreV1().Services(endpointMirror.Namespace).Delete(ctx, endpointMirror.Name, metav1.DeleteOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("could not delete endpoint mirror %s/%s: %s", endpointMirror.Namespace, endpointMirror.Name, err)) + } + } + } + } + + rcsw.log.Infof("Deleting mirrored service %s/%s", ev.Namespace, localServiceName) if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); err != nil { if !kerrors.IsNotFound(err) { - errors = append(errors, fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err)) + errors = append(errors, fmt.Errorf("could not delete service: %s/%s: %s", ev.Namespace, localServiceName, err)) } } @@ -336,7 +400,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context. return RetryableError{errors} } - rcsw.log.Infof("Successfully deleted Service: %s/%s", ev.Namespace, localServiceName) + rcsw.log.Infof("Successfully deleted service: %s/%s", ev.Namespace, localServiceName) return nil } @@ -392,11 +456,6 @@ func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort { } func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error { - gatewayAddresses, err := rcsw.resolveGatewayAddress() - if err != nil { - return err - } - remoteService := ev.service.DeepCopy() serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) localServiceName := rcsw.mirroredResourceName(remoteService.Name) @@ -417,35 +476,8 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. }, } - endpointsToCreate := &corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: localServiceName, - Namespace: ev.service.Namespace, - Labels: map[string]string{ - consts.MirroredResourceLabel: "true", - consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, - }, - Annotations: map[string]string{ - consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain), - }, - }, - } - - // only if we resolve it, we are updating the endpoints addresses and ports - rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo) - - if len(gatewayAddresses) > 0 { - endpointsToCreate.Subsets = []corev1.EndpointSubset{ - { - Addresses: gatewayAddresses, - Ports: rcsw.getEndpointsPorts(ev.service), - }, - } - } else { - rcsw.log.Warnf("gateway for %s does not have ready addresses, skipping subsets", serviceInfo) - } - if rcsw.link.GatewayIdentity != "" { - endpointsToCreate.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + if rcsw.headlessServicesEnabled && remoteService.Spec.ClusterIP == corev1.ClusterIPNone { + return nil } rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) @@ -456,10 +488,52 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. } } - rcsw.log.Infof("Creating a new Endpoints for %s", serviceInfo) - if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.service.Namespace).Create(ctx, endpointsToCreate, metav1.CreateOptions{}); err != nil { + return rcsw.createGatewayEndpoints(ctx, remoteService) +} + +func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Context, exportedService *corev1.Service) error { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } + + localServiceName := rcsw.mirroredResourceName(exportedService.Name) + serviceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) + endpointsToCreate := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: localServiceName, + Namespace: exportedService.Namespace, + Labels: map[string]string{ + consts.MirroredResourceLabel: "true", + consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, + }, + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), + }, + }, + } + + rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo) + + if len(gatewayAddresses) > 0 { + endpointsToCreate.Subsets = []corev1.EndpointSubset{ + { + Addresses: gatewayAddresses, + Ports: rcsw.getEndpointsPorts(exportedService), + }, + } + } else { + rcsw.log.Warnf("gateway for %s does not have ready addresses, skipping subsets", serviceInfo) + } + + if rcsw.link.GatewayIdentity != "" { + endpointsToCreate.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + } + + rcsw.log.Infof("Creating a new endpoints for %s", serviceInfo) + if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, endpointsToCreate, metav1.CreateOptions{}); err != nil { // we clean up after ourselves - rcsw.localAPIClient.Client.CoreV1().Services(ev.service.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}) + rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}) // and retry return RetryableError{[]error{err}} } @@ -506,6 +580,7 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S } return RetryableError{[]error{err}} } + return nil } localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName) @@ -562,8 +637,12 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) ( switch ev := event.(type) { case *OnAddCalled: err = rcsw.createOrUpdateService(ev.svc) + case *OnAddEndpointsCalled: + err = rcsw.createOrUpdateHeadlessEndpoints(ctx, ev.ep) case *OnUpdateCalled: err = rcsw.createOrUpdateService(ev.svc) + case *OnUpdateEndpointsCalled: + err = rcsw.createOrUpdateHeadlessEndpoints(ctx, ev.ep) case *OnDeleteCalled: rcsw.handleOnDelete(ev.svc) case *RemoteServiceCreated: @@ -655,6 +734,34 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { }, }, ) + if rcsw.headlessServicesEnabled { + rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if obj.(metav1.Object).GetNamespace() == "kube-system" { + return + } + + if ok := isExportedHeadlessEndpoints(obj, rcsw.log); !ok { + return + } + + rcsw.eventsQueue.Add(&OnAddEndpointsCalled{obj.(*corev1.Endpoints)}) + }, + UpdateFunc: func(old, new interface{}) { + if new.(metav1.Object).GetNamespace() == "kube-system" { + return + } + + if ok := isExportedHeadlessEndpoints(new, rcsw.log); !ok { + return + } + + rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{new.(*corev1.Endpoints)}) + }, + }, + ) + } go rcsw.processEvents(ctx) // We need to issue a RepairEndpoints immediately to populate the gateway @@ -760,6 +867,14 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) er for _, svc := range mirrorServices { updatedService := svc.DeepCopy() + // If the Service is headless we should skip repairing its Endpoints. + // Headless Services that are mirrored on a remote cluster will have + // their Endpoints created with hostnames and nested clusterIP services, + // we should avoid replacing these with the gateway address. + if svc.Spec.ClusterIP == corev1.ClusterIPNone { + rcsw.log.Debugf("Skipped repairing endpoints for %s/%s", svc.Namespace, svc.Name) + continue + } endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name) if err != nil { rcsw.log.Errorf("Could not get endpoints: %s", err) @@ -815,3 +930,421 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateEndpoints(ctx context.Con return nil } + +// createOrUpdateHeadlessEndpoints processes endpoints objects for exported +// headless services. When an endpoints object is created or updated in the +// remote cluster, it will be processed here in order to reconcile the local +// cluster state with the remote cluster state. +// +// createOrUpdateHeadlessEndpoints is also responsible for creating the service +// mirror in the source cluster. In order for an exported headless service to be +// mirrored as headless, it must have at least one port defined and at least one +// named address in its endpoints object (e.g a deployment would not work since +// pods may not have arbitrary hostnames). As such, when an endpoints object is +// first processed, if there is no mirror service, we create one, by looking at +// the endpoints object itself. If the exported service is deemed to be valid +// for headless mirroring, then the function will create the headless mirror and +// then create an endpoints object for it in the source cluster. If it is not +// valid, the exported service will be mirrored as clusterIP and its endpoints +// will point to the gateway. +// +// When creating endpoints for a headless mirror, we also create an endpoint +// mirror (clusterIP) service for each of the endpoints' named addresses. If the +// headless mirror exists and has an endpoints object, we simply update by +// either creating or deleting endpoint mirror services. +func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error { + exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name) + if err != nil { + rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + return fmt.Errorf("error retrieving exported service %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + } + + // Check whether the endpoints should be processed for a headless exported + // service. If the exported service does not have any ports exposed, then + // neither will its corresponding endpoint mirrors, it should not be created + // as a headless mirror. If the service does not have any named addresses in + // its Endpoints object, then the endpoints should not be processed. + if len(exportedService.Spec.Ports) == 0 { + rcsw.recorder.Event(exportedService, v1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports") + rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name) + return nil + } + + mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // If the mirror service does not exist, create it, either as clusterIP + // or as headless. + mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints) + if err != nil { + return err + } + } + + headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name) + headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return rcsw.createGatewayEndpoints(ctx, exportedService) + } + + // Create endpoint mirrors for headless mirror + if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil { + rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + return err + } + + return nil + } + + // Past this point, we do not want to process a mirror service that is not + // headless. We want to process only endpoints for headless mirrors; before + // this point it would have been possible to have a clusterIP mirror, since + // we are creating the mirror service in the scope of the function. + if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return nil + } + + mirrorEndpoints := headlessMirrorEndpoints.DeepCopy() + endpointMirrors := make(map[string]struct{}) + newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) + for _, subset := range exportedEndpoints.Subsets { + newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) + for _, address := range subset.Addresses { + if address.Hostname == "" { + continue + } + + endpointMirrorName := rcsw.mirroredResourceName(address.Hostname) + endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + // If the error is 'NotFound' then the Endpoint Mirror service + // does not exist, so create it. + endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + return err + } + } + + endpointMirrors[endpointMirrorName] = struct{}{} + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: address.Hostname, + IP: endpointMirrorService.Spec.ClusterIP, + }) + } + + if len(newAddresses) == 0 { + continue + } + + // copy ports, create subset + newSubsets = append(newSubsets, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: subset.DeepCopy().Ports, + }) + } + + headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name) + matchLabels := map[string]string{ + consts.MirroredHeadlessSvcNameLabel: headlessMirrorName, + } + + // Fetch all Endpoint Mirror services that belong to the same Headless Mirror + endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) + if err != nil { + return err + } + + var errors []error + for _, service := range endpointMirrorServices { + // If the service's name does not show up in the up-to-date map of + // Endpoint Mirror names, then we should delete it. + if _, found := endpointMirrors[service.Name]; found { + continue + } + err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %v", service.Namespace, service.Name, err)) + } + } + } + if len(errors) > 0 { + return RetryableError{errors} + } + + // Update + mirrorEndpoints.Subsets = newSubsets + _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(mirrorEndpoints.Namespace).Update(ctx, mirrorEndpoints, metav1.UpdateOptions{}) + if err != nil { + return RetryableError{[]error{err}} + } + + return nil +} + +// createRemoteHeadlessService creates a mirror service for an exported headless +// service. Whether the mirror will be created as a headless or clusterIP +// service depends on the endpoints object associated with the exported service. +// If there is at least one named address, then the service will be mirrored as +// headless. +// +// Note: we do not check for any exposed ports because it was previously done +// when the service was picked up by the service mirror. We also do not need to +// check if the exported service is headless; its endpoints will be processed +// only if it is headless so we are certain at this point that is the case. +func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) { + // If we don't have any subsets to process then avoid creating the service. + // We need at least one address to be make a decision (whether we should + // create as clusterIP or headless), rely on the endpoints being eventually + // consistent, will likely receive an update with subsets. + if len(exportedEndpoints.Subsets) == 0 { + return &corev1.Service{}, nil + } + + remoteService := exportedService.DeepCopy() + serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) + localServiceName := rcsw.mirroredResourceName(remoteService.Name) + + if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { + return &corev1.Service{}, err + } + + serviceToCreate := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: localServiceName, + Namespace: remoteService.Namespace, + Annotations: rcsw.getMirroredServiceAnnotations(remoteService), + Labels: rcsw.getMirroredServiceLabels(), + }, + Spec: corev1.ServiceSpec{ + Ports: remapRemoteServicePorts(remoteService.Spec.Ports), + }, + } + + if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) { + serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone + rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo) + } else { + rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) + } + + svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + // we might have created it during earlier attempt, if that is not the case, we retry + return &corev1.Service{}, RetryableError{[]error{err}} + } + } + + return svc, err +} + +// createHeadlessMirrorEndpoints creates an endpoints object for a Headless +// Mirror service. The endpoints object will contain the same subsets and hosts +// as the endpoints object of the exported headless service. Each host in the +// Headless Mirror's endpoints object will point to an Endpoint Mirror service. +func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error { + exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) + endpointsHostnames := make(map[string]struct{}) + subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) + for _, subset := range exportedEndpoints.Subsets { + newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) + for _, addr := range subset.Addresses { + if addr.Hostname == "" { + continue + } + + endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname) + createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err) + continue + } + + endpointsHostnames[addr.Hostname] = struct{}{} + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: addr.TargetRef.Name, + IP: createdService.Spec.ClusterIP, + }) + + } + + if len(newAddresses) == 0 { + continue + } + + subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: subset.DeepCopy().Ports, + }) + } + + headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + headlessMirrorEndpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: headlessMirrorServiceName, + Namespace: exportedService.Namespace, + Labels: map[string]string{ + consts.MirroredResourceLabel: "true", + consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, + }, + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), + }, + }, + Subsets: subsetsToCreate, + } + + if rcsw.link.GatewayIdentity != "" { + headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + } + + rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace) + if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}); err != nil { + // we clean up after ourselves + rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}) + // and retry + return RetryableError{[]error{err}} + } + + return nil +} + +// createEndpointMirrorService creates a new Endpoint Mirror service and its +// corresponding endpoints object. It returns the newly created Endpoint Mirror +// service object. When a headless service is exported, we create a Headless +// Mirror service in the source cluster and then for each hostname in the +// exported service's endpoints object, we also create an Endpoint Mirror +// service (and its corresponding endpoints object). +func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context.Context, endpointHostname, resourceVersion, endpointMirrorName string, exportedService *corev1.Service) (*corev1.Service, error) { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return nil, err + } + + endpointMirrorAnnotations := map[string]string{ + // needed to detect real changes + consts.RemoteResourceVersionAnnotation: resourceVersion, + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.%s.svc.%s", endpointHostname, exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), + } + + endpointMirrorLabels := rcsw.getMirroredServiceLabels() + mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + endpointMirrorLabels[consts.MirroredHeadlessSvcNameLabel] = mirrorServiceName + + // Create service spec, clusterIP + endpointMirrorService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: endpointMirrorName, + Namespace: exportedService.Namespace, + Annotations: endpointMirrorAnnotations, + Labels: endpointMirrorLabels, + }, + Spec: corev1.ServiceSpec{ + Ports: remapRemoteServicePorts(exportedService.Spec.Ports), + }, + } + endpointMirrorEndpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: endpointMirrorService.Name, + Namespace: endpointMirrorService.Namespace, + Labels: endpointMirrorLabels, + Annotations: map[string]string{ + consts.RemoteServiceFqName: endpointMirrorService.Annotations[consts.RemoteServiceFqName], + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: gatewayAddresses, + Ports: rcsw.getEndpointsPorts(exportedService), + }, + }, + } + + if rcsw.link.GatewayIdentity != "" { + endpointMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + } + + exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) + endpointMirrorInfo := fmt.Sprintf("%s/%s", endpointMirrorService.Namespace, endpointMirrorName) + rcsw.log.Infof("Creating a new endpoint mirror service %s for exported headless service %s", endpointMirrorInfo, exportedServiceInfo) + createdService, err := rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Create(ctx, endpointMirrorService, metav1.CreateOptions{}) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + // we might have created it during earlier attempt, if that is not the case, we retry + return createdService, RetryableError{[]error{err}} + } + } + + rcsw.log.Infof("Creating a new endpoints object for endpoint mirror service %s", endpointMirrorInfo) + if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpointMirrorService.Namespace).Create(ctx, endpointMirrorEndpoints, metav1.CreateOptions{}); err != nil { + // If we cannot create an Endpoints object for the Endpoint Mirror + // service, then delete the Endpoint Mirror service we just created + rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Delete(ctx, endpointMirrorName, metav1.DeleteOptions{}) + // and retry + return createdService, RetryableError{[]error{err}} + } + + return createdService, nil +} + +// shouldExportAsHeadlessService checks if an exported service should be +// mirrored as a headless service or as a clusterIP service, based on its +// endpoints object. For an exported service to be a headless mirror, it needs +// to have at least one named address in its endpoints (that is, a pod with a +// `hostname`). If the endpoints object does not contain at least one named +// address, it should be exported as clusterIP. +func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool { + for _, subset := range endpoints.Subsets { + for _, addr := range subset.Addresses { + if addr.Hostname != "" { + return true + } + } + + for _, addr := range subset.NotReadyAddresses { + if addr.Hostname != "" { + return true + } + } + } + log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name) + return false +} + +// isExportedHeadlessEndpoints checks if an endpoints object belongs to a +// headless exported service. +func isExportedHeadlessEndpoints(obj interface{}, log *logging.Entry) bool { + ep, ok := obj.(*corev1.Endpoints) + if !ok { + log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) + return false + } + + if _, found := ep.Labels[corev1.IsHeadlessService]; !found { + // Not an Endpoints object for a headless service? Then we likely don't want + // to update anything. + log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService) + return false + } + + // If Endpoints belong to an unexported service, ignore. + if _, found := ep.Labels[consts.DefaultExportedServiceSelector]; !found { + log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector) + return false + } + + return true +} diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go index 57fafbd38..17f0e372c 100644 --- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go +++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go @@ -28,13 +28,13 @@ func (tc *mirroringTestCase) run(t *testing.T) { if err != nil { t.Fatal(err) } - if tc.expectedLocalServices == nil { // ensure the are no local services services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{}) if err != nil { t.Fatal(err) } + if len(services.Items) > 0 { t.Fatalf("Was expecting no local services but instead found %v", services.Items) @@ -122,6 +122,78 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) { }), }, }, + { + description: "create headless service and endpoints when gateway can be resolved", + environment: createExportedHeadlessService, + expectedLocalServices: []*corev1.Service{ + headlessMirrorService( + "service-one-remote", + "ns2", + "111", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + endpointMirrorService( + "pod-0", + "service-one-remote", + "ns2", + "112", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }, + ), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + headlessMirrorEndpoints("service-one-remote", "ns2", "pod-0", "", "gateway-identity", []corev1.EndpointPort{ + { + Name: "port1", + Port: 555, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 666, + Protocol: "TCP", + }, + }), + endpointMirrorEndpoints( + "service-one-remote", + "ns2", + "pod-0", + "192.0.2.129", + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Port: 889, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 889, + Protocol: "TCP", + }, + }), + }, + }, } { tc := tt // pin tc.run(t) @@ -182,6 +254,112 @@ func TestRemoteServiceUpdatedMirroring(t *testing.T) { } } +func TestRemoteEndpointsUpdatedMirroring(t *testing.T) { + for _, tt := range []mirroringTestCase{ + { + description: "updates headless mirror service with new remote Endpoints hosts", + environment: updateEndpointsWithChangedHosts, + expectedLocalServices: []*corev1.Service{ + headlessMirrorService("service-two-remote", "eptest", "222", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + endpointMirrorService("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + endpointMirrorService("pod-1", "service-two-remote", "eptest", "112", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{ + headlessMirrorEndpointsUpdated( + "service-two-remote", + "eptest", + []string{"pod-0", "pod-1"}, + []string{"", ""}, + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Port: 555, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 666, + Protocol: "TCP", + }, + }), + endpointMirrorEndpoints( + "service-two-remote", + "eptest", + "pod-0", + "192.0.2.127", + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + endpointMirrorEndpoints( + "service-two-remote", + "eptest", + "pod-1", + "192.0.2.127", + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Port: 888, + Protocol: "TCP", + }, + { + Name: "port2", + Port: 888, + Protocol: "TCP", + }, + }), + }, + }, + } { + tc := tt // pin + tc.run(t) + } +} + func TestClusterUnregisteredMirroring(t *testing.T) { for _, tt := range []mirroringTestCase{ { diff --git a/multicluster/service-mirror/cluster_watcher_test_util.go b/multicluster/service-mirror/cluster_watcher_test_util.go index a6b9c572c..1d889f39c 100644 --- a/multicluster/service-mirror/cluster_watcher_test_util.go +++ b/multicluster/service-mirror/cluster_watcher_test_util.go @@ -47,23 +47,22 @@ func (te *testEnvironment) runEnvironment(watcherQueue workqueue.RateLimitingInt if err != nil { return nil, err } - localAPI, err := k8s.NewFakeAPI(te.localResources...) if err != nil { return nil, err } - remoteAPI.Sync(nil) localAPI.Sync(nil) watcher := RemoteClusterServiceWatcher{ - link: &te.link, - remoteAPIClient: remoteAPI, - localAPIClient: localAPI, - stopper: nil, - log: logging.WithFields(logging.Fields{"cluster": clusterName}), - eventsQueue: watcherQueue, - requeueLimit: 0, + link: &te.link, + remoteAPIClient: remoteAPI, + localAPIClient: localAPI, + stopper: nil, + log: logging.WithFields(logging.Fields{"cluster": clusterName}), + eventsQueue: watcherQueue, + requeueLimit: 0, + headlessServicesEnabled: true, } for _, ev := range te.events { @@ -101,6 +100,7 @@ var createExportedService = &testEnvironment{ }, remoteResources: []string{ gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), + endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}), }, link: multicluster.Link{ TargetClusterName: clusterName, @@ -113,6 +113,82 @@ var createExportedService = &testEnvironment{ }, } +var createExportedHeadlessService = &testEnvironment{ + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteHeadlessService("service-one", "ns2", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + &OnAddEndpointsCalled{ + ep: remoteHeadlessEndpoints("service-one", "ns2", "112", "192.0.0.1", []corev1.EndpointPort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + }, + remoteResources: []string{ + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.129", "gateway", 889, "gateway-identity", 123456, "/probe1", 120), + remoteHeadlessSvcAsYaml("service-one", "ns2", "111", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + remoteHeadlessEndpointsAsYaml("service-one", "ns2", "112", "192.0.0.1", []corev1.EndpointPort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + link: multicluster.Link{ + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.129", + GatewayPort: 889, + ProbeSpec: multicluster.ProbeSpec{ + Port: 123456, + Path: "/probe1", + Period: 120, + }, + Selector: *defaultSelector, + }, +} + var deleteMirrorService = &testEnvironment{ events: []interface{}{ &RemoteServiceDeleted{ @@ -228,6 +304,113 @@ var updateServiceWithChangedPorts = &testEnvironment{ }, } +var updateEndpointsWithChangedHosts = &testEnvironment{ + events: []interface{}{ + &OnUpdateEndpointsCalled{ + ep: remoteHeadlessEndpointsUpdate("service-two", "eptest", "112", "192.0.0.1", []corev1.EndpointPort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + }, + remoteResources: []string{ + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + remoteHeadlessSvcAsYaml("service-two", "eptest", "222", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + localResources: []string{ + headlessMirrorAsYaml("service-two-remote", "eptest", "222", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + endpointMirrorAsYaml("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + headlessMirrorEndpointsAsYaml( + "service-two-remote", + "eptest", + "pod-0", + "", + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + endpointMirrorEndpointsAsYaml( + "service-two-remote", + "eptest", + "pod-0", + "192.0.2.127", + "gateway-identity", + []corev1.EndpointPort{ + { + Name: "port1", + Protocol: "TCP", + Port: 888, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 888, + }, + }), + }, + link: multicluster.Link{ + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + }, +} var clusterUnregistered = &testEnvironment{ events: []interface{}{ &ClusterUnregistered{}, @@ -456,6 +639,99 @@ func remoteService(name, namespace, resourceVersion string, labels map[string]st } } +func remoteHeadlessService(name, namespace, resourceVersion string, labels map[string]string, ports []corev1.ServicePort) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Ports: ports, + }, + } +} + +func remoteHeadlessEndpoints(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) *corev1.Endpoints { + return &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + "service.kubernetes.io/headless": "", + consts.DefaultExportedServiceSelector: "true", + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-0", + IP: address, + TargetRef: &corev1.ObjectReference{ + Name: "pod-0", + ResourceVersion: resourceVersion, + }, + }, + }, + Ports: ports, + }, + }, + } +} + +func remoteHeadlessEndpointsUpdate(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) *corev1.Endpoints { + return &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + "service.kubernetes.io/headless": "", + consts.DefaultExportedServiceSelector: "true", + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-0", + IP: address, + TargetRef: &corev1.ObjectReference{ + Name: "pod-0", + ResourceVersion: resourceVersion, + }, + }, + { + Hostname: "pod-1", + IP: address, + TargetRef: &corev1.ObjectReference{ + Name: "pod-1", + ResourceVersion: resourceVersion, + }, + }, + }, + Ports: ports, + }, + }, + } +} + func remoteServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { svc := remoteService(name, namespace, resourceVersion, nil, ports) @@ -466,6 +742,25 @@ func remoteServiceAsYaml(name, namespace, resourceVersion string, ports []corev1 return string(bytes) } +func remoteHeadlessSvcAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { + svc := remoteHeadlessService(name, namespace, resourceVersion, nil, ports) + + bytes, err := yaml.Marshal(svc) + if err != nil { + log.Fatal(err) + } + return string(bytes) +} + +func remoteHeadlessEndpointsAsYaml(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) string { + ep := remoteHeadlessEndpoints(name, namespace, resourceVersion, address, ports) + + bytes, err := yaml.Marshal(ep) + if err != nil { + log.Fatal(err) + } + return string(bytes) +} func mirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { annotations := make(map[string]string) annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion @@ -491,6 +786,39 @@ func mirrorService(name, namespace, resourceVersion string, ports []corev1.Servi } } +func headlessMirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { + svc := mirrorService(name, namespace, resourceVersion, ports) + svc.Spec.ClusterIP = "None" + return svc +} + +func endpointMirrorService(hostname, rootName, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { + annotations := make(map[string]string) + annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion + annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.%s.svc.cluster.local", hostname, strings.Replace(rootName, "-remote", "", 1), namespace) + + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", hostname, clusterName), + Namespace: namespace, + Labels: map[string]string{ + + consts.MirroredHeadlessSvcNameLabel: rootName, + consts.RemoteClusterNameLabel: clusterName, + consts.MirroredResourceLabel: "true", + }, + Annotations: annotations, + }, + Spec: corev1.ServiceSpec{ + Ports: ports, + }, + } +} + func mirrorServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { svc := mirrorService(name, namespace, resourceVersion, ports) @@ -501,6 +829,27 @@ func mirrorServiceAsYaml(name, namespace, resourceVersion string, ports []corev1 return string(bytes) } +func headlessMirrorAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { + svc := headlessMirrorService(name, namespace, resourceVersion, ports) + + bytes, err := yaml.Marshal(svc) + if err != nil { + log.Fatal(err) + } + return string(bytes) +} + +func endpointMirrorAsYaml(hostname, rootName, namespace, resourceVersion string, ports []corev1.ServicePort) string { + svc := endpointMirrorService(hostname, rootName, namespace, resourceVersion, ports) + + bytes, err := yaml.Marshal(svc) + if err != nil { + log.Fatal(err) + } + + return string(bytes) +} + func gateway(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service { svc := corev1.Service{ TypeMeta: metav1.TypeMeta{ @@ -593,6 +942,94 @@ func endpoints(name, namespace, gatewayIP string, gatewayIdentity string, ports return endpoints } +func endpointMirrorEndpoints(rootName, namespace, hostname, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints { + localName := fmt.Sprintf("%s-%s", hostname, clusterName) + ep := endpoints(localName, namespace, gatewayIP, gatewayIdentity, ports) + + ep.Annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.%s.svc.cluster.local", hostname, strings.Replace(rootName, "-remote", "", 1), namespace) + ep.Labels[consts.MirroredHeadlessSvcNameLabel] = rootName + + return ep +} + +func headlessMirrorEndpoints(name, namespace, hostname, hostIP, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints { + endpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + consts.RemoteClusterNameLabel: clusterName, + consts.MirroredResourceLabel: "true", + }, + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace), + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: hostname, + IP: hostIP, + }, + }, + Ports: ports, + }, + }, + } + + if gatewayIdentity != "" { + endpoints.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity + } + + return endpoints +} + +func headlessMirrorEndpointsUpdated(name, namespace string, hostnames, hostIPs []string, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints { + endpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + consts.RemoteClusterNameLabel: clusterName, + consts.MirroredResourceLabel: "true", + }, + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace), + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: hostnames[0], + IP: hostIPs[0], + }, + { + Hostname: hostnames[1], + IP: hostIPs[1], + }, + }, + Ports: ports, + }, + }, + } + + if gatewayIdentity != "" { + endpoints.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity + } + + return endpoints +} + func endpointsAsYaml(name, namespace, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string { ep := endpoints(name, namespace, gatewayIP, gatewayIdentity, ports) @@ -602,3 +1039,25 @@ func endpointsAsYaml(name, namespace, gatewayIP, gatewayIdentity string, ports [ } return string(bytes) } + +func headlessMirrorEndpointsAsYaml(name, namespace, hostname, hostIP, gatewayIdentity string, ports []corev1.EndpointPort) string { + ep := headlessMirrorEndpoints(name, namespace, hostname, hostIP, gatewayIdentity, ports) + + bytes, err := yaml.Marshal(ep) + if err != nil { + log.Fatal(err) + } + + return string(bytes) +} + +func endpointMirrorEndpointsAsYaml(name, namespace, hostname, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string { + ep := endpointMirrorEndpoints(name, namespace, hostname, gatewayIP, gatewayIdentity, ports) + + bytes, err := yaml.Marshal(ep) + if err != nil { + log.Fatal(err) + } + + return string(bytes) +} diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index 06b764489..0d99e8b55 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -410,6 +410,10 @@ const ( // MirroredGatewayLabel indicates that this is a mirrored gateway MirroredGatewayLabel = SvcMirrorPrefix + "/mirrored-gateway" + // MirroredHeadlessSvcNameLabel indicates the root headless service for + // mirrored headless hosts. + MirroredHeadlessSvcNameLabel = SvcMirrorPrefix + "/headless-mirror-svc-name" + // RemoteClusterNameLabel put on a local mirrored service, it // allows us to associate a mirrored service with a remote cluster RemoteClusterNameLabel = SvcMirrorPrefix + "/cluster-name"