mirror of https://github.com/linkerd/linkerd2.git
Reset service-mirror component when target's k8s API is unreachable (#4996)
When the service-mirror component can't reach the target's k8s API, the goroutine blocks and it can't be unblocked. This was happenining specifically in the case of the multicluster integration test (still to be pushed), where the source and target clusters are created in quick succession and the target's API service doesn't always have time to be exposed before being requested by the service mirror. The fix consists on no longer have restartClusterWatcher be side-effecting, and instead return an error. If such error is not nil then the link watcher is stopped and reset after 10 seconds.
This commit is contained in:
parent
55dd49e826
commit
b30d35f46a
|
@ -23,6 +23,8 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const linkWatchRestartAfter = 10 * time.Second
|
||||
|
||||
var (
|
||||
clusterWatcher *servicemirror.RemoteClusterServiceWatcher
|
||||
probeWorker *servicemirror.ProbeWorker
|
||||
|
@ -107,7 +109,14 @@ main:
|
|||
if err != nil {
|
||||
log.Errorf("Failed to load remote cluster credentials: %s", err)
|
||||
}
|
||||
restartClusterWatcher(link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics)
|
||||
err = restartClusterWatcher(link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics)
|
||||
if err != nil {
|
||||
// failed to restart cluster watcher; give a bit of slack
|
||||
// and restart the link watch to give it another try
|
||||
log.Error(err)
|
||||
time.Sleep(linkWatchRestartAfter)
|
||||
linkWatch.Stop()
|
||||
}
|
||||
case watch.Deleted:
|
||||
log.Infof("Link %s deleted", linkName)
|
||||
if clusterWatcher != nil {
|
||||
|
@ -148,7 +157,7 @@ func restartClusterWatcher(
|
|||
requeueLimit int,
|
||||
repairPeriod time.Duration,
|
||||
metrics servicemirror.ProbeMetricVecs,
|
||||
) {
|
||||
) error {
|
||||
if clusterWatcher != nil {
|
||||
clusterWatcher.Stop(false)
|
||||
}
|
||||
|
@ -158,8 +167,7 @@ func restartClusterWatcher(
|
|||
|
||||
cfg, err := clientcmd.RESTConfigFromKubeConfig(creds)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to parse kube config: %s", err)
|
||||
return
|
||||
return fmt.Errorf("Unable to parse kube config: %s", err)
|
||||
}
|
||||
|
||||
clusterWatcher, err = servicemirror.NewRemoteClusterServiceWatcher(
|
||||
|
@ -171,20 +179,19 @@ func restartClusterWatcher(
|
|||
repairPeriod,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to create cluster watcher: %s", err)
|
||||
return
|
||||
return fmt.Errorf("Unable to create cluster watcher: %s", err)
|
||||
}
|
||||
|
||||
err = clusterWatcher.Start()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to start cluster watcher: %s", err)
|
||||
return
|
||||
return fmt.Errorf("Failed to start cluster watcher: %s", err)
|
||||
}
|
||||
|
||||
workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create metrics for cluster watcher: %s", err)
|
||||
return fmt.Errorf("Failed to create metrics for cluster watcher: %s", err)
|
||||
}
|
||||
probeWorker = servicemirror.NewProbeWorker(fmt.Sprintf("probe-gateway-%s", link.TargetClusterName), &link.ProbeSpec, workerMetrics, link.TargetClusterName)
|
||||
probeWorker.Start()
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue