feat: Create interface for eval events. (#1288)
## This PR Before this change the eval services used a unexported struct which prevented creating eval services outside of this package. This change creates a new IEvents interface that allows providing custom impls of flag eval services. ### Related Issues ### Notes <!-- any additional notes for this PR --> ### Follow-up Tasks <!-- anything that is related to this PR but not done here should be noted under this section --> <!-- if there is a need for a new issue, please link it here --> ### How to test <!-- if applicable, add testing instructions under this section --> Signed-off-by: Connor Hindley <connor.hindley@tanium.com>
This commit is contained in:
parent
e1752badc2
commit
9714215ced
|
|
@ -58,7 +58,7 @@ type ConnectService struct {
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
eval evaluator.IEvaluator
|
eval evaluator.IEvaluator
|
||||||
metrics *telemetry.MetricsRecorder
|
metrics *telemetry.MetricsRecorder
|
||||||
eventingConfiguration *eventingConfiguration
|
eventingConfiguration IEvents
|
||||||
|
|
||||||
server *http.Server
|
server *http.Server
|
||||||
metricsServer *http.Server
|
metricsServer *http.Server
|
||||||
|
|
@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio
|
||||||
|
|
||||||
// Notify emits change event notifications for subscriptions
|
// Notify emits change event notifications for subscriptions
|
||||||
func (s *ConnectService) Notify(n service.Notification) {
|
func (s *ConnectService) Notify(n service.Notification) {
|
||||||
s.eventingConfiguration.emitToAll(n)
|
s.eventingConfiguration.EmitToAll(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen
|
// nolint: funlen
|
||||||
|
|
@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {
|
||||||
|
|
||||||
func (s *ConnectService) Shutdown() {
|
func (s *ConnectService) Shutdown() {
|
||||||
s.readinessEnabled = false
|
s.readinessEnabled = false
|
||||||
s.eventingConfiguration.emitToAll(service.Notification{
|
s.eventingConfiguration.EmitToAll(service.Notification{
|
||||||
Type: service.Shutdown,
|
Type: service.Shutdown,
|
||||||
Data: map[string]interface{}{},
|
Data: map[string]interface{}{},
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) {
|
||||||
|
|
||||||
sChan := make(chan iservice.Notification, 1)
|
sChan := make(chan iservice.Notification, 1)
|
||||||
eventing := service.eventingConfiguration
|
eventing := service.eventingConfiguration
|
||||||
eventing.subs["key"] = sChan
|
eventing.Subscribe("key", sChan)
|
||||||
|
|
||||||
// notification type
|
// notification type
|
||||||
ofType := iservice.ConfigurationChange
|
ofType := iservice.ConfigurationChange
|
||||||
|
|
@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) {
|
||||||
|
|
||||||
sChan := make(chan iservice.Notification, 1)
|
sChan := make(chan iservice.Notification, 1)
|
||||||
eventing := service.eventingConfiguration
|
eventing := service.eventingConfiguration
|
||||||
eventing.subs["key"] = sChan
|
eventing.Subscribe("key", sChan)
|
||||||
|
|
||||||
// notification type
|
// notification type
|
||||||
ofType := iservice.Shutdown
|
ofType := iservice.Shutdown
|
||||||
|
|
|
||||||
|
|
@ -6,20 +6,27 @@ import (
|
||||||
iservice "github.com/open-feature/flagd/core/pkg/service"
|
iservice "github.com/open-feature/flagd/core/pkg/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// IEvents is an interface for event subscriptions
|
||||||
|
type IEvents interface {
|
||||||
|
Subscribe(id any, notifyChan chan iservice.Notification)
|
||||||
|
Unsubscribe(id any)
|
||||||
|
EmitToAll(n iservice.Notification)
|
||||||
|
}
|
||||||
|
|
||||||
// eventingConfiguration is a wrapper for notification subscriptions
|
// eventingConfiguration is a wrapper for notification subscriptions
|
||||||
type eventingConfiguration struct {
|
type eventingConfiguration struct {
|
||||||
mu *sync.RWMutex
|
mu *sync.RWMutex
|
||||||
subs map[interface{}]chan iservice.Notification
|
subs map[any]chan iservice.Notification
|
||||||
}
|
}
|
||||||
|
|
||||||
func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
|
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
|
||||||
eventing.mu.Lock()
|
eventing.mu.Lock()
|
||||||
defer eventing.mu.Unlock()
|
defer eventing.mu.Unlock()
|
||||||
|
|
||||||
eventing.subs[id] = notifyChan
|
eventing.subs[id] = notifyChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
|
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
|
||||||
eventing.mu.RLock()
|
eventing.mu.RLock()
|
||||||
defer eventing.mu.RUnlock()
|
defer eventing.mu.RUnlock()
|
||||||
|
|
||||||
|
|
@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
|
func (eventing *eventingConfiguration) Unsubscribe(id any) {
|
||||||
eventing.mu.Lock()
|
eventing.mu.Lock()
|
||||||
defer eventing.mu.Unlock()
|
defer eventing.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) {
|
||||||
chanB := make(chan iservice.Notification, 1)
|
chanB := make(chan iservice.Notification, 1)
|
||||||
|
|
||||||
// when
|
// when
|
||||||
eventing.subscribe(idA, chanA)
|
eventing.Subscribe(idA, chanA)
|
||||||
eventing.subscribe(idB, chanB)
|
eventing.Subscribe(idB, chanB)
|
||||||
|
|
||||||
// then
|
// then
|
||||||
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
|
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
|
||||||
|
|
@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) {
|
||||||
chanB := make(chan iservice.Notification, 1)
|
chanB := make(chan iservice.Notification, 1)
|
||||||
|
|
||||||
// when
|
// when
|
||||||
eventing.subscribe(idA, chanA)
|
eventing.Subscribe(idA, chanA)
|
||||||
eventing.subscribe(idB, chanB)
|
eventing.Subscribe(idB, chanB)
|
||||||
|
|
||||||
eventing.unSubscribe(idA)
|
eventing.Unsubscribe(idA)
|
||||||
|
|
||||||
// then
|
// then
|
||||||
require.Empty(t, eventing.subs[idA],
|
require.Empty(t, eventing.subs[idA],
|
||||||
|
|
|
||||||
|
|
@ -30,13 +30,13 @@ type OldFlagEvaluationService struct {
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
eval evaluator.IEvaluator
|
eval evaluator.IEvaluator
|
||||||
metrics *telemetry.MetricsRecorder
|
metrics *telemetry.MetricsRecorder
|
||||||
eventingConfiguration *eventingConfiguration
|
eventingConfiguration IEvents
|
||||||
flagEvalTracer trace.Tracer
|
flagEvalTracer trace.Tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
|
// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
|
||||||
func NewOldFlagEvaluationService(log *logger.Logger,
|
func NewOldFlagEvaluationService(log *logger.Logger,
|
||||||
eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
|
eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder,
|
||||||
) *OldFlagEvaluationService {
|
) *OldFlagEvaluationService {
|
||||||
return &OldFlagEvaluationService{
|
return &OldFlagEvaluationService{
|
||||||
logger: log,
|
logger: log,
|
||||||
|
|
@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream(
|
||||||
stream *connect.ServerStream[schemaV1.EventStreamResponse],
|
stream *connect.ServerStream[schemaV1.EventStreamResponse],
|
||||||
) error {
|
) error {
|
||||||
requestNotificationChan := make(chan service.Notification, 1)
|
requestNotificationChan := make(chan service.Notification, 1)
|
||||||
s.eventingConfiguration.subscribe(req, requestNotificationChan)
|
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||||
defer s.eventingConfiguration.unSubscribe(req)
|
defer s.eventingConfiguration.Unsubscribe(req)
|
||||||
|
|
||||||
requestNotificationChan <- service.Notification{
|
requestNotificationChan <- service.Notification{
|
||||||
Type: service.ProviderReady,
|
Type: service.ProviderReady,
|
||||||
|
|
|
||||||
|
|
@ -23,14 +23,14 @@ type FlagEvaluationService struct {
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
eval evaluator.IEvaluator
|
eval evaluator.IEvaluator
|
||||||
metrics *telemetry.MetricsRecorder
|
metrics *telemetry.MetricsRecorder
|
||||||
eventingConfiguration *eventingConfiguration
|
eventingConfiguration IEvents
|
||||||
flagEvalTracer trace.Tracer
|
flagEvalTracer trace.Tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
|
||||||
func NewFlagEvaluationService(log *logger.Logger,
|
func NewFlagEvaluationService(log *logger.Logger,
|
||||||
eval evaluator.IEvaluator,
|
eval evaluator.IEvaluator,
|
||||||
eventingCfg *eventingConfiguration,
|
eventingCfg IEvents,
|
||||||
metricsRecorder *telemetry.MetricsRecorder,
|
metricsRecorder *telemetry.MetricsRecorder,
|
||||||
) *FlagEvaluationService {
|
) *FlagEvaluationService {
|
||||||
return &FlagEvaluationService{
|
return &FlagEvaluationService{
|
||||||
|
|
@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream(
|
||||||
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
stream *connect.ServerStream[evalV1.EventStreamResponse],
|
||||||
) error {
|
) error {
|
||||||
requestNotificationChan := make(chan service.Notification, 1)
|
requestNotificationChan := make(chan service.Notification, 1)
|
||||||
s.eventingConfiguration.subscribe(req, requestNotificationChan)
|
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
|
||||||
defer s.eventingConfiguration.unSubscribe(req)
|
defer s.eventingConfiguration.Unsubscribe(req)
|
||||||
|
|
||||||
requestNotificationChan <- service.Notification{
|
requestNotificationChan <- service.Notification{
|
||||||
Type: service.ProviderReady,
|
Type: service.ProviderReady,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue