Service mirroring tests (#4115)

Unit tests that exercise most of the code in cluster_watcher.go. Essentially the whole cluster mirroring machinary can be tought of as a function that takes remote cluster state, local cluster state, and modification events and as a result it either modifies local cluster state or issues new events onto the queue. This is what these tests are trying to model. I think this covers a lot of the logic there. Any suggestions for other edge cases are welcome.

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
This commit is contained in:
Zahari Dichev 2020-03-04 20:17:21 +02:00 committed by GitHub
parent 2d17d6253d
commit 72fc94b03c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 1278 additions and 48 deletions

View File

@ -39,9 +39,8 @@ type (
// RemoteServiceCreated is generated whenever a remote service is created Observing
// this event means that the service in question is not mirrored atm
RemoteServiceCreated struct {
service *corev1.Service
gatewayData *gatewayMetadata
newResourceVersion string
service *corev1.Service
gatewayData *gatewayMetadata
}
// RemoteServiceUpdated is generated when we see something about an already
@ -65,8 +64,7 @@ type (
// RemoteGatewayDeleted is observed when a service that is a gateway to at least
// one already mirrored service is deleted
RemoteGatewayDeleted struct {
gatewayData *gatewayMetadata
affectedEndpoints []*corev1.Endpoints
gatewayData *gatewayMetadata
}
// RemoteGatewayUpdated happens when a service that is a gateway to at least
@ -376,14 +374,23 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources() error {
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ev *RemoteServiceDeleted) error {
localServiceName := rcsw.mirroredResourceName(ev.Name)
rcsw.log.Debugf("Deleting mirrored service %s/%s and its corresponding Endpoints", ev.Namespace, localServiceName)
var errors []error
if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil {
if kerrors.IsNotFound(err) {
return nil
if !kerrors.IsNotFound(err) {
errors = append(errors, fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err))
}
// we can try deleting it again
return RetryableError{[]error{fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err)}}
}
if err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil {
if !kerrors.IsNotFound(err) {
errors = append(errors, fmt.Errorf("could not delete Endpoints: %s/%s: %s", ev.Namespace, localServiceName, err))
}
}
if len(errors) > 0 {
return RetryableError{errors}
}
rcsw.log.Debugf("Successfully deleted Service: %s/%s", ev.Namespace, localServiceName)
return nil
}
@ -402,6 +409,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ev *RemoteSe
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate, gatewayPort),
},
}
ev.localEndpoints.Labels[consts.RemoteGatewayNameLabel] = ev.gatewayData.Name
ev.localEndpoints.Labels[consts.RemoteGatewayNsLabel] = ev.gatewayData.Namespace
} else {
rcsw.log.Warnf("Could not resolve gateway for %s: %s, nulling endpoints", serviceInfo, err)
ev.localEndpoints.Subsets = nil
@ -617,11 +628,7 @@ func (rcsw *RemoteClusterServiceWatcher) handleConsiderGatewayUpdateDispatch(eve
rcsw.log.Warnf("Gateway [%s/%s] is not a compliant gateway anymore, dispatching GatewayDeleted event: %s", event.maybeGateway.Namespace, event.maybeGateway.Name, err)
// in case something changed about this gateway and it is not really a gateway anymore,
// simply dispatch deletion event so all endpoints are nulled
endpoints, err := rcsw.endpointsForGateway(gatewayMeta)
if err != nil {
return RetryableError{[]error{err}}
}
rcsw.eventsQueue.AddRateLimited(&RemoteGatewayDeleted{gatewayMeta, endpoints})
rcsw.eventsQueue.AddRateLimited(&RemoteGatewayDeleted{gatewayMeta})
} else {
affectedServices, err := rcsw.affectedMirroredServicesForGatewayUpdate(gtwMetadata, event.maybeGateway.ResourceVersion)
if err != nil {
@ -662,9 +669,8 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S
// we are not mirroring but has gateway data, so we need
// to create it
rcsw.eventsQueue.Add(&RemoteServiceCreated{
service: service,
gatewayData: gtwData,
newResourceVersion: service.ResourceVersion,
service: service,
gatewayData: gtwData,
})
} else {
// at this point we know that we do not have such a service
@ -769,41 +775,48 @@ func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service)
}
}
func (rcsw *RemoteClusterServiceWatcher) processNextEvent() (bool, interface{}, error) {
event, done := rcsw.eventsQueue.Get()
var err error
switch ev := event.(type) {
case *OnAddCalled:
err = rcsw.createOrUpdateService(ev.svc)
case *OnUpdateCalled:
err = rcsw.createOrUpdateService(ev.svc)
case *OnDeleteCalled:
rcsw.handleOnDelete(ev.svc)
case *RemoteServiceCreated:
err = rcsw.handleRemoteServiceCreated(ev)
case *RemoteServiceUpdated:
err = rcsw.handleRemoteServiceUpdated(ev)
case *RemoteServiceDeleted:
err = rcsw.handleRemoteServiceDeleted(ev)
case *RemoteGatewayUpdated:
err = rcsw.handleRemoteGatewayUpdated(ev)
case *RemoteGatewayDeleted:
err = rcsw.handleRemoteGatewayDeleted(ev)
case *ConsiderGatewayUpdateDispatch:
err = rcsw.handleConsiderGatewayUpdateDispatch(ev)
case *ClusterUnregistered:
err = rcsw.cleanupMirroredResources()
case *OprhanedServicesGcTriggered:
err = rcsw.cleanupOrphanedServices()
default:
if ev != nil || !done { // we get a nil in case we are shutting down...
rcsw.log.Warnf("Received unknown event: %v", ev)
}
}
return done, event, err
}
// the main processing loop in which we handle more domain specific events
// and deal with retries
func (rcsw *RemoteClusterServiceWatcher) processEvents() {
for {
event, done := rcsw.eventsQueue.Get()
var err error
switch ev := event.(type) {
case *OnAddCalled:
err = rcsw.createOrUpdateService(ev.svc)
case *OnUpdateCalled:
err = rcsw.createOrUpdateService(ev.svc)
case *OnDeleteCalled:
rcsw.handleOnDelete(ev.svc)
case *RemoteServiceCreated:
err = rcsw.handleRemoteServiceCreated(ev)
case *RemoteServiceUpdated:
err = rcsw.handleRemoteServiceUpdated(ev)
case *RemoteServiceDeleted:
err = rcsw.handleRemoteServiceDeleted(ev)
case *RemoteGatewayUpdated:
err = rcsw.handleRemoteGatewayUpdated(ev)
case *RemoteGatewayDeleted:
err = rcsw.handleRemoteGatewayDeleted(ev)
case *ConsiderGatewayUpdateDispatch:
err = rcsw.handleConsiderGatewayUpdateDispatch(ev)
case *ClusterUnregistered:
err = rcsw.cleanupMirroredResources()
case *OprhanedServicesGcTriggered:
err = rcsw.cleanupOrphanedServices()
default:
if ev != nil || !done { // we get a nil in case we are shutting down...
rcsw.log.Warnf("Received unknown event: %v", ev)
}
}
done, event, err := rcsw.processNextEvent()
// the logic here is that there might have been an API
// connectivity glitch or something. So its not a bad idea to requeue
// the event and try again up to a number of limits, just to ensure

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,214 @@
package servicemirror
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/ghodss/yaml"
consts "github.com/linkerd/linkerd2/pkg/k8s"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func diffServices(expected, actual *corev1.Service) error {
if expected.Name != actual.Name {
return fmt.Errorf("was expecting service with name %s but was %s", expected.Name, actual.Name)
}
if expected.Namespace != actual.Namespace {
return fmt.Errorf("was expecting service with namespace %s but was %s", expected.Namespace, actual.Namespace)
}
if !reflect.DeepEqual(expected.Annotations, actual.Annotations) {
return fmt.Errorf("was expecting service with annotations %v but got %v", expected.Annotations, actual.Annotations)
}
if !reflect.DeepEqual(expected.Labels, actual.Labels) {
return fmt.Errorf("was expecting service with labels %v but got %v", expected.Labels, actual.Labels)
}
return nil
}
func diffEndpoints(expected, actual *corev1.Endpoints) error {
if expected.Name != actual.Name {
return fmt.Errorf("was expecting endpoints with name %s but was %s", expected.Name, actual.Name)
}
if expected.Namespace != actual.Namespace {
return fmt.Errorf("was expecting endpoints with namespace %s but was %s", expected.Namespace, actual.Namespace)
}
if !reflect.DeepEqual(expected.Annotations, actual.Annotations) {
return fmt.Errorf("was expecting endpoints with annotations %v but got %v", expected.Annotations, actual.Annotations)
}
if !reflect.DeepEqual(expected.Labels, actual.Labels) {
return fmt.Errorf("was expecting endpoints with labels %v but got %v", expected.Labels, actual.Labels)
}
if !reflect.DeepEqual(expected.Subsets, actual.Subsets) {
return fmt.Errorf("was expecting endpoints with subsets %v but got %v", expected.Subsets, actual.Subsets)
}
return nil
}
func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
annotations := make(map[string]string)
if gtwName != "" && gtwNs != "" {
annotations[consts.GatewayNameAnnotation] = gtwName
annotations[consts.GatewayNsAnnotation] = gtwNs
}
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: resourceVersion,
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Ports: ports,
},
}
}
func remoteServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort, t *testing.T) string {
svc := remoteService(name, namespace, gtwName, gtwNs, resourceVersion, ports)
bytes, err := yaml.Marshal(svc)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}
func mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort) *corev1.Service {
annotations := make(map[string]string)
annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion
annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace)
if gatewayResourceVersion != "" {
annotations[consts.RemoteGatewayResourceVersionAnnotation] = gatewayResourceVersion
}
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
consts.RemoteClusterNameLabel: "remote",
consts.MirroredResourceLabel: "true",
consts.RemoteGatewayNameLabel: gtwName,
consts.RemoteGatewayNsLabel: gtwNs,
},
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Ports: ports,
},
}
}
func mirroredServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion string, ports []corev1.ServicePort, t *testing.T) string {
svc := mirroredService(name, namespace, gtwName, gtwNs, resourceVersion, gatewayResourceVersion, ports)
bytes, err := yaml.Marshal(svc)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}
func gateway(name, namespace, resourceVersion, ip, portName string, port int32) *corev1.Service {
svc := corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: resourceVersion,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: portName,
Protocol: "TCP",
Port: port,
},
},
},
}
if ip != "" {
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip})
}
return &svc
}
func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, t *testing.T) string {
gtw := gateway(name, namespace, resourceVersion, ip, portName, port)
bytes, err := yaml.Marshal(gtw)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}
func endpoints(name, namespace, gtwName, gtwNs, gatewayIP string, ports []corev1.EndpointPort) *corev1.Endpoints {
var subsets []corev1.EndpointSubset
if gatewayIP != "" {
subsets = []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: gatewayIP,
},
},
Ports: ports,
},
}
}
return &corev1.Endpoints{
TypeMeta: metav1.TypeMeta{
Kind: "Endpoints",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
consts.RemoteClusterNameLabel: "remote",
consts.MirroredResourceLabel: "true",
consts.RemoteGatewayNameLabel: gtwName,
consts.RemoteGatewayNsLabel: gtwNs,
},
},
Subsets: subsets,
}
}
func endpointsAsYaml(name, namespace, gtwName, gtwNs, gatewayIP string, ports []corev1.EndpointPort, t *testing.T) string {
ep := endpoints(name, namespace, gtwName, gtwNs, gatewayIP, ports)
bytes, err := yaml.Marshal(ep)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}