feat: add server-side deadline to sync service (#1638)
<!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR <!-- add the description of the PR here --> - adds server side deadline for sync and event streams configurable via cmd argument `--stream-deadline` ### Related Issues #1582 ### Notes <!-- any additional notes for this PR --> ### How to test 1. Run flagd with `--stream-deadline 3s` // 3s can be replaced with any duration the deadline should have 2. Test Event Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/evaluation/v1/evaluation.proto -plaintext localhost:8013 flagd.evaluation.v1.Service/EventStream` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration 3. Test Sync Service Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/sync/v1/sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration Signed-off-by: alexandra.oberaigner <alexandra.oberaigner@dynatrace.com> Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
This commit is contained in:
parent
ba348152b6
commit
b70fa06b66
|
|
@ -2,6 +2,7 @@ package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"connectrpc.com/connect"
|
"connectrpc.com/connect"
|
||||||
)
|
)
|
||||||
|
|
@ -34,6 +35,7 @@ type Configuration struct {
|
||||||
Options []connect.HandlerOption
|
Options []connect.HandlerOption
|
||||||
ContextValues map[string]any
|
ContextValues map[string]any
|
||||||
HeaderToContextKeyMappings map[string]string
|
HeaderToContextKeyMappings map[string]string
|
||||||
|
StreamDeadline time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ flagd start [flags]
|
||||||
-k, --server-key-path string Server side tls key path
|
-k, --server-key-path string Server side tls key path
|
||||||
-d, --socket-path string Flagd unix socket path. With grpc the evaluations service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
|
-d, --socket-path string Flagd unix socket path. With grpc the evaluations service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
|
||||||
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
|
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
|
||||||
|
--stream-deadline duration Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).
|
||||||
-g, --sync-port int32 gRPC Sync port (default 8015)
|
-g, --sync-port int32 gRPC Sync port (default 8015)
|
||||||
-e, --sync-socket-path string Flagd sync service socket path. With grpc the sync service will be available on this address.
|
-e, --sync-socket-path string Flagd sync service socket path. With grpc the sync service will be available on this address.
|
||||||
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
|
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ const (
|
||||||
uriFlagName = "uri"
|
uriFlagName = "uri"
|
||||||
contextValueFlagName = "context-value"
|
contextValueFlagName = "context-value"
|
||||||
headerToContextKeyFlagName = "context-from-header"
|
headerToContextKeyFlagName = "context-from-header"
|
||||||
|
streamDeadlineFlagName = "stream-deadline"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
@ -85,8 +86,9 @@ func init() {
|
||||||
"from disk")
|
"from disk")
|
||||||
flags.StringToStringP(contextValueFlagName, "X", map[string]string{}, "add arbitrary key value pairs "+
|
flags.StringToStringP(contextValueFlagName, "X", map[string]string{}, "add arbitrary key value pairs "+
|
||||||
"to the flag evaluation context")
|
"to the flag evaluation context")
|
||||||
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map " +
|
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
|
||||||
"header values to context values, where key is Header name, value is context key")
|
"header values to context values, where key is Header name, value is context key")
|
||||||
|
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
|
||||||
|
|
||||||
bindFlags(flags)
|
bindFlags(flags)
|
||||||
}
|
}
|
||||||
|
|
@ -111,6 +113,7 @@ func bindFlags(flags *pflag.FlagSet) {
|
||||||
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
|
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
|
||||||
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
|
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
|
||||||
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
|
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
|
||||||
|
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
|
||||||
}
|
}
|
||||||
|
|
||||||
// startCmd represents the start command
|
// startCmd represents the start command
|
||||||
|
|
@ -182,6 +185,7 @@ var startCmd = &cobra.Command{
|
||||||
ServiceSocketPath: viper.GetString(socketPathFlagName),
|
ServiceSocketPath: viper.GetString(socketPathFlagName),
|
||||||
SyncServicePort: viper.GetUint16(syncPortFlagName),
|
SyncServicePort: viper.GetUint16(syncPortFlagName),
|
||||||
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
|
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
|
||||||
|
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
|
||||||
SyncProviders: syncProviders,
|
SyncProviders: syncProviders,
|
||||||
ContextValues: contextValuesToMap,
|
ContextValues: contextValuesToMap,
|
||||||
HeaderToContextKeyMappings: headerToContextKeyMappings,
|
HeaderToContextKeyMappings: headerToContextKeyMappings,
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ type Config struct {
|
||||||
ServiceSocketPath string
|
ServiceSocketPath string
|
||||||
SyncServicePort uint16
|
SyncServicePort uint16
|
||||||
SyncServiceSocketPath string
|
SyncServiceSocketPath string
|
||||||
|
StreamDeadline time.Duration
|
||||||
|
|
||||||
SyncProviders []sync.SourceConfig
|
SyncProviders []sync.SourceConfig
|
||||||
CORS []string
|
CORS []string
|
||||||
|
|
@ -115,14 +116,15 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
|
||||||
|
|
||||||
// flag sync service
|
// flag sync service
|
||||||
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
|
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
|
||||||
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
|
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
|
||||||
Port: config.SyncServicePort,
|
Port: config.SyncServicePort,
|
||||||
Sources: sources,
|
Sources: sources,
|
||||||
Store: s,
|
Store: s,
|
||||||
ContextValues: config.ContextValues,
|
ContextValues: config.ContextValues,
|
||||||
KeyPath: config.ServiceKeyPath,
|
KeyPath: config.ServiceKeyPath,
|
||||||
CertPath: config.ServiceCertPath,
|
CertPath: config.ServiceCertPath,
|
||||||
SocketPath: config.SyncServiceSocketPath,
|
SocketPath: config.SyncServiceSocketPath,
|
||||||
|
StreamDeadline: config.StreamDeadline,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating sync service: %w", err)
|
return nil, fmt.Errorf("error creating sync service: %w", err)
|
||||||
|
|
@ -158,6 +160,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
|
||||||
Options: options,
|
Options: options,
|
||||||
ContextValues: config.ContextValues,
|
ContextValues: config.ContextValues,
|
||||||
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
|
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
|
||||||
|
StreamDeadline: config.StreamDeadline,
|
||||||
},
|
},
|
||||||
SyncImpl: iSyncs,
|
SyncImpl: iSyncs,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
|
||||||
|
|
@ -173,6 +173,7 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene
|
||||||
s.metrics,
|
s.metrics,
|
||||||
svcConf.ContextValues,
|
svcConf.ContextValues,
|
||||||
svcConf.HeaderToContextKeyMappings,
|
svcConf.HeaderToContextKeyMappings,
|
||||||
|
svcConf.StreamDeadline,
|
||||||
)
|
)
|
||||||
|
|
||||||
_, newHandler := evaluationV1.NewServiceHandler(newFes, append(svcConf.Options, marshalOpts)...)
|
_, newHandler := evaluationV1.NewServiceHandler(newFes, append(svcConf.Options, marshalOpts)...)
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -27,6 +28,7 @@ type FlagEvaluationService struct {
|
||||||
flagEvalTracer trace.Tracer
|
flagEvalTracer trace.Tracer
|
||||||
contextValues map[string]any
|
contextValues map[string]any
|
||||||
headerToContextKeyMappings map[string]string
|
headerToContextKeyMappings map[string]string
|
||||||
|
deadline time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
||||||
|
|
@ -36,6 +38,7 @@ func NewFlagEvaluationService(log *logger.Logger,
|
||||||
metricsRecorder telemetry.IMetricsRecorder,
|
metricsRecorder telemetry.IMetricsRecorder,
|
||||||
contextValues map[string]any,
|
contextValues map[string]any,
|
||||||
headerToContextKeyMappings map[string]string,
|
headerToContextKeyMappings map[string]string,
|
||||||
|
streamDeadline time.Duration,
|
||||||
) *FlagEvaluationService {
|
) *FlagEvaluationService {
|
||||||
svc := &FlagEvaluationService{
|
svc := &FlagEvaluationService{
|
||||||
logger: log,
|
logger: log,
|
||||||
|
|
@ -45,6 +48,7 @@ func NewFlagEvaluationService(log *logger.Logger,
|
||||||
flagEvalTracer: otel.Tracer("flagd.evaluation.v1"),
|
flagEvalTracer: otel.Tracer("flagd.evaluation.v1"),
|
||||||
contextValues: contextValues,
|
contextValues: contextValues,
|
||||||
headerToContextKeyMappings: headerToContextKeyMappings,
|
headerToContextKeyMappings: headerToContextKeyMappings,
|
||||||
|
deadline: streamDeadline,
|
||||||
}
|
}
|
||||||
|
|
||||||
if metricsRecorder != nil {
|
if metricsRecorder != nil {
|
||||||
|
|
@ -143,6 +147,15 @@ func (s *FlagEvaluationService) EventStream(
|
||||||
req *connect.Request[evalV1.EventStreamRequest],
|
req *connect.Request[evalV1.EventStreamRequest],
|
||||||
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
||||||
) error {
|
) error {
|
||||||
|
serviceCtx := ctx
|
||||||
|
// attach server-side stream deadline to context
|
||||||
|
if s.deadline != 0 {
|
||||||
|
streamDeadline := time.Now().Add(s.deadline)
|
||||||
|
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
|
||||||
|
serviceCtx = deadlineCtx
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
requestNotificationChan := make(chan service.Notification, 1)
|
requestNotificationChan := make(chan service.Notification, 1)
|
||||||
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||||
defer s.eventingConfiguration.Unsubscribe(req)
|
defer s.eventingConfiguration.Unsubscribe(req)
|
||||||
|
|
@ -171,7 +184,11 @@ func (s *FlagEvaluationService) EventStream(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error(err.Error())
|
s.logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-serviceCtx.Done():
|
||||||
|
if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
|
||||||
|
s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
|
||||||
|
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ func TestConnectServiceV2_ResolveAll(t *testing.T) {
|
||||||
).AnyTimes()
|
).AnyTimes()
|
||||||
|
|
||||||
metrics, exp := getMetricReader()
|
metrics, exp := getMetricReader()
|
||||||
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil)
|
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil, 0)
|
||||||
|
|
||||||
// when
|
// when
|
||||||
got, err := s.ResolveAll(context.Background(), connect.NewRequest(tt.req))
|
got, err := s.ResolveAll(context.Background(), connect.NewRequest(tt.req))
|
||||||
|
|
@ -222,6 +222,7 @@ func TestFlag_EvaluationV2_ResolveBoolean(t *testing.T) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
got, err := s.ResolveBoolean(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
got, err := s.ResolveBoolean(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||||
|
|
@ -279,6 +280,7 @@ func BenchmarkFlag_EvaluationV2_ResolveBoolean(b *testing.B) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -379,6 +381,7 @@ func TestFlag_EvaluationV2_ResolveString(t *testing.T) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
got, err := s.ResolveString(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
got, err := s.ResolveString(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||||
|
|
@ -436,6 +439,7 @@ func BenchmarkFlag_EvaluationV2_ResolveString(b *testing.B) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -535,6 +539,7 @@ func TestFlag_EvaluationV2_ResolveFloat(t *testing.T) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
got, err := s.ResolveFloat(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
got, err := s.ResolveFloat(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||||
|
|
@ -592,6 +597,7 @@ func BenchmarkFlag_EvaluationV2_ResolveFloat(b *testing.B) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -691,6 +697,7 @@ func TestFlag_EvaluationV2_ResolveInt(t *testing.T) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
got, err := s.ResolveInt(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
got, err := s.ResolveInt(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||||
|
|
@ -748,6 +755,7 @@ func BenchmarkFlag_EvaluationV2_ResolveInt(b *testing.B) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -850,6 +858,7 @@ func TestFlag_EvaluationV2_ResolveObject(t *testing.T) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
|
|
||||||
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
||||||
|
|
@ -915,6 +924,7 @@ func BenchmarkFlag_EvaluationV2_ResolveObject(b *testing.B) {
|
||||||
metrics,
|
metrics,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
if name != "eval returns error" {
|
if name != "eval returns error" {
|
||||||
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
||||||
|
|
@ -1004,9 +1014,9 @@ func Test_mergeContexts(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "merge contexts with no headers, with no header-context mappings",
|
name: "merge contexts with no headers, with no header-context mappings",
|
||||||
args: args{
|
args: args{
|
||||||
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
||||||
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
||||||
headers: http.Header{},
|
headers: http.Header{},
|
||||||
headerToContextKeyMappings: map[string]string{},
|
headerToContextKeyMappings: map[string]string{},
|
||||||
},
|
},
|
||||||
// static context should "win"
|
// static context should "win"
|
||||||
|
|
@ -1015,9 +1025,9 @@ func Test_mergeContexts(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "merge contexts with headers, with no header-context mappings",
|
name: "merge contexts with headers, with no header-context mappings",
|
||||||
args: args{
|
args: args{
|
||||||
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
||||||
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
||||||
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
|
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
|
||||||
headerToContextKeyMappings: map[string]string{},
|
headerToContextKeyMappings: map[string]string{},
|
||||||
},
|
},
|
||||||
// static context should "win"
|
// static context should "win"
|
||||||
|
|
@ -1026,9 +1036,9 @@ func Test_mergeContexts(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "merge contexts with no headers, with header-context mappings",
|
name: "merge contexts with no headers, with header-context mappings",
|
||||||
args: args{
|
args: args{
|
||||||
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
||||||
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
||||||
headers: http.Header{},
|
headers: http.Header{},
|
||||||
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
|
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
|
||||||
},
|
},
|
||||||
// static context should "win"
|
// static context should "win"
|
||||||
|
|
@ -1037,9 +1047,9 @@ func Test_mergeContexts(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "merge contexts with headers, with header-context mappings",
|
name: "merge contexts with headers, with header-context mappings",
|
||||||
args: args{
|
args: args{
|
||||||
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
|
||||||
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
configContext: map[string]any{"k2": "v22", "k3": "v3"},
|
||||||
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
|
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
|
||||||
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
|
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
|
||||||
},
|
},
|
||||||
// header context should "win"
|
// header context should "win"
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,11 @@ package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
"time"
|
||||||
|
|
||||||
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
|
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
|
||||||
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
|
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
|
||||||
|
|
@ -15,14 +19,22 @@ type syncHandler struct {
|
||||||
mux *Multiplexer
|
mux *Multiplexer
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
contextValues map[string]any
|
contextValues map[string]any
|
||||||
|
deadline time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
|
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
|
||||||
muxPayload := make(chan payload, 1)
|
muxPayload := make(chan payload, 1)
|
||||||
selector := req.GetSelector()
|
selector := req.GetSelector()
|
||||||
|
|
||||||
ctx := server.Context()
|
ctx := server.Context()
|
||||||
|
|
||||||
|
// attach server-side stream deadline to context
|
||||||
|
if s.deadline != 0 {
|
||||||
|
streamDeadline := time.Now().Add(s.deadline)
|
||||||
|
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
|
||||||
|
ctx = deadlineCtx
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
err := s.mux.Register(ctx, selector, muxPayload)
|
err := s.mux.Register(ctx, selector, muxPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -38,6 +50,11 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
s.mux.Unregister(ctx, selector)
|
s.mux.Unregister(ctx, selector)
|
||||||
|
|
||||||
|
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
||||||
|
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
|
||||||
|
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
|
||||||
|
}
|
||||||
s.log.Debug("context complete and exiting stream request")
|
s.log.Debug("context complete and exiting stream request")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,14 +25,15 @@ type ISyncService interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type SvcConfigurations struct {
|
type SvcConfigurations struct {
|
||||||
Logger *logger.Logger
|
Logger *logger.Logger
|
||||||
Port uint16
|
Port uint16
|
||||||
Sources []string
|
Sources []string
|
||||||
Store *store.State
|
Store *store.State
|
||||||
ContextValues map[string]any
|
ContextValues map[string]any
|
||||||
CertPath string
|
CertPath string
|
||||||
KeyPath string
|
KeyPath string
|
||||||
SocketPath string
|
SocketPath string
|
||||||
|
StreamDeadline time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
|
|
@ -84,6 +85,7 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
|
||||||
mux: mux,
|
mux: mux,
|
||||||
log: l,
|
log: l,
|
||||||
contextValues: cfg.ContextValues,
|
contextValues: cfg.ContextValues,
|
||||||
|
deadline: cfg.StreamDeadline,
|
||||||
})
|
})
|
||||||
|
|
||||||
var lis net.Listener
|
var lis net.Listener
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,9 @@ package sync
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/open-feature/flagd/core/pkg/store"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
"log"
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -22,31 +25,26 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
||||||
clientCertPath string
|
clientCertPath string
|
||||||
socketPath string
|
socketPath string
|
||||||
tls bool
|
tls bool
|
||||||
wantErr bool
|
wantStartErr bool
|
||||||
}{
|
}{
|
||||||
{title: "with TLS Connection", certPath: "./test-cert/server-cert.pem", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: false},
|
{title: "with TLS Connection", certPath: "./test-cert/server-cert.pem", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantStartErr: false},
|
||||||
{title: "witout TLS Connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "", tls: false, wantErr: false},
|
{title: "without TLS Connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "", tls: false, wantStartErr: false},
|
||||||
{title: "with invalid TLS certificate path", certPath: "./lol/not/a/cert", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: true},
|
{title: "with invalid TLS certificate path", certPath: "./lol/not/a/cert", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantStartErr: true},
|
||||||
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantErr: false},
|
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantStartErr: false},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
|
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
|
||||||
// given
|
// given
|
||||||
port := 18016
|
port := 18016
|
||||||
store, sources := getSimpleFlagStore()
|
flagStore, sources := getSimpleFlagStore()
|
||||||
|
|
||||||
service, err := NewSyncService(SvcConfigurations{
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||||
Logger: logger.NewLogger(nil, false),
|
defer cancelFunc()
|
||||||
Port: uint16(port),
|
|
||||||
Sources: sources,
|
|
||||||
Store: store,
|
|
||||||
CertPath: tc.certPath,
|
|
||||||
KeyPath: tc.keyPath,
|
|
||||||
SocketPath: tc.socketPath,
|
|
||||||
})
|
|
||||||
|
|
||||||
if tc.wantErr {
|
service, doneChan, err := createAndStartSyncService(port, sources, flagStore, tc.certPath, tc.keyPath, tc.socketPath, ctx, 0)
|
||||||
|
|
||||||
|
if tc.wantStartErr {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("expected error creating the service!")
|
t.Fatal("expected error creating the service!")
|
||||||
}
|
}
|
||||||
|
|
@ -56,46 +54,8 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
||||||
doneChan := make(chan interface{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// error ignored, tests will fail if start is not successful
|
|
||||||
_ = service.Start(ctx)
|
|
||||||
close(doneChan)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// trigger manual emits matching sources, so that service can start
|
|
||||||
for _, source := range sources {
|
|
||||||
service.Emit(false, source)
|
|
||||||
}
|
|
||||||
|
|
||||||
// when - derive a client for sync service
|
// when - derive a client for sync service
|
||||||
var con *grpc.ClientConn
|
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
|
||||||
if tc.tls {
|
|
||||||
tlsCredentials, e := loadTLSClientCredentials(tc.clientCertPath)
|
|
||||||
if e != nil {
|
|
||||||
log.Fatal("cannot load TLS credentials: ", e)
|
|
||||||
}
|
|
||||||
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
|
|
||||||
} else {
|
|
||||||
if tc.socketPath != "" {
|
|
||||||
con, err = grpc.Dial(
|
|
||||||
fmt.Sprintf("unix://%s", tc.socketPath),
|
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
||||||
grpc.WithBlock(),
|
|
||||||
grpc.WithTimeout(2*time.Second),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
|
|
||||||
|
|
||||||
// then
|
// then
|
||||||
|
|
||||||
|
|
@ -192,3 +152,140 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
title string
|
||||||
|
deadline time.Duration
|
||||||
|
}{
|
||||||
|
{title: "without deadline", deadline: 0},
|
||||||
|
{title: "with deadline", deadline: 2 * time.Second},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
|
||||||
|
|
||||||
|
// given
|
||||||
|
port := 18016
|
||||||
|
flagStore, sources := getSimpleFlagStore()
|
||||||
|
certPath := "./test-cert/server-cert.pem"
|
||||||
|
keyPath := "./test-cert/server-key.pem"
|
||||||
|
socketPath := ""
|
||||||
|
|
||||||
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||||
|
defer cancelFunc()
|
||||||
|
|
||||||
|
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating sync service")
|
||||||
|
}
|
||||||
|
|
||||||
|
// when - derive a client for sync service
|
||||||
|
serviceClient := getSyncClient(t, "./test-cert/ca-cert.pem", "", true, port, nil)
|
||||||
|
|
||||||
|
// then
|
||||||
|
|
||||||
|
// sync flags request
|
||||||
|
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(fmt.Printf("error from sync request: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dataChan := make(chan any)
|
||||||
|
errorChan := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
data, err := flags.Recv()
|
||||||
|
dataChan <- data
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
errorChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-dataChan:
|
||||||
|
// received data, continuing..
|
||||||
|
break
|
||||||
|
case err := <-errorChan:
|
||||||
|
st, _ := status.FromError(err)
|
||||||
|
if st.Code() == codes.DeadlineExceeded {
|
||||||
|
if tc.deadline == 0 {
|
||||||
|
t.Fatal("ran into deadline exceeded error even though no deadline was configured.")
|
||||||
|
}
|
||||||
|
// expected error due to deadline
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Fatal("unexpected error: ", err)
|
||||||
|
case <-time.After(tc.deadline + 1*time.Second):
|
||||||
|
if tc.deadline == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Fatal("not expected as the deadline should result in other cases.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createAndStartSyncService(port int, sources []string, store *store.State, certPath string, keyPath string, socketPath string, ctx context.Context, deadline time.Duration) (*Service, chan interface{}, error) {
|
||||||
|
service, err := NewSyncService(SvcConfigurations{
|
||||||
|
Logger: logger.NewLogger(nil, false),
|
||||||
|
Port: uint16(port),
|
||||||
|
Sources: sources,
|
||||||
|
Store: store,
|
||||||
|
CertPath: certPath,
|
||||||
|
KeyPath: keyPath,
|
||||||
|
SocketPath: socketPath,
|
||||||
|
StreamDeadline: deadline,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
doneChan := make(chan interface{})
|
||||||
|
go func() {
|
||||||
|
// error ignored, tests will fail if start is not successful
|
||||||
|
_ = service.Start(ctx)
|
||||||
|
close(doneChan)
|
||||||
|
}()
|
||||||
|
// trigger manual emits matching sources, so that service can start
|
||||||
|
for _, source := range sources {
|
||||||
|
service.Emit(false, source)
|
||||||
|
}
|
||||||
|
return service, doneChan, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSyncClient(t *testing.T, clientCertPath string, socketPath string, tls bool, port int, ctx context.Context) syncv1grpc.FlagSyncServiceClient {
|
||||||
|
var con *grpc.ClientConn
|
||||||
|
var err error
|
||||||
|
if tls {
|
||||||
|
tlsCredentials, e := loadTLSClientCredentials(clientCertPath)
|
||||||
|
if e != nil {
|
||||||
|
log.Fatal("cannot load TLS credentials: ", e)
|
||||||
|
}
|
||||||
|
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
|
||||||
|
} else {
|
||||||
|
if socketPath != "" {
|
||||||
|
con, err = grpc.Dial(
|
||||||
|
fmt.Sprintf("unix://%s", socketPath),
|
||||||
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
grpc.WithBlock(),
|
||||||
|
grpc.WithTimeout(2*time.Second),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
|
||||||
|
return serviceClient
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue