mirror of https://github.com/linkerd/linkerd2.git
Extend unit test for HostPort subscriptions (#11439)
Followup to https://github.com/linkerd/linkerd2/pull/11334#issuecomment-1736093592 This extends the test introduced in #11334 to excercise upgrading a Server associated to a pod's HostPort, and observing how the stream updates the OpaqueProtocol field. Helper functions were refactored a bit to allow retrieving the l5dCRDClientSet used when building the fake API.
This commit is contained in:
parent
30ecb57ff1
commit
c67985def0
|
@ -10,12 +10,15 @@ import (
|
|||
"github.com/linkerd/linkerd2-proxy-api/go/net"
|
||||
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
||||
"github.com/linkerd/linkerd2/controller/api/util"
|
||||
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
|
||||
"github.com/linkerd/linkerd2/controller/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/addr"
|
||||
pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/testutil"
|
||||
logging "github.com/sirupsen/logrus"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
)
|
||||
|
||||
const fullyQualifiedName = "name1.ns.svc.mycluster.local"
|
||||
|
@ -170,7 +173,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Returns server profile", func(t *testing.T) {
|
||||
stream, server := profileStream(t, fullyQualifiedName, port, "ns:other")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, fullyQualifiedName, port, "ns:other")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
|
@ -189,7 +193,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return service profile when using json token", func(t *testing.T) {
|
||||
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`)
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`)
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
|
@ -204,7 +209,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Returns client profile", func(t *testing.T) {
|
||||
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`)
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`)
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
routes := profile.GetRoutes()
|
||||
|
@ -219,7 +225,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile when using cluster IP", func(t *testing.T) {
|
||||
stream, server := profileStream(t, clusterIP, port, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, clusterIP, port, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
|
@ -237,7 +244,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
|
||||
stream, server := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns")
|
||||
defer stream.Cancel()
|
||||
|
||||
epAddr, err := toAddress(podIPStatefulSet, port)
|
||||
|
@ -277,7 +285,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
|
||||
stream, server := profileStream(t, podIP1, port, "ns:ns")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, podIP1, port, "ns:ns")
|
||||
defer stream.Cancel()
|
||||
|
||||
epAddr, err := toAddress(podIP1, port)
|
||||
|
@ -317,7 +326,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
|
||||
stream, server := profileStream(t, "172.0.0.0", 1234, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, "172.0.0.0", 1234, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.RetryBudget == nil {
|
||||
|
@ -328,7 +338,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
|
||||
stream, server := profileStream(t, podIP2, port, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, podIP2, port, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.Endpoint == nil {
|
||||
|
@ -342,7 +353,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
|
||||
stream, server := profileStream(t, clusterIPOpaque, opaquePort, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, clusterIPOpaque, opaquePort, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
|
||||
|
@ -356,7 +368,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
|
||||
stream, server := profileStream(t, podIPOpaque, opaquePort, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, podIPOpaque, opaquePort, "")
|
||||
defer stream.Cancel()
|
||||
|
||||
epAddr, err := toAddress(podIPOpaque, opaquePort)
|
||||
|
@ -396,7 +409,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
|
||||
stream, server := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
|
||||
|
@ -410,7 +424,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
|
||||
stream, server := profileStream(t, podIPSkipped, skippedPort, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, podIPSkipped, skippedPort, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
addr := profile.GetEndpoint()
|
||||
|
@ -428,7 +443,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
|
||||
stream, server := profileStream(t, podIPPolicy, 80, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, podIPPolicy, 80, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.Endpoint == nil {
|
||||
|
@ -448,7 +464,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
|
||||
stream, server := profileStream(t, externalIP, 3306, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, externalIP, 3306, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if !profile.OpaqueProtocol {
|
||||
|
@ -459,7 +476,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
|
||||
stream, server := profileStream(t, externalIP, 80, "")
|
||||
server := makeServer(t)
|
||||
stream := profileStream(t, server, externalIP, 80, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.OpaqueProtocol {
|
||||
|
@ -472,7 +490,8 @@ func TestGetProfiles(t *testing.T) {
|
|||
t.Run("Return profile for host port pods", func(t *testing.T) {
|
||||
hostPort := uint32(7777)
|
||||
containerPort := uint32(80)
|
||||
stream, server := profileStream(t, externalIP, hostPort, "")
|
||||
server, l5dClient := getServerWithClient(t)
|
||||
stream := profileStream(t, server, externalIP, hostPort, "")
|
||||
defer stream.Cancel()
|
||||
|
||||
// HostPort maps to pod.
|
||||
|
@ -517,9 +536,21 @@ func TestGetProfiles(t *testing.T) {
|
|||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hostport-mapping-2",
|
||||
Namespace: "ns",
|
||||
Labels: map[string]string{
|
||||
"app": "hostport-mapping-2",
|
||||
},
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: pkgk8s.ProxyContainerName,
|
||||
Env: []corev1.EnvVar{
|
||||
{
|
||||
Name: "LINKERD2_PROXY_INBOUND_LISTEN_ADDR",
|
||||
Value: "0.0.0.0:4143",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "nginx",
|
||||
Image: "nginx",
|
||||
|
@ -566,6 +597,46 @@ func TestGetProfiles(t *testing.T) {
|
|||
if dstPod != "hostport-mapping-2" {
|
||||
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
|
||||
}
|
||||
if profile.OpaqueProtocol {
|
||||
t.Fatal("Expected OpaqueProtocol=false")
|
||||
}
|
||||
|
||||
// Server is created, setting the port to opaque
|
||||
(*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "srv-hostport-mapping-2",
|
||||
Namespace: "ns",
|
||||
},
|
||||
Spec: v1beta1.ServerSpec{
|
||||
PodSelector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{
|
||||
"app": "hostport-mapping-2",
|
||||
},
|
||||
},
|
||||
Port: intstr.IntOrString{
|
||||
Type: intstr.String,
|
||||
StrVal: "nginx-7777",
|
||||
},
|
||||
ProxyProtocol: "opaque",
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
|
||||
var updates []*pb.DestinationProfile
|
||||
err = testutil.RetryFor(time.Second*10, func() error {
|
||||
updates = stream.Updates()
|
||||
if len(updates) < 4 {
|
||||
return fmt.Errorf("expected 4 updates, got %d", len(updates))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
profile = stream.Updates()[3]
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatal("Expected OpaqueProtocol=true")
|
||||
}
|
||||
|
||||
server.clusterStore.UnregisterGauges()
|
||||
})
|
||||
|
@ -713,10 +784,9 @@ func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update {
|
|||
return updates[0]
|
||||
}
|
||||
|
||||
func profileStream(t *testing.T, host string, port uint32, token string) (*bufferingGetProfileStream, *server) {
|
||||
func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream {
|
||||
t.Helper()
|
||||
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
|
@ -735,5 +805,5 @@ func profileStream(t *testing.T, host string, port uint32, token string) (*buffe
|
|||
// Give GetProfile some slack
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
return stream, server
|
||||
return stream
|
||||
}
|
||||
|
|
|
@ -7,11 +7,17 @@ import (
|
|||
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
||||
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
||||
"github.com/linkerd/linkerd2/controller/api/util"
|
||||
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
|
||||
"github.com/linkerd/linkerd2/controller/k8s"
|
||||
logging "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func makeServer(t *testing.T) *server {
|
||||
srv, _ := getServerWithClient(t)
|
||||
return srv
|
||||
}
|
||||
|
||||
func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
|
||||
meshedPodResources := []string{`
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
|
@ -445,9 +451,9 @@ spec:
|
|||
res = append(res, hostPortMapping...)
|
||||
res = append(res, mirrorServiceResources...)
|
||||
res = append(res, destinationCredentialsResources...)
|
||||
k8sAPI, err := k8s.NewFakeAPI(res...)
|
||||
k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...)
|
||||
if err != nil {
|
||||
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
||||
t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err)
|
||||
}
|
||||
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
||||
if err != nil {
|
||||
|
@ -513,7 +519,7 @@ spec:
|
|||
metadataAPI,
|
||||
log,
|
||||
make(<-chan struct{}),
|
||||
}
|
||||
}, l5dClient
|
||||
}
|
||||
|
||||
type bufferingGetStream struct {
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/metadata/fake"
|
||||
)
|
||||
|
@ -16,10 +18,26 @@ func NewFakeAPI(configs ...string) (*API, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return NewFakeClusterScopedAPI(clientSet, spClientSet), nil
|
||||
}
|
||||
|
||||
// NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like
|
||||
// NewFakeAPI, but it also returns the mock client for linkerd CRDs
|
||||
func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) {
|
||||
clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil
|
||||
}
|
||||
|
||||
// NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.
|
||||
func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface) *API {
|
||||
return NewClusterScopedAPI(
|
||||
clientSet,
|
||||
nil,
|
||||
spClientSet,
|
||||
l5dClientSet,
|
||||
"fake",
|
||||
CJ,
|
||||
CM,
|
||||
|
@ -39,7 +57,7 @@ func NewFakeAPI(configs ...string) (*API, error) {
|
|||
ES,
|
||||
Srv,
|
||||
Secret,
|
||||
), nil
|
||||
)
|
||||
}
|
||||
|
||||
// NewFakeMetadataAPI provides a mock Kubernetes API for testing.
|
||||
|
|
Loading…
Reference in New Issue