Disable protocol and TLS hints on skipped ports (#6022)

When a pod is configured with `skip-inbound-ports` annotation, a client
proxy trying to connect to that pod tries to connect to it via H2 and
also tries to initiate a TLS connection. This issue is caused by the
destination controller when it sends protocol and TLS hints to the
client proxy for that skipped port.

This change fixes the destination controller so that it no longer
sends protocol and TLS identity hints to outbound proxies resolving a
`podIP:port` that is on a skipped inbound port.

I've included a test that exhibits this error prior to this fix but you
can also test the prior behavior by:

```bash
curl https://run.linkerd.io/booksapp.yml > booksapp.yaml

# edit either the books or authors service to:
1: Configure a failure rate of 0.0
2: add the `skip-inbound-ports` config annotation

bin/linkerd viz stat pods webapp

There should be no successful requests on the webapp deployment
```
Fixes #5995

Signed-off-by: Dennis Adjei-Baah <dennis@buoyant.io>
This commit is contained in:
Dennis Adjei-Baah 2021-04-16 11:44:17 -05:00 committed by GitHub
parent c24585e6ea
commit 78363ca894
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 427 additions and 6 deletions

View File

@ -225,7 +225,13 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
if !ok {
opaquePorts = et.defaultOpaquePorts
}
wa, err = toWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
skippedInboundPorts, skippedErr := getPodSkippedInboundPortsAnnotations(address.Pod)
if skippedErr != nil {
et.log.Errorf("failed getting ignored inbound ports annoatation for pod: %s", err)
}
wa, err = toWeightedAddr(address, opaquePorts, skippedInboundPorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride
if address.AuthorityOverride != "" {
@ -315,16 +321,16 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {
}, nil
}
func toWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
func toWeightedAddr(address watcher.Address, opaquePorts, skippedInboundPorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
controllerNSLabel := address.Pod.Labels[k8s.ControllerNSLabel]
sa, ns := k8s.GetServiceAccountAndNS(address.Pod)
labels := k8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
_, isSkippedInboundPort := skippedInboundPorts[address.Port]
// If the pod is controlled by any Linkerd control plane, then it can be
// hinted that this destination knows H2 (and handles our orig-proto
// translation)
var hint *pb.ProtocolHint
if enableH2Upgrade && controllerNSLabel != "" {
if enableH2Upgrade && controllerNSLabel != "" && !isSkippedInboundPort {
hint = &pb.ProtocolHint{
Protocol: &pb.ProtocolHint_H2_{
H2: &pb.ProtocolHint_H2{},
@ -350,7 +356,8 @@ func toWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, en
var identity *pb.TlsIdentity
if identityTrustDomain != "" &&
controllerNSLabel == controllerNS &&
address.Pod.Annotations[k8s.IdentityModeAnnotation] == k8s.IdentityModeDefault {
address.Pod.Annotations[k8s.IdentityModeAnnotation] == k8s.IdentityModeDefault &&
!isSkippedInboundPort {
id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNSLabel, identityTrustDomain)
identity = &pb.TlsIdentity{

View File

@ -231,7 +231,13 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
if !ok {
opaquePorts = s.defaultOpaquePorts
}
endpoint, err = toWeightedAddr(podSet.Addresses[podID], opaquePorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, log)
skippedInboundPorts, err := getPodSkippedInboundPortsAnnotations(pod)
if err != nil {
log.Errorf("failed to get ignored inbound ports annotation for pod: %s", err)
}
endpoint, err = toWeightedAddr(podSet.Addresses[podID], opaquePorts, skippedInboundPorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, log)
if err != nil {
return err
}
@ -482,3 +488,12 @@ func getPodOpaquePortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, bool, e
}
return opaquePorts, true, nil
}
func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, error) {
annotation, ok := pod.Annotations[labels.ProxyIgnoreInboundPortsAnnotation]
if !ok || annotation == "" {
return nil, nil
}
return util.ParsePorts(annotation)
}

View File

@ -16,13 +16,16 @@ import (
const fullyQualifiedName = "name1.ns.svc.mycluster.local"
const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP2 = "172.17.0.13"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const port uint32 = 8989
const opaquePort uint32 = 4242
const skippedPort uint32 = 24224
type mockDestinationGetServer struct {
util.MockServerStream
@ -217,10 +220,59 @@ metadata:
annotations:
config.linkerd.io/opaque-ports: "4242"`,
}
meshedSkippedPodResource := []string{
`
apiVersion: v1
kind: Service
metadata:
name: name5
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.13.1
ports:
- port: 24224`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name5
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.15
targetRef:
kind: Pod
name: name5
namespace: ns
ports:
- port: 24224`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
annotations:
config.linkerd.io/skip-inbound-ports: "24224"
name: name5
namespace: ns
status:
phase: Running
podIP: 172.17.0.15
spec:
containers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
name: linkerd-proxy`,
}
res := append(meshedPodResources, clientSP...)
res = append(res, unmeshedPod)
res = append(res, meshedOpaquePodResources...)
res = append(res, meshedOpaqueServiceResources...)
res = append(res, meshedSkippedPodResource...)
k8sAPI, err := k8s.NewFakeAPI(res...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
@ -327,6 +379,44 @@ func TestGet(t *testing.T) {
}
})
t.Run("Return endpoint with unknown protocol hint and identity when service name contains skipped inbound port", func(t *testing.T) {
server := makeServer(t)
stream := &bufferingGetStream{
updates: []*pb.Update{},
MockServerStream: util.NewMockServerStream(),
}
stream.Cancel()
path := fmt.Sprintf("%s:%d", fullyQualifiedNameSkipped, skippedPort)
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
t.Fatalf("Got error: %s", err)
}
if len(stream.updates) == 0 || len(stream.updates) > 3 {
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
}
last := stream.updates[len(stream.updates)-1]
addrs := last.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].ProtocolHint != nil {
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addrs[0].ProtocolHint)
}
if addrs[0].TlsIdentity != nil {
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addrs[0].TlsIdentity)
}
})
}
func TestGetProfiles(t *testing.T) {
@ -760,6 +850,44 @@ func TestGetProfiles(t *testing.T) {
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
}
})
t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
server := makeServer(t)
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
MockServerStream: util.NewMockServerStream(),
}
stream.Cancel()
path := fmt.Sprintf("%s:%d", podIPSkipped, skippedPort)
err := server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
t.Fatalf("Got error: %s", err)
}
if len(stream.updates) == 0 || len(stream.updates) > 3 {
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
}
last := stream.updates[len(stream.updates)-1]
addr := last.GetEndpoint()
if addr == nil {
t.Fatalf("Expected to not be nil")
}
if addr.ProtocolHint != nil {
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addr.ProtocolHint)
}
if addr.TlsIdentity != nil {
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addr.TlsIdentity)
}
})
}
func TestTokenStructure(t *testing.T) {

View File

@ -0,0 +1,80 @@
package skipports
import (
"context"
"fmt"
"os"
"regexp"
"testing"
"time"
"github.com/linkerd/linkerd2/testutil"
)
var TestHelper *testutil.TestHelper
var (
skipPortsNs = "skip-ports-test"
booksappDeployments = []string{"books", "traffic", "authors", "webapp"}
httpResponseTotalMetricRE = regexp.MustCompile(
`route_response_total\{direction="outbound",dst="books\.skip-ports-test\.svc\.cluster\.local:7002",classification="failure".*`,
)
)
func TestMain(m *testing.M) {
TestHelper = testutil.NewTestHelper()
os.Exit(m.Run())
}
//////////////////////
/// TEST EXECUTION ///
//////////////////////
func TestSkipInboundPorts(t *testing.T) {
ctx := context.Background()
TestHelper.WithDataPlaneNamespace(ctx, skipPortsNs, nil, t, func(t *testing.T, ns string) {
out, err := TestHelper.LinkerdRun("inject", "--manual", "testdata/skip_ports_application.yaml")
if err != nil {
testutil.AnnotatedFatal(t, "'linkerd inject' command failed", err)
}
out, err = TestHelper.KubectlApply(out, ns)
if err != nil {
testutil.AnnotatedFatalf(t, "'kubectl apply' command failed",
"'kubectl apply' command failed\n%s", out)
}
// Check all booksapp deployments are up and running
for _, deploy := range booksappDeployments {
if err := TestHelper.CheckPods(ctx, ns, deploy, 1); err != nil {
if rce, ok := err.(*testutil.RestartCountError); ok {
testutil.AnnotatedWarn(t, "CheckPods timed-out", rce)
} else {
testutil.AnnotatedError(t, "CheckPods timed-out", err)
}
}
}
// Wait for slow-cookers to start sending requests
time.Sleep(30 * time.Second)
t.Run("expect webapp to not have any 5xx response errors", func(t *testing.T) {
pods, err := TestHelper.GetPods(ctx, ns, map[string]string{"app": "webapp"})
if err != nil {
testutil.AnnotatedFatalf(t, "error getting pods", "error getting pods\n%s", err)
}
podName := fmt.Sprintf("pod/%s", pods[0].Name)
cmd := []string{"diagnostics", "proxy-metrics", "--namespace", ns, podName}
metrics, err := TestHelper.LinkerdRun(cmd...)
if err != nil {
testutil.AnnotatedFatalf(t, "error getting metrics for pod", "error getting metrics for pod\n%s", err)
}
if httpResponseTotalMetricRE.MatchString(metrics) {
testutil.AnnotatedFatalf(t, "expected not to find HTTP outbound response failures to dst=books.skip-ports-test.svc.cluster.local:7002",
"expected not to find HTTP outbound requests when pod is skipping inbound port\n%s", metrics)
}
})
})
}

View File

@ -0,0 +1,191 @@
---
apiVersion: v1
kind: Service
metadata:
name: webapp
labels:
app: webapp
project: booksapp
spec:
selector:
app: webapp
type: ClusterIP
ports:
- name: service
port: 7000
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: webapp
labels:
app: webapp
project: booksapp
app.kubernetes.io/part-of: booksapp
spec:
replicas: 1
selector:
matchLabels:
app: webapp
project: booksapp
template:
metadata:
labels:
app: webapp
project: booksapp
spec:
dnsPolicy: ClusterFirst
containers:
- name: service
image: buoyantio/booksapp:v0.0.5
env:
- name: DATABASE_URL
value: sqlite3:db/db.sqlite3
- name: AUTHORS_SITE
value: http://authors:7001
- name: BOOKS_SITE
value: http://books:7002
args: ["prod:webapp"]
readinessProbe:
httpGet:
path: /ping
port: 7000
ports:
- name: service
containerPort: 7000
---
apiVersion: v1
kind: Service
metadata:
name: authors
labels:
app: authors
project: booksapp
spec:
selector:
app: authors
ports:
- name: service
port: 7001
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: authors
labels:
app: authors
project: booksapp
app.kubernetes.io/part-of: booksapp
spec:
replicas: 1
selector:
matchLabels:
app: authors
project: booksapp
template:
metadata:
labels:
app: authors
project: booksapp
spec:
dnsPolicy: ClusterFirst
containers:
- name: service
image: buoyantio/booksapp:v0.0.5
env:
- name: DATABASE_URL
value: sqlite3:db/db.sqlite3
- name: BOOKS_SITE
value: http://books:7002
- name: FAILURE_RATE
value: "0.0"
args: ["prod:authors"]
readinessProbe:
httpGet:
path: /ping
port: 7001
ports:
- name: service
containerPort: 7001
---
apiVersion: v1
kind: Service
metadata:
name: books
labels:
app: books
project: booksapp
spec:
selector:
app: books
ports:
- name: service
port: 7002
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: books
labels:
app: books
project: booksapp
app.kubernetes.io/part-of: booksapp
spec:
replicas: 1
selector:
matchLabels:
app: books
project: booksapp
template:
metadata:
annotations:
config.linkerd.io/skip-inbound-ports: "7002"
labels:
app: books
project: booksapp
spec:
dnsPolicy: ClusterFirst
containers:
- name: service
image: buoyantio/booksapp:v0.0.5
env:
- name: DATABASE_URL
value: sqlite3:db/db.sqlite3
- name: AUTHORS_SITE
value: http://authors:7001
args: ["prod:books"]
readinessProbe:
httpGet:
path: /ping
port: 7002
ports:
- name: service
containerPort: 7002
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: traffic
labels:
app: traffic
project: booksapp
app.kubernetes.io/part-of: booksapp
spec:
replicas: 1
selector:
matchLabels:
app: traffic
project: booksapp
template:
metadata:
labels:
app: traffic
project: booksapp
spec:
dnsPolicy: ClusterFirst
containers:
- name: traffic
image: buoyantio/booksapp-traffic:v0.0.3
args:
- "-initial-delay=30s"
- "webapp:7000"