From c7f03afb49d4cfa24b13d663fbc290fa14c7dc94 Mon Sep 17 00:00:00 2001 From: Florian MEDJA Date: Thu, 17 Apr 2025 21:58:03 -0400 Subject: [PATCH] Support exec readiness probes for sidecar containers (#15773) * Support exec readiness probes for sidecar containers * disable probe optimization when sidecar containers have an exec probe * Fix linter issue in tests --- pkg/activator/handler/context.go | 3 +- pkg/activator/net/revision_backends.go | 10 +++ pkg/activator/net/revision_backends_test.go | 74 +++++++++++++++++++ .../revision/resources/deploy_test.go | 39 ++++++++++ pkg/reconciler/revision/resources/queue.go | 3 + .../revision/resources/queue_test.go | 47 +++++++++++- 6 files changed, 174 insertions(+), 2 deletions(-) diff --git a/pkg/activator/handler/context.go b/pkg/activator/handler/context.go index 0b6cc88ff6..f0aaf7da7c 100644 --- a/pkg/activator/handler/context.go +++ b/pkg/activator/handler/context.go @@ -25,6 +25,7 @@ import ( "context" "k8s.io/apimachinery/pkg/types" + v1 "knative.dev/serving/pkg/apis/serving/v1" ) @@ -50,7 +51,7 @@ func RevisionFrom(ctx context.Context) *v1.Revision { return ctx.Value(revCtxKey{}).(*revCtx).revision } -// RevIDFrom retrieves the the revisionID from the context. +// RevIDFrom retrieves the revisionID from the context. func RevIDFrom(ctx context.Context) types.NamespacedName { return ctx.Value(revCtxKey{}).(*revCtx).revID } diff --git a/pkg/activator/net/revision_backends.go b/pkg/activator/net/revision_backends.go index 8940054393..1cc9620426 100644 --- a/pkg/activator/net/revision_backends.go +++ b/pkg/activator/net/revision_backends.go @@ -542,6 +542,16 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names if sp := rev.Spec.GetContainer().StartupProbe; sp != nil { enableProbeOptimisation = false } + // Startup probes for sidecars are executed by Kubelet, so we can only mark the container as ready + // once K8s sees it as ready. + if len(rev.Spec.GetSidecarContainers()) > 0 { + for _, sc := range rev.Spec.GetSidecarContainers() { + if sp := sc.StartupProbe; sp != nil { + enableProbeOptimisation = false + break + } + } + } destsCh := make(chan dests) rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger) diff --git a/pkg/activator/net/revision_backends_test.go b/pkg/activator/net/revision_backends_test.go index 9e1f1e2e1d..6137226d9e 100644 --- a/pkg/activator/net/revision_backends_test.go +++ b/pkg/activator/net/revision_backends_test.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" pkgnet "knative.dev/networking/pkg/apis/networking" @@ -38,6 +39,7 @@ import ( pkgnetwork "knative.dev/pkg/network" "knative.dev/pkg/ptr" rtesting "knative.dev/pkg/reconciler/testing" + activatortest "knative.dev/serving/pkg/activator/testing" "knative.dev/serving/pkg/apis/serving" v1 "knative.dev/serving/pkg/apis/serving/v1" @@ -889,6 +891,78 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { }, }, updateCnt: 1, + }, { + name: "pod with sidecar container with exec probe only goes ready when kubernetes agrees", + endpointsArr: []*corev1.Endpoints{epNotReady(testRevision, 1234, "http", nil, []string{"128.0.0.1"})}, + revisions: []*v1.Revision{ + revision(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1, 1, func(r *v1.Revision) { + r.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt32(8080), + }, + }, + } + r.Spec.Containers = append(r.Spec.Containers, corev1.Container{ + Name: "sidecar", + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{}, + }, + }, + }) + }), + }, + services: []*corev1.Service{ + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + []corev1.ServicePort{{Name: "http", Port: 1234}}), + }, + probeHostResponses: map[string][]activatortest.FakeResponse{ + "129.0.0.1:1234": {{ + Err: errors.New("clusterIP transport error"), + }}, + "128.0.0.1:1234": {{ + Code: http.StatusOK, + Body: queue.Name, + }}, + }, + expectDests: map[types.NamespacedName]revisionDestsUpdate{}, + updateCnt: 0, + }, { + name: "pod with sidecar container with exec probe goes ready when kubernetes agrees", + endpointsArr: []*corev1.Endpoints{epNotReady(testRevision, 1234, "http", []string{"128.0.0.1"}, nil)}, + revisions: []*v1.Revision{ + revision(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1, 1, func(r *v1.Revision) { + // + r.Spec.Containers = append(r.Spec.Containers, corev1.Container{ + Name: "sidecar", + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{}, + }, + }, + }) + }), + }, + services: []*corev1.Service{ + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + []corev1.ServicePort{{Name: "http", Port: 1234}}), + }, + probeHostResponses: map[string][]activatortest.FakeResponse{ + "129.0.0.1:1234": {{ + Err: errors.New("clusterIP transport error"), + }}, + "128.0.0.1:1234": {{ + Code: http.StatusOK, + Body: queue.Name, + }}, + }, + expectDests: map[types.NamespacedName]revisionDestsUpdate{ + {Namespace: testNamespace, Name: testRevision}: { + Dests: sets.New("128.0.0.1:1234"), + }, + }, + updateCnt: 1, }} { t.Run(tc.name, func(t *testing.T) { fakeRT := activatortest.FakeRoundTripper{ diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index 1b9496e100..1925c8bd62 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -1521,6 +1521,45 @@ func TestMakePodSpec(t *testing.T) { withEnvVar("SERVING_READINESS_PROBE", `[{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP"}},{"httpGet":{"path":"/","port":8090,"host":"127.0.0.1","scheme":"HTTP"}}]`), ), }), + }, { + name: "with multiple containers with exec probes", + rev: revision("bar", "foo", + withContainers([]corev1.Container{{ + Name: servingContainerName, + Image: "busybox", + Ports: buildContainerPorts(v1.DefaultUserPort), + ReadinessProbe: withExecReadinessProbe([]string{"bin/sh", "serving.sh"}), + }, { + Name: sidecarContainerName, + Image: "Ubuntu", + ReadinessProbe: withExecReadinessProbe([]string{"bin/sh", "sidecar.sh"}), + }}), + WithContainerStatuses([]v1.ContainerStatus{{ + ImageDigest: "busybox@sha256:deadbeef", + }, { + ImageDigest: "ubuntu@sha256:deadbffe", + }}), + ), + fc: apicfg.Features{ + MultiContainerProbing: apicfg.Enabled, + }, + want: podSpec( + []corev1.Container{ + servingContainer(func(container *corev1.Container) { + container.Image = "busybox@sha256:deadbeef" + container.ReadinessProbe = withExecReadinessProbe([]string{"bin/sh", "serving.sh"}) + }), + sidecarContainer(sidecarContainerName, + func(container *corev1.Container) { + container.Image = "ubuntu@sha256:deadbffe" + container.ReadinessProbe = withExecReadinessProbe([]string{"bin/sh", "sidecar.sh"}) + }, + ), + queueContainer( + withEnvVar("ENABLE_MULTI_CONTAINER_PROBES", "true"), + withEnvVar("SERVING_READINESS_PROBE", `[{"tcpSocket":{"port":8080,"host":"127.0.0.1"}}]`), + ), + }), }, { name: "with default affinity type set", rev: revision("bar", "foo", diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 1ef7f58e40..9c2099761c 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -312,6 +312,9 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container probePort = sc.ReadinessProbe.TCPSocket.Port.IntVal case sc.ReadinessProbe.GRPC != nil && sc.ReadinessProbe.GRPC.Port > 0: probePort = sc.ReadinessProbe.GRPC.Port + case sc.ReadinessProbe.Exec != nil: + // Skip the queue-proxy optimization for readiness probing when exec probe is defined on a sidecar container + continue default: return nil, fmt.Errorf("sidecar readiness probe does not define probe port on container: %s", sc.Name) } diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index fe29518e63..451a082ca3 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -41,6 +41,7 @@ import ( "knative.dev/pkg/ptr" "knative.dev/pkg/system" tracingconfig "knative.dev/pkg/tracing/config" + apicfg "knative.dev/serving/pkg/apis/config" "knative.dev/serving/pkg/apis/serving" v1 "knative.dev/serving/pkg/apis/serving/v1" @@ -441,7 +442,51 @@ func TestMakeQueueContainer(t *testing.T) { "ENABLE_MULTI_CONTAINER_PROBES": "true", }) }), - }} + }, { + name: "multi container probing enabled with exec probes on all containers", + rev: revision("bar", "foo", withContainers([]corev1.Container{ + { + Name: servingContainerName, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"bin/sh", "serving.sh"}, + }, + }, + }, + Ports: []corev1.ContainerPort{{ + ContainerPort: 1955, + Name: string(netapi.ProtocolH2C), + }}, + }, + { + Name: sidecarContainerName, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"bin/sh", "sidecar.sh"}, + }, + }, + }, + }, + })), + fc: apicfg.Features{ + MultiContainerProbing: apicfg.Enabled, + }, + dc: deployment.Config{ + ProgressDeadline: 0 * time.Second, + }, + want: queueContainer(func(c *corev1.Container) { + c.Ports = append(queueNonServingPorts, queueHTTP2Port, queueHTTPSPort) + c.ReadinessProbe.HTTPGet.Port.IntVal = queueHTTP2Port.ContainerPort + c.Env = env(map[string]string{ + "ENABLE_MULTI_CONTAINER_PROBES": "true", + "USER_PORT": "1955", + "QUEUE_SERVING_PORT": "8013", + }) + }), + }, + } for _, test := range tests { t.Run(test.name, func(t *testing.T) {