diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go index 1d32c220e..60b68ddee 100644 --- a/controller/api/destination/endpoint_translator.go +++ b/controller/api/destination/endpoint_translator.go @@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet { // getInboundPort gets the inbound port from the proxy container's environment // variable. func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) { - for _, containerSpec := range podSpec.Containers { + containers := append(podSpec.InitContainers, podSpec.Containers...) + for _, containerSpec := range containers { if containerSpec.Name != pkgK8s.ProxyContainerName { continue } diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index 66eb1f98a..62ff6ab3d 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local" const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local" const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local" const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local" -const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local" const clusterIP = "172.17.12.0" const clusterIPv6 = "2001:db8::88" const clusterIPOpaque = "172.17.12.1" @@ -160,95 +159,11 @@ func TestGet(t *testing.T) { }) t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) { - server, client := getServerWithClient(t) - defer server.clusterStore.UnregisterGauges() + testOpaque(t, "policy-test") + }) - stream := &bufferingGetStream{ - updates: make(chan *pb.Update, 50), - MockServerStream: util.NewMockServerStream(), - } - defer stream.Cancel() - errs := make(chan error) - - path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80) - - // server.Get blocks until the grpc stream is complete so we call it - // in a goroutine and watch stream.updates for updates. - go func() { - err := server.Get(&pb.GetDestination{ - Scheme: "k8s", - Path: path, - }, stream) - if err != nil { - errs <- err - } - }() - - select { - case err := <-errs: - t.Fatalf("Got error: %s", err) - case update := <-stream.updates: - addrs := update.GetAdd().Addrs - if len(addrs) == 0 { - t.Fatalf("Expected len(addrs) to be > 0") - } - - if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { - t.Fatalf("Expected opaque transport for %s but was nil", path) - } - } - - // Update the Server's pod selector so that it no longer selects the - // pod. This should result in the proxy protocol no longer being marked - // as opaque. - srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{}) - if err != nil { - t.Fatal(err) - } - // PodSelector is updated to NOT select the pod - srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"} - _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } - - select { - case update := <-stream.updates: - addrs := update.GetAdd().Addrs - if len(addrs) == 0 { - t.Fatalf("Expected len(addrs) to be > 0") - } - - if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil { - t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport()) - } - case err := <-errs: - t.Fatalf("Got error: %s", err) - } - - // Update the Server's pod selector so that it once again selects the - // pod. This should result in the proxy protocol once again being marked - // as opaque. - srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"} - - _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } - - select { - case update := <-stream.updates: - addrs := update.GetAdd().Addrs - if len(addrs) == 0 { - t.Fatalf("Expected len(addrs) to be > 0") - } - - if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { - t.Fatalf("Expected opaque transport for %s but was nil", path) - } - case err := <-errs: - t.Fatalf("Got error: %s", err) - } + t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) { + testOpaque(t, "native") }) t.Run("Remote discovery", func(t *testing.T) { @@ -290,6 +205,98 @@ func TestGet(t *testing.T) { }) } +func testOpaque(t *testing.T, name string) { + server, client := getServerWithClient(t) + defer server.clusterStore.UnregisterGauges() + + stream := &bufferingGetStream{ + updates: make(chan *pb.Update, 50), + MockServerStream: util.NewMockServerStream(), + } + defer stream.Cancel() + errs := make(chan error) + + path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80) + + // server.Get blocks until the grpc stream is complete so we call it + // in a goroutine and watch stream.updates for updates. + go func() { + err := server.Get(&pb.GetDestination{ + Scheme: "k8s", + Path: path, + }, stream) + if err != nil { + errs <- err + } + }() + + select { + case err := <-errs: + t.Fatalf("Got error: %s", err) + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + } + + // Update the Server's pod selector so that it no longer selects the + // pod. This should result in the proxy protocol no longer being marked + // as opaque. + srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + // PodSelector is updated to NOT select the pod + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"} + _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil { + t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport()) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } + + // Update the Server's pod selector so that it once again selects the + // pod. This should result in the proxy protocol once again being marked + // as opaque. + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name} + + _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } +} + func TestGetProfiles(t *testing.T) { t.Run("Returns error if not valid service name", func(t *testing.T) { server := makeServer(t) diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 316060dfb..f50c37fce 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -376,7 +376,7 @@ endpoints: - 172.17.0.16 targetRef: kind: Pod - name: pod-policyResources + name: policy-test namespace: ns ports: - port: 80 @@ -388,7 +388,7 @@ metadata: labels: linkerd.io/control-plane-ns: linkerd app: policy-test - name: pod-policyResources + name: policy-test namespace: ns status: phase: Running @@ -414,7 +414,7 @@ spec: apiVersion: policy.linkerd.io/v1beta2 kind: Server metadata: - name: srv + name: policy-test namespace: ns spec: podSelector: @@ -426,7 +426,7 @@ spec: apiVersion: policy.linkerd.io/v1beta2 kind: Server metadata: - name: srv-external-workload + name: policy-test-external-workload namespace: ns spec: externalWorkloadSelector: @@ -436,6 +436,80 @@ spec: proxyProtocol: opaque`, } + policyResourcesNativeSidecar := []string{ + ` +apiVersion: v1 +kind: Service +metadata: + name: native + namespace: ns +spec: + type: LoadBalancer + clusterIP: 172.17.12.4 + ports: + - port: 80`, + ` +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + name: native + namespace: ns + labels: + kubernetes.io/service-name: native +addressType: IPv4 +endpoints: +- addresses: + - 172.17.0.18 + targetRef: + kind: Pod + name: native + namespace: ns +ports: +- port: 80 + protocol: TCP`, + ` +apiVersion: v1 +kind: Pod +metadata: + labels: + linkerd.io/control-plane-ns: linkerd + app: native + name: native + namespace: ns +status: + phase: Running + conditions: + - type: Ready + status: "True" + podIP: 172.17.0.18 + podIPs: + - ip: 172.17.0.18 +spec: + initContainers: + - name: linkerd-proxy + env: + - name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR + value: 0.0.0.0:4143 + - name: app + image: nginx + ports: + - containerPort: 80 + name: http + protocol: TCP`, + ` +apiVersion: policy.linkerd.io/v1beta2 +kind: Server +metadata: + name: native + namespace: ns +spec: + podSelector: + matchLabels: + app: native + port: 80 + proxyProtocol: opaque`, + } + hostPortMapping := []string{ ` kind: Pod @@ -643,6 +717,7 @@ spec: res = append(res, meshedSkippedPodResource...) res = append(res, meshedStatefulSetPodResource...) res = append(res, policyResources...) + res = append(res, policyResourcesNativeSidecar...) res = append(res, hostPortMapping...) res = append(res, mirrorServiceResources...) res = append(res, destinationCredentialsResources...)