Add tests for federated service watcher (#13329)

Adds tests for the federated service watcher that exercise having remote and local clusters join and leave a federated service and ensuring that the correct proxy API updates are emitted.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2024-11-19 10:08:50 -08:00 committed by GitHub
parent 80d28a4318
commit 752d1c9ea0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 424 additions and 4 deletions

View File

@ -27,6 +27,8 @@ type federatedServiceWatcher struct {
localEndpoints *watcher.EndpointsWatcher
log *logging.Entry
sync.RWMutex
}
type remoteDiscoveryID struct {
@ -109,10 +111,14 @@ func (fsw *federatedServiceWatcher) Subscribe(
endStream chan struct{},
) error {
id := watcher.ServiceID{Namespace: namespace, Name: service}
fsw.RLock()
if federatedService, ok := fsw.services[id]; ok {
fsw.RUnlock()
fsw.log.Debugf("Subscribing to federated service %s/%s", namespace, service)
federatedService.subscribe(port, nodeName, instanceID, stream, endStream)
return nil
} else {
fsw.RUnlock()
}
return fmt.Errorf("service %s/%s is not a federated service", namespace, service)
}
@ -123,9 +129,13 @@ func (fsw *federatedServiceWatcher) Unsubscribe(
stream pb.Destination_GetServer,
) {
id := watcher.ServiceID{Namespace: namespace, Name: service}
fsw.RLock()
if federatedService, ok := fsw.services[id]; ok {
fsw.RUnlock()
fsw.log.Debugf("Unsubscribing from federated service %s/%s", namespace, service)
federatedService.unsubscribe(stream)
} else {
fsw.RUnlock()
}
}
@ -137,20 +147,27 @@ func (fsw *federatedServiceWatcher) addService(obj interface{}) {
}
if isFederatedService(service) {
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
fsw.Unlock()
fsw.log.Debugf("Updating federated service %s/%s", service.Namespace, service.Name)
federatedService.update(service)
} else {
fsw.log.Debugf("Adding federated service %s/%s", service.Namespace, service.Name)
federatedService = fsw.newFederatedService(service)
fsw.services[id] = federatedService
fsw.Unlock()
federatedService.update(service)
}
} else {
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
delete(fsw.services, id)
fsw.Unlock()
fsw.log.Debugf("Service %s/%s is no longer a federated service", service.Namespace, service.Name)
federatedService.delete()
delete(fsw.services, id)
} else {
fsw.Unlock()
}
}
}
@ -178,10 +195,15 @@ func (fsw *federatedServiceWatcher) deleteService(obj interface{}) {
Namespace: service.Namespace,
Name: service.Name,
}
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
federatedService.delete()
delete(fsw.services, id)
fsw.Unlock()
federatedService.delete()
} else {
fsw.Unlock()
}
}
func (fsw *federatedServiceWatcher) newFederatedService(service *corev1.Service) *federatedService {
@ -268,6 +290,9 @@ func (fs *federatedService) subscribe(
stream pb.Destination_GetServer,
endStream chan struct{},
) {
fs.Lock()
defer fs.Unlock()
syncStream := newSyncronizedGetStream(stream, fs.log)
syncStream.Start()
@ -287,8 +312,6 @@ func (fs *federatedService) subscribe(
fs.localDiscoverySubscribe(&subscriber, fs.localDiscovery)
}
fs.Lock()
defer fs.Unlock()
fs.subscribers = append(fs.subscribers, subscriber)
}
@ -322,6 +345,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
remoteWatcher, remoteConfig, found := fs.clusterStore.Get(id.cluster)
if !found {
fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
return
}
translator := newEndpointTranslator(

View File

@ -0,0 +1,375 @@
package destination
import (
"fmt"
"slices"
"testing"
logging "github.com/sirupsen/logrus"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
)
func TestFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
}
func TestRemoteJoinFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
newFederatedSvc.Annotations["multicluster.linkerd.io/remote-discovery"] = "bb@east,bb@north"
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-north-1", "172.17.2.1:8080")
}
func TestRemoteLeaveFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/remote-discovery")
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesRemoves(t, updates, "172.17.1.1:8080")
}
func TestLocalLeaveFederatedService(t *testing.T) {
fsw, err := mockFederatedServiceWatcher(t)
if err != nil {
t.Fatal(err)
}
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
fsw.Subscribe("bb-federated", "test", 8080, "node", "", mockGetServer, nil)
updates := []*pb.Update{}
updates = append(updates, <-mockGetServer.updatesReceived)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
assertUpdatesContains(t, updates, "bb-east-1", "172.17.1.1:8080")
federatedSvc, err := fsw.k8sAPI.Svc().Lister().Services("test").Get("bb-federated")
if err != nil {
t.Fatalf("error getting federated service: %s", err)
}
newFederatedSvc := federatedSvc.DeepCopy()
delete(newFederatedSvc.Annotations, "multicluster.linkerd.io/local-discovery")
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesRemoves(t, updates, "172.17.0.1:8080")
federatedSvc = newFederatedSvc
newFederatedSvc = federatedSvc.DeepCopy()
newFederatedSvc.Annotations["multicluster.linkerd.io/local-discovery"] = "bb"
fsw.updateService(federatedSvc, newFederatedSvc)
updates = append(updates, <-mockGetServer.updatesReceived)
assertUpdatesContains(t, updates, "bb-west-1", "172.17.0.1:8080")
}
func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error) {
k8sAPI, err := k8s.NewFakeAPI(westConfigs...)
if err != nil {
return nil, fmt.Errorf("NewFakeAPI returned an error: %w", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
return nil, fmt.Errorf("NewFakeMetadataAPI returned an error: %w", err)
}
localEndpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
if err != nil {
return nil, fmt.Errorf("NewEndpointsWatcher returned an error: %w", err)
}
clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false,
watcher.CreateMulticlusterDecoder(map[string][]string{
"east": eastConfigs,
"north": northConfigs,
}),
)
if err != nil {
return nil, fmt.Errorf("NewClusterStoreWithDecoder returned an error: %w", err)
}
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
if err != nil {
return nil, fmt.Errorf("newFederatedServiceWatcher returned an error: %w", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
clusterStore.Sync(nil)
t.Cleanup(func() {
clusterStore.UnregisterGauges()
})
return fsw, nil
}
func assertUpdatesContains(t *testing.T, updates []*pb.Update, pod, address string) {
t.Helper()
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
if u.GetAdd() == nil || len(u.GetAdd().GetAddrs()) == 0 {
return false
}
endpoint := u.GetAdd().GetAddrs()[0]
return addr.ProxyAddressToString(endpoint.GetAddr()) == address && endpoint.MetricLabels["pod"] == pod
}) {
t.Errorf("expected updates to contain pod %s with address %s", pod, address)
}
}
func assertUpdatesRemoves(t *testing.T, updates []*pb.Update, address string) {
t.Helper()
if !slices.ContainsFunc[[]*pb.Update, *pb.Update](updates, func(u *pb.Update) bool {
if u.GetRemove() == nil || len(u.GetRemove().GetAddrs()) == 0 {
return false
}
endpoint := u.GetRemove().GetAddrs()[0]
return addr.ProxyAddressToString(endpoint) == address
}) {
t.Errorf("expected updates to contain remove of address %s", address)
}
}
var (
westConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: linkerd`,
`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
namespace: linkerd
name: east-cluster-credentials
labels:
multicluster.linkerd.io/cluster-name: east
annotations:
multicluster.linkerd.io/trust-domain: cluster.local
multicluster.linkerd.io/cluster-domain: cluster.local
data:
kubeconfig: ZWFzdAo= # east
`,
`
apiVersion: v1
kind: Secret
type: mirror.linkerd.io/remote-kubeconfig
metadata:
namespace: linkerd
name: north-cluster-credentials
labels:
multicluster.linkerd.io/cluster-name: north
annotations:
multicluster.linkerd.io/trust-domain: cluster.local
multicluster.linkerd.io/cluster-domain: cluster.local
data:
kubeconfig: bm9ydGgK # north
`,
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb-federated
namespace: test
annotations:
multicluster.linkerd.io/remote-discovery: bb@east
multicluster.linkerd.io/local-discovery: bb
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.0.1
targetRef:
kind: Pod
name: bb-west-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-west-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-west
status:
phase: Running
podIP: 172.17.0.1`,
}
eastConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.1.1
targetRef:
kind: Pod
name: bb-east-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-east-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-east
status:
phase: Running
podIP: 172.17.1.1`,
}
northConfigs = []string{
`
apiVersion: v1
kind: Namespace
metadata:
name: test`,
`
apiVersion: v1
kind: Service
metadata:
name: bb
namespace: test
spec:
type: LoadBalancer
ports:
- port: 8080`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: bb
namespace: test
subsets:
- addresses:
- ip: 172.17.2.1
targetRef:
kind: Pod
name: bb-north-1
namespace: test
ports:
- port: 8080
`,
`
apiVersion: v1
kind: Pod
metadata:
name: bb-north-1
namespace: test
ownerReferences:
- kind: ReplicaSet
name: bb-north
status:
phase: Running
podIP: 172.17.2.1`,
}
)

View File

@ -1,6 +1,7 @@
package watcher
import (
"fmt"
"sync"
"testing"
@ -61,6 +62,26 @@ func CreateMockDecoder(configs ...string) configDecoder {
}
func CreateMulticlusterDecoder(configs map[string][]string) configDecoder {
return func(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
configs, ok := configs[cluster]
if !ok {
return nil, nil, fmt.Errorf("cluster %s not found in configs", cluster)
}
remoteAPI, err := k8s.NewFakeAPI(configs...)
if err != nil {
return nil, nil, err
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
return nil, nil, err
}
return remoteAPI, metadataAPI, nil
}
}
// Update stores the update in the internal buffer.
func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile) {
bpl.mu.Lock()