feat: Create interface for eval events. (#1288)

## This PR

Before this change the eval services used a unexported
struct which prevented creating eval services outside of
this package.

This change creates a new IEvents interface that allows
providing custom impls of flag eval services.

### Related Issues


### Notes
<!-- any additional notes for this PR -->

### Follow-up Tasks
<!-- anything that is related to this PR but not done here should be
noted under this section -->
<!-- if there is a need for a new issue, please link it here -->

### How to test
<!-- if applicable, add testing instructions under this section -->

Signed-off-by: Connor Hindley <connor.hindley@tanium.com>
This commit is contained in:
Connor Hindley 2024-04-25 13:19:38 -06:00 committed by GitHub
parent e1752badc2
commit 9714215ced
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 29 additions and 22 deletions

View File

@ -58,7 +58,7 @@ type ConnectService struct {
logger *logger.Logger logger *logger.Logger
eval evaluator.IEvaluator eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration eventingConfiguration IEvents
server *http.Server server *http.Server
metricsServer *http.Server metricsServer *http.Server
@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio
// Notify emits change event notifications for subscriptions // Notify emits change event notifications for subscriptions
func (s *ConnectService) Notify(n service.Notification) { func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.emitToAll(n) s.eventingConfiguration.EmitToAll(n)
} }
// nolint: funlen // nolint: funlen
@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {
func (s *ConnectService) Shutdown() { func (s *ConnectService) Shutdown() {
s.readinessEnabled = false s.readinessEnabled = false
s.eventingConfiguration.emitToAll(service.Notification{ s.eventingConfiguration.EmitToAll(service.Notification{
Type: service.Shutdown, Type: service.Shutdown,
Data: map[string]interface{}{}, Data: map[string]interface{}{},
}) })

View File

@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) {
sChan := make(chan iservice.Notification, 1) sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration eventing := service.eventingConfiguration
eventing.subs["key"] = sChan eventing.Subscribe("key", sChan)
// notification type // notification type
ofType := iservice.ConfigurationChange ofType := iservice.ConfigurationChange
@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) {
sChan := make(chan iservice.Notification, 1) sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration eventing := service.eventingConfiguration
eventing.subs["key"] = sChan eventing.Subscribe("key", sChan)
// notification type // notification type
ofType := iservice.Shutdown ofType := iservice.Shutdown

View File

@ -6,20 +6,27 @@ import (
iservice "github.com/open-feature/flagd/core/pkg/service" iservice "github.com/open-feature/flagd/core/pkg/service"
) )
// IEvents is an interface for event subscriptions
type IEvents interface {
Subscribe(id any, notifyChan chan iservice.Notification)
Unsubscribe(id any)
EmitToAll(n iservice.Notification)
}
// eventingConfiguration is a wrapper for notification subscriptions // eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct { type eventingConfiguration struct {
mu *sync.RWMutex mu *sync.RWMutex
subs map[interface{}]chan iservice.Notification subs map[any]chan iservice.Notification
} }
func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) { func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
eventing.mu.Lock() eventing.mu.Lock()
defer eventing.mu.Unlock() defer eventing.mu.Unlock()
eventing.subs[id] = notifyChan eventing.subs[id] = notifyChan
} }
func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) { func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
eventing.mu.RLock() eventing.mu.RLock()
defer eventing.mu.RUnlock() defer eventing.mu.RUnlock()
@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
} }
} }
func (eventing *eventingConfiguration) unSubscribe(id interface{}) { func (eventing *eventingConfiguration) Unsubscribe(id any) {
eventing.mu.Lock() eventing.mu.Lock()
defer eventing.mu.Unlock() defer eventing.mu.Unlock()

View File

@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1) chanB := make(chan iservice.Notification, 1)
// when // when
eventing.subscribe(idA, chanA) eventing.Subscribe(idA, chanA)
eventing.subscribe(idB, chanB) eventing.Subscribe(idB, chanB)
// then // then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association") require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1) chanB := make(chan iservice.Notification, 1)
// when // when
eventing.subscribe(idA, chanA) eventing.Subscribe(idA, chanA)
eventing.subscribe(idB, chanB) eventing.Subscribe(idB, chanB)
eventing.unSubscribe(idA) eventing.Unsubscribe(idA)
// then // then
require.Empty(t, eventing.subs[idA], require.Empty(t, eventing.subs[idA],

View File

@ -30,13 +30,13 @@ type OldFlagEvaluationService struct {
logger *logger.Logger logger *logger.Logger
eval evaluator.IEvaluator eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration eventingConfiguration IEvents
flagEvalTracer trace.Tracer flagEvalTracer trace.Tracer
} }
// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters // NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
func NewOldFlagEvaluationService(log *logger.Logger, func NewOldFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder, eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder,
) *OldFlagEvaluationService { ) *OldFlagEvaluationService {
return &OldFlagEvaluationService{ return &OldFlagEvaluationService{
logger: log, logger: log,
@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream(
stream *connect.ServerStream[schemaV1.EventStreamResponse], stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error { ) error {
requestNotificationChan := make(chan service.Notification, 1) requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan) s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req) defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{ requestNotificationChan <- service.Notification{
Type: service.ProviderReady, Type: service.ProviderReady,

View File

@ -23,14 +23,14 @@ type FlagEvaluationService struct {
logger *logger.Logger logger *logger.Logger
eval evaluator.IEvaluator eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration eventingConfiguration IEvents
flagEvalTracer trace.Tracer flagEvalTracer trace.Tracer
} }
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters // NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
func NewFlagEvaluationService(log *logger.Logger, func NewFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator, eval evaluator.IEvaluator,
eventingCfg *eventingConfiguration, eventingCfg IEvents,
metricsRecorder *telemetry.MetricsRecorder, metricsRecorder *telemetry.MetricsRecorder,
) *FlagEvaluationService { ) *FlagEvaluationService {
return &FlagEvaluationService{ return &FlagEvaluationService{
@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream(
stream *connect.ServerStream[evalV1.EventStreamResponse], stream *connect.ServerStream[evalV1.EventStreamResponse],
) error { ) error {
requestNotificationChan := make(chan service.Notification, 1) requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan) s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req) defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{ requestNotificationChan <- service.Notification{
Type: service.ProviderReady, Type: service.ProviderReady,