diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go index 966ab9a0f..1ccca78ae 100644 --- a/controller/api/destination/endpoint_translator.go +++ b/controller/api/destination/endpoint_translator.go @@ -155,6 +155,7 @@ func (et *endpointTranslator) filterAddresses() watcher.AddressSet { for k, v := range et.availableEndpoints.Addresses { filtered[k] = v } + et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods") return watcher.AddressSet{ Addresses: filtered, Labels: et.availableEndpoints.Labels, diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 0db9ac841..995b21f41 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sort" "strconv" "strings" "sync" @@ -1154,6 +1155,27 @@ func addressChanged(oldAddress Address, newAddress Address) bool { return true } + // If the zone hints have changed, then the address has changed + if len(newAddress.ForZones) != len(oldAddress.ForZones) { + return true + } + + // Sort the zone information so that we can compare them accurately + // We can't use `sort.StringSlice` because these are arrays of structs and not just strings + sort.Slice(oldAddress.ForZones, func(i, j int) bool { + return oldAddress.ForZones[i].Name < (oldAddress.ForZones[j].Name) + }) + sort.Slice(newAddress.ForZones, func(i, j int) bool { + return newAddress.ForZones[i].Name < (newAddress.ForZones[j].Name) + }) + + // Both old and new addresses have the same number of zones, so we can just compare them directly + for k := range oldAddress.ForZones { + if oldAddress.ForZones[k].Name != newAddress.ForZones[k].Name { + return true + } + } + if oldAddress.Pod != nil && newAddress.Pod != nil { // if these addresses are owned by pods we can check the resource versions return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 5f7fd45ea..240fc0bad 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -50,6 +50,7 @@ func addressString(address Address) string { func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.T) { bel.Lock() defer bel.Unlock() + t.Helper() sort.Strings(bel.added) testCompare(t, expected, bel.added) } @@ -57,6 +58,7 @@ func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing. func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testing.T) { bel.Lock() defer bel.Unlock() + t.Helper() sort.Strings(bel.removed) testCompare(t, expected, bel.removed) } @@ -2308,3 +2310,240 @@ status: listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t) } + +// Test that when an endpointslice gets a hint added, then mark it as a change +func TestEndpointSliceAddHints(t *testing.T) { + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + targetRef: + kind: Pod + name: name1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-1 + namespace: ns +status: + phase: Running + podIP: 172.17.0.12`} + + // Create an EndpointSlice with one endpoint, backed by a pod. + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener.ExpectAdded([]string{"172.17.0.12:8989"}, t) + + // Add a hint to the EndpointSlice + es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + es.Endpoints[0].Hints = &dv1.EndpointHints{ + ForZones: []dv1.ForZone{{Name: "zone1"}}, + } + + _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on + time.Sleep(50 * time.Millisecond) + + listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t) +} + +// Test that when an endpointslice loses a hint, then mark it as a change +func TestEndpointSliceRemoveHints(t *testing.T) { + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + hints: + forZones: + - name: zone1 + ready: true + targetRef: + kind: Pod + name: name1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-1 + namespace: ns +status: + phase: Running + podIP: 172.17.0.12`} + + // Create an EndpointSlice with one endpoint, backed by a pod. + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener.ExpectAdded([]string{"172.17.0.12:8989"}, t) + + // Remove a hint from the EndpointSlice + es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + es.Endpoints[0].Hints = &dv1.EndpointHints{ + //ForZones: []dv1.ForZone{{Name: "zone1"}}, + } + + _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on + time.Sleep(50 * time.Millisecond) + + listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t) +}