Update endpoints watcher to not fetch pods for removed endpoints (#10013)

Fixes #10003

When endpoints are removed from an EndpointSlice resource, the destination controller builds a list of addresses to remove.  However, if any of the removed endpoints have a Pod as their targetRef, we attempt to fetch that pod in order to build the address to remove.  If that pod has already been removed from the informer cache, the fetch fails and the endpoint is dropped from the list of endpoints to be removed.  This results in stale endpoints getting stuck in the address set and never being removed.

We update the endpoints watcher to construct only a list of endpoint IDs to remove, rather than fetching the entire pod object for each endpoint.  Since we no longer attempt to fetch the pod, this operation is infallible and endpoints are no longer skipped during removal.

We also add a `TestEndpointSliceScaleDown` test to exercise this.
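
In essence, endpoint removal no longer depends on a fallible pod lookup.  The following is a minimal, self-contained sketch of the difference, using hypothetical stand-in types rather than the controller's own:

```go
package main

import (
	"errors"
	"fmt"
)

// ID and the caches below are hypothetical stand-ins, not the controller's
// actual types; they only illustrate the failure mode described above.
type ID struct{ Namespace, Name string }

// podCache plays the role of the informer cache; the backing pod has already
// been deleted, so lookups miss.
var podCache = map[ID]string{}

func lookupPod(id ID) (string, error) {
	ip, ok := podCache[id]
	if !ok {
		return "", errors.New("pod not found in informer cache")
	}
	return ip, nil
}

func copyMap(m map[ID]string) map[ID]string {
	out := make(map[ID]string, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	addresses := map[ID]string{
		{Namespace: "ns", Name: "name1-1"}: "172.17.0.12:8989",
	}
	removed := []ID{{Namespace: "ns", Name: "name1-1"}}

	// Old behaviour (simplified): removal goes through the pod object, so a
	// cache miss skips the endpoint and leaves it in the address set forever.
	stale := copyMap(addresses)
	for _, id := range removed {
		if _, err := lookupPod(id); err != nil {
			continue // endpoint skipped; stale address is never removed
		}
		delete(stale, id)
	}
	fmt.Println("old path leaves", len(stale), "stale address(es)")

	// New behaviour (simplified): delete by ID alone; no fallible lookup, so
	// nothing is skipped.
	fresh := copyMap(addresses)
	for _, id := range removed {
		delete(fresh, id)
	}
	fmt.Println("new path leaves", len(fresh), "address(es)")
}
```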

Signed-off-by: Alex Leong <alex@buoyant.io>
Alex Leong <alex@buoyant.io>, 2023-01-03 10:04:02 -08:00 (committed by GitHub)
commit 768e04dd7e (parent 68be64083c)
21 changed files with 213 additions and 30 deletions


@@ -634,8 +634,7 @@ func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice,
 		updatedAddressSet.Addresses[id] = address
 	}
-	oldAddressSet := pp.endpointSliceToAddresses(oldSlice)
-	for id := range oldAddressSet.Addresses {
+	for _, id := range pp.endpointSliceToIDs(oldSlice) {
 		delete(updatedAddressSet.Addresses, id)
 	}
@@ -771,6 +770,55 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
 	}
 }
 
+// endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns
+// only the IDs of the endpoints rather than the addresses themselves.
+func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
+	resolvedPort := pp.resolveESTargetPort(es.Ports)
+	if resolvedPort == undefinedEndpointPort {
+		return []ID{}
+	}
+
+	serviceID, err := getEndpointSliceServiceID(es)
+	if err != nil {
+		pp.log.Errorf("Could not fetch resource service name:%v", err)
+	}
+
+	ids := []ID{}
+	for _, endpoint := range es.Endpoints {
+		if endpoint.Hostname != nil {
+			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
+				continue
+			}
+		}
+
+		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
+			continue
+		}
+
+		if endpoint.TargetRef == nil {
+			for _, IPAddr := range endpoint.Addresses {
+				ids = append(ids, ServiceID{
+					Name: strings.Join([]string{
+						serviceID.Name,
+						IPAddr,
+						fmt.Sprint(resolvedPort),
+					}, "-"),
+					Namespace: es.Namespace,
+				})
+			}
+			continue
+		}
+
+		if endpoint.TargetRef.Kind == endpointTargetRefPod {
+			ids = append(ids, PodID{
+				Name:      endpoint.TargetRef.Name,
+				Namespace: endpoint.TargetRef.Namespace,
+			})
+		}
+	}
+	return ids
+}
+
 func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet {
 	addresses := make(map[ID]Address)
 	for _, subset := range endpoints.Subsets {
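
One detail worth noting in the hunk above: for endpoints with no Pod targetRef, the ID is a synthetic ServiceID whose name joins the service name, the endpoint IP, and the resolved port. A tiny sketch with illustrative values (the assumption here is that endpointSliceToAddresses keys its entries the same way, which is what lets the delete in updateEndpointSlice line up with the entry to remove):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative values only; this mirrors the Name construction used in
	// endpointSliceToIDs for endpoints that have no Pod targetRef.
	serviceName, ip, port := "name1", "172.17.0.12", 8989
	id := strings.Join([]string{serviceName, ip, fmt.Sprint(port)}, "-")
	fmt.Println(id) // prints: name1-172.17.0.12-8989
}
```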


@@ -1,16 +1,21 @@
package watcher

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/linkerd/linkerd2/controller/k8s"
	consts "github.com/linkerd/linkerd2/pkg/k8s"
	"github.com/linkerd/linkerd2/testutil"
	logging "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	dv1 "k8s.io/api/discovery/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -2046,3 +2051,133 @@ status:
 		})
 	}
 }
+
+// Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends
+// all of the Remove events, even if the associated pod is no longer available
+// from the API.
+func TestEndpointSliceScaleDown(t *testing.T) {
+	k8sConfigsWithES := []string{`
+kind: APIResourceList
+apiVersion: v1
+groupVersion: discovery.k8s.io/v1
+resources:
+  - name: endpointslices
+    singularName: endpointslice
+    namespaced: true
+    kind: EndpointSlice
+    verbs:
+      - delete
+      - deletecollection
+      - get
+      - list
+      - patch
+      - create
+      - update
+      - watch
+`, `
+apiVersion: v1
+kind: Service
+metadata:
+  name: name1
+  namespace: ns
+spec:
+  type: LoadBalancer
+  ports:
+  - port: 8989`, `
+addressType: IPv4
+apiVersion: discovery.k8s.io/v1
+endpoints:
+- addresses:
+  - 172.17.0.12
+  conditions:
+    ready: true
+  targetRef:
+    kind: Pod
+    name: name1-1
+    namespace: ns
+  topology:
+    kubernetes.io/hostname: node-1
+kind: EndpointSlice
+metadata:
+  labels:
+    kubernetes.io/service-name: name1
+  name: name1-es
+  namespace: ns
+ports:
+- name: ""
+  port: 8989`, `
+apiVersion: v1
+kind: Pod
+metadata:
+  name: name1-1
+  namespace: ns
+status:
+  phase: Running
+  podIP: 172.17.0.12`}
+
+	// Create an EndpointSlice with one endpoint, backed by a pod.
+	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
+	if err != nil {
+		t.Fatalf("NewFakeAPI returned an error: %s", err)
+	}
+
+	watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
+
+	k8sAPI.Sync(nil)
+
+	listener := newBufferingEndpointListener()
+
+	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	k8sAPI.Sync(nil)
+
+	listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
+
+	// Delete the backing pod and scale the EndpointSlice to 0 endpoints.
+	err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// It may take some time before the pod deletion is recognized by the
+	// lister. We wait until the lister sees the pod as deleted.
+	err = testutil.RetryFor(time.Second*30, func() error {
+		_, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1")
+		if kerrors.IsNotFound(err) {
+			return nil
+		}
+		if err == nil {
+			return errors.New("pod should be deleted, but still exists in lister")
+		}
+		return err
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	emptyES := &dv1.EndpointSlice{
+		AddressType: "IPv4",
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "name1-es", Namespace: "ns",
+			Labels: map[string]string{dv1.LabelServiceName: "name1"},
+		},
+		Endpoints: []dv1.Endpoint{},
+		Ports:     []dv1.EndpointPort{},
+	}
+
+	watcher.updateEndpointSlice(ES, emptyES)
+
+	// Ensure the watcher emits a remove event.
+	listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
+}


@@ -53,7 +53,7 @@ func TestGoodEndpoints(t *testing.T) {
 		testName := fmt.Sprintf("expect endpoints created for %s", endpointCase.name)
 		t.Run(testName, func(t *testing.T) {
-			err = TestHelper.RetryFor(5*time.Second, func() error {
+			err = testutil.RetryFor(5*time.Second, func() error {
 				out, err = TestHelper.LinkerdRun("diagnostics", "endpoints", endpointCase.authority, "-ojson")
 				if err != nil {
 					return fmt.Errorf("failed to get endpoints for %s: %w", endpointCase.authority, err)


@@ -73,7 +73,7 @@ func TestInstallCNIPlugin(t *testing.T) {
 	// perform a linkerd check with --linkerd-cni-enabled
 	timeout := time.Minute
-	err = TestHelper.RetryFor(timeout, func() error {
+	err = testutil.RetryFor(timeout, func() error {
 		out, err = TestHelper.LinkerdRun("check", "--pre", "--linkerd-cni-enabled", "--wait=60m")
 		if err != nil {
 			return err


@@ -95,7 +95,7 @@ func TestLocalhostServer(t *testing.T) {
 		}
 	}
 
-	err = TestHelper.RetryFor(50*time.Second, func() error {
+	err = testutil.RetryFor(50*time.Second, func() error {
 		// Use a short time window so that transient errors at startup
 		// fall out of the window.
 		metrics, err := TestHelper.LinkerdRun("diagnostics", "proxy-metrics", "-n", ns, "deploy/slow-cooker")


@@ -229,7 +229,7 @@ func runTests(ctx context.Context, t *testing.T, ns string, tcs []testCase) {
 	t.Helper()
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			err := TestHelper.RetryFor(30*time.Second, func() error {
+			err := testutil.RetryFor(30*time.Second, func() error {
 				if err := checkPodMetrics(ctx, ns, tc.scName, tc.scChecks); err != nil {
 					return fmt.Errorf("failed to check metrics for client pod: %w", err)
 				}


@@ -80,7 +80,7 @@ func TestSkipInboundPorts(t *testing.T) {
 	t.Run("check webapp metrics", func(t *testing.T) {
 		// Wait for slow-cookers to start sending requests by using a short
 		// time window through RetryFor.
-		err := TestHelper.RetryFor(30*time.Second, func() error {
+		err := testutil.RetryFor(30*time.Second, func() error {
 			pods, err := TestHelper.GetPods(ctx, ns, map[string]string{"app": "webapp"})
 			if err != nil {
 				return fmt.Errorf("error getting pods\n%w", err)


@@ -70,7 +70,7 @@ func verifyInstallApp(ctx context.Context, t *testing.T) {
 }
 
 func checkAppWoks(t *testing.T, timeout time.Duration) error {
-	return TestHelper.RetryFor(timeout, func() error {
+	return testutil.RetryFor(timeout, func() error {
 		args := []string{"viz", "stat", "deploy", "-n", TestHelper.GetTestNamespace(TestAppNamespaceSuffix), "--from", "deploy/slow-cooker", "-t", "1m"}
 		out, err := TestHelper.LinkerdRun(args...)
 		if err != nil {
@@ -119,7 +119,7 @@ func verifyRotateExternalCerts(ctx context.Context, t *testing.T) {
 func verifyIdentityServiceReloadsIssuerCert(t *testing.T) {
 	// check that the identity service has received an IssuerUpdated event
 	timeout := 90 * time.Second
-	err := TestHelper.RetryFor(timeout, func() error {
+	err := testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.Kubectl("",
 			"--namespace", TestHelper.GetLinkerdNamespace(),
 			"get", "events", "--field-selector", "reason=IssuerUpdated", "-ojson",


@@ -65,7 +65,7 @@ func TestRabbitMQDeploy(t *testing.T) {
 	// Verify client output
 	golden := "check.rabbitmq.golden"
 	timeout := 50 * time.Second
-	err = TestHelper.RetryFor(timeout, func() error {
+	err = testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.Kubectl("", "-n", testNamespace, "logs", "-lapp=rabbitmq-client", "-crabbitmq-client")
 		if err != nil {
 			return fmt.Errorf("'kubectl logs -l app=rabbitmq-client -c rabbitmq-client' command failed\n%w", err)


@@ -233,7 +233,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
 		tt := tt // pin
 		timeout := 20 * time.Second
 		t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
-			err := TestHelper.RetryFor(timeout, func() error {
+			err := testutil.RetryFor(timeout, func() error {
 				// Use a short time window so that transient errors at startup
 				// fall out of the window.
 				tt.args = append(tt.args, "-t", "30s")


@@ -162,7 +162,7 @@ func TestInjectAutoParams(t *testing.T) {
 	}
 
 	var pod *v1.Pod
-	err = TestHelper.RetryFor(30*time.Second, func() error {
+	err = testutil.RetryFor(30*time.Second, func() error {
 		pods, err := TestHelper.GetPodsForDeployment(ctx, ns, deployName)
 		if err != nil {
 			return fmt.Errorf("failed to get pods for namespace %s", ns)


@@ -75,7 +75,7 @@ func TestSmoke(t *testing.T) {
 	// Use a short time window for check tests to get rid of transient
 	// errors
 	timeout := 5 * time.Minute
-	err = TestHelper.RetryFor(timeout, func() error {
+	err = testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.LinkerdRun(cmd...)
 		if err != nil {
 			return fmt.Errorf("'linkerd check' command failed\n%w\n%s", err, out)


@@ -90,7 +90,7 @@ func TestGateways(t *testing.T) {
 	})
 
 	timeout := time.Minute
-	err := TestHelper.RetryFor(timeout, func() error {
+	err := testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.LinkerdRun("--context="+contexts[testutil.SourceContextKey], "multicluster", "gateways")
 		if err != nil {
 			return err
@@ -178,7 +178,7 @@ func TestTargetTraffic(t *testing.T) {
 	})
 
 	timeout := time.Minute
-	err := TestHelper.RetryFor(timeout, func() error {
+	err := testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.KubectlWithContext("",
 			targetCtx,
 			"--namespace", ns,
@@ -259,7 +259,7 @@ func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) {
 	t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) {
 		// Use a short time window so that slow-cooker can warm-up and send
 		// requests.
-		err := TestHelper.RetryFor(1*time.Minute, func() error {
+		err := testutil.RetryFor(1*time.Minute, func() error {
 			// Check gateway metrics
 			metrics, err := TestHelper.LinkerdRun(dgCmd...)
 			if err != nil {


@@ -49,7 +49,7 @@ func TestEdges(t *testing.T) {
 		"-ojson",
 	}
 	r := regexp.MustCompile(b.String())
-	err := TestHelper.RetryFor(timeout, func() error {
+	err := testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.LinkerdRun(cmd...)
 		if err != nil {
 			t.Fatal(err)
@@ -141,7 +141,7 @@ func TestDirectEdges(t *testing.T) {
 	// check edges
 	timeout := 50 * time.Second
 	testDataPath := "testdata"
-	err = TestHelper.RetryFor(timeout, func() error {
+	err = testutil.RetryFor(timeout, func() error {
 		out, err = TestHelper.LinkerdRun("-n", testNamespace, "-o", "json", "viz", "edges", "deploy")
 		if err != nil {
 			return err


@@ -115,7 +115,7 @@ func TestPolicy(t *testing.T) {
 		tt := tt // pin
 		timeout := 3 * time.Minute
 		t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
-			err := TestHelper.RetryFor(timeout, func() error {
+			err := testutil.RetryFor(timeout, func() error {
 				// Use a short time window so that transient errors at startup
 				// fall out of the window.
 				tt.args = append(tt.args, "-t", "30s")


@@ -226,7 +226,7 @@ func testMetrics(t *testing.T) {
 func assertRouteStat(upstream, namespace, downstream string, t *testing.T, assertFn func(stat *cmd2.JSONRouteStats) error) {
 	const routePath = "GET /testpath"
 	timeout := 2 * time.Minute
-	err := TestHelper.RetryFor(timeout, func() error {
+	err := testutil.RetryFor(timeout, func() error {
 		routes, err := getRoutes(upstream, namespace, []string{"--to", downstream})
 		if err != nil {
 			return fmt.Errorf("'linkerd routes' command failed: %w", err)
@@ -286,7 +286,7 @@ func getRoutes(deployName, namespace string, additionalArgs []string) ([]*cmd2.J
 	cmd = append(cmd, "--output", "json")
 
 	var results map[string][]*cmd2.JSONRouteStats
-	err := TestHelper.RetryFor(2*time.Minute, func() error {
+	err := testutil.RetryFor(2*time.Minute, func() error {
 		out, err := TestHelper.LinkerdRun(cmd...)
 		if err != nil {
 			return err


@@ -233,7 +233,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
 		tt := tt // pin
 		timeout := 20 * time.Second
 		t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
-			err := TestHelper.RetryFor(timeout, func() error {
+			err := testutil.RetryFor(timeout, func() error {
 				// Use a short time window so that transient errors at startup
 				// fall out of the window.
 				tt.args = append(tt.args, "-t", "30s")


@@ -68,7 +68,7 @@ func TestTracing(t *testing.T) {
 	checkCmd := []string{"jaeger", "check", "--wait=0"}
 	golden := "check.jaeger.golden"
 	timeout := time.Minute
-	err = TestHelper.RetryFor(timeout, func() error {
+	err = testutil.RetryFor(timeout, func() error {
 		out, err := TestHelper.LinkerdRun(checkCmd...)
 		if err != nil {
 			return fmt.Errorf("'linkerd jaeger check' command failed\n%w\n%s", err, out)
@@ -159,7 +159,7 @@ func TestTracing(t *testing.T) {
 	t.Run("expect full trace", func(t *testing.T) {
 		timeout := 3 * time.Minute
-		err = TestHelper.RetryFor(timeout, func() error {
+		err = testutil.RetryFor(timeout, func() error {
 			url, err := TestHelper.URLFor(ctx, tracingNs, "jaeger", 16686)
 			if err != nil {
 				return err


@@ -98,7 +98,7 @@ func TestTrafficSplitCliWithSP(t *testing.T) {
 		t.Run(fmt.Sprintf("ensure traffic is sent to one backend only for %s", version), func(t *testing.T) {
 			timeout := 40 * time.Second
-			err := TestHelper.RetryFor(timeout, func() error {
+			err := testutil.RetryFor(timeout, func() error {
 				out, err := TestHelper.LinkerdRun("viz", "stat", "deploy", "--namespace", prefixedNs, "--from", "deploy/slow-cooker", "-t", "30s")
 				if err != nil {
 					return err
@@ -148,7 +148,7 @@ func TestTrafficSplitCliWithSP(t *testing.T) {
 		t.Run(fmt.Sprintf("ensure traffic is sent to both backends for %s", version), func(t *testing.T) {
 			timeout := 40 * time.Second
-			err := TestHelper.RetryFor(timeout, func() error {
+			err := testutil.RetryFor(timeout, func() error {
 				out, err := TestHelper.LinkerdRun("viz", "stat", "deploy", "-n", prefixedNs, "--from", "deploy/slow-cooker", "-t", "30s")
 				if err != nil {


@@ -260,7 +260,7 @@ func NewTestHelper() *TestHelper {
 	}
 	testHelper.version = strings.TrimSpace(version)
 
-	kubernetesHelper, err := NewKubernetesHelper(*k8sContext, testHelper.RetryFor)
+	kubernetesHelper, err := NewKubernetesHelper(*k8sContext, RetryFor)
 	if err != nil {
 		exit(1, fmt.Sprintf("error creating kubernetes helper: %s", err.Error()))
 	}
@@ -618,7 +618,7 @@ func (h *TestHelper) CheckVersion(serverVersion string) error {
 // RetryFor retries a given function every second until the function returns
 // without an error, or a timeout is reached. If the timeout is reached, it
 // returns the last error received from the function.
-func (h *TestHelper) RetryFor(timeout time.Duration, fn func() error) error {
+func RetryFor(timeout time.Duration, fn func() error) error {
 	err := fn()
 	if err == nil {
 		return nil
@@ -648,7 +648,7 @@ func (h *TestHelper) RetryFor(timeout time.Duration, fn func() error) error {
 // giving pods time to start.
 func (h *TestHelper) HTTPGetURL(url string) (string, error) {
 	var body string
-	err := h.RetryFor(time.Minute, func() error {
+	err := RetryFor(time.Minute, func() error {
 		resp, err := h.httpClient.Get(url)
 		if err != nil {
 			return err
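
Since `RetryFor` is now a package-level function in `testutil` rather than a `TestHelper` method, callers no longer need a helper instance just to retry.  A minimal sketch of the new call pattern (the test itself is hypothetical, shown only to illustrate the signature from the diff above):

```go
package example_test

import (
	"errors"
	"testing"
	"time"

	"github.com/linkerd/linkerd2/testutil"
)

func TestSomething(t *testing.T) {
	// RetryFor polls the given function once a second until it returns nil
	// or the timeout elapses; no *TestHelper receiver is required.
	ready := false
	err := testutil.RetryFor(30*time.Second, func() error {
		if !ready {
			ready = true // pretend the condition becomes true on the next attempt
			return errors.New("not ready yet")
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
}
```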


@@ -63,7 +63,7 @@ func (h *TestHelper) TestCheckProxy(expectedVersion, namespace string) error {
 
 func (h *TestHelper) testCheck(cmd []string, categories []healthcheck.CategoryID) error {
 	timeout := time.Minute * 10
-	return h.RetryFor(timeout, func() error {
+	return RetryFor(timeout, func() error {
 		res, err := h.LinkerdRun(cmd...)
 		if err != nil {
 			return fmt.Errorf("'linkerd check' command failed\n%w\n%s", err, res)