chore: refactoring component structure (#1044)
This PR refactors the core package by restructuring the components responsible for managing the subscriptions, as well as the creation of sync sources. The following changes have been made: - Renamed `sync-store` package to `subscriptions`. This has been done to avoid confusion because we already have a `sync`, and a `store`. package. Within the package, the `SyncStore`. has been renamed to `subscriptions.Manager`, which should reflect its responsibility in a better way. Also, the `syncHandler` has been renamed to `multiplexer`, as this one is responsible for sending updates of a certain target to all subscribers - `syncHandler` was a bit too generic in my opinion. - Moved the `GetSyncSourceFromURI` method to a new package, `sync/builder`, to remove the responsibility of building concrete sync sources from the subscription manager - Moved the logic of retrieving the K8s client config to the sync builder. Previously, this was done in the `runtime` package by calling the respective methods for the config retrieval provided by the `sync/kubernetes` package and then handing that config back to the initialization of the `K8sSync`. Note: This step can potentially be done in a separate PR, if so desired. --------- Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com> Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
This commit is contained in:
parent
ec5f778286
commit
0c7f78a95f
1
Makefile
1
Makefile
|
|
@ -72,6 +72,7 @@ mockgen: install-mockgen
|
|||
cd core; mockgen -source=pkg/sync/grpc/credentials/builder.go -destination=pkg/sync/grpc/credentials/mock/builder.go -package=credendialsmock
|
||||
cd core; mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
|
||||
cd core; mockgen -source=pkg/service/middleware/interface.go -destination=pkg/service/middleware/mock/interface.go -package=middlewaremock
|
||||
cd core; mockgen -source=pkg/sync/builder/syncbuilder.go -destination=pkg/sync/builder/mock/syncbuilder.go -package=middlewaremocksyncbuildermock
|
||||
generate-docs:
|
||||
cd flagd; go run ./cmd/doc/main.go
|
||||
|
||||
|
|
|
|||
|
|
@ -2,13 +2,7 @@ package runtime
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"regexp"
|
||||
msync "sync"
|
||||
"time"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/eval"
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
|
|
@ -16,46 +10,14 @@ import (
|
|||
flageval "github.com/open-feature/flagd/core/pkg/service/flag-evaluation"
|
||||
"github.com/open-feature/flagd/core/pkg/store"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/file"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/grpc"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
|
||||
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
|
||||
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
|
||||
"github.com/open-feature/flagd/core/pkg/telemetry"
|
||||
"github.com/robfig/cron"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// from_config is a collection of structures and parsers responsible for deriving flagd runtime
|
||||
|
||||
const (
|
||||
syncProviderFile = "file"
|
||||
syncProviderGrpc = "grpc"
|
||||
syncProviderKubernetes = "kubernetes"
|
||||
syncProviderHTTP = "http"
|
||||
svcName = "flagd"
|
||||
)
|
||||
|
||||
var (
|
||||
regCrd *regexp.Regexp
|
||||
regURL *regexp.Regexp
|
||||
regGRPC *regexp.Regexp
|
||||
regGRPCSecure *regexp.Regexp
|
||||
regFile *regexp.Regexp
|
||||
)
|
||||
|
||||
// SourceConfig is configuration option for flagd. This maps to startup parameter sources
|
||||
type SourceConfig struct {
|
||||
URI string `json:"uri"`
|
||||
Provider string `json:"provider"`
|
||||
|
||||
BearerToken string `json:"bearerToken,omitempty"`
|
||||
CertPath string `json:"certPath,omitempty"`
|
||||
TLS bool `json:"tls,omitempty"`
|
||||
ProviderID string `json:"providerID,omitempty"`
|
||||
Selector string `json:"selector,omitempty"`
|
||||
Interval uint32 `json:"interval,omitempty"`
|
||||
}
|
||||
const svcName = "flagd"
|
||||
|
||||
// Config is the configuration structure derived from startup arguments.
|
||||
type Config struct {
|
||||
|
|
@ -67,18 +29,10 @@ type Config struct {
|
|||
ServicePort uint16
|
||||
ServiceSocketPath string
|
||||
|
||||
SyncProviders []SourceConfig
|
||||
SyncProviders []sync.SourceConfig
|
||||
CORS []string
|
||||
}
|
||||
|
||||
func init() {
|
||||
regCrd = regexp.MustCompile("^core.openfeature.dev/")
|
||||
regURL = regexp.MustCompile("^https?://")
|
||||
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
|
||||
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
|
||||
regFile = regexp.MustCompile("^file:")
|
||||
}
|
||||
|
||||
// FromConfig builds a runtime from startup configurations
|
||||
// nolint: funlen
|
||||
func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, error) {
|
||||
|
|
@ -176,155 +130,12 @@ func setupJSONEvaluator(logger *logger.Logger, s *store.Flags) *eval.JSONEvaluat
|
|||
}
|
||||
|
||||
// syncProvidersFromConfig is a helper to build ISync implementations from SourceConfig
|
||||
func syncProvidersFromConfig(logger *logger.Logger, sources []SourceConfig) ([]sync.ISync, error) {
|
||||
syncImpls := []sync.ISync{}
|
||||
|
||||
for _, syncProvider := range sources {
|
||||
switch syncProvider.Provider {
|
||||
case syncProviderFile:
|
||||
syncImpls = append(syncImpls, NewFile(syncProvider, logger))
|
||||
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI))
|
||||
case syncProviderKubernetes:
|
||||
k, err := NewK8s(syncProvider.URI, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
syncImpls = append(syncImpls, k)
|
||||
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI))
|
||||
case syncProviderHTTP:
|
||||
syncImpls = append(syncImpls, NewHTTP(syncProvider, logger))
|
||||
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI))
|
||||
case syncProviderGrpc:
|
||||
syncImpls = append(syncImpls, NewGRPC(syncProvider, logger))
|
||||
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", syncProvider.URI))
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
|
||||
syncProvider.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
|
||||
}
|
||||
}
|
||||
return syncImpls, nil
|
||||
}
|
||||
|
||||
func NewGRPC(config SourceConfig, logger *logger.Logger) *grpc.Sync {
|
||||
return &grpc.Sync{
|
||||
URI: config.URI,
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "grpc"),
|
||||
),
|
||||
CredentialBuilder: &credentials.CredentialBuilder{},
|
||||
CertPath: config.CertPath,
|
||||
ProviderID: config.ProviderID,
|
||||
Secure: config.TLS,
|
||||
Selector: config.Selector,
|
||||
}
|
||||
}
|
||||
|
||||
func NewHTTP(config SourceConfig, logger *logger.Logger) *httpSync.Sync {
|
||||
// Default to 5 seconds
|
||||
var interval uint32 = 5
|
||||
if config.Interval != 0 {
|
||||
interval = config.Interval
|
||||
}
|
||||
|
||||
return &httpSync.Sync{
|
||||
URI: config.URI,
|
||||
Client: &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
},
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "remote"),
|
||||
),
|
||||
BearerToken: config.BearerToken,
|
||||
Interval: interval,
|
||||
Cron: cron.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
|
||||
reader, dynamic, err := kubernetes.GetClients()
|
||||
func syncProvidersFromConfig(logger *logger.Logger, sources []sync.SourceConfig) ([]sync.ISync, error) {
|
||||
builder := syncbuilder.NewSyncBuilder()
|
||||
syncs, err := builder.SyncsFromConfig(sources, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
|
||||
return nil, fmt.Errorf("could not create sync sources from config: %w", err)
|
||||
}
|
||||
return kubernetes.NewK8sSync(
|
||||
logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "kubernetes"),
|
||||
),
|
||||
regCrd.ReplaceAllString(uri, ""),
|
||||
reader,
|
||||
dynamic,
|
||||
), nil
|
||||
}
|
||||
|
||||
func NewFile(config SourceConfig, logger *logger.Logger) *file.Sync {
|
||||
return &file.Sync{
|
||||
URI: config.URI,
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "filepath"),
|
||||
),
|
||||
Mux: &msync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// ParseSources parse a json formatted SourceConfig array string and performs validations on the content
|
||||
func ParseSources(sourcesFlag string) ([]SourceConfig, error) {
|
||||
syncProvidersParsed := []SourceConfig{}
|
||||
|
||||
if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil {
|
||||
return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err)
|
||||
}
|
||||
for _, sp := range syncProvidersParsed {
|
||||
if sp.URI == "" {
|
||||
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
|
||||
}
|
||||
if sp.Provider == "" {
|
||||
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
|
||||
}
|
||||
}
|
||||
return syncProvidersParsed, nil
|
||||
}
|
||||
|
||||
// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to
|
||||
// derive SourceConfig
|
||||
func ParseSyncProviderURIs(uris []string) ([]SourceConfig, error) {
|
||||
syncProvidersParsed := []SourceConfig{}
|
||||
|
||||
for _, uri := range uris {
|
||||
switch uriB := []byte(uri); {
|
||||
case regFile.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
|
||||
URI: regFile.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderFile,
|
||||
})
|
||||
case regCrd.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
|
||||
URI: regCrd.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderKubernetes,
|
||||
})
|
||||
case regURL.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
|
||||
URI: uri,
|
||||
Provider: syncProviderHTTP,
|
||||
})
|
||||
case regGRPC.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
|
||||
URI: regGRPC.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderGrpc,
|
||||
})
|
||||
case regGRPCSecure.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
|
||||
URI: regGRPCSecure.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderGrpc,
|
||||
TLS: true,
|
||||
})
|
||||
default:
|
||||
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
|
||||
"'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri)
|
||||
}
|
||||
}
|
||||
return syncProvidersParsed, nil
|
||||
|
||||
return syncs, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package runtime
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
|
|
@ -9,304 +8,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseSource(t *testing.T) {
|
||||
test := map[string]struct {
|
||||
in string
|
||||
expectErr bool
|
||||
out []SourceConfig
|
||||
}{
|
||||
"simple": {
|
||||
in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]",
|
||||
expectErr: false,
|
||||
out: []SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-syncs": {
|
||||
in: `[
|
||||
{"uri":"config/samples/example_flags.json","provider":"file"},
|
||||
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
|
||||
{"uri":"host:port","provider":"grpc"},
|
||||
{"uri":"default/my-crd","provider":"kubernetes"}
|
||||
]`,
|
||||
expectErr: false,
|
||||
out: []SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
{
|
||||
URI: "http://test.com",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: ":)",
|
||||
},
|
||||
{
|
||||
URI: "host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
},
|
||||
{
|
||||
URI: "default/my-crd",
|
||||
Provider: syncProviderKubernetes,
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-syncs-with-options": {
|
||||
in: `[{"uri":"config/samples/example_flags.json","provider":"file"},
|
||||
{"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"},
|
||||
{"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"},
|
||||
{"uri":"http://site.com","provider":"http","interval":77 },
|
||||
{"uri":"default/my-flag-config","provider":"kubernetes"},
|
||||
{"uri":"grpc-source:8080","provider":"grpc"},
|
||||
{"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]
|
||||
`,
|
||||
expectErr: false,
|
||||
out: []SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
{
|
||||
URI: "http://my-flag-source.json",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "bearer-dji34ld2l",
|
||||
},
|
||||
{
|
||||
URI: "https://secure-remote",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "bearer-dji34ld2l",
|
||||
},
|
||||
{
|
||||
URI: "http://site.com",
|
||||
Provider: syncProviderHTTP,
|
||||
Interval: 77,
|
||||
},
|
||||
{
|
||||
URI: "default/my-flag-config",
|
||||
Provider: syncProviderKubernetes,
|
||||
},
|
||||
{
|
||||
URI: "grpc-source:8080",
|
||||
Provider: syncProviderGrpc,
|
||||
},
|
||||
{
|
||||
URI: "my-flag-source:8080",
|
||||
Provider: syncProviderGrpc,
|
||||
TLS: true,
|
||||
CertPath: "/certs/ca.cert",
|
||||
ProviderID: "flagd-weatherapp-sidecar",
|
||||
Selector: "source=database,app=weatherapp",
|
||||
},
|
||||
},
|
||||
},
|
||||
"empty": {
|
||||
in: `[]`,
|
||||
expectErr: false,
|
||||
out: []SourceConfig{},
|
||||
},
|
||||
"parse-failure": {
|
||||
in: ``,
|
||||
expectErr: true,
|
||||
out: []SourceConfig{},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tt := range test {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
out, err := ParseSources(tt.in)
|
||||
if tt.expectErr {
|
||||
if err == nil {
|
||||
t.Error("expected error, got none")
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("did not expect error: %s", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(out, tt.out) {
|
||||
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSyncProviderURIs(t *testing.T) {
|
||||
test := map[string]struct {
|
||||
in []string
|
||||
expectErr bool
|
||||
out []SourceConfig
|
||||
}{
|
||||
"simple": {
|
||||
in: []string{
|
||||
"file:my-file.json",
|
||||
},
|
||||
expectErr: false,
|
||||
out: []SourceConfig{
|
||||
{
|
||||
URI: "my-file.json",
|
||||
Provider: "file",
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-uris": {
|
||||
in: []string{
|
||||
"file:my-file.json",
|
||||
"https://test.com",
|
||||
"grpc://host:port",
|
||||
"grpcs://secure-grpc",
|
||||
"core.openfeature.dev/default/my-crd",
|
||||
},
|
||||
expectErr: false,
|
||||
out: []SourceConfig{
|
||||
{
|
||||
URI: "my-file.json",
|
||||
Provider: "file",
|
||||
},
|
||||
{
|
||||
URI: "https://test.com",
|
||||
Provider: "http",
|
||||
},
|
||||
{
|
||||
URI: "host:port",
|
||||
Provider: "grpc",
|
||||
TLS: false,
|
||||
},
|
||||
{
|
||||
URI: "secure-grpc",
|
||||
Provider: "grpc",
|
||||
TLS: true,
|
||||
},
|
||||
{
|
||||
URI: "default/my-crd",
|
||||
Provider: "kubernetes",
|
||||
},
|
||||
},
|
||||
},
|
||||
"empty": {
|
||||
in: []string{},
|
||||
expectErr: false,
|
||||
out: []SourceConfig{},
|
||||
},
|
||||
"parse-failure": {
|
||||
in: []string{"care.openfeature.dev/will/fail"},
|
||||
expectErr: true,
|
||||
out: []SourceConfig{},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tt := range test {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
out, err := ParseSyncProviderURIs(tt.in)
|
||||
if tt.expectErr {
|
||||
if err == nil {
|
||||
t.Error("expected error, got none")
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("did not expect error: %s", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(out, tt.out) {
|
||||
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Note - K8s configuration require K8s client, hence do not use K8s sync provider in this test
|
||||
func Test_syncProvidersFromConfig(t *testing.T) {
|
||||
lg := logger.NewLogger(nil, false)
|
||||
|
||||
type args struct {
|
||||
logger *logger.Logger
|
||||
sources []SourceConfig
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantSyncs int // simply check the count of ISync providers yield from configurations
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Empty",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []SourceConfig{},
|
||||
},
|
||||
wantSyncs: 0,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Error",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []SourceConfig{
|
||||
{
|
||||
URI: "fake",
|
||||
Provider: "disk",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: 0,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "single",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []SourceConfig{
|
||||
{
|
||||
URI: "grpc://host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
ProviderID: "myapp",
|
||||
CertPath: "/tmp/ca.cert",
|
||||
Selector: "source=database",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: 1,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "combined",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []SourceConfig{
|
||||
{
|
||||
URI: "grpc://host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
ProviderID: "myapp",
|
||||
CertPath: "/tmp/ca.cert",
|
||||
Selector: "source=database",
|
||||
},
|
||||
{
|
||||
URI: "https://host:port",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "token",
|
||||
},
|
||||
{
|
||||
URI: "/tmp/flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: 3,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
syncs, err := syncProvidersFromConfig(tt.args.logger, tt.args.sources)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if tt.wantSyncs != len(syncs) {
|
||||
t.Errorf("syncProvidersFromConfig() yielded = %v, but expected %v", len(syncs), tt.wantSyncs)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_setupJSONEvaluator(t *testing.T) {
|
||||
lg := logger.NewLogger(nil, false)
|
||||
|
||||
|
|
|
|||
|
|
@ -7,13 +7,13 @@ import (
|
|||
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
|
||||
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/subscriptions"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
rpc.UnimplementedFlagSyncServiceServer
|
||||
syncStore syncStore.ISyncStore
|
||||
syncStore subscriptions.Manager
|
||||
logger *logger.Logger
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import (
|
|||
rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
iservice "github.com/open-feature/flagd/core/pkg/service"
|
||||
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
|
||||
"github.com/open-feature/flagd/core/pkg/subscriptions"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
|
|
@ -32,7 +32,7 @@ type Server struct {
|
|||
metricServerReady bool
|
||||
}
|
||||
|
||||
func NewServer(logger *logger.Logger, store syncStore.ISyncStore) *Server {
|
||||
func NewServer(logger *logger.Logger, store subscriptions.Manager) *Server {
|
||||
return &Server{
|
||||
handler: &handler{
|
||||
logger: logger,
|
||||
|
|
|
|||
|
|
@ -1,44 +1,45 @@
|
|||
//nolint:contextcheck
|
||||
package store
|
||||
package subscriptions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/runtime"
|
||||
isync "github.com/open-feature/flagd/core/pkg/sync"
|
||||
"go.uber.org/zap"
|
||||
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
|
||||
)
|
||||
|
||||
var (
|
||||
regCrd *regexp.Regexp
|
||||
regFile *regexp.Regexp
|
||||
)
|
||||
// Manager defines the interface for the subscription management
|
||||
type Manager interface {
|
||||
FetchAllFlags(
|
||||
ctx context.Context,
|
||||
key interface{},
|
||||
target string,
|
||||
) (isync.DataSync, error)
|
||||
RegisterSubscription(
|
||||
ctx context.Context,
|
||||
target string,
|
||||
key interface{},
|
||||
dataSync chan isync.DataSync,
|
||||
errChan chan error,
|
||||
)
|
||||
|
||||
func init() {
|
||||
regCrd = regexp.MustCompile("^core.openfeature.dev/")
|
||||
regFile = regexp.MustCompile("^file:")
|
||||
// metrics hooks
|
||||
GetActiveSubscriptionsInt64() int64
|
||||
}
|
||||
|
||||
type SyncStore struct {
|
||||
// Coordinator coordinates subscriptions by aggregating subscribers for the same target, and keeping them up to date
|
||||
// for any updates that have happened for those targets.
|
||||
type Coordinator struct {
|
||||
ctx context.Context
|
||||
syncHandlers map[string]*syncHandler
|
||||
multiplexers map[string]*multiplexer
|
||||
logger *logger.Logger
|
||||
mu *sync.RWMutex
|
||||
syncBuilder SyncBuilderInterface
|
||||
}
|
||||
|
||||
type syncHandler struct {
|
||||
subs map[interface{}]storedChannels
|
||||
dataSync chan isync.DataSync
|
||||
cancelFunc context.CancelFunc
|
||||
syncRef isync.ISync
|
||||
mu *sync.RWMutex
|
||||
syncBuilder syncbuilder.ISyncBuilder
|
||||
}
|
||||
|
||||
type storedChannels struct {
|
||||
|
|
@ -46,27 +47,27 @@ type storedChannels struct {
|
|||
dataSync chan isync.DataSync
|
||||
}
|
||||
|
||||
// NewSyncStore returns a new sync store
|
||||
func NewSyncStore(ctx context.Context, logger *logger.Logger) *SyncStore {
|
||||
ss := SyncStore{
|
||||
// NewManager returns a new subscription manager
|
||||
func NewManager(ctx context.Context, logger *logger.Logger) *Coordinator {
|
||||
mgr := Coordinator{
|
||||
ctx: ctx,
|
||||
syncHandlers: map[string]*syncHandler{},
|
||||
multiplexers: map[string]*multiplexer{},
|
||||
logger: logger,
|
||||
mu: &sync.RWMutex{},
|
||||
syncBuilder: &SyncBuilder{},
|
||||
syncBuilder: &syncbuilder.SyncBuilder{},
|
||||
}
|
||||
go ss.cleanup()
|
||||
return &ss
|
||||
go mgr.cleanup()
|
||||
return &mgr
|
||||
}
|
||||
|
||||
// FetchAllFlags returns a DataSync containing the full set of flag configurations from the SyncStore.
|
||||
// FetchAllFlags returns a DataSync containing the full set of flag configurations from the Coordinator.
|
||||
// This will either occur via triggering a resync, or through setting up a new subscription to the resource
|
||||
func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) {
|
||||
func (s *Coordinator) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) {
|
||||
s.logger.Debug(fmt.Sprintf("fetching all flags for target %s", target))
|
||||
dataSyncChan := make(chan isync.DataSync, 1)
|
||||
errChan := make(chan error, 1)
|
||||
s.mu.RLock()
|
||||
syncHandler, ok := s.syncHandlers[target]
|
||||
syncHandler, ok := s.multiplexers[target]
|
||||
s.mu.RUnlock()
|
||||
if !ok {
|
||||
s.logger.Debug(fmt.Sprintf("sync handler does not exist for target %s, registering a new subscription", target))
|
||||
|
|
@ -95,7 +96,7 @@ func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target s
|
|||
|
||||
// RegisterSubscription starts a new subscription to the target resource.
|
||||
// Once the subscription is set an ALL sync event will be received via the DataSync chan.
|
||||
func (s *SyncStore) RegisterSubscription(
|
||||
func (s *Coordinator) RegisterSubscription(
|
||||
ctx context.Context,
|
||||
target string,
|
||||
key interface{},
|
||||
|
|
@ -105,16 +106,16 @@ func (s *SyncStore) RegisterSubscription(
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
// is there a currently active subscription for this target?
|
||||
sh, ok := s.syncHandlers[target]
|
||||
sh, ok := s.multiplexers[target]
|
||||
if !ok {
|
||||
// we need to start a sync for this
|
||||
s.logger.Debug(
|
||||
fmt.Sprintf(
|
||||
"sync handler does not exist for target %s, registering syncHandler with sub %p",
|
||||
"sync handler does not exist for target %s, registering multiplexer with sub %p",
|
||||
target,
|
||||
key,
|
||||
))
|
||||
s.syncHandlers[target] = &syncHandler{
|
||||
s.multiplexers[target] = &multiplexer{
|
||||
dataSync: make(chan isync.DataSync),
|
||||
subs: map[interface{}]storedChannels{
|
||||
key: {
|
||||
|
|
@ -137,7 +138,7 @@ func (s *SyncStore) RegisterSubscription(
|
|||
go func() {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if _, ok := s.syncHandlers[target]; ok {
|
||||
if _, ok := s.multiplexers[target]; ok {
|
||||
s.logger.Debug(fmt.Sprintf("sync handler exists for target %s, triggering a resync", target))
|
||||
if err := sh.syncRef.ReSync(ctx, dataSync); err != nil {
|
||||
errChan <- err
|
||||
|
|
@ -151,28 +152,28 @@ func (s *SyncStore) RegisterSubscription(
|
|||
<-ctx.Done()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.syncHandlers[target] != nil && s.syncHandlers[target].subs != nil {
|
||||
if s.multiplexers[target] != nil && s.multiplexers[target].subs != nil {
|
||||
s.logger.Debug(fmt.Sprintf("removing sync subscription due to context cancellation %p", key))
|
||||
delete(s.syncHandlers[target].subs, key)
|
||||
delete(s.multiplexers[target].subs, key)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *SyncStore) watchResource(target string) {
|
||||
func (s *Coordinator) watchResource(target string) {
|
||||
s.logger.Debug(fmt.Sprintf("watching resource %s", target))
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
defer cancel()
|
||||
sh, ok := s.syncHandlers[target]
|
||||
sh, ok := s.multiplexers[target]
|
||||
if !ok {
|
||||
s.logger.Error(fmt.Sprintf("no sync handler exists for target %s", target))
|
||||
return
|
||||
}
|
||||
// this cancel is accessed by the cleanup method shutdown the listener + delete the syncHandler
|
||||
// this cancel is accessed by the cleanup method shutdown the listener + delete the multiplexer
|
||||
sh.cancelFunc = cancel
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
s.mu.Lock()
|
||||
delete(s.syncHandlers, target)
|
||||
delete(s.multiplexers, target)
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
// broadcast any data passed through the core channel to all subscribing channels
|
||||
|
|
@ -182,73 +183,47 @@ func (s *SyncStore) watchResource(target string) {
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case d := <-sh.dataSync:
|
||||
sh.writeData(s.logger, d)
|
||||
sh.broadcastData(s.logger, d)
|
||||
}
|
||||
}
|
||||
}()
|
||||
// setup sync, if this fails an error is broadcasted, and the defer results in cleanup
|
||||
sync, err := s.syncBuilder.SyncFromURI(target, s.logger)
|
||||
syncSource, err := s.syncBuilder.SyncFromURI(target, s.logger)
|
||||
if err != nil {
|
||||
s.logger.Error(fmt.Sprintf("unable to build sync from URI for target %s: %s", target, err.Error()))
|
||||
sh.writeError(s.logger, err)
|
||||
sh.broadcastError(s.logger, err)
|
||||
return
|
||||
}
|
||||
// init sync, if this fails an error is broadcasted, and the defer results in cleanup
|
||||
err = sync.Init(ctx)
|
||||
err = syncSource.Init(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error(fmt.Sprintf("unable to initiate sync for target %s: %s", target, err.Error()))
|
||||
sh.writeError(s.logger, err)
|
||||
sh.broadcastError(s.logger, err)
|
||||
return
|
||||
}
|
||||
// sync ref is used to trigger a resync on a single channel when a new subscription is started
|
||||
// syncSource ref is used to trigger a resync on a single channel when a new subscription is started
|
||||
// but the associated SyncHandler already exists, i.e. this function is not run
|
||||
sh.syncRef = sync
|
||||
err = sync.Sync(ctx, sh.dataSync)
|
||||
sh.syncRef = syncSource
|
||||
err = syncSource.Sync(ctx, sh.dataSync)
|
||||
if err != nil {
|
||||
s.logger.Error(fmt.Sprintf("error from sync for target %s: %s", target, err.Error()))
|
||||
sh.writeError(s.logger, err)
|
||||
sh.broadcastError(s.logger, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *syncHandler) writeError(logger *logger.Logger, err error) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for k, ec := range h.subs {
|
||||
select {
|
||||
case ec.errChan <- err:
|
||||
continue
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *syncHandler) writeData(logger *logger.Logger, data isync.DataSync) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for k, ds := range h.subs {
|
||||
select {
|
||||
case ds.dataSync <- data:
|
||||
continue
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SyncStore) cleanup() {
|
||||
func (s *Coordinator) cleanup() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
s.mu.Lock()
|
||||
for k, v := range s.syncHandlers {
|
||||
// delete any syncHandlers with 0 active subscriptions through cancelling its context
|
||||
s.logger.Debug(fmt.Sprintf("syncHandler for target %s has %d subscriptions", k, len(v.subs)))
|
||||
for k, v := range s.multiplexers {
|
||||
// delete any multiplexers with 0 active subscriptions through cancelling its context
|
||||
s.logger.Debug(fmt.Sprintf("multiplexer for target %s has %d subscriptions", k, len(v.subs)))
|
||||
if len(v.subs) == 0 {
|
||||
s.logger.Debug(fmt.Sprintf("shutting down syncHandler %s", k))
|
||||
s.syncHandlers[k].cancelFunc()
|
||||
s.logger.Debug(fmt.Sprintf("shutting down multiplexer %s", k))
|
||||
s.multiplexers[k].cancelFunc()
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
|
@ -256,45 +231,14 @@ func (s *SyncStore) cleanup() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *SyncStore) GetActiveSubscriptionsInt64() int64 {
|
||||
func (s *Coordinator) GetActiveSubscriptionsInt64() int64 {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
syncs := 0
|
||||
for _, v := range s.syncHandlers {
|
||||
for _, v := range s.multiplexers {
|
||||
syncs += len(v.subs)
|
||||
}
|
||||
|
||||
return int64(syncs)
|
||||
}
|
||||
|
||||
type SyncBuilderInterface interface {
|
||||
SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error)
|
||||
}
|
||||
|
||||
type SyncBuilder struct{}
|
||||
|
||||
// SyncFromURI builds an ISync interface from the input uri string
|
||||
func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) {
|
||||
switch uriB := []byte(uri); {
|
||||
// filepath may be used for debugging, not recommended in deployment
|
||||
case regFile.Match(uriB):
|
||||
return runtime.NewFile(runtime.SourceConfig{
|
||||
URI: regFile.ReplaceAllString(uri, ""),
|
||||
}, logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "filepath"),
|
||||
zap.String("target", "target"),
|
||||
)), nil
|
||||
case regCrd.Match(uriB):
|
||||
s, err := runtime.NewK8s(uri, logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "kubernetes"),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating k8s sync: %w", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unrecognized URI: %s", uri)
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package store
|
||||
package subscriptions
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -59,17 +59,21 @@ type syncBuilderMock struct {
|
|||
initError error
|
||||
}
|
||||
|
||||
func (s *syncBuilderMock) SyncsFromConfig(_ []isync.SourceConfig, _ *logger.Logger) ([]isync.ISync, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *syncBuilderMock) SyncFromURI(_ string, _ *logger.Logger) (isync.ISync, error) {
|
||||
return s.mock, s.initError
|
||||
}
|
||||
|
||||
func newSyncHandler() (*syncHandler, string) {
|
||||
func newSyncHandler() (*multiplexer, string) {
|
||||
coreDataSyncChan := make(chan isync.DataSync, 1)
|
||||
dataSyncChan := make(chan isync.DataSync, 1)
|
||||
errChan := make(chan error, 1)
|
||||
key := "key"
|
||||
|
||||
return &syncHandler{
|
||||
return &multiplexer{
|
||||
dataSync: coreDataSyncChan,
|
||||
subs: map[interface{}]storedChannels{
|
||||
key: {
|
||||
|
|
@ -83,7 +87,7 @@ func newSyncHandler() (*syncHandler, string) {
|
|||
|
||||
func Test_watchResource(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
syncStore.syncBuilder = &syncBuilderMock{
|
||||
mock: syncMock,
|
||||
|
|
@ -92,7 +96,7 @@ func Test_watchResource(t *testing.T) {
|
|||
target := "test-target"
|
||||
syncHandler, key := newSyncHandler()
|
||||
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
|
||||
go syncStore.watchResource(target)
|
||||
|
||||
|
|
@ -128,17 +132,17 @@ func Test_watchResource(t *testing.T) {
|
|||
// no context cancellation should have occurred, and there should still be registered sync sub
|
||||
syncStore.mu.Lock()
|
||||
if len(syncHandler.subs) != 1 {
|
||||
t.Error("incorrect number of subs in syncHandler", syncHandler.subs)
|
||||
t.Error("incorrect number of subs in multiplexer", syncHandler.subs)
|
||||
}
|
||||
syncStore.mu.Unlock()
|
||||
|
||||
// cancellation of context will result in the syncHandler being deleted
|
||||
// cancellation of context will result in the multiplexer being deleted
|
||||
cancel()
|
||||
// allow for the goroutine to catch the lock first
|
||||
time.Sleep(1 * time.Second)
|
||||
syncStore.mu.Lock()
|
||||
if syncStore.syncHandlers[target] != nil {
|
||||
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
|
||||
if syncStore.multiplexers[target] != nil {
|
||||
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
|
||||
}
|
||||
syncStore.mu.Unlock()
|
||||
}
|
||||
|
|
@ -146,7 +150,7 @@ func Test_watchResource(t *testing.T) {
|
|||
func Test_watchResource_initFail(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
|
||||
// return an error on startup
|
||||
|
|
@ -158,7 +162,7 @@ func Test_watchResource_initFail(t *testing.T) {
|
|||
target := "test-target"
|
||||
syncHandler, key := newSyncHandler()
|
||||
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
|
||||
go syncStore.watchResource(target)
|
||||
|
||||
|
|
@ -175,8 +179,8 @@ func Test_watchResource_initFail(t *testing.T) {
|
|||
// this should then close the internal context and the watcher should be removed
|
||||
time.Sleep(1 * time.Second)
|
||||
syncStore.mu.Lock()
|
||||
if syncStore.syncHandlers[target] != nil {
|
||||
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
|
||||
if syncStore.multiplexers[target] != nil {
|
||||
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
|
||||
}
|
||||
syncStore.mu.Unlock()
|
||||
}
|
||||
|
|
@ -184,7 +188,7 @@ func Test_watchResource_initFail(t *testing.T) {
|
|||
func Test_watchResource_SyncFromURIFail(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
|
||||
// return an error on startup
|
||||
|
|
@ -197,7 +201,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
|
|||
target := "test-target"
|
||||
syncHandler, key := newSyncHandler()
|
||||
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
|
||||
go syncStore.watchResource(target)
|
||||
|
||||
|
|
@ -214,8 +218,8 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
|
|||
// this should then close the internal context and the watcher should be removed
|
||||
time.Sleep(1 * time.Second)
|
||||
syncStore.mu.Lock()
|
||||
if syncStore.syncHandlers[target] != nil {
|
||||
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
|
||||
if syncStore.multiplexers[target] != nil {
|
||||
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
|
||||
}
|
||||
syncStore.mu.Unlock()
|
||||
}
|
||||
|
|
@ -223,7 +227,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) {
|
|||
func Test_watchResource_SyncErrorOnClose(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
|
||||
// return an error on startup
|
||||
|
|
@ -235,7 +239,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
|
|||
target := "test-target"
|
||||
syncHandler, key := newSyncHandler()
|
||||
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
|
||||
go syncStore.watchResource(target)
|
||||
cancel()
|
||||
|
|
@ -252,8 +256,8 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
|
|||
// this should then close the internal context and the watcher should be removed
|
||||
time.Sleep(1 * time.Second)
|
||||
syncStore.mu.Lock()
|
||||
if syncStore.syncHandlers[target] != nil {
|
||||
t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs)
|
||||
if syncStore.multiplexers[target] != nil {
|
||||
t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs)
|
||||
}
|
||||
syncStore.mu.Unlock()
|
||||
}
|
||||
|
|
@ -261,7 +265,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) {
|
|||
func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
|
||||
// return an error on startup
|
||||
|
|
@ -279,7 +283,7 @@ func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) {
|
|||
func Test_watchResource_Cleanup(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
|
||||
// return an error on startup
|
||||
|
|
@ -297,7 +301,7 @@ func Test_watchResource_Cleanup(t *testing.T) {
|
|||
doneChan <- struct{}{}
|
||||
}
|
||||
syncStore.mu.Lock()
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
syncStore.mu.Unlock()
|
||||
go func() {
|
||||
syncStore.cleanup()
|
||||
|
|
@ -307,7 +311,7 @@ func Test_watchResource_Cleanup(t *testing.T) {
|
|||
case <-doneChan:
|
||||
return
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Error("syncHandlers not being cleaned up, timed out after 10 seconds")
|
||||
t.Error("multiplexers not being cleaned up, timed out after 10 seconds")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -351,7 +355,7 @@ func Test_FetchAllFlags(t *testing.T) {
|
|||
t.Run(name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
syncMock.resyncData = tt.mockData
|
||||
syncMock.resyncError = tt.mockError
|
||||
|
|
@ -365,7 +369,7 @@ func Test_FetchAllFlags(t *testing.T) {
|
|||
syncHandler.syncRef = syncMock
|
||||
}
|
||||
if tt.setHandler {
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
}
|
||||
|
||||
data, err := syncStore.FetchAllFlags(ctx, key, target)
|
||||
|
|
@ -406,7 +410,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
|
|||
t.Run(name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
|
||||
syncMock := newMockSync()
|
||||
syncMock.resyncData = tt.data
|
||||
|
|
@ -420,7 +424,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
|
|||
syncHandler, _ := newSyncHandler()
|
||||
syncHandler.syncRef = syncMock
|
||||
key := struct{}{}
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
dataChan := make(chan isync.DataSync, 1)
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
|
|
@ -445,7 +449,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
|
|||
func Test_syncMetrics(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false))
|
||||
syncStore := NewManager(ctx, logger.NewLogger(nil, false))
|
||||
syncMock := newMockSync()
|
||||
syncStore.syncBuilder = &syncBuilderMock{
|
||||
mock: syncMock,
|
||||
|
|
@ -459,7 +463,7 @@ func Test_syncMetrics(t *testing.T) {
|
|||
target := "test-target"
|
||||
syncHandler, _ := newSyncHandler()
|
||||
|
||||
syncStore.syncHandlers[target] = syncHandler
|
||||
syncStore.multiplexers[target] = syncHandler
|
||||
|
||||
subs = syncStore.GetActiveSubscriptionsInt64()
|
||||
if subs != 1 {
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
package subscriptions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
sourceSync "github.com/open-feature/flagd/core/pkg/sync"
|
||||
)
|
||||
|
||||
// multiplexer distributes updates for a target to all of its subscribers
|
||||
type multiplexer struct {
|
||||
subs map[interface{}]storedChannels
|
||||
dataSync chan sourceSync.DataSync
|
||||
cancelFunc context.CancelFunc
|
||||
syncRef sourceSync.ISync
|
||||
mu *sync.RWMutex
|
||||
}
|
||||
|
||||
func (h *multiplexer) broadcastError(logger *logger.Logger, err error) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for k, ec := range h.subs {
|
||||
select {
|
||||
case ec.errChan <- err:
|
||||
continue
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *multiplexer) broadcastData(logger *logger.Logger, data sourceSync.DataSync) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for k, ds := range h.subs {
|
||||
select {
|
||||
case ds.dataSync <- data:
|
||||
continue
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
isync "github.com/open-feature/flagd/core/pkg/sync"
|
||||
)
|
||||
|
||||
// ISyncStore defines the interface for the sync store
|
||||
type ISyncStore interface {
|
||||
FetchAllFlags(
|
||||
ctx context.Context,
|
||||
key interface{},
|
||||
target string,
|
||||
) (isync.DataSync, error)
|
||||
RegisterSubscription(
|
||||
ctx context.Context,
|
||||
target string,
|
||||
key interface{},
|
||||
dataSync chan isync.DataSync,
|
||||
errChan chan error,
|
||||
)
|
||||
|
||||
// metrics hooks
|
||||
GetActiveSubscriptionsInt64() int64
|
||||
}
|
||||
|
|
@ -0,0 +1,107 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: pkg/sync/builder/syncbuilder.go
|
||||
|
||||
// Package middlewaremocksyncbuildermock is a generated GoMock package.
|
||||
package middlewaremocksyncbuildermock
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
logger "github.com/open-feature/flagd/core/pkg/logger"
|
||||
sync "github.com/open-feature/flagd/core/pkg/sync"
|
||||
dynamic "k8s.io/client-go/dynamic"
|
||||
client "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// MockISyncBuilder is a mock of ISyncBuilder interface.
|
||||
type MockISyncBuilder struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockISyncBuilderMockRecorder
|
||||
}
|
||||
|
||||
// MockISyncBuilderMockRecorder is the mock recorder for MockISyncBuilder.
|
||||
type MockISyncBuilderMockRecorder struct {
|
||||
mock *MockISyncBuilder
|
||||
}
|
||||
|
||||
// NewMockISyncBuilder creates a new mock instance.
|
||||
func NewMockISyncBuilder(ctrl *gomock.Controller) *MockISyncBuilder {
|
||||
mock := &MockISyncBuilder{ctrl: ctrl}
|
||||
mock.recorder = &MockISyncBuilderMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockISyncBuilder) EXPECT() *MockISyncBuilderMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// SyncFromURI mocks base method.
|
||||
func (m *MockISyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncFromURI", uri, logger)
|
||||
ret0, _ := ret[0].(sync.ISync)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SyncFromURI indicates an expected call of SyncFromURI.
|
||||
func (mr *MockISyncBuilderMockRecorder) SyncFromURI(uri, logger interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncFromURI", reflect.TypeOf((*MockISyncBuilder)(nil).SyncFromURI), uri, logger)
|
||||
}
|
||||
|
||||
// SyncsFromConfig mocks base method.
|
||||
func (m *MockISyncBuilder) SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncsFromConfig", sourceConfig, logger)
|
||||
ret0, _ := ret[0].([]sync.ISync)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SyncsFromConfig indicates an expected call of SyncsFromConfig.
|
||||
func (mr *MockISyncBuilderMockRecorder) SyncsFromConfig(sourceConfig, logger interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncsFromConfig", reflect.TypeOf((*MockISyncBuilder)(nil).SyncsFromConfig), sourceConfig, logger)
|
||||
}
|
||||
|
||||
// MockIK8sClientBuilder is a mock of IK8sClientBuilder interface.
|
||||
type MockIK8sClientBuilder struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockIK8sClientBuilderMockRecorder
|
||||
}
|
||||
|
||||
// MockIK8sClientBuilderMockRecorder is the mock recorder for MockIK8sClientBuilder.
|
||||
type MockIK8sClientBuilderMockRecorder struct {
|
||||
mock *MockIK8sClientBuilder
|
||||
}
|
||||
|
||||
// NewMockIK8sClientBuilder creates a new mock instance.
|
||||
func NewMockIK8sClientBuilder(ctrl *gomock.Controller) *MockIK8sClientBuilder {
|
||||
mock := &MockIK8sClientBuilder{ctrl: ctrl}
|
||||
mock.recorder = &MockIK8sClientBuilderMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockIK8sClientBuilder) EXPECT() *MockIK8sClientBuilderMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// GetK8sClients mocks base method.
|
||||
func (m *MockIK8sClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetK8sClients")
|
||||
ret0, _ := ret[0].(client.Reader)
|
||||
ret1, _ := ret[1].(dynamic.Interface)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// GetK8sClients indicates an expected call of GetK8sClients.
|
||||
func (mr *MockIK8sClientBuilderMockRecorder) GetK8sClients() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetK8sClients", reflect.TypeOf((*MockIK8sClientBuilder)(nil).GetK8sClients))
|
||||
}
|
||||
|
|
@ -0,0 +1,218 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
msync "sync"
|
||||
"time"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/file"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/grpc"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
|
||||
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
|
||||
"github.com/robfig/cron"
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
const (
|
||||
syncProviderFile = "file"
|
||||
syncProviderGrpc = "grpc"
|
||||
syncProviderKubernetes = "kubernetes"
|
||||
syncProviderHTTP = "http"
|
||||
)
|
||||
|
||||
var (
|
||||
regCrd *regexp.Regexp
|
||||
regURL *regexp.Regexp
|
||||
regGRPC *regexp.Regexp
|
||||
regGRPCSecure *regexp.Regexp
|
||||
regFile *regexp.Regexp
|
||||
)
|
||||
|
||||
func init() {
|
||||
regCrd = regexp.MustCompile("^core.openfeature.dev/")
|
||||
regURL = regexp.MustCompile("^https?://")
|
||||
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
|
||||
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
|
||||
regFile = regexp.MustCompile("^file:")
|
||||
}
|
||||
|
||||
type ISyncBuilder interface {
|
||||
SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error)
|
||||
SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error)
|
||||
}
|
||||
|
||||
type SyncBuilder struct {
|
||||
k8sClientBuilder IK8sClientBuilder
|
||||
}
|
||||
|
||||
func NewSyncBuilder() *SyncBuilder {
|
||||
return &SyncBuilder{
|
||||
k8sClientBuilder: &KubernetesClientBuilder{},
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) {
|
||||
switch uriB := []byte(uri); {
|
||||
// filepath may be used for debugging, not recommended in deployment
|
||||
case regFile.Match(uriB):
|
||||
return sb.newFile(uri, logger), nil
|
||||
case regCrd.Match(uriB):
|
||||
return sb.newK8s(uri, logger)
|
||||
}
|
||||
return nil, fmt.Errorf("unrecognized URI: %s", uri)
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) {
|
||||
syncImpls := make([]sync.ISync, len(sourceConfigs))
|
||||
for i, syncProvider := range sourceConfigs {
|
||||
syncImpl, err := sb.syncFromConfig(syncProvider, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create sync provider: %w", err)
|
||||
}
|
||||
syncImpls[i] = syncImpl
|
||||
}
|
||||
return syncImpls, nil
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) {
|
||||
switch sourceConfig.Provider {
|
||||
case syncProviderFile:
|
||||
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI))
|
||||
return sb.newFile(sourceConfig.URI, logger), nil
|
||||
case syncProviderKubernetes:
|
||||
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI))
|
||||
return sb.newK8s(sourceConfig.URI, logger)
|
||||
case syncProviderHTTP:
|
||||
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", sourceConfig.URI))
|
||||
return sb.newHTTP(sourceConfig, logger), nil
|
||||
case syncProviderGrpc:
|
||||
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", sourceConfig.URI))
|
||||
return sb.newGRPC(sourceConfig, logger), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
|
||||
sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync {
|
||||
return &file.Sync{
|
||||
URI: regFile.ReplaceAllString(uri, ""),
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "filepath"),
|
||||
),
|
||||
Mux: &msync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
|
||||
reader, dynamicClient, err := sb.k8sClientBuilder.GetK8sClients()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
|
||||
}
|
||||
|
||||
return kubernetes.NewK8sSync(
|
||||
logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "kubernetes"),
|
||||
),
|
||||
regCrd.ReplaceAllString(uri, ""),
|
||||
reader,
|
||||
dynamicClient,
|
||||
), nil
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync {
|
||||
// Default to 5 seconds
|
||||
var interval uint32 = 5
|
||||
if config.Interval != 0 {
|
||||
interval = config.Interval
|
||||
}
|
||||
|
||||
return &httpSync.Sync{
|
||||
URI: config.URI,
|
||||
Client: &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
},
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "remote"),
|
||||
),
|
||||
BearerToken: config.BearerToken,
|
||||
Interval: interval,
|
||||
Cron: cron.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
|
||||
return &grpc.Sync{
|
||||
URI: config.URI,
|
||||
Logger: logger.WithFields(
|
||||
zap.String("component", "sync"),
|
||||
zap.String("sync", "grpc"),
|
||||
),
|
||||
CredentialBuilder: &credentials.CredentialBuilder{},
|
||||
CertPath: config.CertPath,
|
||||
ProviderID: config.ProviderID,
|
||||
Secure: config.TLS,
|
||||
Selector: config.Selector,
|
||||
}
|
||||
}
|
||||
|
||||
type IK8sClientBuilder interface {
|
||||
GetK8sClients() (client.Reader, dynamic.Interface, error)
|
||||
}
|
||||
|
||||
type KubernetesClientBuilder struct{}
|
||||
|
||||
func (kcb KubernetesClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) {
|
||||
clusterConfig, err := k8sClusterConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create readClient: %w", err)
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err)
|
||||
}
|
||||
return readClient, dynamicClient, nil
|
||||
}
|
||||
|
||||
// k8sClusterConfig build K8s connection config based available configurations
|
||||
func k8sClusterConfig() (*rest.Config, error) {
|
||||
cfg := os.Getenv("KUBECONFIG")
|
||||
|
||||
var clusterConfig *rest.Config
|
||||
var err error
|
||||
|
||||
if cfg != "" {
|
||||
clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building cluster config from flags: %w", err)
|
||||
}
|
||||
} else {
|
||||
clusterConfig, err = rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error fetching cluster config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return clusterConfig, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,240 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
buildermock "github.com/open-feature/flagd/core/pkg/sync/builder/mock"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/file"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/grpc"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/http"
|
||||
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSyncBuilder_SyncFromURI(t *testing.T) {
|
||||
type args struct {
|
||||
uri string
|
||||
logger *logger.Logger
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
injectFunc func(builder *SyncBuilder)
|
||||
want sync.ISync
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "kubernetes sync",
|
||||
args: args{
|
||||
uri: "core.openfeature.dev/ff-config",
|
||||
logger: logger.NewLogger(nil, false),
|
||||
},
|
||||
injectFunc: func(builder *SyncBuilder) {
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
|
||||
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil)
|
||||
|
||||
builder.k8sClientBuilder = mockClientBuilder
|
||||
},
|
||||
want: &kubernetes.Sync{},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "kubernetes sync - error when retrieving config",
|
||||
args: args{
|
||||
uri: "core.openfeature.dev/ff-config",
|
||||
logger: logger.NewLogger(nil, false),
|
||||
},
|
||||
injectFunc: func(builder *SyncBuilder) {
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
|
||||
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, errors.New("oops"))
|
||||
|
||||
builder.k8sClientBuilder = mockClientBuilder
|
||||
},
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "file sync",
|
||||
args: args{
|
||||
uri: "file:my-file",
|
||||
logger: logger.NewLogger(nil, false),
|
||||
},
|
||||
want: &file.Sync{},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sb := NewSyncBuilder()
|
||||
|
||||
if tt.injectFunc != nil {
|
||||
tt.injectFunc(sb)
|
||||
}
|
||||
|
||||
got, err := sb.SyncFromURI(tt.args.uri, tt.args.logger)
|
||||
if tt.wantErr {
|
||||
require.NotNil(t, err)
|
||||
require.Nil(t, got)
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
require.IsType(t, tt.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_k8sClusterConfig(t *testing.T) {
|
||||
t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) {
|
||||
tt.Setenv("KUBECONFIG", "")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
t.Run("KUBECONFIG file not existing", func(tt *testing.T) {
|
||||
tt.Setenv("KUBECONFIG", "value")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
t.Run("Default REST Config and missing svc account", func(tt *testing.T) {
|
||||
tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1")
|
||||
tt.Setenv("KUBERNETES_SERVICE_PORT", "8080")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func Test_SyncsFromFromConfig(t *testing.T) {
|
||||
lg := logger.NewLogger(nil, false)
|
||||
|
||||
type args struct {
|
||||
logger *logger.Logger
|
||||
sources []sync.SourceConfig
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
injectFunc func(builder *SyncBuilder)
|
||||
wantSyncs []sync.ISync
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Empty",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []sync.SourceConfig{},
|
||||
},
|
||||
wantSyncs: nil,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Error",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []sync.SourceConfig{
|
||||
{
|
||||
URI: "fake",
|
||||
Provider: "disk",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "single",
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []sync.SourceConfig{
|
||||
{
|
||||
URI: "grpc://host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
ProviderID: "myapp",
|
||||
CertPath: "/tmp/ca.cert",
|
||||
Selector: "source=database",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: []sync.ISync{
|
||||
&grpc.Sync{},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "combined",
|
||||
injectFunc: func(builder *SyncBuilder) {
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
|
||||
mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil)
|
||||
|
||||
builder.k8sClientBuilder = mockClientBuilder
|
||||
},
|
||||
args: args{
|
||||
logger: lg,
|
||||
sources: []sync.SourceConfig{
|
||||
{
|
||||
URI: "grpc://host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
ProviderID: "myapp",
|
||||
CertPath: "/tmp/ca.cert",
|
||||
Selector: "source=database",
|
||||
},
|
||||
{
|
||||
URI: "https://host:port",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "token",
|
||||
},
|
||||
{
|
||||
URI: "/tmp/flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
{
|
||||
URI: "my-namespace/my-flags",
|
||||
Provider: syncProviderKubernetes,
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSyncs: []sync.ISync{
|
||||
&grpc.Sync{},
|
||||
&http.Sync{},
|
||||
&file.Sync{},
|
||||
&kubernetes.Sync{},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sb := NewSyncBuilder()
|
||||
|
||||
if tt.injectFunc != nil {
|
||||
tt.injectFunc(sb)
|
||||
}
|
||||
syncs, err := sb.SyncsFromConfig(tt.args.sources, tt.args.logger)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
require.Len(t, syncs, len(tt.wantSyncs))
|
||||
|
||||
// check if we got the expected sync types
|
||||
for index, wantType := range tt.wantSyncs {
|
||||
require.IsType(t, wantType, syncs[index])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
)
|
||||
|
||||
// ParseSources parse a json formatted SourceConfig array string and performs validations on the content
|
||||
func ParseSources(sourcesFlag string) ([]sync.SourceConfig, error) {
|
||||
syncProvidersParsed := []sync.SourceConfig{}
|
||||
|
||||
if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil {
|
||||
return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err)
|
||||
}
|
||||
for _, sp := range syncProvidersParsed {
|
||||
if sp.URI == "" {
|
||||
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
|
||||
}
|
||||
if sp.Provider == "" {
|
||||
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
|
||||
}
|
||||
}
|
||||
return syncProvidersParsed, nil
|
||||
}
|
||||
|
||||
// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to
|
||||
// derive SourceConfig
|
||||
func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) {
|
||||
syncProvidersParsed := []sync.SourceConfig{}
|
||||
|
||||
for _, uri := range uris {
|
||||
switch uriB := []byte(uri); {
|
||||
case regFile.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||
URI: regFile.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderFile,
|
||||
})
|
||||
case regCrd.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||
URI: regCrd.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderKubernetes,
|
||||
})
|
||||
case regURL.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||
URI: uri,
|
||||
Provider: syncProviderHTTP,
|
||||
})
|
||||
case regGRPC.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||
URI: regGRPC.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderGrpc,
|
||||
})
|
||||
case regGRPCSecure.Match(uriB):
|
||||
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||
URI: regGRPCSecure.ReplaceAllString(uri, ""),
|
||||
Provider: syncProviderGrpc,
|
||||
TLS: true,
|
||||
})
|
||||
default:
|
||||
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
|
||||
"'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri)
|
||||
}
|
||||
}
|
||||
return syncProvidersParsed, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,210 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
)
|
||||
|
||||
func TestParseSource(t *testing.T) {
|
||||
test := map[string]struct {
|
||||
in string
|
||||
expectErr bool
|
||||
out []sync.SourceConfig
|
||||
}{
|
||||
"simple": {
|
||||
in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]",
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-syncs": {
|
||||
in: `[
|
||||
{"uri":"config/samples/example_flags.json","provider":"file"},
|
||||
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
|
||||
{"uri":"host:port","provider":"grpc"},
|
||||
{"uri":"default/my-crd","provider":"kubernetes"}
|
||||
]`,
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
{
|
||||
URI: "http://test.com",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: ":)",
|
||||
},
|
||||
{
|
||||
URI: "host:port",
|
||||
Provider: syncProviderGrpc,
|
||||
},
|
||||
{
|
||||
URI: "default/my-crd",
|
||||
Provider: syncProviderKubernetes,
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-syncs-with-options": {
|
||||
in: `[{"uri":"config/samples/example_flags.json","provider":"file"},
|
||||
{"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"},
|
||||
{"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"},
|
||||
{"uri":"http://site.com","provider":"http","interval":77 },
|
||||
{"uri":"default/my-flag-config","provider":"kubernetes"},
|
||||
{"uri":"grpc-source:8080","provider":"grpc"},
|
||||
{"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]
|
||||
`,
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{
|
||||
{
|
||||
URI: "config/samples/example_flags.json",
|
||||
Provider: syncProviderFile,
|
||||
},
|
||||
{
|
||||
URI: "http://my-flag-source.json",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "bearer-dji34ld2l",
|
||||
},
|
||||
{
|
||||
URI: "https://secure-remote",
|
||||
Provider: syncProviderHTTP,
|
||||
BearerToken: "bearer-dji34ld2l",
|
||||
},
|
||||
{
|
||||
URI: "http://site.com",
|
||||
Provider: syncProviderHTTP,
|
||||
Interval: 77,
|
||||
},
|
||||
{
|
||||
URI: "default/my-flag-config",
|
||||
Provider: syncProviderKubernetes,
|
||||
},
|
||||
{
|
||||
URI: "grpc-source:8080",
|
||||
Provider: syncProviderGrpc,
|
||||
},
|
||||
{
|
||||
URI: "my-flag-source:8080",
|
||||
Provider: syncProviderGrpc,
|
||||
TLS: true,
|
||||
CertPath: "/certs/ca.cert",
|
||||
ProviderID: "flagd-weatherapp-sidecar",
|
||||
Selector: "source=database,app=weatherapp",
|
||||
},
|
||||
},
|
||||
},
|
||||
"empty": {
|
||||
in: `[]`,
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{},
|
||||
},
|
||||
"parse-failure": {
|
||||
in: ``,
|
||||
expectErr: true,
|
||||
out: []sync.SourceConfig{},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tt := range test {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
out, err := ParseSources(tt.in)
|
||||
if tt.expectErr {
|
||||
if err == nil {
|
||||
t.Error("expected error, got none")
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("did not expect error: %s", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(out, tt.out) {
|
||||
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSyncProviderURIs(t *testing.T) {
|
||||
test := map[string]struct {
|
||||
in []string
|
||||
expectErr bool
|
||||
out []sync.SourceConfig
|
||||
}{
|
||||
"simple": {
|
||||
in: []string{
|
||||
"file:my-file.json",
|
||||
},
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{
|
||||
{
|
||||
URI: "my-file.json",
|
||||
Provider: "file",
|
||||
},
|
||||
},
|
||||
},
|
||||
"multiple-uris": {
|
||||
in: []string{
|
||||
"file:my-file.json",
|
||||
"https://test.com",
|
||||
"grpc://host:port",
|
||||
"grpcs://secure-grpc",
|
||||
"core.openfeature.dev/default/my-crd",
|
||||
},
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{
|
||||
{
|
||||
URI: "my-file.json",
|
||||
Provider: "file",
|
||||
},
|
||||
{
|
||||
URI: "https://test.com",
|
||||
Provider: "http",
|
||||
},
|
||||
{
|
||||
URI: "host:port",
|
||||
Provider: "grpc",
|
||||
TLS: false,
|
||||
},
|
||||
{
|
||||
URI: "secure-grpc",
|
||||
Provider: "grpc",
|
||||
TLS: true,
|
||||
},
|
||||
{
|
||||
URI: "default/my-crd",
|
||||
Provider: "kubernetes",
|
||||
},
|
||||
},
|
||||
},
|
||||
"empty": {
|
||||
in: []string{},
|
||||
expectErr: false,
|
||||
out: []sync.SourceConfig{},
|
||||
},
|
||||
"parse-failure": {
|
||||
in: []string{"care.openfeature.dev/will/fail"},
|
||||
expectErr: true,
|
||||
out: []sync.SourceConfig{},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tt := range test {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
out, err := ParseSyncProviderURIs(tt.in)
|
||||
if tt.expectErr {
|
||||
if err == nil {
|
||||
t.Error("expected error, got none")
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("did not expect error: %s", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(out, tt.out) {
|
||||
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -25,6 +25,14 @@ type Sync struct {
|
|||
Mux *msync.RWMutex
|
||||
}
|
||||
|
||||
func NewFileSync(uri string, logger *logger.Logger) *Sync {
|
||||
return &Sync{
|
||||
URI: uri,
|
||||
Logger: logger,
|
||||
Mux: &msync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// default state is used to prevent EOF errors when handling filepath delete events + empty files
|
||||
const defaultState = "{}"
|
||||
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) Recv() *gomock.Call {
|
|||
}
|
||||
|
||||
// RecvMsg mocks base method.
|
||||
func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m interface{}) error {
|
||||
func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m any) error {
|
||||
m_2.ctrl.T.Helper()
|
||||
ret := m_2.ctrl.Call(m_2, "RecvMsg", m)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
|
@ -174,7 +174,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) RecvMsg(m interface{})
|
|||
}
|
||||
|
||||
// SendMsg mocks base method.
|
||||
func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m interface{}) error {
|
||||
func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m any) error {
|
||||
m_2.ctrl.T.Helper()
|
||||
ret := m_2.ctrl.Call(m_2, "SendMsg", m)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
package sync
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Type int
|
||||
|
||||
|
|
@ -57,3 +59,16 @@ type DataSync struct {
|
|||
Source string
|
||||
Type
|
||||
}
|
||||
|
||||
// SourceConfig is configuration option for flagd. This maps to startup parameter sources
|
||||
type SourceConfig struct {
|
||||
URI string `json:"uri"`
|
||||
Provider string `json:"provider"`
|
||||
|
||||
BearerToken string `json:"bearerToken,omitempty"`
|
||||
CertPath string `json:"certPath,omitempty"`
|
||||
TLS bool `json:"tls,omitempty"`
|
||||
ProviderID string `json:"providerID,omitempty"`
|
||||
Selector string `json:"selector,omitempty"`
|
||||
Interval uint32 `json:"interval,omitempty"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
msync "sync"
|
||||
"time"
|
||||
|
|
@ -17,9 +16,7 @@ import (
|
|||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
|
|
@ -29,6 +26,8 @@ var (
|
|||
featureFlagResource = v1beta1.GroupVersion.WithResource("featureflags")
|
||||
)
|
||||
|
||||
type SyncOption func(s *Sync)
|
||||
|
||||
type Sync struct {
|
||||
URI string
|
||||
|
||||
|
|
@ -45,34 +44,16 @@ func NewK8sSync(
|
|||
logger *logger.Logger,
|
||||
uri string,
|
||||
reader client.Reader,
|
||||
dynamic dynamic.Interface,
|
||||
dynamicClient dynamic.Interface,
|
||||
) *Sync {
|
||||
return &Sync{
|
||||
logger: logger,
|
||||
URI: uri,
|
||||
readClient: reader,
|
||||
dynamicClient: dynamic,
|
||||
dynamicClient: dynamicClient,
|
||||
}
|
||||
}
|
||||
|
||||
func GetClients() (client.Reader, dynamic.Interface, error) {
|
||||
clusterConfig, err := k8sClusterConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create readClient: %w", err)
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err)
|
||||
}
|
||||
return readClient, dynamicClient, nil
|
||||
}
|
||||
|
||||
func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
||||
fetch, err := k.fetch(ctx)
|
||||
if err != nil {
|
||||
|
|
@ -325,32 +306,6 @@ func parseURI(uri string) (string, string, error) {
|
|||
return s[0], s[1], nil
|
||||
}
|
||||
|
||||
// k8sClusterConfig build K8s connection config based available configurations
|
||||
func k8sClusterConfig() (*rest.Config, error) {
|
||||
cfg := os.Getenv("KUBECONFIG")
|
||||
|
||||
var clusterConfig *rest.Config
|
||||
var err error
|
||||
|
||||
if cfg != "" {
|
||||
clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error building cluster config from flags: %w", err)
|
||||
}
|
||||
} else {
|
||||
clusterConfig, err = rest.InClusterConfig()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error fetch cluster config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return clusterConfig, nil
|
||||
}
|
||||
|
||||
func marshallFeatureFlagSpec(ff *v1beta1.FeatureFlag) (string, error) {
|
||||
b, err := json.Marshal(ff.Spec.FlagSpec)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -786,31 +786,6 @@ func TestNotify(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func Test_k8sClusterConfig(t *testing.T) {
|
||||
t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) {
|
||||
tt.Setenv("KUBECONFIG", "")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
t.Run("KUBECONFIG file not existing", func(tt *testing.T) {
|
||||
tt.Setenv("KUBECONFIG", "value")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
t.Run("Default REST Config and missing svc account", func(tt *testing.T) {
|
||||
tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1")
|
||||
tt.Setenv("KUBERNETES_SERVICE_PORT", "8080")
|
||||
_, err := k8sClusterConfig()
|
||||
if err == nil {
|
||||
tt.Error("Expected error but got none")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func Test_NewK8sSync(t *testing.T) {
|
||||
l, err := logger.NewZapLogger(zapcore.FatalLevel, "console")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/service"
|
||||
syncServer "github.com/open-feature/flagd/core/pkg/service/sync"
|
||||
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
|
||||
"github.com/open-feature/flagd/core/pkg/subscriptions"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
|
@ -70,7 +70,7 @@ var startCmd = &cobra.Command{
|
|||
|
||||
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
syncStore := syncStore.NewSyncStore(ctx, logger)
|
||||
syncStore := subscriptions.NewManager(ctx, logger)
|
||||
s := syncServer.NewServer(logger, syncStore)
|
||||
|
||||
// If --management-port is set use that value. If not and
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ import (
|
|||
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/runtime"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -140,14 +142,14 @@ var startCmd = &cobra.Command{
|
|||
docsLinkConfiguration)
|
||||
}
|
||||
|
||||
syncProviders, err := runtime.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName))
|
||||
syncProviders, err := syncbuilder.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
syncProvidersFromConfig := []runtime.SourceConfig{}
|
||||
syncProvidersFromConfig := []sync.SourceConfig{}
|
||||
if cfgFile == "" && viper.GetString(sourcesFlagName) != "" {
|
||||
syncProvidersFromConfig, err = runtime.ParseSources(viper.GetString(sourcesFlagName))
|
||||
syncProvidersFromConfig, err = syncbuilder.ParseSources(viper.GetString(sourcesFlagName))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue