mirror of https://github.com/linkerd/linkerd2.git
1332 lines
37 KiB
Go
1332 lines
37 KiB
Go
package destination
|
|
|
|
import (
|
|
"fmt"
|
|
"net/netip"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/go-test/deep"
|
|
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
|
"github.com/linkerd/linkerd2-proxy-api/go/net"
|
|
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
|
ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
|
|
"github.com/linkerd/linkerd2/pkg/addr"
|
|
"github.com/linkerd/linkerd2/pkg/k8s"
|
|
"google.golang.org/protobuf/proto"
|
|
corev1 "k8s.io/api/core/v1"
|
|
v1 "k8s.io/api/discovery/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
var (
|
|
pod1 = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
ServiceAccountName: "serviceaccount-name",
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "0.0.0.0:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OwnerKind: "replicationcontroller",
|
|
OwnerName: "rc-name",
|
|
}
|
|
|
|
pod1IPv6 = watcher.Address{
|
|
IP: "2001:0db8:85a3:0000:0000:8a2e:0370:7333",
|
|
Port: 1,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
ServiceAccountName: "serviceaccount-name",
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "[::]:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "[::]:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "[::]:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OwnerKind: "replicationcontroller",
|
|
OwnerName: "rc-name",
|
|
}
|
|
|
|
pod2 = watcher.Address{
|
|
IP: "1.1.1.2",
|
|
Port: 2,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod2",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "0.0.0.0:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
pod3 = watcher.Address{
|
|
IP: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
|
|
Port: 3,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod3",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "[::1]:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
podOpaque = watcher.Address{
|
|
IP: "1.1.1.4",
|
|
Port: 4,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod4",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
Annotations: map[string]string{
|
|
k8s.ProxyOpaquePortsAnnotation: "4",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "0.0.0.0:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OpaqueProtocol: true,
|
|
}
|
|
|
|
podAdmin = watcher.Address{
|
|
IP: "1.1.1.5",
|
|
Port: 4191,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "podAdmin",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
ServiceAccountName: "serviceaccount-name",
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "0.0.0.0:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OwnerKind: "replicationcontroller",
|
|
OwnerName: "rc-name",
|
|
}
|
|
|
|
podControl = watcher.Address{
|
|
IP: "1.1.1.6",
|
|
Port: 4190,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "podControl",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
ServiceAccountName: "serviceaccount-name",
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "0.0.0.0:4143",
|
|
},
|
|
{
|
|
Name: envAdminListenAddr,
|
|
Value: "0.0.0.0:4191",
|
|
},
|
|
{
|
|
Name: envControlListenAddr,
|
|
Value: "0.0.0.0:4190",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OwnerKind: "replicationcontroller",
|
|
OwnerName: "rc-name",
|
|
}
|
|
|
|
ew1 = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "ew-1",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-1",
|
|
ServerName: "server.local",
|
|
},
|
|
Ports: []ewv1beta1.PortSpec{
|
|
{
|
|
Name: k8s.ProxyPortName,
|
|
Port: 4143,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OwnerKind: "workloadgroup",
|
|
OwnerName: "wg-name",
|
|
}
|
|
|
|
ew2 = watcher.Address{
|
|
IP: "1.1.1.2",
|
|
Port: 2,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "ew-2",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-2",
|
|
ServerName: "server.local",
|
|
},
|
|
Ports: []ewv1beta1.PortSpec{
|
|
{
|
|
Name: k8s.ProxyPortName,
|
|
Port: 4143,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
ew3 = watcher.Address{
|
|
IP: "1.1.1.3",
|
|
Port: 3,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "ew-3",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-3",
|
|
ServerName: "server.local",
|
|
},
|
|
Ports: []ewv1beta1.PortSpec{
|
|
{
|
|
Name: k8s.ProxyPortName,
|
|
Port: 4143,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
ewOpaque = watcher.Address{
|
|
IP: "1.1.1.4",
|
|
Port: 4,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod4",
|
|
Namespace: "ns",
|
|
Annotations: map[string]string{
|
|
k8s.ProxyOpaquePortsAnnotation: "4",
|
|
},
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-opaque",
|
|
ServerName: "server.local",
|
|
},
|
|
|
|
Ports: []ewv1beta1.PortSpec{
|
|
{
|
|
Port: 4143,
|
|
Name: "linkerd-proxy",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
OpaqueProtocol: true,
|
|
}
|
|
|
|
ewNoProxyPort = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "ew-4",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-4",
|
|
ServerName: "server.local",
|
|
},
|
|
Ports: []ewv1beta1.PortSpec{},
|
|
},
|
|
},
|
|
OwnerKind: "workloadgroup",
|
|
OwnerName: "wg-name",
|
|
}
|
|
|
|
ewOverrideProxyPort = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
ExternalWorkload: &ewv1beta1.ExternalWorkload{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "ew-4",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: ewv1beta1.ExternalWorkloadSpec{
|
|
MeshTLS: ewv1beta1.MeshTLS{
|
|
Identity: "spiffe://some-domain/ew-4",
|
|
ServerName: "server.local",
|
|
},
|
|
Ports: []ewv1beta1.PortSpec{{
|
|
Port: 1111,
|
|
Name: "linkerd-proxy",
|
|
}},
|
|
},
|
|
},
|
|
OwnerKind: "workloadgroup",
|
|
OwnerName: "wg-name",
|
|
}
|
|
|
|
remoteGateway1 = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
}
|
|
|
|
remoteGateway2 = watcher.Address{
|
|
IP: "1.1.1.2",
|
|
Port: 2,
|
|
Identity: "some-identity",
|
|
}
|
|
|
|
remoteGatewayAuthOverride = watcher.Address{
|
|
IP: "1.1.1.2",
|
|
Port: 2,
|
|
Identity: "some-identity",
|
|
AuthorityOverride: "some-auth.com:2",
|
|
}
|
|
|
|
west1aAddress = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
ForZones: []v1.ForZone{
|
|
{Name: "west-1a"},
|
|
},
|
|
}
|
|
west1bAddress = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 2,
|
|
ForZones: []v1.ForZone{
|
|
{Name: "west-1b"},
|
|
},
|
|
}
|
|
AddressOnTest123Node = watcher.Address{
|
|
IP: "1.1.1.1",
|
|
Port: 1,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
NodeName: "test-123",
|
|
},
|
|
},
|
|
}
|
|
AddressNotOnTest123Node = watcher.Address{
|
|
IP: "1.1.1.2",
|
|
Port: 2,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
Namespace: "ns",
|
|
Labels: map[string]string{
|
|
k8s.ControllerNSLabel: "linkerd",
|
|
k8s.ProxyDeploymentLabel: "deployment-name",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
NodeName: "test-234",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
func TestEndpointTranslatorForRemoteGateways(t *testing.T) {
|
|
t.Run("Sends one update for add and another for remove", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(remoteGateway1, remoteGateway2))
|
|
translator.Remove(mkAddressSetForServices(remoteGateway2))
|
|
|
|
expectedNumUpdates := 2
|
|
<-mockGetServer.updatesReceived // Add
|
|
<-mockGetServer.updatesReceived // Remove
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
|
|
t.Run("Recovers after emptying address et", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(remoteGateway1))
|
|
translator.Remove(mkAddressSetForServices(remoteGateway1))
|
|
translator.Add(mkAddressSetForServices(remoteGateway1))
|
|
|
|
expectedNumUpdates := 3
|
|
<-mockGetServer.updatesReceived // Add
|
|
<-mockGetServer.updatesReceived // Remove
|
|
<-mockGetServer.updatesReceived // Add
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
|
|
t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
|
|
expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: "some-identity",
|
|
}
|
|
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(remoteGateway2))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
|
|
if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
|
|
t.Fatalf("TlsIdentity: %v", diff)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends TlsIdentity and Auth override when present", func(t *testing.T) {
|
|
expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: "some-identity",
|
|
}
|
|
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
}
|
|
|
|
expectedAuthOverride := &pb.AuthorityOverride{
|
|
AuthorityOverride: "some-auth.com:2",
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(remoteGatewayAuthOverride))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
|
|
if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
|
|
t.Fatalf("TlsIdentity %v", diff)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint %v", diff)
|
|
}
|
|
|
|
actualAuthOverride := addrs[0].GetAuthorityOverride()
|
|
if diff := deep.Equal(actualAuthOverride, expectedAuthOverride); diff != nil {
|
|
t.Fatalf("AuthOverride %v", diff)
|
|
}
|
|
})
|
|
|
|
t.Run("Does not send TlsIdentity when not present", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(remoteGateway1))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
if addrs[0].TlsIdentity != nil {
|
|
t.Fatalf("Expected no TlsIdentity to be sent, but got [%v]", addrs[0].TlsIdentity)
|
|
}
|
|
if addrs[0].ProtocolHint != nil {
|
|
t.Fatalf("Expected no ProtocolHint to be sent, but got [%v]", addrs[0].TlsIdentity)
|
|
}
|
|
})
|
|
|
|
}
|
|
|
|
func TestEndpointTranslatorForPods(t *testing.T) {
|
|
t.Run("Sends one update for add and another for remove", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1, pod2))
|
|
translator.Remove(mkAddressSetForPods(t, pod2))
|
|
|
|
expectedNumUpdates := 2
|
|
<-mockGetServer.updatesReceived // Add
|
|
<-mockGetServer.updatesReceived // Remove
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
|
|
t.Run("Sends addresses as removed or added", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(pod1, pod2, pod3))
|
|
translator.Remove(mkAddressSetForServices(pod3))
|
|
|
|
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
|
|
actualNumberOfAdded := len(addressesAdded)
|
|
expectedNumberOfAdded := 3
|
|
if actualNumberOfAdded != expectedNumberOfAdded {
|
|
t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
|
|
}
|
|
|
|
addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
|
|
actualNumberOfRemoved := len(addressesRemoved)
|
|
expectedNumberOfRemoved := 1
|
|
if actualNumberOfRemoved != expectedNumberOfRemoved {
|
|
t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
|
|
}
|
|
|
|
sort.Slice(addressesAdded, func(i, j int) bool {
|
|
return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
|
|
})
|
|
checkAddressAndWeight(t, addressesAdded[0], pod1, defaultWeight)
|
|
checkAddressAndWeight(t, addressesAdded[1], pod2, defaultWeight)
|
|
checkAddress(t, addressesRemoved[0], pod3)
|
|
})
|
|
|
|
t.Run("Sends addresses with opaque transport", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
|
|
InboundPort: 4143,
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslatorWithOpaqueTransport(t, true)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1, pod2, pod3))
|
|
|
|
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
|
|
actualNumberOfAdded := len(addressesAdded)
|
|
expectedNumberOfAdded := 3
|
|
if actualNumberOfAdded != expectedNumberOfAdded {
|
|
t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
|
|
}
|
|
|
|
for i := 0; i < 3; i++ {
|
|
actualProtocolHint := addressesAdded[i].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("Sends admin addresses without opaque transport", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslatorWithOpaqueTransport(t, true)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, podAdmin, podControl))
|
|
|
|
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
|
|
actualNumberOfAdded := len(addressesAdded)
|
|
expectedNumberOfAdded := 2
|
|
if actualNumberOfAdded != expectedNumberOfAdded {
|
|
t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
|
|
}
|
|
|
|
for _, addressAdded := range addressesAdded {
|
|
actualProtocolHint := addressAdded.GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("Sends metric labels with added addresses", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1))
|
|
|
|
update := <-mockGetServer.updatesReceived
|
|
|
|
actualGlobalMetricLabels := update.GetAdd().MetricLabels
|
|
expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
|
|
if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
|
|
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
|
|
}
|
|
|
|
actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
|
|
expectedAddedAddress1MetricLabels := map[string]string{
|
|
"pod": "pod1",
|
|
"replicationcontroller": "rc-name",
|
|
"serviceaccount": "serviceaccount-name",
|
|
"control_plane_ns": "linkerd",
|
|
"zone": "",
|
|
"zone_locality": "unknown",
|
|
}
|
|
if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
|
|
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
|
|
expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: "serviceaccount-name.ns.serviceaccount.identity.linkerd.trust.domain",
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
|
|
if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
|
|
t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_Opaque_{
|
|
Opaque: &pb.ProtocolHint_Opaque{},
|
|
},
|
|
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
|
|
InboundPort: 4143,
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(podOpaque))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends IPv6 only when pod has both IPv4 and IPv6", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1, pod1IPv6))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "[2001:db8:85a3::8a2e:370:7333]:1" {
|
|
t.Fatalf("Expected address to be [%s], got [%s]", "[2001:db8:85a3::8a2e:370:7333]:1", ipPort)
|
|
}
|
|
|
|
if updates := len(mockGetServer.updatesReceived); updates > 0 {
|
|
t.Fatalf("Expected to receive no more messages, received [%d]", updates)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone ", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
pod1West1a := pod1
|
|
pod1West1a.ForZones = []v1.ForZone{
|
|
{Name: "west-1a"},
|
|
}
|
|
|
|
pod1IPv6West1b := pod1IPv6
|
|
pod1IPv6West1b.ForZones = []v1.ForZone{
|
|
{Name: "west-1b"},
|
|
}
|
|
|
|
translator.Add(mkAddressSetForPods(t, pod1West1a, pod1IPv6West1b))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "1.1.1.1:1" {
|
|
t.Fatalf("Expected address to be [%s], got [%s]", "1.1.1.1:1", ipPort)
|
|
}
|
|
|
|
if updates := len(mockGetServer.updatesReceived); updates > 0 {
|
|
t.Fatalf("Expected to receive no more messages, received [%d]", updates)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
|
|
t.Run("Sends one update for add and another for remove", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2))
|
|
translator.Remove(mkAddressSetForExternalWorkloads(ew2))
|
|
|
|
expectedNumUpdates := 2
|
|
<-mockGetServer.updatesReceived // Add
|
|
<-mockGetServer.updatesReceived // Remove
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
|
|
t.Run("Sends addresses as removed or added", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2, ew3))
|
|
translator.Remove(mkAddressSetForExternalWorkloads(ew3))
|
|
|
|
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
|
|
actualNumberOfAdded := len(addressesAdded)
|
|
expectedNumberOfAdded := 3
|
|
if actualNumberOfAdded != expectedNumberOfAdded {
|
|
t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
|
|
}
|
|
|
|
addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
|
|
actualNumberOfRemoved := len(addressesRemoved)
|
|
expectedNumberOfRemoved := 1
|
|
if actualNumberOfRemoved != expectedNumberOfRemoved {
|
|
t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
|
|
}
|
|
|
|
sort.Slice(addressesAdded, func(i, j int) bool {
|
|
return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
|
|
})
|
|
checkAddressAndWeight(t, addressesAdded[0], ew1, defaultWeight)
|
|
checkAddressAndWeight(t, addressesAdded[1], ew2, defaultWeight)
|
|
checkAddress(t, addressesRemoved[0], ew3)
|
|
})
|
|
|
|
t.Run("Sends metric labels with added addresses", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ew1))
|
|
|
|
update := <-mockGetServer.updatesReceived
|
|
|
|
actualGlobalMetricLabels := update.GetAdd().MetricLabels
|
|
expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
|
|
if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
|
|
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
|
|
}
|
|
|
|
actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
|
|
expectedAddedAddress1MetricLabels := map[string]string{
|
|
"external_workload": "ew-1",
|
|
"zone": "",
|
|
"zone_locality": "unknown",
|
|
"workloadgroup": "wg-name",
|
|
}
|
|
if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
|
|
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends TlsIdentity and Server Name when enabled", func(t *testing.T) {
|
|
expectedTLSIdentity := &pb.TlsIdentity{
|
|
Strategy: &pb.TlsIdentity_UriLikeIdentity_{
|
|
UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
|
|
Uri: "spiffe://some-domain/ew-1",
|
|
},
|
|
},
|
|
ServerName: &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: "server.local",
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ew1))
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualTLSIdentity := addrs[0].GetTlsIdentity()
|
|
if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
|
|
t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
|
|
}
|
|
})
|
|
|
|
t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_Opaque_{
|
|
Opaque: &pb.ProtocolHint_Opaque{},
|
|
},
|
|
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
|
|
InboundPort: 4143,
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ewOpaque))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
})
|
|
|
|
t.Run("(forced opaque transport): Works with default proxy port when port missing from definition", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
|
|
InboundPort: 4143,
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslatorWithOpaqueTransport(t, true)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ewNoProxyPort))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
})
|
|
|
|
t.Run("(forced opaque transport): Works with override proxy port", func(t *testing.T) {
|
|
expectedProtocolHint := &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
|
|
InboundPort: 1111,
|
|
},
|
|
}
|
|
|
|
mockGetServer, translator := makeEndpointTranslatorWithOpaqueTransport(t, true)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForExternalWorkloads(ewOverrideProxyPort))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 1 {
|
|
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
|
}
|
|
|
|
actualProtocolHint := addrs[0].GetProtocolHint()
|
|
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
|
|
t.Fatalf("ProtocolHint: %v", diff)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
|
|
t.Run("Sends one update for add and none for remove", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
|
|
translator.Remove(mkAddressSetForServices(west1bAddress))
|
|
|
|
// Only the address meant for west-1a should be added, which means
|
|
// that when we try to remove the address meant for west-1b there
|
|
// should be no remove update.
|
|
expectedNumUpdates := 1
|
|
<-mockGetServer.updatesReceived // Add
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEndpointTranslatorExperimentalZoneWeights(t *testing.T) {
|
|
zoneA := "west-1a"
|
|
zoneB := "west-1b"
|
|
addrA := watcher.Address{
|
|
IP: "7.9.7.9",
|
|
Port: 7979,
|
|
Zone: &zoneA,
|
|
}
|
|
addrB := watcher.Address{
|
|
IP: "9.7.9.7",
|
|
Port: 9797,
|
|
Zone: &zoneB,
|
|
}
|
|
|
|
t.Run("Disabled", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.extEndpointZoneWeights = false
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(addrA, addrB))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 2 {
|
|
t.Fatalf("Expected [2] addresses returned, got %v", addrs)
|
|
}
|
|
sort.Slice(addrs, func(i, j int) bool {
|
|
return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
|
|
})
|
|
checkAddressAndWeight(t, addrs[0], addrA, defaultWeight)
|
|
checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
|
|
})
|
|
|
|
t.Run("Applies weights", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.extEndpointZoneWeights = true
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
translator.Add(mkAddressSetForServices(addrA, addrB))
|
|
|
|
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
|
|
if len(addrs) != 2 {
|
|
t.Fatalf("Expected [2] addresses returned, got %v", addrs)
|
|
}
|
|
sort.Slice(addrs, func(i, j int) bool {
|
|
return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
|
|
})
|
|
checkAddressAndWeight(t, addrs[0], addrA, defaultWeight*10)
|
|
checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
|
|
})
|
|
}
|
|
|
|
func TestEndpointTranslatorForLocalTrafficPolicy(t *testing.T) {
|
|
t.Run("Sends one update for add and none for remove", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
|
|
addressSet.LocalTrafficPolicy = true
|
|
translator.Add(addressSet)
|
|
translator.Remove(mkAddressSetForServices(AddressNotOnTest123Node))
|
|
|
|
// Only the address meant for AddressOnTest123Node should be added, which means
|
|
// that when we try to remove the address meant for AddressNotOnTest123Node there
|
|
// should be no remove update.
|
|
expectedNumUpdates := 1
|
|
<-mockGetServer.updatesReceived // Add
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
|
|
t.Run("Removes cannot change LocalTrafficPolicy", func(t *testing.T) {
|
|
mockGetServer, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
|
|
addressSet.LocalTrafficPolicy = true
|
|
translator.Add(addressSet)
|
|
set := watcher.AddressSet{
|
|
Addresses: make(map[watcher.ServiceID]watcher.Address),
|
|
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
|
|
LocalTrafficPolicy: false,
|
|
}
|
|
translator.Remove(set)
|
|
|
|
// Only the address meant for AddressOnTest123Node should be added.
|
|
// The remove with no addresses should not change the LocalTrafficPolicy
|
|
// and should be a noop that does not send an update.
|
|
expectedNumUpdates := 1
|
|
<-mockGetServer.updatesReceived // Add
|
|
|
|
if len(mockGetServer.updatesReceived) != 0 {
|
|
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
|
|
}
|
|
})
|
|
}
|
|
|
|
// TestConcurrency, to be triggered with `go test -race`, shouldn't report a race condition
|
|
func TestConcurrency(t *testing.T) {
|
|
_, translator := makeEndpointTranslator(t)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
|
|
translator.Remove(mkAddressSetForServices(west1bAddress))
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestGetInboundPort(t *testing.T) {
|
|
podSpec := &corev1.PodSpec{
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: k8s.ProxyContainerName,
|
|
Env: []corev1.EnvVar{
|
|
{
|
|
Name: envInboundListenAddr,
|
|
Value: "1.2.3.4:8080",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
port, err := getInboundPort(podSpec)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %s", err)
|
|
}
|
|
if port != 8080 {
|
|
t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
|
|
}
|
|
|
|
podSpec.Containers[0].Env[0].Value = "[2001:db8::94]:8080"
|
|
port, err = getInboundPort(podSpec)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %s", err)
|
|
}
|
|
if port != 8080 {
|
|
t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
|
|
}
|
|
}
|
|
|
|
func mkAddressSetForServices(gatewayAddresses ...watcher.Address) watcher.AddressSet {
|
|
set := watcher.AddressSet{
|
|
Addresses: make(map[watcher.ServiceID]watcher.Address),
|
|
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
|
|
}
|
|
for _, a := range gatewayAddresses {
|
|
a := a // pin
|
|
|
|
id := watcher.ServiceID{
|
|
Name: strings.Join([]string{
|
|
a.IP,
|
|
fmt.Sprint(a.Port),
|
|
}, "-"),
|
|
}
|
|
set.Addresses[id] = a
|
|
}
|
|
return set
|
|
}
|
|
|
|
func mkAddressSetForPods(t *testing.T, podAddresses ...watcher.Address) watcher.AddressSet {
|
|
t.Helper()
|
|
|
|
set := watcher.AddressSet{
|
|
Addresses: make(map[watcher.PodID]watcher.Address),
|
|
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
|
|
}
|
|
for _, p := range podAddresses {
|
|
// The IP family is set on the PodID used to index the
|
|
// watcher.Address; here we simply detect it
|
|
fam := corev1.IPv4Protocol
|
|
addr, err := netip.ParseAddr(p.IP)
|
|
if err != nil {
|
|
t.Fatalf("Invalid IP '%s': %s", p.IP, err)
|
|
}
|
|
if addr.Is6() {
|
|
fam = corev1.IPv6Protocol
|
|
}
|
|
|
|
id := watcher.PodID{
|
|
Name: p.Pod.Name,
|
|
Namespace: p.Pod.Namespace,
|
|
IPFamily: fam,
|
|
}
|
|
set.Addresses[id] = p
|
|
}
|
|
return set
|
|
}
|
|
|
|
func mkAddressSetForExternalWorkloads(ewAddresses ...watcher.Address) watcher.AddressSet {
|
|
set := watcher.AddressSet{
|
|
Addresses: make(map[watcher.PodID]watcher.Address),
|
|
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
|
|
}
|
|
for _, ew := range ewAddresses {
|
|
id := watcher.ExternalWorkloadID{Name: ew.ExternalWorkload.Name, Namespace: ew.ExternalWorkload.Namespace}
|
|
set.Addresses[id] = ew
|
|
}
|
|
return set
|
|
}
|
|
|
|
func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address, weight uint32) {
|
|
t.Helper()
|
|
|
|
checkAddress(t, actual.GetAddr(), expected)
|
|
if actual.GetWeight() != weight {
|
|
t.Fatalf("Expected weight [%+v] but got [%+v]", weight, actual.GetWeight())
|
|
}
|
|
}
|
|
|
|
func checkAddress(t *testing.T, actual *net.TcpAddress, expected watcher.Address) {
|
|
t.Helper()
|
|
|
|
expectedAddr, err := addr.ParseProxyIP(expected.IP)
|
|
expectedTCP := net.TcpAddress{
|
|
Ip: expectedAddr,
|
|
Port: expected.Port,
|
|
}
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse expected IP [%s]: %s", expected.IP, err)
|
|
}
|
|
if actual.Ip.GetIpv4() == 0 && actual.Ip.GetIpv6() == nil {
|
|
t.Fatal("Actual IP is empty")
|
|
}
|
|
if actual.Ip.GetIpv4() != expectedTCP.Ip.GetIpv4() {
|
|
t.Fatalf("Expected IPv4 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
|
|
}
|
|
if !proto.Equal(actual.Ip.GetIpv6(), expectedTCP.Ip.GetIpv6()) {
|
|
t.Fatalf("Expected IPv6 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
|
|
}
|
|
if actual.Port != expectedTCP.Port {
|
|
t.Fatalf("Expected port [%+v] but got [%+v]", expectedTCP.Port, actual.Port)
|
|
}
|
|
}
|