feat: Create interface for eval events. (#1288)
## This PR Before this change the eval services used a unexported struct which prevented creating eval services outside of this package. This change creates a new IEvents interface that allows providing custom impls of flag eval services. ### Related Issues ### Notes <!-- any additional notes for this PR --> ### Follow-up Tasks <!-- anything that is related to this PR but not done here should be noted under this section --> <!-- if there is a need for a new issue, please link it here --> ### How to test <!-- if applicable, add testing instructions under this section --> Signed-off-by: Connor Hindley <connor.hindley@tanium.com>
This commit is contained in:
parent
e1752badc2
commit
9714215ced
|
|
@ -58,7 +58,7 @@ type ConnectService struct {
|
|||
logger *logger.Logger
|
||||
eval evaluator.IEvaluator
|
||||
metrics *telemetry.MetricsRecorder
|
||||
eventingConfiguration *eventingConfiguration
|
||||
eventingConfiguration IEvents
|
||||
|
||||
server *http.Server
|
||||
metricsServer *http.Server
|
||||
|
|
@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio
|
|||
|
||||
// Notify emits change event notifications for subscriptions
|
||||
func (s *ConnectService) Notify(n service.Notification) {
|
||||
s.eventingConfiguration.emitToAll(n)
|
||||
s.eventingConfiguration.EmitToAll(n)
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
|
|
@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {
|
|||
|
||||
func (s *ConnectService) Shutdown() {
|
||||
s.readinessEnabled = false
|
||||
s.eventingConfiguration.emitToAll(service.Notification{
|
||||
s.eventingConfiguration.EmitToAll(service.Notification{
|
||||
Type: service.Shutdown,
|
||||
Data: map[string]interface{}{},
|
||||
})
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) {
|
|||
|
||||
sChan := make(chan iservice.Notification, 1)
|
||||
eventing := service.eventingConfiguration
|
||||
eventing.subs["key"] = sChan
|
||||
eventing.Subscribe("key", sChan)
|
||||
|
||||
// notification type
|
||||
ofType := iservice.ConfigurationChange
|
||||
|
|
@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) {
|
|||
|
||||
sChan := make(chan iservice.Notification, 1)
|
||||
eventing := service.eventingConfiguration
|
||||
eventing.subs["key"] = sChan
|
||||
eventing.Subscribe("key", sChan)
|
||||
|
||||
// notification type
|
||||
ofType := iservice.Shutdown
|
||||
|
|
|
|||
|
|
@ -6,20 +6,27 @@ import (
|
|||
iservice "github.com/open-feature/flagd/core/pkg/service"
|
||||
)
|
||||
|
||||
// IEvents is an interface for event subscriptions
|
||||
type IEvents interface {
|
||||
Subscribe(id any, notifyChan chan iservice.Notification)
|
||||
Unsubscribe(id any)
|
||||
EmitToAll(n iservice.Notification)
|
||||
}
|
||||
|
||||
// eventingConfiguration is a wrapper for notification subscriptions
|
||||
type eventingConfiguration struct {
|
||||
mu *sync.RWMutex
|
||||
subs map[interface{}]chan iservice.Notification
|
||||
subs map[any]chan iservice.Notification
|
||||
}
|
||||
|
||||
func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
|
||||
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
|
||||
eventing.mu.Lock()
|
||||
defer eventing.mu.Unlock()
|
||||
|
||||
eventing.subs[id] = notifyChan
|
||||
}
|
||||
|
||||
func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
|
||||
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
|
||||
eventing.mu.RLock()
|
||||
defer eventing.mu.RUnlock()
|
||||
|
||||
|
|
@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
|
|||
}
|
||||
}
|
||||
|
||||
func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
|
||||
func (eventing *eventingConfiguration) Unsubscribe(id any) {
|
||||
eventing.mu.Lock()
|
||||
defer eventing.mu.Unlock()
|
||||
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) {
|
|||
chanB := make(chan iservice.Notification, 1)
|
||||
|
||||
// when
|
||||
eventing.subscribe(idA, chanA)
|
||||
eventing.subscribe(idB, chanB)
|
||||
eventing.Subscribe(idA, chanA)
|
||||
eventing.Subscribe(idB, chanB)
|
||||
|
||||
// then
|
||||
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
|
||||
|
|
@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) {
|
|||
chanB := make(chan iservice.Notification, 1)
|
||||
|
||||
// when
|
||||
eventing.subscribe(idA, chanA)
|
||||
eventing.subscribe(idB, chanB)
|
||||
eventing.Subscribe(idA, chanA)
|
||||
eventing.Subscribe(idB, chanB)
|
||||
|
||||
eventing.unSubscribe(idA)
|
||||
eventing.Unsubscribe(idA)
|
||||
|
||||
// then
|
||||
require.Empty(t, eventing.subs[idA],
|
||||
|
|
|
|||
|
|
@ -30,13 +30,13 @@ type OldFlagEvaluationService struct {
|
|||
logger *logger.Logger
|
||||
eval evaluator.IEvaluator
|
||||
metrics *telemetry.MetricsRecorder
|
||||
eventingConfiguration *eventingConfiguration
|
||||
eventingConfiguration IEvents
|
||||
flagEvalTracer trace.Tracer
|
||||
}
|
||||
|
||||
// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
|
||||
func NewOldFlagEvaluationService(log *logger.Logger,
|
||||
eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
|
||||
eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder,
|
||||
) *OldFlagEvaluationService {
|
||||
return &OldFlagEvaluationService{
|
||||
logger: log,
|
||||
|
|
@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream(
|
|||
stream *connect.ServerStream[schemaV1.EventStreamResponse],
|
||||
) error {
|
||||
requestNotificationChan := make(chan service.Notification, 1)
|
||||
s.eventingConfiguration.subscribe(req, requestNotificationChan)
|
||||
defer s.eventingConfiguration.unSubscribe(req)
|
||||
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||
defer s.eventingConfiguration.Unsubscribe(req)
|
||||
|
||||
requestNotificationChan <- service.Notification{
|
||||
Type: service.ProviderReady,
|
||||
|
|
|
|||
|
|
@ -23,14 +23,14 @@ type FlagEvaluationService struct {
|
|||
logger *logger.Logger
|
||||
eval evaluator.IEvaluator
|
||||
metrics *telemetry.MetricsRecorder
|
||||
eventingConfiguration *eventingConfiguration
|
||||
eventingConfiguration IEvents
|
||||
flagEvalTracer trace.Tracer
|
||||
}
|
||||
|
||||
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
||||
func NewFlagEvaluationService(log *logger.Logger,
|
||||
eval evaluator.IEvaluator,
|
||||
eventingCfg *eventingConfiguration,
|
||||
eventingCfg IEvents,
|
||||
metricsRecorder *telemetry.MetricsRecorder,
|
||||
) *FlagEvaluationService {
|
||||
return &FlagEvaluationService{
|
||||
|
|
@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream(
|
|||
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
||||
) error {
|
||||
requestNotificationChan := make(chan service.Notification, 1)
|
||||
s.eventingConfiguration.subscribe(req, requestNotificationChan)
|
||||
defer s.eventingConfiguration.unSubscribe(req)
|
||||
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||
defer s.eventingConfiguration.Unsubscribe(req)
|
||||
|
||||
requestNotificationChan <- service.Notification{
|
||||
Type: service.ProviderReady,
|
||||
|
|
|
|||
Loading…
Reference in New Issue