mirror of https://github.com/linkerd/linkerd2.git
Don't swallow error when MC gateway hostname can't be resolved (#5362)
* Don't swallow error when MC gateway hostname can't be resolved. Ref #5343. When none of the gateway addresses is resolvable, propagate the error as a retryable error so it gets retried and logged. Don't create the mirrored resources if there's no success after the retries.
This commit is contained in:
parent
02b456087d
commit
131e270d5a
|
@ -339,11 +339,15 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.
|
||||||
// new gateway being assigned or additional ports exposed. This method takes care of that.
|
// new gateway being assigned or additional ports exposed. This method takes care of that.
|
||||||
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
|
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
|
||||||
rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
|
rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
|
||||||
|
gatewayAddresses, err := rcsw.resolveGatewayAddress()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
copiedEndpoints := ev.localEndpoints.DeepCopy()
|
copiedEndpoints := ev.localEndpoints.DeepCopy()
|
||||||
copiedEndpoints.Subsets = []corev1.EndpointSubset{
|
copiedEndpoints.Subsets = []corev1.EndpointSubset{
|
||||||
{
|
{
|
||||||
Addresses: rcsw.resolveGatewayAddress(),
|
Addresses: gatewayAddresses,
|
||||||
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
|
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -383,6 +387,11 @@ func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error {
|
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error {
|
||||||
|
gatewayAddresses, err := rcsw.resolveGatewayAddress()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
remoteService := ev.service.DeepCopy()
|
remoteService := ev.service.DeepCopy()
|
||||||
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
|
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
|
||||||
localServiceName := rcsw.mirroredResourceName(remoteService.Name)
|
localServiceName := rcsw.mirroredResourceName(remoteService.Name)
|
||||||
|
@ -417,14 +426,13 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
gatewayAddress := rcsw.resolveGatewayAddress()
|
|
||||||
// only if we resolve it, we are updating the endpoints addresses and ports
|
// only if we resolve it, we are updating the endpoints addresses and ports
|
||||||
rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddress, rcsw.link.GatewayPort, serviceInfo)
|
rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo)
|
||||||
|
|
||||||
if len(gatewayAddress) > 0 {
|
if len(gatewayAddresses) > 0 {
|
||||||
endpointsToCreate.Subsets = []corev1.EndpointSubset{
|
endpointsToCreate.Subsets = []corev1.EndpointSubset{
|
||||||
{
|
{
|
||||||
Addresses: gatewayAddress,
|
Addresses: gatewayAddresses,
|
||||||
Ports: rcsw.getEndpointsPorts(ev.service),
|
Ports: rcsw.getEndpointsPorts(ev.service),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -564,7 +572,7 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) (
|
||||||
case *OrphanedServicesGcTriggered:
|
case *OrphanedServicesGcTriggered:
|
||||||
err = rcsw.cleanupOrphanedServices(ctx)
|
err = rcsw.cleanupOrphanedServices(ctx)
|
||||||
case *RepairEndpoints:
|
case *RepairEndpoints:
|
||||||
rcsw.repairEndpoints(ctx)
|
err = rcsw.repairEndpoints(ctx)
|
||||||
default:
|
default:
|
||||||
if ev != nil || !done { // we get a nil in case we are shutting down...
|
if ev != nil || !done { // we get a nil in case we are shutting down...
|
||||||
rcsw.log.Warnf("Received unknown event: %v", ev)
|
rcsw.log.Warnf("Received unknown event: %v", ev)
|
||||||
|
@ -673,22 +681,34 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
|
||||||
rcsw.eventsQueue.ShutDown()
|
rcsw.eventsQueue.ShutDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() []corev1.EndpointAddress {
|
func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) {
|
||||||
var gatewayEndpoints []corev1.EndpointAddress
|
var gatewayEndpoints []corev1.EndpointAddress
|
||||||
|
var errors []error
|
||||||
for _, addr := range strings.Split(rcsw.link.GatewayAddress, ",") {
|
for _, addr := range strings.Split(rcsw.link.GatewayAddress, ",") {
|
||||||
resolved := addr
|
|
||||||
ipAddr, err := net.ResolveIPAddr("ip", addr)
|
ipAddr, err := net.ResolveIPAddr("ip", addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
resolved = ipAddr.String()
|
gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{
|
||||||
|
IP: ipAddr.String(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("Error resolving '%s': %s", addr, err)
|
||||||
|
rcsw.log.Warn(err)
|
||||||
|
errors = append(errors, err)
|
||||||
}
|
}
|
||||||
gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{
|
|
||||||
IP: resolved,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
return gatewayEndpoints
|
// one resolved address is enough
|
||||||
|
if len(gatewayEndpoints) > 0 {
|
||||||
|
return gatewayEndpoints, nil
|
||||||
|
}
|
||||||
|
return nil, RetryableError{errors}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) {
|
func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) error {
|
||||||
|
gatewayAddresses, err := rcsw.resolveGatewayAddress()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
endpointRepairCounter.With(prometheus.Labels{
|
endpointRepairCounter.With(prometheus.Labels{
|
||||||
gatewayClusterName: rcsw.link.TargetClusterName,
|
gatewayClusterName: rcsw.link.TargetClusterName,
|
||||||
}).Inc()
|
}).Inc()
|
||||||
|
@ -709,7 +729,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) {
|
||||||
},
|
},
|
||||||
Subsets: []corev1.EndpointSubset{
|
Subsets: []corev1.EndpointSubset{
|
||||||
{
|
{
|
||||||
Addresses: rcsw.resolveGatewayAddress(),
|
Addresses: gatewayAddresses,
|
||||||
Ports: []corev1.EndpointPort{
|
Ports: []corev1.EndpointPort{
|
||||||
{
|
{
|
||||||
Name: "mc-probe",
|
Name: "mc-probe",
|
||||||
|
@ -721,7 +741,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := rcsw.createOrUpdateEndpoints(ctx, gatewayMirrorEndpoints)
|
err = rcsw.createOrUpdateEndpoints(ctx, gatewayMirrorEndpoints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rcsw.log.Errorf("Failed to create/update gateway mirror endpoints: %s", err)
|
rcsw.log.Errorf("Failed to create/update gateway mirror endpoints: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -743,7 +763,7 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) {
|
||||||
updatedEndpoints := endpoints.DeepCopy()
|
updatedEndpoints := endpoints.DeepCopy()
|
||||||
updatedEndpoints.Subsets = []corev1.EndpointSubset{
|
updatedEndpoints.Subsets = []corev1.EndpointSubset{
|
||||||
{
|
{
|
||||||
Addresses: rcsw.resolveGatewayAddress(),
|
Addresses: gatewayAddresses,
|
||||||
Ports: rcsw.getEndpointsPorts(updatedService),
|
Ports: rcsw.getEndpointsPorts(updatedService),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -764,6 +784,8 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) {
|
||||||
rcsw.log.Error(err)
|
rcsw.log.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcsw *RemoteClusterServiceWatcher) createOrUpdateEndpoints(ctx context.Context, ep *corev1.Endpoints) error {
|
func (rcsw *RemoteClusterServiceWatcher) createOrUpdateEndpoints(ctx context.Context, ep *corev1.Endpoints) error {
|
||||||
|
|
Loading…
Reference in New Issue