Implement HTTPScaledObject scoped timeout (#1285)

* Implement HTTPScaledObject scoped timeout

Signed-off-by: Alexander Pykavy <aleksandrpykavyj@gmail.com>

* Add tests for HTTPScaledObject scoped timeout

Signed-off-by: Alexander Pykavy <aleksandrpykavyj@gmail.com>

---------

Signed-off-by: Alexander Pykavy <aleksandrpykavyj@gmail.com>
This commit is contained in:
Alexander Pykavy 2025-07-11 09:41:43 +02:00 committed by GitHub
parent 29a6c2b509
commit bf355649c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 349 additions and 13 deletions

View File

@ -31,7 +31,7 @@ This changelog keeps track of work items that have been completed and are ready
- **General**: Add possibility to skip TLS verification for upstreams in interceptor ([#1307](https://github.com/kedacore/http-add-on/pull/1307))
### Improvements
- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO))
- **Interceptor**: Support HTTPScaledObject scoped timeout ([#813](https://github.com/kedacore/http-add-on/issues/813))
### Fixes

View File

@ -162,6 +162,22 @@ spec:
metric value
format: int32
type: integer
timeouts:
description: (optional) Timeouts that override the global ones
properties:
conditionWait:
description: How long to wait for the backing workload to have
1 or more replicas before connecting and sending the HTTP request
(Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment
variable)
type: string
responseHeader:
description: How long to wait between when the HTTP request is
sent to the backing app and when response headers need to arrive
(Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment
variable)
type: string
type: object
required:
- scaleTargetRef
type: object

View File

@ -53,23 +53,35 @@ func newForwardingHandler(
tlsCfg *tls.Config,
tracingCfg *config.Tracing,
) http.Handler {
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialCtxFunc,
ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2,
MaxIdleConns: fwdCfg.maxIdleConns,
IdleConnTimeout: fwdCfg.idleConnTimeout,
TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout,
ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
TLSClientConfig: tlsCfg,
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var uh *handler.Upstream
ctx := r.Context()
httpso := util.HTTPSOFromContext(ctx)
waitFuncCtx, done := context.WithTimeout(ctx, fwdCfg.waitTimeout)
conditionWaitTimeout := fwdCfg.waitTimeout
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialCtxFunc,
ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2,
MaxIdleConns: fwdCfg.maxIdleConns,
IdleConnTimeout: fwdCfg.idleConnTimeout,
TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout,
ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
TLSClientConfig: tlsCfg,
}
if httpso.Spec.Timeouts != nil {
if httpso.Spec.Timeouts.ConditionWait.Duration > 0 {
conditionWaitTimeout = httpso.Spec.Timeouts.ConditionWait.Duration
}
if httpso.Spec.Timeouts.ResponseHeader.Duration > 0 {
roundTripper.ResponseHeaderTimeout = httpso.Spec.Timeouts.ResponseHeader.Duration
}
}
waitFuncCtx, done := context.WithTimeout(ctx, conditionWaitTimeout)
defer done()
isColdStart, err := waitFunc(
waitFuncCtx,

View File

@ -76,6 +76,17 @@ type RateMetricSpec struct {
Granularity metav1.Duration `json:"granularity" description:"Time granularity for rate calculation"`
}
// HTTPScaledObjectTimeoutsSpec defines timeouts that override the global ones
type HTTPScaledObjectTimeoutsSpec struct {
// How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request (Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment variable)
// +optional
ConditionWait metav1.Duration `json:"conditionWait" description:"How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request"`
// How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive (Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment variable)
// +optional
ResponseHeader metav1.Duration `json:"responseHeader" description:"How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive"`
}
// HTTPScaledObjectSpec defines the desired state of HTTPScaledObject
type HTTPScaledObjectSpec struct {
// The hosts to route. All requests which the "Host" header
@ -108,6 +119,9 @@ type HTTPScaledObjectSpec struct {
// (optional) Configuration for the metric used for scaling
// +optional
ScalingMetric *ScalingMetricSpec `json:"scalingMetric,omitempty" description:"Configuration for the metric used for scaling. If empty 'concurrency' will be used"`
// (optional) Timeouts that override the global ones
// +optional
Timeouts *HTTPScaledObjectTimeoutsSpec `json:"timeouts,omitempty" description:"Timeouts that override the global ones"`
}
// HTTPScaledObjectStatus defines the observed state of HTTPScaledObject

View File

@ -171,6 +171,11 @@ func (in *HTTPScaledObjectSpec) DeepCopyInto(out *HTTPScaledObjectSpec) {
*out = new(ScalingMetricSpec)
(*in).DeepCopyInto(*out)
}
if in.Timeouts != nil {
in, out := &in.Timeouts, &out.Timeouts
*out = new(HTTPScaledObjectTimeoutsSpec)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectSpec.
@ -203,6 +208,23 @@ func (in *HTTPScaledObjectStatus) DeepCopy() *HTTPScaledObjectStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HTTPScaledObjectTimeoutsSpec) DeepCopyInto(out *HTTPScaledObjectTimeoutsSpec) {
*out = *in
out.ConditionWait = in.ConditionWait
out.ResponseHeader = in.ResponseHeader
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectTimeoutsSpec.
func (in *HTTPScaledObjectTimeoutsSpec) DeepCopy() *HTTPScaledObjectTimeoutsSpec {
if in == nil {
return nil
}
out := new(HTTPScaledObjectTimeoutsSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RateMetricSpec) DeepCopyInto(out *RateMetricSpec) {
*out = *in

View File

@ -0,0 +1,272 @@
//go:build e2e
// +build e2e
package interceptor_timeouts_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
. "github.com/kedacore/http-add-on/tests/helper"
)
const (
testName = "interceptor-timeouts-test"
)
var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
serviceName = fmt.Sprintf("%s-service", testName)
httpScaledObjectName = fmt.Sprintf("%s-http-so", testName)
host = testName
minReplicaCount = 0
maxReplicaCount = 1
requestJobName = fmt.Sprintf("%s-request", testName)
responseDelay = "0"
)
type templateData struct {
TestNamespace string
DeploymentName string
ServiceName string
HTTPScaledObjectName string
ResponseHeaderTimeout string
Host string
MinReplicas int
MaxReplicas int
RequestJobName string
ResponseDelay string
}
const (
serviceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{.ServiceName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
ports:
- port: 9898
targetPort: http
protocol: TCP
name: http
selector:
app: {{.DeploymentName}}
`
deploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
replicas: 0
selector:
matchLabels:
app: {{.DeploymentName}}
template:
metadata:
labels:
app: {{.DeploymentName}}
spec:
containers:
- name: {{.DeploymentName}}
image: stefanprodan/podinfo:latest
ports:
- name: http
containerPort: 9898
protocol: TCP
readinessProbe:
httpGet:
path: /readyz
port: http
`
loadJobTemplate = `
apiVersion: batch/v1
kind: Job
metadata:
name: {{.RequestJobName}}
namespace: {{.TestNamespace}}
spec:
template:
spec:
containers:
- name: curl-client
image: curlimages/curl
imagePullPolicy: Always
command: ["curl", "-f", "-H", "Host: {{.Host}}", "keda-add-ons-http-interceptor-proxy.keda:8080/delay/{{.ResponseDelay}}"]
restartPolicy: Never
activeDeadlineSeconds: 600
backoffLimit: 2
`
httpScaledObjectWithoutTimeoutsTemplate = `
kind: HTTPScaledObject
apiVersion: http.keda.sh/v1alpha1
metadata:
name: {{.HTTPScaledObjectName}}
namespace: {{.TestNamespace}}
spec:
hosts:
- {{.Host}}
targetPendingRequests: 100
scaledownPeriod: 10
scaleTargetRef:
name: {{.DeploymentName}}
service: {{.ServiceName}}
port: 9898
replicas:
min: {{ .MinReplicas }}
max: {{ .MaxReplicas }}
`
httpScaledObjectWithTimeoutsTemplate = `
kind: HTTPScaledObject
apiVersion: http.keda.sh/v1alpha1
metadata:
name: {{.HTTPScaledObjectName}}
namespace: {{.TestNamespace}}
spec:
hosts:
- {{.Host}}
targetPendingRequests: 100
scaledownPeriod: 10
scaleTargetRef:
name: {{.DeploymentName}}
service: {{.ServiceName}}
port: 9898
replicas:
min: {{ .MinReplicas }}
max: {{ .MaxReplicas }}
timeouts:
responseHeader: "{{ .ResponseHeaderTimeout }}s"
`
)
func TestCheck(t *testing.T) {
// setup
t.Log("--- setting up ---")
// Create kubernetes resources
kc := GetKubernetesClient(t)
data, templates := getTemplateData()
CreateKubernetesResources(t, kc, testNamespace, data, templates)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10),
"replica count should be %d after 1 minutes", minReplicaCount)
testDefaultTimeouts(t, kc, data)
testCustomTimeouts(t, kc, data)
// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}
func testDefaultTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) {
KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate)
testDefaultTimeoutPasses(t, kc, data)
testDefaultTimeoutFails(t, kc, data)
KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate)
}
func testDefaultTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing default timeout passes ---")
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
"replica count should be %d after 1 minutes", maxReplicaCount)
assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed")
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
"replica count should be %d after 2 minutes", minReplicaCount)
}
func testDefaultTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing default timeout fails ---")
data.ResponseDelay = "2"
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
"replica count should be %d after 1 minutes", maxReplicaCount)
assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail")
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
"replica count should be %d after 2 minutes", minReplicaCount)
}
func testCustomTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) {
data.ResponseHeaderTimeout = "5"
KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate)
testCustomTimeoutPasses(t, kc, data)
testCustomTimeoutFails(t, kc, data)
KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate)
}
func testCustomTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing custom timeout passes ---")
data.ResponseDelay = "2"
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
"replica count should be %d after 1 minutes", maxReplicaCount)
assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed")
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
"replica count should be %d after 2 minutes", minReplicaCount)
}
func testCustomTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing custom timeout fails ---")
data.ResponseDelay = "7"
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
"replica count should be %d after 1 minutes", maxReplicaCount)
assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail")
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
"replica count should be %d after 2 minutes", minReplicaCount)
}
func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
DeploymentName: deploymentName,
ServiceName: serviceName,
HTTPScaledObjectName: httpScaledObjectName,
Host: host,
MinReplicas: minReplicaCount,
MaxReplicas: maxReplicaCount,
RequestJobName: requestJobName,
ResponseDelay: responseDelay,
}, []Template{
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "serviceNameTemplate", Config: serviceTemplate},
}
}