diff --git a/flagd/pkg/service/flag-evaluation/connect_service.go b/flagd/pkg/service/flag-evaluation/connect_service.go index 6a171ea0..d66c9c7f 100644 --- a/flagd/pkg/service/flag-evaluation/connect_service.go +++ b/flagd/pkg/service/flag-evaluation/connect_service.go @@ -58,7 +58,7 @@ type ConnectService struct { logger *logger.Logger eval evaluator.IEvaluator metrics *telemetry.MetricsRecorder - eventingConfiguration *eventingConfiguration + eventingConfiguration IEvents server *http.Server metricsServer *http.Server @@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio // Notify emits change event notifications for subscriptions func (s *ConnectService) Notify(n service.Notification) { - s.eventingConfiguration.emitToAll(n) + s.eventingConfiguration.EmitToAll(n) } // nolint: funlen @@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) { func (s *ConnectService) Shutdown() { s.readinessEnabled = false - s.eventingConfiguration.emitToAll(service.Notification{ + s.eventingConfiguration.EmitToAll(service.Notification{ Type: service.Shutdown, Data: map[string]interface{}{}, }) diff --git a/flagd/pkg/service/flag-evaluation/connect_service_test.go b/flagd/pkg/service/flag-evaluation/connect_service_test.go index 727ba60c..3bb3fe00 100644 --- a/flagd/pkg/service/flag-evaluation/connect_service_test.go +++ b/flagd/pkg/service/flag-evaluation/connect_service_test.go @@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) { sChan := make(chan iservice.Notification, 1) eventing := service.eventingConfiguration - eventing.subs["key"] = sChan + eventing.Subscribe("key", sChan) // notification type ofType := iservice.ConfigurationChange @@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) { sChan := make(chan iservice.Notification, 1) eventing := service.eventingConfiguration - eventing.subs["key"] = sChan + eventing.Subscribe("key", sChan) // notification type ofType := iservice.Shutdown diff --git a/flagd/pkg/service/flag-evaluation/eventing.go b/flagd/pkg/service/flag-evaluation/eventing.go index 79fa89e1..977a0232 100644 --- a/flagd/pkg/service/flag-evaluation/eventing.go +++ b/flagd/pkg/service/flag-evaluation/eventing.go @@ -6,20 +6,27 @@ import ( iservice "github.com/open-feature/flagd/core/pkg/service" ) +// IEvents is an interface for event subscriptions +type IEvents interface { + Subscribe(id any, notifyChan chan iservice.Notification) + Unsubscribe(id any) + EmitToAll(n iservice.Notification) +} + // eventingConfiguration is a wrapper for notification subscriptions type eventingConfiguration struct { mu *sync.RWMutex - subs map[interface{}]chan iservice.Notification + subs map[any]chan iservice.Notification } -func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) { +func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) { eventing.mu.Lock() defer eventing.mu.Unlock() eventing.subs[id] = notifyChan } -func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) { +func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) { eventing.mu.RLock() defer eventing.mu.RUnlock() @@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) { } } -func (eventing *eventingConfiguration) unSubscribe(id interface{}) { +func (eventing *eventingConfiguration) Unsubscribe(id any) { eventing.mu.Lock() defer eventing.mu.Unlock() diff --git a/flagd/pkg/service/flag-evaluation/eventing_test.go b/flagd/pkg/service/flag-evaluation/eventing_test.go index 01a3e61d..8eba93e1 100644 --- a/flagd/pkg/service/flag-evaluation/eventing_test.go +++ b/flagd/pkg/service/flag-evaluation/eventing_test.go @@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) { chanB := make(chan iservice.Notification, 1) // when - eventing.subscribe(idA, chanA) - eventing.subscribe(idB, chanB) + eventing.Subscribe(idA, chanA) + eventing.Subscribe(idB, chanB) // then require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association") @@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) { chanB := make(chan iservice.Notification, 1) // when - eventing.subscribe(idA, chanA) - eventing.subscribe(idB, chanB) + eventing.Subscribe(idA, chanA) + eventing.Subscribe(idB, chanB) - eventing.unSubscribe(idA) + eventing.Unsubscribe(idA) // then require.Empty(t, eventing.subs[idA], diff --git a/flagd/pkg/service/flag-evaluation/flag_evaluator.go b/flagd/pkg/service/flag-evaluation/flag_evaluator.go index a01f0c0b..96a0a60b 100644 --- a/flagd/pkg/service/flag-evaluation/flag_evaluator.go +++ b/flagd/pkg/service/flag-evaluation/flag_evaluator.go @@ -30,13 +30,13 @@ type OldFlagEvaluationService struct { logger *logger.Logger eval evaluator.IEvaluator metrics *telemetry.MetricsRecorder - eventingConfiguration *eventingConfiguration + eventingConfiguration IEvents flagEvalTracer trace.Tracer } // NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters func NewOldFlagEvaluationService(log *logger.Logger, - eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder, + eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder, ) *OldFlagEvaluationService { return &OldFlagEvaluationService{ logger: log, @@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream( stream *connect.ServerStream[schemaV1.EventStreamResponse], ) error { requestNotificationChan := make(chan service.Notification, 1) - s.eventingConfiguration.subscribe(req, requestNotificationChan) - defer s.eventingConfiguration.unSubscribe(req) + s.eventingConfiguration.Subscribe(req, requestNotificationChan) + defer s.eventingConfiguration.Unsubscribe(req) requestNotificationChan <- service.Notification{ Type: service.ProviderReady, diff --git a/flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go b/flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go index 32f9cf1c..1e78734f 100644 --- a/flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go +++ b/flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go @@ -23,14 +23,14 @@ type FlagEvaluationService struct { logger *logger.Logger eval evaluator.IEvaluator metrics *telemetry.MetricsRecorder - eventingConfiguration *eventingConfiguration + eventingConfiguration IEvents flagEvalTracer trace.Tracer } // NewFlagEvaluationService creates a FlagEvaluationService with provided parameters func NewFlagEvaluationService(log *logger.Logger, eval evaluator.IEvaluator, - eventingCfg *eventingConfiguration, + eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder, ) *FlagEvaluationService { return &FlagEvaluationService{ @@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream( stream *connect.ServerStream[evalV1.EventStreamResponse], ) error { requestNotificationChan := make(chan service.Notification, 1) - s.eventingConfiguration.subscribe(req, requestNotificationChan) - defer s.eventingConfiguration.unSubscribe(req) + s.eventingConfiguration.Subscribe(req, requestNotificationChan) + defer s.eventingConfiguration.Unsubscribe(req) requestNotificationChan <- service.Notification{ Type: service.ProviderReady,