linkerd2/multicluster/service-mirror/cluster_watcher_test_util.go

605 lines
17 KiB
Go

package servicemirror
import (
"context"
"fmt"
"log"
"reflect"
"strings"
"time"
"github.com/ghodss/yaml"
"github.com/linkerd/linkerd2/controller/k8s"
consts "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/multicluster"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
)
// Defaults shared by the test fixtures declared in this file.
const (
// clusterName is used as the target cluster name of the links and as the
// remote-cluster label value on mirrored resources.
clusterName = "remote"
// clusterDomain is used as the target cluster domain of the links.
clusterDomain = "cluster.local"
// defaultProbePath is the probe path used by defaultProbeSpec and by the
// gateway fixtures.
defaultProbePath = "/probe"
// defaultProbePort is the probe port used by defaultProbeSpec and by the
// gateway fixtures.
defaultProbePort = 12345
// defaultProbePeriod is the probe period in seconds (it is multiplied by
// time.Second when defaultProbeSpec is built).
defaultProbePeriod = 60
)
var (
	// defaultProbeSpec is the probe configuration shared by the link
	// fixtures; it is assembled from the default* constants, with the period
	// interpreted as a number of seconds.
	defaultProbeSpec = multicluster.ProbeSpec{
		Path:   defaultProbePath,
		Port:   defaultProbePort,
		Period: time.Duration(defaultProbePeriod) * time.Second,
	}

	// defaultSelector is the label selector parsed from
	// consts.DefaultExportedServiceSelector. Every link fixture dereferences
	// it, so a parse failure is reported explicitly here instead of showing
	// up later as a nil pointer dereference during package initialization.
	defaultSelector = func() *metav1.LabelSelector {
		selector, err := metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector)
		if err != nil {
			log.Fatalf("parsing default exported service selector %q: %v", consts.DefaultExportedServiceSelector, err)
		}
		return selector
	}()
)
// testEnvironment describes one watcher scenario: the events to feed to the
// watcher, the resources that seed the fake remote and local APIs, and the
// link the watcher is configured with.
type testEnvironment struct {
// events are added to the watcher queue, in order, by runEnvironment.
events []interface{}
// remoteResources are the manifests used to create the fake remote API.
remoteResources []string
// localResources are the manifests used to create the fake local API.
localResources []string
// link is the multicluster link handed to the watcher under test.
link multicluster.Link
}
// runEnvironment builds fake remote and local APIs from the environment's
// resources, wires them into a RemoteClusterServiceWatcher that uses
// watcherQueue as its events queue, enqueues every event of the environment
// and then calls processNextEvent once per enqueued event.
//
// It returns the local API so callers can inspect the resulting state, or an
// error if either fake API could not be created.
func (te *testEnvironment) runEnvironment(watcherQueue workqueue.RateLimitingInterface) (*k8s.API, error) {
	remote, err := k8s.NewFakeAPI(te.remoteResources...)
	if err != nil {
		return nil, err
	}

	local, err := k8s.NewFakeAPI(te.localResources...)
	if err != nil {
		return nil, err
	}

	remote.Sync(nil)
	local.Sync(nil)

	watcher := &RemoteClusterServiceWatcher{
		link:            &te.link,
		remoteAPIClient: remote,
		localAPIClient:  local,
		stopper:         nil,
		log:             logging.WithFields(logging.Fields{"cluster": clusterName}),
		eventsQueue:     watcherQueue,
		requeueLimit:    0,
	}

	// Seed the queue completely before processing anything.
	for i := range te.events {
		watcherQueue.Add(te.events[i])
	}

	// Exactly one processing call per seeded event. NOTE(review): presumably
	// each call consumes a single queue item, so anything enqueued while
	// processing is left on the queue for the caller - confirm.
	for remaining := len(te.events); remaining > 0; remaining-- {
		watcher.processNextEvent(context.Background())
	}

	local.Sync(nil)
	remote.Sync(nil)

	return local, nil
}
// createExportedService is the scenario in which a RemoteServiceCreated event
// arrives for "service-one" in "ns1", carrying the export label and two TCP
// ports. The remote API is seeded with a gateway service and the local API
// starts empty.
var createExportedService = &testEnvironment{
	link: multicluster.Link{
		TargetClusterName:   clusterName,
		TargetClusterDomain: clusterDomain,
		GatewayIdentity:     "gateway-identity",
		GatewayAddress:      "192.0.2.127",
		GatewayPort:         888,
		ProbeSpec:           defaultProbeSpec,
		Selector:            *defaultSelector,
	},
	remoteResources: []string{
		gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
	},
	events: []interface{}{
		&RemoteServiceCreated{
			service: remoteService(
				"service-one", "ns1", "111",
				map[string]string{consts.DefaultExportedServiceSelector: "true"},
				[]corev1.ServicePort{
					{Name: "port1", Protocol: "TCP", Port: 555},
					{Name: "port2", Protocol: "TCP", Port: 666},
				},
			),
		},
	},
}
// deleteMirrorService is the scenario in which a RemoteServiceDeleted event
// arrives for "test-service-remote-to-delete" while the local API already
// holds the matching mirror service and endpoints
// ("test-service-remote-to-delete-remote").
var deleteMirrorService = &testEnvironment{
	link: multicluster.Link{
		TargetClusterName:   clusterName,
		TargetClusterDomain: clusterDomain,
		GatewayIdentity:     "gateway-identity",
		GatewayAddress:      "192.0.2.127",
		GatewayPort:         888,
		ProbeSpec:           defaultProbeSpec,
		Selector:            *defaultSelector,
	},
	localResources: []string{
		mirrorServiceAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", nil),
		endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "gateway-identity", nil),
	},
	events: []interface{}{
		&RemoteServiceDeleted{
			Name:      "test-service-remote-to-delete",
			Namespace: "test-namespace-to-delete",
		},
	},
}
// updateServiceWithChangedPorts is the scenario in which a
// RemoteServiceUpdated event reports that "test-service" now exposes port1
// and port3, while the mirror service and endpoints carried by the event
// still expose port1 and port2. The remote API is seeded with a gateway and
// the local API with a mirror service and endpoints listing port1, port2 and
// port3.
var updateServiceWithChangedPorts = &testEnvironment{
	link: multicluster.Link{
		TargetClusterName:   clusterName,
		TargetClusterDomain: clusterDomain,
		GatewayIdentity:     "gateway-identity",
		GatewayAddress:      "192.0.2.127",
		GatewayPort:         888,
		ProbeSpec:           defaultProbeSpec,
		Selector:            *defaultSelector,
	},
	remoteResources: []string{
		gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod),
	},
	localResources: []string{
		mirrorServiceAsYaml("test-service-remote", "test-namespace", "past", []corev1.ServicePort{
			{Name: "port1", Protocol: "TCP", Port: 111},
			{Name: "port2", Protocol: "TCP", Port: 222},
			{Name: "port3", Protocol: "TCP", Port: 333},
		}),
		endpointsAsYaml("test-service-remote", "test-namespace", "192.0.2.127", "", []corev1.EndpointPort{
			{Name: "port1", Port: 888, Protocol: "TCP"},
			{Name: "port2", Port: 888, Protocol: "TCP"},
			{Name: "port3", Port: 888, Protocol: "TCP"},
		}),
	},
	events: []interface{}{
		&RemoteServiceUpdated{
			// Current state of the service on the remote cluster.
			remoteUpdate: remoteService(
				"test-service", "test-namespace", "currentServiceResVersion",
				map[string]string{consts.DefaultExportedServiceSelector: "true"},
				[]corev1.ServicePort{
					{Name: "port1", Protocol: "TCP", Port: 111},
					{Name: "port3", Protocol: "TCP", Port: 333},
				},
			),
			// Mirror service as last known locally.
			localService: mirrorService(
				"test-service-remote", "test-namespace", "pastServiceResVersion",
				[]corev1.ServicePort{
					{Name: "port1", Protocol: "TCP", Port: 111},
					{Name: "port2", Protocol: "TCP", Port: 222},
				},
			),
			// Mirror endpoints as last known locally.
			localEndpoints: endpoints(
				"test-service-remote", "test-namespace", "192.0.2.127", "",
				[]corev1.EndpointPort{
					{Name: "port1", Port: 888, Protocol: "TCP"},
					{Name: "port2", Port: 888, Protocol: "TCP"},
				},
			),
		},
	},
}
// clusterUnregistered is the scenario in which a ClusterUnregistered event is
// processed while the local API holds two mirror services, each with its
// endpoints. The link only names the target cluster.
var clusterUnregistered = &testEnvironment{
	link: multicluster.Link{TargetClusterName: clusterName},
	localResources: []string{
		// first mirrored service
		mirrorServiceAsYaml("test-service-1-remote", "test-namespace", "", nil),
		endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", nil),
		// second mirrored service
		mirrorServiceAsYaml("test-service-2-remote", "test-namespace", "", nil),
		endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", nil),
	},
	events: []interface{}{
		&ClusterUnregistered{},
	},
}
// gcTriggered is the scenario in which an OrphanedServicesGcTriggered event
// is processed. The local API holds mirrors for "test-service-1" and
// "test-service-2", but only "test-service-1" exists on the remote API.
var gcTriggered = &testEnvironment{
	link: multicluster.Link{TargetClusterName: clusterName},
	remoteResources: []string{
		remoteServiceAsYaml("test-service-1", "test-namespace", "", nil),
	},
	localResources: []string{
		// mirror whose remote counterpart is seeded above
		mirrorServiceAsYaml("test-service-1-remote", "test-namespace", "", nil),
		endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", nil),
		// mirror without a remote counterpart
		mirrorServiceAsYaml("test-service-2-remote", "test-namespace", "", nil),
		endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", nil),
	},
	events: []interface{}{
		&OrphanedServicesGcTriggered{},
	},
}
// onAddOrUpdateExportedSvc returns the scenario in which an add (isAdd true)
// or update (isAdd false) callback event is received for an exported remote
// service while neither API has been seeded with any resource.
func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment {
	exported := remoteService(
		"test-service", "test-namespace", "resVersion",
		map[string]string{consts.DefaultExportedServiceSelector: "true"},
		nil,
	)

	env := &testEnvironment{
		link: multicluster.Link{
			TargetClusterName:   clusterName,
			TargetClusterDomain: clusterDomain,
			GatewayIdentity:     "gateway-identity",
			GatewayAddress:      "192.0.2.127",
			GatewayPort:         888,
			ProbeSpec:           defaultProbeSpec,
			Selector:            *defaultSelector,
		},
	}
	env.events = []interface{}{onAddOrUpdateEvent(isAdd, exported)}
	return env
}
// onAddOrUpdateRemoteServiceUpdated returns the scenario in which an add
// (isAdd true) or update (isAdd false) callback event is received for an
// exported remote service at "currentResVersion", while the local mirror is
// annotated with the older "pastResourceVersion".
func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment {
	exported := remoteService(
		"test-service", "test-namespace", "currentResVersion",
		map[string]string{consts.DefaultExportedServiceSelector: "true"},
		nil,
	)

	env := &testEnvironment{
		link: multicluster.Link{
			TargetClusterName:   clusterName,
			TargetClusterDomain: clusterDomain,
			GatewayIdentity:     "gateway-identity",
			GatewayAddress:      "192.0.2.127",
			GatewayPort:         888,
			ProbeSpec:           defaultProbeSpec,
			Selector:            *defaultSelector,
		},
		localResources: []string{
			mirrorServiceAsYaml("test-service-remote", "test-namespace", "pastResourceVersion", nil),
			endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
		},
	}
	env.events = []interface{}{onAddOrUpdateEvent(isAdd, exported)}
	return env
}
// onAddOrUpdateSameResVersion returns the scenario in which an add (isAdd
// true) or update (isAdd false) callback event is received for an exported
// remote service whose resource version ("currentResVersion") equals the one
// recorded on the existing local mirror.
func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment {
	exported := remoteService(
		"test-service", "test-namespace", "currentResVersion",
		map[string]string{consts.DefaultExportedServiceSelector: "true"},
		nil,
	)

	env := &testEnvironment{
		link: multicluster.Link{
			TargetClusterName:   clusterName,
			TargetClusterDomain: clusterDomain,
			GatewayIdentity:     "gateway-identity",
			GatewayAddress:      "192.0.2.127",
			GatewayPort:         888,
			ProbeSpec:           defaultProbeSpec,
			Selector:            *defaultSelector,
		},
		localResources: []string{
			mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil),
			endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
		},
	}
	env.events = []interface{}{onAddOrUpdateEvent(isAdd, exported)}
	return env
}
// serviceNotExportedAnymore returns the scenario in which an add (isAdd true)
// or update (isAdd false) callback event is received for a remote service
// that carries no labels (and therefore no export label), while a local
// mirror of that service still exists.
func serviceNotExportedAnymore(isAdd bool) *testEnvironment {
	unexported := remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{}, nil)

	env := &testEnvironment{
		link: multicluster.Link{
			TargetClusterName:   clusterName,
			TargetClusterDomain: clusterDomain,
			GatewayIdentity:     "gateway-identity",
			GatewayAddress:      "192.0.2.127",
			GatewayPort:         888,
			ProbeSpec:           defaultProbeSpec,
			Selector:            *defaultSelector,
		},
		localResources: []string{
			mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil),
			endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
		},
	}
	env.events = []interface{}{onAddOrUpdateEvent(isAdd, unexported)}
	return env
}
// onDeleteExportedService is the scenario in which an OnDeleteCalled event is
// received for a remote service that carries the export label. Neither API
// is seeded with resources.
var onDeleteExportedService = &testEnvironment{
	link: multicluster.Link{
		TargetClusterName:   clusterName,
		TargetClusterDomain: clusterDomain,
		GatewayIdentity:     "gateway-identity",
		GatewayAddress:      "192.0.2.127",
		GatewayPort:         888,
		ProbeSpec:           defaultProbeSpec,
		Selector:            *defaultSelector,
	},
	events: []interface{}{
		&OnDeleteCalled{
			svc: remoteService(
				"test-service", "test-namespace", "currentResVersion",
				map[string]string{consts.DefaultExportedServiceSelector: "true"},
				nil,
			),
		},
	},
}
// onDeleteNonExportedService is the scenario in which an OnDeleteCalled event
// is received for a remote service named "gateway" that carries no labels
// (and therefore no export label). Neither API is seeded with resources.
var onDeleteNonExportedService = &testEnvironment{
	link: multicluster.Link{
		TargetClusterName:   clusterName,
		TargetClusterDomain: clusterDomain,
		GatewayIdentity:     "gateway-identity",
		GatewayAddress:      "192.0.2.127",
		GatewayPort:         888,
		ProbeSpec:           defaultProbeSpec,
		Selector:            *defaultSelector,
	},
	events: []interface{}{
		&OnDeleteCalled{
			svc: remoteService("gateway", "test-namespace", "currentResVersion", map[string]string{}, nil),
		},
	},
}
// onAddOrUpdateEvent wraps svc in the callback event matching isAdd: an
// *OnAddCalled when isAdd is true, an *OnUpdateCalled otherwise.
//
// It is used by the onAddOrUpdate* environments, which (together with the
// onDelete* ones) serve the tests ensuring that onAdd, onUpdate and onDelete
// result in queueing more specific events to be processed.
func onAddOrUpdateEvent(isAdd bool, svc *corev1.Service) interface{} {
	if !isAdd {
		return &OnUpdateCalled{svc: svc}
	}
	return &OnAddCalled{svc: svc}
}
// diffServices compares the name, namespace, annotations and labels of two
// services, in that order, and returns an error describing the first
// mismatch. It returns nil when all four match; no other field is compared.
// Both arguments must be non-nil.
func diffServices(expected, actual *corev1.Service) error {
	switch {
	case expected.Name != actual.Name:
		return fmt.Errorf("was expecting service with name %s but was %s", expected.Name, actual.Name)
	case expected.Namespace != actual.Namespace:
		return fmt.Errorf("was expecting service with namespace %s but was %s", expected.Namespace, actual.Namespace)
	case !reflect.DeepEqual(expected.Annotations, actual.Annotations):
		return fmt.Errorf("was expecting service with annotations %v but got %v", expected.Annotations, actual.Annotations)
	case !reflect.DeepEqual(expected.Labels, actual.Labels):
		return fmt.Errorf("was expecting service with labels %v but got %v", expected.Labels, actual.Labels)
	}
	return nil
}
// diffEndpoints compares the name, namespace, annotations, labels and subsets
// of two endpoints objects, in that order, and returns an error describing
// the first mismatch. It returns nil when all five match. Both arguments must
// be non-nil.
func diffEndpoints(expected, actual *corev1.Endpoints) error {
	switch {
	case expected.Name != actual.Name:
		return fmt.Errorf("was expecting endpoints with name %s but was %s", expected.Name, actual.Name)
	case expected.Namespace != actual.Namespace:
		return fmt.Errorf("was expecting endpoints with namespace %s but was %s", expected.Namespace, actual.Namespace)
	case !reflect.DeepEqual(expected.Annotations, actual.Annotations):
		return fmt.Errorf("was expecting endpoints with annotations %v but got %v", expected.Annotations, actual.Annotations)
	case !reflect.DeepEqual(expected.Labels, actual.Labels):
		return fmt.Errorf("was expecting endpoints with labels %v but got %v", expected.Labels, actual.Labels)
	case !reflect.DeepEqual(expected.Subsets, actual.Subsets):
		return fmt.Errorf("was expecting endpoints with subsets %v but got %v", expected.Subsets, actual.Subsets)
	}
	return nil
}
// remoteService builds a v1 Service fixture with the given name, namespace,
// resource version, labels and ports. The labels map and ports slice are
// stored as passed (not copied).
func remoteService(name, namespace, resourceVersion string, labels map[string]string, ports []corev1.ServicePort) *corev1.Service {
	svc := new(corev1.Service)

	// Type metadata.
	svc.Kind = "Service"
	svc.APIVersion = "v1"

	// Object metadata.
	svc.Name = name
	svc.Namespace = namespace
	svc.ResourceVersion = resourceVersion
	svc.Labels = labels

	svc.Spec.Ports = ports
	return svc
}
// remoteServiceAsYaml returns the YAML encoding of the label-less service
// built by remoteService. A marshalling failure terminates the process via
// log.Fatal.
func remoteServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
	encoded, err := yaml.Marshal(remoteService(name, namespace, resourceVersion, nil, ports))
	if err != nil {
		log.Fatal(err)
	}
	return string(encoded)
}
// mirrorService builds the v1 Service fixture representing the local mirror
// of a remote service.
//
// name is the mirror's own name (remote service name followed by
// "-<clusterName>"), resourceVersion is stored in the remote resource version
// annotation, and ports become the service ports. The service is labelled as
// a mirrored resource of clusterName and annotated with the fully qualified
// name of the remote service it mirrors.
func mirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
	// Only the trailing cluster suffix is stripped: a remote service whose own
	// name contains "-remote" (e.g. "test-service-remote-to-delete") must keep
	// it, which replacing the first occurrence would not do.
	remoteName := strings.TrimSuffix(name, "-"+clusterName)

	annotations := make(map[string]string)
	annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion
	annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", remoteName, namespace)

	return &corev1.Service{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Service",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels: map[string]string{
				consts.RemoteClusterNameLabel: clusterName,
				consts.MirroredResourceLabel:  "true",
			},
			Annotations: annotations,
		},
		Spec: corev1.ServiceSpec{
			Ports: ports,
		},
	}
}
// mirrorServiceAsYaml returns the YAML encoding of the service built by
// mirrorService. A marshalling failure terminates the process via log.Fatal.
func mirrorServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
	encoded, err := yaml.Marshal(mirrorService(name, namespace, resourceVersion, ports))
	if err != nil {
		log.Fatal(err)
	}
	return string(encoded)
}
// gateway builds a v1 Service fixture representing a multicluster gateway.
//
// The service exposes two TCP ports: portName/port and the probe port
// (consts.ProbePortName/probePort). identity, probePath and probePeriod are
// recorded as annotations. When ip and/or hostname are non-empty, each is
// added as a separate load balancer ingress entry, ip first.
func gateway(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service {
	// Collect ingress entries up front; the slice stays nil when neither an IP
	// nor a hostname is given.
	var ingress []corev1.LoadBalancerIngress
	if ip != "" {
		ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip})
	}
	if hostname != "" {
		ingress = append(ingress, corev1.LoadBalancerIngress{Hostname: hostname})
	}

	gw := new(corev1.Service)
	gw.Kind = "Service"
	gw.APIVersion = "v1"
	gw.Name = name
	gw.Namespace = namespace
	gw.ResourceVersion = resourceVersion
	gw.Annotations = map[string]string{
		consts.GatewayIdentity:    identity,
		consts.GatewayProbePath:   probePath,
		consts.GatewayProbePeriod: fmt.Sprint(probePeriod),
	}
	gw.Spec.Ports = []corev1.ServicePort{
		{Name: portName, Protocol: "TCP", Port: port},
		{Name: consts.ProbePortName, Protocol: "TCP", Port: probePort},
	}
	gw.Status.LoadBalancer.Ingress = ingress
	return gw
}
// gatewayAsYaml returns the YAML encoding of the service built by gateway,
// always with an empty hostname. A marshalling failure terminates the process
// via log.Fatal.
func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string {
	const noHostname = ""
	svc := gateway(name, namespace, resourceVersion, ip, noHostname, portName, port, identity, probePort, probePath, probePeriod)

	encoded, err := yaml.Marshal(svc)
	if err != nil {
		log.Fatal(err)
	}
	return string(encoded)
}
// endpoints builds the v1 Endpoints fixture that accompanies a mirror
// service.
//
// name is the mirror's own name (remote service name followed by
// "-<clusterName>"). When gatewayIP is non-empty the object gets a single
// subset with that address and the given ports; otherwise it has no subsets.
// The object is labelled as a mirrored resource of clusterName and annotated
// with the fully qualified name of the remote service; a non-empty
// gatewayIdentity is recorded as an additional annotation.
func endpoints(name, namespace, gatewayIP string, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints {
	var subsets []corev1.EndpointSubset
	if gatewayIP != "" {
		subsets = []corev1.EndpointSubset{
			{
				Addresses: []corev1.EndpointAddress{
					{
						IP: gatewayIP,
					},
				},
				Ports: ports,
			},
		}
	}

	// Only the trailing cluster suffix is stripped: a remote service whose own
	// name contains "-remote" (e.g. "test-service-remote-to-delete") must keep
	// it, which replacing the first occurrence would not do.
	remoteName := strings.TrimSuffix(name, "-"+clusterName)

	ep := &corev1.Endpoints{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Endpoints",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels: map[string]string{
				consts.RemoteClusterNameLabel: clusterName,
				consts.MirroredResourceLabel:  "true",
			},
			Annotations: map[string]string{
				consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", remoteName, namespace),
			},
		},
		Subsets: subsets,
	}

	if gatewayIdentity != "" {
		ep.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity
	}

	return ep
}
// endpointsAsYaml returns the YAML encoding of the object built by endpoints.
// A marshalling failure terminates the process via log.Fatal.
func endpointsAsYaml(name, namespace, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string {
	encoded, err := yaml.Marshal(endpoints(name, namespace, gatewayIP, gatewayIdentity, ports))
	if err != nil {
		log.Fatal(err)
	}
	return string(encoded)
}