Process ExternalWorkload targetRefs when updating slices (#11987)

If the readiness of an external workload endpoint changes while traffic
is being sent to it, the update will not be propagated to clients. This
can lead to issues where an endpoint that is marked as `notReady`
continues to be treated as `ready` by the endpoints watcher.

The issue stems from how endpoint slices are diffed. A utility function
responsible for processing addresses does not consider endpoints whose
targetRef is an external workload. We fix the problem and add two module
tests to validate readiness is propagated to clients correctly.

---------

Signed-off-by: Matei David <matei@buoyant.io>
This commit is contained in:
Matei David 2024-01-26 12:01:50 +00:00 committed by GitHub
parent fac3f99f0e
commit 3313bec072
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 339 additions and 1 deletions

View File

@ -985,6 +985,11 @@ func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
Name: endpoint.TargetRef.Name,
Namespace: endpoint.TargetRef.Namespace,
})
} else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
ids = append(ids, ExternalWorkloadID{
Name: endpoint.TargetRef.Name,
Namespace: endpoint.TargetRef.Namespace,
})
}
}

View File

@ -2755,7 +2755,7 @@ status:
}
// Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends
// all of the Remove events, even if the associated pod is no longer available
// all of the Remove events, even if the associated pod / workload is no longer available
// from the API.
func TestEndpointSliceScaleDown(t *testing.T) {
k8sConfigsWithES := []string{`
@ -2893,6 +2893,339 @@ status:
listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
}
// TestEndpointSliceChangeNotReady verifies that subscribers observe a removal
// for every endpoint of an EndpointSlice whose readiness condition is flipped
// to false. The slice holds one endpoint backed by a Pod and one backed by an
// ExternalWorkload, so both targetRef kinds are exercised.
func TestEndpointSliceChangeNotReady(t *testing.T) {
	// Addresses (ip:port) the listener is expected to report.
	const (
		podAddr      = "172.17.0.12:8989"
		workloadAddr = "192.0.2.0:8989"
	)

	manifests := []string{
		// API resource list for discovery.k8s.io/v1 that declares endpointslices.
		`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`,
		// Service ns/name1 exposing port 8989.
		`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`,
		// EndpointSlice name1-es: endpoint 0 targets a Pod, endpoint 1 targets
		// an ExternalWorkload; both start out ready.
		`
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: Pod
name: name1-1
namespace: ns
- addresses:
- 192.0.2.0
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: wlkd1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-es
namespace: ns
ports:
- name: ""
port: 8989`,
		// Pod referenced by endpoint 0.
		`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.12`,
		// ExternalWorkload referenced by endpoint 1.
		`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: wlkd1
namespace: ns
spec:
meshTls:
identity: foo
serverName: foo
ports:
- port: 8989
workloadIPs:
- ip: 192.0.2.0
status:
conditions:
- type: Ready
status: "True"
`,
	}

	fakeAPI, err := k8s.NewFakeAPI(manifests...)
	if err != nil {
		t.Fatalf("NewFakeAPI returned an error: %s", err)
	}

	fakeMetadataAPI, err := k8s.NewFakeMetadataAPI(nil)
	if err != nil {
		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
	}

	// NOTE(review): the trailing `true, "local"` arguments are passed through
	// unchanged; their meaning is defined by NewEndpointsWatcher — confirm there.
	testLog := logging.WithField("test", t.Name())
	endpointsWatcher, err := NewEndpointsWatcher(fakeAPI, fakeMetadataAPI, testLog, true, "local")
	if err != nil {
		t.Fatalf("can't create Endpoints watcher: %s", err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	subscriber := newBufferingEndpointListener()
	svc := ServiceID{Name: "name1", Namespace: "ns"}
	if err := endpointsWatcher.Subscribe(svc, 8989, "", subscriber); err != nil {
		t.Fatal(err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	// Both endpoints are ready in the fixture, so both must be announced.
	subscriber.ExpectAdded([]string{podAddr, workloadAddr}, t)

	// Fetch the slice and mark the Pod endpoint and the ExternalWorkload
	// endpoint as not ready, then push the update through the fake client.
	ctx := context.Background()
	sliceClient := fakeAPI.Client.DiscoveryV1().EndpointSlices("ns")
	slice, err := sliceClient.Get(ctx, "name1-es", metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}

	notReady := false
	for _, idx := range []int{0, 1} {
		slice.Endpoints[idx].Conditions.Ready = &notReady
	}

	if _, err := sliceClient.Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
		t.Fatal(err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	// There is no blocking call to wait on for the update to be processed, so
	// give the watcher a short grace period before asserting.
	const settle = 50 * time.Millisecond
	time.Sleep(settle)

	subscriber.ExpectRemoved([]string{podAddr, workloadAddr}, t)
}
// TestEndpointSliceChangeToReady verifies that subscribers observe an addition
// for every endpoint of an EndpointSlice whose readiness condition is flipped
// to true. The slice holds a ready and a not-ready endpoint for each targetRef
// kind (Pod and ExternalWorkload).
func TestEndpointSliceChangeToReady(t *testing.T) {
	// Addresses (ip:port) the listener is expected to report.
	const (
		readyPodAddr        = "172.17.0.12:8989"
		pendingPodAddr      = "172.17.0.13:8989"
		readyWorkloadAddr   = "192.0.2.0:8989"
		pendingWorkloadAddr = "192.0.2.1:8989"
	)

	manifests := []string{
		// API resource list for discovery.k8s.io/v1 that declares endpointslices.
		`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`,
		// Service ns/name1 exposing port 8989.
		`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`,
		// EndpointSlice name1-es with four endpoints, in order:
		//   0: Pod name1-1, ready
		//   1: Pod name1-2, not ready
		//   2: ExternalWorkload wlkd1, ready
		//   3: ExternalWorkload wlkd2, not ready
		`
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: Pod
name: name1-1
namespace: ns
- addresses:
- 172.17.0.13
conditions:
ready: false
targetRef:
kind: Pod
name: name1-2
namespace: ns
- addresses:
- 192.0.2.0
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: wlkd1
namespace: ns
topology:
kubernetes.io/hostname: node-1
- addresses:
- 192.0.2.1
conditions:
ready: false
targetRef:
kind: ExternalWorkload
name: wlkd2
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-es
namespace: ns
ports:
- name: ""
port: 8989`,
		// Pod referenced by endpoint 0.
		`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.12`,
		// Pod referenced by endpoint 1.
		`
apiVersion: v1
kind: Pod
metadata:
name: name1-2
namespace: ns
status:
phase: Running
podIP: 172.17.0.13`,
		// ExternalWorkload referenced by endpoint 2.
		`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: wlkd1
namespace: ns
spec:
meshTls:
identity: foo
serverName: foo
ports:
- port: 8989
workloadIPs:
- ip: 192.0.2.0
status:
conditions:
- type: Ready
status: "True"
`,
		// ExternalWorkload referenced by endpoint 3.
		`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: wlkd2
namespace: ns
spec:
meshTls:
identity: foo
serverName: foo
ports:
- port: 8989
workloadIPs:
- ip: 192.0.2.1
status:
conditions:
- type: Ready
status: "True"
`,
	}

	fakeAPI, err := k8s.NewFakeAPI(manifests...)
	if err != nil {
		t.Fatalf("NewFakeAPI returned an error: %s", err)
	}

	fakeMetadataAPI, err := k8s.NewFakeMetadataAPI(nil)
	if err != nil {
		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
	}

	// NOTE(review): the trailing `true, "local"` arguments are passed through
	// unchanged; their meaning is defined by NewEndpointsWatcher — confirm there.
	testLog := logging.WithField("test", t.Name())
	endpointsWatcher, err := NewEndpointsWatcher(fakeAPI, fakeMetadataAPI, testLog, true, "local")
	if err != nil {
		t.Fatalf("can't create Endpoints watcher: %s", err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	subscriber := newBufferingEndpointListener()
	svc := ServiceID{Name: "name1", Namespace: "ns"}
	if err := endpointsWatcher.Subscribe(svc, 8989, "", subscriber); err != nil {
		t.Fatal(err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	// Only the two endpoints that are ready in the fixture should be announced
	// at this point; the other two are still not ready.
	subscriber.ExpectAdded([]string{readyPodAddr, readyWorkloadAddr}, t)

	ctx := context.Background()
	sliceClient := fakeAPI.Client.DiscoveryV1().EndpointSlices("ns")
	slice, err := sliceClient.Get(ctx, "name1-es", metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}

	// Flip only the endpoints that started out not ready: index 1 (Pod) and
	// index 3 (ExternalWorkload).
	ready := true
	for _, idx := range []int{1, 3} {
		slice.Endpoints[idx].Conditions.Ready = &ready
	}

	if _, err := sliceClient.Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
		t.Fatal(err)
	}

	fakeAPI.Sync(nil)
	fakeMetadataAPI.Sync(nil)

	// There is no blocking call to wait on for the update to be processed, so
	// give the watcher a short grace period before asserting.
	const settle = 50 * time.Millisecond
	time.Sleep(settle)

	// The expectation lists all four addresses: the two announced initially
	// plus the two that just became ready.
	// NOTE(review): presumably the listener accumulates additions across
	// calls — confirm against newBufferingEndpointListener.
	subscriber.ExpectAdded([]string{readyPodAddr, pendingPodAddr, readyWorkloadAddr, pendingWorkloadAddr}, t)
}
// Test that when an endpointslice gets a hint added, then mark it as a change
func TestEndpointSliceAddHints(t *testing.T) {
k8sConfigsWithES := []string{`