chore: refactoring component structure (#1044)

This PR refactors the core package by restructuring the components
responsible for managing the subscriptions, as well as the creation of
sync sources. The following changes have been made:

- Renamed the `sync-store` package to `subscriptions`. This has been done
to avoid confusion, because we already have a `sync` and a `store`
package. Within the package, `SyncStore` has been renamed to
`subscriptions.Manager`, which better reflects its responsibility. Also,
`syncHandler` has been renamed to `multiplexer`, as it is responsible for
sending updates for a given target to all subscribers; `syncHandler` was
a bit too generic in my opinion.
- Moved the `GetSyncSourceFromURI` method to a new package,
`sync/builder`, to remove the responsibility of building concrete sync
sources from the subscription manager (see the usage sketch after this
list).
- Moved the logic for retrieving the K8s client config into the sync
builder. Previously, this was done in the `runtime` package by calling
the config-retrieval methods provided by the `sync/kubernetes` package
and handing the resulting config to the initialization of the
`K8sSync`. Note: this step can potentially be done in a separate PR, if
so desired.
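
A minimal usage sketch of the new layout, based on the code in this diff
(the URIs below are placeholders; in flagd itself the wiring happens in
the `runtime` package and the gRPC sync service):

```go
package main

import (
	"context"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/subscriptions"
	"github.com/open-feature/flagd/core/pkg/sync"
	syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
)

func main() {
	log := logger.NewLogger(nil, false)

	// Build concrete sync sources from startup configuration via the new
	// sync/builder package (previously part of the runtime package).
	sources := []sync.SourceConfig{
		{URI: "config/samples/example_flags.json", Provider: "file"},
	}
	syncs, err := syncbuilder.NewSyncBuilder().SyncsFromConfig(sources, log)
	if err != nil {
		log.Error(err.Error())
		return
	}
	_ = syncs // handed to the flag store / runtime in flagd itself

	// The subscription manager (formerly SyncStore) aggregates subscribers
	// per target; NewManager returns a *Coordinator, which implements the
	// subscriptions.Manager interface.
	var mgr subscriptions.Manager = subscriptions.NewManager(context.Background(), log)

	dataSyncChan := make(chan sync.DataSync, 1)
	errChan := make(chan error, 1)
	mgr.RegisterSubscription(context.Background(), "file:config/samples/example_flags.json",
		struct{}{}, dataSyncChan, errChan)
	// a real caller would then consume dataSyncChan / errChan
}
```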

---------

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
Florian Bacher 2023-12-07 06:59:17 +01:00 committed by GitHub
parent ec5f778286
commit 0c7f78a95f
21 changed files with 1035 additions and 757 deletions

View File

@ -72,6 +72,7 @@ mockgen: install-mockgen
cd core; mockgen -source=pkg/sync/grpc/credentials/builder.go -destination=pkg/sync/grpc/credentials/mock/builder.go -package=credendialsmock
cd core; mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
cd core; mockgen -source=pkg/service/middleware/interface.go -destination=pkg/service/middleware/mock/interface.go -package=middlewaremock
cd core; mockgen -source=pkg/sync/builder/syncbuilder.go -destination=pkg/sync/builder/mock/syncbuilder.go -package=middlewaremocksyncbuildermock
generate-docs:
cd flagd; go run ./cmd/doc/main.go

View File

@ -2,13 +2,7 @@ package runtime
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
msync "sync"
"time"
"github.com/open-feature/flagd/core/pkg/eval"
"github.com/open-feature/flagd/core/pkg/logger"
@ -16,46 +10,14 @@ import (
flageval "github.com/open-feature/flagd/core/pkg/service/flag-evaluation"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/sync/file"
"github.com/open-feature/flagd/core/pkg/sync/grpc"
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/robfig/cron"
"go.uber.org/zap"
)
// from_config is a collection of structures and parsers responsible for deriving flagd runtime
const (
syncProviderFile = "file"
syncProviderGrpc = "grpc"
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
svcName = "flagd"
)
var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
)
// SourceConfig is configuration option for flagd. This maps to startup parameter sources
type SourceConfig struct {
URI string `json:"uri"`
Provider string `json:"provider"`
BearerToken string `json:"bearerToken,omitempty"`
CertPath string `json:"certPath,omitempty"`
TLS bool `json:"tls,omitempty"`
ProviderID string `json:"providerID,omitempty"`
Selector string `json:"selector,omitempty"`
Interval uint32 `json:"interval,omitempty"`
}
const svcName = "flagd"
// Config is the configuration structure derived from startup arguments.
type Config struct {
@ -67,18 +29,10 @@ type Config struct {
ServicePort uint16
ServiceSocketPath string
SyncProviders []SourceConfig
SyncProviders []sync.SourceConfig
CORS []string
}
func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}
// FromConfig builds a runtime from startup configurations
// nolint: funlen
func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, error) {
@ -176,155 +130,12 @@ func setupJSONEvaluator(logger *logger.Logger, s *store.Flags) *eval.JSONEvaluat
}
// syncProvidersFromConfig is a helper to build ISync implementations from SourceConfig
func syncProvidersFromConfig(logger *logger.Logger, sources []SourceConfig) ([]sync.ISync, error) {
syncImpls := []sync.ISync{}
for _, syncProvider := range sources {
switch syncProvider.Provider {
case syncProviderFile:
syncImpls = append(syncImpls, NewFile(syncProvider, logger))
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI))
case syncProviderKubernetes:
k, err := NewK8s(syncProvider.URI, logger)
if err != nil {
return nil, err
}
syncImpls = append(syncImpls, k)
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI))
case syncProviderHTTP:
syncImpls = append(syncImpls, NewHTTP(syncProvider, logger))
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI))
case syncProviderGrpc:
syncImpls = append(syncImpls, NewGRPC(syncProvider, logger))
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", syncProvider.URI))
default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
syncProvider.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
}
}
return syncImpls, nil
}
func NewGRPC(config SourceConfig, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
CredentialBuilder: &credentials.CredentialBuilder{},
CertPath: config.CertPath,
ProviderID: config.ProviderID,
Secure: config.TLS,
Selector: config.Selector,
}
}
func NewHTTP(config SourceConfig, logger *logger.Logger) *httpSync.Sync {
// Default to 5 seconds
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}
return &httpSync.Sync{
URI: config.URI,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
BearerToken: config.BearerToken,
Interval: interval,
Cron: cron.New(),
}
}
func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
reader, dynamic, err := kubernetes.GetClients()
func syncProvidersFromConfig(logger *logger.Logger, sources []sync.SourceConfig) ([]sync.ISync, error) {
builder := syncbuilder.NewSyncBuilder()
syncs, err := builder.SyncsFromConfig(sources, logger)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
return nil, fmt.Errorf("could not create sync sources from config: %w", err)
}
return kubernetes.NewK8sSync(
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
regCrd.ReplaceAllString(uri, ""),
reader,
dynamic,
), nil
}
func NewFile(config SourceConfig, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
Mux: &msync.RWMutex{},
}
}
// ParseSources parse a json formatted SourceConfig array string and performs validations on the content
func ParseSources(sourcesFlag string) ([]SourceConfig, error) {
syncProvidersParsed := []SourceConfig{}
if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil {
return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err)
}
for _, sp := range syncProvidersParsed {
if sp.URI == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
}
if sp.Provider == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
}
}
return syncProvidersParsed, nil
}
// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to
// derive SourceConfig
func ParseSyncProviderURIs(uris []string) ([]SourceConfig, error) {
syncProvidersParsed := []SourceConfig{}
for _, uri := range uris {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regFile.ReplaceAllString(uri, ""),
Provider: syncProviderFile,
})
case regCrd.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regCrd.ReplaceAllString(uri, ""),
Provider: syncProviderKubernetes,
})
case regURL.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: uri,
Provider: syncProviderHTTP,
})
case regGRPC.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regGRPC.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
})
case regGRPCSecure.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regGRPCSecure.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
TLS: true,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri)
}
}
return syncProvidersParsed, nil
return syncs, nil
}

View File

@ -1,7 +1,6 @@
package runtime
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
@ -9,304 +8,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestParseSource(t *testing.T) {
test := map[string]struct {
in string
expectErr bool
out []SourceConfig
}{
"simple": {
in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]",
expectErr: false,
out: []SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
},
},
"multiple-syncs": {
in: `[
{"uri":"config/samples/example_flags.json","provider":"file"},
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
{"uri":"host:port","provider":"grpc"},
{"uri":"default/my-crd","provider":"kubernetes"}
]`,
expectErr: false,
out: []SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
{
URI: "http://test.com",
Provider: syncProviderHTTP,
BearerToken: ":)",
},
{
URI: "host:port",
Provider: syncProviderGrpc,
},
{
URI: "default/my-crd",
Provider: syncProviderKubernetes,
},
},
},
"multiple-syncs-with-options": {
in: `[{"uri":"config/samples/example_flags.json","provider":"file"},
{"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"},
{"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"},
{"uri":"http://site.com","provider":"http","interval":77 },
{"uri":"default/my-flag-config","provider":"kubernetes"},
{"uri":"grpc-source:8080","provider":"grpc"},
{"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]
`,
expectErr: false,
out: []SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
{
URI: "http://my-flag-source.json",
Provider: syncProviderHTTP,
BearerToken: "bearer-dji34ld2l",
},
{
URI: "https://secure-remote",
Provider: syncProviderHTTP,
BearerToken: "bearer-dji34ld2l",
},
{
URI: "http://site.com",
Provider: syncProviderHTTP,
Interval: 77,
},
{
URI: "default/my-flag-config",
Provider: syncProviderKubernetes,
},
{
URI: "grpc-source:8080",
Provider: syncProviderGrpc,
},
{
URI: "my-flag-source:8080",
Provider: syncProviderGrpc,
TLS: true,
CertPath: "/certs/ca.cert",
ProviderID: "flagd-weatherapp-sidecar",
Selector: "source=database,app=weatherapp",
},
},
},
"empty": {
in: `[]`,
expectErr: false,
out: []SourceConfig{},
},
"parse-failure": {
in: ``,
expectErr: true,
out: []SourceConfig{},
},
}
for name, tt := range test {
t.Run(name, func(t *testing.T) {
out, err := ParseSources(tt.in)
if tt.expectErr {
if err == nil {
t.Error("expected error, got none")
}
} else if err != nil {
t.Errorf("did not expect error: %s", err.Error())
}
if !reflect.DeepEqual(out, tt.out) {
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
}
})
}
}
func TestParseSyncProviderURIs(t *testing.T) {
test := map[string]struct {
in []string
expectErr bool
out []SourceConfig
}{
"simple": {
in: []string{
"file:my-file.json",
},
expectErr: false,
out: []SourceConfig{
{
URI: "my-file.json",
Provider: "file",
},
},
},
"multiple-uris": {
in: []string{
"file:my-file.json",
"https://test.com",
"grpc://host:port",
"grpcs://secure-grpc",
"core.openfeature.dev/default/my-crd",
},
expectErr: false,
out: []SourceConfig{
{
URI: "my-file.json",
Provider: "file",
},
{
URI: "https://test.com",
Provider: "http",
},
{
URI: "host:port",
Provider: "grpc",
TLS: false,
},
{
URI: "secure-grpc",
Provider: "grpc",
TLS: true,
},
{
URI: "default/my-crd",
Provider: "kubernetes",
},
},
},
"empty": {
in: []string{},
expectErr: false,
out: []SourceConfig{},
},
"parse-failure": {
in: []string{"care.openfeature.dev/will/fail"},
expectErr: true,
out: []SourceConfig{},
},
}
for name, tt := range test {
t.Run(name, func(t *testing.T) {
out, err := ParseSyncProviderURIs(tt.in)
if tt.expectErr {
if err == nil {
t.Error("expected error, got none")
}
} else if err != nil {
t.Errorf("did not expect error: %s", err.Error())
}
if !reflect.DeepEqual(out, tt.out) {
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
}
})
}
}
// Note - K8s configuration require K8s client, hence do not use K8s sync provider in this test
func Test_syncProvidersFromConfig(t *testing.T) {
lg := logger.NewLogger(nil, false)
type args struct {
logger *logger.Logger
sources []SourceConfig
}
tests := []struct {
name string
args args
wantSyncs int // simply check the count of ISync providers yield from configurations
wantErr bool
}{
{
name: "Empty",
args: args{
logger: lg,
sources: []SourceConfig{},
},
wantSyncs: 0,
wantErr: false,
},
{
name: "Error",
args: args{
logger: lg,
sources: []SourceConfig{
{
URI: "fake",
Provider: "disk",
},
},
},
wantSyncs: 0,
wantErr: true,
},
{
name: "single",
args: args{
logger: lg,
sources: []SourceConfig{
{
URI: "grpc://host:port",
Provider: syncProviderGrpc,
ProviderID: "myapp",
CertPath: "/tmp/ca.cert",
Selector: "source=database",
},
},
},
wantSyncs: 1,
wantErr: false,
},
{
name: "combined",
args: args{
logger: lg,
sources: []SourceConfig{
{
URI: "grpc://host:port",
Provider: syncProviderGrpc,
ProviderID: "myapp",
CertPath: "/tmp/ca.cert",
Selector: "source=database",
},
{
URI: "https://host:port",
Provider: syncProviderHTTP,
BearerToken: "token",
},
{
URI: "/tmp/flags.json",
Provider: syncProviderFile,
},
},
},
wantSyncs: 3,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
syncs, err := syncProvidersFromConfig(tt.args.logger, tt.args.sources)
if (err != nil) != tt.wantErr {
t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantSyncs != len(syncs) {
t.Errorf("syncProvidersFromConfig() yielded = %v, but expected %v", len(syncs), tt.wantSyncs)
}
})
}
}
func Test_setupJSONEvaluator(t *testing.T) {
lg := logger.NewLogger(nil, false)

View File

@ -7,13 +7,13 @@ import (
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/subscriptions"
"github.com/open-feature/flagd/core/pkg/sync"
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
)
type handler struct {
rpc.UnimplementedFlagSyncServiceServer
syncStore syncStore.ISyncStore
syncStore subscriptions.Manager
logger *logger.Logger
}

View File

@ -12,7 +12,7 @@ import (
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
"github.com/open-feature/flagd/core/pkg/logger"
iservice "github.com/open-feature/flagd/core/pkg/service"
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
"github.com/open-feature/flagd/core/pkg/subscriptions"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
@ -32,7 +32,7 @@ type Server struct {
metricServerReady bool
}
func NewServer(logger *logger.Logger, store syncStore.ISyncStore) *Server {
func NewServer(logger *logger.Logger, store subscriptions.Manager) *Server {
return &Server{
handler: &handler{
logger: logger,

View File

@ -1,44 +1,45 @@
//nolint:contextcheck
package store
package subscriptions
import (
"context"
"errors"
"fmt"
"regexp"
"sync"
"time"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/runtime"
isync "github.com/open-feature/flagd/core/pkg/sync"
"go.uber.org/zap"
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
)
var (
regCrd *regexp.Regexp
regFile *regexp.Regexp
)
// Manager defines the interface for the subscription management
type Manager interface {
FetchAllFlags(
ctx context.Context,
key interface{},
target string,
) (isync.DataSync, error)
RegisterSubscription(
ctx context.Context,
target string,
key interface{},
dataSync chan isync.DataSync,
errChan chan error,
)
func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regFile = regexp.MustCompile("^file:")
// metrics hooks
GetActiveSubscriptionsInt64() int64
}
type SyncStore struct {
// Coordinator coordinates subscriptions by aggregating subscribers for the same target, and keeping them up to date
// for any updates that have happened for those targets.
type Coordinator struct {
ctx context.Context
syncHandlers map[string]*syncHandler
multiplexers map[string]*multiplexer
logger *logger.Logger
mu *sync.RWMutex
syncBuilder SyncBuilderInterface
}
type syncHandler struct {
subs map[interface{}]storedChannels
dataSync chan isync.DataSync
cancelFunc context.CancelFunc
syncRef isync.ISync
mu *sync.RWMutex
syncBuilder syncbuilder.ISyncBuilder
}
type storedChannels struct {
@ -46,27 +47,27 @@ type storedChannels struct {
dataSync chan isync.DataSync
}
// NewSyncStore returns a new sync store
func NewSyncStore(ctx context.Context, logger *logger.Logger) *SyncStore {
ss := SyncStore{
// NewManager returns a new subscription manager
func NewManager(ctx context.Context, logger *logger.Logger) *Coordinator {
mgr := Coordinator{
ctx: ctx,
syncHandlers: map[string]*syncHandler{},
multiplexers: map[string]*multiplexer{},
logger: logger,
mu: &sync.RWMutex{},
syncBuilder: &SyncBuilder{},
syncBuilder: &syncbuilder.SyncBuilder{},
}
go ss.cleanup()
return &ss
go mgr.cleanup()
return &mgr
}
// FetchAllFlags returns a DataSync containing the full set of flag configurations from the SyncStore.
// FetchAllFlags returns a DataSync containing the full set of flag configurations from the Coordinator.
// This will either occur via triggering a resync, or through setting up a new subscription to the resource
func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) {
func (s *Coordinator) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) {
s.logger.Debug(fmt.Sprintf("fetching all flags for target %s", target))
dataSyncChan := make(chan isync.DataSync, 1)
errChan := make(chan error, 1)
s.mu.RLock()
syncHandler, ok := s.syncHandlers[target]
syncHandler, ok := s.multiplexers[target]
s.mu.RUnlock()
if !ok {
s.logger.Debug(fmt.Sprintf("sync handler does not exist for target %s, registering a new subscription", target))
@ -95,7 +96,7 @@ func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target s
// RegisterSubscription starts a new subscription to the target resource.
// Once the subscription is set an ALL sync event will be received via the DataSync chan.
func (s *SyncStore) RegisterSubscription(
func (s *Coordinator) RegisterSubscription(
ctx context.Context,
target string,
key interface{},
@ -105,16 +106,16 @@ func (s *SyncStore) RegisterSubscription(
s.mu.Lock()
defer s.mu.Unlock()
// is there a currently active subscription for this target?
sh, ok := s.syncHandlers[target]
sh, ok := s.multiplexers[target]
if !ok {
// we need to start a sync for this
s.logger.Debug(
fmt.Sprintf(
"sync handler does not exist for target %s, registering syncHandler with sub %p",
"sync handler does not exist for target %s, registering multiplexer with sub %p",
target,
key,
))
s.syncHandlers[target] = &syncHandler{
s.multiplexers[target] = &multiplexer{
dataSync: make(chan isync.DataSync),
subs: map[interface{}]storedChannels{
key: {
@ -137,7 +138,7 @@ func (s *SyncStore) RegisterSubscription(
go func() {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.syncHandlers[target]; ok {
if _, ok := s.multiplexers[target]; ok {
s.logger.Debug(fmt.Sprintf("sync handler exists for target %s, triggering a resync", target))
if err := sh.syncRef.ReSync(ctx, dataSync); err != nil {
errChan <- err
@ -151,28 +152,28 @@ func (s *SyncStore) RegisterSubscription(
<-ctx.Done()
s.mu.Lock()
defer s.mu.Unlock()
if s.syncHandlers[target] != nil && s.syncHandlers[target].subs != nil {
if s.multiplexers[target] != nil && s.multiplexers[target].subs != nil {
s.logger.Debug(fmt.Sprintf("removing sync subscription due to context cancellation %p", key))
delete(s.syncHandlers[target].subs, key)
delete(s.multiplexers[target].subs, key)
}
}()
}
func (s *SyncStore) watchResource(target string) {
func (s *Coordinator) watchResource(target string) {
s.logger.Debug(fmt.Sprintf("watching resource %s", target))
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
sh, ok := s.syncHandlers[target]
sh, ok := s.multiplexers[target]
if !ok {
s.logger.Error(fmt.Sprintf("no sync handler exists for target %s", target))
return
}
// this cancel is accessed by the cleanup method shutdown the listener + delete the syncHandler
// this cancel is accessed by the cleanup method to shut down the listener and delete the multiplexer
sh.cancelFunc = cancel
go func() {
<-ctx.Done()
s.mu.Lock()
delete(s.syncHandlers, target)
delete(s.multiplexers, target)
s.mu.Unlock()
}()
// broadcast any data passed through the core channel to all subscribing channels
@ -182,73 +183,47 @@ func (s *SyncStore) watchResource(target string) {
case <-ctx.Done():
return
case d := <-sh.dataSync:
sh.writeData(s.logger, d)
sh.broadcastData(s.logger, d)
}
}
}()
// setup sync, if this fails an error is broadcasted, and the defer results in cleanup
sync, err := s.syncBuilder.SyncFromURI(target, s.logger)
syncSource, err := s.syncBuilder.SyncFromURI(target, s.logger)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to build sync from URI for target %s: %s", target, err.Error()))
sh.writeError(s.logger, err)
sh.broadcastError(s.logger, err)
return
}
// init sync, if this fails an error is broadcasted, and the defer results in cleanup
err = sync.Init(ctx)
err = syncSource.Init(ctx)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to initiate sync for target %s: %s", target, err.Error()))
sh.writeError(s.logger, err)
sh.broadcastError(s.logger, err)
return
}
// sync ref is used to trigger a resync on a single channel when a new subscription is started
// syncSource ref is used to trigger a resync on a single channel when a new subscription is started
// but the associated SyncHandler already exists, i.e. this function is not run
sh.syncRef = sync
err = sync.Sync(ctx, sh.dataSync)
sh.syncRef = syncSource
err = syncSource.Sync(ctx, sh.dataSync)
if err != nil {
s.logger.Error(fmt.Sprintf("error from sync for target %s: %s", target, err.Error()))
sh.writeError(s.logger, err)
sh.broadcastError(s.logger, err)
}
}
func (h *syncHandler) writeError(logger *logger.Logger, err error) {
h.mu.RLock()
defer h.mu.RUnlock()
for k, ec := range h.subs {
select {
case ec.errChan <- err:
continue
default:
logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k))
}
}
}
func (h *syncHandler) writeData(logger *logger.Logger, data isync.DataSync) {
h.mu.RLock()
defer h.mu.RUnlock()
for k, ds := range h.subs {
select {
case ds.dataSync <- data:
continue
default:
logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k))
}
}
}
func (s *SyncStore) cleanup() {
func (s *Coordinator) cleanup() {
for {
select {
case <-s.ctx.Done():
return
case <-time.After(5 * time.Second):
s.mu.Lock()
for k, v := range s.syncHandlers {
// delete any syncHandlers with 0 active subscriptions through cancelling its context
s.logger.Debug(fmt.Sprintf("syncHandler for target %s has %d subscriptions", k, len(v.subs)))
for k, v := range s.multiplexers {
// delete any multiplexers with 0 active subscriptions through cancelling its context
s.logger.Debug(fmt.Sprintf("multiplexer for target %s has %d subscriptions", k, len(v.subs)))
if len(v.subs) == 0 {
s.logger.Debug(fmt.Sprintf("shutting down syncHandler %s", k))
s.syncHandlers[k].cancelFunc()
s.logger.Debug(fmt.Sprintf("shutting down multiplexer %s", k))
s.multiplexers[k].cancelFunc()
}
}
s.mu.Unlock()
@ -256,45 +231,14 @@ func (s *SyncStore) cleanup() {
}
}
func (s *SyncStore) GetActiveSubscriptionsInt64() int64 {
func (s *Coordinator) GetActiveSubscriptionsInt64() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
syncs := 0
for _, v := range s.syncHandlers {
for _, v := range s.multiplexers {
syncs += len(v.subs)
}
return int64(syncs)
}
type SyncBuilderInterface interface {
SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error)
}
type SyncBuilder struct{}
// SyncFromURI builds an ISync interface from the input uri string
func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) {
switch uriB := []byte(uri); {
// filepath may be used for debugging, not recommended in deployment
case regFile.Match(uriB):
return runtime.NewFile(runtime.SourceConfig{
URI: regFile.ReplaceAllString(uri, ""),
}, logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
zap.String("target", "target"),
)), nil
case regCrd.Match(uriB):
s, err := runtime.NewK8s(uri, logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
))
if err != nil {
return nil, fmt.Errorf("error creating k8s sync: %w", err)
}
return s, nil
}
return nil, fmt.Errorf("unrecognized URI: %s", uri)
}

View File

@ -1,4 +1,4 @@
package store
package subscriptions
import (
"context"
@ -59,17 +59,21 @@ type syncBuilderMock struct {
initError error
}
func (s *syncBuilderMock) SyncsFromConfig(_ []isync.SourceConfig, _ *logger.Logger) ([]isync.ISync, error) {
return nil, nil
}
func (s *syncBuilderMock) SyncFromURI(_ string, _ *logger.Logger) (isync.ISync, error) {
return s.mock, s.initError
}
func newSyncHandler() (*syncHandler, string) {
func newSyncHandler() (*multiplexer, string) {
coreDataSyncChan := make(chan isync.DataSync, 1)
dataSyncChan := make(chan isync.DataSync, 1)
errChan := make(chan error, 1)
key := "key"
return &syncHandler{
return &multiplexer{
dataSync: coreDataSyncChan,
subs: map[interface{}]storedChannels{
key: {
@ -83,7 +87,7 @@ func newSyncHandler() (*syncHandler, string) {
func Test_watchResource(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
syncStore.syncBuilder = &syncBuilderMock{
mock: syncMock,
@ -92,7 +96,7 @@ func Test_watchResource(t *testing.T) {
target := "test-target"
syncHandler, key := newSyncHandler()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
go syncStore.watchResource(target)
@ -128,17 +132,17 @@ func Test_watchResource(t *testing.T) {
// no context cancellation should have occurred, and there should still be registered sync sub
syncStore.mu.Lock()
if len(syncHandler.subs) != 1 {
t.Error("incorrect number of subs in syncHandler", syncHandler.subs)
t.Error("incorrect number of subs in multiplexer", syncHandler.subs)
}
syncStore.mu.Unlock()
// cancellation of context will result in the syncHandler being deleted
// cancellation of context will result in the multiplexer being deleted
cancel()
// allow for the goroutine to catch the lock first
time.Sleep(1 * time.Second)
syncStore.mu.Lock()
if syncStore.syncHandlers[target] != nil {
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
if syncStore.multiplexers[target] != nil {
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
}
syncStore.mu.Unlock()
}
@ -146,7 +150,7 @@ func Test_watchResource(t *testing.T) {
func Test_watchResource_initFail(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
// return an error on startup
@ -158,7 +162,7 @@ func Test_watchResource_initFail(t *testing.T) {
target := "test-target"
syncHandler, key := newSyncHandler()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
go syncStore.watchResource(target)
@ -175,8 +179,8 @@ func Test_watchResource_initFail(t *testing.T) {
// this should then close the internal context and the watcher should be removed
time.Sleep(1 * time.Second)
syncStore.mu.Lock()
if syncStore.syncHandlers[target] != nil {
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
if syncStore.multiplexers[target] != nil {
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
}
syncStore.mu.Unlock()
}
@ -184,7 +188,7 @@ func Test_watchResource_initFail(t *testing.T) {
func Test_watchResource_SyncFromURIFail(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
// return an error on startup
@ -197,7 +201,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
target := "test-target"
syncHandler, key := newSyncHandler()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
go syncStore.watchResource(target)
@ -214,8 +218,8 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
// this should then close the internal context and the watcher should be removed
time.Sleep(1 * time.Second)
syncStore.mu.Lock()
if syncStore.syncHandlers[target] != nil {
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
if syncStore.multiplexers[target] != nil {
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
}
syncStore.mu.Unlock()
}
@ -223,7 +227,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
func Test_watchResource_SyncErrorOnClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
// return an error on startup
@ -235,7 +239,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
target := "test-target"
syncHandler, key := newSyncHandler()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
go syncStore.watchResource(target)
cancel()
@ -252,8 +256,8 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
// this should then close the internal context and the watcher should be removed
time.Sleep(1 * time.Second)
syncStore.mu.Lock()
if syncStore.syncHandlers[target] != nil {
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
if syncStore.multiplexers[target] != nil {
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
}
syncStore.mu.Unlock()
}
@ -261,7 +265,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
// return an error on startup
@ -279,7 +283,7 @@ func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) {
func Test_watchResource_Cleanup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
// return an error on startup
@ -297,7 +301,7 @@ func Test_watchResource_Cleanup(t *testing.T) {
doneChan <- struct{}{}
}
syncStore.mu.Lock()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
syncStore.mu.Unlock()
go func() {
syncStore.cleanup()
@ -307,7 +311,7 @@ func Test_watchResource_Cleanup(t *testing.T) {
case <-doneChan:
return
case <-time.After(10 * time.Second):
t.Error("syncHandlers not being cleaned up, timed out after 10 seconds")
t.Error("multiplexers not being cleaned up, timed out after 10 seconds")
}
}
@ -351,7 +355,7 @@ func Test_FetchAllFlags(t *testing.T) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
syncMock.resyncData = tt.mockData
syncMock.resyncError = tt.mockError
@ -365,7 +369,7 @@ func Test_FetchAllFlags(t *testing.T) {
syncHandler.syncRef = syncMock
}
if tt.setHandler {
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
}
data, err := syncStore.FetchAllFlags(ctx, key, target)
@ -406,7 +410,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
syncMock.resyncData = tt.data
@ -420,7 +424,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
syncHandler, _ := newSyncHandler()
syncHandler.syncRef = syncMock
key := struct{}{}
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
dataChan := make(chan isync.DataSync, 1)
errChan := make(chan error, 1)
@ -445,7 +449,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
func Test_syncMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
syncMock := newMockSync()
syncStore.syncBuilder = &syncBuilderMock{
mock: syncMock,
@ -459,7 +463,7 @@ func Test_syncMetrics(t *testing.T) {
target := "test-target"
syncHandler, _ := newSyncHandler()
syncStore.syncHandlers[target] = syncHandler
syncStore.multiplexers[target] = syncHandler
subs = syncStore.GetActiveSubscriptionsInt64()
if subs != 1 {

View File

@ -0,0 +1,45 @@
package subscriptions
import (
"context"
"fmt"
"sync"
"github.com/open-feature/flagd/core/pkg/logger"
sourceSync "github.com/open-feature/flagd/core/pkg/sync"
)
// multiplexer distributes updates for a target to all of its subscribers
type multiplexer struct {
subs map[interface{}]storedChannels
dataSync chan sourceSync.DataSync
cancelFunc context.CancelFunc
syncRef sourceSync.ISync
mu *sync.RWMutex
}
func (h *multiplexer) broadcastError(logger *logger.Logger, err error) {
h.mu.RLock()
defer h.mu.RUnlock()
for k, ec := range h.subs {
select {
case ec.errChan <- err:
continue
default:
logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k))
}
}
}
func (h *multiplexer) broadcastData(logger *logger.Logger, data sourceSync.DataSync) {
h.mu.RLock()
defer h.mu.RUnlock()
for k, ds := range h.subs {
select {
case ds.dataSync <- data:
continue
default:
logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k))
}
}
}

View File

@ -1,26 +0,0 @@
package store
import (
"context"
isync "github.com/open-feature/flagd/core/pkg/sync"
)
// ISyncStore defines the interface for the sync store
type ISyncStore interface {
FetchAllFlags(
ctx context.Context,
key interface{},
target string,
) (isync.DataSync, error)
RegisterSubscription(
ctx context.Context,
target string,
key interface{},
dataSync chan isync.DataSync,
errChan chan error,
)
// metrics hooks
GetActiveSubscriptionsInt64() int64
}

View File

@ -0,0 +1,107 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: pkg/sync/builder/syncbuilder.go
// Package middlewaremocksyncbuildermock is a generated GoMock package.
package middlewaremocksyncbuildermock
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
logger "github.com/open-feature/flagd/core/pkg/logger"
sync "github.com/open-feature/flagd/core/pkg/sync"
dynamic "k8s.io/client-go/dynamic"
client "sigs.k8s.io/controller-runtime/pkg/client"
)
// MockISyncBuilder is a mock of ISyncBuilder interface.
type MockISyncBuilder struct {
ctrl *gomock.Controller
recorder *MockISyncBuilderMockRecorder
}
// MockISyncBuilderMockRecorder is the mock recorder for MockISyncBuilder.
type MockISyncBuilderMockRecorder struct {
mock *MockISyncBuilder
}
// NewMockISyncBuilder creates a new mock instance.
func NewMockISyncBuilder(ctrl *gomock.Controller) *MockISyncBuilder {
mock := &MockISyncBuilder{ctrl: ctrl}
mock.recorder = &MockISyncBuilderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockISyncBuilder) EXPECT() *MockISyncBuilderMockRecorder {
return m.recorder
}
// SyncFromURI mocks base method.
func (m *MockISyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncFromURI", uri, logger)
ret0, _ := ret[0].(sync.ISync)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SyncFromURI indicates an expected call of SyncFromURI.
func (mr *MockISyncBuilderMockRecorder) SyncFromURI(uri, logger interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncFromURI", reflect.TypeOf((*MockISyncBuilder)(nil).SyncFromURI), uri, logger)
}
// SyncsFromConfig mocks base method.
func (m *MockISyncBuilder) SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncsFromConfig", sourceConfig, logger)
ret0, _ := ret[0].([]sync.ISync)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SyncsFromConfig indicates an expected call of SyncsFromConfig.
func (mr *MockISyncBuilderMockRecorder) SyncsFromConfig(sourceConfig, logger interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncsFromConfig", reflect.TypeOf((*MockISyncBuilder)(nil).SyncsFromConfig), sourceConfig, logger)
}
// MockIK8sClientBuilder is a mock of IK8sClientBuilder interface.
type MockIK8sClientBuilder struct {
ctrl *gomock.Controller
recorder *MockIK8sClientBuilderMockRecorder
}
// MockIK8sClientBuilderMockRecorder is the mock recorder for MockIK8sClientBuilder.
type MockIK8sClientBuilderMockRecorder struct {
mock *MockIK8sClientBuilder
}
// NewMockIK8sClientBuilder creates a new mock instance.
func NewMockIK8sClientBuilder(ctrl *gomock.Controller) *MockIK8sClientBuilder {
mock := &MockIK8sClientBuilder{ctrl: ctrl}
mock.recorder = &MockIK8sClientBuilderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockIK8sClientBuilder) EXPECT() *MockIK8sClientBuilderMockRecorder {
return m.recorder
}
// GetK8sClients mocks base method.
func (m *MockIK8sClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetK8sClients")
ret0, _ := ret[0].(client.Reader)
ret1, _ := ret[1].(dynamic.Interface)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetK8sClients indicates an expected call of GetK8sClients.
func (mr *MockIK8sClientBuilderMockRecorder) GetK8sClients() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetK8sClients", reflect.TypeOf((*MockIK8sClientBuilder)(nil).GetK8sClients))
}

View File

@ -0,0 +1,218 @@
package builder
import (
"fmt"
"net/http"
"os"
"regexp"
msync "sync"
"time"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/sync/file"
"github.com/open-feature/flagd/core/pkg/sync/grpc"
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
"github.com/robfig/cron"
"go.uber.org/zap"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
syncProviderFile = "file"
syncProviderGrpc = "grpc"
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
)
var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
)
func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}
type ISyncBuilder interface {
SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error)
SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error)
}
type SyncBuilder struct {
k8sClientBuilder IK8sClientBuilder
}
func NewSyncBuilder() *SyncBuilder {
return &SyncBuilder{
k8sClientBuilder: &KubernetesClientBuilder{},
}
}
func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) {
switch uriB := []byte(uri); {
// filepath may be used for debugging, not recommended in deployment
case regFile.Match(uriB):
return sb.newFile(uri, logger), nil
case regCrd.Match(uriB):
return sb.newK8s(uri, logger)
}
return nil, fmt.Errorf("unrecognized URI: %s", uri)
}
func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) {
syncImpls := make([]sync.ISync, len(sourceConfigs))
for i, syncProvider := range sourceConfigs {
syncImpl, err := sb.syncFromConfig(syncProvider, logger)
if err != nil {
return nil, fmt.Errorf("could not create sync provider: %w", err)
}
syncImpls[i] = syncImpl
}
return syncImpls, nil
}
func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) {
switch sourceConfig.Provider {
case syncProviderFile:
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI))
return sb.newFile(sourceConfig.URI, logger), nil
case syncProviderKubernetes:
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI))
return sb.newK8s(sourceConfig.URI, logger)
case syncProviderHTTP:
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", sourceConfig.URI))
return sb.newHTTP(sourceConfig, logger), nil
case syncProviderGrpc:
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", sourceConfig.URI))
return sb.newGRPC(sourceConfig, logger), nil
default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
}
}
func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
Mux: &msync.RWMutex{},
}
}
func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
reader, dynamicClient, err := sb.k8sClientBuilder.GetK8sClients()
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
}
return kubernetes.NewK8sSync(
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
regCrd.ReplaceAllString(uri, ""),
reader,
dynamicClient,
), nil
}
func (sb *SyncBuilder) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync {
// Default to 5 seconds
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}
return &httpSync.Sync{
URI: config.URI,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
BearerToken: config.BearerToken,
Interval: interval,
Cron: cron.New(),
}
}
func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
CredentialBuilder: &credentials.CredentialBuilder{},
CertPath: config.CertPath,
ProviderID: config.ProviderID,
Secure: config.TLS,
Selector: config.Selector,
}
}
type IK8sClientBuilder interface {
GetK8sClients() (client.Reader, dynamic.Interface, error)
}
type KubernetesClientBuilder struct{}
func (kcb KubernetesClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) {
clusterConfig, err := k8sClusterConfig()
if err != nil {
return nil, nil, err
}
readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, nil, fmt.Errorf("unable to create readClient: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err)
}
return readClient, dynamicClient, nil
}
// k8sClusterConfig builds the K8s connection config based on the available configuration
func k8sClusterConfig() (*rest.Config, error) {
cfg := os.Getenv("KUBECONFIG")
var clusterConfig *rest.Config
var err error
if cfg != "" {
clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
return nil, fmt.Errorf("error building cluster config from flags: %w", err)
}
} else {
clusterConfig, err = rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("error fetching cluster config: %w", err)
}
}
return clusterConfig, nil
}

View File

@ -0,0 +1,240 @@
package builder
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
buildermock "github.com/open-feature/flagd/core/pkg/sync/builder/mock"
"github.com/open-feature/flagd/core/pkg/sync/file"
"github.com/open-feature/flagd/core/pkg/sync/grpc"
"github.com/open-feature/flagd/core/pkg/sync/http"
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
"github.com/stretchr/testify/require"
)
func TestSyncBuilder_SyncFromURI(t *testing.T) {
type args struct {
uri string
logger *logger.Logger
}
tests := []struct {
name string
args args
injectFunc func(builder *SyncBuilder)
want sync.ISync
wantErr bool
}{
{
name: "kubernetes sync",
args: args{
uri: "core.openfeature.dev/ff-config",
logger: logger.NewLogger(nil, false),
},
injectFunc: func(builder *SyncBuilder) {
ctrl := gomock.NewController(t)
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil)
builder.k8sClientBuilder = mockClientBuilder
},
want: &kubernetes.Sync{},
wantErr: false,
},
{
name: "kubernetes sync - error when retrieving config",
args: args{
uri: "core.openfeature.dev/ff-config",
logger: logger.NewLogger(nil, false),
},
injectFunc: func(builder *SyncBuilder) {
ctrl := gomock.NewController(t)
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, errors.New("oops"))
builder.k8sClientBuilder = mockClientBuilder
},
want: nil,
wantErr: true,
},
{
name: "file sync",
args: args{
uri: "file:my-file",
logger: logger.NewLogger(nil, false),
},
want: &file.Sync{},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSyncBuilder()
if tt.injectFunc != nil {
tt.injectFunc(sb)
}
got, err := sb.SyncFromURI(tt.args.uri, tt.args.logger)
if tt.wantErr {
require.NotNil(t, err)
require.Nil(t, got)
} else {
require.Nil(t, err)
require.IsType(t, tt.want, got)
}
})
}
}
func Test_k8sClusterConfig(t *testing.T) {
t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) {
tt.Setenv("KUBECONFIG", "")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
t.Run("KUBECONFIG file not existing", func(tt *testing.T) {
tt.Setenv("KUBECONFIG", "value")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
t.Run("Default REST Config and missing svc account", func(tt *testing.T) {
tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1")
tt.Setenv("KUBERNETES_SERVICE_PORT", "8080")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
}
func Test_SyncsFromFromConfig(t *testing.T) {
lg := logger.NewLogger(nil, false)
type args struct {
logger *logger.Logger
sources []sync.SourceConfig
}
tests := []struct {
name string
args args
injectFunc func(builder *SyncBuilder)
wantSyncs []sync.ISync
wantErr bool
}{
{
name: "Empty",
args: args{
logger: lg,
sources: []sync.SourceConfig{},
},
wantSyncs: nil,
wantErr: false,
},
{
name: "Error",
args: args{
logger: lg,
sources: []sync.SourceConfig{
{
URI: "fake",
Provider: "disk",
},
},
},
wantSyncs: nil,
wantErr: true,
},
{
name: "single",
args: args{
logger: lg,
sources: []sync.SourceConfig{
{
URI: "grpc://host:port",
Provider: syncProviderGrpc,
ProviderID: "myapp",
CertPath: "/tmp/ca.cert",
Selector: "source=database",
},
},
},
wantSyncs: []sync.ISync{
&grpc.Sync{},
},
wantErr: false,
},
{
name: "combined",
injectFunc: func(builder *SyncBuilder) {
ctrl := gomock.NewController(t)
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil)
builder.k8sClientBuilder = mockClientBuilder
},
args: args{
logger: lg,
sources: []sync.SourceConfig{
{
URI: "grpc://host:port",
Provider: syncProviderGrpc,
ProviderID: "myapp",
CertPath: "/tmp/ca.cert",
Selector: "source=database",
},
{
URI: "https://host:port",
Provider: syncProviderHTTP,
BearerToken: "token",
},
{
URI: "/tmp/flags.json",
Provider: syncProviderFile,
},
{
URI: "my-namespace/my-flags",
Provider: syncProviderKubernetes,
},
},
},
wantSyncs: []sync.ISync{
&grpc.Sync{},
&http.Sync{},
&file.Sync{},
&kubernetes.Sync{},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSyncBuilder()
if tt.injectFunc != nil {
tt.injectFunc(sb)
}
syncs, err := sb.SyncsFromConfig(tt.args.sources, tt.args.logger)
if (err != nil) != tt.wantErr {
t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
require.Len(t, syncs, len(tt.wantSyncs))
// check if we got the expected sync types
for index, wantType := range tt.wantSyncs {
require.IsType(t, wantType, syncs[index])
}
})
}
}

View File

@ -0,0 +1,68 @@
package builder
import (
"encoding/json"
"errors"
"fmt"
"github.com/open-feature/flagd/core/pkg/sync"
)
// ParseSources parses a json formatted SourceConfig array string and performs validations on the content
func ParseSources(sourcesFlag string) ([]sync.SourceConfig, error) {
syncProvidersParsed := []sync.SourceConfig{}
if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil {
return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err)
}
for _, sp := range syncProvidersParsed {
if sp.URI == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
}
if sp.Provider == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
}
}
return syncProvidersParsed, nil
}
// ParseSyncProviderURIs converts uri flag based sync sources to a SourceConfig array. Replaces uri prefixes where necessary to
// derive the SourceConfig
func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) {
syncProvidersParsed := []sync.SourceConfig{}
for _, uri := range uris {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: regFile.ReplaceAllString(uri, ""),
Provider: syncProviderFile,
})
case regCrd.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: regCrd.ReplaceAllString(uri, ""),
Provider: syncProviderKubernetes,
})
case regURL.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderHTTP,
})
case regGRPC.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: regGRPC.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
})
case regGRPCSecure.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: regGRPCSecure.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
TLS: true,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri)
}
}
return syncProvidersParsed, nil
}

View File

@ -0,0 +1,210 @@
package builder
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/sync"
)
func TestParseSource(t *testing.T) {
test := map[string]struct {
in string
expectErr bool
out []sync.SourceConfig
}{
"simple": {
in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]",
expectErr: false,
out: []sync.SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
},
},
"multiple-syncs": {
in: `[
{"uri":"config/samples/example_flags.json","provider":"file"},
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
{"uri":"host:port","provider":"grpc"},
{"uri":"default/my-crd","provider":"kubernetes"}
]`,
expectErr: false,
out: []sync.SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
{
URI: "http://test.com",
Provider: syncProviderHTTP,
BearerToken: ":)",
},
{
URI: "host:port",
Provider: syncProviderGrpc,
},
{
URI: "default/my-crd",
Provider: syncProviderKubernetes,
},
},
},
"multiple-syncs-with-options": {
in: `[{"uri":"config/samples/example_flags.json","provider":"file"},
{"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"},
{"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"},
{"uri":"http://site.com","provider":"http","interval":77 },
{"uri":"default/my-flag-config","provider":"kubernetes"},
{"uri":"grpc-source:8080","provider":"grpc"},
{"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]
`,
expectErr: false,
out: []sync.SourceConfig{
{
URI: "config/samples/example_flags.json",
Provider: syncProviderFile,
},
{
URI: "http://my-flag-source.json",
Provider: syncProviderHTTP,
BearerToken: "bearer-dji34ld2l",
},
{
URI: "https://secure-remote",
Provider: syncProviderHTTP,
BearerToken: "bearer-dji34ld2l",
},
{
URI: "http://site.com",
Provider: syncProviderHTTP,
Interval: 77,
},
{
URI: "default/my-flag-config",
Provider: syncProviderKubernetes,
},
{
URI: "grpc-source:8080",
Provider: syncProviderGrpc,
},
{
URI: "my-flag-source:8080",
Provider: syncProviderGrpc,
TLS: true,
CertPath: "/certs/ca.cert",
ProviderID: "flagd-weatherapp-sidecar",
Selector: "source=database,app=weatherapp",
},
},
},
"empty": {
in: `[]`,
expectErr: false,
out: []sync.SourceConfig{},
},
"parse-failure": {
in: ``,
expectErr: true,
out: []sync.SourceConfig{},
},
}
for name, tt := range test {
t.Run(name, func(t *testing.T) {
out, err := ParseSources(tt.in)
if tt.expectErr {
if err == nil {
t.Error("expected error, got none")
}
} else if err != nil {
t.Errorf("did not expect error: %s", err.Error())
}
if !reflect.DeepEqual(out, tt.out) {
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
}
})
}
}
func TestParseSyncProviderURIs(t *testing.T) {
test := map[string]struct {
in []string
expectErr bool
out []sync.SourceConfig
}{
"simple": {
in: []string{
"file:my-file.json",
},
expectErr: false,
out: []sync.SourceConfig{
{
URI: "my-file.json",
Provider: "file",
},
},
},
"multiple-uris": {
in: []string{
"file:my-file.json",
"https://test.com",
"grpc://host:port",
"grpcs://secure-grpc",
"core.openfeature.dev/default/my-crd",
},
expectErr: false,
out: []sync.SourceConfig{
{
URI: "my-file.json",
Provider: "file",
},
{
URI: "https://test.com",
Provider: "http",
},
{
URI: "host:port",
Provider: "grpc",
TLS: false,
},
{
URI: "secure-grpc",
Provider: "grpc",
TLS: true,
},
{
URI: "default/my-crd",
Provider: "kubernetes",
},
},
},
"empty": {
in: []string{},
expectErr: false,
out: []sync.SourceConfig{},
},
"parse-failure": {
in: []string{"care.openfeature.dev/will/fail"},
expectErr: true,
out: []sync.SourceConfig{},
},
}
for name, tt := range test {
t.Run(name, func(t *testing.T) {
out, err := ParseSyncProviderURIs(tt.in)
if tt.expectErr {
if err == nil {
t.Error("expected error, got none")
}
} else if err != nil {
t.Errorf("did not expect error: %s", err.Error())
}
if !reflect.DeepEqual(out, tt.out) {
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
}
})
}
}

View File

@ -25,6 +25,14 @@ type Sync struct {
Mux *msync.RWMutex
}
func NewFileSync(uri string, logger *logger.Logger) *Sync {
return &Sync{
URI: uri,
Logger: logger,
Mux: &msync.RWMutex{},
}
}
// default state is used to prevent EOF errors when handling filepath delete events + empty files
const defaultState = "{}"
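
A minimal construction sketch for the new exported constructor. This assumes flagd's logger.NewZapLogger and logger.NewLogger helpers and an illustrative flag file path; the runtime then drives the returned sync via the sync interface:

package main

import (
	"log"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/sync/file"
	"go.uber.org/zap/zapcore"
)

func main() {
	zl, err := logger.NewZapLogger(zapcore.InfoLevel, "console")
	if err != nil {
		log.Fatal(err)
	}
	// Build a file sync watching an (illustrative) flag configuration file.
	fileSync := file.NewFileSync("/etc/flagd/flags.json", logger.NewLogger(zl, false))
	_ = fileSync
}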

View File

@ -160,7 +160,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) Recv() *gomock.Call {
}
// RecvMsg mocks base method.
-func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m interface{}) error {
+func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "RecvMsg", m)
ret0, _ := ret[0].(error)
@ -174,7 +174,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) RecvMsg(m interface{})
}
// SendMsg mocks base method.
-func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m interface{}) error {
+func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "SendMsg", m)
ret0, _ := ret[0].(error)

View File

@ -1,6 +1,8 @@
package sync
import "context"
import (
"context"
)
type Type int
@ -57,3 +59,16 @@ type DataSync struct {
Source string
Type
}
// SourceConfig is configuration option for flagd. This maps to startup parameter sources
type SourceConfig struct {
URI string `json:"uri"`
Provider string `json:"provider"`
BearerToken string `json:"bearerToken,omitempty"`
CertPath string `json:"certPath,omitempty"`
TLS bool `json:"tls,omitempty"`
ProviderID string `json:"providerID,omitempty"`
Selector string `json:"selector,omitempty"`
Interval uint32 `json:"interval,omitempty"`
}
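
Because the struct now lives in the sync package and carries JSON tags, a sources list can be decoded into it directly. A small sketch, reusing values from the tests above as illustrative input:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/open-feature/flagd/core/pkg/sync"
)

func main() {
	// Illustrative sources value; the keys mirror the json tags on SourceConfig.
	raw := `[
		{"uri":"config/samples/example_flags.json","provider":"file"},
		{"uri":"my-flag-source:8080","provider":"grpc","tls":true,"certPath":"/certs/ca.cert","selector":"source=database,app=weatherapp"}
	]`

	var sources []sync.SourceConfig
	if err := json.Unmarshal([]byte(raw), &sources); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", sources)
}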

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
msync "sync"
"time"
@ -17,9 +16,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@ -29,6 +26,8 @@ var (
featureFlagResource = v1beta1.GroupVersion.WithResource("featureflags")
)
type SyncOption func(s *Sync)
type Sync struct {
URI string
@ -45,34 +44,16 @@ func NewK8sSync(
logger *logger.Logger,
uri string,
reader client.Reader,
-dynamic dynamic.Interface,
+dynamicClient dynamic.Interface,
) *Sync {
return &Sync{
logger: logger,
URI: uri,
readClient: reader,
-dynamicClient: dynamic,
+dynamicClient: dynamicClient,
}
}
func GetClients() (client.Reader, dynamic.Interface, error) {
clusterConfig, err := k8sClusterConfig()
if err != nil {
return nil, nil, err
}
readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, nil, fmt.Errorf("unable to create readClient: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err)
}
return readClient, dynamicClient, nil
}
func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fetch, err := k.fetch(ctx)
if err != nil {
@ -325,32 +306,6 @@ func parseURI(uri string) (string, string, error) {
return s[0], s[1], nil
}
// k8sClusterConfig build K8s connection config based available configurations
func k8sClusterConfig() (*rest.Config, error) {
cfg := os.Getenv("KUBECONFIG")
var clusterConfig *rest.Config
var err error
if cfg != "" {
clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
err = fmt.Errorf("error building cluster config from flags: %w", err)
}
} else {
clusterConfig, err = rest.InClusterConfig()
if err != nil {
err = fmt.Errorf("error fetch cluster config: %w", err)
}
}
if err != nil {
return nil, err
}
return clusterConfig, nil
}
func marshallFeatureFlagSpec(ff *v1beta1.FeatureFlag) (string, error) {
b, err := json.Marshal(ff.Spec.FlagSpec)
if err != nil {

View File

@ -786,31 +786,6 @@ func TestNotify(t *testing.T) {
}
}
func Test_k8sClusterConfig(t *testing.T) {
t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) {
tt.Setenv("KUBECONFIG", "")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
t.Run("KUBECONFIG file not existing", func(tt *testing.T) {
tt.Setenv("KUBECONFIG", "value")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
t.Run("Default REST Config and missing svc account", func(tt *testing.T) {
tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1")
tt.Setenv("KUBERNETES_SERVICE_PORT", "8080")
_, err := k8sClusterConfig()
if err == nil {
tt.Error("Expected error but got none")
}
})
}
func Test_NewK8sSync(t *testing.T) {
l, err := logger.NewZapLogger(zapcore.FatalLevel, "console")
if err != nil {

View File

@ -13,7 +13,7 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
syncServer "github.com/open-feature/flagd/core/pkg/service/sync"
-syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
+"github.com/open-feature/flagd/core/pkg/subscriptions"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap/zapcore"
@ -70,7 +70,7 @@ var startCmd = &cobra.Command{
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
-syncStore := syncStore.NewSyncStore(ctx, logger)
+syncStore := subscriptions.NewManager(ctx, logger)
s := syncServer.NewServer(logger, syncStore)
// If --management-port is set use that value. If not and

View File

@ -7,6 +7,8 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/runtime"
"github.com/open-feature/flagd/core/pkg/sync"
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
@ -140,14 +142,14 @@ var startCmd = &cobra.Command{
docsLinkConfiguration)
}
-syncProviders, err := runtime.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName))
+syncProviders, err := syncbuilder.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName))
if err != nil {
log.Fatal(err)
}
-syncProvidersFromConfig := []runtime.SourceConfig{}
+syncProvidersFromConfig := []sync.SourceConfig{}
if cfgFile == "" && viper.GetString(sourcesFlagName) != "" {
-syncProvidersFromConfig, err = runtime.ParseSources(viper.GetString(sourcesFlagName))
+syncProvidersFromConfig, err = syncbuilder.ParseSources(viper.GetString(sourcesFlagName))
if err != nil {
log.Fatal(err)
}