feat: add server-side deadline to sync service (#1638)
<!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR <!-- add the description of the PR here --> - adds server side deadline for sync and event streams configurable via cmd argument `--stream-deadline` ### Related Issues #1582 ### Notes <!-- any additional notes for this PR --> ### How to test 1. Run flagd with `--stream-deadline 3s` // 3s can be replaced with any duration the deadline should have 2. Test Event Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/evaluation/v1/evaluation.proto -plaintext localhost:8013 flagd.evaluation.v1.Service/EventStream` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration 3. Test Sync Service Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/sync/v1/sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration Signed-off-by: alexandra.oberaigner <alexandra.oberaigner@dynatrace.com> Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
This commit is contained in:
parent
ba348152b6
commit
b70fa06b66
|
|
@ -2,6 +2,7 @@ package service
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
)
|
||||
|
|
@ -34,6 +35,7 @@ type Configuration struct {
|
|||
Options []connect.HandlerOption
|
||||
ContextValues map[string]any
|
||||
HeaderToContextKeyMappings map[string]string
|
||||
StreamDeadline time.Duration
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ flagd start [flags]
|
|||
-k, --server-key-path string Server side tls key path
|
||||
-d, --socket-path string Flagd unix socket path. With grpc the evaluations service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
|
||||
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
|
||||
--stream-deadline duration Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).
|
||||
-g, --sync-port int32 gRPC Sync port (default 8015)
|
||||
-e, --sync-socket-path string Flagd sync service socket path. With grpc the sync service will be available on this address.
|
||||
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ const (
|
|||
uriFlagName = "uri"
|
||||
contextValueFlagName = "context-value"
|
||||
headerToContextKeyFlagName = "context-from-header"
|
||||
streamDeadlineFlagName = "stream-deadline"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -85,8 +86,9 @@ func init() {
|
|||
"from disk")
|
||||
flags.StringToStringP(contextValueFlagName, "X", map[string]string{}, "add arbitrary key value pairs "+
|
||||
"to the flag evaluation context")
|
||||
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map " +
|
||||
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
|
||||
"header values to context values, where key is Header name, value is context key")
|
||||
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
|
||||
|
||||
bindFlags(flags)
|
||||
}
|
||||
|
|
@ -111,6 +113,7 @@ func bindFlags(flags *pflag.FlagSet) {
|
|||
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
|
||||
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
|
||||
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
|
||||
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
|
||||
}
|
||||
|
||||
// startCmd represents the start command
|
||||
|
|
@ -182,6 +185,7 @@ var startCmd = &cobra.Command{
|
|||
ServiceSocketPath: viper.GetString(socketPathFlagName),
|
||||
SyncServicePort: viper.GetUint16(syncPortFlagName),
|
||||
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
|
||||
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
|
||||
SyncProviders: syncProviders,
|
||||
ContextValues: contextValuesToMap,
|
||||
HeaderToContextKeyMappings: headerToContextKeyMappings,
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ type Config struct {
|
|||
ServiceSocketPath string
|
||||
SyncServicePort uint16
|
||||
SyncServiceSocketPath string
|
||||
StreamDeadline time.Duration
|
||||
|
||||
SyncProviders []sync.SourceConfig
|
||||
CORS []string
|
||||
|
|
@ -123,6 +124,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
|
|||
KeyPath: config.ServiceKeyPath,
|
||||
CertPath: config.ServiceCertPath,
|
||||
SocketPath: config.SyncServiceSocketPath,
|
||||
StreamDeadline: config.StreamDeadline,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating sync service: %w", err)
|
||||
|
|
@ -158,6 +160,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
|
|||
Options: options,
|
||||
ContextValues: config.ContextValues,
|
||||
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
|
||||
StreamDeadline: config.StreamDeadline,
|
||||
},
|
||||
SyncImpl: iSyncs,
|
||||
}, nil
|
||||
|
|
|
|||
|
|
@ -173,6 +173,7 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene
|
|||
s.metrics,
|
||||
svcConf.ContextValues,
|
||||
svcConf.HeaderToContextKeyMappings,
|
||||
svcConf.StreamDeadline,
|
||||
)
|
||||
|
||||
_, newHandler := evaluationV1.NewServiceHandler(newFes, append(svcConf.Options, marshalOpts)...)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package service
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
|
@ -27,6 +28,7 @@ type FlagEvaluationService struct {
|
|||
flagEvalTracer trace.Tracer
|
||||
contextValues map[string]any
|
||||
headerToContextKeyMappings map[string]string
|
||||
deadline time.Duration
|
||||
}
|
||||
|
||||
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
||||
|
|
@ -36,6 +38,7 @@ func NewFlagEvaluationService(log *logger.Logger,
|
|||
metricsRecorder telemetry.IMetricsRecorder,
|
||||
contextValues map[string]any,
|
||||
headerToContextKeyMappings map[string]string,
|
||||
streamDeadline time.Duration,
|
||||
) *FlagEvaluationService {
|
||||
svc := &FlagEvaluationService{
|
||||
logger: log,
|
||||
|
|
@ -45,6 +48,7 @@ func NewFlagEvaluationService(log *logger.Logger,
|
|||
flagEvalTracer: otel.Tracer("flagd.evaluation.v1"),
|
||||
contextValues: contextValues,
|
||||
headerToContextKeyMappings: headerToContextKeyMappings,
|
||||
deadline: streamDeadline,
|
||||
}
|
||||
|
||||
if metricsRecorder != nil {
|
||||
|
|
@ -143,6 +147,15 @@ func (s *FlagEvaluationService) EventStream(
|
|||
req *connect.Request[evalV1.EventStreamRequest],
|
||||
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
||||
) error {
|
||||
serviceCtx := ctx
|
||||
// attach server-side stream deadline to context
|
||||
if s.deadline != 0 {
|
||||
streamDeadline := time.Now().Add(s.deadline)
|
||||
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
|
||||
serviceCtx = deadlineCtx
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
requestNotificationChan := make(chan service.Notification, 1)
|
||||
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||
defer s.eventingConfiguration.Unsubscribe(req)
|
||||
|
|
@ -171,7 +184,11 @@ func (s *FlagEvaluationService) EventStream(
|
|||
if err != nil {
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
case <-ctx.Done():
|
||||
case <-serviceCtx.Done():
|
||||
if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
|
||||
s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
|
||||
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ func TestConnectServiceV2_ResolveAll(t *testing.T) {
|
|||
).AnyTimes()
|
||||
|
||||
metrics, exp := getMetricReader()
|
||||
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil)
|
||||
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil, 0)
|
||||
|
||||
// when
|
||||
got, err := s.ResolveAll(context.Background(), connect.NewRequest(tt.req))
|
||||
|
|
@ -222,6 +222,7 @@ func TestFlag_EvaluationV2_ResolveBoolean(t *testing.T) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
got, err := s.ResolveBoolean(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||
|
|
@ -279,6 +280,7 @@ func BenchmarkFlag_EvaluationV2_ResolveBoolean(b *testing.B) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
|
@ -379,6 +381,7 @@ func TestFlag_EvaluationV2_ResolveString(t *testing.T) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
got, err := s.ResolveString(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||
|
|
@ -436,6 +439,7 @@ func BenchmarkFlag_EvaluationV2_ResolveString(b *testing.B) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
|
@ -535,6 +539,7 @@ func TestFlag_EvaluationV2_ResolveFloat(t *testing.T) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
got, err := s.ResolveFloat(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||
|
|
@ -592,6 +597,7 @@ func BenchmarkFlag_EvaluationV2_ResolveFloat(b *testing.B) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
|
@ -691,6 +697,7 @@ func TestFlag_EvaluationV2_ResolveInt(t *testing.T) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
got, err := s.ResolveInt(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
|
||||
if (err != nil) && !errors.Is(err, tt.wantErr) {
|
||||
|
|
@ -748,6 +755,7 @@ func BenchmarkFlag_EvaluationV2_ResolveInt(b *testing.B) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
|
@ -850,6 +858,7 @@ func TestFlag_EvaluationV2_ResolveObject(t *testing.T) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
|
||||
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
||||
|
|
@ -915,6 +924,7 @@ func BenchmarkFlag_EvaluationV2_ResolveObject(b *testing.B) {
|
|||
metrics,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
)
|
||||
if name != "eval returns error" {
|
||||
outParsed, err := structpb.NewStruct(tt.evalFields.result)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,11 @@ package sync
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"time"
|
||||
|
||||
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
|
||||
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
|
||||
|
|
@ -15,14 +19,22 @@ type syncHandler struct {
|
|||
mux *Multiplexer
|
||||
log *logger.Logger
|
||||
contextValues map[string]any
|
||||
deadline time.Duration
|
||||
}
|
||||
|
||||
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
|
||||
muxPayload := make(chan payload, 1)
|
||||
selector := req.GetSelector()
|
||||
|
||||
ctx := server.Context()
|
||||
|
||||
// attach server-side stream deadline to context
|
||||
if s.deadline != 0 {
|
||||
streamDeadline := time.Now().Add(s.deadline)
|
||||
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
|
||||
ctx = deadlineCtx
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
err := s.mux.Register(ctx, selector, muxPayload)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -38,6 +50,11 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
|
|||
}
|
||||
case <-ctx.Done():
|
||||
s.mux.Unregister(ctx, selector)
|
||||
|
||||
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
||||
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
|
||||
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
|
||||
}
|
||||
s.log.Debug("context complete and exiting stream request")
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ type SvcConfigurations struct {
|
|||
CertPath string
|
||||
KeyPath string
|
||||
SocketPath string
|
||||
StreamDeadline time.Duration
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
|
|
@ -84,6 +85,7 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
|
|||
mux: mux,
|
||||
log: l,
|
||||
contextValues: cfg.ContextValues,
|
||||
deadline: cfg.StreamDeadline,
|
||||
})
|
||||
|
||||
var lis net.Listener
|
||||
|
|
|
|||
|
|
@ -3,6 +3,9 @@ package sync
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/open-feature/flagd/core/pkg/store"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"log"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -22,31 +25,26 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
|||
clientCertPath string
|
||||
socketPath string
|
||||
tls bool
|
||||
wantErr bool
|
||||
wantStartErr bool
|
||||
}{
|
||||
{title: "with TLS Connection", certPath: "./test-cert/server-cert.pem", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: false},
|
||||
{title: "witout TLS Connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "", tls: false, wantErr: false},
|
||||
{title: "with invalid TLS certificate path", certPath: "./lol/not/a/cert", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: true},
|
||||
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantErr: false},
|
||||
{title: "with TLS Connection", certPath: "./test-cert/server-cert.pem", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantStartErr: false},
|
||||
{title: "without TLS Connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "", tls: false, wantStartErr: false},
|
||||
{title: "with invalid TLS certificate path", certPath: "./lol/not/a/cert", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantStartErr: true},
|
||||
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantStartErr: false},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
|
||||
// given
|
||||
port := 18016
|
||||
store, sources := getSimpleFlagStore()
|
||||
flagStore, sources := getSimpleFlagStore()
|
||||
|
||||
service, err := NewSyncService(SvcConfigurations{
|
||||
Logger: logger.NewLogger(nil, false),
|
||||
Port: uint16(port),
|
||||
Sources: sources,
|
||||
Store: store,
|
||||
CertPath: tc.certPath,
|
||||
KeyPath: tc.keyPath,
|
||||
SocketPath: tc.socketPath,
|
||||
})
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
if tc.wantErr {
|
||||
service, doneChan, err := createAndStartSyncService(port, sources, flagStore, tc.certPath, tc.keyPath, tc.socketPath, ctx, 0)
|
||||
|
||||
if tc.wantStartErr {
|
||||
if err == nil {
|
||||
t.Fatal("expected error creating the service!")
|
||||
}
|
||||
|
|
@ -56,46 +54,8 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
doneChan := make(chan interface{})
|
||||
|
||||
go func() {
|
||||
// error ignored, tests will fail if start is not successful
|
||||
_ = service.Start(ctx)
|
||||
close(doneChan)
|
||||
}()
|
||||
|
||||
// trigger manual emits matching sources, so that service can start
|
||||
for _, source := range sources {
|
||||
service.Emit(false, source)
|
||||
}
|
||||
|
||||
// when - derive a client for sync service
|
||||
var con *grpc.ClientConn
|
||||
if tc.tls {
|
||||
tlsCredentials, e := loadTLSClientCredentials(tc.clientCertPath)
|
||||
if e != nil {
|
||||
log.Fatal("cannot load TLS credentials: ", e)
|
||||
}
|
||||
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
|
||||
} else {
|
||||
if tc.socketPath != "" {
|
||||
con, err = grpc.Dial(
|
||||
fmt.Sprintf("unix://%s", tc.socketPath),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithTimeout(2*time.Second),
|
||||
)
|
||||
} else {
|
||||
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
|
||||
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
|
||||
|
||||
// then
|
||||
|
||||
|
|
@ -192,3 +152,140 @@ func TestSyncServiceEndToEnd(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
|
||||
testCases := []struct {
|
||||
title string
|
||||
deadline time.Duration
|
||||
}{
|
||||
{title: "without deadline", deadline: 0},
|
||||
{title: "with deadline", deadline: 2 * time.Second},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
|
||||
|
||||
// given
|
||||
port := 18016
|
||||
flagStore, sources := getSimpleFlagStore()
|
||||
certPath := "./test-cert/server-cert.pem"
|
||||
keyPath := "./test-cert/server-key.pem"
|
||||
socketPath := ""
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline)
|
||||
if err != nil {
|
||||
t.Fatal("error creating sync service")
|
||||
}
|
||||
|
||||
// when - derive a client for sync service
|
||||
serviceClient := getSyncClient(t, "./test-cert/ca-cert.pem", "", true, port, nil)
|
||||
|
||||
// then
|
||||
|
||||
// sync flags request
|
||||
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
|
||||
if err != nil {
|
||||
t.Fatal(fmt.Printf("error from sync request: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
dataChan := make(chan any)
|
||||
errorChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
data, err := flags.Recv()
|
||||
dataChan <- data
|
||||
|
||||
if err != nil {
|
||||
errorChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-dataChan:
|
||||
// received data, continuing..
|
||||
break
|
||||
case err := <-errorChan:
|
||||
st, _ := status.FromError(err)
|
||||
if st.Code() == codes.DeadlineExceeded {
|
||||
if tc.deadline == 0 {
|
||||
t.Fatal("ran into deadline exceeded error even though no deadline was configured.")
|
||||
}
|
||||
// expected error due to deadline
|
||||
return
|
||||
}
|
||||
t.Fatal("unexpected error: ", err)
|
||||
case <-time.After(tc.deadline + 1*time.Second):
|
||||
if tc.deadline == 0 {
|
||||
return
|
||||
}
|
||||
t.Fatal("not expected as the deadline should result in other cases.")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func createAndStartSyncService(port int, sources []string, store *store.State, certPath string, keyPath string, socketPath string, ctx context.Context, deadline time.Duration) (*Service, chan interface{}, error) {
|
||||
service, err := NewSyncService(SvcConfigurations{
|
||||
Logger: logger.NewLogger(nil, false),
|
||||
Port: uint16(port),
|
||||
Sources: sources,
|
||||
Store: store,
|
||||
CertPath: certPath,
|
||||
KeyPath: keyPath,
|
||||
SocketPath: socketPath,
|
||||
StreamDeadline: deadline,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
doneChan := make(chan interface{})
|
||||
go func() {
|
||||
// error ignored, tests will fail if start is not successful
|
||||
_ = service.Start(ctx)
|
||||
close(doneChan)
|
||||
}()
|
||||
// trigger manual emits matching sources, so that service can start
|
||||
for _, source := range sources {
|
||||
service.Emit(false, source)
|
||||
}
|
||||
return service, doneChan, err
|
||||
}
|
||||
|
||||
func getSyncClient(t *testing.T, clientCertPath string, socketPath string, tls bool, port int, ctx context.Context) syncv1grpc.FlagSyncServiceClient {
|
||||
var con *grpc.ClientConn
|
||||
var err error
|
||||
if tls {
|
||||
tlsCredentials, e := loadTLSClientCredentials(clientCertPath)
|
||||
if e != nil {
|
||||
log.Fatal("cannot load TLS credentials: ", e)
|
||||
}
|
||||
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
|
||||
} else {
|
||||
if socketPath != "" {
|
||||
con, err = grpc.Dial(
|
||||
fmt.Sprintf("unix://%s", socketPath),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithTimeout(2*time.Second),
|
||||
)
|
||||
} else {
|
||||
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
|
||||
}
|
||||
|
||||
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
|
||||
return serviceClient
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue