Fix bug where topology routing would not disable while service was under load. (#10925)

Add support for enabling and disabling topology aware routing when hints are added/removed.

The testing setup is very involved because it involves so many moving parts

1) Setup a service which is layered over several availability zones.
1a) The best way to do this is one service object, with 3 replicasets explicitly forced to use a specific AZ each.
2) Add `service.kubernetes.io/topology-aware-hints: Auto` annotation to the Service object
3) Use a load tester like k6 to send meaningful traffic to your service but only in one AZ
3) Scale up your replica sets until k8s adds Hints to your endpointslices
4) Observe that traffic shifts to only hit pods in one AZ

5) Turn down the replicasets count until such time that K8s removes the hints from your endpointslices
6) Observe traffic shifts back to all pods across all AZ.
This commit is contained in:
Mark Robinson 2023-05-26 10:31:14 -07:00 committed by GitHub
parent f999e8b109
commit 21209955c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 262 additions and 0 deletions

View File

@ -155,6 +155,7 @@ func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
for k, v := range et.availableEndpoints.Addresses {
filtered[k] = v
}
et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
return watcher.AddressSet{
Addresses: filtered,
Labels: et.availableEndpoints.Labels,

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
@ -1154,6 +1155,27 @@ func addressChanged(oldAddress Address, newAddress Address) bool {
return true
}
// If the zone hints have changed, then the address has changed
if len(newAddress.ForZones) != len(oldAddress.ForZones) {
return true
}
// Sort the zone information so that we can compare them accurately
// We can't use `sort.StringSlice` because these are arrays of structs and not just strings
sort.Slice(oldAddress.ForZones, func(i, j int) bool {
return oldAddress.ForZones[i].Name < (oldAddress.ForZones[j].Name)
})
sort.Slice(newAddress.ForZones, func(i, j int) bool {
return newAddress.ForZones[i].Name < (newAddress.ForZones[j].Name)
})
// Both old and new addresses have the same number of zones, so we can just compare them directly
for k := range oldAddress.ForZones {
if oldAddress.ForZones[k].Name != newAddress.ForZones[k].Name {
return true
}
}
if oldAddress.Pod != nil && newAddress.Pod != nil {
// if these addresses are owned by pods we can check the resource versions
return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion

View File

@ -50,6 +50,7 @@ func addressString(address Address) string {
func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.T) {
bel.Lock()
defer bel.Unlock()
t.Helper()
sort.Strings(bel.added)
testCompare(t, expected, bel.added)
}
@ -57,6 +58,7 @@ func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.
func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testing.T) {
bel.Lock()
defer bel.Unlock()
t.Helper()
sort.Strings(bel.removed)
testCompare(t, expected, bel.removed)
}
@ -2308,3 +2310,240 @@ status:
listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
}
// Test that when an endpointslice gets a hint added, then mark it as a change
func TestEndpointSliceAddHints(t *testing.T) {
k8sConfigsWithES := []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: Pod
name: name1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-es
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.12`}
// Create an EndpointSlice with one endpoint, backed by a pod.
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener := newBufferingEndpointListener()
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
// Add a hint to the EndpointSlice
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
es.Endpoints[0].Hints = &dv1.EndpointHints{
ForZones: []dv1.ForZone{{Name: "zone1"}},
}
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
time.Sleep(50 * time.Millisecond)
listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
}
// Test that when an endpointslice loses a hint, then mark it as a change
func TestEndpointSliceRemoveHints(t *testing.T) {
k8sConfigsWithES := []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
hints:
forZones:
- name: zone1
ready: true
targetRef:
kind: Pod
name: name1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-es
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.12`}
// Create an EndpointSlice with one endpoint, backed by a pod.
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener := newBufferingEndpointListener()
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
// Remove a hint from the EndpointSlice
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
es.Endpoints[0].Hints = &dv1.EndpointHints{
//ForZones: []dv1.ForZone{{Name: "zone1"}},
}
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
time.Sleep(50 * time.Millisecond)
listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
}