197 lines
5.8 KiB
Go
197 lines
5.8 KiB
Go
// The HTTP Scaler is the standard implementation for a KEDA external scaler
|
|
// which can be found at https://keda.sh/docs/2.0/concepts/external-scalers/
|
|
// This scaler has the implementation of an HTTP request counter and informs
|
|
// KEDA of the current request number for the queue in order to scale the app
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/health"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
"google.golang.org/grpc/reflection"
|
|
"k8s.io/client-go/kubernetes"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/log/zap"
|
|
|
|
clientset "github.com/kedacore/http-add-on/operator/generated/clientset/versioned"
|
|
informers "github.com/kedacore/http-add-on/operator/generated/informers/externalversions"
|
|
informershttpv1alpha1 "github.com/kedacore/http-add-on/operator/generated/informers/externalversions/http/v1alpha1"
|
|
"github.com/kedacore/http-add-on/pkg/build"
|
|
"github.com/kedacore/http-add-on/pkg/k8s"
|
|
"github.com/kedacore/http-add-on/pkg/util"
|
|
)
|
|
|
|
var (
|
|
setupLog = ctrl.Log.WithName("setup")
|
|
)
|
|
|
|
// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch
|
|
// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
|
|
|
|
func main() {
|
|
cfg := mustParseConfig()
|
|
grpcPort := cfg.GRPCPort
|
|
namespace := cfg.TargetNamespace
|
|
svcName := cfg.TargetService
|
|
deplName := cfg.TargetDeployment
|
|
targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
|
|
targetPendingRequests := cfg.TargetPendingRequests
|
|
profilingAddr := cfg.ProfilingAddr
|
|
|
|
opts := zap.Options{}
|
|
opts.BindFlags(flag.CommandLine)
|
|
flag.Parse()
|
|
|
|
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
|
|
|
|
k8sCfg, err := ctrl.GetConfig()
|
|
if err != nil {
|
|
setupLog.Error(err, "Kubernetes client config not found")
|
|
os.Exit(1)
|
|
}
|
|
k8sCl, err := kubernetes.NewForConfig(k8sCfg)
|
|
if err != nil {
|
|
setupLog.Error(err, "creating new Kubernetes ClientSet")
|
|
os.Exit(1)
|
|
}
|
|
pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForK8sClientset(k8sCl), namespace, svcName, deplName, targetPortStr)
|
|
|
|
// create the endpoints informer
|
|
endpInformer := k8s.NewInformerBackedEndpointsCache(
|
|
ctrl.Log,
|
|
k8sCl,
|
|
cfg.DeploymentCacheRsyncPeriod,
|
|
)
|
|
|
|
httpCl, err := clientset.NewForConfig(k8sCfg)
|
|
if err != nil {
|
|
setupLog.Error(err, "creating new HTTP ClientSet")
|
|
os.Exit(1)
|
|
}
|
|
sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, cfg.ConfigMapCacheRsyncPeriod)
|
|
httpsoInformer := informershttpv1alpha1.New(sharedInformerFactory, "", nil).HTTPScaledObjects()
|
|
|
|
ctx := ctrl.SetupSignalHandler()
|
|
ctx = util.ContextWithLogger(ctx, setupLog)
|
|
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
|
|
// start the endpoints informer
|
|
eg.Go(func() error {
|
|
setupLog.Info("starting the endpoints informer")
|
|
|
|
endpInformer.Start(ctx)
|
|
return nil
|
|
})
|
|
|
|
// start the httpso informer
|
|
eg.Go(func() error {
|
|
setupLog.Info("starting the httpso informer")
|
|
|
|
httpsoInformer.Informer().Run(ctx.Done())
|
|
return nil
|
|
})
|
|
|
|
eg.Go(func() error {
|
|
setupLog.Info("starting the queue pinger")
|
|
|
|
if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), endpInformer); !util.IsIgnoredErr(err) {
|
|
setupLog.Error(err, "queue pinger failed")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if len(profilingAddr) > 0 {
|
|
eg.Go(func() error {
|
|
setupLog.Info("enabling pprof for profiling", "address", profilingAddr)
|
|
return http.ListenAndServe(profilingAddr, nil)
|
|
})
|
|
}
|
|
|
|
eg.Go(func() error {
|
|
setupLog.Info("starting the grpc server")
|
|
|
|
if err := startGrpcServer(ctx, cfg, ctrl.Log, grpcPort, pinger, httpsoInformer, int64(targetPendingRequests)); !util.IsIgnoredErr(err) {
|
|
setupLog.Error(err, "grpc server failed")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
build.PrintComponentInfo(ctrl.Log, "Scaler")
|
|
|
|
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
|
|
setupLog.Error(err, "fatal error")
|
|
os.Exit(1)
|
|
}
|
|
|
|
setupLog.Info("Bye!")
|
|
}
|
|
|
|
func startGrpcServer(ctx context.Context, cfg *config, lggr logr.Logger, port int, pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, targetPendingRequests int64) error {
|
|
addr := fmt.Sprintf("0.0.0.0:%d", port)
|
|
lggr.Info("starting grpc server", "address", addr)
|
|
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
grpcServer := grpc.NewServer()
|
|
reflection.Register(grpcServer)
|
|
|
|
hs := health.NewServer()
|
|
go func() {
|
|
lggr.Info("starting healthchecks loop")
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
// handle cancellations/timeout
|
|
case <-ctx.Done():
|
|
hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
|
hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
|
return
|
|
// do our regularly scheduled work
|
|
case <-ticker.C:
|
|
// if we haven't updated the endpoints in twice QueueTickDuration we drop the check
|
|
if time.Now().After(pinger.lastPingTime.Add(cfg.QueueTickDuration * 2)) {
|
|
hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
|
hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
|
} else {
|
|
// we propagate pinger status as scaler status
|
|
hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_ServingStatus(pinger.status))
|
|
hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_ServingStatus(pinger.status))
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, hs)
|
|
|
|
externalscaler.RegisterExternalScalerServer(grpcServer, newImpl(lggr, pinger, httpsoInformer, targetPendingRequests))
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
grpcServer.GracefulStop()
|
|
}()
|
|
|
|
return grpcServer.Serve(lis)
|
|
}
|