diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c53cd5..d52bdab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ This changelog keeps track of work items that have been completed and are ready ### Fixes -- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) +- **Scaler**: remplement custom interceptor metrics ([#718](https://github.com/kedacore/http-add-on/issues/718)) ### Deprecations diff --git a/config/interceptor/kustomization.yaml b/config/interceptor/kustomization.yaml index fce9640..4158a66 100644 --- a/config/interceptor/kustomization.yaml +++ b/config/interceptor/kustomization.yaml @@ -7,6 +7,9 @@ resources: - admin.service.yaml - proxy.service.yaml - service_account.yaml +- scaledobject.yaml +configurations: +- transformerconfig.yaml labels: - includeSelectors: true includeTemplates: true diff --git a/config/interceptor/scaledobject.yaml b/config/interceptor/scaledobject.yaml new file mode 100644 index 0000000..14b224b --- /dev/null +++ b/config/interceptor/scaledobject.yaml @@ -0,0 +1,17 @@ +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: interceptor +spec: + minReplicaCount: 3 + maxReplicaCount: 50 + pollingInterval: 1 + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: interceptor + triggers: + - type: external + metadata: + scalerAddress: external-scaler:9090 + interceptorTargetPendingRequests: '200' diff --git a/config/interceptor/transformerconfig.yaml b/config/interceptor/transformerconfig.yaml new file mode 100644 index 0000000..251589f --- /dev/null +++ b/config/interceptor/transformerconfig.yaml @@ -0,0 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: TransformerConfig +namePrefix: + - apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + path: spec/scaleTargetRef/name + - apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + path: spec/triggers/metadata/scalerAddress diff --git a/scaler/config.go b/scaler/config.go index c18853b..28bbdcd 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -36,8 +36,6 @@ type config struct { DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"` // QueueTickDuration is the duration between queue requests QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"` - // This will be the 'Target Pending Requests' for the interceptor - TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"` } func mustParseConfig() *config { diff --git a/scaler/handlers.go b/scaler/handlers.go index d24aaf3..bee3d2f 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -6,7 +6,9 @@ package main import ( "context" + "errors" "math/rand" + "strconv" "time" "github.com/go-logr/logr" @@ -22,12 +24,15 @@ func init() { rand.Seed(time.Now().UnixNano()) } +const ( + keyInterceptorTargetPendingRequests = "interceptorTargetPendingRequests" +) + type impl struct { - lggr logr.Logger - pinger *queuePinger - httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer - targetMetric int64 - targetMetricInterceptor int64 + lggr logr.Logger + pinger *queuePinger + httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer + targetMetric int64 externalscaler.UnimplementedExternalScalerServer } @@ -36,14 +41,12 @@ func newImpl( pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, defaultTargetMetric int64, - defaultTargetMetricInterceptor int64, ) *impl { return &impl{ - lggr: lggr, - pinger: pinger, - httpsoInformer: httpsoInformer, - targetMetric: defaultTargetMetric, - targetMetricInterceptor: defaultTargetMetricInterceptor, + lggr: lggr, + pinger: pinger, + httpsoInformer: httpsoInformer, + targetMetric: defaultTargetMetric, } } @@ -52,15 +55,27 @@ func (e *impl) Ping(context.Context, *emptypb.Empty) (*emptypb.Empty, error) { } func (e *impl) IsActive( - _ context.Context, - scaledObject *externalscaler.ScaledObjectRef, + ctx context.Context, + sor *externalscaler.ScaledObjectRef, ) (*externalscaler.IsActiveResponse, error) { - namespacedName := k8s.NamespacedNameFromScaledObjectRef(scaledObject) + lggr := e.lggr.WithName("IsActive") - key := namespacedName.String() - count := e.pinger.counts()[key] + gmr, err := e.GetMetrics(ctx, &externalscaler.GetMetricsRequest{ + ScaledObjectRef: sor, + }) + if err != nil { + lggr.Error(err, "GetMetrics failed", "scaledObjectRef", sor.String()) + return nil, err + } - active := count > 0 + metricValues := gmr.GetMetricValues() + if err := errors.New("len(metricValues) != 1"); len(metricValues) != 1 { + lggr.Error(err, "invalid GetMetricsResponse", "scaledObjectRef", sor.String(), "getMetricsResponse", gmr.String()) + return nil, err + } + metricValue := metricValues[0].GetMetricValue() + + active := metricValue > 0 res := &externalscaler.IsActiveResponse{ Result: active, } @@ -114,6 +129,12 @@ func (e *impl) GetMetricSpec( httpso, err := e.httpsoInformer.Lister().HTTPScaledObjects(sor.Namespace).Get(sor.Name) if err != nil { + if scalerMetadata := sor.GetScalerMetadata(); scalerMetadata != nil { + if interceptorTargetPendingRequests, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok { + return e.interceptorMetricSpec(metricName, interceptorTargetPendingRequests) + } + } + lggr.Error(err, "unable to get HTTPScaledObject", "name", sor.Name, "namespace", sor.Namespace) return nil, err } @@ -130,6 +151,26 @@ func (e *impl) GetMetricSpec( return res, nil } +func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPendingRequests string) (*externalscaler.GetMetricSpecResponse, error) { + lggr := e.lggr.WithName("interceptorMetricSpec") + + targetPendingRequests, err := strconv.ParseInt(interceptorTargetPendingRequests, 10, 64) + if err != nil { + lggr.Error(err, "unable to parse interceptorTargetPendingRequests", "value", interceptorTargetPendingRequests) + return nil, err + } + + res := &externalscaler.GetMetricSpecResponse{ + MetricSpecs: []*externalscaler.MetricSpec{ + { + MetricName: metricName, + TargetSize: targetPendingRequests, + }, + }, + } + return res, nil +} + func (e *impl) GetMetrics( _ context.Context, metricRequest *externalscaler.GetMetricsRequest, @@ -140,13 +181,44 @@ func (e *impl) GetMetrics( metricName := MetricName(namespacedName) key := namespacedName.String() - count := e.pinger.counts()[key] + count := int64(e.pinger.counts()[key]) + + if count == 0 { + if scalerMetadata := sor.GetScalerMetadata(); scalerMetadata != nil { + if _, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok { + return e.interceptorMetrics(metricName) + } + } + } res := &externalscaler.GetMetricsResponse{ MetricValues: []*externalscaler.MetricValue{ { MetricName: metricName, - MetricValue: int64(count), + MetricValue: count, + }, + }, + } + return res, nil +} + +func (e *impl) interceptorMetrics(metricName string) (*externalscaler.GetMetricsResponse, error) { + lggr := e.lggr.WithName("interceptorMetrics") + + var count int64 + for _, v := range e.pinger.counts() { + count += int64(v) + } + if err := strconv.ErrRange; count < 0 { + lggr.Error(err, "count overflowed", "value", count) + return nil, err + } + + res := &externalscaler.GetMetricsResponse{ + MetricValues: []*externalscaler.MetricValue{ + { + MetricName: metricName, + MetricValue: count, }, }, } diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 251fe79..4eb8bdb 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -27,10 +27,11 @@ import ( func TestStreamIsActive(t *testing.T) { type testCase struct { - name string - expected bool - expectedErr bool - setup func(t *testing.T, qp *queuePinger) + name string + expected bool + expectedErr bool + setup func(t *testing.T, qp *queuePinger) + scalerMetadata map[string]string } testCases := []testCase{ { @@ -96,6 +97,22 @@ func TestStreamIsActive(t *testing.T) { expectedErr: true, setup: func(_ *testing.T, _ *queuePinger) {}, }, + { + name: "Interceptor", + expected: true, + expectedErr: false, + setup: func(_ *testing.T, qp *queuePinger) { + qp.pingMut.Lock() + defer qp.pingMut.Unlock() + + qp.allCounts["a"] = 1 + qp.allCounts["b"] = 2 + qp.allCounts["c"] = 3 + }, + scalerMetadata: map[string]string{ + keyInterceptorTargetPendingRequests: "1000", + }, + }, } for _, tc := range testCases { @@ -117,7 +134,6 @@ func TestStreamIsActive(t *testing.T) { pinger, informer, 123, - 200, ) bufSize := 1024 * 1024 @@ -145,8 +161,9 @@ func TestStreamIsActive(t *testing.T) { client := externalscaler.NewExternalScalerClient(conn) testRef := &externalscaler.ScaledObjectRef{ - Namespace: "default", - Name: t.Name(), + Namespace: "default", + Name: t.Name(), + ScalerMetadata: tc.scalerMetadata, } // First will see if we can establish the stream and handle this @@ -176,10 +193,11 @@ func TestStreamIsActive(t *testing.T) { func TestIsActive(t *testing.T) { type testCase struct { - name string - expected bool - expectedErr bool - setup func(t *testing.T, qp *queuePinger) + name string + expected bool + expectedErr bool + setup func(t *testing.T, qp *queuePinger) + scalerMetadata map[string]string } testCases := []testCase{ { @@ -245,6 +263,22 @@ func TestIsActive(t *testing.T) { expectedErr: true, setup: func(_ *testing.T, _ *queuePinger) {}, }, + { + name: "Interceptor", + expected: true, + expectedErr: false, + setup: func(_ *testing.T, qp *queuePinger) { + qp.pingMut.Lock() + defer qp.pingMut.Unlock() + + qp.allCounts["a"] = 1 + qp.allCounts["b"] = 2 + qp.allCounts["c"] = 3 + }, + scalerMetadata: map[string]string{ + keyInterceptorTargetPendingRequests: "1000", + }, + }, } for _, tc := range testCases { @@ -265,14 +299,14 @@ func TestIsActive(t *testing.T) { pinger, informer, 123, - 200, ) res, err := hdl.IsActive( ctx, &externalscaler.ScaledObjectRef{ - Namespace: "default", - Name: t.Name(), + Namespace: "default", + Name: t.Name(), + ScalerMetadata: tc.scalerMetadata, }, ) @@ -292,17 +326,16 @@ func TestIsActive(t *testing.T) { func TestGetMetricSpecTable(t *testing.T) { const ns = "testns" type testCase struct { - name string - defaultTargetMetric int64 - defaultTargetMetricInterceptor int64 - newInformer func(*testing.T, *gomock.Controller) *informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer - checker func(*testing.T, *externalscaler.GetMetricSpecResponse, error) + name string + defaultTargetMetric int64 + newInformer func(*testing.T, *gomock.Controller) *informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer + checker func(*testing.T, *externalscaler.GetMetricSpecResponse, error) + scalerMetadata map[string]string } cases := []testCase{ { - name: "valid host as single host value in scaler metadata", - defaultTargetMetric: 0, - defaultTargetMetricInterceptor: 123, + name: "valid host as single host value in scaler metadata", + defaultTargetMetric: 0, newInformer: func(t *testing.T, ctrl *gomock.Controller) *informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer { informer, _, namespaceLister := newMocks(ctrl) @@ -338,9 +371,8 @@ func TestGetMetricSpecTable(t *testing.T) { }, }, { - name: "valid hosts as multiple hosts value in scaler metadata", - defaultTargetMetric: 0, - defaultTargetMetricInterceptor: 123, + name: "valid hosts as multiple hosts value in scaler metadata", + defaultTargetMetric: 0, newInformer: func(t *testing.T, ctrl *gomock.Controller) *informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer { informer, _, namespaceLister := newMocks(ctrl) @@ -379,6 +411,34 @@ func TestGetMetricSpecTable(t *testing.T) { r.Equal(int64(123), spec.TargetSize) }, }, + { + name: "interceptor", + defaultTargetMetric: 0, + newInformer: func(t *testing.T, ctrl *gomock.Controller) *informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer { + informer, _, namespaceLister := newMocks(ctrl) + + namespaceLister.EXPECT(). + Get(gomock.Any()). + DoAndReturn(func(name string) (*httpv1alpha1.HTTPScaledObject, error) { + return nil, errors.NewNotFound(httpv1alpha1.Resource("httpscaledobject"), name) + }) + + return informer + }, + checker: func(t *testing.T, res *externalscaler.GetMetricSpecResponse, err error) { + t.Helper() + r := require.New(t) + r.NoError(err) + r.NotNil(res) + r.Equal(1, len(res.MetricSpecs)) + spec := res.MetricSpecs[0] + r.Equal(MetricName(&types.NamespacedName{Namespace: ns, Name: t.Name()}), spec.MetricName) + r.Equal(int64(1000), spec.TargetSize) + }, + scalerMetadata: map[string]string{ + keyInterceptorTargetPendingRequests: "1000", + }, + }, } for i, c := range cases { @@ -407,11 +467,11 @@ func TestGetMetricSpecTable(t *testing.T) { pinger, informer, testCase.defaultTargetMetric, - testCase.defaultTargetMetricInterceptor, ) scaledObjectRef := externalscaler.ScaledObjectRef{ - Namespace: ns, - Name: t.Name(), + Namespace: ns, + Name: t.Name(), + ScalerMetadata: testCase.scalerMetadata, } ret, err := hdl.GetMetricSpec(ctx, &scaledObjectRef) testCase.checker(t, ret, err) @@ -432,9 +492,9 @@ func TestGetMetrics(t *testing.T) { context.Context, logr.Logger, ) (*informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer, *queuePinger, func(), error) - checkFn func(*testing.T, *externalscaler.GetMetricsResponse, error) - defaultTargetMetric int64 - defaultTargetMetricInterceptor int64 + checkFn func(*testing.T, *externalscaler.GetMetricsResponse, error) + defaultTargetMetric int64 + scalerMetadata map[string]string } startFakeInterceptorServer := func( @@ -511,8 +571,7 @@ func TestGetMetrics(t *testing.T) { r.Equal(MetricName(&types.NamespacedName{Namespace: ns, Name: t.Name()}), metricVal.MetricName) r.Equal(int64(0), metricVal.MetricValue) }, - defaultTargetMetric: int64(200), - defaultTargetMetricInterceptor: int64(300), + defaultTargetMetric: int64(200), }, { name: "HTTPSO present in the queue pinger", @@ -549,8 +608,7 @@ func TestGetMetrics(t *testing.T) { r.Equal(MetricName(&types.NamespacedName{Namespace: ns, Name: t.Name()}), metricVal.MetricName) r.Equal(int64(201), metricVal.MetricValue) }, - defaultTargetMetric: int64(200), - defaultTargetMetricInterceptor: int64(300), + defaultTargetMetric: int64(200), }, { name: "multiple validHosts add MetricValues", @@ -590,8 +648,47 @@ func TestGetMetrics(t *testing.T) { // in the setup function r.Equal(int64(579), metricVal.MetricValue) }, - defaultTargetMetric: int64(500), - defaultTargetMetricInterceptor: int64(600), + defaultTargetMetric: int64(500), + }, + { + name: "interceptor", + setupFn: func( + t *testing.T, + ctrl *gomock.Controller, + ctx context.Context, + lggr logr.Logger, + ) (*informersexternalversionshttpv1alpha1mock.MockHTTPScaledObjectInformer, *queuePinger, func(), error) { + informer, _, _ := newMocks(ctrl) + + memory := map[string]int{ + "a": 1, + "b": 2, + "c": 3, + } + pinger, done, err := startFakeInterceptorServer(ctx, lggr, memory, 2*time.Millisecond) + if err != nil { + return nil, nil, nil, err + } + + return informer, pinger, done, nil + }, + checkFn: func(t *testing.T, res *externalscaler.GetMetricsResponse, err error) { + t.Helper() + r := require.New(t) + r.NoError(err) + r.NotNil(res) + r.Equal(1, len(res.MetricValues)) + metricVal := res.MetricValues[0] + r.Equal(MetricName(&types.NamespacedName{Namespace: ns, Name: t.Name()}), metricVal.MetricName) + // the value here needs to be the same thing as + // the sum of the values in the fake queue created + // in the setup function + r.Equal(int64(6), metricVal.MetricValue) + }, + defaultTargetMetric: int64(500), + scalerMetadata: map[string]string{ + keyInterceptorTargetPendingRequests: "1000", + }, }, } @@ -616,12 +713,12 @@ func TestGetMetrics(t *testing.T) { pinger, informer, tc.defaultTargetMetric, - tc.defaultTargetMetricInterceptor, ) res, err := hdl.GetMetrics(ctx, &externalscaler.GetMetricsRequest{ ScaledObjectRef: &externalscaler.ScaledObjectRef{ - Namespace: ns, - Name: t.Name(), + Namespace: ns, + Name: t.Name(), + ScalerMetadata: tc.scalerMetadata, }, }) tc.checkFn(t, res, err) diff --git a/scaler/main.go b/scaler/main.go index 589e929..ccceb41 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -48,7 +48,6 @@ func main() { deplName := cfg.TargetDeployment targetPortStr := fmt.Sprintf("%d", cfg.TargetPort) targetPendingRequests := cfg.TargetPendingRequests - targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor k8sCfg, err := ctrl.GetConfig() if err != nil { @@ -127,7 +126,6 @@ func main() { pinger, httpsoInformer, int64(targetPendingRequests), - int64(targetPendingRequestsInterceptor), ) }) @@ -142,7 +140,6 @@ func startGrpcServer( pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, targetPendingRequests int64, - targetPendingRequestsInterceptor int64, ) error { addr := fmt.Sprintf("0.0.0.0:%d", port) lggr.Info("starting grpc server", "address", addr) @@ -170,7 +167,6 @@ func startGrpcServer( pinger, httpsoInformer, targetPendingRequests, - targetPendingRequestsInterceptor, ), )