extension: Separate multicluster chart and binary (#5293)

Fixes #5257

This branch movies mc charts and cli level code to a new
top level directory. None of the logic is changed.

Also, moves some common types into `/pkg` so that they
are accessible both to the main cli and extensions.

Signed-off-by: Tarun Pothulapati <tarunpothulapati@outlook.com>
This commit is contained in:
Tarun Pothulapati 2020-12-05 06:06:10 +05:30 committed by GitHub
parent 47a49e5ac5
commit 72a0ca974d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1253 additions and 1090 deletions

View File

@ -439,7 +439,7 @@ run_cni-calico-deep_test() {
run_helm-deep_test() {
local tests=()
setup_helm
helm_multicluster_chart="$( cd "$bindir"/.. && pwd )"/charts/linkerd2-multicluster
helm_multicluster_chart="$( cd "$bindir"/.. && pwd )"/multicluster/charts/linkerd2-multicluster
run_test "$test_directory/install_test.go" --helm-path="$helm_path" --helm-chart="$helm_chart" \
--helm-release="$helm_release_name" --multicluster-helm-chart="$helm_multicluster_chart" \
--multicluster-helm-release="$helm_multicluster_release_name"

View File

@ -20,6 +20,7 @@ rootdir=$( cd "$bindir"/.. && pwd )
# TODO: `go generate` does not honor -mod=readonly
GO111MODULE=on go generate -mod=readonly ./pkg/charts/static
GO111MODULE=on go generate -mod=readonly ./jaeger/static
GO111MODULE=on go generate -mod=readonly ./multicluster/static
root_tag=$("$bindir"/root-tag)
GO111MODULE=on CGO_ENABLED=0 go build -o $target -tags prod -mod=readonly -ldflags "-s -w -X github.com/linkerd/linkerd2/pkg/version.Version=$root_tag" ./cli
echo "$target"

View File

@ -5,7 +5,7 @@ set -e
setValues() {
sed -i "s/$1/$2/" charts/linkerd2/values.yaml
sed -i "s/$1/$2/" charts/linkerd2-cni/values.yaml
sed -i "s/$1/$2/" charts/linkerd2-multicluster/values.yaml
sed -i "s/$1/$2/" multicluster/charts/linkerd2-multicluster/values.yaml
sed -i "s/$1/$2/" jaeger/charts/jaeger/values.yaml
}
@ -20,8 +20,8 @@ trap 'showErr' ERR
bindir=$( cd "${BASH_SOURCE[0]%/*}" && pwd )
rootdir=$( cd "$bindir"/.. && pwd )
"$bindir"/helm lint "$rootdir"/charts/linkerd2-multicluster
"$bindir"/helm lint "$rootdir"/charts/linkerd2-multicluster-link
"$bindir"/helm lint "$rootdir"/multicluster/charts/linkerd2-multicluster
"$bindir"/helm lint "$rootdir"/multicluster/charts/linkerd2-multicluster-link
"$bindir"/helm lint "$rootdir"/charts/partials
"$bindir"/helm dep up "$rootdir"/charts/linkerd2-cni
"$bindir"/helm lint "$rootdir"/charts/linkerd2-cni
@ -53,8 +53,8 @@ if [ "$1" = package ]; then
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/charts/linkerd2
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/charts/linkerd2-cni
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/charts/linkerd2-multicluster
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/charts/linkerd2-multicluster-link
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/multicluster/charts/linkerd2-multicluster
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/multicluster/charts/linkerd2-multicluster-link
"$bindir"/helm --version "$version" --app-version "$tag" -d "$rootdir"/target/helm package "$rootdir"/jaeger/charts/jaeger
mv "$rootdir"/target/helm/index-pre.yaml "$rootdir"/target/helm/index-pre-"$version".yaml
"$bindir"/helm repo index --url "https://helm.linkerd.io/$repo/" --merge "$rootdir"/target/helm/index-pre-"$version".yaml "$rootdir"/target/helm

View File

@ -15,6 +15,7 @@ WORKDIR /linkerd-build
COPY cli cli
COPY charts charts
COPY jaeger jaeger
COPY multicluster multicluster
COPY controller/k8s controller/k8s
COPY controller/api controller/api
@ -26,6 +27,7 @@ RUN mkdir -p /out
# TODO: `go generate` does not honor -mod=readonly
RUN go generate -mod=readonly ./pkg/charts/static
RUN go generate -mod=readonly ./jaeger/static
RUN go generate -mod=readonly ./multicluster/static
# Cache builds without version info
RUN CGO_ENABLED=0 GOOS=darwin go build -o /out/linkerd-darwin -tags prod -mod=readonly -ldflags "-s -w" ./cli

View File

@ -1,967 +0,0 @@
package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/linkerd/linkerd2/cli/table"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/charts/linkerd2"
"github.com/linkerd/linkerd2/pkg/charts/multicluster"
mccharts "github.com/linkerd/linkerd2/pkg/charts/multicluster"
"github.com/linkerd/linkerd2/pkg/charts/static"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
mc "github.com/linkerd/linkerd2/pkg/multicluster"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/yaml"
)
const (
defaultMulticlusterNamespace = "linkerd-multicluster"
defaultGatewayName = "linkerd-gateway"
helmMulticlusterDefaultChartName = "linkerd2-multicluster"
helmMulticlusterLinkDefaultChartName = "linkerd2-multicluster-link"
tokenKey = "token"
defaultServiceAccountName = "linkerd-service-mirror-remote-access-default"
)
type (
allowOptions struct {
namespace string
serviceAccountName string
ignoreCluster bool
}
multiclusterInstallOptions struct {
gateway bool
gatewayPort uint32
gatewayProbeSeconds uint32
gatewayProbePort uint32
namespace string
gatewayNginxImage string
gatewayNginxVersion string
remoteMirrorCredentials bool
gatewayServiceType string
}
linkOptions struct {
namespace string
clusterName string
apiServerAddress string
serviceAccountName string
gatewayName string
gatewayNamespace string
serviceMirrorRetryLimit uint32
logLevel string
controlPlaneVersion string
dockerRegistry string
selector string
gatewayAddresses string
}
gatewaysOptions struct {
gatewayNamespace string
clusterName string
timeWindow string
}
)
func newMulticlusterInstallOptionsWithDefault() (*multiclusterInstallOptions, error) {
defaults, err := mccharts.NewInstallValues()
if err != nil {
return nil, err
}
return &multiclusterInstallOptions{
gateway: defaults.Gateway,
gatewayPort: defaults.GatewayPort,
gatewayProbeSeconds: defaults.GatewayProbeSeconds,
gatewayProbePort: defaults.GatewayProbePort,
namespace: defaults.Namespace,
gatewayNginxImage: defaults.GatewayNginxImage,
gatewayNginxVersion: defaults.GatewayNginxImageVersion,
remoteMirrorCredentials: true,
gatewayServiceType: defaults.GatewayServiceType,
}, nil
}
func newLinkOptionsWithDefault() (*linkOptions, error) {
defaults, err := mccharts.NewLinkValues()
if err != nil {
return nil, err
}
return &linkOptions{
controlPlaneVersion: version.Version,
namespace: defaults.Namespace,
dockerRegistry: defaultDockerRegistry,
serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
logLevel: defaults.LogLevel,
selector: k8s.DefaultExportedServiceSelector,
gatewayAddresses: "",
}, nil
}
func getLinkerdConfigMap(ctx context.Context) (*linkerd2.Values, error) {
kubeAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return nil, err
}
_, values, err := healthcheck.FetchCurrentConfiguration(ctx, kubeAPI, controlPlaneNamespace)
if err != nil {
return nil, err
}
return values, nil
}
func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
if !alphaNumDashDot.MatchString(opts.controlPlaneVersion) {
return nil, fmt.Errorf("%s is not a valid version", opts.controlPlaneVersion)
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
if _, err := log.ParseLevel(opts.logLevel); err != nil {
return nil, fmt.Errorf("--log-level must be one of: panic, fatal, error, warn, info, debug")
}
defaults, err := mccharts.NewLinkValues()
if err != nil {
return nil, err
}
defaults.TargetClusterName = opts.clusterName
defaults.Namespace = opts.namespace
defaults.ServiceMirrorRetryLimit = opts.serviceMirrorRetryLimit
defaults.LogLevel = opts.logLevel
defaults.ControllerImageVersion = opts.controlPlaneVersion
defaults.ControllerImage = fmt.Sprintf("%s/controller", opts.dockerRegistry)
return defaults, nil
}
func buildMulticlusterInstallValues(ctx context.Context, opts *multiclusterInstallOptions) (*multicluster.Values, error) {
values, err := getLinkerdConfigMap(ctx)
if err != nil {
if kerrors.IsNotFound(err) {
return nil, errors.New("you need Linkerd to be installed in order to install multicluster addons")
}
return nil, err
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
defaults, err := mccharts.NewInstallValues()
if err != nil {
return nil, err
}
defaults.Namespace = opts.namespace
defaults.Gateway = opts.gateway
defaults.GatewayPort = opts.gatewayPort
defaults.GatewayProbeSeconds = opts.gatewayProbeSeconds
defaults.GatewayProbePort = opts.gatewayProbePort
defaults.GatewayNginxImage = opts.gatewayNginxImage
defaults.GatewayNginxImageVersion = opts.gatewayNginxVersion
defaults.IdentityTrustDomain = values.GetGlobal().IdentityTrustDomain
defaults.LinkerdNamespace = controlPlaneNamespace
defaults.ProxyOutboundPort = uint32(values.GetGlobal().Proxy.Ports.Outbound)
defaults.LinkerdVersion = version.Version
defaults.RemoteMirrorServiceAccount = opts.remoteMirrorCredentials
defaults.GatewayServiceType = opts.gatewayServiceType
return defaults, nil
}
func buildMulticlusterAllowValues(ctx context.Context, opts *allowOptions) (*mccharts.Values, error) {
kubeAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return nil, err
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.serviceAccountName == "" {
return nil, errors.New("you need to specify a service account name")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
defaults, err := mccharts.NewInstallValues()
if err != nil {
return nil, err
}
defaults.Namespace = opts.namespace
defaults.LinkerdVersion = version.Version
defaults.Gateway = false
defaults.ServiceMirror = false
defaults.RemoteMirrorServiceAccount = true
defaults.RemoteMirrorServiceAccountName = opts.serviceAccountName
if !opts.ignoreCluster {
acc, err := kubeAPI.CoreV1().ServiceAccounts(defaults.Namespace).Get(ctx, defaults.RemoteMirrorServiceAccountName, metav1.GetOptions{})
if err == nil && acc != nil {
return nil, fmt.Errorf("Service account with name %s already exists, use --ignore-cluster for force operation", defaults.RemoteMirrorServiceAccountName)
}
if !kerrors.IsNotFound(err) {
return nil, err
}
}
return defaults, nil
}
func newAllowCommand() *cobra.Command {
opts := allowOptions{
namespace: defaultMulticlusterNamespace,
ignoreCluster: false,
}
cmd := &cobra.Command{
Hidden: false,
Use: "allow",
Short: "Outputs credential resources that allow service-mirror controllers to connect to this cluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
values, err := buildMulticlusterAllowValues(cmd.Context(), &opts)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The destination namespace for the service account.")
cmd.Flags().BoolVar(&opts.ignoreCluster, "ignore-cluster", false, "Ignore cluster configuration")
cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", "", "The name of the multicluster access service account")
return cmd
}
func newGatewaysCommand() *cobra.Command {
opts := gatewaysOptions{}
cmd := &cobra.Command{
Use: "gateways",
Short: "Display stats information about the gateways in target clusters",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
req := &pb.GatewaysRequest{
RemoteClusterName: opts.clusterName,
GatewayNamespace: opts.gatewayNamespace,
TimeWindow: opts.timeWindow,
}
client := checkPublicAPIClientOrExit()
resp, err := requestGatewaysFromAPI(client, req)
if err != nil {
return err
}
renderGateways(resp.GetOk().GatewaysTable.Rows, stdout)
return nil
},
}
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "the name of the target cluster")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", "", "the namespace in which the gateway resides on the target cluster")
cmd.Flags().StringVarP(&opts.timeWindow, "time-window", "t", "1m", "Time window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.")
return cmd
}
func newMulticlusterInstallCommand() *cobra.Command {
options, err := newMulticlusterInstallOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "install",
Short: "Output Kubernetes configs to install the Linkerd multicluster add-on",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
values, err := buildMulticlusterInstallValues(cmd.Context(), options)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/gateway.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
{Name: "templates/link-crd.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&options.namespace, "namespace", options.namespace, "The namespace in which the multicluster add-on is to be installed. Must not be the control plane namespace. ")
cmd.Flags().BoolVar(&options.gateway, "gateway", options.gateway, "If the gateway component should be installed")
cmd.Flags().Uint32Var(&options.gatewayPort, "gateway-port", options.gatewayPort, "The port on the gateway used for all incoming traffic")
cmd.Flags().Uint32Var(&options.gatewayProbeSeconds, "gateway-probe-seconds", options.gatewayProbeSeconds, "The interval at which the gateway will be checked for being alive in seconds")
cmd.Flags().Uint32Var(&options.gatewayProbePort, "gateway-probe-port", options.gatewayProbePort, "The liveness check port of the gateway")
cmd.Flags().StringVar(&options.gatewayNginxImage, "gateway-nginx-image", options.gatewayNginxImage, "The nginx image to be used")
cmd.Flags().StringVar(&options.gatewayNginxVersion, "gateway-nginx-image-version", options.gatewayNginxVersion, "The version of nginx to be used")
cmd.Flags().BoolVar(&options.remoteMirrorCredentials, "service-mirror-credentials", options.remoteMirrorCredentials, "Whether to install the service account which can be used by service mirror components in source clusters to discover exported services")
cmd.Flags().StringVar(&options.gatewayServiceType, "gateway-service-type", options.gatewayServiceType, "Overwrite Service type for gateway service")
// Hide developer focused flags in release builds.
release, err := version.IsReleaseChannel(version.Version)
if err != nil {
log.Errorf("Unable to parse version: %s", version.Version)
}
if release {
cmd.Flags().MarkHidden("control-plane-version")
cmd.Flags().MarkHidden("gateway-nginx-image")
cmd.Flags().MarkHidden("gateway-nginx-image-version")
}
return cmd
}
func newMulticlusterUninstallCommand() *cobra.Command {
options, err := newMulticlusterInstallOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "uninstall",
Short: "Output Kubernetes configs to uninstall the Linkerd multicluster add-on",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
links, err := mc.GetLinks(cmd.Context(), k.DynamicClient)
if err != nil {
return err
}
if len(links) > 0 {
err := []string{"Please unlink the following clusters before uninstalling multicluster:"}
for _, link := range links {
err = append(err, fmt.Sprintf(" * %s", link.TargetClusterName))
}
return errors.New(strings.Join(err, "\n"))
}
values, err := buildMulticlusterInstallValues(cmd.Context(), options)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/gateway.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
{Name: "templates/link-crd.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&options.namespace, "namespace", options.namespace, "The namespace in which the multicluster add-on is to be installed. Must not be the control plane namespace. ")
return cmd
}
func newLinkCommand() *cobra.Command {
opts, err := newLinkOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "link",
Short: "Outputs resources that allow another cluster to mirror services from this one",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if opts.clusterName == "" {
return errors.New("You need to specify cluster name")
}
configMap, err := getLinkerdConfigMap(cmd.Context())
if err != nil {
if kerrors.IsNotFound(err) {
return errors.New("you need Linkerd to be installed on a cluster in order to get its credentials")
}
return err
}
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
sa, err := k.CoreV1().ServiceAccounts(opts.namespace).Get(cmd.Context(), opts.serviceAccountName, metav1.GetOptions{})
if err != nil {
return err
}
var secretName string
for _, s := range sa.Secrets {
if strings.HasPrefix(s.Name, fmt.Sprintf("%s-token", sa.Name)) {
secretName = s.Name
break
}
}
if secretName == "" {
return fmt.Errorf("could not find service account token secret for %s", sa.Name)
}
secret, err := k.CoreV1().Secrets(opts.namespace).Get(cmd.Context(), secretName, metav1.GetOptions{})
if err != nil {
return err
}
token, ok := secret.Data[tokenKey]
if !ok {
return fmt.Errorf("could not find the token data in the service account secret")
}
context, ok := config.Contexts[config.CurrentContext]
if !ok {
return fmt.Errorf("could not extract current context from config")
}
context.AuthInfo = opts.serviceAccountName
config.Contexts = map[string]*api.Context{
config.CurrentContext: context,
}
config.AuthInfos = map[string]*api.AuthInfo{
opts.serviceAccountName: {
Token: string(token),
},
}
cluster := config.Clusters[context.Cluster]
if opts.apiServerAddress != "" {
cluster.Server = opts.apiServerAddress
}
config.Clusters = map[string]*api.Cluster{
context.Cluster: cluster,
}
kubeconfig, err := clientcmd.Write(config)
if err != nil {
return err
}
creds := corev1.Secret{
Type: k8s.MirrorSecretType,
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
Namespace: opts.namespace,
},
Data: map[string][]byte{
k8s.ConfigKeyName: kubeconfig,
},
}
credsOut, err := yaml.Marshal(creds)
if err != nil {
return err
}
gateway, err := k.CoreV1().Services(opts.gatewayNamespace).Get(cmd.Context(), opts.gatewayName, metav1.GetOptions{})
if err != nil {
return err
}
gatewayAddresses := ""
gwAddresses := []string{}
for _, ingress := range gateway.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
}
if addr == "" {
continue
}
gwAddresses = append(gwAddresses, addr)
}
if len(gwAddresses) == 0 && opts.gatewayAddresses == "" {
return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
} else if len(gwAddresses) > 0 {
gatewayAddresses = strings.Join(gwAddresses, ",")
} else {
gatewayAddresses = opts.gatewayAddresses
}
gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
if !ok || gatewayIdentity == "" {
return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
}
probeSpec, err := mc.ExtractProbeSpec(gateway)
if err != nil {
return err
}
gatewayPort, err := extractGatewayPort(gateway)
if err != nil {
return err
}
selector, err := metav1.ParseToLabelSelector(opts.selector)
if err != nil {
return err
}
link := mc.Link{
Name: opts.clusterName,
Namespace: opts.namespace,
TargetClusterName: opts.clusterName,
TargetClusterDomain: configMap.GetGlobal().ClusterDomain,
TargetClusterLinkerdNamespace: controlPlaneNamespace,
ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
GatewayAddress: gatewayAddresses,
GatewayPort: gatewayPort,
GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
Selector: *selector,
}
obj, err := link.ToUnstructured()
if err != nil {
return err
}
linkOut, err := yaml.Marshal(obj.Object)
if err != nil {
return err
}
values, err := buildServiceMirrorValues(opts)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/service-mirror.yaml"},
{Name: "templates/gateway-mirror.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterLinkDefaultChartName,
Dir: helmMulticlusterLinkDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
serviceMirrorOut, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(credsOut)
stdout.Write([]byte("---\n"))
stdout.Write(linkOut)
stdout.Write([]byte("---\n"))
stdout.Write(serviceMirrorOut.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
cmd.Flags().StringVar(&opts.apiServerAddress, "api-server-address", "", "The api server address of the target cluster")
cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", defaultServiceAccountName, "The name of the service account associated with the credentials")
cmd.Flags().StringVar(&opts.controlPlaneVersion, "control-plane-version", opts.controlPlaneVersion, "(Development) Tag to be used for the service mirror controller image")
cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", defaultGatewayName, "The name of the gateway service")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "The namespace of the gateway service")
cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, "Docker registry to pull service mirror controller image from")
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
return cmd
}
func newUnlinkCommand() *cobra.Command {
opts, err := newLinkOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "unlink",
Short: "Outputs link resources for deletion",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if opts.clusterName == "" {
return errors.New("You need to specify cluster name")
}
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
_, err = mc.GetLink(cmd.Context(), k.DynamicClient, opts.namespace, opts.clusterName)
if err != nil {
return err
}
secret := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Secret", fmt.Sprintf("cluster-credentials-%s", opts.clusterName), opts.namespace)
gatewayMirror := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Service", fmt.Sprintf("probe-gateway-%s", opts.clusterName), opts.namespace)
link := newNamespacedKubernetesResource(k8s.LinkAPIGroupVersion, "Link", opts.clusterName, opts.namespace)
clusterRole := newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRole", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName))
clusterRoleBinding := newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRoleBinding", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName))
role := newNamespacedKubernetesResource(rbac.SchemeGroupVersion.String(), "Role", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
roleBinding := newNamespacedKubernetesResource(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
serviceAccount := newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
serviceMirror := newNamespacedKubernetesResource(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
resources := []kubernetesResource{
secret, gatewayMirror, link, clusterRole, clusterRoleBinding,
role, roleBinding, serviceAccount, serviceMirror,
}
selector := fmt.Sprintf("%s=%s,%s=%s",
k8s.MirroredResourceLabel, "true",
k8s.RemoteClusterNameLabel, opts.clusterName,
)
svcList, err := k.CoreV1().Services(metav1.NamespaceAll).List(cmd.Context(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
return err
}
for _, svc := range svcList.Items {
resources = append(resources,
newNamespacedKubernetesResource(corev1.SchemeGroupVersion.String(), "Service", svc.Name, svc.Namespace),
)
}
for _, r := range resources {
r.renderResource(stdout)
}
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
return cmd
}
func newNamespacedKubernetesResource(apiVersion, kind, name, namespace string) kubernetesResource {
return kubernetesResource{
runtime.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
},
metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}
func newCmdMulticluster() *cobra.Command {
multiclusterCmd := &cobra.Command{
Use: "multicluster [flags]",
Aliases: []string{"mc"},
Args: cobra.NoArgs,
Short: "Manages the multicluster setup for Linkerd",
Long: `Manages the multicluster setup for Linkerd.
This command provides subcommands to manage the multicluster support
functionality of Linkerd. You can use it to install the service mirror
components on a cluster, manage credentials and link clusters together.`,
Example: ` # Install multicluster addons.
linkerd --context=cluster-a multicluster install | kubectl --context=cluster-a apply -f -
# Extract mirroring cluster credentials from cluster A and install them on cluster B
linkerd --context=cluster-a multicluster link --cluster-name=target | kubectl apply --context=cluster-b -f -`,
}
multiclusterCmd.AddCommand(newLinkCommand())
multiclusterCmd.AddCommand(newUnlinkCommand())
multiclusterCmd.AddCommand(newMulticlusterInstallCommand())
multiclusterCmd.AddCommand(newMulticlusterUninstallCommand())
multiclusterCmd.AddCommand(newGatewaysCommand())
multiclusterCmd.AddCommand(newAllowCommand())
return multiclusterCmd
}
func requestGatewaysFromAPI(client pb.ApiClient, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) {
resp, err := client.Gateways(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("Gateways API error: %v", err)
}
if e := resp.GetError(); e != nil {
return nil, fmt.Errorf("Gateways API response error: %v", e.Error)
}
return resp, nil
}
func renderGateways(rows []*pb.GatewaysTable_Row, w io.Writer) {
t := buildGatewaysTable()
t.Data = []table.Row{}
for _, row := range rows {
row := row // Copy to satisfy golint.
t.Data = append(t.Data, gatewaysRowToTableRow(row))
}
t.Render(w)
}
var (
clusterNameHeader = "CLUSTER"
aliveHeader = "ALIVE"
pairedServicesHeader = "NUM_SVC"
latencyP50Header = "LATENCY_P50"
latencyP95Header = "LATENCY_P95"
latencyP99Header = "LATENCY_P99"
)
func buildGatewaysTable() table.Table {
columns := []table.Column{
table.Column{
Header: clusterNameHeader,
Width: 7,
Flexible: true,
LeftAlign: true,
},
table.Column{
Header: aliveHeader,
Width: 5,
Flexible: true,
LeftAlign: true,
},
table.Column{
Header: pairedServicesHeader,
Width: 9,
},
table.Column{
Header: latencyP50Header,
Width: 11,
},
table.Column{
Header: latencyP95Header,
Width: 11,
},
table.Column{
Header: latencyP99Header,
Width: 11,
},
}
t := table.NewTable(columns, []table.Row{})
t.Sort = []int{0, 1} // Sort by namespace, then name.
return t
}
func gatewaysRowToTableRow(row *pb.GatewaysTable_Row) []string {
valueOrPlaceholder := func(value string) string {
if row.Alive {
return value
}
return "-"
}
alive := "False"
if row.Alive {
alive = "True"
}
return []string{
row.ClusterName,
alive,
fmt.Sprint(row.PairedServices),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP50)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP95)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP99)),
}
}
func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
for _, port := range gateway.Spec.Ports {
if port.Name == k8s.GatewayPortName {
return uint32(port.Port), nil
}
}
return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/fatih/color"
"github.com/linkerd/linkerd2/cli/flag"
jaeger "github.com/linkerd/linkerd2/jaeger/cmd"
multicluster "github.com/linkerd/linkerd2/multicluster/cmd"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
@ -126,11 +127,11 @@ func init() {
RootCmd.AddCommand(newCmdUninject())
RootCmd.AddCommand(newCmdUpgrade())
RootCmd.AddCommand(newCmdVersion())
RootCmd.AddCommand(newCmdMulticluster())
RootCmd.AddCommand(newCmdUninstall())
// Extension Sub Commands
RootCmd.AddCommand(jaeger.NewCmdJaeger())
RootCmd.AddCommand(multicluster.NewCmdMulticluster())
}
type statOptionsBase struct {

View File

@ -3,11 +3,10 @@ package cmd
import (
"context"
"fmt"
"io"
"os"
"github.com/ghodss/yaml"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/k8s/resource"
"github.com/spf13/cobra"
admissionRegistration "k8s.io/api/admissionregistration/v1beta1"
core "k8s.io/api/core/v1"
@ -15,34 +14,15 @@ import (
rbac "k8s.io/api/rbac/v1"
apiextension "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiRegistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
const (
yamlSep = "---\n"
)
type kubernetesResource struct {
runtime.TypeMeta
metav1.ObjectMeta `json:"metadata"`
}
func newKubernetesResource(apiVersion, kind, name string) kubernetesResource {
return kubernetesResource{
runtime.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
},
metav1.ObjectMeta{
Name: name,
},
}
}
func newCmdUninstall() *cobra.Command {
cmd := &cobra.Command{
Use: "uninstall",
@ -72,34 +52,19 @@ func uninstallRunE(ctx context.Context) error {
}
for _, r := range resources {
if err := r.renderResource(os.Stdout); err != nil {
if err := r.RenderResource(os.Stdout); err != nil {
return fmt.Errorf("error rendering Kubernetes resource:%v", err)
}
}
return nil
}
func (r kubernetesResource) renderResource(w io.Writer) error {
b, err := yaml.Marshal(r)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
_, err = w.Write([]byte(yamlSep))
return err
}
func fetchKubernetesResources(ctx context.Context, k *k8s.KubernetesAPI) ([]kubernetesResource, error) {
func fetchKubernetesResources(ctx context.Context, k *k8s.KubernetesAPI) ([]resource.Kubernetes, error) {
options := metav1.ListOptions{
LabelSelector: k8s.ControllerNSLabel,
}
resources := make([]kubernetesResource, 0)
resources := make([]resource.Kubernetes, 0)
clusterRoles, err := fetchClusterRoles(ctx, k, options)
if err != nil {
@ -161,29 +126,29 @@ func fetchKubernetesResources(ctx context.Context, k *k8s.KubernetesAPI) ([]kube
return resources, nil
}
func fetchClusterRoles(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchClusterRoles(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.RbacV1().ClusterRoles().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRole", item.Name)
resources[i] = resource.New(rbac.SchemeGroupVersion.String(), "ClusterRole", item.Name)
}
return resources, nil
}
func fetchClusterRoleBindings(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchClusterRoleBindings(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.RbacV1().ClusterRoleBindings().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(rbac.SchemeGroupVersion.String(), "ClusterRoleBinding", item.Name)
resources[i] = resource.New(rbac.SchemeGroupVersion.String(), "ClusterRoleBinding", item.Name)
}
return resources, nil
@ -192,89 +157,89 @@ func fetchClusterRoleBindings(ctx context.Context, k *k8s.KubernetesAPI, options
// Although role bindings are namespaced resources in nature
// some admin role bindings are created and persisted in the kube-system namespace and will not be deleted
// when the namespace is deleted
func fetchKubeSystemRoleBindings(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchKubeSystemRoleBindings(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.RbacV1().RoleBindings("kube-system").List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
r := newKubernetesResource(rbac.SchemeGroupVersion.String(), "RoleBinding", item.Name)
r := resource.New(rbac.SchemeGroupVersion.String(), "RoleBinding", item.Name)
r.Namespace = item.Namespace
resources[i] = r
}
return resources, nil
}
func fetchCustomResourceDefinitions(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchCustomResourceDefinitions(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.Apiextensions.ApiextensionsV1beta1().CustomResourceDefinitions().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(apiextension.SchemeGroupVersion.String(), "CustomResourceDefinition", item.Name)
resources[i] = resource.New(apiextension.SchemeGroupVersion.String(), "CustomResourceDefinition", item.Name)
}
return resources, nil
}
func fetchNamespaceResource(ctx context.Context, k *k8s.KubernetesAPI) (kubernetesResource, error) {
func fetchNamespaceResource(ctx context.Context, k *k8s.KubernetesAPI) (resource.Kubernetes, error) {
obj, err := k.CoreV1().Namespaces().Get(ctx, controlPlaneNamespace, metav1.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return kubernetesResource{}, nil
return resource.Kubernetes{}, nil
}
return kubernetesResource{}, err
return resource.Kubernetes{}, err
}
return newKubernetesResource(core.SchemeGroupVersion.String(), "Namespace", obj.Name), nil
return resource.New(core.SchemeGroupVersion.String(), "Namespace", obj.Name), nil
}
func fetchPodSecurityPolicy(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchPodSecurityPolicy(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.PolicyV1beta1().PodSecurityPolicies().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(policy.SchemeGroupVersion.String(), "PodSecurityPolicy", item.Name)
resources[i] = resource.New(policy.SchemeGroupVersion.String(), "PodSecurityPolicy", item.Name)
}
return resources, nil
}
func fetchValidatingWebhooksConfiguration(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchValidatingWebhooksConfiguration(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(admissionRegistration.SchemeGroupVersion.String(), "ValidatingWebhookConfiguration", item.Name)
resources[i] = resource.New(admissionRegistration.SchemeGroupVersion.String(), "ValidatingWebhookConfiguration", item.Name)
}
return resources, nil
}
func fetchMutatingWebhooksConfiguration(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchMutatingWebhooksConfiguration(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
list, err := k.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().List(ctx, options)
if err != nil {
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(admissionRegistration.SchemeGroupVersion.String(), "MutatingWebhookConfiguration", item.Name)
resources[i] = resource.New(admissionRegistration.SchemeGroupVersion.String(), "MutatingWebhookConfiguration", item.Name)
}
return resources, nil
}
func fetchAPIRegistrationResources(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]kubernetesResource, error) {
func fetchAPIRegistrationResources(ctx context.Context, k *k8s.KubernetesAPI, options metav1.ListOptions) ([]resource.Kubernetes, error) {
apiClient, err := apiregistrationv1client.NewForConfig(k.Config)
if err != nil {
return nil, err
@ -285,9 +250,9 @@ func fetchAPIRegistrationResources(ctx context.Context, k *k8s.KubernetesAPI, op
return nil, err
}
resources := make([]kubernetesResource, len(list.Items))
resources := make([]resource.Kubernetes, len(list.Items))
for i, item := range list.Items {
resources[i] = newKubernetesResource(apiRegistration.SchemeGroupVersion.String(), "APIService", item.Name)
resources[i] = resource.New(apiRegistration.SchemeGroupVersion.String(), "APIService", item.Name)
}
return resources, nil

View File

@ -17,6 +17,7 @@ COPY pkg pkg
COPY controller controller
COPY charts/patch charts/patch
COPY charts/partials charts/partials
COPY multicluster multicluster
# Generate static templates
# TODO: `go generate` does not honor -mod=readonly

View File

@ -4,8 +4,6 @@ import (
"fmt"
"os"
servicemirror "github.com/linkerd/linkerd2/controller/cmd/service-mirror"
"github.com/linkerd/linkerd2/controller/cmd/destination"
"github.com/linkerd/linkerd2/controller/cmd/heartbeat"
"github.com/linkerd/linkerd2/controller/cmd/identity"
@ -13,6 +11,7 @@ import (
publicapi "github.com/linkerd/linkerd2/controller/cmd/public-api"
spvalidator "github.com/linkerd/linkerd2/controller/cmd/sp-validator"
"github.com/linkerd/linkerd2/controller/cmd/tap"
servicemirror "github.com/linkerd/linkerd2/multicluster/cmd/service-mirror"
)
func main() {

View File

@ -6,26 +6,10 @@ package static
import (
"net/http"
"path"
"path/filepath"
"runtime"
"github.com/linkerd/linkerd2/pkg/charts/static"
)
// Templates that will be rendered by `linkerd install`. This is only used on
// dev builds.
var Templates http.FileSystem = http.Dir(path.Join(getRepoRoot(), "jaeger/charts"))
// getRepoRoot returns the full path to the root of the repo. We assume this
// function is only called from the `Templates` var above, and that this source
// file lives at `pkg/charts/static`, relative to the root of the repo.
func getRepoRoot() string {
// /foo/bar/linkerd2/pkg/charts/static/templates.go
_, filename, _, _ := runtime.Caller(0)
// /foo/bar/linkerd2/pkg/charts/static
dir := filepath.Dir(filename)
// filepath.Dir returns the parent directory, so that combined with joining
// ".." walks 3 levels up the tree:
// /foo/bar/linkerd2
return filepath.Dir(path.Join(dir, ".."))
}
var Templates http.FileSystem = http.Dir(path.Join(static.GetRepoRoot(), "jaeger/charts"))

127
multicluster/cmd/allow.go Normal file
View File

@ -0,0 +1,127 @@
package cmd
import (
"context"
"errors"
"fmt"
"github.com/linkerd/linkerd2/multicluster/static"
mccharts "github.com/linkerd/linkerd2/multicluster/values"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/version"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)
type (
allowOptions struct {
namespace string
serviceAccountName string
ignoreCluster bool
}
)
func newAllowCommand() *cobra.Command {
opts := allowOptions{
namespace: defaultMulticlusterNamespace,
ignoreCluster: false,
}
cmd := &cobra.Command{
Hidden: false,
Use: "allow",
Short: "Outputs credential resources that allow service-mirror controllers to connect to this cluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
values, err := buildMulticlusterAllowValues(cmd.Context(), &opts)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The destination namespace for the service account.")
cmd.Flags().BoolVar(&opts.ignoreCluster, "ignore-cluster", false, "Ignore cluster configuration")
cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", "", "The name of the multicluster access service account")
return cmd
}
func buildMulticlusterAllowValues(ctx context.Context, opts *allowOptions) (*mccharts.Values, error) {
kubeAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return nil, err
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.serviceAccountName == "" {
return nil, errors.New("you need to specify a service account name")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
defaults, err := mccharts.NewInstallValues()
if err != nil {
return nil, err
}
defaults.Namespace = opts.namespace
defaults.LinkerdVersion = version.Version
defaults.Gateway = false
defaults.ServiceMirror = false
defaults.RemoteMirrorServiceAccount = true
defaults.RemoteMirrorServiceAccountName = opts.serviceAccountName
if !opts.ignoreCluster {
acc, err := kubeAPI.CoreV1().ServiceAccounts(defaults.Namespace).Get(ctx, defaults.RemoteMirrorServiceAccountName, metav1.GetOptions{})
if err == nil && acc != nil {
return nil, fmt.Errorf("Service account with name %s already exists, use --ignore-cluster for force operation", defaults.RemoteMirrorServiceAccountName)
}
if !kerrors.IsNotFound(err) {
return nil, err
}
}
return defaults, nil
}

View File

@ -0,0 +1,163 @@
package cmd
import (
"context"
"fmt"
"io"
"github.com/linkerd/linkerd2/cli/table"
"github.com/linkerd/linkerd2/controller/api/public"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
)
type (
gatewaysOptions struct {
gatewayNamespace string
clusterName string
timeWindow string
}
)
func newGatewaysCommand() *cobra.Command {
opts := gatewaysOptions{}
cmd := &cobra.Command{
Use: "gateways",
Short: "Display stats information about the gateways in target clusters",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
req := &pb.GatewaysRequest{
RemoteClusterName: opts.clusterName,
GatewayNamespace: opts.gatewayNamespace,
TimeWindow: opts.timeWindow,
}
k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
client, err := public.NewExternalClient(cmd.Context(), controlPlaneNamespace, k8sAPI)
if err != nil {
return err
}
resp, err := requestGatewaysFromAPI(client, req)
if err != nil {
return err
}
renderGateways(resp.GetOk().GatewaysTable.Rows, stdout)
return nil
},
}
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "the name of the target cluster")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", "", "the namespace in which the gateway resides on the target cluster")
cmd.Flags().StringVarP(&opts.timeWindow, "time-window", "t", "1m", "Time window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.")
return cmd
}
func requestGatewaysFromAPI(client pb.ApiClient, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) {
resp, err := client.Gateways(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("Gateways API error: %v", err)
}
if e := resp.GetError(); e != nil {
return nil, fmt.Errorf("Gateways API response error: %v", e.Error)
}
return resp, nil
}
func renderGateways(rows []*pb.GatewaysTable_Row, w io.Writer) {
t := buildGatewaysTable()
t.Data = []table.Row{}
for _, row := range rows {
row := row // Copy to satisfy golint.
t.Data = append(t.Data, gatewaysRowToTableRow(row))
}
t.Render(w)
}
var (
clusterNameHeader = "CLUSTER"
aliveHeader = "ALIVE"
pairedServicesHeader = "NUM_SVC"
latencyP50Header = "LATENCY_P50"
latencyP95Header = "LATENCY_P95"
latencyP99Header = "LATENCY_P99"
)
func buildGatewaysTable() table.Table {
columns := []table.Column{
table.Column{
Header: clusterNameHeader,
Width: 7,
Flexible: true,
LeftAlign: true,
},
table.Column{
Header: aliveHeader,
Width: 5,
Flexible: true,
LeftAlign: true,
},
table.Column{
Header: pairedServicesHeader,
Width: 9,
},
table.Column{
Header: latencyP50Header,
Width: 11,
},
table.Column{
Header: latencyP95Header,
Width: 11,
},
table.Column{
Header: latencyP99Header,
Width: 11,
},
}
t := table.NewTable(columns, []table.Row{})
t.Sort = []int{0, 1} // Sort by namespace, then name.
return t
}
func gatewaysRowToTableRow(row *pb.GatewaysTable_Row) []string {
valueOrPlaceholder := func(value string) string {
if row.Alive {
return value
}
return "-"
}
alive := "False"
if row.Alive {
alive = "True"
}
return []string{
row.ClusterName,
alive,
fmt.Sprint(row.PairedServices),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP50)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP95)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP99)),
}
}
func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
for _, port := range gateway.Spec.Ports {
if port.Name == k8s.GatewayPortName {
return uint32(port.Port), nil
}
}
return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
}

164
multicluster/cmd/install.go Normal file
View File

@ -0,0 +1,164 @@
package cmd
import (
"context"
"errors"
"fmt"
"os"
"github.com/linkerd/linkerd2/multicluster/static"
multicluster "github.com/linkerd/linkerd2/multicluster/values"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/yaml"
)
type (
multiclusterInstallOptions struct {
gateway bool
gatewayPort uint32
gatewayProbeSeconds uint32
gatewayProbePort uint32
namespace string
gatewayNginxImage string
gatewayNginxVersion string
remoteMirrorCredentials bool
}
)
func newMulticlusterInstallCommand() *cobra.Command {
options, err := newMulticlusterInstallOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "install",
Short: "Output Kubernetes configs to install the Linkerd multicluster add-on",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
values, err := buildMulticlusterInstallValues(cmd.Context(), options)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/gateway.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
{Name: "templates/link-crd.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&options.namespace, "namespace", options.namespace, "The namespace in which the multicluster add-on is to be installed. Must not be the control plane namespace. ")
cmd.Flags().BoolVar(&options.gateway, "gateway", options.gateway, "If the gateway component should be installed")
cmd.Flags().Uint32Var(&options.gatewayPort, "gateway-port", options.gatewayPort, "The port on the gateway used for all incoming traffic")
cmd.Flags().Uint32Var(&options.gatewayProbeSeconds, "gateway-probe-seconds", options.gatewayProbeSeconds, "The interval at which the gateway will be checked for being alive in seconds")
cmd.Flags().Uint32Var(&options.gatewayProbePort, "gateway-probe-port", options.gatewayProbePort, "The liveness check port of the gateway")
cmd.Flags().StringVar(&options.gatewayNginxImage, "gateway-nginx-image", options.gatewayNginxImage, "The nginx image to be used")
cmd.Flags().StringVar(&options.gatewayNginxVersion, "gateway-nginx-image-version", options.gatewayNginxVersion, "The version of nginx to be used")
cmd.Flags().BoolVar(&options.remoteMirrorCredentials, "service-mirror-credentials", options.remoteMirrorCredentials, "Whether to install the service account which can be used by service mirror components in source clusters to discover exported services")
// Hide developer focused flags in release builds.
release, err := version.IsReleaseChannel(version.Version)
if err != nil {
log.Errorf("Unable to parse version: %s", version.Version)
}
if release {
cmd.Flags().MarkHidden("control-plane-version")
cmd.Flags().MarkHidden("gateway-nginx-image")
cmd.Flags().MarkHidden("gateway-nginx-image-version")
}
return cmd
}
func newMulticlusterInstallOptionsWithDefault() (*multiclusterInstallOptions, error) {
defaults, err := multicluster.NewInstallValues()
if err != nil {
return nil, err
}
return &multiclusterInstallOptions{
gateway: defaults.Gateway,
gatewayPort: defaults.GatewayPort,
gatewayProbeSeconds: defaults.GatewayProbeSeconds,
gatewayProbePort: defaults.GatewayProbePort,
namespace: defaults.Namespace,
gatewayNginxImage: defaults.GatewayNginxImage,
gatewayNginxVersion: defaults.GatewayNginxImageVersion,
remoteMirrorCredentials: true,
}, nil
}
func buildMulticlusterInstallValues(ctx context.Context, opts *multiclusterInstallOptions) (*multicluster.Values, error) {
values, err := getLinkerdConfigMap(ctx)
if err != nil {
if kerrors.IsNotFound(err) {
return nil, errors.New("you need Linkerd to be installed in order to install multicluster addons")
}
return nil, err
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
defaults, err := multicluster.NewInstallValues()
if err != nil {
return nil, err
}
defaults.Namespace = opts.namespace
defaults.Gateway = opts.gateway
defaults.GatewayPort = opts.gatewayPort
defaults.GatewayProbeSeconds = opts.gatewayProbeSeconds
defaults.GatewayProbePort = opts.gatewayProbePort
defaults.GatewayNginxImage = opts.gatewayNginxImage
defaults.GatewayNginxImageVersion = opts.gatewayNginxVersion
defaults.IdentityTrustDomain = values.GetGlobal().IdentityTrustDomain
defaults.LinkerdNamespace = controlPlaneNamespace
defaults.ProxyOutboundPort = uint32(values.GetGlobal().Proxy.Ports.Outbound)
defaults.LinkerdVersion = version.Version
defaults.RemoteMirrorServiceAccount = opts.remoteMirrorCredentials
return defaults, nil
}

325
multicluster/cmd/link.go Normal file
View File

@ -0,0 +1,325 @@
package cmd
import (
"errors"
"fmt"
"os"
"strings"
"github.com/linkerd/linkerd2/multicluster/static"
multicluster "github.com/linkerd/linkerd2/multicluster/values"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/k8s"
mc "github.com/linkerd/linkerd2/pkg/multicluster"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/yaml"
)
type (
linkOptions struct {
namespace string
clusterName string
apiServerAddress string
serviceAccountName string
gatewayName string
gatewayNamespace string
serviceMirrorRetryLimit uint32
logLevel string
controlPlaneVersion string
dockerRegistry string
selector string
}
)
func newLinkCommand() *cobra.Command {
opts, err := newLinkOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "link",
Short: "Outputs resources that allow another cluster to mirror services from this one",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if opts.clusterName == "" {
return errors.New("You need to specify cluster name")
}
configMap, err := getLinkerdConfigMap(cmd.Context())
if err != nil {
if kerrors.IsNotFound(err) {
return errors.New("you need Linkerd to be installed on a cluster in order to get its credentials")
}
return err
}
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
sa, err := k.CoreV1().ServiceAccounts(opts.namespace).Get(cmd.Context(), opts.serviceAccountName, metav1.GetOptions{})
if err != nil {
return err
}
var secretName string
for _, s := range sa.Secrets {
if strings.HasPrefix(s.Name, fmt.Sprintf("%s-token", sa.Name)) {
secretName = s.Name
break
}
}
if secretName == "" {
return fmt.Errorf("could not find service account token secret for %s", sa.Name)
}
secret, err := k.CoreV1().Secrets(opts.namespace).Get(cmd.Context(), secretName, metav1.GetOptions{})
if err != nil {
return err
}
token, ok := secret.Data[tokenKey]
if !ok {
return fmt.Errorf("could not find the token data in the service account secret")
}
context, ok := config.Contexts[config.CurrentContext]
if !ok {
return fmt.Errorf("could not extract current context from config")
}
context.AuthInfo = opts.serviceAccountName
config.Contexts = map[string]*api.Context{
config.CurrentContext: context,
}
config.AuthInfos = map[string]*api.AuthInfo{
opts.serviceAccountName: {
Token: string(token),
},
}
cluster := config.Clusters[context.Cluster]
if opts.apiServerAddress != "" {
cluster.Server = opts.apiServerAddress
}
config.Clusters = map[string]*api.Cluster{
context.Cluster: cluster,
}
kubeconfig, err := clientcmd.Write(config)
if err != nil {
return err
}
creds := corev1.Secret{
Type: k8s.MirrorSecretType,
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
Namespace: opts.namespace,
},
Data: map[string][]byte{
k8s.ConfigKeyName: kubeconfig,
},
}
credsOut, err := yaml.Marshal(creds)
if err != nil {
return err
}
gateway, err := k.CoreV1().Services(opts.gatewayNamespace).Get(cmd.Context(), opts.gatewayName, metav1.GetOptions{})
if err != nil {
return err
}
gatewayAddresses := []string{}
for _, ingress := range gateway.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
}
if addr == "" {
continue
}
gatewayAddresses = append(gatewayAddresses, addr)
}
if len(gatewayAddresses) == 0 {
return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
}
gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
if !ok || gatewayIdentity == "" {
return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
}
probeSpec, err := mc.ExtractProbeSpec(gateway)
if err != nil {
return err
}
gatewayPort, err := extractGatewayPort(gateway)
if err != nil {
return err
}
selector, err := metav1.ParseToLabelSelector(opts.selector)
if err != nil {
return err
}
link := mc.Link{
Name: opts.clusterName,
Namespace: opts.namespace,
TargetClusterName: opts.clusterName,
TargetClusterDomain: configMap.GetGlobal().ClusterDomain,
TargetClusterLinkerdNamespace: controlPlaneNamespace,
ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
GatewayAddress: strings.Join(gatewayAddresses, ","),
GatewayPort: gatewayPort,
GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
Selector: *selector,
}
obj, err := link.ToUnstructured()
if err != nil {
return err
}
linkOut, err := yaml.Marshal(obj.Object)
if err != nil {
return err
}
values, err := buildServiceMirrorValues(opts)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/service-mirror.yaml"},
{Name: "templates/gateway-mirror.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterLinkDefaultChartName,
Dir: helmMulticlusterLinkDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
serviceMirrorOut, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(credsOut)
stdout.Write([]byte("---\n"))
stdout.Write(linkOut)
stdout.Write([]byte("---\n"))
stdout.Write(serviceMirrorOut.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
cmd.Flags().StringVar(&opts.apiServerAddress, "api-server-address", "", "The api server address of the target cluster")
cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", defaultServiceAccountName, "The name of the service account associated with the credentials")
cmd.Flags().StringVar(&opts.controlPlaneVersion, "control-plane-version", opts.controlPlaneVersion, "(Development) Tag to be used for the service mirror controller image")
cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", defaultGatewayName, "The name of the gateway service")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "The namespace of the gateway service")
cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, "Docker registry to pull service mirror controller image from")
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
return cmd
}
func newLinkOptionsWithDefault() (*linkOptions, error) {
defaults, err := multicluster.NewLinkValues()
if err != nil {
return nil, err
}
return &linkOptions{
controlPlaneVersion: version.Version,
namespace: defaults.Namespace,
dockerRegistry: defaultDockerRegistry,
serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
logLevel: defaults.LogLevel,
selector: k8s.DefaultExportedServiceSelector,
}, nil
}
func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
if !alphaNumDashDot.MatchString(opts.controlPlaneVersion) {
return nil, fmt.Errorf("%s is not a valid version", opts.controlPlaneVersion)
}
if opts.namespace == "" {
return nil, errors.New("you need to specify a namespace")
}
if opts.namespace == controlPlaneNamespace {
return nil, errors.New("you need to setup the multicluster addons in a namespace different than the Linkerd one")
}
if _, err := log.ParseLevel(opts.logLevel); err != nil {
return nil, fmt.Errorf("--log-level must be one of: panic, fatal, error, warn, info, debug")
}
defaults, err := multicluster.NewLinkValues()
if err != nil {
return nil, err
}
defaults.TargetClusterName = opts.clusterName
defaults.Namespace = opts.namespace
defaults.ServiceMirrorRetryLimit = opts.serviceMirrorRetryLimit
defaults.LogLevel = opts.logLevel
defaults.ControllerImageVersion = opts.controlPlaneVersion
defaults.ControllerImage = fmt.Sprintf("%s/controller", opts.dockerRegistry)
return defaults, nil
}

92
multicluster/cmd/root.go Normal file
View File

@ -0,0 +1,92 @@
package cmd
import (
"context"
"regexp"
"github.com/fatih/color"
"github.com/linkerd/linkerd2/pkg/charts/linkerd2"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/spf13/cobra"
)
const (
defaultDockerRegistry = "ghcr.io/linkerd"
defaultLinkerdNamespace = "linkerd"
defaultMulticlusterNamespace = "linkerd-multicluster"
defaultGatewayName = "linkerd-gateway"
helmMulticlusterDefaultChartName = "linkerd2-multicluster"
helmMulticlusterLinkDefaultChartName = "linkerd2-multicluster-link"
tokenKey = "token"
defaultServiceAccountName = "linkerd-service-mirror-remote-access-default"
)
var (
apiAddr string // An empty value means "use the Kubernetes configuration"
controlPlaneNamespace string
kubeconfigPath string
kubeContext string
impersonate string
impersonateGroup []string
verbose bool
// special handling for Windows, on all other platforms these resolve to
// os.Stdout and os.Stderr, thanks to https://github.com/mattn/go-colorable
stdout = color.Output
// These regexs are not as strict as they could be, but are a quick and dirty
// sanity check against illegal characters.
alphaNumDashDot = regexp.MustCompile(`^[\.a-zA-Z0-9-]+$`)
)
// NewCmdMulticluster returns a new multicluster command
func NewCmdMulticluster() *cobra.Command {
multiclusterCmd := &cobra.Command{
Use: "multicluster [flags]",
Aliases: []string{"mc"},
Args: cobra.NoArgs,
Short: "Manages the multicluster setup for Linkerd",
Long: `Manages the multicluster setup for Linkerd.
This command provides subcommands to manage the multicluster support
functionality of Linkerd. You can use it to install the service mirror
components on a cluster, manage credentials and link clusters together.`,
Example: ` # Install multicluster addons.
linkerd --context=cluster-a multicluster install | kubectl --context=cluster-a apply -f -
# Extract mirroring cluster credentials from cluster A and install them on cluster B
linkerd --context=cluster-a multicluster link --cluster-name=target | kubectl apply --context=cluster-b -f -`,
}
multiclusterCmd.PersistentFlags().StringVarP(&controlPlaneNamespace, "linkerd-namespace", "L", defaultLinkerdNamespace, "Namespace in which Linkerd is installed")
multiclusterCmd.PersistentFlags().StringVar(&kubeconfigPath, "kubeconfig", "", "Path to the kubeconfig file to use for CLI requests")
multiclusterCmd.PersistentFlags().StringVar(&kubeContext, "context", "", "Name of the kubeconfig context to use")
multiclusterCmd.PersistentFlags().StringVar(&impersonate, "as", "", "Username to impersonate for Kubernetes operations")
multiclusterCmd.PersistentFlags().StringArrayVar(&impersonateGroup, "as-group", []string{}, "Group to impersonate for Kubernetes operations")
multiclusterCmd.PersistentFlags().StringVar(&apiAddr, "api-addr", "", "Override kubeconfig and communicate directly with the control plane at host:port (mostly for testing)")
multiclusterCmd.PersistentFlags().BoolVar(&verbose, "verbose", false, "Turn on debug logging")
multiclusterCmd.AddCommand(newLinkCommand())
multiclusterCmd.AddCommand(newUnlinkCommand())
multiclusterCmd.AddCommand(newMulticlusterInstallCommand())
multiclusterCmd.AddCommand(newMulticlusterUninstallCommand())
multiclusterCmd.AddCommand(newGatewaysCommand())
multiclusterCmd.AddCommand(newAllowCommand())
return multiclusterCmd
}
func getLinkerdConfigMap(ctx context.Context) (*linkerd2.Values, error) {
kubeAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return nil, err
}
_, values, err := healthcheck.FetchCurrentConfiguration(ctx, kubeAPI, controlPlaneNamespace)
if err != nil {
return nil, err
}
return values, nil
}

View File

@ -9,19 +9,18 @@ import (
"syscall"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/clientcmd"
controllerK8s "github.com/linkerd/linkerd2/controller/k8s"
servicemirror "github.com/linkerd/linkerd2/controller/service-mirror"
servicemirror "github.com/linkerd/linkerd2/multicluster/service-mirror"
"github.com/linkerd/linkerd2/pkg/admin"
"github.com/linkerd/linkerd2/pkg/flags"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/multicluster"
sm "github.com/linkerd/linkerd2/pkg/servicemirror"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/clientcmd"
)
const linkWatchRestartAfter = 10 * time.Second

View File

@ -0,0 +1,105 @@
package cmd
import (
"errors"
"fmt"
"os"
"strings"
"github.com/linkerd/linkerd2/multicluster/static"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/k8s"
mc "github.com/linkerd/linkerd2/pkg/multicluster"
"github.com/spf13/cobra"
chartloader "helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/yaml"
)
func newMulticlusterUninstallCommand() *cobra.Command {
options, err := newMulticlusterInstallOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "uninstall",
Short: "Output Kubernetes configs to uninstall the Linkerd multicluster add-on",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
links, err := mc.GetLinks(cmd.Context(), k.DynamicClient)
if err != nil {
return err
}
if len(links) > 0 {
err := []string{"Please unlink the following clusters before uninstalling multicluster:"}
for _, link := range links {
err = append(err, fmt.Sprintf(" * %s", link.TargetClusterName))
}
return errors.New(strings.Join(err, "\n"))
}
values, err := buildMulticlusterInstallValues(cmd.Context(), options)
if err != nil {
return err
}
// Render raw values and create chart config
rawValues, err := yaml.Marshal(values)
if err != nil {
return err
}
files := []*chartloader.BufferedFile{
{Name: chartutil.ChartfileName},
{Name: "templates/namespace.yaml"},
{Name: "templates/gateway.yaml"},
{Name: "templates/remote-access-service-mirror-rbac.yaml"},
{Name: "templates/link-crd.yaml"},
}
chart := &charts.Chart{
Name: helmMulticlusterDefaultChartName,
Dir: helmMulticlusterDefaultChartName,
Namespace: controlPlaneNamespace,
RawValues: rawValues,
Files: files,
Fs: static.Templates,
}
buf, err := chart.RenderNoPartials()
if err != nil {
return err
}
stdout.Write(buf.Bytes())
stdout.Write([]byte("---\n"))
return nil
},
}
cmd.Flags().StringVar(&options.namespace, "namespace", options.namespace, "The namespace in which the multicluster add-on is to be installed. Must not be the control plane namespace. ")
return cmd
}

View File

@ -0,0 +1,99 @@
package cmd
import (
"errors"
"fmt"
"os"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/k8s/resource"
mc "github.com/linkerd/linkerd2/pkg/multicluster"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
)
func newUnlinkCommand() *cobra.Command {
opts, err := newLinkOptionsWithDefault()
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
cmd := &cobra.Command{
Use: "unlink",
Short: "Outputs link resources for deletion",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if opts.clusterName == "" {
return errors.New("You need to specify cluster name")
}
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.ExplicitPath = kubeconfigPath
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
config, err := loader.RawConfig()
if err != nil {
return err
}
if kubeContext != "" {
config.CurrentContext = kubeContext
}
k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
_, err = mc.GetLink(cmd.Context(), k.DynamicClient, opts.namespace, opts.clusterName)
if err != nil {
return err
}
secret := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "Secret", fmt.Sprintf("cluster-credentials-%s", opts.clusterName), opts.namespace)
gatewayMirror := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "Service", fmt.Sprintf("probe-gateway-%s", opts.clusterName), opts.namespace)
link := resource.NewNamespaced(k8s.LinkAPIGroupVersion, "Link", opts.clusterName, opts.namespace)
clusterRole := resource.New(rbac.SchemeGroupVersion.String(), "ClusterRole", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName))
clusterRoleBinding := resource.New(rbac.SchemeGroupVersion.String(), "ClusterRoleBinding", fmt.Sprintf("linkerd-service-mirror-access-local-resources-%s", opts.clusterName))
role := resource.NewNamespaced(rbac.SchemeGroupVersion.String(), "Role", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
roleBinding := resource.NewNamespaced(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
serviceAccount := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
serviceMirror := resource.NewNamespaced(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
resources := []resource.Kubernetes{
secret, gatewayMirror, link, clusterRole, clusterRoleBinding,
role, roleBinding, serviceAccount, serviceMirror,
}
selector := fmt.Sprintf("%s=%s,%s=%s",
k8s.MirroredResourceLabel, "true",
k8s.RemoteClusterNameLabel, opts.clusterName,
)
svcList, err := k.CoreV1().Services(metav1.NamespaceAll).List(cmd.Context(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
return err
}
for _, svc := range svcList.Items {
resources = append(resources,
resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "Service", svc.Name, svc.Namespace),
)
}
for _, r := range resources {
r.RenderResource(stdout)
}
return nil
},
}
cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
return cmd
}

View File

@ -0,0 +1,21 @@
// +build ignore
package main
import (
"github.com/linkerd/linkerd2/multicluster/static"
"github.com/shurcooL/vfsgen"
log "github.com/sirupsen/logrus"
)
func main() {
err := vfsgen.Generate(static.Templates, vfsgen.Options{
Filename: "generated_multicluster_templates.gogen.go",
PackageName: "static",
BuildTags: "prod",
VariableName: "Templates",
})
if err != nil {
log.Fatalln(err)
}
}

View File

@ -0,0 +1,15 @@
//go:generate go run generate.go
// +build !prod
package static
import (
"net/http"
"path"
"github.com/linkerd/linkerd2/pkg/charts/static"
)
// Templates that will be rendered by `linkerd install`. This is only used on
// dev builds.
var Templates http.FileSystem = http.Dir(path.Join(static.GetRepoRoot(), "multicluster/charts"))

View File

@ -1,10 +1,10 @@
package multicluster
package values
import (
"fmt"
"github.com/linkerd/linkerd2/multicluster/static"
"github.com/linkerd/linkerd2/pkg/charts"
"github.com/linkerd/linkerd2/pkg/charts/static"
"github.com/linkerd/linkerd2/pkg/k8s"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"

View File

@ -0,0 +1,23 @@
package static
import (
"path"
"path/filepath"
"runtime"
)
// GetRepoRoot returns the full path to the root of the repo. We assume this
// function is only called from the `Templates` var above, and that this source
// file lives at `pkg/charts/static`, relative to the root of the repo.
func GetRepoRoot() string {
// /foo/bar/linkerd2/pkg/charts/static/templates.go
_, filename, _, _ := runtime.Caller(0)
// /foo/bar/linkerd2/pkg/charts/static
dir := filepath.Dir(filename)
// filepath.Dir returns the parent directory, so that combined with joining
// ".." walks 3 levels up the tree:
// /foo/bar/linkerd2
return filepath.Dir(path.Join(dir, "../.."))
}

View File

@ -6,26 +6,8 @@ package static
import (
"net/http"
"path"
"path/filepath"
"runtime"
)
// Templates that will be rendered by `linkerd install`. This is only used on
// dev builds.
var Templates http.FileSystem = http.Dir(path.Join(getRepoRoot(), "charts"))
// getRepoRoot returns the full path to the root of the repo. We assume this
// function is only called from the `Templates` var above, and that this source
// file lives at `pkg/charts/static`, relative to the root of the repo.
func getRepoRoot() string {
// /foo/bar/linkerd2/pkg/charts/static/templates.go
_, filename, _, _ := runtime.Caller(0)
// /foo/bar/linkerd2/pkg/charts/static
dir := filepath.Dir(filename)
// filepath.Dir returns the parent directory, so that combined with joining
// ".." walks 3 levels up the tree:
// /foo/bar/linkerd2
return filepath.Dir(path.Join(dir, "../.."))
}
var Templates http.FileSystem = http.Dir(path.Join(GetRepoRoot(), "charts"))

View File

@ -0,0 +1,62 @@
package resource
import (
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/yaml"
)
const (
yamlSep = "---\n"
)
// Kubernetes is a parent object used to generalize all k8s types
type Kubernetes struct {
runtime.TypeMeta
metav1.ObjectMeta `json:"metadata"`
}
// New returns a kubernetes resource with the given data
func New(apiVersion, kind, name string) Kubernetes {
return Kubernetes{
runtime.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
},
metav1.ObjectMeta{
Name: name,
},
}
}
// NewNamespaced returns a namespace scoped kubernetes resource with the given data
func NewNamespaced(apiVersion, kind, name, namespace string) Kubernetes {
return Kubernetes{
runtime.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
},
metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}
// RenderResource renders a kuberetes object as a yaml object
func (r Kubernetes) RenderResource(w io.Writer) error {
b, err := yaml.Marshal(r)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
_, err = w.Write([]byte(yamlSep))
return err
}