mirror of https://github.com/linkerd/linkerd2.git
Fix destination controller
This commit is contained in:
parent
e2e8990326
commit
f8f73c47c7
|
@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet {
|
||||||
// getInboundPort gets the inbound port from the proxy container's environment
|
// getInboundPort gets the inbound port from the proxy container's environment
|
||||||
// variable.
|
// variable.
|
||||||
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
|
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
|
||||||
for _, containerSpec := range podSpec.Containers {
|
containers := append(podSpec.InitContainers, podSpec.Containers...)
|
||||||
|
for _, containerSpec := range containers {
|
||||||
if containerSpec.Name != pkgK8s.ProxyContainerName {
|
if containerSpec.Name != pkgK8s.ProxyContainerName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
|
||||||
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
|
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
|
||||||
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
|
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
|
||||||
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
|
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
|
||||||
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
|
|
||||||
const clusterIP = "172.17.12.0"
|
const clusterIP = "172.17.12.0"
|
||||||
const clusterIPv6 = "2001:db8::88"
|
const clusterIPv6 = "2001:db8::88"
|
||||||
const clusterIPOpaque = "172.17.12.1"
|
const clusterIPOpaque = "172.17.12.1"
|
||||||
|
@ -160,95 +159,11 @@ func TestGet(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
|
t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
|
||||||
server, client := getServerWithClient(t)
|
testOpaque(t, "policy-test")
|
||||||
defer server.clusterStore.UnregisterGauges()
|
})
|
||||||
|
|
||||||
stream := &bufferingGetStream{
|
t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
|
||||||
updates: make(chan *pb.Update, 50),
|
testOpaque(t, "native")
|
||||||
MockServerStream: util.NewMockServerStream(),
|
|
||||||
}
|
|
||||||
defer stream.Cancel()
|
|
||||||
errs := make(chan error)
|
|
||||||
|
|
||||||
path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)
|
|
||||||
|
|
||||||
// server.Get blocks until the grpc stream is complete so we call it
|
|
||||||
// in a goroutine and watch stream.updates for updates.
|
|
||||||
go func() {
|
|
||||||
err := server.Get(&pb.GetDestination{
|
|
||||||
Scheme: "k8s",
|
|
||||||
Path: path,
|
|
||||||
}, stream)
|
|
||||||
if err != nil {
|
|
||||||
errs <- err
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-errs:
|
|
||||||
t.Fatalf("Got error: %s", err)
|
|
||||||
case update := <-stream.updates:
|
|
||||||
addrs := update.GetAdd().Addrs
|
|
||||||
if len(addrs) == 0 {
|
|
||||||
t.Fatalf("Expected len(addrs) to be > 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
|
||||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the Server's pod selector so that it no longer selects the
|
|
||||||
// pod. This should result in the proxy protocol no longer being marked
|
|
||||||
// as opaque.
|
|
||||||
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
// PodSelector is updated to NOT select the pod
|
|
||||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
|
|
||||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case update := <-stream.updates:
|
|
||||||
addrs := update.GetAdd().Addrs
|
|
||||||
if len(addrs) == 0 {
|
|
||||||
t.Fatalf("Expected len(addrs) to be > 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
|
|
||||||
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
|
|
||||||
}
|
|
||||||
case err := <-errs:
|
|
||||||
t.Fatalf("Got error: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the Server's pod selector so that it once again selects the
|
|
||||||
// pod. This should result in the proxy protocol once again being marked
|
|
||||||
// as opaque.
|
|
||||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
|
|
||||||
|
|
||||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case update := <-stream.updates:
|
|
||||||
addrs := update.GetAdd().Addrs
|
|
||||||
if len(addrs) == 0 {
|
|
||||||
t.Fatalf("Expected len(addrs) to be > 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
|
||||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
|
||||||
}
|
|
||||||
case err := <-errs:
|
|
||||||
t.Fatalf("Got error: %s", err)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Remote discovery", func(t *testing.T) {
|
t.Run("Remote discovery", func(t *testing.T) {
|
||||||
|
@ -290,6 +205,98 @@ func TestGet(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testOpaque(t *testing.T, name string) {
|
||||||
|
server, client := getServerWithClient(t)
|
||||||
|
defer server.clusterStore.UnregisterGauges()
|
||||||
|
|
||||||
|
stream := &bufferingGetStream{
|
||||||
|
updates: make(chan *pb.Update, 50),
|
||||||
|
MockServerStream: util.NewMockServerStream(),
|
||||||
|
}
|
||||||
|
defer stream.Cancel()
|
||||||
|
errs := make(chan error)
|
||||||
|
|
||||||
|
path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
|
||||||
|
|
||||||
|
// server.Get blocks until the grpc stream is complete so we call it
|
||||||
|
// in a goroutine and watch stream.updates for updates.
|
||||||
|
go func() {
|
||||||
|
err := server.Get(&pb.GetDestination{
|
||||||
|
Scheme: "k8s",
|
||||||
|
Path: path,
|
||||||
|
}, stream)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatalf("Got error: %s", err)
|
||||||
|
case update := <-stream.updates:
|
||||||
|
addrs := update.GetAdd().Addrs
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
t.Fatalf("Expected len(addrs) to be > 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||||
|
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the Server's pod selector so that it no longer selects the
|
||||||
|
// pod. This should result in the proxy protocol no longer being marked
|
||||||
|
// as opaque.
|
||||||
|
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// PodSelector is updated to NOT select the pod
|
||||||
|
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
|
||||||
|
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case update := <-stream.updates:
|
||||||
|
addrs := update.GetAdd().Addrs
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
t.Fatalf("Expected len(addrs) to be > 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
|
||||||
|
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
|
||||||
|
}
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatalf("Got error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the Server's pod selector so that it once again selects the
|
||||||
|
// pod. This should result in the proxy protocol once again being marked
|
||||||
|
// as opaque.
|
||||||
|
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
|
||||||
|
|
||||||
|
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case update := <-stream.updates:
|
||||||
|
addrs := update.GetAdd().Addrs
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
t.Fatalf("Expected len(addrs) to be > 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||||
|
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||||
|
}
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatalf("Got error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetProfiles(t *testing.T) {
|
func TestGetProfiles(t *testing.T) {
|
||||||
t.Run("Returns error if not valid service name", func(t *testing.T) {
|
t.Run("Returns error if not valid service name", func(t *testing.T) {
|
||||||
server := makeServer(t)
|
server := makeServer(t)
|
||||||
|
|
|
@ -376,7 +376,7 @@ endpoints:
|
||||||
- 172.17.0.16
|
- 172.17.0.16
|
||||||
targetRef:
|
targetRef:
|
||||||
kind: Pod
|
kind: Pod
|
||||||
name: pod-policyResources
|
name: policy-test
|
||||||
namespace: ns
|
namespace: ns
|
||||||
ports:
|
ports:
|
||||||
- port: 80
|
- port: 80
|
||||||
|
@ -388,7 +388,7 @@ metadata:
|
||||||
labels:
|
labels:
|
||||||
linkerd.io/control-plane-ns: linkerd
|
linkerd.io/control-plane-ns: linkerd
|
||||||
app: policy-test
|
app: policy-test
|
||||||
name: pod-policyResources
|
name: policy-test
|
||||||
namespace: ns
|
namespace: ns
|
||||||
status:
|
status:
|
||||||
phase: Running
|
phase: Running
|
||||||
|
@ -414,7 +414,7 @@ spec:
|
||||||
apiVersion: policy.linkerd.io/v1beta2
|
apiVersion: policy.linkerd.io/v1beta2
|
||||||
kind: Server
|
kind: Server
|
||||||
metadata:
|
metadata:
|
||||||
name: srv
|
name: policy-test
|
||||||
namespace: ns
|
namespace: ns
|
||||||
spec:
|
spec:
|
||||||
podSelector:
|
podSelector:
|
||||||
|
@ -426,7 +426,7 @@ spec:
|
||||||
apiVersion: policy.linkerd.io/v1beta2
|
apiVersion: policy.linkerd.io/v1beta2
|
||||||
kind: Server
|
kind: Server
|
||||||
metadata:
|
metadata:
|
||||||
name: srv-external-workload
|
name: policy-test-external-workload
|
||||||
namespace: ns
|
namespace: ns
|
||||||
spec:
|
spec:
|
||||||
externalWorkloadSelector:
|
externalWorkloadSelector:
|
||||||
|
@ -436,6 +436,80 @@ spec:
|
||||||
proxyProtocol: opaque`,
|
proxyProtocol: opaque`,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
policyResourcesNativeSidecar := []string{
|
||||||
|
`
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: native
|
||||||
|
namespace: ns
|
||||||
|
spec:
|
||||||
|
type: LoadBalancer
|
||||||
|
clusterIP: 172.17.12.4
|
||||||
|
ports:
|
||||||
|
- port: 80`,
|
||||||
|
`
|
||||||
|
apiVersion: discovery.k8s.io/v1
|
||||||
|
kind: EndpointSlice
|
||||||
|
metadata:
|
||||||
|
name: native
|
||||||
|
namespace: ns
|
||||||
|
labels:
|
||||||
|
kubernetes.io/service-name: native
|
||||||
|
addressType: IPv4
|
||||||
|
endpoints:
|
||||||
|
- addresses:
|
||||||
|
- 172.17.0.18
|
||||||
|
targetRef:
|
||||||
|
kind: Pod
|
||||||
|
name: native
|
||||||
|
namespace: ns
|
||||||
|
ports:
|
||||||
|
- port: 80
|
||||||
|
protocol: TCP`,
|
||||||
|
`
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Pod
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
linkerd.io/control-plane-ns: linkerd
|
||||||
|
app: native
|
||||||
|
name: native
|
||||||
|
namespace: ns
|
||||||
|
status:
|
||||||
|
phase: Running
|
||||||
|
conditions:
|
||||||
|
- type: Ready
|
||||||
|
status: "True"
|
||||||
|
podIP: 172.17.0.18
|
||||||
|
podIPs:
|
||||||
|
- ip: 172.17.0.18
|
||||||
|
spec:
|
||||||
|
initContainers:
|
||||||
|
- name: linkerd-proxy
|
||||||
|
env:
|
||||||
|
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
|
||||||
|
value: 0.0.0.0:4143
|
||||||
|
- name: app
|
||||||
|
image: nginx
|
||||||
|
ports:
|
||||||
|
- containerPort: 80
|
||||||
|
name: http
|
||||||
|
protocol: TCP`,
|
||||||
|
`
|
||||||
|
apiVersion: policy.linkerd.io/v1beta2
|
||||||
|
kind: Server
|
||||||
|
metadata:
|
||||||
|
name: native
|
||||||
|
namespace: ns
|
||||||
|
spec:
|
||||||
|
podSelector:
|
||||||
|
matchLabels:
|
||||||
|
app: native
|
||||||
|
port: 80
|
||||||
|
proxyProtocol: opaque`,
|
||||||
|
}
|
||||||
|
|
||||||
hostPortMapping := []string{
|
hostPortMapping := []string{
|
||||||
`
|
`
|
||||||
kind: Pod
|
kind: Pod
|
||||||
|
@ -643,6 +717,7 @@ spec:
|
||||||
res = append(res, meshedSkippedPodResource...)
|
res = append(res, meshedSkippedPodResource...)
|
||||||
res = append(res, meshedStatefulSetPodResource...)
|
res = append(res, meshedStatefulSetPodResource...)
|
||||||
res = append(res, policyResources...)
|
res = append(res, policyResources...)
|
||||||
|
res = append(res, policyResourcesNativeSidecar...)
|
||||||
res = append(res, hostPortMapping...)
|
res = append(res, hostPortMapping...)
|
||||||
res = append(res, mirrorServiceResources...)
|
res = append(res, mirrorServiceResources...)
|
||||||
res = append(res, destinationCredentialsResources...)
|
res = append(res, destinationCredentialsResources...)
|
||||||
|
|
Loading…
Reference in New Issue