Provide peer Identities via the Destination API (#2537)

This change reintroduces identity hinting to the destination service.
The Get endpoint includes identities for pods that are injected with an
identity-mode of "default" and have the same linkerd control plane.

A `serviceaccount` label is now also added to destination response
metadata so that it's accessible in prometheus and tap.
This commit is contained in:
Oliver Gould 2019-03-22 09:19:14 -07:00 committed by GitHub
parent 34ea302a32
commit da0330743f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 206 additions and 58 deletions

View File

@ -136,6 +136,9 @@ spec:
containerPort: 8086
- name: admin-http
containerPort: 9996
volumeMounts:
- name: config
mountPath: /var/run/linkerd/config
image: {{.Values.ControllerImage}}
imagePullPolicy: {{.Values.ImagePullPolicy}}
args:

View File

@ -416,6 +416,9 @@ spec:
resources: {}
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=linkerd

View File

@ -428,6 +428,9 @@ spec:
memory: 50Mi
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=linkerd

View File

@ -428,6 +428,9 @@ spec:
memory: 50Mi
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=linkerd

View File

@ -392,6 +392,9 @@ spec:
resources: {}
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=linkerd

View File

@ -394,6 +394,9 @@ spec:
resources: {}
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=linkerd

View File

@ -384,6 +384,9 @@ spec:
resources: {}
securityContext:
runAsUser: 2103
volumeMounts:
- mountPath: /var/run/linkerd/config
name: config
- args:
- tap
- -controller-namespace=Namespace

View File

@ -94,7 +94,8 @@ func diffUpdateAddresses(oldAddrs, newAddrs []*updateAddress) ([]*updateAddress,
// implements the endpointUpdateListener interface
type endpointListener struct {
controllerNS string
controllerNS,
identityTrustDomain string
stream pb.Destination_GetServer
ownerKindAndName ownerKindAndNameFn
labels map[string]string
@ -107,15 +108,16 @@ func newEndpointListener(
stream pb.Destination_GetServer,
ownerKindAndName ownerKindAndNameFn,
enableH2Upgrade bool,
controllerNS string,
controllerNS, identityTrustDomain string,
) *endpointListener {
return &endpointListener{
controllerNS: controllerNS,
stream: stream,
ownerKindAndName: ownerKindAndName,
labels: make(map[string]string),
enableH2Upgrade: enableH2Upgrade,
stopCh: make(chan struct{}),
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
stream: stream,
ownerKindAndName: ownerKindAndName,
labels: make(map[string]string),
enableH2Upgrade: enableH2Upgrade,
stopCh: make(chan struct{}),
log: log.WithFields(log.Fields{
"component": "endpoint-listener",
}),
@ -213,18 +215,14 @@ func (l *endpointListener) toWeightedAddr(address *updateAddress) *pb.WeightedAd
}
}
// TODO: restore TLS identities
//nolint
func (l *endpointListener) getAddrMetadata(pod *corev1.Pod) (map[string]string, *pb.ProtocolHint, *pb.TlsIdentity) {
controllerNS := pod.Labels[pkgK8s.ControllerNSLabel]
ownerKind, ownerName := l.ownerKindAndName(pod)
labels := pkgK8s.GetPodLabels(ownerKind, ownerName, pod)
sa, ns := pkgK8s.GetServiceAccountAndNS(pod)
ok, on := l.ownerKindAndName(pod)
labels := pkgK8s.GetPodLabels(ok, on, pod)
// If the pod is controlled by us, then it can be hinted that this destination
// knows H2 (and handles our orig-proto translation). Note that this check
// does not verify that the pod's control plane matches the control plane
// where the destination service is running; all pods injected for all control
// planes are considered valid for providing the H2 hint.
// If the pod is controlled by any Linkerd control plane, then it can be hinted
// that this destination knows H2 (and handles our orig-proto translation).
var hint *pb.ProtocolHint
if l.enableH2Upgrade && controllerNS != "" {
hint = &pb.ProtocolHint{
@ -234,5 +232,25 @@ func (l *endpointListener) getAddrMetadata(pod *corev1.Pod) (map[string]string,
}
}
return labels, hint, nil
// If the pod is controlled by the same Linkerd control plane, then it can
// participate in identity with peers.
//
// TODO this should be relaxed to match a trust domain annotation so that
// multiple meshes can participate in identity if they share trust roots.
var identity *pb.TlsIdentity
if l.identityTrustDomain != "" &&
controllerNS == l.controllerNS &&
pod.Annotations[pkgK8s.IdentityModeAnnotation] == pkgK8s.IdentityModeDefault {
id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNS, l.identityTrustDomain)
identity = &pb.TlsIdentity{
Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
DnsLikeIdentity: &pb.TlsIdentity_DnsLikeIdentity{
Name: id,
},
},
}
}
return labels, hint, identity
}

View File

@ -70,8 +70,7 @@ func TestEndpointListener(t *testing.T) {
listener := newEndpointListener(
mockGetServer,
defaultOwnerKindAndName,
false,
"linkerd",
false, "linkerd", "",
)
listener.Update(add, remove)
@ -88,8 +87,7 @@ func TestEndpointListener(t *testing.T) {
listener := newEndpointListener(
mockGetServer,
defaultOwnerKindAndName,
false,
"linkerd",
false, "linkerd", "",
)
listener.Update(add, remove)
@ -129,8 +127,7 @@ func TestEndpointListener(t *testing.T) {
listener := newEndpointListener(
mockGetServer,
defaultOwnerKindAndName,
false,
"linkerd",
false, "linkerd", "",
)
completed := make(chan bool)
@ -153,6 +150,7 @@ func TestEndpointListener(t *testing.T) {
expectedPodName := pod1.Name
expectedNamespace := thisNS
expectedReplicationControllerName := "rc-name"
expectedServiceAccountName := "serviceaccount-name"
podForAddedAddress1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -162,6 +160,9 @@ func TestEndpointListener(t *testing.T) {
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
Spec: corev1.PodSpec{
ServiceAccountName: expectedServiceAccountName,
},
}
ownerKindAndName := func(pod *corev1.Pod) (string, string) {
@ -172,8 +173,7 @@ func TestEndpointListener(t *testing.T) {
listener := newEndpointListener(
mockGetServer,
ownerKindAndName,
false,
"linkerd",
false, "linkerd", "",
)
listener.labels = map[string]string{
"service": expectedServiceName,
@ -195,6 +195,7 @@ func TestEndpointListener(t *testing.T) {
expectedAddedAddress1MetricLabels := map[string]string{
"pod": expectedPodName,
"replicationcontroller": expectedReplicationControllerName,
"serviceaccount": expectedServiceAccountName,
}
if !reflect.DeepEqual(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels) {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
@ -202,19 +203,20 @@ func TestEndpointListener(t *testing.T) {
})
t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
t.Skip("TLS is currently disabled")
expectedPodName := pod1.Name
expectedPodNamespace := thisNS
expectedControllerNamespace := "linkerd-namespace"
expectedPodDeployment := podDeployment
//expectedTLSIdentity := nil
expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
Name: "this-serviceaccount.this-namespace.serviceaccount.identity.linkerd-namespace.trust.domain",
}
podForAddedAddress1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: expectedPodName,
Namespace: expectedPodNamespace,
Annotations: map[string]string{
pkgK8s.IdentityModeAnnotation: "FIXME",
pkgK8s.IdentityModeAnnotation: pkgK8s.IdentityModeDefault,
},
Labels: map[string]string{
pkgK8s.ControllerNSLabel: expectedControllerNamespace,
@ -224,6 +226,9 @@ func TestEndpointListener(t *testing.T) {
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
Spec: corev1.PodSpec{
ServiceAccountName: "this-serviceaccount",
},
}
ownerKindAndName := func(pod *corev1.Pod) (string, string) {
@ -236,6 +241,7 @@ func TestEndpointListener(t *testing.T) {
ownerKindAndName,
false,
expectedControllerNamespace,
"trust.domain",
)
add := []*updateAddress{
@ -248,14 +254,13 @@ func TestEndpointListener(t *testing.T) {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
// actualTLSIdentity := addrs[0].GetTlsIdentity().GetK8SPodIdentity()
// if !reflect.DeepEqual(actualTLSIdentity, expectedTLSIdentity) {
// t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
// }
actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
if !reflect.DeepEqual(actualTLSIdentity, expectedTLSIdentity) {
t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
}
})
t.Run("Does not send TlsIdentity for other meshes", func(t *testing.T) {
t.Skip("TLS is currently disabled")
t.Run("Does not send TlsIdentity for non-default identity-modes", func(t *testing.T) {
expectedPodName := "pod1"
expectedPodNamespace := thisNS
expectedControllerNamespace := "other-linkerd-namespace"
@ -266,7 +271,58 @@ func TestEndpointListener(t *testing.T) {
Name: expectedPodName,
Namespace: expectedPodNamespace,
Annotations: map[string]string{
pkgK8s.IdentityModeAnnotation: "FIXME",
pkgK8s.IdentityModeAnnotation: "optional",
},
Labels: map[string]string{
pkgK8s.ControllerNSLabel: expectedControllerNamespace,
pkgK8s.ProxyDeploymentLabel: expectedPodDeployment,
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
ownerKindAndName := func(pod *v1.Pod) (string, string) {
return deploymentKind, expectedPodDeployment
}
mockGetServer := &mockDestinationGetServer{updatesReceived: []*pb.Update{}}
listener := newEndpointListener(
mockGetServer,
ownerKindAndName,
false,
expectedControllerNamespace,
"trust.domain",
)
add := []*updateAddress{
{address: addedAddress1, pod: podForAddedAddress1},
}
listener.Update(add, nil)
addrs := mockGetServer.updatesReceived[0].GetAdd().GetAddrs()
if len(addrs) != 1 {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
if addrs[0].TlsIdentity != nil {
t.Fatalf("Expected no TlsIdentity to be sent, but got [%v]", addrs[0].TlsIdentity)
}
})
t.Run("Does not send TlsIdentity for other meshes", func(t *testing.T) {
expectedPodName := "pod1"
expectedPodNamespace := thisNS
expectedControllerNamespace := "other-linkerd-namespace"
expectedPodDeployment := podDeployment
podForAddedAddress1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: expectedPodName,
Namespace: expectedPodNamespace,
Annotations: map[string]string{
pkgK8s.IdentityModeAnnotation: pkgK8s.IdentityModeDefault,
},
Labels: map[string]string{
pkgK8s.ControllerNSLabel: expectedControllerNamespace,
@ -288,6 +344,7 @@ func TestEndpointListener(t *testing.T) {
ownerKindAndName,
false,
"linkerd-namespace",
"trust.domain",
)
add := []*updateAddress{
@ -334,7 +391,8 @@ func TestEndpointListener(t *testing.T) {
mockGetServer,
ownerKindAndName,
false,
"linkerd",
expectedControllerNamespace,
"",
)
add := []*updateAddress{

View File

@ -20,8 +20,9 @@ type server struct {
k8sAPI *k8s.API
resolver streamingDestinationResolver
enableH2Upgrade bool
controllerNS string
log *log.Entry
controllerNS,
identityTrustDomain string
log *log.Entry
}
// NewServer returns a new instance of the destination server.
@ -38,7 +39,7 @@ type server struct {
// API.
func NewServer(
addr, k8sDNSZone string,
controllerNS string,
controllerNS, identityTrustDomain string,
enableH2Upgrade bool,
k8sAPI *k8s.API,
done chan struct{},
@ -49,10 +50,11 @@ func NewServer(
}
srv := server{
k8sAPI: k8sAPI,
resolver: resolver,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
k8sAPI: k8sAPI,
resolver: resolver,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
log: log.WithFields(log.Fields{
"addr": addr,
"component": "server",
@ -161,7 +163,7 @@ func (s *server) Endpoints(ctx context.Context, params *discoveryPb.EndpointsPar
}
func (s *server) streamResolution(host string, port int, stream pb.Destination_GetServer) error {
listener := newEndpointListener(stream, s.k8sAPI.GetOwnerKindAndName, s.enableH2Upgrade, s.controllerNS)
listener := newEndpointListener(stream, s.k8sAPI.GetOwnerKindAndName, s.enableH2Upgrade, s.controllerNS, s.identityTrustDomain)
resolverCanResolve, err := s.resolver.canResolve(host, port)
if err != nil {

View File

@ -157,7 +157,7 @@ func TestEndpoints(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
gRPCServer, err := NewServer(
"fake-addr", "", "controller-ns",
"fake-addr", "", "controller-ns", "",
false, k8sAPI, nil,
)
if err != nil {

View File

@ -10,7 +10,9 @@ import (
"github.com/linkerd/linkerd2/controller/api/destination"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/admin"
"github.com/linkerd/linkerd2/pkg/config"
"github.com/linkerd/linkerd2/pkg/flags"
consts "github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
)
@ -20,6 +22,7 @@ func main() {
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config")
k8sDNSZone := flag.String("kubernetes-dns-zone", "", "The DNS suffix for the local Kubernetes zone.")
enableH2Upgrade := flag.Bool("enable-h2-upgrade", true, "Enable transparently upgraded HTTP2 connections among pods in the service mesh")
disableIdentity := flag.Bool("disable-identity", false, "Disable identity configuration")
controllerNamespace := flag.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
flags.ConfigureAndParse()
@ -41,7 +44,27 @@ func main() {
log.Fatalf("Failed to listen on %s: %s", *addr, err)
}
server, err := destination.NewServer(*addr, *k8sDNSZone, *controllerNamespace, *enableH2Upgrade, k8sAPI, done)
trustDomain := ""
if *disableIdentity {
log.Info("Identity is disabled")
} else {
global, err := config.Global(consts.MountPathGlobalConfig)
if err != nil {
log.Fatalf("Failed to load global config: %s", err)
}
trustDomain = global.GetIdentityContext().GetTrustDomain()
}
server, err := destination.NewServer(
*addr,
*k8sDNSZone,
*controllerNamespace,
trustDomain,
*enableH2Upgrade,
k8sAPI,
done,
)
if err != nil {
log.Fatal(err)
}

View File

@ -213,6 +213,21 @@ func CreatedByAnnotationValue() string {
return fmt.Sprintf("linkerd/cli %s", version.Version)
}
// GetServiceAccountAndNS returns the pod's serviceaccount and namespace.
func GetServiceAccountAndNS(pod *corev1.Pod) (sa string, ns string) {
sa = pod.Spec.ServiceAccountName
if sa == "" {
sa = "default"
}
ns = pod.GetNamespace()
if ns == "" {
ns = "default"
}
return
}
// GetPodLabels returns the set of prometheus owner labels for a given pod
func GetPodLabels(ownerKind, ownerName string, pod *corev1.Pod) map[string]string {
labels := map[string]string{"pod": pod.Name}
@ -220,6 +235,8 @@ func GetPodLabels(ownerKind, ownerName string, pod *corev1.Pod) map[string]strin
l5dLabel := KindToL5DLabel(ownerKind)
labels[l5dLabel] = ownerName
labels["serviceaccount"], _ = GetServiceAccountAndNS(pod)
if controllerNS := pod.Labels[ControllerNSLabel]; controllerNS != "" {
labels["control_plane_ns"] = controllerNS
}

View File

@ -13,12 +13,16 @@ func TestGetPodLabels(t *testing.T) {
t.Run("Maps proxy labels to prometheus labels", func(t *testing.T) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Name: "test-pod",
Namespace: "test-ns",
Labels: map[string]string{
ControllerNSLabel: "linkerd-namespace",
appsv1.DefaultDeploymentUniqueLabelKey: "test-pth",
},
},
Spec: corev1.PodSpec{
ServiceAccountName: "test-sa",
},
}
ownerKind := "deployment"
@ -29,6 +33,7 @@ func TestGetPodLabels(t *testing.T) {
"deployment": "test-deployment",
"pod": "test-pod",
"pod_template_hash": "test-pth",
"serviceaccount": "test-sa",
}
podLabels := GetPodLabels(ownerKind, ownerName, pod)

View File

@ -63,14 +63,14 @@ func loadVerifier(pem string) (verify x509.VerifyOptions, err error) {
// checkEndEntityDir checks that the provided directory path exists and is
// suitable to write key material to, returning the Key, CSR, and Crt paths.
//
// If the directory does not exist or if it has incorrect permissions, we assume
// that the wrong directory was specified incorrectly, instead of trying to
// If the directory does not exist, we assume that the wrong directory was
// specified incorrectly, instead of trying to
// create or repair the directory. In practice, this directory should be tmpfs
// so that credentials are not written to disk, so we want to be extra sensitive
// to an incorrectly specified path.
//
// If the key, CSR, and/or Crt paths refer to existing files, it is assumed that
// multiple instances of this process are running, and an error is returned.
// the proxy has been restarted and these credentials are NOT recreated.
func checkEndEntityDir(dir string) (string, string, error) {
if dir == "" {
return "", "", errors.New("no end entity directory specified")

View File

@ -67,6 +67,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
for _, tt := range []struct {
args []string
expectedRows map[string]string
skipTLS bool
}{
{
args: []string{"stat", "deploy", "-n", TestHelper.GetLinkerdNamespace()},
@ -77,6 +78,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
"linkerd-prometheus": "1/1",
"linkerd-web": "1/1",
},
skipTLS: true,
},
{
args: []string{"stat", "po", "-n", TestHelper.GetLinkerdNamespace(), "--from", "deploy/linkerd-controller"},
@ -107,6 +109,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
expectedRows: map[string]string{
TestHelper.GetLinkerdNamespace(): "5/5",
},
skipTLS: true,
},
{
args: []string{"stat", "po", "-n", TestHelper.GetLinkerdNamespace(), "--to", "au/" + prometheusAuthority},
@ -135,7 +138,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
}
for name, meshed := range tt.expectedRows {
if err := validateRowStats(name, meshed, rowStats); err != nil {
if err := validateRowStats(name, meshed, rowStats, tt.skipTLS); err != nil {
return err
}
}
@ -195,7 +198,7 @@ func parseRows(out string, expectedRowCount int) (map[string]*rowStat, error) {
return rowStats, nil
}
func validateRowStats(name, expectedMeshCount string, rowStats map[string]*rowStat) error {
func validateRowStats(name, expectedMeshCount string, rowStats map[string]*rowStat, skipTLS bool) error {
stat, ok := rowStats[name]
if !ok {
return fmt.Errorf("No stats found for [%s]", name)
@ -232,11 +235,12 @@ func validateRowStats(name, expectedMeshCount string, rowStats map[string]*rowSt
name, stat.p99Latency)
}
// this should be 100.00% when control plane is TLSed by default
expectedTLSRate := "0%"
if stat.tlsPercent != expectedTLSRate {
return fmt.Errorf("Expected tls rate [%s] for [%s], got [%s]",
expectedTLSRate, name, stat.tlsPercent)
if !skipTLS {
expectedTLSRate := "100%"
if stat.tlsPercent != expectedTLSRate {
return fmt.Errorf("Expected tls rate [%s] for [%s], got [%s]",
expectedTLSRate, name, stat.tlsPercent)
}
}
return nil