diff --git a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml
index 33f1c0707..1e99a5522 100644
--- a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml
+++ b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml
@@ -52,6 +52,9 @@ rules:
   - apiGroups: ["multicluster.linkerd.io"]
     resources: ["links"]
     verbs: ["list", "get", "watch"]
+  - apiGroups: ["coordination.k8s.io"]
+    resources: ["leases"]
+    verbs: ["create", "get", "update", "patch"]
 ---
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go
index b7de53adb..5194dcff7 100644
--- a/multicluster/cmd/service-mirror/main.go
+++ b/multicluster/cmd/service-mirror/main.go
@@ -21,9 +21,21 @@ import (
 	dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 )
 
-const linkWatchRestartAfter = 10 * time.Second
+const (
+	linkWatchRestartAfter = 10 * time.Second
+	// Duration of the lease
+	LEASE_DURATION = 30 * time.Second
+	// Deadline for the leader to refresh its lease. Defaults to the same value
+	// used by core controllers
+	LEASE_RENEW_DEADLINE = 10 * time.Second
+	// Duration leader elector clients should wait between action re-tries.
+	// Defaults to the same value used by core controllers
+	LEASE_RETRY_PERIOD = 2 * time.Second
+)
 
 var (
 	clusterWatcher *servicemirror.RemoteClusterServiceWatcher
@@ -55,8 +67,17 @@ func Main(args []string) {
 		}
 	}()
 
+	rootCtx, cancel := context.WithCancel(context.Background())
+
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-stop
+		log.Info("Received shutdown signal")
+		// Cancel root context. Cancellation will be propagated to all other
+		// contexts that are children of the root context.
+		cancel()
+	}()
 
 	// We create two different kubernetes API clients for the local cluster:
 	// k8sAPI is used as a dynamic client for unstructured access to Link custom
@@ -69,10 +90,8 @@ func Main(args []string) {
 	if err != nil {
 		log.Fatalf("Failed to initialize K8s API: %s", err)
 	}
-
-	ctx := context.Background()
 	controllerK8sAPI, err := controllerK8s.InitializeAPI(
-		ctx,
+		rootCtx,
 		*kubeConfigPath,
 		false,
 		controllerK8s.NS,
@@ -84,79 +103,167 @@ func Main(args []string) {
 	}
 	linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace)
-
 	metrics := servicemirror.NewProbeMetricVecs()
-
 	controllerK8sAPI.Sync(nil)
 	ready = true
-
-main:
-	for {
-		// Start link watch
-		linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
-		if err != nil {
-			log.Fatalf("Failed to watch Link %s: %s", linkName, err)
-		}
-		results := linkWatch.ResultChan()
-
-		// Each time the link resource is updated, reload the config and restart the
-		// cluster watcher.
+	run := func(ctx context.Context) {
+	main:
 		for {
-			select {
-			case <-stop:
-				break main
-			case event, ok := <-results:
-				if !ok {
-					log.Info("Link watch terminated; restarting watch")
-					continue main
-				}
-				switch obj := event.Object.(type) {
-				case *dynamic.Unstructured:
-					if obj.GetName() == linkName {
-						switch event.Type {
-						case watch.Added, watch.Modified:
-							link, err := multicluster.NewLink(*obj)
-							if err != nil {
-								log.Errorf("Failed to parse link %s: %s", linkName, err)
-								continue
-							}
-							log.Infof("Got updated link %s: %+v", linkName, link)
-							creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
-							if err != nil {
-								log.Errorf("Failed to load remote cluster credentials: %s", err)
-							}
-							err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
-							if err != nil {
-								// failed to restart cluster watcher; give a bit of slack
-								// and restart the link watch to give it another try
-								log.Error(err)
-								time.Sleep(linkWatchRestartAfter)
-								linkWatch.Stop()
-							}
-						case watch.Deleted:
-							log.Infof("Link %s deleted", linkName)
-							if clusterWatcher != nil {
-								clusterWatcher.Stop(false)
-								clusterWatcher = nil
-							}
-							if probeWorker != nil {
-								probeWorker.Stop()
-								probeWorker = nil
-							}
-						default:
-							log.Infof("Ignoring event type %s", event.Type)
-						}
+			// Start link watch
+			linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
+			if err != nil {
+				log.Fatalf("Failed to watch Link %s: %s", linkName, err)
+			}
+			results := linkWatch.ResultChan()
+
+			// Each time the link resource is updated, reload the config and restart the
+			// cluster watcher.
+			for {
+				select {
+				// ctx.Done() is a one-shot channel that will be closed once
+				// the context has been cancelled. Receiving from a closed
+				// channel yields the value immediately.
+				case <-ctx.Done():
+					// The channel will be closed by the leader elector when a
+					// lease is lost, or by a background task handling SIGTERM.
+					// Before terminating the loop, stop the workers and set
+					// them to nil to release memory.
+					cleanupWorkers()
+					return
+				case event, ok := <-results:
+					if !ok {
+						log.Info("Link watch terminated; restarting watch")
+						continue main
+					}
+					switch obj := event.Object.(type) {
+					case *dynamic.Unstructured:
+						if obj.GetName() == linkName {
+							switch event.Type {
+							case watch.Added, watch.Modified:
+								link, err := multicluster.NewLink(*obj)
+								if err != nil {
+									log.Errorf("Failed to parse link %s: %s", linkName, err)
+									continue
+								}
+								log.Infof("Got updated link %s: %+v", linkName, link)
+								creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
+								if err != nil {
+									log.Errorf("Failed to load remote cluster credentials: %s", err)
+								}
+								err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
+								if err != nil {
+									// failed to restart cluster watcher; give a bit of slack
+									// and restart the link watch to give it another try
+									log.Error(err)
+									time.Sleep(linkWatchRestartAfter)
+									linkWatch.Stop()
+								}
+							case watch.Deleted:
+								log.Infof("Link %s deleted", linkName)
+								cleanupWorkers()
+							default:
+								log.Infof("Ignoring event type %s", event.Type)
+							}
+						}
+					default:
+						log.Errorf("Unknown object type detected: %+v", obj)
 					}
-				default:
-					log.Errorf("Unknown object type detected: %+v", obj)
 				}
 			}
 		}
 	}
+
+	hostname, found := os.LookupEnv("HOSTNAME")
+	if !found {
+		log.Fatal("Failed to fetch 'HOSTNAME' environment variable")
+	}
+
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      fmt.Sprintf("service-mirror-write-%s", linkName),
+			Namespace: *namespace,
+			Labels: map[string]string{
+				"component":                      "linkerd-service-mirror",
+				"mirror.linkerd.io/cluster-name": linkName,
+			},
+		},
+		Client: k8sAPI.CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: hostname,
+		},
+	}
+
+election:
+	for {
+		// RunOrDie will block until the lease is lost.
+		//
+		// When a lease is acquired, the OnStartedLeading callback will be
+		// triggered, and a main watcher loop will be established to watch Link
+		// resources.
+		//
+		// When the lease is lost, all watchers will be cleaned-up and we will
+		// loop then attempt to re-acquire the lease.
+		leaderelection.RunOrDie(rootCtx, leaderelection.LeaderElectionConfig{
+			// When runtime context is cancelled, lock will be released. Implies any
+			// code guarded by the lease _must_ finish before cancelling.
+			ReleaseOnCancel: true,
+			Lock:            lock,
+			LeaseDuration:   LEASE_DURATION,
+			RenewDeadline:   LEASE_RENEW_DEADLINE,
+			RetryPeriod:     LEASE_RETRY_PERIOD,
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					// When a lease is lost, RunOrDie will cancel the context
+					// passed into the OnStartedLeading callback. This will in
+					// turn cause us to cancel the work in the run() function,
+					// effectively terminating and cleaning-up the watches.
+					log.Info("Starting controller loop")
+					run(ctx)
+				},
+				OnStoppedLeading: func() {
+					log.Infof("%s released lease", hostname)
+				},
+				OnNewLeader: func(identity string) {
+					if identity == hostname {
+						log.Infof("%s acquired lease", hostname)
+					}
+				},
+			},
+		})
+
+		select {
+		// If the lease has been lost, and we have received a shutdown signal,
+		// break the loop and gracefully exit. We can guarantee at this point
+		// resources have been released.
+		case <-rootCtx.Done():
+			break election
+		// If the lease has been lost, loop and attempt to re-acquire it.
+		default:
+
+		}
+	}
 
 	log.Info("Shutting down")
 }
 
+// cleanupWorkers is a utility function that checks whether the worker pointers
+// (clusterWatcher and probeWorker) are instantiated, and if they are, stops
+// their execution and sets the pointers to a nil value so that memory may be
+// garbage collected.
+func cleanupWorkers() {
+	if clusterWatcher != nil {
+		// release, but do not clean-up services created
+		// the `unlink` command will take care of that
+		clusterWatcher.Stop(false)
+		clusterWatcher = nil
+	}
+
+	if probeWorker != nil {
+		probeWorker.Stop()
+		probeWorker = nil
+	}
+}
+
 func loadCredentials(ctx context.Context, link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) {
 	// Load the credentials secret
 	secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(ctx, link.ClusterCredentialsSecret, metav1.GetOptions{})
@@ -177,12 +284,8 @@ func restartClusterWatcher(
 	metrics servicemirror.ProbeMetricVecs,
 	enableHeadlessSvc bool,
 ) error {
-	if clusterWatcher != nil {
-		clusterWatcher.Stop(false)
-	}
-	if probeWorker != nil {
-		probeWorker.Stop()
-	}
+
+	cleanupWorkers()
 
 	// Start probe worker
 	workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName)
diff --git a/multicluster/cmd/testdata/serivce_mirror_default.golden b/multicluster/cmd/testdata/serivce_mirror_default.golden
index 8f8152e7f..99ad7092b 100644
--- a/multicluster/cmd/testdata/serivce_mirror_default.golden
+++ b/multicluster/cmd/testdata/serivce_mirror_default.golden
@@ -49,6 +49,9 @@ rules:
   - apiGroups: ["multicluster.linkerd.io"]
     resources: ["links"]
     verbs: ["list", "get", "watch"]
+  - apiGroups: ["coordination.k8s.io"]
+    resources: ["leases"]
+    verbs: ["create", "get", "update", "patch"]
 ---
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/multicluster/cmd/unlink.go b/multicluster/cmd/unlink.go
index 16a346180..e46af19b8 100644
--- a/multicluster/cmd/unlink.go
+++ b/multicluster/cmd/unlink.go
@@ -13,6 +13,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	appsv1 "k8s.io/api/apps/v1"
+	coordinationv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbac "k8s.io/api/rbac/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -67,10 +68,11 @@ func newUnlinkCommand() *cobra.Command {
 			roleBinding := resource.NewNamespaced(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
 			serviceAccount := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
 			serviceMirror := resource.NewNamespaced(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
+			lease := resource.NewNamespaced(coordinationv1.SchemeGroupVersion.String(), "Lease", fmt.Sprintf("service-mirror-write-%s", opts.clusterName), opts.namespace)
 
 			resources := []resource.Kubernetes{
 				secret, gatewayMirror, link, clusterRole, clusterRoleBinding,
-				role, roleBinding, serviceAccount, serviceMirror,
+				role, roleBinding, serviceAccount, serviceMirror, lease,
 			}
 
 			selector := fmt.Sprintf("%s=%s,%s=%s",
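
Note for reviewers: the patch above wires client-go leader election into the service mirror's existing Link watch loop. The sketch below is a minimal, self-contained distillation of the same LeaseLock/RunOrDie pattern in isolation, not part of the patch; the lock name "example-lock", the "default" namespace, and the in-cluster client setup are placeholder assumptions.

// leaderelect_sketch.go — illustrative only; values and setup are assumptions.
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	// Cancel the root context on SIGINT/SIGTERM so the lease is released on shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-stop
		cancel()
	}()

	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	hostname, _ := os.Hostname()
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "example-lock", Namespace: "default"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: hostname},
	}

	for {
		// Blocks until the lease is lost. The context handed to
		// OnStartedLeading is cancelled when leadership is lost, so all
		// guarded work must stop once it is done.
		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
			Lock:            lock,
			ReleaseOnCancel: true,
			LeaseDuration:   30 * time.Second,
			RenewDeadline:   10 * time.Second,
			RetryPeriod:     2 * time.Second,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					log.Info("acquired lease; starting guarded work")
					<-ctx.Done() // the real work loop would run here
				},
				OnStoppedLeading: func() {
					log.Info("lost lease")
				},
			},
		})

		select {
		case <-ctx.Done():
			return // shutting down; the lease has already been released
		default:
			// still running but no longer leader: loop and try to re-acquire
		}
	}
}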