linkerd2/controller/api/destination/federated_service_watcher_t...

393 lines
9.9 KiB
Go

package destination
import (
"errors"
"fmt"
"slices"
"testing"
"time"
logging "github.com/sirupsen/logrus"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/testutil"
"github.com/prometheus/client_golang/prometheus"
)
func TestFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
}
func TestRemoteJoinFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
newFederatedSvc.Annotations["multicluster.linkerd.io/remote-discovery"] = "bb@east,bb@north"
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-north-1", "172.17.2.1:8080")
}
func TestRemoteLeaveFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/remote-discovery")
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesRemoves(t, updates, "172.17.1.1:8080")
}
func TestLocalLeaveFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/local-discovery")
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesRemoves(t, updates, "172.17.0.1:8080")
federatedSvc = newFederatedSvc
newFederatedSvc = federatedSvc.DeepCopy()
newFederatedSvc.Annotations["multicluster.linkerd.io/local-discovery"] = "bb"
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
}
func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error) {
k8sAPI, err := k8s.NewFakeAPI(westConfigs...)
if err != nil {
return nil, fmt.Errorf("NewFakeAPI returned an error: %w", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
return nil, fmt.Errorf("NewFakeMetadataAPI returned an error: %w", err)
}
localEndpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
if err != nil {
return nil, fmt.Errorf("NewEndpointsWatcher returned an error: %w", err)
}
prom := prometheus.NewRegistry()
clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false,
watcher.CreateMulticlusterDecoder(map[string][]string{
"east": eastConfigs,
"north": northConfigs,
}),
prom,
)
if err != nil {
return nil, fmt.Errorf("NewClusterStoreWithDecoder returned an error: %w", err)
}
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
if err != nil {
return nil, fmt.Errorf("newFederatedServiceWatcher returned an error: %w", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
clusterStore.Sync(nil)
// Wait for the cluster store to be populated with the remote clusters.
err = testutil.RetryFor(30*time.Second, func() error {
if _, _, found := clusterStore.Get("east"); !found {
return errors.New("east cluster not found in cluster store")
}
if _, _, found := clusterStore.Get("north"); !found {
return errors.New("north cluster not found in cluster store")
}
return nil
})
if err != nil {
return nil, fmt.Errorf("timed out waiting for cluster store to be populated: %w", err)
}
return fsw, nil
}
func assertUpdatesContains(t *testing.T, updates []*pb.Update, pod, address string) {
t.Helper()
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
if u.GetAdd() == nil || len(u.GetAdd().GetAddrs()) == 0 {
return false
}
endpoint := u.GetAdd().GetAddrs()[0]
return addr.ProxyAddressToString(endpoint.GetAddr()) == address && endpoint.MetricLabels["pod"] == pod
}) {
t.Errorf("expected updates to contain pod %s with address %s", pod, address)
}
}
func assertUpdatesRemoves(t *testing.T, updates []*pb.Update, address string) {
t.Helper()
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
if u.GetRemove() == nil || len(u.GetRemove().GetAddrs()) == 0 {
return false
}
endpoint := u.GetRemove().GetAddrs()[0]
return addr.ProxyAddressToString(endpoint) == address
}) {
t.Errorf("expected updates to contain remove of address %s", address)
}
}
var (
westConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: linkerd`,
`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
namespace: linkerd
name: east-cluster-credentials
labels:
multicluster.linkerd.io/cluster-name: east
annotations:
multicluster.linkerd.io/trust-domain: cluster.local
multicluster.linkerd.io/cluster-domain: cluster.local
data:
kubeconfig: ZWFzdAo= # east
`,
`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
namespace: linkerd
name: north-cluster-credentials
labels:
multicluster.linkerd.io/cluster-name: north
annotations:
multicluster.linkerd.io/trust-domain: cluster.local
multicluster.linkerd.io/cluster-domain: cluster.local
data:
kubeconfig: bm9ydGgK # north
`,
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb-federated
namespace: test
annotations:
multicluster.linkerd.io/remote-discovery: bb@east
multicluster.linkerd.io/local-discovery: bb
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.0.1
targetRef:
kind: Pod
name: bb-west-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-west-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-west
status:
phase: Running
podIP: 172.17.0.1`,
}
eastConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.1.1
targetRef:
kind: Pod
name: bb-east-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-east-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-east
status:
phase: Running
podIP: 172.17.1.1`,
}
northConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.2.1
targetRef:
kind: Pod
name: bb-north-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-north-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-north
status:
phase: Running
podIP: 172.17.2.1`,
}
)