diff --git a/cli/cmd/multicluster.go b/cli/cmd/multicluster.go index 4d2c4d694..8e23e8f8d 100644 --- a/cli/cmd/multicluster.go +++ b/cli/cmd/multicluster.go @@ -20,9 +20,12 @@ import ( "github.com/linkerd/linkerd2/pkg/version" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + rbac "k8s.io/api/rbac/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" "k8s.io/helm/pkg/chartutil" @@ -415,7 +418,7 @@ func newLinkCommand() *cobra.Command { cmd := &cobra.Command{ Use: "link", - Short: "Outputs a Kubernetes secret that allows a service mirror component to connect to this cluster", + Short: "Outputs resources that allow another cluster to mirror services from this one", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { @@ -633,6 +636,101 @@ func newLinkCommand() *cobra.Command { return cmd } +func newUnlinkCommand() *cobra.Command { + opts, err := newLinkOptionsWithDefault() + if err != nil { + fmt.Fprintf(os.Stderr, "%s", err) + os.Exit(1) + } + + cmd := &cobra.Command{ + Use: "unlink", + Short: "Outputs link resources for deletion", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + + if opts.clusterName == "" { + return errors.New("You need to specify cluster name") + } + + rules := clientcmd.NewDefaultClientConfigLoadingRules() + rules.ExplicitPath = kubeconfigPath + loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{}) + config, err := loader.RawConfig() + if err != nil { + return err + } + + if kubeContext != "" { + config.CurrentContext = kubeContext + } + + k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0) + if err != nil { + return err + } + + _, err = mc.GetLink(k.DynamicClient, opts.namespace, opts.clusterName) + if err != nil { + return err + } + + secret := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Secret", fmt.Sprintf("cluster-credentials-%s", opts.clusterName), opts.namespace) + gatewayMirror := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Service", fmt.Sprintf("probe-gateway-%s", opts.clusterName), opts.namespace) + link := newNamespacedKubernetesResource(k8s.LinkAPIGroupVersion, "Link", opts.clusterName, opts.namespace) + clusterRole := newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRole", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName)) + clusterRoleBinding := newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRoleBinding", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName)) + role := newNamespacedKubernetesResource(rbac.SchemeGroupVersion.String(), "Role", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace) + roleBinding := newNamespacedKubernetesResource(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace) + serviceAccount := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace) + serviceMirror := newNamespacedKubernetesResource(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace) + + resources := []kubernetesResource{ + secret, gatewayMirror, link, clusterRole, clusterRoleBinding, + role, roleBinding, serviceAccount, serviceMirror, + } + + selector := fmt.Sprintf("%s=%s,%s=%s", + k8s.MirroredResourceLabel, "true", + k8s.RemoteClusterNameLabel, opts.clusterName, + ) + svcList, err := k.CoreV1().Services(metav1.NamespaceAll).List(metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return err + } + for _, svc := range svcList.Items { + resources = append(resources, + newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Service", svc.Name, svc.Namespace), + ) + } + + for _, r := range resources { + r.renderResource(stdout) + } + + return nil + }, + } + + cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account") + cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name") + + return cmd +} + +func newNamespacedKubernetesResource(apiVersion, kind, name, namespace string) kubernetesResource { + return kubernetesResource{ + runtime.TypeMeta{ + APIVersion: apiVersion, + Kind: kind, + }, + metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } +} + func newCmdMulticluster() *cobra.Command { multiclusterCmd := &cobra.Command{ @@ -653,6 +751,7 @@ components on a cluster, manage credentials and link clusters together.`, } multiclusterCmd.AddCommand(newLinkCommand()) + multiclusterCmd.AddCommand(newUnlinkCommand()) multiclusterCmd.AddCommand(newMulticlusterInstallCommand()) multiclusterCmd.AddCommand(newGatewaysCommand()) multiclusterCmd.AddCommand(newAllowCommand()) diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go index b5ba26557..26d2b4c56 100644 --- a/controller/cmd/service-mirror/cluster_watcher.go +++ b/controller/cmd/service-mirror/cluster_watcher.go @@ -383,7 +383,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ev *RemoteSe if err := rcsw.mirrorNamespaceIfNecessary(remoteService.Namespace); err != nil { return err } - // here we always create both a service and endpoints, even if we cannot resolve the gateway + serviceToCreate := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: localServiceName, diff --git a/controller/cmd/service-mirror/main.go b/controller/cmd/service-mirror/main.go index 4faf4c577..85e226193 100644 --- a/controller/cmd/service-mirror/main.go +++ b/controller/cmd/service-mirror/main.go @@ -71,6 +71,7 @@ func Main(args []string) { controllerK8sAPI.Sync(nil) +main: for { // Start link watch linkWatch, err := linkClient.Watch(metav1.ListOptions{}) @@ -81,37 +82,52 @@ func Main(args []string) { // Each time the link resource is updated, reload the config and restart the // cluster watcher. - for event := range results { - switch obj := event.Object.(type) { - case *dynamic.Unstructured: - if obj.GetName() == linkName { - switch event.Type { - case watch.Added, watch.Modified: - link, err := multicluster.NewLink(*obj) - if err != nil { - log.Errorf("Failed to parse link %s: %s", linkName, err) - continue - } - log.Infof("Got updated link %s: %+v", linkName, link) - creds, err := loadCredentials(link, *namespace, k8sAPI) - if err != nil { - log.Errorf("Failed to load remote cluster credentials: %s", err) - } - restartClusterWatcher(link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics) - case watch.Deleted: - log.Infof("Link %s deleted", linkName) - // TODO: should we delete all mirror resources? - default: - log.Infof("Ignoring event type %s", event.Type) - } + for { + select { + case <-stop: + break main + case event, ok := <-results: + if !ok { + log.Info("Link watch terminated; restarting watch") + continue main + } + switch obj := event.Object.(type) { + case *dynamic.Unstructured: + if obj.GetName() == linkName { + switch event.Type { + case watch.Added, watch.Modified: + link, err := multicluster.NewLink(*obj) + if err != nil { + log.Errorf("Failed to parse link %s: %s", linkName, err) + continue + } + log.Infof("Got updated link %s: %+v", linkName, link) + creds, err := loadCredentials(link, *namespace, k8sAPI) + if err != nil { + log.Errorf("Failed to load remote cluster credentials: %s", err) + } + restartClusterWatcher(link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics) + case watch.Deleted: + log.Infof("Link %s deleted", linkName) + if clusterWatcher != nil { + clusterWatcher.Stop(false) + clusterWatcher = nil + } + if probeWorker != nil { + probeWorker.Stop() + probeWorker = nil + } + default: + log.Infof("Ignoring event type %s", event.Type) + } + } + default: + log.Errorf("Unknown object type detected: %+v", obj) } - default: - log.Errorf("Unknown object type detected: %+v", obj) } } - - log.Info("Link watch terminated; restarting watch") } + log.Info("Shutting down") } func loadCredentials(link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) {