flagd/flagd-proxy/pkg/service/handler.go

117 lines
3.3 KiB
Go

package service
import (
"context"
"fmt"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
syncv12 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/flagd-proxy/pkg/service/subscriptions"
)
// handler serves the FlagSyncService generated from the flagd/sync/v1 schema.
// It embeds the generated Unimplemented server and defines the SyncFlags and
// FetchAllFlags methods on top of a subscriptions.Manager.
type handler struct {
syncv1grpc.UnimplementedFlagSyncServiceServer
// syncStore registers stream subscriptions and answers one-off fetches.
syncStore subscriptions.Manager
// logger is not referenced by the methods visible in this file.
logger *logger.Logger
// ctx is used to handle SIG[INT|TERM]
// SyncFlags returns nil once this context is done.
ctx context.Context
}
// SyncFlags registers a subscription for the selector carried by request and
// forwards the FlagData of every DataSync it receives to the client stream.
//
// It returns:
//   - the error received from the subscription's error channel, unwrapped;
//   - a wrapped error if sending on the stream fails;
//   - nil once either the stream context or the handler context is done.
func (nh *handler) SyncFlags(
	request *syncv12.SyncFlagsRequest,
	server syncv1grpc.FlagSyncService_SyncFlagsServer,
) error {
	// The subscription gets its own context, which is cancelled whenever this
	// call returns, whatever the reason.
	subCtx, stop := context.WithCancel(context.Background())
	defer stop()

	var (
		failures = make(chan error)
		updates  = make(chan sync.DataSync)
	)
	nh.syncStore.RegisterSubscription(subCtx, request.GetSelector(), request, updates, failures)

	for {
		select {
		case <-server.Context().Done():
			// The client went away or the stream was closed.
			return nil
		case <-nh.ctx.Done():
			// The handler's own context is done.
			return nil
		case subErr := <-failures:
			return subErr
		case update := <-updates:
			msg := &syncv12.SyncFlagsResponse{FlagConfiguration: update.FlagData}
			sendErr := server.Send(msg)
			if sendErr != nil {
				return fmt.Errorf("error sending configuration change event: %w", sendErr)
			}
		}
	}
}
// FetchAllFlags asks the sync store for the flag data matching the request's
// selector and returns it as the response's FlagConfiguration.
//
// When the store reports an error, an empty (non-nil) response is returned
// together with the wrapped error.
func (nh *handler) FetchAllFlags(
	ctx context.Context,
	request *syncv12.FetchAllFlagsRequest,
) (*syncv12.FetchAllFlagsResponse, error) {
	response := &syncv12.FetchAllFlagsResponse{}

	snapshot, fetchErr := nh.syncStore.FetchAllFlags(ctx, request, request.GetSelector())
	if fetchErr != nil {
		return response, fmt.Errorf("error fetching all flags from sync store: %w", fetchErr)
	}

	response.FlagConfiguration = snapshot.FlagData
	return response, nil
}
// oldHandler is the implementation of the old sync schema (sync/v1).
// It will no longer be required once https://github.com/open-feature/flagd/issues/1088
// is worked on.
type oldHandler struct {
rpc.UnimplementedFlagSyncServiceServer
// syncStore registers stream subscriptions and answers one-off fetches.
syncStore subscriptions.Manager
// logger is not referenced by the methods visible in this file.
logger *logger.Logger
// ctx is used to handle SIG[INT|TERM]
// SyncFlags returns nil once this context is done.
ctx context.Context
}
// FetchAllFlags is the old-schema variant of the one-off fetch: it asks the
// sync store for the flag data matching the request's selector and returns it
// as the response's FlagConfiguration.
//
// When the store reports an error, an empty (non-nil) response is returned
// together with the wrapped error.
//
//nolint:staticcheck
func (l *oldHandler) FetchAllFlags(ctx context.Context, req *syncv1.FetchAllFlagsRequest) (
	*syncv1.FetchAllFlagsResponse,
	error,
) {
	response := &syncv1.FetchAllFlagsResponse{}

	snapshot, fetchErr := l.syncStore.FetchAllFlags(ctx, req, req.GetSelector())
	if fetchErr != nil {
		return response, fmt.Errorf("error fetching all flags from sync store: %w", fetchErr)
	}

	response.FlagConfiguration = snapshot.FlagData
	return response, nil
}
// SyncFlags is the old-schema variant of the streaming sync: it registers a
// subscription for the selector carried by req and forwards the FlagData of
// every DataSync it receives to the client stream.
//
// It returns:
//   - the error received from the subscription's error channel, unwrapped;
//   - a wrapped error if sending on the stream fails;
//   - nil once either the stream context or the handler context is done.
//
//nolint:staticcheck
func (l *oldHandler) SyncFlags(
	req *syncv1.SyncFlagsRequest,
	stream rpc.FlagSyncService_SyncFlagsServer,
) error {
	// The subscription gets its own context, which is cancelled whenever this
	// call returns, whatever the reason.
	subCtx, stop := context.WithCancel(context.Background())
	defer stop()

	var (
		failures = make(chan error)
		updates  = make(chan sync.DataSync)
	)
	l.syncStore.RegisterSubscription(subCtx, req.GetSelector(), req, updates, failures)

	for {
		select {
		case <-stream.Context().Done():
			// The client went away or the stream was closed.
			return nil
		case <-l.ctx.Done():
			// The handler's own context is done.
			return nil
		case subErr := <-failures:
			return subErr
		case update := <-updates:
			msg := &syncv1.SyncFlagsResponse{FlagConfiguration: update.FlagData}
			sendErr := stream.Send(msg)
			if sendErr != nil {
				return fmt.Errorf("error sending configuration change event: %w", sendErr)
			}
		}
	}
}