826 lines
22 KiB
Go
826 lines
22 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/stretchr/testify/require"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/utils/ptr"
|
|
|
|
"github.com/kedacore/http-add-on/interceptor/config"
|
|
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
|
|
kedanet "github.com/kedacore/http-add-on/pkg/net"
|
|
"github.com/kedacore/http-add-on/pkg/util"
|
|
)
|
|
|
|
var TestTLSConfig = tls.Config{}
|
|
|
|
func init() {
|
|
caCert, err := os.ReadFile("../certs/tls.crt")
|
|
if err != nil {
|
|
log.Fatalf("Error getting tests certs - make sure to run make test to generate them: %v", err)
|
|
}
|
|
caCertPool := x509.NewCertPool()
|
|
caCertPool.AppendCertsFromPEM(caCert)
|
|
cert, err := tls.LoadX509KeyPair("../certs/tls.crt", "../certs/tls.key")
|
|
|
|
if err != nil {
|
|
log.Fatalf("Error getting tests certs - make sure to run make test to generate them %v", err)
|
|
}
|
|
|
|
TestTLSConfig.RootCAs = caCertPool
|
|
TestTLSConfig.Certificates = []tls.Certificate{cert}
|
|
}
|
|
|
|
// the proxy should successfully forward a request to a running server
|
|
func TestImmediatelySuccessfulProxy(t *testing.T) {
|
|
host := fmt.Sprintf("%s.testing", t.Name())
|
|
r := require.New(t)
|
|
|
|
originHdl := kedanet.NewTestHTTPHandlerWrapper(
|
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
w.WriteHeader(200)
|
|
_, err := w.Write([]byte("test response"))
|
|
r.NoError(err)
|
|
}),
|
|
)
|
|
srv, originURL, err := kedanet.StartTestServer(originHdl)
|
|
r.NoError(err)
|
|
defer srv.Close()
|
|
originPort, err := strconv.Atoi(originURL.Port())
|
|
r.NoError(err)
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, targetFromURL(
|
|
originURL,
|
|
originPort,
|
|
"testdepl",
|
|
"testservice",
|
|
))
|
|
req = util.RequestWithStream(req, originURL)
|
|
req.Host = host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(200, res.Code, "expected response code 200")
|
|
r.Equal("test response", res.Body.String())
|
|
}
|
|
|
|
func TestImmediatelySuccessfulProxyTLS(t *testing.T) {
|
|
host := fmt.Sprintf("%s.testing", t.Name())
|
|
r := require.New(t)
|
|
|
|
originHdl := kedanet.NewTestHTTPHandlerWrapper(
|
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
w.WriteHeader(200)
|
|
_, err := w.Write([]byte("test response"))
|
|
r.NoError(err)
|
|
}),
|
|
)
|
|
srv, originURL, err := kedanet.StartTestServer(originHdl)
|
|
r.NoError(err)
|
|
defer srv.Close()
|
|
originPort, err := strconv.Atoi(originURL.Port())
|
|
r.NoError(err)
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&TestTLSConfig,
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, targetFromURL(
|
|
originURL,
|
|
originPort,
|
|
"testdepl",
|
|
"testsvc",
|
|
))
|
|
req = util.RequestWithStream(req, originURL)
|
|
req.Host = host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(200, res.Code, "expected response code 200")
|
|
r.Equal("test response", res.Body.String())
|
|
}
|
|
|
|
// the proxy should successfully forward a request to the failover when the server is not reachable
|
|
func TestImmediatelySuccessfulFailoverProxy(t *testing.T) {
|
|
host := fmt.Sprintf("%s.testing", t.Name())
|
|
r := require.New(t)
|
|
|
|
initialStream, err := url.Parse("http://0.0.0.0:0")
|
|
r.NoError(err)
|
|
|
|
failoverHdl := kedanet.NewTestHTTPHandlerWrapper(
|
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
w.WriteHeader(200)
|
|
_, err := w.Write([]byte("test response"))
|
|
r.NoError(err)
|
|
}),
|
|
)
|
|
srv, failoverURL, err := kedanet.StartTestServer(failoverHdl)
|
|
r.NoError(err)
|
|
defer srv.Close()
|
|
failoverPort, err := strconv.Atoi(failoverURL.Port())
|
|
r.NoError(err)
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
waitFunc := func(ctx context.Context, _ string, _ string) (bool, error) {
|
|
return false, errors.New("nothing")
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: 0,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req,
|
|
&httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "@" + host,
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Name: "testdepl",
|
|
Service: "testsvc",
|
|
Port: int32(456),
|
|
},
|
|
ColdStartTimeoutFailoverRef: &httpv1alpha1.ColdStartTimeoutFailoverRef{
|
|
Service: "testsvc",
|
|
Port: int32(failoverPort),
|
|
TimeoutSeconds: 30,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](123),
|
|
},
|
|
},
|
|
)
|
|
req = util.RequestWithStream(req, initialStream)
|
|
req = util.RequestWithFailoverStream(req, failoverURL)
|
|
req.Host = host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal(200, res.Code, "expected response code 200")
|
|
r.Equal("test response", res.Body.String())
|
|
}
|
|
|
|
// the proxy should wait for a timeout and fail if there is no
|
|
// origin to which to connect
|
|
func TestWaitFailedConnection(t *testing.T) {
|
|
const host = "TestWaitFailedConnection.testing"
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
backoff := timeouts.DefaultBackoff()
|
|
backoff.Steps = 2
|
|
dialCtxFunc := retryDialContextFunc(
|
|
timeouts,
|
|
backoff,
|
|
)
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
stream, err := url.Parse("http://0.0.0.0:0")
|
|
r.NoError(err)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Service: "nosuchdepl",
|
|
Port: 8081,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, stream)
|
|
req.Host = host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
}
|
|
|
|
func TestWaitFailedConnectionTLS(t *testing.T) {
|
|
const host = "TestWaitFailedConnection.testing"
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
backoff := timeouts.DefaultBackoff()
|
|
backoff.Steps = 2
|
|
dialCtxFunc := retryDialContextFunc(
|
|
timeouts,
|
|
backoff,
|
|
)
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&TestTLSConfig,
|
|
&config.Tracing{},
|
|
)
|
|
stream, err := url.Parse("http://0.0.0.0:0")
|
|
r.NoError(err)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Name: "nosuchdepl",
|
|
Service: "nosuchdepl",
|
|
Port: 8081,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, stream)
|
|
req.Host = host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
}
|
|
|
|
// the proxy handler should wait for the wait function until it hits
|
|
// a timeout, then it should fail
|
|
func TestTimesOutOnWaitFunc(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
timeouts.WorkloadReplicas = 25 * time.Millisecond
|
|
timeouts.ResponseHeader = 25 * time.Millisecond
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
|
|
waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc()
|
|
defer finishWaitFunc()
|
|
noSuchHost := fmt.Sprintf("%s.testing", t.Name())
|
|
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
stream, err := url.Parse("http://1.1.1.1")
|
|
r.NoError(err)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Service: "nosuchsvc",
|
|
Port: 9091,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, stream)
|
|
req.Host = noSuchHost
|
|
|
|
start := time.Now()
|
|
hdl.ServeHTTP(res, req)
|
|
elapsed := time.Since(start)
|
|
|
|
t.Logf("elapsed time was %s", elapsed)
|
|
// serving should take at least timeouts.DeploymentReplicas, but no more than
|
|
// timeouts.DeploymentReplicas*4
|
|
r.GreaterOrEqual(elapsed, timeouts.WorkloadReplicas)
|
|
r.LessOrEqual(elapsed, timeouts.WorkloadReplicas*4)
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
|
|
// we will always return the X-KEDA-HTTP-Cold-Start header
|
|
// when we are able to forward the
|
|
// request to the backend but not if we have failed due
|
|
// to a timeout from a waitFunc or earlier in the pipeline,
|
|
// for example, if we cannot reach the Kubernetes control
|
|
// plane.
|
|
r.Equal("", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start to be empty")
|
|
|
|
// waitFunc should have been called, even though it timed out
|
|
waitFuncCalled := false
|
|
select {
|
|
case <-waitFuncCalledCh:
|
|
waitFuncCalled = true
|
|
default:
|
|
}
|
|
|
|
r.True(waitFuncCalled, "wait function was not called")
|
|
}
|
|
|
|
func TestTimesOutOnWaitFuncTLS(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
timeouts.WorkloadReplicas = 25 * time.Millisecond
|
|
timeouts.ResponseHeader = 25 * time.Millisecond
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
|
|
waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc()
|
|
defer finishWaitFunc()
|
|
noSuchHost := fmt.Sprintf("%s.testing", t.Name())
|
|
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&TestTLSConfig,
|
|
&config.Tracing{},
|
|
)
|
|
stream, err := url.Parse("http://1.1.1.1")
|
|
r.NoError(err)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Name: "nosuchdepl",
|
|
Service: "nosuchsvc",
|
|
Port: 9091,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, stream)
|
|
req.Host = noSuchHost
|
|
|
|
start := time.Now()
|
|
hdl.ServeHTTP(res, req)
|
|
elapsed := time.Since(start)
|
|
|
|
t.Logf("elapsed time was %s", elapsed)
|
|
// serving should take at least timeouts.DeploymentReplicas, but no more than
|
|
// timeouts.DeploymentReplicas*4
|
|
r.GreaterOrEqual(elapsed, timeouts.WorkloadReplicas)
|
|
r.LessOrEqual(elapsed, timeouts.WorkloadReplicas*4)
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
|
|
// we will always return the X-KEDA-HTTP-Cold-Start header
|
|
// when we are able to forward the
|
|
// request to the backend but not if we have failed due
|
|
// to a timeout from a waitFunc or earlier in the pipeline,
|
|
// for example, if we cannot reach the Kubernetes control
|
|
// plane.
|
|
r.Equal("", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start to be empty")
|
|
|
|
// waitFunc should have been called, even though it timed out
|
|
waitFuncCalled := false
|
|
select {
|
|
case <-waitFuncCalledCh:
|
|
waitFuncCalled = true
|
|
default:
|
|
}
|
|
|
|
r.True(waitFuncCalled, "wait function was not called")
|
|
}
|
|
|
|
// Test to make sure the proxy handler will wait for the waitFunc to
|
|
// complete
|
|
func TestWaitsForWaitFunc(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
|
|
waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc()
|
|
const (
|
|
noSuchHost = "TestWaitsForWaitFunc.test"
|
|
originRespCode = 201
|
|
)
|
|
testSrv, testSrvURL, err := kedanet.StartTestServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(originRespCode)
|
|
}),
|
|
)
|
|
r.NoError(err)
|
|
defer testSrv.Close()
|
|
_, originPort, err := splitHostPort(testSrvURL.Host)
|
|
r.NoError(err)
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, targetFromURL(
|
|
testSrvURL,
|
|
originPort,
|
|
"nosuchdepl",
|
|
"noservice",
|
|
))
|
|
req = util.RequestWithStream(req, testSrvURL)
|
|
req.Host = noSuchHost
|
|
|
|
// make the wait function finish after a short duration
|
|
const waitDur = 100 * time.Millisecond
|
|
go func() {
|
|
time.Sleep(waitDur)
|
|
finishWaitFunc()
|
|
}()
|
|
|
|
start := time.Now()
|
|
hdl.ServeHTTP(res, req)
|
|
elapsed := time.Since(start)
|
|
r.NoError(waitForSignal(waitFuncCalledCh, 1*time.Second))
|
|
|
|
// should take at least waitDur, but no more than waitDur*4
|
|
r.GreaterOrEqual(elapsed, waitDur)
|
|
r.Less(elapsed, waitDur*4)
|
|
|
|
r.Equal("true", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start true")
|
|
r.Equal(
|
|
originRespCode,
|
|
res.Code,
|
|
"response code was unexpected",
|
|
)
|
|
}
|
|
|
|
func TestWaitsForWaitFuncTLS(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
|
|
waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc()
|
|
const (
|
|
noSuchHost = "TestWaitsForWaitFunc.test"
|
|
originRespCode = 201
|
|
)
|
|
testSrv, testSrvURL, err := kedanet.StartTestServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(originRespCode)
|
|
}),
|
|
)
|
|
r.NoError(err)
|
|
defer testSrv.Close()
|
|
_, originPort, err := splitHostPort(testSrvURL.Host)
|
|
r.NoError(err)
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&TestTLSConfig,
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, targetFromURL(
|
|
testSrvURL,
|
|
originPort,
|
|
"nosuchdepl",
|
|
"nosuchsvc",
|
|
))
|
|
req = util.RequestWithStream(req, testSrvURL)
|
|
req.Host = noSuchHost
|
|
|
|
// make the wait function finish after a short duration
|
|
const waitDur = 100 * time.Millisecond
|
|
go func() {
|
|
time.Sleep(waitDur)
|
|
finishWaitFunc()
|
|
}()
|
|
|
|
start := time.Now()
|
|
hdl.ServeHTTP(res, req)
|
|
elapsed := time.Since(start)
|
|
r.NoError(waitForSignal(waitFuncCalledCh, 1*time.Second))
|
|
|
|
// should take at least waitDur, but no more than waitDur*4
|
|
r.GreaterOrEqual(elapsed, waitDur)
|
|
r.Less(elapsed, waitDur*4)
|
|
|
|
r.Equal(
|
|
originRespCode,
|
|
res.Code,
|
|
"response code was unexpected",
|
|
)
|
|
}
|
|
|
|
// the proxy should connect to a server, and then time out if
|
|
// the server doesn't respond in time
|
|
func TestWaitHeaderTimeout(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
// the origin will wait for this channel to receive or close before it sends any data back to the
|
|
// proxy
|
|
originHdlCh := make(chan struct{})
|
|
originHdl := kedanet.NewTestHTTPHandlerWrapper(
|
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
<-originHdlCh
|
|
w.WriteHeader(200)
|
|
_, err := w.Write([]byte("test response"))
|
|
r.NoError(err)
|
|
}),
|
|
)
|
|
srv, originURL, err := kedanet.StartTestServer(originHdl)
|
|
r.NoError(err)
|
|
defer srv.Close()
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&tls.Config{},
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Service: "testsvc",
|
|
Port: 9094,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, originURL)
|
|
req.Host = originURL.Host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
close(originHdlCh)
|
|
}
|
|
|
|
func TestWaitHeaderTimeoutTLS(t *testing.T) {
|
|
r := require.New(t)
|
|
|
|
// the origin will wait for this channel to receive or close before it sends any data back to the
|
|
// proxy
|
|
originHdlCh := make(chan struct{})
|
|
originHdl := kedanet.NewTestHTTPHandlerWrapper(
|
|
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
<-originHdlCh
|
|
w.WriteHeader(200)
|
|
_, err := w.Write([]byte("test response"))
|
|
r.NoError(err)
|
|
}),
|
|
)
|
|
srv, originURL, err := kedanet.StartTestServer(originHdl)
|
|
r.NoError(err)
|
|
defer srv.Close()
|
|
|
|
timeouts := defaultTimeouts()
|
|
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
|
|
waitFunc := func(context.Context, string, string) (bool, error) {
|
|
return false, nil
|
|
}
|
|
hdl := newForwardingHandler(
|
|
logr.Discard(),
|
|
dialCtxFunc,
|
|
waitFunc,
|
|
forwardingConfig{
|
|
waitTimeout: timeouts.WorkloadReplicas,
|
|
respHeaderTimeout: timeouts.ResponseHeader,
|
|
},
|
|
&TestTLSConfig,
|
|
&config.Tracing{},
|
|
)
|
|
const path = "/testfwd"
|
|
res, req, err := reqAndRes(path)
|
|
r.NoError(err)
|
|
req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "testns",
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Name: "nosuchdepl",
|
|
Service: "testsvc",
|
|
Port: 9094,
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](1234),
|
|
},
|
|
})
|
|
req = util.RequestWithStream(req, originURL)
|
|
req.Host = originURL.Host
|
|
|
|
hdl.ServeHTTP(res, req)
|
|
|
|
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
|
|
r.Equal(502, res.Code, "response code was unexpected")
|
|
close(originHdlCh)
|
|
}
|
|
|
|
func waitForSignal(sig <-chan struct{}, waitDur time.Duration) error {
|
|
tmr := time.NewTimer(waitDur)
|
|
defer tmr.Stop()
|
|
select {
|
|
case <-sig:
|
|
return nil
|
|
case <-tmr.C:
|
|
return fmt.Errorf("signal didn't happen within %s", waitDur)
|
|
}
|
|
}
|
|
|
|
// notifyingFunc creates a new function to be used as a waitFunc in the
|
|
// newForwardingHandler function. it also returns a channel that will
|
|
// be closed immediately after the function is called (not necessarily
|
|
// before it returns).
|
|
//
|
|
// the _returned_ function won't itself return until the returned func()
|
|
// is called, or the context that is passed to it is done (e.g. cancelled, timed out,
|
|
// etc...). in the former case, the returned func itself returns nil. in the latter,
|
|
// it returns ctx.Err()
|
|
func notifyingFunc() (forwardWaitFunc, <-chan struct{}, func()) {
|
|
calledCh := make(chan struct{})
|
|
finishCh := make(chan struct{})
|
|
finishFunc := func() {
|
|
close(finishCh)
|
|
}
|
|
return func(ctx context.Context, _, _ string) (bool, error) {
|
|
close(calledCh)
|
|
select {
|
|
case <-finishCh:
|
|
return true, nil
|
|
case <-ctx.Done():
|
|
return true, fmt.Errorf("TEST FUNCTION CONTEXT ERROR: %w", ctx.Err())
|
|
}
|
|
}, calledCh, finishFunc
|
|
}
|
|
|
|
func targetFromURL(
|
|
u *url.URL,
|
|
port int,
|
|
workload string,
|
|
service string,
|
|
) *httpv1alpha1.HTTPScaledObject {
|
|
host := strings.Split(u.Host, ":")[0]
|
|
return &httpv1alpha1.HTTPScaledObject{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "@" + host,
|
|
},
|
|
Spec: httpv1alpha1.HTTPScaledObjectSpec{
|
|
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
|
|
Name: workload,
|
|
Service: service,
|
|
Port: int32(port),
|
|
},
|
|
TargetPendingRequests: ptr.To[int32](123),
|
|
},
|
|
}
|
|
}
|
|
|
|
func defaultTimeouts() config.Timeouts {
|
|
return config.Timeouts{
|
|
Connect: 100 * time.Millisecond,
|
|
KeepAlive: 100 * time.Millisecond,
|
|
ResponseHeader: 500 * time.Millisecond,
|
|
WorkloadReplicas: 1 * time.Second,
|
|
}
|
|
}
|
|
|
|
// returns a kedanet.DialContextFunc by calling kedanet.DialContextWithRetry. if you pass nil for the
|
|
// timeoutConfig, it uses standard values. otherwise it uses the one you passed.
|
|
//
|
|
// the returned config.Timeouts is what was passed to the DialContextWithRetry function
|
|
func retryDialContextFunc(
|
|
timeouts config.Timeouts,
|
|
backoff wait.Backoff,
|
|
) kedanet.DialContextFunc {
|
|
dialer := kedanet.NewNetDialer(
|
|
timeouts.Connect,
|
|
timeouts.KeepAlive,
|
|
)
|
|
return kedanet.DialContextWithRetry(dialer, backoff)
|
|
}
|
|
|
|
func reqAndRes(path string) (*httptest.ResponseRecorder, *http.Request, error) {
|
|
req, err := http.NewRequest("GET", path, nil)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
resRecorder := httptest.NewRecorder()
|
|
return resRecorder, req, nil
|
|
}
|