diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ac247d..2cd4df1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ This changelog keeps track of work items that have been completed and are ready - **General**: Add possibility to skip TLS verification for upstreams in interceptor ([#1307](https://github.com/kedacore/http-add-on/pull/1307)) ### Improvements -- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) +- **Interceptor**: Support HTTPScaledObject scoped timeout ([#813](https://github.com/kedacore/http-add-on/issues/813)) ### Fixes diff --git a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml index fce12d7..7872e4a 100644 --- a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml +++ b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml @@ -162,6 +162,22 @@ spec: metric value format: int32 type: integer + timeouts: + description: (optional) Timeouts that override the global ones + properties: + conditionWait: + description: How long to wait for the backing workload to have + 1 or more replicas before connecting and sending the HTTP request + (Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment + variable) + type: string + responseHeader: + description: How long to wait between when the HTTP request is + sent to the backing app and when response headers need to arrive + (Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment + variable) + type: string + type: object required: - scaleTargetRef type: object diff --git a/interceptor/proxy_handlers.go b/interceptor/proxy_handlers.go index ff06a16..95d3ba6 100644 --- a/interceptor/proxy_handlers.go +++ b/interceptor/proxy_handlers.go @@ -53,23 +53,35 @@ func newForwardingHandler( tlsCfg *tls.Config, tracingCfg *config.Tracing, ) http.Handler { - roundTripper := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: dialCtxFunc, - ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2, - MaxIdleConns: fwdCfg.maxIdleConns, - IdleConnTimeout: fwdCfg.idleConnTimeout, - TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout, - ExpectContinueTimeout: fwdCfg.expectContinueTimeout, - ResponseHeaderTimeout: fwdCfg.respHeaderTimeout, - TLSClientConfig: tlsCfg, - } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var uh *handler.Upstream ctx := r.Context() httpso := util.HTTPSOFromContext(ctx) - waitFuncCtx, done := context.WithTimeout(ctx, fwdCfg.waitTimeout) + conditionWaitTimeout := fwdCfg.waitTimeout + roundTripper := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: dialCtxFunc, + ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2, + MaxIdleConns: fwdCfg.maxIdleConns, + IdleConnTimeout: fwdCfg.idleConnTimeout, + TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout, + ExpectContinueTimeout: fwdCfg.expectContinueTimeout, + ResponseHeaderTimeout: fwdCfg.respHeaderTimeout, + TLSClientConfig: tlsCfg, + } + + if httpso.Spec.Timeouts != nil { + if httpso.Spec.Timeouts.ConditionWait.Duration > 0 { + conditionWaitTimeout = httpso.Spec.Timeouts.ConditionWait.Duration + } + + if httpso.Spec.Timeouts.ResponseHeader.Duration > 0 { + roundTripper.ResponseHeaderTimeout = httpso.Spec.Timeouts.ResponseHeader.Duration + } + } + + waitFuncCtx, done := context.WithTimeout(ctx, conditionWaitTimeout) defer done() isColdStart, err := waitFunc( waitFuncCtx, diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index bfa1099..b8d9ea9 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -76,6 +76,17 @@ type RateMetricSpec struct { Granularity metav1.Duration `json:"granularity" description:"Time granularity for rate calculation"` } +// HTTPScaledObjectTimeoutsSpec defines timeouts that override the global ones +type HTTPScaledObjectTimeoutsSpec struct { + // How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request (Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment variable) + // +optional + ConditionWait metav1.Duration `json:"conditionWait" description:"How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request"` + + // How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive (Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment variable) + // +optional + ResponseHeader metav1.Duration `json:"responseHeader" description:"How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive"` +} + // HTTPScaledObjectSpec defines the desired state of HTTPScaledObject type HTTPScaledObjectSpec struct { // The hosts to route. All requests which the "Host" header @@ -108,6 +119,9 @@ type HTTPScaledObjectSpec struct { // (optional) Configuration for the metric used for scaling // +optional ScalingMetric *ScalingMetricSpec `json:"scalingMetric,omitempty" description:"Configuration for the metric used for scaling. If empty 'concurrency' will be used"` + // (optional) Timeouts that override the global ones + // +optional + Timeouts *HTTPScaledObjectTimeoutsSpec `json:"timeouts,omitempty" description:"Timeouts that override the global ones"` } // HTTPScaledObjectStatus defines the observed state of HTTPScaledObject diff --git a/operator/apis/http/v1alpha1/zz_generated.deepcopy.go b/operator/apis/http/v1alpha1/zz_generated.deepcopy.go index 50cd834..ae9025d 100644 --- a/operator/apis/http/v1alpha1/zz_generated.deepcopy.go +++ b/operator/apis/http/v1alpha1/zz_generated.deepcopy.go @@ -171,6 +171,11 @@ func (in *HTTPScaledObjectSpec) DeepCopyInto(out *HTTPScaledObjectSpec) { *out = new(ScalingMetricSpec) (*in).DeepCopyInto(*out) } + if in.Timeouts != nil { + in, out := &in.Timeouts, &out.Timeouts + *out = new(HTTPScaledObjectTimeoutsSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectSpec. @@ -203,6 +208,23 @@ func (in *HTTPScaledObjectStatus) DeepCopy() *HTTPScaledObjectStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HTTPScaledObjectTimeoutsSpec) DeepCopyInto(out *HTTPScaledObjectTimeoutsSpec) { + *out = *in + out.ConditionWait = in.ConditionWait + out.ResponseHeader = in.ResponseHeader +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectTimeoutsSpec. +func (in *HTTPScaledObjectTimeoutsSpec) DeepCopy() *HTTPScaledObjectTimeoutsSpec { + if in == nil { + return nil + } + out := new(HTTPScaledObjectTimeoutsSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RateMetricSpec) DeepCopyInto(out *RateMetricSpec) { *out = *in diff --git a/tests/checks/interceptor_timeouts/interceptor_timeouts_test.go b/tests/checks/interceptor_timeouts/interceptor_timeouts_test.go new file mode 100644 index 0000000..da14716 --- /dev/null +++ b/tests/checks/interceptor_timeouts/interceptor_timeouts_test.go @@ -0,0 +1,272 @@ +//go:build e2e +// +build e2e + +package interceptor_timeouts_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "interceptor-timeouts-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 + requestJobName = fmt.Sprintf("%s-request", testName) + responseDelay = "0" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + HTTPScaledObjectName string + ResponseHeaderTimeout string + Host string + MinReplicas int + MaxReplicas int + RequestJobName string + ResponseDelay string +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 9898 + targetPort: http + protocol: TCP + name: http + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: stefanprodan/podinfo:latest + ports: + - name: http + containerPort: 9898 + protocol: TCP + readinessProbe: + httpGet: + path: /readyz + port: http +` + + loadJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.RequestJobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: curl-client + image: curlimages/curl + imagePullPolicy: Always + command: ["curl", "-f", "-H", "Host: {{.Host}}", "keda-add-ons-http-interceptor-proxy.keda:8080/delay/{{.ResponseDelay}}"] + restartPolicy: Never + activeDeadlineSeconds: 600 + backoffLimit: 2 +` + + httpScaledObjectWithoutTimeoutsTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + name: {{.DeploymentName}} + service: {{.ServiceName}} + port: 9898 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` + + httpScaledObjectWithTimeoutsTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + name: {{.DeploymentName}} + service: {{.ServiceName}} + port: 9898 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} + timeouts: + responseHeader: "{{ .ResponseHeaderTimeout }}s" +` +) + +func TestCheck(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", minReplicaCount) + + testDefaultTimeouts(t, kc, data) + testCustomTimeouts(t, kc, data) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func testDefaultTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) { + KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate) + + testDefaultTimeoutPasses(t, kc, data) + testDefaultTimeoutFails(t, kc, data) + + KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate) +} + +func testDefaultTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing default timeout passes ---") + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) + + assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed") + + KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10), + "replica count should be %d after 2 minutes", minReplicaCount) +} + +func testDefaultTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing default timeout fails ---") + + data.ResponseDelay = "2" + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) + + assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail") + + KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10), + "replica count should be %d after 2 minutes", minReplicaCount) +} + +func testCustomTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) { + data.ResponseHeaderTimeout = "5" + + KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate) + + testCustomTimeoutPasses(t, kc, data) + testCustomTimeoutFails(t, kc, data) + + KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate) +} + +func testCustomTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing custom timeout passes ---") + + data.ResponseDelay = "2" + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) + + assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed") + + KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10), + "replica count should be %d after 2 minutes", minReplicaCount) +} + +func testCustomTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing custom timeout fails ---") + + data.ResponseDelay = "7" + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) + + assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail") + + KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10), + "replica count should be %d after 2 minutes", minReplicaCount) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + RequestJobName: requestJobName, + ResponseDelay: responseDelay, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + } +}