diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go index 12f0b33f6..883ab190e 100644 --- a/controller/cmd/service-mirror/cluster_watcher.go +++ b/controller/cmd/service-mirror/cluster_watcher.go @@ -3,11 +3,14 @@ package servicemirror import ( "errors" "fmt" + "net" "strconv" "strings" + "time" "github.com/linkerd/linkerd2/controller/k8s" consts "github.com/linkerd/linkerd2/pkg/k8s" + "github.com/prometheus/client_golang/prometheus" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -36,6 +39,7 @@ type ( log *logging.Entry eventsQueue workqueue.RateLimitingInterface requeueLimit int + repairPeriod time.Duration } // ProbeConfig describes the configured probe on particular gateway (if presents) @@ -136,6 +140,10 @@ type ( svc *corev1.Service } + // RepairEndpoints is issued when the service mirror and mirror gateway + // endpoints should be resolved based on the remote gateway and updated. + RepairEndpoints struct{} + gatewayMetadata struct { Name string Namespace string @@ -171,6 +179,7 @@ func NewRemoteClusterServiceWatcher( cfg *rest.Config, clusterName string, requeueLimit int, + repairPeriod time.Duration, clusterDomain string, ) (*RemoteClusterServiceWatcher, error) { remoteAPI, err := k8s.InitializeAPIForConfig(cfg, false, k8s.Svc) @@ -191,6 +200,7 @@ func NewRemoteClusterServiceWatcher( }), eventsQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), requeueLimit: requeueLimit, + repairPeriod: repairPeriod, }, nil } @@ -990,6 +1000,8 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{}, err = rcsw.cleanupMirroredResources() case *OprhanedServicesGcTriggered: err = rcsw.cleanupOrphanedServices() + case *RepairEndpoints: + rcsw.repairEndpoints() default: if ev != nil || !done { // we get a nil in case we are shutting down... rcsw.log.Warnf("Received unknown event: %v", ev) @@ -1005,6 +1017,7 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{}, func (rcsw *RemoteClusterServiceWatcher) processEvents() { for { done, event, err := rcsw.processNextEvent() + rcsw.eventsQueue.Done(event) // the logic here is that there might have been an API // connectivity glitch or something. So its not a bad idea to requeue // the event and try again up to a number of limits, just to ensure @@ -1066,6 +1079,20 @@ func (rcsw *RemoteClusterServiceWatcher) Start() error { }, ) go rcsw.processEvents() + + go func() { + ticker := time.NewTicker(rcsw.repairPeriod) + for { + select { + case <-ticker.C: + ev := RepairEndpoints{} + rcsw.eventsQueue.Add(&ev) + case <-rcsw.stopper: + return + } + } + }() + return nil } @@ -1121,9 +1148,16 @@ func (rcsw *RemoteClusterServiceWatcher) extractGatewaySpec(gateway *corev1.Serv var gatewayEndpoints []corev1.EndpointAddress for _, ingress := range gateway.Status.LoadBalancer.Ingress { + ip := ingress.IP + if ip == "" { + ipAddr, err := net.ResolveIPAddr("ip", ingress.Hostname) + if err != nil { + return nil, err + } + ip = ipAddr.String() + } gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{ - IP: ingress.IP, - Hostname: ingress.Hostname, + IP: ip, }) } @@ -1144,3 +1178,53 @@ func (rcsw *RemoteClusterServiceWatcher) extractGatewaySpec(gateway *corev1.Serv ProbeConfig: probeConfig, }, nil } + +// repairEndpoints will look up all remote gateways and update the endpoints +// of all local mirror services for those gateways. Note that we ignore resource +// version and update ALL affected endpoints objects. This is because the +// remote gateway may be exposed as a DNS hostname and we want to re-resolve +// this DNS name in case its IP address has changed. By invoking repairEndpoints +// frequently, we can pick up any DNS changes fairly quickly. +// TODO: Replace this with a more robust solution that does not rely on +// frequently repairing endpoints to pick up DNS updates. +func (rcsw *RemoteClusterServiceWatcher) repairEndpoints() { + svcs, err := rcsw.remoteAPIClient.Svc().Lister().Services(metav1.NamespaceAll).List(labels.Everything()) + if err != nil { + rcsw.log.Errorf("failed to list remote gateways: %s", err) + return + } + rcsw.log.Errorf("During repair, found %d remote services", len(svcs)) + for _, svc := range svcs { + if isGateway(svc.Annotations) { + + // We omit a resource version here because we want to get ALL mirror + // services for this gateway. + affectedServices, err := rcsw.affectedMirroredServicesForGatewayUpdate(&gatewayMetadata{ + Name: svc.Name, + Namespace: svc.Namespace, + }, "") + if err != nil { + rcsw.log.Errorf("failed to determine mirror services for gateway %s.%s: %s", svc.Name, svc.Namespace, err) + continue + } + + spec, err := rcsw.extractGatewaySpec(svc) + if err != nil { + rcsw.log.Errorf("failed to extract spec for gateway %s.%s: %s", svc.Name, svc.Namespace, err) + continue + } + + endpointRepairCounter.With(prometheus.Labels{ + gatewayNameLabel: svc.Name, + gatewayNamespaceLabel: svc.Namespace, + gatewayClusterName: rcsw.clusterName, + }).Inc() + + rcsw.log.Errorf("adding gateway update event %s with %d mirrro services", svc.Name, len(affectedServices)) + rcsw.eventsQueue.Add(&RemoteGatewayUpdated{ + gatewaySpec: *spec, + affectedServices: affectedServices, + }) + } + } +} diff --git a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go index 468e18e0f..ce434f8e2 100644 --- a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go +++ b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go @@ -2,10 +2,12 @@ package servicemirror import ( "fmt" + "net" "reflect" "testing" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" ) @@ -83,7 +85,7 @@ func (tc *mirroringTestCase) run(t *testing.T) { for _, ev := range tc.expectedEventsInQueue { evInQueue, _ := q.Get() if !reflect.DeepEqual(ev, evInQueue) { - t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue) + t.Fatalf("was expecting to see event %s but got %s", ev, evInQueue) } } }) @@ -262,6 +264,12 @@ func TestRemoteServiceUpdatedMirroring(t *testing.T) { } func TestRemoteGatewayUpdatedMirroring(t *testing.T) { + + localhostIP, err := net.ResolveIPAddr("ip", "localhost") + if err != nil { + t.Fatal(err) + } + for _, tt := range []mirroringTestCase{ { description: "endpoints ports are updated on gateway change", @@ -377,6 +385,28 @@ func TestRemoteGatewayUpdatedMirroring(t *testing.T) { }}), }, }, + { + description: "gateway uses hostname address", + environment: remoteGatewayUpdatedWithHostnameAddress, + expectedEventsInQueue: []interface{}{ + &RemoteGatewayUpdated{ + gatewaySpec: GatewaySpec{ + gatewayName: "gateway", + gatewayNamespace: "gateway-ns", + clusterName: "remote", + addresses: []corev1.EndpointAddress{{IP: localhostIP.String()}}, + incomingPort: 999, + resourceVersion: "currentGatewayResVersion", + ProbeConfig: &ProbeConfig{ + path: defaultProbePath, + port: defaultProbePort, + periodInSeconds: defaultProbePeriod, + }, + }, + affectedServices: []*v1.Service{}, + }, + }, + }, } { tc := tt // pin tc.run(t) diff --git a/controller/cmd/service-mirror/cluster_watcher_test_util.go b/controller/cmd/service-mirror/cluster_watcher_test_util.go index c2605163f..b59dc0b61 100644 --- a/controller/cmd/service-mirror/cluster_watcher_test_util.go +++ b/controller/cmd/service-mirror/cluster_watcher_test_util.go @@ -104,7 +104,7 @@ var createServiceWrongGatewaySpec = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-wrong", 888, "", 111, "/path", 666), + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "", "mc-wrong", 888, "", 111, "/path", 666), }, } @@ -130,7 +130,7 @@ var createServiceOkeGatewaySpec = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), }, } @@ -193,7 +193,7 @@ var updateServiceToNewGateway = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), }, localResources: []string{ mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ @@ -269,7 +269,7 @@ var updateServiceWithChangedPorts = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), }, localResources: []string{ mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ @@ -378,6 +378,15 @@ var remoteGatewayUpdated = &testEnvironment{ }, } +var remoteGatewayUpdatedWithHostnameAddress = &testEnvironment{ + events: []interface{}{ + &RepairEndpoints{}, + }, + remoteResources: []string{ + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "", "localhost", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + }, +} + var gatewayAddressChanged = &testEnvironment{ events: []interface{}{ &RemoteGatewayUpdated{ @@ -774,7 +783,7 @@ func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gat return string(bytes) } -func gateway(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service { +func gateway(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service { svc := corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -785,9 +794,10 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, Namespace: namespace, ResourceVersion: resourceVersion, Annotations: map[string]string{ - consts.GatewayIdentity: identity, - consts.GatewayProbePath: probePath, - consts.GatewayProbePeriod: fmt.Sprint(probePeriod), + consts.GatewayIdentity: identity, + consts.GatewayProbePath: probePath, + consts.GatewayProbePeriod: fmt.Sprint(probePeriod), + consts.MulticlusterGatewayAnnotation: "true", }, }, Spec: corev1.ServiceSpec{ @@ -809,11 +819,14 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, if ip != "" { svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip}) } + if hostname != "" { + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{Hostname: hostname}) + } return &svc } -func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string { - gtw := gateway(name, namespace, resourceVersion, ip, portName, port, identity, probePort, probePath, probePeriod) +func gatewayAsYaml(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string { + gtw := gateway(name, namespace, resourceVersion, ip, hostname, portName, port, identity, probePort, probePath, probePeriod) bytes, err := yaml.Marshal(gtw) if err != nil { diff --git a/controller/cmd/service-mirror/config_watcher.go b/controller/cmd/service-mirror/config_watcher.go index 3d9f9283b..c71543aca 100644 --- a/controller/cmd/service-mirror/config_watcher.go +++ b/controller/cmd/service-mirror/config_watcher.go @@ -3,6 +3,7 @@ package servicemirror import ( "fmt" "sync" + "time" "github.com/linkerd/linkerd2/controller/k8s" log "github.com/sirupsen/logrus" @@ -21,16 +22,18 @@ type RemoteClusterConfigWatcher struct { k8sAPI *k8s.API clusterWatchers map[string]*RemoteClusterServiceWatcher requeueLimit int + repairPeriod time.Duration sync.RWMutex } // NewRemoteClusterConfigWatcher Creates a new config watcher -func NewRemoteClusterConfigWatcher(serviceMirrorNamespace string, secretsInformer cache.SharedIndexInformer, k8sAPI *k8s.API, requeueLimit int) *RemoteClusterConfigWatcher { +func NewRemoteClusterConfigWatcher(serviceMirrorNamespace string, secretsInformer cache.SharedIndexInformer, k8sAPI *k8s.API, requeueLimit int, repairPeriod time.Duration) *RemoteClusterConfigWatcher { rcw := &RemoteClusterConfigWatcher{ serviceMirrorNamespace: serviceMirrorNamespace, k8sAPI: k8sAPI, clusterWatchers: map[string]*RemoteClusterServiceWatcher{}, requeueLimit: requeueLimit, + repairPeriod: repairPeriod, } secretsInformer.AddEventHandler( cache.FilteringResourceEventHandler{ @@ -126,7 +129,7 @@ func (rcw *RemoteClusterConfigWatcher) registerRemoteCluster(secret *corev1.Secr return fmt.Errorf("there is already a cluster with name %s being watcher. Please delete its config before attempting to register a new one", config.ClusterName) } - watcher, err := NewRemoteClusterServiceWatcher(rcw.serviceMirrorNamespace, rcw.k8sAPI, clientConfig, config.ClusterName, rcw.requeueLimit, config.ClusterDomain) + watcher, err := NewRemoteClusterServiceWatcher(rcw.serviceMirrorNamespace, rcw.k8sAPI, clientConfig, config.ClusterName, rcw.requeueLimit, rcw.repairPeriod, config.ClusterDomain) if err != nil { return err } diff --git a/controller/cmd/service-mirror/events_formatting.go b/controller/cmd/service-mirror/events_formatting.go index 6a2cba1bf..02058024c 100644 --- a/controller/cmd/service-mirror/events_formatting.go +++ b/controller/cmd/service-mirror/events_formatting.go @@ -113,6 +113,10 @@ func (od OnDeleteCalled) String() string { return fmt.Sprintf("OnDeleteCalled: {svc: %s}", formatService(od.svc)) } +func (re RepairEndpoints) String() string { + return "RepairEndpoints" +} + //Events for probe manager func (ps probeSpec) String() string { diff --git a/controller/cmd/service-mirror/main.go b/controller/cmd/service-mirror/main.go index 831d8cf66..3fc6476e3 100644 --- a/controller/cmd/service-mirror/main.go +++ b/controller/cmd/service-mirror/main.go @@ -55,6 +55,7 @@ func Main(args []string) { requeueLimit := cmd.Int("event-requeue-limit", 3, "requeue limit for events") metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on") namespace := cmd.String("namespace", "", "address to serve scrapable metrics on") + repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution") flags.ConfigureAndParse(cmd, args) @@ -88,7 +89,7 @@ func Main(args []string) { probeManager.Start() k8sAPI.Sync(nil) - watcher := NewRemoteClusterConfigWatcher(*namespace, secretsInformer, k8sAPI, *requeueLimit) + watcher := NewRemoteClusterConfigWatcher(*namespace, secretsInformer, k8sAPI, *requeueLimit, *repairPeriod) log.Info("Started cluster config watcher") go admin.StartServer(*metricsAddr) diff --git a/controller/cmd/service-mirror/metrics.go b/controller/cmd/service-mirror/metrics.go index a5b644678..97f027b5a 100644 --- a/controller/cmd/service-mirror/metrics.go +++ b/controller/cmd/service-mirror/metrics.go @@ -29,6 +29,18 @@ type probeMetrics struct { unregister func() } +var endpointRepairCounter *prometheus.CounterVec + +func init() { + endpointRepairCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "service_mirror_endpoint_repairs", + Help: "Increments when the service mirror controller attempts to repair mirror endpoints", + }, + []string{gatewayNameLabel, gatewayNamespaceLabel, gatewayClusterName}, + ) +} + func newProbeMetricVecs() probeMetricVecs { labelNames := []string{gatewayNameLabel, gatewayNamespaceLabel, gatewayClusterName}