Merge branch 'main' into bump-deps
Signed-off-by: Jorge Turrado Ferrero <jorge.turrado@scrm.lidl>
This commit is contained in:
commit
f497338029
|
|
@ -21,5 +21,6 @@ jobs:
|
|||
with:
|
||||
paths: "**/*.md"
|
||||
markdown: true
|
||||
concurrency: 1
|
||||
retry: true
|
||||
linksToSkip: "https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-interceptor, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-operator, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-scaler,http://opentelemetry-collector.open-telemetry-system:4318,http://opentelemetry-collector.open-telemetry-system:4318/v1/traces, https://www.gnu.org/software/make/"
|
||||
|
|
|
|||
|
|
@ -7,6 +7,10 @@ linters:
|
|||
default: none
|
||||
enable:
|
||||
- bodyclose
|
||||
- unconvert
|
||||
- ineffassign
|
||||
- staticcheck
|
||||
- copyloopvar
|
||||
#- depguard #https://github.com/kedacore/keda/issues/4980
|
||||
- dogsled
|
||||
- dupl
|
||||
|
|
|
|||
|
|
@ -27,10 +27,11 @@ This changelog keeps track of work items that have been completed and are ready
|
|||
|
||||
- **General**: Add configurable tracing support to the interceptor proxy ([#1021](https://github.com/kedacore/http-add-on/pull/1021))
|
||||
- **General**: Allow using HSO and SO with different names ([#1293](https://github.com/kedacore/http-add-on/issues/1293))
|
||||
|
||||
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))
|
||||
- **General**: Add possibility to skip TLS verification for upstreams in interceptor ([#1307](https://github.com/kedacore/http-add-on/pull/1307))
|
||||
### Improvements
|
||||
|
||||
- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO))
|
||||
- **Interceptor**: Support HTTPScaledObject scoped timeout ([#813](https://github.com/kedacore/http-add-on/issues/813))
|
||||
|
||||
### Fixes
|
||||
|
||||
|
|
|
|||
|
|
@ -162,6 +162,22 @@ spec:
|
|||
metric value
|
||||
format: int32
|
||||
type: integer
|
||||
timeouts:
|
||||
description: (optional) Timeouts that override the global ones
|
||||
properties:
|
||||
conditionWait:
|
||||
description: How long to wait for the backing workload to have
|
||||
1 or more replicas before connecting and sending the HTTP request
|
||||
(Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment
|
||||
variable)
|
||||
type: string
|
||||
responseHeader:
|
||||
description: How long to wait between when the HTTP request is
|
||||
sent to the backing app and when response headers need to arrive
|
||||
(Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment
|
||||
variable)
|
||||
type: string
|
||||
type: object
|
||||
required:
|
||||
- scaleTargetRef
|
||||
type: object
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ For setting multiple TLS certs, set `KEDA_HTTP_PROXY_TLS_CERT_STORE_PATHS` with
|
|||
* `XYZ.crt` + `XYZ.key` - this is a convention when using Kubernetes Secrets of type tls
|
||||
* `XYZ.pem` + `XYZ-key.pem`
|
||||
|
||||
To disable certificate chain verification, set `KEDA_HTTP_PROXY_TLS_SKIP_VERIFY` to `false`
|
||||
|
||||
The matching between certs and requests is performed during the TLS ClientHelo message, where the SNI service name is compared to SANs provided in each cert and the first matching cert will be used for the rest of the TLS handshake.
|
||||
# Configuring tracing for the KEDA HTTP Add-on interceptor proxy
|
||||
|
||||
|
|
|
|||
|
|
@ -43,8 +43,12 @@ type Serving struct {
|
|||
TLSKeyPath string `envconfig:"KEDA_HTTP_PROXY_TLS_KEY_PATH" default:"/certs/tls.key"`
|
||||
// TLSCertStorePaths is a comma separated list of paths to read the certificate/key pairs for the TLS server
|
||||
TLSCertStorePaths string `envconfig:"KEDA_HTTP_PROXY_TLS_CERT_STORE_PATHS" default:""`
|
||||
// TLSSkipVerify is a boolean flag to specify whether the interceptor should skip TLS verification for upstreams
|
||||
TLSSkipVerify bool `envconfig:"KEDA_HTTP_PROXY_TLS_SKIP_VERIFY" default:"false"`
|
||||
// TLSPort is the port that the server should serve on if TLS is enabled
|
||||
TLSPort int `envconfig:"KEDA_HTTP_PROXY_TLS_PORT" default:"8443"`
|
||||
// ProfilingAddr if not empty, pprof will be available on this address, assuming host:port here
|
||||
ProfilingAddr string `envconfig:"PROFILING_BIND_ADDRESS" default:""`
|
||||
}
|
||||
|
||||
// Parse parses standard configs using envconfig and returns a pointer to the
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
|
@ -81,6 +82,7 @@ func main() {
|
|||
proxyPort := servingCfg.ProxyPort
|
||||
adminPort := servingCfg.AdminPort
|
||||
proxyTLSEnabled := servingCfg.ProxyTLSEnabled
|
||||
profilingAddr := servingCfg.ProfilingAddr
|
||||
|
||||
// setup the configured metrics collectors
|
||||
metrics.NewMetricsCollectors(metricsCfg)
|
||||
|
|
@ -190,7 +192,7 @@ func main() {
|
|||
// start a proxy server with TLS
|
||||
if proxyTLSEnabled {
|
||||
eg.Go(func() error {
|
||||
proxyTLSConfig := map[string]string{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath, "certstorePaths": servingCfg.TLSCertStorePaths}
|
||||
proxyTLSConfig := map[string]interface{}{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath, "certstorePaths": servingCfg.TLSCertStorePaths, "skipVerify": servingCfg.TLSSkipVerify}
|
||||
proxyTLSPort := servingCfg.TLSPort
|
||||
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
|
||||
|
||||
|
|
@ -218,6 +220,13 @@ func main() {
|
|||
return nil
|
||||
})
|
||||
|
||||
if len(profilingAddr) > 0 {
|
||||
eg.Go(func() error {
|
||||
setupLog.Info("enabling pprof for profiling", "address", profilingAddr)
|
||||
return http.ListenAndServe(profilingAddr, nil)
|
||||
})
|
||||
}
|
||||
|
||||
build.PrintComponentInfo(ctrl.Log, "Interceptor")
|
||||
|
||||
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
|
|
@ -299,12 +308,15 @@ func defaultCertPool(logger logr.Logger) *x509.CertPool {
|
|||
|
||||
// getTLSConfig creates a TLS config from KEDA_HTTP_PROXY_TLS_CERT_PATH, KEDA_HTTP_PROXY_TLS_KEY_PATH and KEDA_HTTP_PROXY_TLS_CERTSTORE_PATHS
|
||||
// The matching between request and certificate is performed by comparing TLS/SNI server name with x509 SANs
|
||||
func getTLSConfig(tlsConfig map[string]string, logger logr.Logger) (*tls.Config, error) {
|
||||
certPath := tlsConfig["certificatePath"]
|
||||
keyPath := tlsConfig["keyPath"]
|
||||
certStorePaths := tlsConfig["certstorePaths"]
|
||||
func getTLSConfig(tlsConfig map[string]interface{}, logger logr.Logger) (*tls.Config, error) {
|
||||
certPath, _ := tlsConfig["certificatePath"].(string)
|
||||
keyPath, _ := tlsConfig["keyPath"].(string)
|
||||
certStorePaths, _ := tlsConfig["certstorePaths"].(string)
|
||||
insecureSkipVerify, _ := tlsConfig["skipVerify"].(bool)
|
||||
|
||||
servingTLS := &tls.Config{
|
||||
RootCAs: defaultCertPool(logger),
|
||||
RootCAs: defaultCertPool(logger),
|
||||
InsecureSkipVerify: insecureSkipVerify,
|
||||
}
|
||||
var defaultCert *tls.Certificate
|
||||
|
||||
|
|
@ -395,7 +407,7 @@ func runProxyServer(
|
|||
timeouts *config.Timeouts,
|
||||
port int,
|
||||
tlsEnabled bool,
|
||||
tlsConfig map[string]string,
|
||||
tlsConfig map[string]interface{},
|
||||
tracingConfig *config.Tracing,
|
||||
) error {
|
||||
dialer := kedanet.NewNetDialer(timeouts.Connect, timeouts.KeepAlive)
|
||||
|
|
@ -421,6 +433,7 @@ func runProxyServer(
|
|||
if tlsCfg != nil {
|
||||
forwardingTLSCfg.RootCAs = tlsCfg.RootCAs
|
||||
forwardingTLSCfg.Certificates = tlsCfg.Certificates
|
||||
forwardingTLSCfg.InsecureSkipVerify = tlsCfg.InsecureSkipVerify
|
||||
}
|
||||
|
||||
upstreamHandler = newForwardingHandler(
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
|
|||
timeouts,
|
||||
port,
|
||||
false,
|
||||
map[string]string{},
|
||||
map[string]interface{}{},
|
||||
&tracingCfg,
|
||||
)
|
||||
})
|
||||
|
|
@ -232,7 +232,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
|
|||
timeouts,
|
||||
port,
|
||||
true,
|
||||
map[string]string{"certificatePath": "../certs/tls.crt", "keyPath": "../certs/tls.key"},
|
||||
map[string]interface{}{"certificatePath": "../certs/tls.crt", "keyPath": "../certs/tls.key", "skipVerify": true},
|
||||
&tracingCfg,
|
||||
)
|
||||
})
|
||||
|
|
@ -382,7 +382,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
|
|||
timeouts,
|
||||
port,
|
||||
true,
|
||||
map[string]string{"certstorePaths": "../certs"},
|
||||
map[string]interface{}{"certstorePaths": "../certs"},
|
||||
&tracingCfg,
|
||||
)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -53,23 +53,35 @@ func newForwardingHandler(
|
|||
tlsCfg *tls.Config,
|
||||
tracingCfg *config.Tracing,
|
||||
) http.Handler {
|
||||
roundTripper := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: dialCtxFunc,
|
||||
ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2,
|
||||
MaxIdleConns: fwdCfg.maxIdleConns,
|
||||
IdleConnTimeout: fwdCfg.idleConnTimeout,
|
||||
TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout,
|
||||
ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
|
||||
ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var uh *handler.Upstream
|
||||
ctx := r.Context()
|
||||
httpso := util.HTTPSOFromContext(ctx)
|
||||
|
||||
waitFuncCtx, done := context.WithTimeout(ctx, fwdCfg.waitTimeout)
|
||||
conditionWaitTimeout := fwdCfg.waitTimeout
|
||||
roundTripper := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: dialCtxFunc,
|
||||
ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2,
|
||||
MaxIdleConns: fwdCfg.maxIdleConns,
|
||||
IdleConnTimeout: fwdCfg.idleConnTimeout,
|
||||
TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout,
|
||||
ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
|
||||
ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
|
||||
if httpso.Spec.Timeouts != nil {
|
||||
if httpso.Spec.Timeouts.ConditionWait.Duration > 0 {
|
||||
conditionWaitTimeout = httpso.Spec.Timeouts.ConditionWait.Duration
|
||||
}
|
||||
|
||||
if httpso.Spec.Timeouts.ResponseHeader.Duration > 0 {
|
||||
roundTripper.ResponseHeaderTimeout = httpso.Spec.Timeouts.ResponseHeader.Duration
|
||||
}
|
||||
}
|
||||
|
||||
waitFuncCtx, done := context.WithTimeout(ctx, conditionWaitTimeout)
|
||||
defer done()
|
||||
isColdStart, err := waitFunc(
|
||||
waitFuncCtx,
|
||||
|
|
|
|||
|
|
@ -76,6 +76,17 @@ type RateMetricSpec struct {
|
|||
Granularity metav1.Duration `json:"granularity" description:"Time granularity for rate calculation"`
|
||||
}
|
||||
|
||||
// HTTPScaledObjectTimeoutsSpec defines timeouts that override the global ones
|
||||
type HTTPScaledObjectTimeoutsSpec struct {
|
||||
// How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request (Default is set by the KEDA_CONDITION_WAIT_TIMEOUT environment variable)
|
||||
// +optional
|
||||
ConditionWait metav1.Duration `json:"conditionWait" description:"How long to wait for the backing workload to have 1 or more replicas before connecting and sending the HTTP request"`
|
||||
|
||||
// How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive (Default is set by the KEDA_RESPONSE_HEADER_TIMEOUT environment variable)
|
||||
// +optional
|
||||
ResponseHeader metav1.Duration `json:"responseHeader" description:"How long to wait between when the HTTP request is sent to the backing app and when response headers need to arrive"`
|
||||
}
|
||||
|
||||
// HTTPScaledObjectSpec defines the desired state of HTTPScaledObject
|
||||
type HTTPScaledObjectSpec struct {
|
||||
// The hosts to route. All requests which the "Host" header
|
||||
|
|
@ -108,6 +119,9 @@ type HTTPScaledObjectSpec struct {
|
|||
// (optional) Configuration for the metric used for scaling
|
||||
// +optional
|
||||
ScalingMetric *ScalingMetricSpec `json:"scalingMetric,omitempty" description:"Configuration for the metric used for scaling. If empty 'concurrency' will be used"`
|
||||
// (optional) Timeouts that override the global ones
|
||||
// +optional
|
||||
Timeouts *HTTPScaledObjectTimeoutsSpec `json:"timeouts,omitempty" description:"Timeouts that override the global ones"`
|
||||
}
|
||||
|
||||
// HTTPScaledObjectStatus defines the observed state of HTTPScaledObject
|
||||
|
|
|
|||
|
|
@ -171,6 +171,11 @@ func (in *HTTPScaledObjectSpec) DeepCopyInto(out *HTTPScaledObjectSpec) {
|
|||
*out = new(ScalingMetricSpec)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Timeouts != nil {
|
||||
in, out := &in.Timeouts, &out.Timeouts
|
||||
*out = new(HTTPScaledObjectTimeoutsSpec)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectSpec.
|
||||
|
|
@ -203,6 +208,23 @@ func (in *HTTPScaledObjectStatus) DeepCopy() *HTTPScaledObjectStatus {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *HTTPScaledObjectTimeoutsSpec) DeepCopyInto(out *HTTPScaledObjectTimeoutsSpec) {
|
||||
*out = *in
|
||||
out.ConditionWait = in.ConditionWait
|
||||
out.ResponseHeader = in.ResponseHeader
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectTimeoutsSpec.
|
||||
func (in *HTTPScaledObjectTimeoutsSpec) DeepCopy() *HTTPScaledObjectTimeoutsSpec {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(HTTPScaledObjectTimeoutsSpec)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *RateMetricSpec) DeepCopyInto(out *RateMetricSpec) {
|
||||
*out = *in
|
||||
|
|
|
|||
|
|
@ -57,11 +57,13 @@ func main() {
|
|||
var metricsAddr string
|
||||
var enableLeaderElection bool
|
||||
var probeAddr string
|
||||
var profilingAddr string
|
||||
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
|
||||
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
|
||||
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
|
||||
"Enable leader election for controller manager. "+
|
||||
"Enabling this will ensure there is only one active controller manager.")
|
||||
flag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
|
||||
opts := zap.Options{
|
||||
Development: true,
|
||||
}
|
||||
|
|
@ -96,6 +98,7 @@ func main() {
|
|||
Metrics: server.Options{
|
||||
BindAddress: metricsAddr,
|
||||
},
|
||||
PprofBindAddress: profilingAddr,
|
||||
HealthProbeBindAddress: probeAddr,
|
||||
LeaderElection: enableLeaderElection,
|
||||
LeaderElectionID: "http-add-on.keda.sh",
|
||||
|
|
|
|||
|
|
@ -57,7 +57,6 @@ func TestGetEndpoints(t *testing.T) {
|
|||
addrLookup := map[string]*v1.EndpointAddress{}
|
||||
for _, subset := range endpoints.Subsets {
|
||||
for _, addr := range subset.Addresses {
|
||||
addr := addr
|
||||
key := fmt.Sprintf("http://%s:%s", addr.IP, svcPort)
|
||||
addrLookup[key] = &addr
|
||||
}
|
||||
|
|
|
|||
|
|
@ -190,8 +190,6 @@ var _ = Describe("Table", func() {
|
|||
defer cancel()
|
||||
|
||||
for _, httpso := range httpsoList.Items {
|
||||
httpso := httpso
|
||||
|
||||
key := *k8s.NamespacedNameFromObject(&httpso)
|
||||
t.httpScaledObjects[key] = &httpso
|
||||
}
|
||||
|
|
@ -216,8 +214,6 @@ var _ = Describe("Table", func() {
|
|||
defer cancel()
|
||||
|
||||
for _, httpso := range httpsoList.Items {
|
||||
httpso := httpso
|
||||
|
||||
key := *k8s.NamespacedNameFromObject(&httpso)
|
||||
t.httpScaledObjects[key] = &httpso
|
||||
}
|
||||
|
|
@ -285,8 +281,6 @@ var _ = Describe("Table", func() {
|
|||
|
||||
It("returns new memory based on HTTPSOs", func() {
|
||||
for _, httpso := range httpsoList.Items {
|
||||
httpso := httpso
|
||||
|
||||
key := *k8s.NamespacedNameFromObject(&httpso)
|
||||
t.httpScaledObjects[key] = &httpso
|
||||
}
|
||||
|
|
|
|||
|
|
@ -484,8 +484,6 @@ var _ = Describe("TableMemory", func() {
|
|||
store: iradix.New[*httpv1alpha1.HTTPScaledObject](),
|
||||
}
|
||||
for _, httpso := range httpsoList.Items {
|
||||
httpso := httpso
|
||||
|
||||
tm = insertTrees(tm, &httpso)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ type config struct {
|
|||
DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"`
|
||||
// QueueTickDuration is the duration between queue requests
|
||||
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
|
||||
// ProfilingAddr if not empty, pprof will be available on this address, assuming host:port here
|
||||
ProfilingAddr string `envconfig:"PROFILING_BIND_ADDRESS" default:""`
|
||||
}
|
||||
|
||||
func mustParseConfig() *config {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
|
|
@ -47,6 +49,7 @@ func main() {
|
|||
deplName := cfg.TargetDeployment
|
||||
targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
|
||||
targetPendingRequests := cfg.TargetPendingRequests
|
||||
profilingAddr := cfg.ProfilingAddr
|
||||
|
||||
opts := zap.Options{}
|
||||
opts.BindFlags(flag.CommandLine)
|
||||
|
|
@ -113,6 +116,13 @@ func main() {
|
|||
return nil
|
||||
})
|
||||
|
||||
if len(profilingAddr) > 0 {
|
||||
eg.Go(func() error {
|
||||
setupLog.Info("enabling pprof for profiling", "address", profilingAddr)
|
||||
return http.ListenAndServe(profilingAddr, nil)
|
||||
})
|
||||
}
|
||||
|
||||
eg.Go(func() error {
|
||||
setupLog.Info("starting the grpc server")
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,272 @@
|
|||
//go:build e2e
|
||||
// +build e2e
|
||||
|
||||
package interceptor_timeouts_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
. "github.com/kedacore/http-add-on/tests/helper"
|
||||
)
|
||||
|
||||
const (
|
||||
testName = "interceptor-timeouts-test"
|
||||
)
|
||||
|
||||
var (
|
||||
testNamespace = fmt.Sprintf("%s-ns", testName)
|
||||
deploymentName = fmt.Sprintf("%s-deployment", testName)
|
||||
serviceName = fmt.Sprintf("%s-service", testName)
|
||||
httpScaledObjectName = fmt.Sprintf("%s-http-so", testName)
|
||||
host = testName
|
||||
minReplicaCount = 0
|
||||
maxReplicaCount = 1
|
||||
requestJobName = fmt.Sprintf("%s-request", testName)
|
||||
responseDelay = "0"
|
||||
)
|
||||
|
||||
type templateData struct {
|
||||
TestNamespace string
|
||||
DeploymentName string
|
||||
ServiceName string
|
||||
HTTPScaledObjectName string
|
||||
ResponseHeaderTimeout string
|
||||
Host string
|
||||
MinReplicas int
|
||||
MaxReplicas int
|
||||
RequestJobName string
|
||||
ResponseDelay string
|
||||
}
|
||||
|
||||
const (
|
||||
serviceTemplate = `
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{.ServiceName}}
|
||||
namespace: {{.TestNamespace}}
|
||||
labels:
|
||||
app: {{.DeploymentName}}
|
||||
spec:
|
||||
ports:
|
||||
- port: 9898
|
||||
targetPort: http
|
||||
protocol: TCP
|
||||
name: http
|
||||
selector:
|
||||
app: {{.DeploymentName}}
|
||||
`
|
||||
|
||||
deploymentTemplate = `
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{.DeploymentName}}
|
||||
namespace: {{.TestNamespace}}
|
||||
labels:
|
||||
app: {{.DeploymentName}}
|
||||
spec:
|
||||
replicas: 0
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{.DeploymentName}}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{.DeploymentName}}
|
||||
spec:
|
||||
containers:
|
||||
- name: {{.DeploymentName}}
|
||||
image: stefanprodan/podinfo:latest
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 9898
|
||||
protocol: TCP
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /readyz
|
||||
port: http
|
||||
`
|
||||
|
||||
loadJobTemplate = `
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: {{.RequestJobName}}
|
||||
namespace: {{.TestNamespace}}
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: curl-client
|
||||
image: curlimages/curl
|
||||
imagePullPolicy: Always
|
||||
command: ["curl", "-f", "-H", "Host: {{.Host}}", "keda-add-ons-http-interceptor-proxy.keda:8080/delay/{{.ResponseDelay}}"]
|
||||
restartPolicy: Never
|
||||
activeDeadlineSeconds: 600
|
||||
backoffLimit: 2
|
||||
`
|
||||
|
||||
httpScaledObjectWithoutTimeoutsTemplate = `
|
||||
kind: HTTPScaledObject
|
||||
apiVersion: http.keda.sh/v1alpha1
|
||||
metadata:
|
||||
name: {{.HTTPScaledObjectName}}
|
||||
namespace: {{.TestNamespace}}
|
||||
spec:
|
||||
hosts:
|
||||
- {{.Host}}
|
||||
targetPendingRequests: 100
|
||||
scaledownPeriod: 10
|
||||
scaleTargetRef:
|
||||
name: {{.DeploymentName}}
|
||||
service: {{.ServiceName}}
|
||||
port: 9898
|
||||
replicas:
|
||||
min: {{ .MinReplicas }}
|
||||
max: {{ .MaxReplicas }}
|
||||
`
|
||||
|
||||
httpScaledObjectWithTimeoutsTemplate = `
|
||||
kind: HTTPScaledObject
|
||||
apiVersion: http.keda.sh/v1alpha1
|
||||
metadata:
|
||||
name: {{.HTTPScaledObjectName}}
|
||||
namespace: {{.TestNamespace}}
|
||||
spec:
|
||||
hosts:
|
||||
- {{.Host}}
|
||||
targetPendingRequests: 100
|
||||
scaledownPeriod: 10
|
||||
scaleTargetRef:
|
||||
name: {{.DeploymentName}}
|
||||
service: {{.ServiceName}}
|
||||
port: 9898
|
||||
replicas:
|
||||
min: {{ .MinReplicas }}
|
||||
max: {{ .MaxReplicas }}
|
||||
timeouts:
|
||||
responseHeader: "{{ .ResponseHeaderTimeout }}s"
|
||||
`
|
||||
)
|
||||
|
||||
func TestCheck(t *testing.T) {
|
||||
// setup
|
||||
t.Log("--- setting up ---")
|
||||
// Create kubernetes resources
|
||||
kc := GetKubernetesClient(t)
|
||||
data, templates := getTemplateData()
|
||||
CreateKubernetesResources(t, kc, testNamespace, data, templates)
|
||||
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10),
|
||||
"replica count should be %d after 1 minutes", minReplicaCount)
|
||||
|
||||
testDefaultTimeouts(t, kc, data)
|
||||
testCustomTimeouts(t, kc, data)
|
||||
|
||||
// cleanup
|
||||
DeleteKubernetesResources(t, testNamespace, data, templates)
|
||||
}
|
||||
|
||||
func testDefaultTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate)
|
||||
|
||||
testDefaultTimeoutPasses(t, kc, data)
|
||||
testDefaultTimeoutFails(t, kc, data)
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithoutTimeoutsTemplate)
|
||||
}
|
||||
|
||||
func testDefaultTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
t.Log("--- testing default timeout passes ---")
|
||||
|
||||
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
|
||||
"replica count should be %d after 1 minutes", maxReplicaCount)
|
||||
|
||||
assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed")
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
|
||||
"replica count should be %d after 2 minutes", minReplicaCount)
|
||||
}
|
||||
|
||||
func testDefaultTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
t.Log("--- testing default timeout fails ---")
|
||||
|
||||
data.ResponseDelay = "2"
|
||||
|
||||
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
|
||||
"replica count should be %d after 1 minutes", maxReplicaCount)
|
||||
|
||||
assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail")
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
|
||||
"replica count should be %d after 2 minutes", minReplicaCount)
|
||||
}
|
||||
|
||||
func testCustomTimeouts(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
data.ResponseHeaderTimeout = "5"
|
||||
|
||||
KubectlApplyWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate)
|
||||
|
||||
testCustomTimeoutPasses(t, kc, data)
|
||||
testCustomTimeoutFails(t, kc, data)
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "httpScaledObjectTemplate", httpScaledObjectWithTimeoutsTemplate)
|
||||
}
|
||||
|
||||
func testCustomTimeoutPasses(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
t.Log("--- testing custom timeout passes ---")
|
||||
|
||||
data.ResponseDelay = "2"
|
||||
|
||||
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
|
||||
"replica count should be %d after 1 minutes", maxReplicaCount)
|
||||
|
||||
assert.True(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should succeed")
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
|
||||
"replica count should be %d after 2 minutes", minReplicaCount)
|
||||
}
|
||||
|
||||
func testCustomTimeoutFails(t *testing.T, kc *kubernetes.Clientset, data templateData) {
|
||||
t.Log("--- testing custom timeout fails ---")
|
||||
|
||||
data.ResponseDelay = "7"
|
||||
|
||||
KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10),
|
||||
"replica count should be %d after 1 minutes", maxReplicaCount)
|
||||
|
||||
assert.False(t, WaitForJobSuccess(t, kc, requestJobName, testNamespace, 1, 1), "request should fail")
|
||||
|
||||
KubectlDeleteWithTemplate(t, data, "loadJobTemplate", loadJobTemplate)
|
||||
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10),
|
||||
"replica count should be %d after 2 minutes", minReplicaCount)
|
||||
}
|
||||
|
||||
func getTemplateData() (templateData, []Template) {
|
||||
return templateData{
|
||||
TestNamespace: testNamespace,
|
||||
DeploymentName: deploymentName,
|
||||
ServiceName: serviceName,
|
||||
HTTPScaledObjectName: httpScaledObjectName,
|
||||
Host: host,
|
||||
MinReplicas: minReplicaCount,
|
||||
MaxReplicas: maxReplicaCount,
|
||||
RequestJobName: requestJobName,
|
||||
ResponseDelay: responseDelay,
|
||||
}, []Template{
|
||||
{Name: "deploymentTemplate", Config: deploymentTemplate},
|
||||
{Name: "serviceNameTemplate", Config: serviceTemplate},
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue