linkerd2/multicluster/cmd/link.go

531 lines
17 KiB
Go

package cmd
import (
"bytes"
"errors"
"fmt"
"os"
"path"
"strings"
"github.com/linkerd/linkerd2/multicluster/static"
multicluster "github.com/linkerd/linkerd2/multicluster/values"
"github.com/linkerd/linkerd2/pkg/charts"
partials "github.com/linkerd/linkerd2/pkg/charts/static"
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
"github.com/linkerd/linkerd2/pkg/flags"
"github.com/linkerd/linkerd2/pkg/k8s"
mc "github.com/linkerd/linkerd2/pkg/multicluster"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
valuespkg "helm.sh/helm/v3/pkg/cli/values"
"helm.sh/helm/v3/pkg/engine"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/yaml"
)
// Metadata keys written onto the credentials secret that the link command
// generates for the control plane namespace.
const (
	// clusterDomainAnnotation is the annotation key that carries the target
	// cluster's cluster domain.
	clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"

	// clusterNameLabel is the label key that carries the name of the linked
	// cluster.
	clusterNameLabel = "multicluster.linkerd.io/cluster-name"

	// trustDomainAnnotation is the annotation key that carries the target
	// cluster's identity trust domain.
	trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
)
// linkOptions holds the values bound to the flags of the `link` command.
type linkOptions struct {
	// Identity of the link and where its service account lives.
	clusterName        string // --cluster-name
	namespace          string // --namespace: namespace for the service account
	serviceAccountName string // --service-account-name
	apiServerAddress   string // --api-server-address: overrides the kubeconfig server when non-empty

	// Gateway lookup and overrides.
	enableGateway    bool   // --gateway
	gatewayName      string // --gateway-name
	gatewayNamespace string // --gateway-namespace
	gatewayAddresses string // --gateway-addresses: comma separated list, overrides discovered addresses
	gatewayPort      uint32 // --gateway-port: overrides the discovered port when non-zero

	// Service selection.
	selector                string // --selector / -l
	remoteDiscoverySelector string // --remote-discovery-selector

	// Service mirror controller settings.
	controlPlaneVersion     string // --control-plane-version: controller image tag
	dockerRegistry          string // --registry
	serviceMirrorRetryLimit uint32 // --service-mirror-retry-limit
	logLevel                string // --log-level
	logFormat               string // --log-format
	ha                      bool   // --ha
}
// newLinkCommand builds the `link` cobra command.
//
// When run, the command writes four YAML documents to stdout, each followed
// by a `---` separator:
//
//  1. a credentials Secret in opts.namespace holding a kubeconfig that
//     authenticates with the service account's token,
//  2. the same credentials Secret in controlPlaneNamespace, labelled with the
//     cluster name and annotated with the trust and cluster domains,
//  3. the Link resource, and
//  4. the rendered service mirror chart templates.
//
// If the default option values cannot be loaded, the error is printed to
// stderr and the process exits with status 1.
func newLinkCommand() *cobra.Command {
	opts, err := newLinkOptionsWithDefault()
	// opts is nil whenever err is non-nil, so this check has to come before
	// anything that dereferences opts.
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Override the default value with env registry path.
	// If cli cmd contains --registry flag, it will override env variable.
	if registry := os.Getenv(flags.EnvOverrideDockerRegistry); registry != "" {
		opts.dockerRegistry = registry
	}

	var valuesOptions valuespkg.Options

	cmd := &cobra.Command{
		Use:   "link",
		Short: "Outputs resources that allow another cluster to mirror services from this one",
		Long: `Outputs resources that allow another cluster to mirror services from this one.
Note that the Link resource applies only in one direction. In order for two
clusters to mirror each other, a Link resource will have to be generated for
each cluster and applied to the other.`,
		Args: cobra.NoArgs,
		Example: ` # To link the west cluster to east
linkerd --context=east multicluster link --cluster-name east | kubectl --context=west apply -f -
The command can be configured by using the --set, --values, --set-string and --set-file flags.
A full list of configurable values can be found at https://github.com/linkerd/linkerd2/blob/main/multicluster/charts/linkerd-multicluster-link/README.md
`,
		RunE: func(cmd *cobra.Command, args []string) error {
			if opts.clusterName == "" {
				return errors.New("You need to specify cluster name")
			}

			configMap, err := getLinkerdConfigMap(cmd.Context())
			if err != nil {
				if kerrors.IsNotFound(err) {
					return errors.New("you need Linkerd to be installed on a cluster in order to get its credentials")
				}
				return err
			}

			// Load the raw kubeconfig so it can be trimmed down to a single
			// context/cluster/user below.
			rules := clientcmd.NewDefaultClientConfigLoadingRules()
			rules.ExplicitPath = kubeconfigPath
			loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
			config, err := loader.RawConfig()
			if err != nil {
				return err
			}

			if kubeContext != "" {
				config.CurrentContext = kubeContext
			}

			k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
			if err != nil {
				return err
			}

			// Look up the service account and the token secret bound to it.
			sa, err := k.CoreV1().ServiceAccounts(opts.namespace).Get(cmd.Context(), opts.serviceAccountName, metav1.GetOptions{})
			if err != nil {
				return err
			}

			listOpts := metav1.ListOptions{
				FieldSelector: fmt.Sprintf("type=%s", corev1.SecretTypeServiceAccountToken),
			}
			secrets, err := k.CoreV1().Secrets(opts.namespace).List(cmd.Context(), listOpts)
			if err != nil {
				return err
			}

			token, err := extractSAToken(secrets.Items, sa.Name)
			if err != nil {
				return err
			}

			// Reduce the kubeconfig to the current context, authenticated as
			// the service account.
			context, ok := config.Contexts[config.CurrentContext]
			if !ok {
				return fmt.Errorf("could not extract current context from config")
			}

			context.AuthInfo = opts.serviceAccountName
			config.Contexts = map[string]*api.Context{
				config.CurrentContext: context,
			}
			config.AuthInfos = map[string]*api.AuthInfo{
				opts.serviceAccountName: {
					Token: token,
				},
			}

			// A context may name a cluster that has no entry in the
			// kubeconfig; fail clearly instead of dereferencing a nil cluster
			// or writing one into the generated credentials.
			cluster, ok := config.Clusters[context.Cluster]
			if !ok || cluster == nil {
				return fmt.Errorf("could not extract cluster %s from config", context.Cluster)
			}

			if opts.apiServerAddress != "" {
				cluster.Server = opts.apiServerAddress
			}

			config.Clusters = map[string]*api.Cluster{
				context.Cluster: cluster,
			}

			kubeconfig, err := clientcmd.Write(config)
			if err != nil {
				return err
			}

			// Credentials secret placed in the multicluster namespace.
			creds := corev1.Secret{
				Type:     k8s.MirrorSecretType,
				TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
				ObjectMeta: metav1.ObjectMeta{
					Name:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
					Namespace: opts.namespace,
				},
				Data: map[string][]byte{
					k8s.ConfigKeyName: kubeconfig,
				},
			}

			credsOut, err := yaml.Marshal(creds)
			if err != nil {
				return err
			}

			// Same credentials, placed in the control plane namespace and
			// tagged with the cluster name, trust domain and cluster domain.
			destinationCreds := corev1.Secret{
				Type:     k8s.MirrorSecretType,
				TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
				ObjectMeta: metav1.ObjectMeta{
					Name:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
					Namespace: controlPlaneNamespace,
					Labels: map[string]string{
						clusterNameLabel: opts.clusterName,
					},
					Annotations: map[string]string{
						trustDomainAnnotation:   configMap.IdentityTrustDomain,
						clusterDomainAnnotation: configMap.ClusterDomain,
					},
				},
				Data: map[string][]byte{
					k8s.ConfigKeyName: kubeconfig,
				},
			}

			destinationCredsOut, err := yaml.Marshal(destinationCreds)
			if err != nil {
				return err
			}

			remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector)
			if err != nil {
				return err
			}

			link := mc.Link{
				Name:                          opts.clusterName,
				Namespace:                     opts.namespace,
				TargetClusterName:             opts.clusterName,
				TargetClusterDomain:           configMap.ClusterDomain,
				TargetClusterLinkerdNamespace: controlPlaneNamespace,
				ClusterCredentialsSecret:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
				RemoteDiscoverySelector:       *remoteDiscoverySelector,
			}

			// If there is a gateway in the exporting cluster, populate Link
			// resource with gateway information
			if opts.enableGateway {
				gateway, err := k.CoreV1().Services(opts.gatewayNamespace).Get(cmd.Context(), opts.gatewayName, metav1.GetOptions{})
				if err != nil {
					return err
				}

				// Collect every load balancer ingress address, preferring the
				// IP and falling back to the hostname.
				var gwAddresses []string
				for _, ingress := range gateway.Status.LoadBalancer.Ingress {
					addr := ingress.IP
					if addr == "" {
						addr = ingress.Hostname
					}
					if addr == "" {
						continue
					}
					gwAddresses = append(gwAddresses, addr)
				}

				// User supplied addresses take precedence over discovered ones.
				if opts.gatewayAddresses != "" {
					link.GatewayAddress = opts.gatewayAddresses
				} else if len(gwAddresses) > 0 {
					link.GatewayAddress = strings.Join(gwAddresses, ",")
				} else {
					return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
				}

				gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
				if !ok || gatewayIdentity == "" {
					return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
				}
				link.GatewayIdentity = gatewayIdentity

				probeSpec, err := mc.ExtractProbeSpec(gateway)
				if err != nil {
					return err
				}
				link.ProbeSpec = probeSpec

				gatewayPort, err := extractGatewayPort(gateway)
				if err != nil {
					return err
				}

				// Override with user provided gateway port if present
				if opts.gatewayPort != 0 {
					gatewayPort = opts.gatewayPort
				}
				link.GatewayPort = gatewayPort

				selector, err := metav1.ParseToLabelSelector(opts.selector)
				if err != nil {
					return err
				}
				link.Selector = *selector
			}

			obj, err := link.ToUnstructured()
			if err != nil {
				return err
			}
			linkOut, err := yaml.Marshal(obj.Object)
			if err != nil {
				return err
			}

			values, err := buildServiceMirrorValues(opts)
			if err != nil {
				return err
			}

			// Create values override
			valuesOverrides, err := valuesOptions.MergeValues(nil)
			if err != nil {
				return err
			}

			if opts.ha {
				if valuesOverrides, err = charts.OverrideFromFile(
					valuesOverrides,
					static.Templates,
					helmMulticlusterLinkDefaultChartName,
					"values-ha.yaml",
				); err != nil {
					return err
				}
			}

			serviceMirrorOut, err := renderServiceMirror(values, valuesOverrides, opts.namespace)
			if err != nil {
				return err
			}

			stdout.Write(credsOut)
			stdout.Write([]byte("---\n"))
			stdout.Write(destinationCredsOut)
			stdout.Write([]byte("---\n"))
			stdout.Write(linkOut)
			stdout.Write([]byte("---\n"))
			stdout.Write(serviceMirrorOut)
			stdout.Write([]byte("---\n"))

			return nil
		},
	}

	flags.AddValueOptionsFlags(cmd.Flags(), &valuesOptions)
	cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
	cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
	cmd.Flags().StringVar(&opts.apiServerAddress, "api-server-address", "", "The api server address of the target cluster")
	cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", defaultServiceAccountName, "The name of the service account associated with the credentials")
	cmd.Flags().StringVar(&opts.controlPlaneVersion, "control-plane-version", opts.controlPlaneVersion, "(Development) Tag to be used for the service mirror controller image")
	cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", defaultGatewayName, "The name of the gateway service")
	cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "The namespace of the gateway service")
	cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
	cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
	cmd.Flags().StringVar(&opts.logFormat, "log-format", opts.logFormat, "Log format for the Multicluster components")
	cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry,
		fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry))
	cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
	cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode")
	cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
	cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer")
	cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)")
	cmd.Flags().BoolVar(&opts.enableGateway, "gateway", opts.enableGateway, "If false, allows a link to be created against a cluster that does not have a gateway service")

	pkgcmd.ConfigureNamespaceFlagCompletion(
		cmd, []string{"namespace", "gateway-namespace"},
		kubeconfigPath, impersonate, impersonateGroup, kubeContext)

	return cmd
}
// renderServiceMirror renders the multicluster link chart templates and
// returns their concatenated output.
//
// values seeds the chart's values, valuesOverrides is coalesced on top of
// them, and namespace is exposed to the templates as Release.Namespace.
func renderServiceMirror(values *multicluster.Values, valuesOverrides map[string]interface{}, namespace string) ([]byte, error) {
	// Files that belong to the multicluster link chart itself.
	chartFiles := []*chartloader.BufferedFile{
		{Name: chartutil.ChartfileName},
		{Name: "templates/service-mirror.yaml"},
		{Name: "templates/psp.yaml"},
		{Name: "templates/gateway-mirror.yaml"},
	}

	// Shared partial templates.
	var sharedPartials []*chartloader.BufferedFile
	for _, name := range charts.L5dPartials {
		sharedPartials = append(sharedPartials, &chartloader.BufferedFile{Name: name})
	}

	// Read both sets of files into their buffers.
	err := charts.FilesReader(static.Templates, helmMulticlusterLinkDefaultChartName+"/", chartFiles)
	if err != nil {
		return nil, err
	}
	err = charts.FilesReader(partials.Templates, "", sharedPartials)
	if err != nil {
		return nil, err
	}

	// Assemble a chart from everything that was loaded.
	linkChart, err := chartloader.LoadFiles(append(chartFiles, sharedPartials...))
	if err != nil {
		return nil, err
	}

	// Round-trip the typed values through YAML to install them as the
	// chart's values.
	encoded, err := yaml.Marshal(values)
	if err != nil {
		return nil, err
	}
	if err := yaml.Unmarshal(encoded, &linkChart.Values); err != nil {
		return nil, err
	}

	merged, err := chartutil.CoalesceValues(linkChart, valuesOverrides)
	if err != nil {
		return nil, err
	}

	// The engine expects the values under `Values`, next to release metadata.
	release := map[string]interface{}{
		"Namespace": namespace,
		"Service":   "CLI",
	}
	rendered, err := engine.Render(linkChart, map[string]interface{}{
		"Values":  merged,
		"Release": release,
	})
	if err != nil {
		return nil, err
	}

	// Concatenate the rendered templates in the chart's template order.
	var manifest bytes.Buffer
	for _, tmpl := range linkChart.Templates {
		key := path.Join(linkChart.Metadata.Name, tmpl.Name)
		if _, err := manifest.WriteString(rendered[key]); err != nil {
			return nil, err
		}
	}
	return manifest.Bytes(), nil
}
// newLinkOptionsWithDefault returns the link options pre-populated with their
// default values. Retry limit, log level and log format come from the link
// chart values; it returns an error if those values cannot be loaded.
func newLinkOptionsWithDefault() (*linkOptions, error) {
	chartDefaults, err := multicluster.NewLinkValues()
	if err != nil {
		return nil, err
	}

	opts := new(linkOptions)

	// Static defaults.
	opts.controlPlaneVersion = version.Version
	opts.namespace = defaultMulticlusterNamespace
	opts.dockerRegistry = pkgcmd.DefaultDockerRegistry
	opts.gatewayAddresses = ""
	opts.gatewayPort = 0
	opts.ha = false
	opts.enableGateway = true

	// Defaults taken from the chart values.
	opts.serviceMirrorRetryLimit = chartDefaults.ServiceMirrorRetryLimit
	opts.logLevel = chartDefaults.LogLevel
	opts.logFormat = chartDefaults.LogFormat

	// Default label queries built from the exported-service selector key.
	opts.selector = fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true")
	opts.remoteDiscoverySelector = fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery")

	return opts, nil
}
// buildServiceMirrorValues validates opts and returns the link chart values
// with the option-driven fields filled in.
//
// It rejects an invalid controller version, an empty namespace, an unknown
// log level or format, and any gateway-only option (a non-default selector,
// gateway addresses, gateway port) combined with --gateway=false.
func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
	switch {
	case !alphaNumDashDot.MatchString(opts.controlPlaneVersion):
		return nil, fmt.Errorf("%s is not a valid version", opts.controlPlaneVersion)
	case opts.namespace == "":
		return nil, errors.New("you need to specify a namespace")
	}

	if _, parseErr := log.ParseLevel(opts.logLevel); parseErr != nil {
		return nil, errors.New("--log-level must be one of: panic, fatal, error, warn, info, debug, trace")
	}

	switch opts.logFormat {
	case "plain", "json":
	default:
		return nil, errors.New("--log-format must be one of: plain, json")
	}

	// These options only make sense when a gateway is in use.
	if !opts.enableGateway {
		defaultSelector := fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true")
		switch {
		case opts.selector != "" && opts.selector != defaultSelector:
			return nil, errors.New("--selector and --gateway=false are mutually exclusive")
		case opts.gatewayAddresses != "":
			return nil, errors.New("--gateway-addresses and --gateway=false are mutually exclusive")
		case opts.gatewayPort != 0:
			return nil, errors.New("--gateway-port and --gateway=false are mutually exclusive")
		}
	}

	linkValues, err := multicluster.NewLinkValues()
	if err != nil {
		return nil, err
	}

	linkValues.TargetClusterName = opts.clusterName
	linkValues.Gateway.Enabled = opts.enableGateway
	linkValues.ControllerImage = fmt.Sprintf("%s/controller", opts.dockerRegistry)
	linkValues.ControllerImageVersion = opts.controlPlaneVersion
	linkValues.ServiceMirrorRetryLimit = opts.serviceMirrorRetryLimit
	linkValues.LogLevel = opts.logLevel
	linkValues.LogFormat = opts.logFormat

	return linkValues, nil
}
// extractGatewayPort returns the port number of the gateway service's port
// named k8s.GatewayPortName. For a NodePort service the node port is
// returned, otherwise the service port. It returns an error if the service
// has no port with that name.
func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
	useNodePort := gateway.Spec.Type == corev1.ServiceTypeNodePort

	for i := range gateway.Spec.Ports {
		svcPort := &gateway.Spec.Ports[i]
		if svcPort.Name != k8s.GatewayPortName {
			continue
		}
		if useNodePort {
			return uint32(svcPort.NodePort), nil
		}
		return uint32(svcPort.Port), nil
	}

	return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
}
// extractSAToken returns the token stored in the first secret whose
// saNameAnnotationKey annotation equals saName. It returns an error if that
// secret has no tokenKey entry, or if no secret matches.
func extractSAToken(secrets []corev1.Secret, saName string) (string, error) {
	for i := range secrets {
		candidate := &secrets[i]
		if candidate.Annotations[saNameAnnotationKey] != saName {
			continue
		}

		// Only the first matching secret is considered.
		if data, found := candidate.Data[tokenKey]; found {
			return string(data), nil
		}
		return "", fmt.Errorf("could not find the token data in service account secret %s", candidate.Name)
	}

	return "", fmt.Errorf("could not find service account token secret for %s", saName)
}