flagd/flagd-proxy/pkg/service/server.go

159 lines
4.3 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"
syncv1 "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/flagd-proxy/pkg/service/subscriptions"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
// Server bundles the gRPC sync server and the management HTTP server
// (health, readiness and metrics endpoints) of the proxy.
type Server struct {
	// Logger is used for server-level log output.
	Logger *logger.Logger

	// config is the service configuration passed to Serve.
	config service.Configuration

	// grpcServer is created by startServer and stopped by Shutdown.
	grpcServer *grpc.Server
	// handler is registered on grpcServer for the flagd/sync/v1 service.
	handler *handler
	// oldHandler will not be required anymore when https://github.com/open-feature/flagd/issues/1088 is being worked on
	oldHandler *oldHandler

	// server is shut down by Serve when it is non-nil.
	// NOTE(review): nothing in the visible code assigns this field — confirm it is still needed.
	server *http.Server
	// metricsServer is created by startMetricsServer and shut down by Serve.
	metricsServer *http.Server
	// metricServerReady feeds the /readyz endpoint: Serve sets it to true,
	// Shutdown sets it back to false.
	metricServerReady bool
}
// NewServer returns a Server whose legacy and current sync handlers share the
// supplied context, logger and subscription store. The service configuration
// is not set here; it is provided later through Serve.
func NewServer(ctx context.Context, logger *logger.Logger, store subscriptions.Manager) *Server {
	srv := &Server{Logger: logger}
	srv.oldHandler = &oldHandler{logger: logger, syncStore: store, ctx: ctx}
	srv.handler = &handler{logger: logger, syncStore: store, ctx: ctx}
	return srv
}
// Serve stores svcConf, marks the server as ready and runs the gRPC server,
// the metrics server and metrics capturing in an errgroup. It blocks until
// every goroutine of the group has returned and reports the group's first
// error, wrapped.
//
// When the group context is cancelled (ctx is cancelled or one goroutine
// fails), the HTTP servers that exist at that point are shut down. The gRPC
// server is not stopped here; that is done by Shutdown.
func (s *Server) Serve(ctx context.Context, svcConf service.Configuration) error {
	// shutdownTimeout bounds how long in-flight HTTP requests may take to drain.
	const shutdownTimeout = 5 * time.Second

	s.config = svcConf
	s.metricServerReady = true

	g, gCtx := errgroup.WithContext(ctx)
	g.Go(s.startServer)
	g.Go(s.startMetricsServer)
	g.Go(func() error {
		<-gCtx.Done()
		if s.server == nil {
			return nil
		}
		// gCtx is already cancelled at this point; handing it to Shutdown would
		// make Shutdown give up immediately with context.Canceled whenever a
		// connection is still active. Use a fresh, bounded context instead.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := s.server.Shutdown(shutdownCtx); err != nil {
			return fmt.Errorf("error shutting down flag evaluation server: %w", err)
		}
		return nil
	})
	g.Go(func() error {
		<-gCtx.Done()
		if s.metricsServer == nil {
			return nil
		}
		// See above: a cancelled context must not be used for the graceful drain.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := s.metricsServer.Shutdown(shutdownCtx); err != nil {
			return fmt.Errorf("error shutting down metrics server: %w", err)
		}
		return nil
	})
	g.Go(s.captureMetrics)

	if err := g.Wait(); err != nil {
		return fmt.Errorf("errgroup closed with error: %w", err)
	}
	return nil
}
// Shutdown flags the server as not ready (so /readyz stops reporting OK) and
// gracefully stops the gRPC server.
//
// It is a no-op for the gRPC part when the gRPC server was never created,
// e.g. because startServer failed to bind its listener or has not run yet.
func (s *Server) Shutdown() {
	s.metricServerReady = false
	// grpcServer is only assigned inside startServer; guard against a nil
	// pointer dereference when Shutdown is called without a running server.
	if s.grpcServer == nil {
		return
	}
	// Stop the gRPC server gracefully
	s.grpcServer.GracefulStop()
}
// startServer binds a TCP listener on the configured port, registers both
// sync handlers (legacy and current API version) on a new gRPC server and
// serves until the gRPC server stops.
//
// It returns an error when the listener cannot be created or when the gRPC
// server fails for a reason other than having been stopped.
func (s *Server) startServer() error {
	address := fmt.Sprintf(":%d", s.config.Port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		return fmt.Errorf("error setting up listener for address %s: %w", address, err)
	}

	s.grpcServer = grpc.NewServer()
	rpc.RegisterFlagSyncServiceServer(s.grpcServer, s.oldHandler)
	syncv1.RegisterFlagSyncServiceServer(s.grpcServer, s.handler)

	// A stopped gRPC server reports grpc.ErrServerStopped; http.ErrServerClosed
	// is specific to net/http and is never returned by grpc.Server.Serve.
	if err := s.grpcServer.Serve(lis); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
		return fmt.Errorf("error returned from grpc server: %w", err)
	}
	return nil
}
// startMetricsServer serves the management endpoints on the configured
// management port:
//
//   - /healthz always answers 200
//   - /readyz answers 200 when the server is marked ready and the configured
//     readiness probe passes, 412 otherwise
//   - /metrics exposes the Prometheus handler
//   - the gRPC health service, for HTTP/2 requests carrying a gRPC content type
//
// It blocks until the HTTP server stops and returns an error for any failure
// other than a regular server close.
func (s *Server) startMetricsServer() error {
	s.Logger.Info(fmt.Sprintf("binding metrics to %d", s.config.ManagementPort))

	grpcServer := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())

	mux := http.NewServeMux()
	mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		if s.metricServerReady && s.config.ReadinessProbe() {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusPreconditionFailed)
	}))
	mux.Handle("/metrics", promhttp.Handler())

	root := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		// gRPC requests use HTTP/2 with a Content-Type starting with
		// "application/grpc" (e.g. "application/grpc+proto"); everything else
		// is plain HTTP and goes to the mux.
		if request.ProtoMajor == 2 && strings.HasPrefix(request.Header.Get("Content-Type"), "application/grpc") {
			grpcServer.ServeHTTP(writer, request)
			return
		}
		mux.ServeHTTP(writer, request)
	})

	s.metricsServer = &http.Server{
		Addr:              fmt.Sprintf(":%d", s.config.ManagementPort),
		ReadHeaderTimeout: 3 * time.Second,
		Handler:           h2c.NewHandler(root, &http2.Server{}), // we need to use h2c to support plaintext HTTP2
	}
	if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return fmt.Errorf("error returned from metrics server: %w", err)
	}
	return nil
}