feat: introduce per-sync configurations (#448)
<!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR <!-- add the description of the PR here --> - introduces the `SyncProviderConfig` object and associated start flags to pass configuration to specific sync providers ### Related Issues <!-- add here the GitHub issue that this PR resolves if applicable --> https://github.com/open-feature/flagd/issues/405 ### Notes <!-- any additional notes for this PR --> ### Follow-up Tasks <!-- anything that is related to this PR but not done here should be noted under this section --> <!-- if there is a need for a new issue, please link it here --> ### How to test <!-- if applicable, add testing instructions under this section --> --------- Signed-off-by: James Milligan <james@omnant.co.uk> Signed-off-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Co-authored-by: Skye Gill <gill.skye95@gmail.com>
This commit is contained in:
parent
6a039cef87
commit
1d80039558
41
cmd/start.go
41
cmd/start.go
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/open-feature/flagd/pkg/logger"
|
"github.com/open-feature/flagd/pkg/logger"
|
||||||
"github.com/open-feature/flagd/pkg/runtime"
|
"github.com/open-feature/flagd/pkg/runtime"
|
||||||
|
"github.com/open-feature/flagd/pkg/sync"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
@ -24,6 +25,7 @@ const (
|
||||||
serverCertPathFlagName = "server-cert-path"
|
serverCertPathFlagName = "server-cert-path"
|
||||||
serverKeyPathFlagName = "server-key-path"
|
serverKeyPathFlagName = "server-key-path"
|
||||||
socketPathFlagName = "socket-path"
|
socketPathFlagName = "socket-path"
|
||||||
|
sourcesFlagName = "sources"
|
||||||
syncProviderFlagName = "sync-provider"
|
syncProviderFlagName = "sync-provider"
|
||||||
uriFlagName = "uri"
|
uriFlagName = "uri"
|
||||||
)
|
)
|
||||||
|
|
@ -44,7 +46,7 @@ func init() {
|
||||||
flags.StringP(serverCertPathFlagName, "c", "", "Server side tls certificate path")
|
flags.StringP(serverCertPathFlagName, "c", "", "Server side tls certificate path")
|
||||||
flags.StringP(serverKeyPathFlagName, "k", "", "Server side tls key path")
|
flags.StringP(serverKeyPathFlagName, "k", "", "Server side tls key path")
|
||||||
flags.StringToStringP(providerArgsFlagName,
|
flags.StringToStringP(providerArgsFlagName,
|
||||||
"a", nil, "Sync provider arguments as key values separated by =")
|
"a", nil, "DEPRECATED: Sync provider arguments as key values separated by =")
|
||||||
flags.StringSliceP(
|
flags.StringSliceP(
|
||||||
uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+
|
uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+
|
||||||
"url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the "+
|
"url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the "+
|
||||||
|
|
@ -53,11 +55,16 @@ func init() {
|
||||||
"Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.",
|
"Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.",
|
||||||
)
|
)
|
||||||
flags.StringP(
|
flags.StringP(
|
||||||
bearerTokenFlagName, "b", "", "Set a bearer token to use for remote sync")
|
bearerTokenFlagName, "b", "", "DEPRECATED: Superseded by --sources.")
|
||||||
flags.StringSliceP(corsFlagName, "C", []string{}, "CORS allowed origins, * will allow all origins")
|
flags.StringSliceP(corsFlagName, "C", []string{}, "CORS allowed origins, * will allow all origins")
|
||||||
flags.StringP(
|
flags.StringP(
|
||||||
syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote",
|
syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote",
|
||||||
)
|
)
|
||||||
|
flags.StringP(
|
||||||
|
sourcesFlagName, "s", "", "JSON representation of an array of SourceConfig objects. This object contains "+
|
||||||
|
"2 required fields, uri (string) and provider (string). Documentation for this object: "+
|
||||||
|
"https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation",
|
||||||
|
)
|
||||||
flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ")
|
flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ")
|
||||||
|
|
||||||
_ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName))
|
_ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName))
|
||||||
|
|
@ -71,6 +78,7 @@ func init() {
|
||||||
_ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName))
|
_ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName))
|
||||||
_ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName))
|
_ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName))
|
||||||
_ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName))
|
_ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName))
|
||||||
|
_ = viper.BindPFlag(sourcesFlagName, flags.Lookup(sourcesFlagName))
|
||||||
_ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName))
|
_ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -106,17 +114,40 @@ var startCmd = &cobra.Command{
|
||||||
rtLogger.Warn("DEPRECATED: The --evaluator flag has been deprecated. " +
|
rtLogger.Warn("DEPRECATED: The --evaluator flag has been deprecated. " +
|
||||||
"Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md")
|
"Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if viper.GetString(providerArgsFlagName) != "" {
|
||||||
|
rtLogger.Warn("DEPRECATED: The --sync-provider-args flag has been deprecated. " +
|
||||||
|
"Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md")
|
||||||
|
}
|
||||||
|
|
||||||
|
syncProviders, err := runtime.SyncProvidersFromURIs(viper.GetStringSlice(uriFlagName))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
syncProvidersFromConfig := []sync.SourceConfig{}
|
||||||
|
if cfgFile == "" && viper.GetString(sourcesFlagName) != "" {
|
||||||
|
syncProvidersFromConfig, err = runtime.SyncProviderArgParse(viper.GetString(sourcesFlagName))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = viper.UnmarshalKey(sourcesFlagName, &syncProvidersFromConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
syncProviders = append(syncProviders, syncProvidersFromConfig...)
|
||||||
|
|
||||||
// Build Runtime -----------------------------------------------------------
|
// Build Runtime -----------------------------------------------------------
|
||||||
rt, err := runtime.FromConfig(logger, runtime.Config{
|
rt, err := runtime.FromConfig(logger, runtime.Config{
|
||||||
CORS: viper.GetStringSlice(corsFlagName),
|
CORS: viper.GetStringSlice(corsFlagName),
|
||||||
MetricsPort: viper.GetInt32(metricsPortFlagName),
|
MetricsPort: viper.GetInt32(metricsPortFlagName),
|
||||||
ProviderArgs: viper.GetStringMapString(providerArgsFlagName),
|
|
||||||
ServiceCertPath: viper.GetString(serverCertPathFlagName),
|
ServiceCertPath: viper.GetString(serverCertPathFlagName),
|
||||||
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
|
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
|
||||||
ServicePort: viper.GetInt32(portFlagName),
|
ServicePort: viper.GetInt32(portFlagName),
|
||||||
ServiceSocketPath: viper.GetString(socketPathFlagName),
|
ServiceSocketPath: viper.GetString(socketPathFlagName),
|
||||||
SyncBearerToken: viper.GetString(bearerTokenFlagName),
|
SyncProviders: syncProviders,
|
||||||
SyncURI: viper.GetStringSlice(uriFlagName),
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rtLogger.Fatal(err.Error())
|
rtLogger.Fatal(err.Error())
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ Environment variable keys are uppercased, prefixed with `FLAGD_` and all `-` are
|
||||||
|
|
||||||
Config file expects the keys to have the exact naming as the flags.
|
Config file expects the keys to have the exact naming as the flags.
|
||||||
|
|
||||||
### URI patterns
|
### <a name="uri-patterns"></a> URI patterns
|
||||||
|
|
||||||
Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation:
|
Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation:
|
||||||
|
|
||||||
|
|
@ -22,7 +22,6 @@ Any URI passed to flagd via the `--uri` flag must follow one of the 4 following
|
||||||
| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` |
|
| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### Customising sync providers
|
### Customising sync providers
|
||||||
|
|
||||||
Custom sync providers can be used to provide flag evaluation logic.
|
Custom sync providers can be used to provide flag evaluation logic.
|
||||||
|
|
@ -36,3 +35,37 @@ To use an existing FeatureFlagConfiguration custom resource, start flagD with th
|
||||||
```shell
|
```shell
|
||||||
flagd start --uri core.openfeature.dev/default/my_example
|
flagd start --uri core.openfeature.dev/default/my_example
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Source Configuration
|
||||||
|
|
||||||
|
While a URI may be passed to flagd via the `--uri` flag, some implementations may require further configurations. In these cases the `--sources` flag should be used.
|
||||||
|
The flag takes a string argument, which should be a JSON representation of an array of `SourceConfig` objects. Alternatively, these configurations should be passed to
|
||||||
|
flagd via config file, specified using the `--config` flag.
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
|------------|------------------------------------|
|
||||||
|
| uri | required `string` | |
|
||||||
|
| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) |
|
||||||
|
| bearerToken | optional `string` |
|
||||||
|
|
||||||
|
The `uri` field values do not need to follow the [URI patterns](#uri-patterns), the provider type is instead derived from the provider field. If the prefix is supplied, it will be removed on startup without error.
|
||||||
|
|
||||||
|
Example start command using a filepath sync provider and the equivalent config file definition:
|
||||||
|
```sh
|
||||||
|
./flagd start --sources='[{"uri":"config/samples/example_flags.json","provider":"file"},{"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"}]{"uri":"default/my-flag-config","provider":"kubernetes"},{"uri":"grpc://my-flag-source:8080","provider":"grpc"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
sources:
|
||||||
|
- uri: config/samples/example_flags.json
|
||||||
|
provider: file
|
||||||
|
- uri: http://my-flag-source.json
|
||||||
|
provider: http
|
||||||
|
bearerToken: bearer-dji34ld2l
|
||||||
|
- uri: default/my-flag-config
|
||||||
|
provider: kubernetes
|
||||||
|
- uri: http://my-flag-source.json
|
||||||
|
provider: kubernetes
|
||||||
|
- uri: grpc://my-flag-source:8080
|
||||||
|
provider: grpc
|
||||||
|
```
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ flagd start [flags]
|
||||||
### Options
|
### Options
|
||||||
|
|
||||||
```
|
```
|
||||||
-b, --bearer-token string Set a bearer token to use for remote sync
|
-b, --bearer-token string DEPRECATED: Superseded by --sources.
|
||||||
-C, --cors-origin strings CORS allowed origins, * will allow all origins
|
-C, --cors-origin strings CORS allowed origins, * will allow all origins
|
||||||
-e, --evaluator string DEPRECATED: Set an evaluator e.g. json, yaml/yml.Please note that yaml/yml and json evaluations work the same (yaml/yml files are converted to json internally) (default "json")
|
-e, --evaluator string DEPRECATED: Set an evaluator e.g. json, yaml/yml.Please note that yaml/yml and json evaluations work the same (yaml/yml files are converted to json internally) (default "json")
|
||||||
-h, --help help for start
|
-h, --help help for start
|
||||||
|
|
@ -19,8 +19,9 @@ flagd start [flags]
|
||||||
-c, --server-cert-path string Server side tls certificate path
|
-c, --server-cert-path string Server side tls certificate path
|
||||||
-k, --server-key-path string Server side tls key path
|
-k, --server-key-path string Server side tls key path
|
||||||
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
|
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
|
||||||
|
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation
|
||||||
-y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote
|
-y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote
|
||||||
-a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default [])
|
-a, --sync-provider-args stringToString DEPRECATED: Sync provider arguments as key values separated by = (default [])
|
||||||
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
|
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -407,8 +407,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
||||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||||
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
|
|
||||||
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
|
|
||||||
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
|
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
|
||||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
package runtime
|
package runtime
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
|
@ -20,6 +22,13 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
syncProviderFile = "file"
|
||||||
|
syncProviderGrpc = "grpc"
|
||||||
|
syncProviderKubernetes = "kubernetes"
|
||||||
|
syncProviderHTTP = "http"
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
regCrd *regexp.Regexp
|
regCrd *regexp.Regexp
|
||||||
regURL *regexp.Regexp
|
regURL *regexp.Regexp
|
||||||
|
|
@ -36,7 +45,11 @@ func init() {
|
||||||
|
|
||||||
func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
|
func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
|
||||||
s := store.NewFlags()
|
s := store.NewFlags()
|
||||||
s.FlagSources = config.SyncURI
|
sources := []string{}
|
||||||
|
for _, sync := range config.SyncProviders {
|
||||||
|
sources = append(sources, sync.URI)
|
||||||
|
}
|
||||||
|
s.FlagSources = sources
|
||||||
rt := Runtime{
|
rt := Runtime{
|
||||||
config: config,
|
config: config,
|
||||||
Logger: logger.WithFields(zap.String("component", "runtime")),
|
Logger: logger.WithFields(zap.String("component", "runtime")),
|
||||||
|
|
@ -67,17 +80,17 @@ func (r *Runtime) setService(logger *logger.Logger) {
|
||||||
|
|
||||||
func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
|
func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
|
||||||
rtLogger := logger.WithFields(zap.String("component", "runtime"))
|
rtLogger := logger.WithFields(zap.String("component", "runtime"))
|
||||||
r.SyncImpl = make([]sync.ISync, 0, len(r.config.SyncURI))
|
r.SyncImpl = make([]sync.ISync, 0, len(r.config.SyncProviders))
|
||||||
for _, uri := range r.config.SyncURI {
|
for _, syncProvider := range r.config.SyncProviders {
|
||||||
switch uriB := []byte(uri); {
|
switch syncProvider.Provider {
|
||||||
case regFile.Match(uriB):
|
case syncProviderFile:
|
||||||
r.SyncImpl = append(
|
r.SyncImpl = append(
|
||||||
r.SyncImpl,
|
r.SyncImpl,
|
||||||
r.newFile(uri, logger),
|
r.newFile(syncProvider, logger),
|
||||||
)
|
)
|
||||||
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
|
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI))
|
||||||
case regCrd.Match(uriB):
|
case syncProviderKubernetes:
|
||||||
k, err := r.newK8s(uri, logger)
|
k, err := r.newK8s(syncProvider.URI, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -85,29 +98,29 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
|
||||||
r.SyncImpl,
|
r.SyncImpl,
|
||||||
k,
|
k,
|
||||||
)
|
)
|
||||||
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
|
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI))
|
||||||
case regURL.Match(uriB):
|
case syncProviderHTTP:
|
||||||
r.SyncImpl = append(
|
r.SyncImpl = append(
|
||||||
r.SyncImpl,
|
r.SyncImpl,
|
||||||
r.newHTTP(uri, logger),
|
r.newHTTP(syncProvider, logger),
|
||||||
)
|
)
|
||||||
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri))
|
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI))
|
||||||
case regGRPC.Match(uriB):
|
case syncProviderGrpc:
|
||||||
r.SyncImpl = append(
|
r.SyncImpl = append(
|
||||||
r.SyncImpl,
|
r.SyncImpl,
|
||||||
r.newGRPC(uri, logger),
|
r.newGRPC(syncProvider, logger),
|
||||||
)
|
)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+
|
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+
|
||||||
" or 'core.openfeature.dev'", uri)
|
" or 'core.openfeature.dev'", syncProvider.URI)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
|
func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
|
||||||
return &grpc.Sync{
|
return &grpc.Sync{
|
||||||
Target: grpc.URLToGRPCTarget(uri),
|
Target: grpc.URLToGRPCTarget(config.URI),
|
||||||
Logger: logger.WithFields(
|
Logger: logger.WithFields(
|
||||||
zap.String("component", "sync"),
|
zap.String("component", "sync"),
|
||||||
zap.String("sync", "grpc"),
|
zap.String("sync", "grpc"),
|
||||||
|
|
@ -115,10 +128,9 @@ func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
|
func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync {
|
||||||
return &httpSync.Sync{
|
return &httpSync.Sync{
|
||||||
URI: uri,
|
URI: config.URI,
|
||||||
BearerToken: r.config.SyncBearerToken,
|
|
||||||
Client: &http.Client{
|
Client: &http.Client{
|
||||||
Timeout: time.Second * 10,
|
Timeout: time.Second * 10,
|
||||||
},
|
},
|
||||||
|
|
@ -126,8 +138,8 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
|
||||||
zap.String("component", "sync"),
|
zap.String("component", "sync"),
|
||||||
zap.String("sync", "remote"),
|
zap.String("sync", "remote"),
|
||||||
),
|
),
|
||||||
ProviderArgs: r.config.ProviderArgs,
|
BearerToken: config.BearerToken,
|
||||||
Cron: cron.New(),
|
Cron: cron.New(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -142,20 +154,72 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e
|
||||||
zap.String("sync", "kubernetes"),
|
zap.String("sync", "kubernetes"),
|
||||||
),
|
),
|
||||||
regCrd.ReplaceAllString(uri, ""),
|
regCrd.ReplaceAllString(uri, ""),
|
||||||
r.config.ProviderArgs,
|
|
||||||
reader,
|
reader,
|
||||||
dynamic,
|
dynamic,
|
||||||
), nil
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
|
func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync {
|
||||||
return &file.Sync{
|
return &file.Sync{
|
||||||
URI: regFile.ReplaceAllString(uri, ""),
|
URI: config.URI,
|
||||||
Logger: logger.WithFields(
|
Logger: logger.WithFields(
|
||||||
zap.String("component", "sync"),
|
zap.String("component", "sync"),
|
||||||
zap.String("sync", "filepath"),
|
zap.String("sync", "filepath"),
|
||||||
),
|
),
|
||||||
ProviderArgs: r.config.ProviderArgs,
|
Mux: &msync.RWMutex{},
|
||||||
Mux: &msync.RWMutex{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SyncProviderArgParse(syncProviders string) ([]sync.SourceConfig, error) {
|
||||||
|
syncProvidersParsed := []sync.SourceConfig{}
|
||||||
|
if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil {
|
||||||
|
return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err)
|
||||||
|
}
|
||||||
|
for i, sp := range syncProvidersParsed {
|
||||||
|
if sp.URI == "" {
|
||||||
|
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
|
||||||
|
}
|
||||||
|
if sp.Provider == "" {
|
||||||
|
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
|
||||||
|
}
|
||||||
|
switch uriB := []byte(sp.URI); {
|
||||||
|
case regFile.Match(uriB):
|
||||||
|
syncProvidersParsed[i].URI = regFile.ReplaceAllString(syncProvidersParsed[i].URI, "")
|
||||||
|
case regCrd.Match(uriB):
|
||||||
|
syncProvidersParsed[i].URI = regCrd.ReplaceAllString(syncProvidersParsed[i].URI, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return syncProvidersParsed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SyncProvidersFromURIs(uris []string) ([]sync.SourceConfig, error) {
|
||||||
|
syncProvidersParsed := []sync.SourceConfig{}
|
||||||
|
for _, uri := range uris {
|
||||||
|
switch uriB := []byte(uri); {
|
||||||
|
case regFile.Match(uriB):
|
||||||
|
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||||
|
URI: regFile.ReplaceAllString(uri, ""),
|
||||||
|
Provider: syncProviderFile,
|
||||||
|
})
|
||||||
|
case regCrd.Match(uriB):
|
||||||
|
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||||
|
URI: regCrd.ReplaceAllString(uri, ""),
|
||||||
|
Provider: syncProviderKubernetes,
|
||||||
|
})
|
||||||
|
case regURL.Match(uriB):
|
||||||
|
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||||
|
URI: uri,
|
||||||
|
Provider: syncProviderHTTP,
|
||||||
|
})
|
||||||
|
case regGRPC.Match(uriB):
|
||||||
|
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
|
||||||
|
URI: uri,
|
||||||
|
Provider: syncProviderGrpc,
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
|
||||||
|
"'http(s)://', 'grpc://', or 'core.openfeature.dev'", uri)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return syncProvidersParsed, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,12 +33,8 @@ type Config struct {
|
||||||
ServiceCertPath string
|
ServiceCertPath string
|
||||||
ServiceKeyPath string
|
ServiceKeyPath string
|
||||||
|
|
||||||
ProviderArgs sync.ProviderArgs
|
SyncProviders []sync.SourceConfig
|
||||||
SyncURI []string
|
CORS []string
|
||||||
RemoteSyncType string
|
|
||||||
SyncBearerToken string
|
|
||||||
|
|
||||||
CORS []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Start() error {
|
func (r *Runtime) Start() error {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,156 @@
|
||||||
|
package runtime_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/open-feature/flagd/pkg/runtime"
|
||||||
|
"github.com/open-feature/flagd/pkg/sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSyncProviderArgParse(t *testing.T) {
|
||||||
|
test := map[string]struct {
|
||||||
|
in string
|
||||||
|
expectErr bool
|
||||||
|
out []sync.SourceConfig
|
||||||
|
}{
|
||||||
|
"simple": {
|
||||||
|
in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]",
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{
|
||||||
|
{
|
||||||
|
URI: "config/samples/example_flags.json",
|
||||||
|
Provider: "file",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"multiple-syncs": {
|
||||||
|
in: `[
|
||||||
|
{"uri":"config/samples/example_flags.json","provider":"file"},
|
||||||
|
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
|
||||||
|
{"uri":"host:port","provider":"grpc"},
|
||||||
|
{"uri":"default/my-crd","provider":"kubernetes"}
|
||||||
|
]`,
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{
|
||||||
|
{
|
||||||
|
URI: "config/samples/example_flags.json",
|
||||||
|
Provider: "file",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "http://test.com",
|
||||||
|
Provider: "http",
|
||||||
|
BearerToken: ":)",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "host:port",
|
||||||
|
Provider: "grpc",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "default/my-crd",
|
||||||
|
Provider: "kubernetes",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"empty": {
|
||||||
|
in: `[]`,
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{},
|
||||||
|
},
|
||||||
|
"parse-failure": {
|
||||||
|
in: ``,
|
||||||
|
expectErr: true,
|
||||||
|
out: []sync.SourceConfig{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tt := range test {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
out, err := runtime.SyncProviderArgParse(tt.in)
|
||||||
|
if tt.expectErr {
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error, got none")
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
t.Errorf("did not expect error: %s", err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(out, tt.out) {
|
||||||
|
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncProvidersFromURIs(t *testing.T) {
|
||||||
|
test := map[string]struct {
|
||||||
|
in []string
|
||||||
|
expectErr bool
|
||||||
|
out []sync.SourceConfig
|
||||||
|
}{
|
||||||
|
"simple": {
|
||||||
|
in: []string{
|
||||||
|
"file:my-file.json",
|
||||||
|
},
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{
|
||||||
|
{
|
||||||
|
URI: "my-file.json",
|
||||||
|
Provider: "file",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"multiple-uris": {
|
||||||
|
in: []string{
|
||||||
|
"file:my-file.json",
|
||||||
|
"https://test.com",
|
||||||
|
"grpc://host:port",
|
||||||
|
"core.openfeature.dev/default/my-crd",
|
||||||
|
},
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{
|
||||||
|
{
|
||||||
|
URI: "my-file.json",
|
||||||
|
Provider: "file",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "https://test.com",
|
||||||
|
Provider: "http",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "grpc://host:port",
|
||||||
|
Provider: "grpc",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
URI: "default/my-crd",
|
||||||
|
Provider: "kubernetes",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"empty": {
|
||||||
|
in: []string{},
|
||||||
|
expectErr: false,
|
||||||
|
out: []sync.SourceConfig{},
|
||||||
|
},
|
||||||
|
"parse-failure": {
|
||||||
|
in: []string{"care.openfeature.dev/will/fail"},
|
||||||
|
expectErr: true,
|
||||||
|
out: []sync.SourceConfig{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tt := range test {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
out, err := runtime.SyncProvidersFromURIs(tt.in)
|
||||||
|
if tt.expectErr {
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error, got none")
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
t.Errorf("did not expect error: %s", err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(out, tt.out) {
|
||||||
|
t.Errorf("unexpected output, expected %v, got %v", tt.out, out)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -17,9 +17,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sync struct {
|
type Sync struct {
|
||||||
URI string
|
URI string
|
||||||
Logger *logger.Logger
|
Logger *logger.Logger
|
||||||
ProviderArgs sync.ProviderArgs
|
|
||||||
// FileType indicates the file type e.g., json, yaml/yml etc.,
|
// FileType indicates the file type e.g., json, yaml/yml etc.,
|
||||||
fileType string
|
fileType string
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ type Sync struct {
|
||||||
Target string
|
Target string
|
||||||
ProviderID string
|
ProviderID string
|
||||||
Logger *logger.Logger
|
Logger *logger.Logger
|
||||||
|
Mux *msync.RWMutex
|
||||||
|
|
||||||
syncClient syncv1grpc.FlagSyncService_SyncFlagsClient
|
syncClient syncv1grpc.FlagSyncService_SyncFlagsClient
|
||||||
client syncv1grpc.FlagSyncServiceClient
|
client syncv1grpc.FlagSyncServiceClient
|
||||||
|
|
|
||||||
|
|
@ -15,13 +15,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sync struct {
|
type Sync struct {
|
||||||
URI string
|
URI string
|
||||||
Client Client
|
Client Client
|
||||||
Cron Cron
|
Cron Cron
|
||||||
BearerToken string
|
LastBodySHA string
|
||||||
LastBodySHA string
|
Logger *logger.Logger
|
||||||
Logger *logger.Logger
|
BearerToken string
|
||||||
ProviderArgs sync.ProviderArgs
|
|
||||||
|
|
||||||
ready bool
|
ready bool
|
||||||
}
|
}
|
||||||
|
|
@ -120,7 +119,7 @@ func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) ([]byte, error
|
||||||
req.Header.Add("Accept", "application/json")
|
req.Header.Add("Accept", "application/json")
|
||||||
|
|
||||||
if hs.BearerToken != "" {
|
if hs.BearerToken != "" {
|
||||||
bearer := "Bearer " + hs.BearerToken
|
bearer := fmt.Sprintf("Bearer %s", hs.BearerToken)
|
||||||
req.Header.Set("Authorization", bearer)
|
req.Header.Set("Authorization", bearer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,6 @@ func TestSimpleSync(t *testing.T) {
|
||||||
URI: "http://localhost",
|
URI: "http://localhost",
|
||||||
Client: mockClient,
|
Client: mockClient,
|
||||||
Cron: mockCron,
|
Cron: mockCron,
|
||||||
BearerToken: "",
|
|
||||||
LastBodySHA: "",
|
LastBodySHA: "",
|
||||||
Logger: logger.NewLogger(nil, false),
|
Logger: logger.NewLogger(nil, false),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ package sync
|
||||||
|
|
||||||
import "context"
|
import "context"
|
||||||
|
|
||||||
type ProviderArgs map[string]string
|
|
||||||
|
|
||||||
type Type int
|
type Type int
|
||||||
|
|
||||||
// Type of the sync operation
|
// Type of the sync operation
|
||||||
|
|
@ -59,3 +57,10 @@ type DataSync struct {
|
||||||
Source string
|
Source string
|
||||||
Type
|
Type
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SourceConfig struct {
|
||||||
|
URI string `json:"uri"`
|
||||||
|
Provider string `json:"provider"`
|
||||||
|
|
||||||
|
BearerToken string `json:"bearerToken,omitempty"`
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,6 @@ type Sync struct {
|
||||||
namespace string
|
namespace string
|
||||||
crdName string
|
crdName string
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
providerArgs sync.ProviderArgs
|
|
||||||
readClient client.Reader
|
readClient client.Reader
|
||||||
dynamicClient dynamic.Interface
|
dynamicClient dynamic.Interface
|
||||||
informer cache.SharedInformer
|
informer cache.SharedInformer
|
||||||
|
|
@ -44,14 +43,12 @@ type Sync struct {
|
||||||
func NewK8sSync(
|
func NewK8sSync(
|
||||||
logger *logger.Logger,
|
logger *logger.Logger,
|
||||||
uri string,
|
uri string,
|
||||||
providerArgs sync.ProviderArgs,
|
|
||||||
reader client.Reader,
|
reader client.Reader,
|
||||||
dynamic dynamic.Interface,
|
dynamic dynamic.Interface,
|
||||||
) *Sync {
|
) *Sync {
|
||||||
return &Sync{
|
return &Sync{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
URI: uri,
|
URI: uri,
|
||||||
providerArgs: providerArgs,
|
|
||||||
readClient: reader,
|
readClient: reader,
|
||||||
dynamicClient: dynamic,
|
dynamicClient: dynamic,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import (
|
||||||
"github.com/open-feature/flagd/pkg/sync"
|
"github.com/open-feature/flagd/pkg/sync"
|
||||||
"github.com/open-feature/open-feature-operator/apis/core/v1alpha1"
|
"github.com/open-feature/open-feature-operator/apis/core/v1alpha1"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/client-go/dynamic/fake"
|
"k8s.io/client-go/dynamic/fake"
|
||||||
|
|
@ -813,14 +813,11 @@ func Test_NewK8sSync(t *testing.T) {
|
||||||
}
|
}
|
||||||
const uri = "myURI"
|
const uri = "myURI"
|
||||||
log := logger.NewLogger(l, true)
|
log := logger.NewLogger(l, true)
|
||||||
const key, value = "myKey", "myValue"
|
|
||||||
args := map[string]string{key: value}
|
|
||||||
rc := newFakeReadClient()
|
rc := newFakeReadClient()
|
||||||
dc := fake.NewSimpleDynamicClient(runtime.NewScheme())
|
dc := fake.NewSimpleDynamicClient(runtime.NewScheme())
|
||||||
k := NewK8sSync(
|
k := NewK8sSync(
|
||||||
log,
|
log,
|
||||||
uri,
|
uri,
|
||||||
args,
|
|
||||||
rc,
|
rc,
|
||||||
dc,
|
dc,
|
||||||
)
|
)
|
||||||
|
|
@ -833,9 +830,6 @@ func Test_NewK8sSync(t *testing.T) {
|
||||||
if k.logger != log {
|
if k.logger != log {
|
||||||
t.Errorf("Object not initialized with the right logger")
|
t.Errorf("Object not initialized with the right logger")
|
||||||
}
|
}
|
||||||
if k.providerArgs[key] != value {
|
|
||||||
t.Errorf("Object not initialized with the right arguments")
|
|
||||||
}
|
|
||||||
if k.readClient != rc {
|
if k.readClient != rc {
|
||||||
t.Errorf("Object not initialized with the right K8s client")
|
t.Errorf("Object not initialized with the right K8s client")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue