Support exec readiness probes for sidecar containers (#15773)

* Support exec readiness probes for sidecar containers

* disable probe optimization when sidecar containers have an exec probe

* Fix linter issue in tests
Florian MEDJA 2025-04-17 21:58:03 -04:00 committed by GitHub
parent a221c53d3a
commit c7f03afb49
6 changed files with 174 additions and 2 deletions
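For orientation, the change targets Revisions whose sidecar containers carry exec readiness probes, which only the Kubelet can run inside the container. A minimal sketch of such a spec, with illustrative names and images (this snippet is not part of the commit):

package example

import (
    corev1 "k8s.io/api/core/v1"
    v1 "knative.dev/serving/pkg/apis/serving/v1"
)

// sidecarWithExecProbe builds an illustrative RevisionSpec: the first container
// serves traffic, the second is a sidecar whose readiness can only be checked
// by executing a command inside it, not over the network.
func sidecarWithExecProbe() v1.RevisionSpec {
    return v1.RevisionSpec{
        PodSpec: corev1.PodSpec{
            Containers: []corev1.Container{{
                Name:  "user-container",
                Image: "example.com/app",
                Ports: []corev1.ContainerPort{{ContainerPort: 8080}},
            }, {
                Name:  "sidecar",
                Image: "example.com/sidecar",
                ReadinessProbe: &corev1.Probe{
                    ProbeHandler: corev1.ProbeHandler{
                        Exec: &corev1.ExecAction{Command: []string{"/bin/sh", "-c", "test -f /tmp/ready"}},
                    },
                },
            }},
        },
    }
}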


@@ -25,6 +25,7 @@ import (
"context"
"k8s.io/apimachinery/pkg/types"
v1 "knative.dev/serving/pkg/apis/serving/v1"
)
@@ -50,7 +51,7 @@ func RevisionFrom(ctx context.Context) *v1.Revision {
return ctx.Value(revCtxKey{}).(*revCtx).revision
}
// RevIDFrom retrieves the the revisionID from the context.
// RevIDFrom retrieves the revisionID from the context.
func RevIDFrom(ctx context.Context) types.NamespacedName {
return ctx.Value(revCtxKey{}).(*revCtx).revID
}
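Only the doc comment and the imports change in this file; for context, the accessors above pair with a setter in the same package (assumed to be WithRevisionAndID, not shown in this diff) roughly as follows:

// Illustrative pairing of the accessors with their assumed setter.
ctx := WithRevisionAndID(context.Background(), rev,
    types.NamespacedName{Namespace: "default", Name: "example-00001"})
_ = RevisionFrom(ctx)       // -> rev
fmt.Println(RevIDFrom(ctx)) // -> default/example-00001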


@@ -542,6 +542,16 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names
if sp := rev.Spec.GetContainer().StartupProbe; sp != nil {
enableProbeOptimisation = false
}
// Exec readiness probes on sidecar containers are run by the Kubelet, so we can only mark the pod
// as ready once Kubernetes reports it ready.
if len(rev.Spec.GetSidecarContainers()) > 0 {
for _, sc := range rev.Spec.GetSidecarContainers() {
if rp := sc.ReadinessProbe; rp != nil && rp.Exec != nil {
enableProbeOptimisation = false
break
}
}
}
destsCh := make(chan dests)
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger)
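The new guard extends the existing startup-probe check: the activator may only probe pods directly when every readiness signal it needs can be reproduced over the network. Condensed into a single hypothetical helper (a sketch of the combined logic, not code from this commit):

// probeOptimisationEnabled sketches when the activator's direct probing
// shortcut is safe for a Revision (v1 is knative.dev/serving/pkg/apis/serving/v1).
func probeOptimisationEnabled(rev *v1.Revision) bool {
    // A startup probe on the user container is run only by the Kubelet.
    if rev.Spec.GetContainer().StartupProbe != nil {
        return false
    }
    // An exec readiness probe on a sidecar cannot be folded into the
    // queue-proxy's network probe either, so defer to Kubernetes readiness.
    for _, sc := range rev.Spec.GetSidecarContainers() {
        if rp := sc.ReadinessProbe; rp != nil && rp.Exec != nil {
            return false
        }
    }
    return true
}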


@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
pkgnet "knative.dev/networking/pkg/apis/networking"
@@ -38,6 +39,7 @@ import (
pkgnetwork "knative.dev/pkg/network"
"knative.dev/pkg/ptr"
rtesting "knative.dev/pkg/reconciler/testing"
activatortest "knative.dev/serving/pkg/activator/testing"
"knative.dev/serving/pkg/apis/serving"
v1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -889,6 +891,78 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
},
},
updateCnt: 1,
}, {
name: "pod with sidecar container with exec probe only goes ready when kubernetes agrees",
endpointsArr: []*corev1.Endpoints{epNotReady(testRevision, 1234, "http", nil, []string{"128.0.0.1"})},
revisions: []*v1.Revision{
revision(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1, 1, func(r *v1.Revision) {
r.Spec.Containers[0].ReadinessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt32(8080),
},
},
}
r.Spec.Containers = append(r.Spec.Containers, corev1.Container{
Name: "sidecar",
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{},
},
},
})
}),
},
services: []*corev1.Service{
privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1",
[]corev1.ServicePort{{Name: "http", Port: 1234}}),
},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Err: errors.New("clusterIP transport error"),
}},
"128.0.0.1:1234": {{
Code: http.StatusOK,
Body: queue.Name,
}},
},
expectDests: map[types.NamespacedName]revisionDestsUpdate{},
updateCnt: 0,
}, {
name: "pod with sidecar container with exec probe goes ready when kubernetes agrees",
endpointsArr: []*corev1.Endpoints{epNotReady(testRevision, 1234, "http", []string{"128.0.0.1"}, nil)},
revisions: []*v1.Revision{
revision(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1, 1, func(r *v1.Revision) {
// Only the sidecar defines a readiness probe here; Kubernetes already reports the endpoint as ready.
r.Spec.Containers = append(r.Spec.Containers, corev1.Container{
Name: "sidecar",
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{},
},
},
})
}),
},
services: []*corev1.Service{
privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1",
[]corev1.ServicePort{{Name: "http", Port: 1234}}),
},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Err: errors.New("clusterIP transport error"),
}},
"128.0.0.1:1234": {{
Code: http.StatusOK,
Body: queue.Name,
}},
},
expectDests: map[types.NamespacedName]revisionDestsUpdate{
{Namespace: testNamespace, Name: testRevision}: {
Dests: sets.New("128.0.0.1:1234"),
},
},
updateCnt: 1,
}} {
t.Run(tc.name, func(t *testing.T) {
fakeRT := activatortest.FakeRoundTripper{

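The two new cases differ only in what Kubernetes reports: the first lists the pod IP as not ready and expects no destinations, the second lists it as ready and expects the destination to appear. The epNotReady helper they use is defined elsewhere in this test file; its assumed shape (which may differ from the committed definition) is roughly:

// Assumed shape of the epNotReady test helper: an Endpoints object whose
// ready and not-ready address lists the test controls directly.
func epNotReady(name string, port int32, portName string, ready, notReady []string) *corev1.Endpoints {
    toAddrs := func(ips []string) []corev1.EndpointAddress {
        addrs := make([]corev1.EndpointAddress, 0, len(ips))
        for _, ip := range ips {
            addrs = append(addrs, corev1.EndpointAddress{IP: ip})
        }
        return addrs
    }
    return &corev1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{Name: name}, // revision labels omitted
        Subsets: []corev1.EndpointSubset{{
            Ports:             []corev1.EndpointPort{{Name: portName, Port: port}},
            Addresses:         toAddrs(ready),
            NotReadyAddresses: toAddrs(notReady),
        }},
    }
}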

@@ -1521,6 +1521,45 @@ func TestMakePodSpec(t *testing.T) {
withEnvVar("SERVING_READINESS_PROBE", `[{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP"}},{"httpGet":{"path":"/","port":8090,"host":"127.0.0.1","scheme":"HTTP"}}]`),
),
}),
}, {
name: "with multiple containers with exec probes",
rev: revision("bar", "foo",
withContainers([]corev1.Container{{
Name: servingContainerName,
Image: "busybox",
Ports: buildContainerPorts(v1.DefaultUserPort),
ReadinessProbe: withExecReadinessProbe([]string{"bin/sh", "serving.sh"}),
}, {
Name: sidecarContainerName,
Image: "Ubuntu",
ReadinessProbe: withExecReadinessProbe([]string{"bin/sh", "sidecar.sh"}),
}}),
WithContainerStatuses([]v1.ContainerStatus{{
ImageDigest: "busybox@sha256:deadbeef",
}, {
ImageDigest: "ubuntu@sha256:deadbffe",
}}),
),
fc: apicfg.Features{
MultiContainerProbing: apicfg.Enabled,
},
want: podSpec(
[]corev1.Container{
servingContainer(func(container *corev1.Container) {
container.Image = "busybox@sha256:deadbeef"
container.ReadinessProbe = withExecReadinessProbe([]string{"bin/sh", "serving.sh"})
}),
sidecarContainer(sidecarContainerName,
func(container *corev1.Container) {
container.Image = "ubuntu@sha256:deadbffe"
container.ReadinessProbe = withExecReadinessProbe([]string{"bin/sh", "sidecar.sh"})
},
),
queueContainer(
withEnvVar("ENABLE_MULTI_CONTAINER_PROBES", "true"),
withEnvVar("SERVING_READINESS_PROBE", `[{"tcpSocket":{"port":8080,"host":"127.0.0.1"}}]`),
),
}),
}, {
name: "with default affinity type set",
rev: revision("bar", "foo",

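In the expected pod spec the exec probes stay on the user and sidecar containers untouched, and SERVING_READINESS_PROBE carries only the default TCP probe for the serving container, since an exec command cannot be relayed through the queue-proxy. The withExecReadinessProbe helper used above is not shown in this hunk; presumably it looks something like this (hypothetical, the committed helper may differ):

// Assumed shape of the withExecReadinessProbe test helper referenced above.
func withExecReadinessProbe(command []string) *corev1.Probe {
    return &corev1.Probe{
        ProbeHandler: corev1.ProbeHandler{
            Exec: &corev1.ExecAction{Command: command},
        },
    }
}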

@@ -312,6 +312,9 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
probePort = sc.ReadinessProbe.TCPSocket.Port.IntVal
case sc.ReadinessProbe.GRPC != nil && sc.ReadinessProbe.GRPC.Port > 0:
probePort = sc.ReadinessProbe.GRPC.Port
case sc.ReadinessProbe.Exec != nil:
// An exec probe cannot be expressed as a network probe, so skip this sidecar when building the queue-proxy's aggregated readiness probe; the Kubelet runs the exec probe instead
continue
default:
return nil, fmt.Errorf("sidecar readiness probe does not define probe port on container: %s", sc.Name)
}
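The handlers that survive this switch are rewritten to target 127.0.0.1 and serialized into the queue-proxy's SERVING_READINESS_PROBE environment variable; an exec handler has no address or port to rewrite, so it is skipped and stays with the Kubelet. A condensed sketch of that selection (hypothetical helper, not the committed function):

// aggregateSidecarProbes collects the sidecar readiness handlers the
// queue-proxy can check over the network; exec handlers are skipped because
// only the Kubelet can run a command inside the sidecar.
func aggregateSidecarProbes(sidecars []corev1.Container) []corev1.ProbeHandler {
    var handlers []corev1.ProbeHandler
    for _, sc := range sidecars {
        rp := sc.ReadinessProbe
        if rp == nil {
            continue
        }
        switch {
        case rp.HTTPGet != nil, rp.TCPSocket != nil, rp.GRPC != nil:
            handlers = append(handlers, rp.ProbeHandler)
        case rp.Exec != nil:
            // Left to the Kubelet; the activator then waits for K8s readiness.
            continue
        }
    }
    return handlers
}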


@@ -41,6 +41,7 @@ import (
"knative.dev/pkg/ptr"
"knative.dev/pkg/system"
tracingconfig "knative.dev/pkg/tracing/config"
apicfg "knative.dev/serving/pkg/apis/config"
"knative.dev/serving/pkg/apis/serving"
v1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -441,7 +442,51 @@ func TestMakeQueueContainer(t *testing.T) {
"ENABLE_MULTI_CONTAINER_PROBES": "true",
})
}),
}}
}, {
name: "multi container probing enabled with exec probes on all containers",
rev: revision("bar", "foo", withContainers([]corev1.Container{
{
Name: servingContainerName,
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"bin/sh", "serving.sh"},
},
},
},
Ports: []corev1.ContainerPort{{
ContainerPort: 1955,
Name: string(netapi.ProtocolH2C),
}},
},
{
Name: sidecarContainerName,
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"bin/sh", "sidecar.sh"},
},
},
},
},
})),
fc: apicfg.Features{
MultiContainerProbing: apicfg.Enabled,
},
dc: deployment.Config{
ProgressDeadline: 0 * time.Second,
},
want: queueContainer(func(c *corev1.Container) {
c.Ports = append(queueNonServingPorts, queueHTTP2Port, queueHTTPSPort)
c.ReadinessProbe.HTTPGet.Port.IntVal = queueHTTP2Port.ContainerPort
c.Env = env(map[string]string{
"ENABLE_MULTI_CONTAINER_PROBES": "true",
"USER_PORT": "1955",
"QUEUE_SERVING_PORT": "8013",
})
}),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {