From 755538b84a6411405b2c2d6d22d32c350d404be0 Mon Sep 17 00:00:00 2001
From: Alex Leong
Date: Mon, 15 Jun 2020 10:33:49 -0700
Subject: [PATCH] Resolve gateway hostnames into IP addresses (#4588)

Fixes #4582

When a target cluster gateway is exposed as a hostname rather than a fixed
IP address, the service mirror controller fails to create mirror services
and gateway mirrors for that gateway. This is because we only look at the IP
field of the gateway service.

We make two changes to address this problem:

First, when extracting the gateway spec from a gateway that has a hostname
instead of an IP address, we do a DNS lookup to resolve that hostname into an
IP address to use in the mirror service endpoints and gateway mirror
endpoints.

Second, we schedule a repair job at a regular interval (every 1 minute) to
update these endpoint objects. This has the effect of re-resolving the DNS
names every minute to pick up any changes in DNS resolution.

Signed-off-by: Alex Leong
---
 .../cmd/service-mirror/cluster_watcher.go     | 88 ++++++++++++++++++-
 .../cluster_watcher_mirroring_test.go         | 32 ++++++-
 .../cluster_watcher_test_util.go              | 33 ++++---
 .../cmd/service-mirror/config_watcher.go      |  7 +-
 .../cmd/service-mirror/events_formatting.go   |  4 +
 controller/cmd/service-mirror/main.go         |  3 +-
 controller/cmd/service-mirror/metrics.go      | 12 +++
 7 files changed, 163 insertions(+), 16 deletions(-)

diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go
index 12f0b33f6..883ab190e 100644
--- a/controller/cmd/service-mirror/cluster_watcher.go
+++ b/controller/cmd/service-mirror/cluster_watcher.go
@@ -3,11 +3,14 @@ package servicemirror
 import (
 	"errors"
 	"fmt"
+	"net"
 	"strconv"
 	"strings"
+	"time"

 	"github.com/linkerd/linkerd2/controller/k8s"
 	consts "github.com/linkerd/linkerd2/pkg/k8s"
+	"github.com/prometheus/client_golang/prometheus"
 	logging "github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
@@ -36,6 +39,7 @@ type (
 		log          *logging.Entry
 		eventsQueue  workqueue.RateLimitingInterface
 		requeueLimit int
+		repairPeriod time.Duration
 	}

 	// ProbeConfig describes the configured probe on particular gateway (if presents)
@@ -136,6 +140,10 @@ type (
 		svc *corev1.Service
 	}

+	// RepairEndpoints is issued when the service mirror and mirror gateway
+	// endpoints should be resolved based on the remote gateway and updated.
+	RepairEndpoints struct{}
+
 	gatewayMetadata struct {
 		Name      string
 		Namespace string
@@ -171,6 +179,7 @@ func NewRemoteClusterServiceWatcher(
 	cfg *rest.Config,
 	clusterName string,
 	requeueLimit int,
+	repairPeriod time.Duration,
 	clusterDomain string,
 ) (*RemoteClusterServiceWatcher, error) {
 	remoteAPI, err := k8s.InitializeAPIForConfig(cfg, false, k8s.Svc)
@@ -191,6 +200,7 @@ func NewRemoteClusterServiceWatcher(
 		}),
 		eventsQueue:  workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		requeueLimit: requeueLimit,
+		repairPeriod: repairPeriod,
 	}, nil
 }

@@ -990,6 +1000,8 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{},
 		err = rcsw.cleanupMirroredResources()
 	case *OprhanedServicesGcTriggered:
 		err = rcsw.cleanupOrphanedServices()
+	case *RepairEndpoints:
+		rcsw.repairEndpoints()
 	default:
 		if ev != nil || !done { // we get a nil in case we are shutting down...
rcsw.log.Warnf("Received unknown event: %v", ev) @@ -1005,6 +1017,7 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{}, func (rcsw *RemoteClusterServiceWatcher) processEvents() { for { done, event, err := rcsw.processNextEvent() + rcsw.eventsQueue.Done(event) // the logic here is that there might have been an API // connectivity glitch or something. So its not a bad idea to requeue // the event and try again up to a number of limits, just to ensure @@ -1066,6 +1079,20 @@ func (rcsw *RemoteClusterServiceWatcher) Start() error { }, ) go rcsw.processEvents() + + go func() { + ticker := time.NewTicker(rcsw.repairPeriod) + for { + select { + case <-ticker.C: + ev := RepairEndpoints{} + rcsw.eventsQueue.Add(&ev) + case <-rcsw.stopper: + return + } + } + }() + return nil } @@ -1121,9 +1148,16 @@ func (rcsw *RemoteClusterServiceWatcher) extractGatewaySpec(gateway *corev1.Serv var gatewayEndpoints []corev1.EndpointAddress for _, ingress := range gateway.Status.LoadBalancer.Ingress { + ip := ingress.IP + if ip == "" { + ipAddr, err := net.ResolveIPAddr("ip", ingress.Hostname) + if err != nil { + return nil, err + } + ip = ipAddr.String() + } gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{ - IP: ingress.IP, - Hostname: ingress.Hostname, + IP: ip, }) } @@ -1144,3 +1178,53 @@ func (rcsw *RemoteClusterServiceWatcher) extractGatewaySpec(gateway *corev1.Serv ProbeConfig: probeConfig, }, nil } + +// repairEndpoints will look up all remote gateways and update the endpoints +// of all local mirror services for those gateways. Note that we ignore resource +// version and update ALL affected endpoints objects. This is because the +// remote gateway may be exposed as a DNS hostname and we want to re-resolve +// this DNS name in case its IP address has changed. By invoking repairEndpoints +// frequently, we can pick up any DNS changes fairly quickly. +// TODO: Replace this with a more robust solution that does not rely on +// frequently repairing endpoints to pick up DNS updates. +func (rcsw *RemoteClusterServiceWatcher) repairEndpoints() { + svcs, err := rcsw.remoteAPIClient.Svc().Lister().Services(metav1.NamespaceAll).List(labels.Everything()) + if err != nil { + rcsw.log.Errorf("failed to list remote gateways: %s", err) + return + } + rcsw.log.Errorf("During repair, found %d remote services", len(svcs)) + for _, svc := range svcs { + if isGateway(svc.Annotations) { + + // We omit a resource version here because we want to get ALL mirror + // services for this gateway. 
+			affectedServices, err := rcsw.affectedMirroredServicesForGatewayUpdate(&gatewayMetadata{
+				Name:      svc.Name,
+				Namespace: svc.Namespace,
+			}, "")
+			if err != nil {
+				rcsw.log.Errorf("failed to determine mirror services for gateway %s.%s: %s", svc.Name, svc.Namespace, err)
+				continue
+			}
+
+			spec, err := rcsw.extractGatewaySpec(svc)
+			if err != nil {
+				rcsw.log.Errorf("failed to extract spec for gateway %s.%s: %s", svc.Name, svc.Namespace, err)
+				continue
+			}
+
+			endpointRepairCounter.With(prometheus.Labels{
+				gatewayNameLabel:      svc.Name,
+				gatewayNamespaceLabel: svc.Namespace,
+				gatewayClusterName:    rcsw.clusterName,
+			}).Inc()
+
+			rcsw.log.Errorf("adding gateway update event %s with %d mirror services", svc.Name, len(affectedServices))
+			rcsw.eventsQueue.Add(&RemoteGatewayUpdated{
+				gatewaySpec:      *spec,
+				affectedServices: affectedServices,
+			})
+		}
+	}
+}
diff --git a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go
index 468e18e0f..ce434f8e2 100644
--- a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go
+++ b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go
@@ -2,10 +2,12 @@ package servicemirror
 import (
 	"fmt"
+	"net"
 	"reflect"
 	"testing"

 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/util/workqueue"
 )
@@ -83,7 +85,7 @@ func (tc *mirroringTestCase) run(t *testing.T) {
 			for _, ev := range tc.expectedEventsInQueue {
 				evInQueue, _ := q.Get()
 				if !reflect.DeepEqual(ev, evInQueue) {
-					t.Fatalf("was expecting to see event %T but got %T", ev, evInQueue)
+					t.Fatalf("was expecting to see event %s but got %s", ev, evInQueue)
 				}
 			}
 		})
@@ -262,6 +264,12 @@ func TestRemoteServiceUpdatedMirroring(t *testing.T) {
 }

 func TestRemoteGatewayUpdatedMirroring(t *testing.T) {
+
+	localhostIP, err := net.ResolveIPAddr("ip", "localhost")
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	for _, tt := range []mirroringTestCase{
 		{
 			description: "endpoints ports are updated on gateway change",
@@ -377,6 +385,28 @@ func TestRemoteGatewayUpdatedMirroring(t *testing.T) {
 				}}),
 			},
 		},
+		{
+			description: "gateway uses hostname address",
+			environment: remoteGatewayUpdatedWithHostnameAddress,
+			expectedEventsInQueue: []interface{}{
+				&RemoteGatewayUpdated{
+					gatewaySpec: GatewaySpec{
+						gatewayName:      "gateway",
+						gatewayNamespace: "gateway-ns",
+						clusterName:      "remote",
+						addresses:        []corev1.EndpointAddress{{IP: localhostIP.String()}},
+						incomingPort:     999,
+						resourceVersion:  "currentGatewayResVersion",
+						ProbeConfig: &ProbeConfig{
+							path:            defaultProbePath,
+							port:            defaultProbePort,
+							periodInSeconds: defaultProbePeriod,
+						},
+					},
+					affectedServices: []*v1.Service{},
+				},
+			},
+		},
 	} {
 		tc := tt // pin
 		tc.run(t)
diff --git a/controller/cmd/service-mirror/cluster_watcher_test_util.go b/controller/cmd/service-mirror/cluster_watcher_test_util.go
index c2605163f..b59dc0b61 100644
--- a/controller/cmd/service-mirror/cluster_watcher_test_util.go
+++ b/controller/cmd/service-mirror/cluster_watcher_test_util.go
@@ -104,7 +104,7 @@ var createServiceWrongGatewaySpec = &testEnvironment{
 		},
 	},
 	remoteResources: []string{
-		gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-wrong", 888, "", 111, "/path", 666),
+		gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "", "mc-wrong", 888, "", 111, "/path", 666),
 	},
 }

@@ -130,7 +130,7 @@ var createServiceOkeGatewaySpec = &testEnvironment{
 		},
 	},
 	remoteResources: []string{
-
gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod), }, } @@ -193,7 +193,7 @@ var updateServiceToNewGateway = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("gateway-new", "gateway-ns", "currentGatewayResVersion", "0.0.0.0", "", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), }, localResources: []string{ mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ @@ -269,7 +269,7 @@ var updateServiceWithChangedPorts = &testEnvironment{ }, }, remoteResources: []string{ - gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod), }, localResources: []string{ mirroredServiceAsYaml("test-service-remote", "test-namespace", "gateway", "gateway-ns", "past", "pastGatewayResVersion", []corev1.ServicePort{ @@ -378,6 +378,15 @@ var remoteGatewayUpdated = &testEnvironment{ }, } +var remoteGatewayUpdatedWithHostnameAddress = &testEnvironment{ + events: []interface{}{ + &RepairEndpoints{}, + }, + remoteResources: []string{ + gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "", "localhost", "mc-gateway", 999, "", defaultProbePort, defaultProbePath, defaultProbePeriod), + }, +} + var gatewayAddressChanged = &testEnvironment{ events: []interface{}{ &RemoteGatewayUpdated{ @@ -774,7 +783,7 @@ func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gat return string(bytes) } -func gateway(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service { +func gateway(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service { svc := corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -785,9 +794,10 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, Namespace: namespace, ResourceVersion: resourceVersion, Annotations: map[string]string{ - consts.GatewayIdentity: identity, - consts.GatewayProbePath: probePath, - consts.GatewayProbePeriod: fmt.Sprint(probePeriod), + consts.GatewayIdentity: identity, + consts.GatewayProbePath: probePath, + consts.GatewayProbePeriod: fmt.Sprint(probePeriod), + consts.MulticlusterGatewayAnnotation: "true", }, }, Spec: corev1.ServiceSpec{ @@ -809,11 +819,14 @@ func gateway(name, namespace, resourceVersion, ip, portName string, port int32, if ip != "" { svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip}) } + if hostname != "" { + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{Hostname: hostname}) + } return &svc } -func gatewayAsYaml(name, namespace, 
resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string { - gtw := gateway(name, namespace, resourceVersion, ip, portName, port, identity, probePort, probePath, probePeriod) +func gatewayAsYaml(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string { + gtw := gateway(name, namespace, resourceVersion, ip, hostname, portName, port, identity, probePort, probePath, probePeriod) bytes, err := yaml.Marshal(gtw) if err != nil { diff --git a/controller/cmd/service-mirror/config_watcher.go b/controller/cmd/service-mirror/config_watcher.go index 3d9f9283b..c71543aca 100644 --- a/controller/cmd/service-mirror/config_watcher.go +++ b/controller/cmd/service-mirror/config_watcher.go @@ -3,6 +3,7 @@ package servicemirror import ( "fmt" "sync" + "time" "github.com/linkerd/linkerd2/controller/k8s" log "github.com/sirupsen/logrus" @@ -21,16 +22,18 @@ type RemoteClusterConfigWatcher struct { k8sAPI *k8s.API clusterWatchers map[string]*RemoteClusterServiceWatcher requeueLimit int + repairPeriod time.Duration sync.RWMutex } // NewRemoteClusterConfigWatcher Creates a new config watcher -func NewRemoteClusterConfigWatcher(serviceMirrorNamespace string, secretsInformer cache.SharedIndexInformer, k8sAPI *k8s.API, requeueLimit int) *RemoteClusterConfigWatcher { +func NewRemoteClusterConfigWatcher(serviceMirrorNamespace string, secretsInformer cache.SharedIndexInformer, k8sAPI *k8s.API, requeueLimit int, repairPeriod time.Duration) *RemoteClusterConfigWatcher { rcw := &RemoteClusterConfigWatcher{ serviceMirrorNamespace: serviceMirrorNamespace, k8sAPI: k8sAPI, clusterWatchers: map[string]*RemoteClusterServiceWatcher{}, requeueLimit: requeueLimit, + repairPeriod: repairPeriod, } secretsInformer.AddEventHandler( cache.FilteringResourceEventHandler{ @@ -126,7 +129,7 @@ func (rcw *RemoteClusterConfigWatcher) registerRemoteCluster(secret *corev1.Secr return fmt.Errorf("there is already a cluster with name %s being watcher. 
Please delete its config before attempting to register a new one", config.ClusterName) } - watcher, err := NewRemoteClusterServiceWatcher(rcw.serviceMirrorNamespace, rcw.k8sAPI, clientConfig, config.ClusterName, rcw.requeueLimit, config.ClusterDomain) + watcher, err := NewRemoteClusterServiceWatcher(rcw.serviceMirrorNamespace, rcw.k8sAPI, clientConfig, config.ClusterName, rcw.requeueLimit, rcw.repairPeriod, config.ClusterDomain) if err != nil { return err } diff --git a/controller/cmd/service-mirror/events_formatting.go b/controller/cmd/service-mirror/events_formatting.go index 6a2cba1bf..02058024c 100644 --- a/controller/cmd/service-mirror/events_formatting.go +++ b/controller/cmd/service-mirror/events_formatting.go @@ -113,6 +113,10 @@ func (od OnDeleteCalled) String() string { return fmt.Sprintf("OnDeleteCalled: {svc: %s}", formatService(od.svc)) } +func (re RepairEndpoints) String() string { + return "RepairEndpoints" +} + //Events for probe manager func (ps probeSpec) String() string { diff --git a/controller/cmd/service-mirror/main.go b/controller/cmd/service-mirror/main.go index 831d8cf66..3fc6476e3 100644 --- a/controller/cmd/service-mirror/main.go +++ b/controller/cmd/service-mirror/main.go @@ -55,6 +55,7 @@ func Main(args []string) { requeueLimit := cmd.Int("event-requeue-limit", 3, "requeue limit for events") metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on") namespace := cmd.String("namespace", "", "address to serve scrapable metrics on") + repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution") flags.ConfigureAndParse(cmd, args) @@ -88,7 +89,7 @@ func Main(args []string) { probeManager.Start() k8sAPI.Sync(nil) - watcher := NewRemoteClusterConfigWatcher(*namespace, secretsInformer, k8sAPI, *requeueLimit) + watcher := NewRemoteClusterConfigWatcher(*namespace, secretsInformer, k8sAPI, *requeueLimit, *repairPeriod) log.Info("Started cluster config watcher") go admin.StartServer(*metricsAddr) diff --git a/controller/cmd/service-mirror/metrics.go b/controller/cmd/service-mirror/metrics.go index a5b644678..97f027b5a 100644 --- a/controller/cmd/service-mirror/metrics.go +++ b/controller/cmd/service-mirror/metrics.go @@ -29,6 +29,18 @@ type probeMetrics struct { unregister func() } +var endpointRepairCounter *prometheus.CounterVec + +func init() { + endpointRepairCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "service_mirror_endpoint_repairs", + Help: "Increments when the service mirror controller attempts to repair mirror endpoints", + }, + []string{gatewayNameLabel, gatewayNamespaceLabel, gatewayClusterName}, + ) +} + func newProbeMetricVecs() probeMetricVecs { labelNames := []string{gatewayNameLabel, gatewayNamespaceLabel, gatewayClusterName}
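
Editorial note (not part of the patch): the core of this change is the hostname-to-IP fallback in extractGatewaySpec, re-run periodically via the new RepairEndpoints event. The following is a minimal, standalone Go sketch of that same fallback for reference only; ingressIP is a hypothetical helper written for illustration, and the example assumes the k8s.io/api module is available.

package main

import (
	"fmt"
	"net"

	corev1 "k8s.io/api/core/v1"
)

// ingressIP returns the ingress IP when it is set; otherwise it resolves the
// ingress hostname with a DNS lookup, mirroring what extractGatewaySpec does
// in the patch above.
func ingressIP(ingress corev1.LoadBalancerIngress) (string, error) {
	if ingress.IP != "" {
		return ingress.IP, nil
	}
	ipAddr, err := net.ResolveIPAddr("ip", ingress.Hostname)
	if err != nil {
		return "", err
	}
	return ipAddr.String(), nil
}

func main() {
	// A LoadBalancer ingress that exposes only a hostname, like the
	// remoteGatewayUpdatedWithHostnameAddress test environment.
	ip, err := ingressIP(corev1.LoadBalancerIngress{Hostname: "localhost"})
	if err != nil {
		fmt.Println("gateway hostname did not resolve:", err)
		return
	}
	fmt.Println("resolved gateway address:", ip) // typically 127.0.0.1
}

Because net.ResolveIPAddr returns a single address, the mirrored endpoints track whichever record the resolver returns at repair time; the repair ticker (endpoint-refresh-period, 1 minute by default) bounds how stale that address can become.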