256 lines
8.2 KiB
Go
256 lines
8.2 KiB
Go
// Handlers contains the gRPC implementation for an external scaler as defined
|
|
// by the KEDA documentation at https://keda.sh/docs/2.0/concepts/external-scalers/#built-in-scalers-interface
|
|
// This is the interface KEDA will poll in order to get the request queue size
|
|
// and scale user apps properly
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
"k8s.io/utils/ptr"
|
|
|
|
informershttpv1alpha1 "github.com/kedacore/http-add-on/operator/generated/informers/externalversions/http/v1alpha1"
|
|
"github.com/kedacore/http-add-on/pkg/k8s"
|
|
"github.com/kedacore/http-add-on/pkg/util"
|
|
)
|
|
|
|
const (
|
|
keyInterceptorTargetPendingRequests = "interceptorTargetPendingRequests"
|
|
)
|
|
|
|
var streamInterval time.Duration
|
|
|
|
func init() {
|
|
defaultMS := 200
|
|
timeoutMS, err := util.ResolveOsEnvInt("KEDA_HTTP_SCALER_STREAM_INTERVAL_MS", defaultMS)
|
|
if err != nil {
|
|
timeoutMS = defaultMS
|
|
}
|
|
streamInterval = time.Duration(timeoutMS) * time.Millisecond
|
|
}
|
|
|
|
type impl struct {
|
|
lggr logr.Logger
|
|
pinger *queuePinger
|
|
httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer
|
|
targetMetric int64
|
|
externalscaler.UnimplementedExternalScalerServer
|
|
}
|
|
|
|
func newImpl(lggr logr.Logger, pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, defaultTargetMetric int64) *impl {
|
|
return &impl{
|
|
lggr: lggr,
|
|
pinger: pinger,
|
|
httpsoInformer: httpsoInformer,
|
|
targetMetric: defaultTargetMetric,
|
|
}
|
|
}
|
|
|
|
func (e *impl) Ping(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
|
|
return &emptypb.Empty{}, nil
|
|
}
|
|
|
|
func (e *impl) IsActive(ctx context.Context, sor *externalscaler.ScaledObjectRef) (*externalscaler.IsActiveResponse, error) {
|
|
lggr := e.lggr.WithName("IsActive")
|
|
|
|
gmr, err := e.GetMetrics(ctx, &externalscaler.GetMetricsRequest{
|
|
ScaledObjectRef: sor,
|
|
})
|
|
if err != nil {
|
|
lggr.Error(err, "GetMetrics failed", "scaledObjectRef", sor.String())
|
|
return nil, err
|
|
}
|
|
|
|
metricValues := gmr.GetMetricValues()
|
|
if err := errors.New("len(metricValues) != 1"); len(metricValues) != 1 {
|
|
lggr.Error(err, "invalid GetMetricsResponse", "scaledObjectRef", sor.String(), "getMetricsResponse", gmr.String())
|
|
return nil, err
|
|
}
|
|
metricValue := metricValues[0].GetMetricValue()
|
|
|
|
active := metricValue > 0
|
|
res := &externalscaler.IsActiveResponse{
|
|
Result: active,
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (e *impl) StreamIsActive(scaledObject *externalscaler.ScaledObjectRef, server externalscaler.ExternalScaler_StreamIsActiveServer) error {
|
|
// this function communicates with KEDA via the 'server' parameter.
|
|
// we call server.Send (below) every streamInterval, which tells it to immediately
|
|
// ping our IsActive RPC
|
|
ticker := time.NewTicker(streamInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-server.Context().Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
active, err := e.IsActive(server.Context(), scaledObject)
|
|
if err != nil {
|
|
e.lggr.Error(err, "error getting active status in stream")
|
|
return err
|
|
}
|
|
err = server.Send(&externalscaler.IsActiveResponse{
|
|
Result: active.Result,
|
|
})
|
|
if err != nil {
|
|
e.lggr.Error(err, "error sending the active result in stream")
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *impl) GetMetricSpec(_ context.Context, sor *externalscaler.ScaledObjectRef) (*externalscaler.GetMetricSpecResponse, error) {
|
|
lggr := e.lggr.WithName("GetMetricSpec")
|
|
|
|
scalerMetadata := sor.GetScalerMetadata()
|
|
httpScaledObjectName, ok := scalerMetadata[k8s.HTTPScaledObjectKey]
|
|
if !ok {
|
|
if scalerMetadata != nil {
|
|
if interceptorTargetPendingRequests, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok {
|
|
// generated the metric name for the ScaledObject targeting the interceptor
|
|
metricName := MetricName(k8s.NamespacedNameFromScaledObjectRef(sor))
|
|
return e.interceptorMetricSpec(metricName, interceptorTargetPendingRequests)
|
|
}
|
|
}
|
|
err := fmt.Errorf("unable to get HTTPScaledObject reference")
|
|
lggr.Error(err, "unable to get the linked HTTPScaledObject for ScaledObject", "name", sor.Name, "namespace", sor.Namespace, "httpScaledObjectName", httpScaledObjectName)
|
|
return nil, err
|
|
}
|
|
|
|
httpso, err := e.httpsoInformer.Lister().HTTPScaledObjects(sor.Namespace).Get(httpScaledObjectName)
|
|
if err != nil {
|
|
lggr.Error(err, "unable to get HTTPScaledObject", "name", sor.Name, "namespace", sor.Namespace, "httpScaledObjectName", httpScaledObjectName)
|
|
return nil, err
|
|
}
|
|
|
|
// generated the metric name for HTTPScaledObject
|
|
metricName := MetricName(k8s.NamespacedNameFromNameAndNamespace(httpScaledObjectName, sor.Namespace))
|
|
|
|
targetValue := int64(ptr.Deref(httpso.Spec.TargetPendingRequests, 100))
|
|
|
|
if httpso.Spec.ScalingMetric != nil {
|
|
if httpso.Spec.ScalingMetric.Concurrency != nil {
|
|
targetValue = int64(httpso.Spec.ScalingMetric.Concurrency.TargetValue)
|
|
}
|
|
if httpso.Spec.ScalingMetric.Rate != nil {
|
|
targetValue = int64(httpso.Spec.ScalingMetric.Rate.TargetValue)
|
|
}
|
|
}
|
|
|
|
res := &externalscaler.GetMetricSpecResponse{
|
|
MetricSpecs: []*externalscaler.MetricSpec{
|
|
{
|
|
MetricName: metricName,
|
|
TargetSize: targetValue,
|
|
},
|
|
},
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPendingRequests string) (*externalscaler.GetMetricSpecResponse, error) {
|
|
lggr := e.lggr.WithName("interceptorMetricSpec")
|
|
|
|
targetPendingRequests, err := strconv.ParseInt(interceptorTargetPendingRequests, 10, 64)
|
|
if err != nil {
|
|
lggr.Error(err, "unable to parse interceptorTargetPendingRequests", "value", interceptorTargetPendingRequests)
|
|
return nil, err
|
|
}
|
|
|
|
res := &externalscaler.GetMetricSpecResponse{
|
|
MetricSpecs: []*externalscaler.MetricSpec{
|
|
{
|
|
MetricName: metricName,
|
|
TargetSize: targetPendingRequests,
|
|
},
|
|
},
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (e *impl) GetMetrics(_ context.Context, metricRequest *externalscaler.GetMetricsRequest) (*externalscaler.GetMetricsResponse, error) {
|
|
lggr := e.lggr.WithName("GetMetrics")
|
|
sor := metricRequest.ScaledObjectRef
|
|
|
|
scalerMetadata := sor.GetScalerMetadata()
|
|
httpScaledObjectName, ok := scalerMetadata[k8s.HTTPScaledObjectKey]
|
|
if !ok {
|
|
if scalerMetadata != nil {
|
|
if _, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok {
|
|
// generated the metric name for the ScaledObject targeting the interceptor
|
|
metricName := MetricName(k8s.NamespacedNameFromScaledObjectRef(sor))
|
|
return e.interceptorMetrics(metricName)
|
|
}
|
|
}
|
|
err := fmt.Errorf("unable to get HTTPScaledObject reference")
|
|
lggr.Error(err, "unable to get the linked HTTPScaledObject for ScaledObject", "name", sor.Name, "namespace", sor.Namespace, "httpScaledObjectName", httpScaledObjectName)
|
|
return nil, err
|
|
}
|
|
|
|
httpso, err := e.httpsoInformer.Lister().HTTPScaledObjects(sor.Namespace).Get(httpScaledObjectName)
|
|
if err != nil {
|
|
lggr.Error(err, "unable to get HTTPScaledObject", "name", httpScaledObjectName, "namespace", sor.Namespace, "httpScaledObjectName", httpScaledObjectName)
|
|
return nil, err
|
|
}
|
|
|
|
// generated the metric name for HTTPScaledObject
|
|
namespacedName := k8s.NamespacedNameFromNameAndNamespace(httpScaledObjectName, sor.Namespace)
|
|
metricName := MetricName(namespacedName)
|
|
|
|
key := namespacedName.String()
|
|
count := e.pinger.counts()[key]
|
|
|
|
var metricValue int
|
|
if httpso.Spec.ScalingMetric != nil && httpso.Spec.ScalingMetric.Rate != nil {
|
|
metricValue = int(math.Ceil(count.RPS))
|
|
lggr.V(1).Info(fmt.Sprintf("%d rps for %s", metricValue, httpso.GetName()))
|
|
} else {
|
|
metricValue = count.Concurrency
|
|
lggr.V(1).Info(fmt.Sprintf("%d concurrent requests for %s", metricValue, httpso.GetName()))
|
|
}
|
|
|
|
res := &externalscaler.GetMetricsResponse{
|
|
MetricValues: []*externalscaler.MetricValue{
|
|
{
|
|
MetricName: metricName,
|
|
MetricValue: int64(metricValue),
|
|
},
|
|
},
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (e *impl) interceptorMetrics(metricName string) (*externalscaler.GetMetricsResponse, error) {
|
|
lggr := e.lggr.WithName("interceptorMetrics")
|
|
|
|
var count int64
|
|
for _, v := range e.pinger.counts() {
|
|
count += int64(v.Concurrency)
|
|
}
|
|
if err := strconv.ErrRange; count < 0 {
|
|
lggr.Error(err, "count overflowed", "value", count)
|
|
return nil, err
|
|
}
|
|
|
|
res := &externalscaler.GetMetricsResponse{
|
|
MetricValues: []*externalscaler.MetricValue{
|
|
{
|
|
MetricName: metricName,
|
|
MetricValue: count,
|
|
},
|
|
},
|
|
}
|
|
return res, nil
|
|
}
|