diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index ba91b2774..961029413 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -339,11 +339,15 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context. // new gateway being assigned or additional ports exposed. This method takes care of that. func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error { rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name) + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } copiedEndpoints := ev.localEndpoints.DeepCopy() copiedEndpoints.Subsets = []corev1.EndpointSubset{ { - Addresses: rcsw.resolveGatewayAddress(), + Addresses: gatewayAddresses, Ports: rcsw.getEndpointsPorts(ev.remoteUpdate), }, } @@ -383,6 +387,11 @@ func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort { } func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } + remoteService := ev.service.DeepCopy() serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) localServiceName := rcsw.mirroredResourceName(remoteService.Name) @@ -417,14 +426,13 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. }, } - gatewayAddress := rcsw.resolveGatewayAddress() // only if we resolve it, we are updating the endpoints addresses and ports - rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddress, rcsw.link.GatewayPort, serviceInfo) + rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo) - if len(gatewayAddress) > 0 { + if len(gatewayAddresses) > 0 { endpointsToCreate.Subsets = []corev1.EndpointSubset{ { - Addresses: gatewayAddress, + Addresses: gatewayAddresses, Ports: rcsw.getEndpointsPorts(ev.service), }, } @@ -564,7 +572,7 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) ( case *OrphanedServicesGcTriggered: err = rcsw.cleanupOrphanedServices(ctx) case *RepairEndpoints: - rcsw.repairEndpoints(ctx) + err = rcsw.repairEndpoints(ctx) default: if ev != nil || !done { // we get a nil in case we are shutting down... rcsw.log.Warnf("Received unknown event: %v", ev) @@ -673,22 +681,34 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) { rcsw.eventsQueue.ShutDown() } -func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() []corev1.EndpointAddress { +func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) { var gatewayEndpoints []corev1.EndpointAddress + var errors []error for _, addr := range strings.Split(rcsw.link.GatewayAddress, ",") { - resolved := addr ipAddr, err := net.ResolveIPAddr("ip", addr) if err == nil { - resolved = ipAddr.String() + gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{ + IP: ipAddr.String(), + }) + } else { + err = fmt.Errorf("Error resolving '%s': %s", addr, err) + rcsw.log.Warn(err) + errors = append(errors, err) } - gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{ - IP: resolved, - }) } - return gatewayEndpoints + // one resolved address is enough + if len(gatewayEndpoints) > 0 { + return gatewayEndpoints, nil + } + return nil, RetryableError{errors} } -func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) { +func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) error { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } + endpointRepairCounter.With(prometheus.Labels{ gatewayClusterName: rcsw.link.TargetClusterName, }).Inc() @@ -709,7 +729,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) { }, Subsets: []corev1.EndpointSubset{ { - Addresses: rcsw.resolveGatewayAddress(), + Addresses: gatewayAddresses, Ports: []corev1.EndpointPort{ { Name: "mc-probe", @@ -721,7 +741,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) { }, } - err := rcsw.createOrUpdateEndpoints(ctx, gatewayMirrorEndpoints) + err = rcsw.createOrUpdateEndpoints(ctx, gatewayMirrorEndpoints) if err != nil { rcsw.log.Errorf("Failed to create/update gateway mirror endpoints: %s", err) } @@ -743,7 +763,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) { updatedEndpoints := endpoints.DeepCopy() updatedEndpoints.Subsets = []corev1.EndpointSubset{ { - Addresses: rcsw.resolveGatewayAddress(), + Addresses: gatewayAddresses, Ports: rcsw.getEndpointsPorts(updatedService), }, } @@ -764,6 +784,8 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) { rcsw.log.Error(err) } } + + return nil } func (rcsw *RemoteClusterServiceWatcher) createOrUpdateEndpoints(ctx context.Context, ep *corev1.Endpoints) error {