117 lines
3.3 KiB
Go
117 lines
3.3 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
|
|
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
|
|
syncv12 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
|
|
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
|
|
"github.com/open-feature/flagd/core/pkg/logger"
|
|
"github.com/open-feature/flagd/core/pkg/sync"
|
|
"github.com/open-feature/flagd/flagd-proxy/pkg/service/subscriptions"
|
|
)
|
|
|
|
type handler struct {
|
|
syncv1grpc.UnimplementedFlagSyncServiceServer
|
|
syncStore subscriptions.Manager
|
|
logger *logger.Logger
|
|
// ctx is used to handle SIG[INT|TERM]
|
|
ctx context.Context
|
|
}
|
|
|
|
func (nh *handler) SyncFlags(
|
|
request *syncv12.SyncFlagsRequest,
|
|
server syncv1grpc.FlagSyncService_SyncFlagsServer,
|
|
) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
errChan := make(chan error)
|
|
dataSync := make(chan sync.DataSync)
|
|
nh.syncStore.RegisterSubscription(ctx, request.GetSelector(), request, dataSync, errChan)
|
|
for {
|
|
select {
|
|
case e := <-errChan:
|
|
return e
|
|
case d := <-dataSync:
|
|
if err := server.Send(&syncv12.SyncFlagsResponse{
|
|
FlagConfiguration: d.FlagData,
|
|
}); err != nil {
|
|
return fmt.Errorf("error sending configuration change event: %w", err)
|
|
}
|
|
case <-server.Context().Done():
|
|
return nil
|
|
case <-nh.ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (nh *handler) FetchAllFlags(
|
|
ctx context.Context,
|
|
request *syncv12.FetchAllFlagsRequest,
|
|
) (*syncv12.FetchAllFlagsResponse, error) {
|
|
data, err := nh.syncStore.FetchAllFlags(ctx, request, request.GetSelector())
|
|
if err != nil {
|
|
return &syncv12.FetchAllFlagsResponse{}, fmt.Errorf("error fetching all flags from sync store: %w", err)
|
|
}
|
|
|
|
return &syncv12.FetchAllFlagsResponse{
|
|
FlagConfiguration: data.FlagData,
|
|
}, nil
|
|
}
|
|
|
|
// oldHandler is the implementation of the old sync schema.
|
|
// this will not be required anymore when it is time to work on https://github.com/open-feature/flagd/issues/1088
|
|
type oldHandler struct {
|
|
rpc.UnimplementedFlagSyncServiceServer
|
|
syncStore subscriptions.Manager
|
|
logger *logger.Logger
|
|
// ctx is used to handle SIG[INT|TERM]
|
|
ctx context.Context
|
|
}
|
|
|
|
//nolint:staticcheck
|
|
func (l *oldHandler) FetchAllFlags(ctx context.Context, req *syncv1.FetchAllFlagsRequest) (
|
|
*syncv1.FetchAllFlagsResponse,
|
|
error,
|
|
) {
|
|
data, err := l.syncStore.FetchAllFlags(ctx, req, req.GetSelector())
|
|
if err != nil {
|
|
return &syncv1.FetchAllFlagsResponse{}, fmt.Errorf("error fetching all flags from sync store: %w", err)
|
|
}
|
|
|
|
return &syncv1.FetchAllFlagsResponse{
|
|
FlagConfiguration: data.FlagData,
|
|
}, nil
|
|
}
|
|
|
|
//nolint:staticcheck
|
|
func (l *oldHandler) SyncFlags(
|
|
req *syncv1.SyncFlagsRequest,
|
|
stream rpc.FlagSyncService_SyncFlagsServer,
|
|
) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
errChan := make(chan error)
|
|
dataSync := make(chan sync.DataSync)
|
|
l.syncStore.RegisterSubscription(ctx, req.GetSelector(), req, dataSync, errChan)
|
|
for {
|
|
select {
|
|
case e := <-errChan:
|
|
return e
|
|
case d := <-dataSync:
|
|
if err := stream.Send(&syncv1.SyncFlagsResponse{
|
|
FlagConfiguration: d.FlagData,
|
|
}); err != nil {
|
|
return fmt.Errorf("error sending configuration change event: %w", err)
|
|
}
|
|
case <-stream.Context().Done():
|
|
return nil
|
|
case <-l.ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|