Fix destination controller

This commit is contained in:
Alejandro Pedraza 2024-04-16 10:40:08 -05:00
parent e2e8990326
commit f8f73c47c7
No known key found for this signature in database
3 changed files with 177 additions and 94 deletions

View File

@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet {
// getInboundPort gets the inbound port from the proxy container's environment // getInboundPort gets the inbound port from the proxy container's environment
// variable. // variable.
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) { func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
for _, containerSpec := range podSpec.Containers { containers := append(podSpec.InitContainers, podSpec.Containers...)
for _, containerSpec := range containers {
if containerSpec.Name != pkgK8s.ProxyContainerName { if containerSpec.Name != pkgK8s.ProxyContainerName {
continue continue
} }

View File

@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local" const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local" const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local" const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0" const clusterIP = "172.17.12.0"
const clusterIPv6 = "2001:db8::88" const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1" const clusterIPOpaque = "172.17.12.1"
@ -160,95 +159,11 @@ func TestGet(t *testing.T) {
}) })
t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) { t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
server, client := getServerWithClient(t) testOpaque(t, "policy-test")
defer server.clusterStore.UnregisterGauges() })
stream := &bufferingGetStream{ t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
updates: make(chan *pb.Update, 50), testOpaque(t, "native")
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)
path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)
// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
errs <- err
}
}()
select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
}
// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
}) })
t.Run("Remote discovery", func(t *testing.T) { t.Run("Remote discovery", func(t *testing.T) {
@ -290,6 +205,98 @@ func TestGet(t *testing.T) {
}) })
} }
func testOpaque(t *testing.T, name string) {
server, client := getServerWithClient(t)
defer server.clusterStore.UnregisterGauges()
stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)
path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
errs <- err
}
}()
select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
}
// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
}
func TestGetProfiles(t *testing.T) { func TestGetProfiles(t *testing.T) {
t.Run("Returns error if not valid service name", func(t *testing.T) { t.Run("Returns error if not valid service name", func(t *testing.T) {
server := makeServer(t) server := makeServer(t)

View File

@ -376,7 +376,7 @@ endpoints:
- 172.17.0.16 - 172.17.0.16
targetRef: targetRef:
kind: Pod kind: Pod
name: pod-policyResources name: policy-test
namespace: ns namespace: ns
ports: ports:
- port: 80 - port: 80
@ -388,7 +388,7 @@ metadata:
labels: labels:
linkerd.io/control-plane-ns: linkerd linkerd.io/control-plane-ns: linkerd
app: policy-test app: policy-test
name: pod-policyResources name: policy-test
namespace: ns namespace: ns
status: status:
phase: Running phase: Running
@ -414,7 +414,7 @@ spec:
apiVersion: policy.linkerd.io/v1beta2 apiVersion: policy.linkerd.io/v1beta2
kind: Server kind: Server
metadata: metadata:
name: srv name: policy-test
namespace: ns namespace: ns
spec: spec:
podSelector: podSelector:
@ -426,7 +426,7 @@ spec:
apiVersion: policy.linkerd.io/v1beta2 apiVersion: policy.linkerd.io/v1beta2
kind: Server kind: Server
metadata: metadata:
name: srv-external-workload name: policy-test-external-workload
namespace: ns namespace: ns
spec: spec:
externalWorkloadSelector: externalWorkloadSelector:
@ -436,6 +436,80 @@ spec:
proxyProtocol: opaque`, proxyProtocol: opaque`,
} }
policyResourcesNativeSidecar := []string{
`
apiVersion: v1
kind: Service
metadata:
name: native
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.4
ports:
- port: 80`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: native
namespace: ns
labels:
kubernetes.io/service-name: native
addressType: IPv4
endpoints:
- addresses:
- 172.17.0.18
targetRef:
kind: Pod
name: native
namespace: ns
ports:
- port: 80
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
app: native
name: native
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.18
podIPs:
- ip: 172.17.0.18
spec:
initContainers:
- name: linkerd-proxy
env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
- name: app
image: nginx
ports:
- containerPort: 80
name: http
protocol: TCP`,
`
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: native
namespace: ns
spec:
podSelector:
matchLabels:
app: native
port: 80
proxyProtocol: opaque`,
}
hostPortMapping := []string{ hostPortMapping := []string{
` `
kind: Pod kind: Pod
@ -643,6 +717,7 @@ spec:
res = append(res, meshedSkippedPodResource...) res = append(res, meshedSkippedPodResource...)
res = append(res, meshedStatefulSetPodResource...) res = append(res, meshedStatefulSetPodResource...)
res = append(res, policyResources...) res = append(res, policyResources...)
res = append(res, policyResourcesNativeSidecar...)
res = append(res, hostPortMapping...) res = append(res, hostPortMapping...)
res = append(res, mirrorServiceResources...) res = append(res, mirrorServiceResources...)
res = append(res, destinationCredentialsResources...) res = append(res, destinationCredentialsResources...)