Destination should return `OpaqueTransport` hint when annotation matches resolved target port (#5458)

The destination service now returns `OpaqueTransport` hint when the annotation
matches the resolve target port. This is different from the current behavior
which always sets the hint when a proxy is present.

Closes #5421

This happens by changing the endpoint watcher to set a pod's opaque port
annotation in certain cases. If the pod already has an annotation, then its
value is used. If the pod has no annotation, then it checks the namespace that
the endpoint belongs to; if it finds an annotation on the namespace then it
overrides the pod's annotation value with that.

Signed-off-by: Kevin Leimkuhler <kevin@kleimkuhler.com>
This commit is contained in:
Alejandro Pedraza 2021-01-05 14:54:55 -05:00 committed by GitHub
parent 8d50631727
commit d3d7f4e2e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 90 deletions

View File

@ -216,7 +216,11 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
err error
)
if address.Pod != nil {
wa, err = toWeightedAddr(address, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
opaquePorts, getErr := getOpaquePortsAnnotations(address.Pod)
if getErr != nil {
et.log.Errorf("failed getting opaque ports annotation for pod: %s", getErr)
}
wa, err = toWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride
if address.AuthorityOverride != "" {
@ -306,44 +310,30 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {
}, nil
}
func toWeightedAddr(address watcher.Address, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
func toWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
controllerNSLabel := address.Pod.Labels[k8s.ControllerNSLabel]
sa, ns := k8s.GetServiceAccountAndNS(address.Pod)
labels := k8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
// If the pod is controlled by any Linkerd control plane, then it can be
// hinted that this destination knows H2 (and handles our orig-proto
// translation) and supports receiving opaque traffic.
// translation)
var hint *pb.ProtocolHint
if enableH2Upgrade && controllerNSLabel != "" {
// Get the inbound port from the proxy container's environment
// variable so that it can be set in the protocol hint.
var inboundPort uint32
loop:
for _, containerSpec := range address.Pod.Spec.Containers {
if containerSpec.Name != k8s.ProxyContainerName {
continue
}
for _, envVar := range containerSpec.Env {
if envVar.Name != envInboundListenAddr {
continue
}
addr := strings.Split(envVar.Value, ":")
port, err := strconv.ParseUint(addr[1], 10, 32)
if err != nil {
log.Errorf("failed to parse inbound port for proxy container: %s", err)
}
inboundPort = uint32(port)
break loop
}
}
hint = &pb.ProtocolHint{
Protocol: &pb.ProtocolHint_H2_{
H2: &pb.ProtocolHint_H2{},
},
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
InboundPort: inboundPort,
},
}
if _, ok := opaquePorts[address.Port]; ok {
port, err := getInboundPort(&address.Pod.Spec)
if err != nil {
log.Error(err)
} else {
hint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
}
}
}
@ -406,3 +396,25 @@ func newEmptyAddressSet() watcher.AddressSet {
TopologicalPref: []string{},
}
}
// getInboundPort gets the inbound port from the proxy container's environment
// variable.
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
for _, containerSpec := range podSpec.Containers {
if containerSpec.Name != k8s.ProxyContainerName {
continue
}
for _, envVar := range containerSpec.Env {
if envVar.Name != envInboundListenAddr {
continue
}
addr := strings.Split(envVar.Value, ":")
port, err := strconv.ParseUint(addr[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse inbound port for proxy container: %s", err)
}
return uint32(port), nil
}
}
return 0, fmt.Errorf("failed to find %s environment variable in any container for given pod spec", envInboundListenAddr)
}

View File

@ -1,7 +1,6 @@
package destination
import (
"fmt"
"strconv"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
@ -11,7 +10,6 @@ import (
"github.com/linkerd/linkerd2/pkg/util"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
// opaquePortsAdaptor implements EndpointUpdateListener so that it can watch
@ -73,7 +71,7 @@ func (opa *opaquePortsAdaptor) getOpaquePorts(set watcher.AddressSet) map[uint32
for _, address := range set.Addresses {
pod := address.Pod
if pod != nil {
override, err := getOpaquePortsAnnotations(opa.k8sAPI, pod)
override, err := getOpaquePortsAnnotations(pod)
if err != nil {
opa.log.Errorf("Failed to get opaque ports annotation for pod %s: %s", pod, err)
}
@ -91,36 +89,14 @@ func (opa *opaquePortsAdaptor) publish() {
merged = *opa.profile
}
merged.Spec.OpaquePorts = opa.opaquePorts
fmt.Printf("publishing SP: %v", merged)
opa.listener.Update(&merged)
}
func getOpaquePortsAnnotations(k8sAPI *k8s.API, pod *corev1.Pod) (map[uint32]struct{}, error) {
func getOpaquePortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, error) {
opaquePorts := make(map[uint32]struct{})
obj, err := k8sAPI.GetObjects("", pkgk8s.Namespace, pod.Namespace, labels.Everything())
if err != nil {
return nil, err
}
if len(obj) > 1 {
return nil, fmt.Errorf("Namespace conflict: %v, %v", obj[0], obj[1])
}
if len(obj) != 1 {
return nil, fmt.Errorf("Namespace not found: %v", pod.Namespace)
}
ns, ok := obj[0].(*corev1.Namespace)
if !ok {
// This is very unlikely due to how `GetObjects` works
return nil, fmt.Errorf("Object with name %s was not a namespace", pod.Namespace)
}
override := ns.Annotations[pkgk8s.ProxyOpaquePortsAnnotation]
// Pod annotations override namespace annotations
if podOverride, ok := pod.Annotations[pkgk8s.ProxyOpaquePortsAnnotation]; ok {
override = podOverride
}
if override != "" {
for _, portStr := range util.ParseOpaquePorts(override, pod.Spec.Containers) {
annotation := pod.Annotations[pkgk8s.ProxyOpaquePortsAnnotation]
if annotation != "" {
for _, portStr := range util.ParseOpaquePorts(annotation, pod.Spec.Containers) {
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return nil, err
@ -128,6 +104,5 @@ func getOpaquePortsAnnotations(k8sAPI *k8s.API, pod *corev1.Pod) (map[uint32]str
opaquePorts[uint32(port)] = struct{}{}
}
}
return opaquePorts, nil
}

View File

@ -208,19 +208,21 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
Namespace: pod.Namespace,
Name: pod.Name,
}
endpoint, err = toWeightedAddr(podSet.Addresses[podID], s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, log)
err := watcher.SetPodOpaquePortAnnotation(s.k8sAPI, pod, pod.Namespace)
if err != nil {
log.Errorf("failed to set opaque port annotation on pod: %s", err)
}
opaquePorts, err = getOpaquePortsAnnotations(pod)
if err != nil {
log.Errorf("failed to get opaque ports annotation for pod: %s", err)
}
endpoint, err = toWeightedAddr(podSet.Addresses[podID], opaquePorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, log)
if err != nil {
return err
}
// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
endpoint.MetricLabels["namespace"] = pod.Namespace
opaquePorts, err = getOpaquePortsAnnotations(s.k8sAPI, pod)
if err != nil {
return err
}
}
// When the IP does not map to a service, the default profile is

View File

@ -14,9 +14,12 @@ import (
)
const fullyQualifiedName = "name1.ns.svc.mycluster.local"
const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP2 = "172.17.0.13"
const podIPOpaque = "172.17.0.14"
const port uint32 = 8989
const opaquePort uint32 = 4242
@ -41,7 +44,7 @@ func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) e
}
func makeServer(t *testing.T) *server {
k8sAPI, err := k8s.NewFakeAPI(`
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
metadata:
@ -78,8 +81,6 @@ kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
annotations:
config.linkerd.io/opaque-ports: "4242"
name: name1-1
namespace: ns
status:
@ -92,15 +93,6 @@ spec:
value: 0.0.0.0:4143
name: linkerd-proxy`,
`
apiVersion: v1
kind: Pod
metadata:
name: name2-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.13`,
`
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
@ -112,6 +104,9 @@ spec:
isRetryable: false
condition:
pathRegex: "/a/b/c"`,
}
clientSP := []string{
`
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
@ -124,7 +119,69 @@ spec:
isRetryable: true
condition:
pathRegex: "/x/y/z"`,
)
}
unmeshedPod := `
apiVersion: v1
kind: Pod
metadata:
name: name2
namespace: ns
status:
phase: Running
podIP: 172.17.0.13`
meshedOpaquePodResources := []string{
`
apiVersion: v1
kind: Service
metadata:
name: name3
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.1
ports:
- port: 4242`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name3
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.14
targetRef:
kind: Pod
name: name3
namespace: ns
ports:
- port: 4242`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
annotations:
config.linkerd.io/opaque-ports: "4242"
name: name3
namespace: ns
status:
phase: Running
podIP: 172.17.0.14
spec:
containers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
name: linkerd-proxy`,
}
res := append(meshedPodResources, clientSP...)
res = append(res, unmeshedPod)
res = append(res, meshedOpaquePodResources...)
k8sAPI, err := k8s.NewFakeAPI(res...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
@ -482,8 +539,8 @@ func TestGetProfiles(t *testing.T) {
if first.Endpoint.ProtocolHint == nil {
t.Fatalf("Expected protocol hint but found none")
}
if first.Endpoint.ProtocolHint.GetOpaqueTransport().InboundPort != 4143 {
t.Fatalf("Expected pod to support opaque traffic on port 4143")
if first.Endpoint.ProtocolHint.GetOpaqueTransport() != nil {
t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
}
if first.Endpoint.Addr.String() != epAddr.String() {
t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
@ -556,7 +613,7 @@ func TestGetProfiles(t *testing.T) {
stream.Cancel()
err := server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: fmt.Sprintf("%s:%d", clusterIP, opaquePort),
Path: fmt.Sprintf("%s:%d", clusterIPOpaque, opaquePort),
}, stream)
if err != nil {
t.Fatalf("Got error: %s", err)
@ -569,16 +626,12 @@ func TestGetProfiles(t *testing.T) {
}
last := stream.updates[len(stream.updates)-1]
if last.FullyQualifiedName != fullyQualifiedName {
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, last.FullyQualifiedName)
if last.FullyQualifiedName != fullyQualifiedNameOpaque {
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, last.FullyQualifiedName)
}
if !last.OpaqueProtocol {
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
}
routes := last.GetRoutes()
if len(routes) != 1 {
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
}
})
t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
@ -589,14 +642,14 @@ func TestGetProfiles(t *testing.T) {
}
stream.Cancel()
epAddr, err := toAddress(podIP1, opaquePort)
epAddr, err := toAddress(podIPOpaque, opaquePort)
if err != nil {
t.Fatalf("Got error: %s", err)
}
err = server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: fmt.Sprintf("%s:%d", podIP1, opaquePort),
Path: fmt.Sprintf("%s:%d", podIPOpaque, opaquePort),
}, stream)
if err != nil {
t.Fatalf("Got error: %s", err)

View File

@ -799,6 +799,10 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre
pp.log.Errorf("Unable to create new address:%v", err)
continue
}
err = SetPodOpaquePortAnnotation(pp.k8sAPI, address.Pod, endpoints.Namespace)
if err != nil {
pp.log.Errorf("failed to set opaque port annotation on pod: %s", err)
}
addresses[id] = address
}
}
@ -1081,3 +1085,22 @@ func isValidSlice(es *discovery.EndpointSlice) bool {
return true
}
// SetPodOpaquePortAnnotation ensures that if there is no opaque port
// annotation on the pod, then it inherits the annotation from the namespace.
// If there is also no annotation on the namespace, then it remains unset.
func SetPodOpaquePortAnnotation(k8sAPI *k8s.API, pod *corev1.Pod, ns string) error {
if _, ok := pod.Annotations[consts.ProxyOpaquePortsAnnotation]; !ok {
ns, err := k8sAPI.NS().Lister().Get(ns)
if err != nil {
return fmt.Errorf("failed to get namespace annotation: %s", err)
}
if annotation, ok := ns.Annotations[consts.ProxyOpaquePortsAnnotation]; ok {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[consts.ProxyOpaquePortsAnnotation] = annotation
}
}
return nil
}

View File

@ -19,7 +19,7 @@ var (
// With the app's port marked as opaque, we expect to find a single open
// TCP connection that is not TLS'd because the port is skipped.
tcpMetric = "tcp_open_total{peer=\"src\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"port_skipped\"}"
tcpMetric = "tcp_open_connections{peer=\"src\",direction=\"inbound\",tls=\"true\",client_id=\"default.linkerd-opaque-ports-test.serviceaccount.identity.linkerd.cluster.local\"}"
)
func TestMain(m *testing.M) {
@ -64,7 +64,7 @@ func TestOpaquePorts(t *testing.T) {
testutil.AnnotatedErrorf(t, "CheckDeployment timed-out", "Error validating deployment [%s]:\n%s", appName, err)
}
t.Run("expect inbound TCP connection with no TLS for port_skipped reason", func(t *testing.T) {
t.Run("expect inbound TCP connection metric with expected TLS identity", func(t *testing.T) {
pods, err := TestHelper.GetPods(ctx, opaquePortsNs, map[string]string{"app": appName})
if err != nil {
testutil.AnnotatedFatalf(t, "error getting opaque ports app pods", "error getting opaque ports app pods\n%s", err)