Fix destination controller

Alejandro Pedraza 2024-04-16 10:40:08 -05:00
parent e2e8990326
commit f8f73c47c7
3 changed files with 177 additions and 94 deletions


@@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet {
 // getInboundPort gets the inbound port from the proxy container's environment
 // variable.
 func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
-	for _, containerSpec := range podSpec.Containers {
+	containers := append(podSpec.InitContainers, podSpec.Containers...)
+	for _, containerSpec := range containers {
 		if containerSpec.Name != pkgK8s.ProxyContainerName {
 			continue
 		}

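Note (not part of the diff): the rest of getInboundPort is unchanged by this commit. As a sketch of how the full lookup plausibly reads after the fix — assuming the port is parsed out of the LINKERD2_PROXY_INBOUND_LISTEN_ADDR environment variable that the new test fixture below sets; the parsing details, error messages, and import paths here are illustrative, not the controller's verbatim code:

// Sketch only. Assumed package/imports for illustration:
//
//	import (
//		"fmt"
//		"net"
//		"strconv"
//
//		pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
//		corev1 "k8s.io/api/core/v1"
//	)
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
	// Native sidecars run the proxy as an init container, so both
	// container lists must be scanned (this is the fix above).
	containers := append(podSpec.InitContainers, podSpec.Containers...)
	for _, containerSpec := range containers {
		if containerSpec.Name != pkgK8s.ProxyContainerName {
			continue
		}
		for _, envVar := range containerSpec.Env {
			if envVar.Name != "LINKERD2_PROXY_INBOUND_LISTEN_ADDR" {
				continue
			}
			// e.g. "0.0.0.0:4143" -> 4143
			_, portStr, err := net.SplitHostPort(envVar.Value)
			if err != nil {
				return 0, fmt.Errorf("failed to parse inbound listen addr %s: %w", envVar.Value, err)
			}
			port, err := strconv.ParseUint(portStr, 10, 32)
			if err != nil {
				return 0, fmt.Errorf("failed to parse inbound port %s: %w", portStr, err)
			}
			return uint32(port), nil
		}
	}
	return 0, fmt.Errorf("failed to find %s container in pod spec", pkgK8s.ProxyContainerName)
}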

@@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
 const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
 const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
 const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
-const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
 const clusterIP = "172.17.12.0"
 const clusterIPv6 = "2001:db8::88"
 const clusterIPOpaque = "172.17.12.1"
@@ -160,95 +159,11 @@ func TestGet(t *testing.T) {
 	})
 	t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
-		server, client := getServerWithClient(t)
-		defer server.clusterStore.UnregisterGauges()
-
-		stream := &bufferingGetStream{
-			updates:          make(chan *pb.Update, 50),
-			MockServerStream: util.NewMockServerStream(),
-		}
-		defer stream.Cancel()
-		errs := make(chan error)
-
-		path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)
-
-		// server.Get blocks until the grpc stream is complete so we call it
-		// in a goroutine and watch stream.updates for updates.
-		go func() {
-			err := server.Get(&pb.GetDestination{
-				Scheme: "k8s",
-				Path:   path,
-			}, stream)
-			if err != nil {
-				errs <- err
-			}
-		}()
-
-		select {
-		case err := <-errs:
-			t.Fatalf("Got error: %s", err)
-		case update := <-stream.updates:
-			addrs := update.GetAdd().Addrs
-			if len(addrs) == 0 {
-				t.Fatalf("Expected len(addrs) to be > 0")
-			}
-			if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
-				t.Fatalf("Expected opaque transport for %s but was nil", path)
-			}
-		}
-
-		// Update the Server's pod selector so that it no longer selects the
-		// pod. This should result in the proxy protocol no longer being marked
-		// as opaque.
-		srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
-		if err != nil {
-			t.Fatal(err)
-		}
-		// PodSelector is updated to NOT select the pod
-		srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
-
-		_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		select {
-		case update := <-stream.updates:
-			addrs := update.GetAdd().Addrs
-			if len(addrs) == 0 {
-				t.Fatalf("Expected len(addrs) to be > 0")
-			}
-			if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
-				t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
-			}
-		case err := <-errs:
-			t.Fatalf("Got error: %s", err)
-		}
-
-		// Update the Server's pod selector so that it once again selects the
-		// pod. This should result in the proxy protocol once again being marked
-		// as opaque.
-		srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
-
-		_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		select {
-		case update := <-stream.updates:
-			addrs := update.GetAdd().Addrs
-			if len(addrs) == 0 {
-				t.Fatalf("Expected len(addrs) to be > 0")
-			}
-			if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
-				t.Fatalf("Expected opaque transport for %s but was nil", path)
-			}
-		case err := <-errs:
-			t.Fatalf("Got error: %s", err)
-		}
+		testOpaque(t, "policy-test")
+	})
+	t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
+		testOpaque(t, "native")
 	})
 	t.Run("Remote discovery", func(t *testing.T) {
@@ -290,6 +205,98 @@ func TestGet(t *testing.T) {
 	})
 }
 
+func testOpaque(t *testing.T, name string) {
+	server, client := getServerWithClient(t)
+	defer server.clusterStore.UnregisterGauges()
+
+	stream := &bufferingGetStream{
+		updates:          make(chan *pb.Update, 50),
+		MockServerStream: util.NewMockServerStream(),
+	}
+	defer stream.Cancel()
+	errs := make(chan error)
+
+	path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
+
+	// server.Get blocks until the grpc stream is complete so we call it
+	// in a goroutine and watch stream.updates for updates.
+	go func() {
+		err := server.Get(&pb.GetDestination{
+			Scheme: "k8s",
+			Path:   path,
+		}, stream)
+		if err != nil {
+			errs <- err
+		}
+	}()
+
+	select {
+	case err := <-errs:
+		t.Fatalf("Got error: %s", err)
+	case update := <-stream.updates:
+		addrs := update.GetAdd().Addrs
+		if len(addrs) == 0 {
+			t.Fatalf("Expected len(addrs) to be > 0")
+		}
+		if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
+			t.Fatalf("Expected opaque transport for %s but was nil", path)
+		}
+	}
+
+	// Update the Server's pod selector so that it no longer selects the
+	// pod. This should result in the proxy protocol no longer being marked
+	// as opaque.
+	srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// PodSelector is updated to NOT select the pod
+	srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
+
+	_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case update := <-stream.updates:
+		addrs := update.GetAdd().Addrs
+		if len(addrs) == 0 {
+			t.Fatalf("Expected len(addrs) to be > 0")
+		}
+		if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
+			t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
+		}
+	case err := <-errs:
+		t.Fatalf("Got error: %s", err)
+	}
+
+	// Update the Server's pod selector so that it once again selects the
+	// pod. This should result in the proxy protocol once again being marked
+	// as opaque.
+	srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
+
+	_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case update := <-stream.updates:
+		addrs := update.GetAdd().Addrs
+		if len(addrs) == 0 {
+			t.Fatalf("Expected len(addrs) to be > 0")
+		}
+		if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
+			t.Fatalf("Expected opaque transport for %s but was nil", path)
+		}
+	case err := <-errs:
+		t.Fatalf("Got error: %s", err)
+	}
+}
 
 func TestGetProfiles(t *testing.T) {
 	t.Run("Returns error if not valid service name", func(t *testing.T) {
 		server := makeServer(t)

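Note (not part of the diff): testOpaque drives the server through the package's bufferingGetStream test helper. A minimal sketch of what such a helper can look like, assuming it satisfies the destination server's stream interface by buffering each Send into the updates channel the test reads from, and reusing the package's existing pb and util imports (the real helper is defined elsewhere in this package):

// Sketch: Send buffers each update on a channel so the test can assert on
// updates without a real gRPC transport; the embedded MockServerStream
// supplies the rest of the stream surface, including Cancel.
type bufferingGetStream struct {
	updates chan *pb.Update
	util.MockServerStream
}

func (bgs *bufferingGetStream) Send(update *pb.Update) error {
	bgs.updates <- update
	return nil
}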

@@ -376,7 +376,7 @@ endpoints:
   - 172.17.0.16
   targetRef:
     kind: Pod
-    name: pod-policyResources
+    name: policy-test
     namespace: ns
 ports:
 - port: 80
@@ -388,7 +388,7 @@ metadata:
   labels:
     linkerd.io/control-plane-ns: linkerd
     app: policy-test
-  name: pod-policyResources
+  name: policy-test
   namespace: ns
 status:
   phase: Running
@@ -414,7 +414,7 @@ spec:
 apiVersion: policy.linkerd.io/v1beta2
 kind: Server
 metadata:
-  name: srv
+  name: policy-test
   namespace: ns
 spec:
   podSelector:
@@ -426,7 +426,7 @@ spec:
 apiVersion: policy.linkerd.io/v1beta2
 kind: Server
 metadata:
-  name: srv-external-workload
+  name: policy-test-external-workload
   namespace: ns
 spec:
   externalWorkloadSelector:
@@ -436,6 +436,80 @@ spec:
   proxyProtocol: opaque`,
 	}
 
+	policyResourcesNativeSidecar := []string{
+		`
+apiVersion: v1
+kind: Service
+metadata:
+  name: native
+  namespace: ns
+spec:
+  type: LoadBalancer
+  clusterIP: 172.17.12.4
+  ports:
+  - port: 80`,
+		`
+apiVersion: discovery.k8s.io/v1
+kind: EndpointSlice
+metadata:
+  name: native
+  namespace: ns
+  labels:
+    kubernetes.io/service-name: native
+addressType: IPv4
+endpoints:
+- addresses:
+  - 172.17.0.18
+  targetRef:
+    kind: Pod
+    name: native
+    namespace: ns
+ports:
+- port: 80
+  protocol: TCP`,
+		`
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    linkerd.io/control-plane-ns: linkerd
+    app: native
+  name: native
+  namespace: ns
+status:
+  phase: Running
+  conditions:
+  - type: Ready
+    status: "True"
+  podIP: 172.17.0.18
+  podIPs:
+  - ip: 172.17.0.18
+spec:
+  initContainers:
+  - name: linkerd-proxy
+    env:
+    - name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
+      value: 0.0.0.0:4143
+  - name: app
+    image: nginx
+    ports:
+    - containerPort: 80
+      name: http
+      protocol: TCP`,
+		`
+apiVersion: policy.linkerd.io/v1beta2
+kind: Server
+metadata:
+  name: native
+  namespace: ns
+spec:
+  podSelector:
+    matchLabels:
+      app: native
+  port: 80
+  proxyProtocol: opaque`,
+	}
+
 	hostPortMapping := []string{
 		`
 kind: Pod
@@ -643,6 +717,7 @@ spec:
 	res = append(res, meshedSkippedPodResource...)
 	res = append(res, meshedStatefulSetPodResource...)
 	res = append(res, policyResources...)
+	res = append(res, policyResourcesNativeSidecar...)
 	res = append(res, hostPortMapping...)
 	res = append(res, mirrorServiceResources...)
 	res = append(res, destinationCredentialsResources...)
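
Aside (not part of the diff): the native fixture above declares linkerd-proxy under initContainers, which is exactly the case the getInboundPort fix covers. In Kubernetes 1.28+ an init container behaves as a native sidecar when its container-level restartPolicy is Always; a minimal check could look like the hypothetical helper below (not present in this commit), with corev1 = k8s.io/api/core/v1:

// Hypothetical helper for illustration only: a native sidecar is an init
// container whose container-level restartPolicy is Always (k8s 1.28+).
func isNativeSidecar(c corev1.Container) bool {
	return c.RestartPolicy != nil && *c.RestartPolicy == corev1.ContainerRestartPolicyAlways
}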