mirror of https://github.com/linkerd/linkerd2.git
393 lines
9.9 KiB
Go
393 lines
9.9 KiB
Go
package destination
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"slices"
|
|
"testing"
|
|
"time"
|
|
|
|
logging "github.com/sirupsen/logrus"
|
|
|
|
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
|
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
|
"github.com/linkerd/linkerd2/controller/k8s"
|
|
"github.com/linkerd/linkerd2/pkg/addr"
|
|
"github.com/linkerd/linkerd2/testutil"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
func TestFederatedService(t *testing.T) {
|
|
fsw, err := mockFederatedServiceWatcher(t)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
|
|
|
|
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
|
|
|
|
updates := []*pb.Update{}
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
|
|
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
|
|
}
|
|
|
|
func TestRemoteJoinFederatedService(t *testing.T) {
|
|
fsw, err := mockFederatedServiceWatcher(t)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
|
|
|
|
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
|
|
|
|
updates := []*pb.Update{}
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
|
|
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
|
|
|
|
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
|
|
if err != nil {
|
|
t.Fatalf("error getting federated service: %s", err)
|
|
}
|
|
newFederatedSvc := federatedSvc.DeepCopy()
|
|
newFederatedSvc.Annotations["multicluster.linkerd.io/remote-discovery"] = "bb@east,bb@north"
|
|
fsw.updateService(federatedSvc, newFederatedSvc)
|
|
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-north-1", "172.17.2.1:8080")
|
|
}
|
|
|
|
func TestRemoteLeaveFederatedService(t *testing.T) {
|
|
fsw, err := mockFederatedServiceWatcher(t)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
|
|
|
|
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
|
|
|
|
updates := []*pb.Update{}
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
|
|
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
|
|
|
|
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
|
|
if err != nil {
|
|
t.Fatalf("error getting federated service: %s", err)
|
|
}
|
|
newFederatedSvc := federatedSvc.DeepCopy()
|
|
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/remote-discovery")
|
|
fsw.updateService(federatedSvc, newFederatedSvc)
|
|
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesRemoves(t, updates, "172.17.1.1:8080")
|
|
}
|
|
|
|
func TestLocalLeaveFederatedService(t *testing.T) {
|
|
fsw, err := mockFederatedServiceWatcher(t)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
|
|
|
|
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
|
|
|
|
updates := []*pb.Update{}
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
|
|
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
|
|
|
|
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
|
|
if err != nil {
|
|
t.Fatalf("error getting federated service: %s", err)
|
|
}
|
|
newFederatedSvc := federatedSvc.DeepCopy()
|
|
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/local-discovery")
|
|
fsw.updateService(federatedSvc, newFederatedSvc)
|
|
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesRemoves(t, updates, "172.17.0.1:8080")
|
|
|
|
federatedSvc = newFederatedSvc
|
|
newFederatedSvc = federatedSvc.DeepCopy()
|
|
newFederatedSvc.Annotations["multicluster.linkerd.io/local-discovery"] = "bb"
|
|
fsw.updateService(federatedSvc, newFederatedSvc)
|
|
|
|
updates = append(updates, <-mockGetServer.updatesReceived)
|
|
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
|
|
}
|
|
|
|
func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error) {
|
|
k8sAPI, err := k8s.NewFakeAPI(westConfigs...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewFakeAPI returned an error: %w", err)
|
|
}
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewFakeMetadataAPI returned an error: %w", err)
|
|
}
|
|
localEndpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewEndpointsWatcher returned an error: %w", err)
|
|
}
|
|
|
|
prom := prometheus.NewRegistry()
|
|
clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false,
|
|
watcher.CreateMulticlusterDecoder(map[string][]string{
|
|
"east": eastConfigs,
|
|
"north": northConfigs,
|
|
}),
|
|
prom,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewClusterStoreWithDecoder returned an error: %w", err)
|
|
}
|
|
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("newFederatedServiceWatcher returned an error: %w", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
clusterStore.Sync(nil)
|
|
|
|
// Wait for the cluster store to be populated with the remote clusters.
|
|
err = testutil.RetryFor(30*time.Second, func() error {
|
|
if _, _, found := clusterStore.Get("east"); !found {
|
|
return errors.New("east cluster not found in cluster store")
|
|
}
|
|
if _, _, found := clusterStore.Get("north"); !found {
|
|
return errors.New("north cluster not found in cluster store")
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("timed out waiting for cluster store to be populated: %w", err)
|
|
}
|
|
|
|
return fsw, nil
|
|
}
|
|
|
|
func assertUpdatesContains(t *testing.T, updates []*pb.Update, pod, address string) {
|
|
t.Helper()
|
|
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
|
|
if u.GetAdd() == nil || len(u.GetAdd().GetAddrs()) == 0 {
|
|
return false
|
|
}
|
|
endpoint := u.GetAdd().GetAddrs()[0]
|
|
return addr.ProxyAddressToString(endpoint.GetAddr()) == address && endpoint.MetricLabels["pod"] == pod
|
|
}) {
|
|
t.Errorf("expected updates to contain pod %s with address %s", pod, address)
|
|
}
|
|
}
|
|
|
|
func assertUpdatesRemoves(t *testing.T, updates []*pb.Update, address string) {
|
|
t.Helper()
|
|
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
|
|
if u.GetRemove() == nil || len(u.GetRemove().GetAddrs()) == 0 {
|
|
return false
|
|
}
|
|
endpoint := u.GetRemove().GetAddrs()[0]
|
|
return addr.ProxyAddressToString(endpoint) == address
|
|
}) {
|
|
t.Errorf("expected updates to contain remove of address %s", address)
|
|
}
|
|
}
|
|
|
|
// Kubernetes manifests used as fixtures for the fake clusters in these tests.
// Each entry is a single YAML document.
var (
	// westConfigs seeds the local fake API. It holds the "linkerd" namespace
	// with one remote-kubeconfig Secret per remote cluster (east, north), and
	// the "test" namespace with the federated service "bb-federated", the
	// local "bb" service, and its single endpoint/pod bb-west-1 (172.17.0.1).
	westConfigs = []string{
		`
apiVersion: v1
kind: Namespace
metadata:
  name: linkerd`,
		`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
  namespace: linkerd
  name: east-cluster-credentials
  labels:
    multicluster.linkerd.io/cluster-name: east
  annotations:
    multicluster.linkerd.io/trust-domain: cluster.local
    multicluster.linkerd.io/cluster-domain: cluster.local
data:
  kubeconfig: ZWFzdAo= # east
`,
		`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
  namespace: linkerd
  name: north-cluster-credentials
  labels:
    multicluster.linkerd.io/cluster-name: north
  annotations:
    multicluster.linkerd.io/trust-domain: cluster.local
    multicluster.linkerd.io/cluster-domain: cluster.local
data:
  kubeconfig: bm9ydGgK # north
`,
		`
apiVersion: v1
kind: Namespace
metadata:
  name: test`,
		`
apiVersion: v1
kind: Service
metadata:
  name: bb-federated
  namespace: test
  annotations:
    multicluster.linkerd.io/remote-discovery: bb@east
    multicluster.linkerd.io/local-discovery: bb
spec:
  type: LoadBalancer
  ports:
  - port: 8080`,
		`
apiVersion: v1
kind: Service
metadata:
  name: bb
  namespace: test
spec:
  type: LoadBalancer
  ports:
  - port: 8080`,
		`
apiVersion: v1
kind: Endpoints
metadata:
  name: bb
  namespace: test
subsets:
- addresses:
  - ip: 172.17.0.1
    targetRef:
      kind: Pod
      name: bb-west-1
      namespace: test
  ports:
  - port: 8080
`,
		`
apiVersion: v1
kind: Pod
metadata:
  name: bb-west-1
  namespace: test
  ownerReferences:
  - kind: ReplicaSet
    name: bb-west
status:
  phase: Running
  podIP: 172.17.0.1`,
	}

	// eastConfigs describes the "east" remote cluster: the "bb" service in
	// the "test" namespace with its single endpoint/pod bb-east-1
	// (172.17.1.1).
	eastConfigs = []string{
		`
apiVersion: v1
kind: Namespace
metadata:
  name: test`,
		`
apiVersion: v1
kind: Service
metadata:
  name: bb
  namespace: test
spec:
  type: LoadBalancer
  ports:
  - port: 8080`,
		`
apiVersion: v1
kind: Endpoints
metadata:
  name: bb
  namespace: test
subsets:
- addresses:
  - ip: 172.17.1.1
    targetRef:
      kind: Pod
      name: bb-east-1
      namespace: test
  ports:
  - port: 8080
`,
		`
apiVersion: v1
kind: Pod
metadata:
  name: bb-east-1
  namespace: test
  ownerReferences:
  - kind: ReplicaSet
    name: bb-east
status:
  phase: Running
  podIP: 172.17.1.1`,
	}

	// northConfigs describes the "north" remote cluster: the "bb" service in
	// the "test" namespace with its single endpoint/pod bb-north-1
	// (172.17.2.1).
	northConfigs = []string{
		`
apiVersion: v1
kind: Namespace
metadata:
  name: test`,
		`
apiVersion: v1
kind: Service
metadata:
  name: bb
  namespace: test
spec:
  type: LoadBalancer
  ports:
  - port: 8080`,
		`
apiVersion: v1
kind: Endpoints
metadata:
  name: bb
  namespace: test
subsets:
- addresses:
  - ip: 172.17.2.1
    targetRef:
      kind: Pod
      name: bb-north-1
      namespace: test
  ports:
  - port: 8080
`,
		`
apiVersion: v1
kind: Pod
metadata:
  name: bb-north-1
  namespace: test
  ownerReferences:
  - kind: ReplicaSet
    name: bb-north
status:
  phase: Running
  podIP: 172.17.2.1`,
	}
)
|