mirror of https://github.com/linkerd/linkerd2.git
Fix destination controller
This commit is contained in:
parent
e2e8990326
commit
f8f73c47c7
|
@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet {
|
|||
// getInboundPort gets the inbound port from the proxy container's environment
|
||||
// variable.
|
||||
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
|
||||
for _, containerSpec := range podSpec.Containers {
|
||||
containers := append(podSpec.InitContainers, podSpec.Containers...)
|
||||
for _, containerSpec := range containers {
|
||||
if containerSpec.Name != pkgK8s.ProxyContainerName {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
|
|||
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
|
||||
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
|
||||
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
|
||||
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
|
||||
const clusterIP = "172.17.12.0"
|
||||
const clusterIPv6 = "2001:db8::88"
|
||||
const clusterIPOpaque = "172.17.12.1"
|
||||
|
@ -160,95 +159,11 @@ func TestGet(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
|
||||
server, client := getServerWithClient(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
testOpaque(t, "policy-test")
|
||||
})
|
||||
|
||||
stream := &bufferingGetStream{
|
||||
updates: make(chan *pb.Update, 50),
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
defer stream.Cancel()
|
||||
errs := make(chan error)
|
||||
|
||||
path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)
|
||||
|
||||
// server.Get blocks until the grpc stream is complete so we call it
|
||||
// in a goroutine and watch stream.updates for updates.
|
||||
go func() {
|
||||
err := server.Get(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: path,
|
||||
}, stream)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the Server's pod selector so that it no longer selects the
|
||||
// pod. This should result in the proxy protocol no longer being marked
|
||||
// as opaque.
|
||||
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// PodSelector is updated to NOT select the pod
|
||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
|
||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
|
||||
}
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// Update the Server's pod selector so that it once again selects the
|
||||
// pod. This should result in the proxy protocol once again being marked
|
||||
// as opaque.
|
||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
|
||||
|
||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||
}
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
|
||||
testOpaque(t, "native")
|
||||
})
|
||||
|
||||
t.Run("Remote discovery", func(t *testing.T) {
|
||||
|
@ -290,6 +205,98 @@ func TestGet(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func testOpaque(t *testing.T, name string) {
|
||||
server, client := getServerWithClient(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
||||
stream := &bufferingGetStream{
|
||||
updates: make(chan *pb.Update, 50),
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
defer stream.Cancel()
|
||||
errs := make(chan error)
|
||||
|
||||
path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
|
||||
|
||||
// server.Get blocks until the grpc stream is complete so we call it
|
||||
// in a goroutine and watch stream.updates for updates.
|
||||
go func() {
|
||||
err := server.Get(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: path,
|
||||
}, stream)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the Server's pod selector so that it no longer selects the
|
||||
// pod. This should result in the proxy protocol no longer being marked
|
||||
// as opaque.
|
||||
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// PodSelector is updated to NOT select the pod
|
||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
|
||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
|
||||
}
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// Update the Server's pod selector so that it once again selects the
|
||||
// pod. This should result in the proxy protocol once again being marked
|
||||
// as opaque.
|
||||
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
|
||||
|
||||
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case update := <-stream.updates:
|
||||
addrs := update.GetAdd().Addrs
|
||||
if len(addrs) == 0 {
|
||||
t.Fatalf("Expected len(addrs) to be > 0")
|
||||
}
|
||||
|
||||
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
|
||||
t.Fatalf("Expected opaque transport for %s but was nil", path)
|
||||
}
|
||||
case err := <-errs:
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetProfiles(t *testing.T) {
|
||||
t.Run("Returns error if not valid service name", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
|
|
|
@ -376,7 +376,7 @@ endpoints:
|
|||
- 172.17.0.16
|
||||
targetRef:
|
||||
kind: Pod
|
||||
name: pod-policyResources
|
||||
name: policy-test
|
||||
namespace: ns
|
||||
ports:
|
||||
- port: 80
|
||||
|
@ -388,7 +388,7 @@ metadata:
|
|||
labels:
|
||||
linkerd.io/control-plane-ns: linkerd
|
||||
app: policy-test
|
||||
name: pod-policyResources
|
||||
name: policy-test
|
||||
namespace: ns
|
||||
status:
|
||||
phase: Running
|
||||
|
@ -414,7 +414,7 @@ spec:
|
|||
apiVersion: policy.linkerd.io/v1beta2
|
||||
kind: Server
|
||||
metadata:
|
||||
name: srv
|
||||
name: policy-test
|
||||
namespace: ns
|
||||
spec:
|
||||
podSelector:
|
||||
|
@ -426,7 +426,7 @@ spec:
|
|||
apiVersion: policy.linkerd.io/v1beta2
|
||||
kind: Server
|
||||
metadata:
|
||||
name: srv-external-workload
|
||||
name: policy-test-external-workload
|
||||
namespace: ns
|
||||
spec:
|
||||
externalWorkloadSelector:
|
||||
|
@ -436,6 +436,80 @@ spec:
|
|||
proxyProtocol: opaque`,
|
||||
}
|
||||
|
||||
policyResourcesNativeSidecar := []string{
|
||||
`
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: native
|
||||
namespace: ns
|
||||
spec:
|
||||
type: LoadBalancer
|
||||
clusterIP: 172.17.12.4
|
||||
ports:
|
||||
- port: 80`,
|
||||
`
|
||||
apiVersion: discovery.k8s.io/v1
|
||||
kind: EndpointSlice
|
||||
metadata:
|
||||
name: native
|
||||
namespace: ns
|
||||
labels:
|
||||
kubernetes.io/service-name: native
|
||||
addressType: IPv4
|
||||
endpoints:
|
||||
- addresses:
|
||||
- 172.17.0.18
|
||||
targetRef:
|
||||
kind: Pod
|
||||
name: native
|
||||
namespace: ns
|
||||
ports:
|
||||
- port: 80
|
||||
protocol: TCP`,
|
||||
`
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
labels:
|
||||
linkerd.io/control-plane-ns: linkerd
|
||||
app: native
|
||||
name: native
|
||||
namespace: ns
|
||||
status:
|
||||
phase: Running
|
||||
conditions:
|
||||
- type: Ready
|
||||
status: "True"
|
||||
podIP: 172.17.0.18
|
||||
podIPs:
|
||||
- ip: 172.17.0.18
|
||||
spec:
|
||||
initContainers:
|
||||
- name: linkerd-proxy
|
||||
env:
|
||||
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
|
||||
value: 0.0.0.0:4143
|
||||
- name: app
|
||||
image: nginx
|
||||
ports:
|
||||
- containerPort: 80
|
||||
name: http
|
||||
protocol: TCP`,
|
||||
`
|
||||
apiVersion: policy.linkerd.io/v1beta2
|
||||
kind: Server
|
||||
metadata:
|
||||
name: native
|
||||
namespace: ns
|
||||
spec:
|
||||
podSelector:
|
||||
matchLabels:
|
||||
app: native
|
||||
port: 80
|
||||
proxyProtocol: opaque`,
|
||||
}
|
||||
|
||||
hostPortMapping := []string{
|
||||
`
|
||||
kind: Pod
|
||||
|
@ -643,6 +717,7 @@ spec:
|
|||
res = append(res, meshedSkippedPodResource...)
|
||||
res = append(res, meshedStatefulSetPodResource...)
|
||||
res = append(res, policyResources...)
|
||||
res = append(res, policyResourcesNativeSidecar...)
|
||||
res = append(res, hostPortMapping...)
|
||||
res = append(res, mirrorServiceResources...)
|
||||
res = append(res, destinationCredentialsResources...)
|
||||
|
|
Loading…
Reference in New Issue