feat: Create interface for eval events. (#1288)

## This PR

Before this change the eval services used a unexported
struct which prevented creating eval services outside of
this package.

This change creates a new IEvents interface that allows
providing custom impls of flag eval services.

### Related Issues


### Notes
<!-- any additional notes for this PR -->

### Follow-up Tasks
<!-- anything that is related to this PR but not done here should be
noted under this section -->
<!-- if there is a need for a new issue, please link it here -->

### How to test
<!-- if applicable, add testing instructions under this section -->

Signed-off-by: Connor Hindley <connor.hindley@tanium.com>
This commit is contained in:
Connor Hindley 2024-04-25 13:19:38 -06:00 committed by GitHub
parent e1752badc2
commit 9714215ced
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 29 additions and 22 deletions

View File

@ -58,7 +58,7 @@ type ConnectService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents
server *http.Server
metricsServer *http.Server
@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio
// Notify emits change event notifications for subscriptions
func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.emitToAll(n)
s.eventingConfiguration.EmitToAll(n)
}
// nolint: funlen
@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {
func (s *ConnectService) Shutdown() {
s.readinessEnabled = false
s.eventingConfiguration.emitToAll(service.Notification{
s.eventingConfiguration.EmitToAll(service.Notification{
Type: service.Shutdown,
Data: map[string]interface{}{},
})

View File

@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) {
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.subs["key"] = sChan
eventing.Subscribe("key", sChan)
// notification type
ofType := iservice.ConfigurationChange
@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) {
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.subs["key"] = sChan
eventing.Subscribe("key", sChan)
// notification type
ofType := iservice.Shutdown

View File

@ -6,20 +6,27 @@ import (
iservice "github.com/open-feature/flagd/core/pkg/service"
)
// IEvents is an interface for event subscriptions
type IEvents interface {
Subscribe(id any, notifyChan chan iservice.Notification)
Unsubscribe(id any)
EmitToAll(n iservice.Notification)
}
// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan iservice.Notification
subs map[any]chan iservice.Notification
}
func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()
eventing.subs[id] = notifyChan
}
func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
eventing.mu.RLock()
defer eventing.mu.RUnlock()
@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
}
}
func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
func (eventing *eventingConfiguration) Unsubscribe(id any) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

View File

@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
eventing.unSubscribe(idA)
eventing.Unsubscribe(idA)
// then
require.Empty(t, eventing.subs[idA],

View File

@ -30,13 +30,13 @@ type OldFlagEvaluationService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents
flagEvalTracer trace.Tracer
}
// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
func NewOldFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder,
) *OldFlagEvaluationService {
return &OldFlagEvaluationService{
logger: log,
@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream(
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
Type: service.ProviderReady,

View File

@ -23,14 +23,14 @@ type FlagEvaluationService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents
flagEvalTracer trace.Tracer
}
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
func NewFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator,
eventingCfg *eventingConfiguration,
eventingCfg IEvents,
metricsRecorder *telemetry.MetricsRecorder,
) *FlagEvaluationService {
return &FlagEvaluationService{
@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream(
stream *connect.ServerStream[evalV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
Type: service.ProviderReady,