Compare commits: flagd-prox...main
11 commits
52f344fa01
f9ce46f103
3228ad8951
5c5c1cfe84
2f49ea7ed7
2d40e28b92
849ce39827
1ee636aa03
5e2d7d7176
e8fd680860
750aa176b5
```diff
@@ -20,4 +20,7 @@ site
 .cache/
 
 # coverage results
-*coverage.out
+*coverage.out
+
+# benchmark results
+benchmark.txt
```
```diff
@@ -1,5 +1,5 @@
 {
-  "flagd": "0.12.8",
+  "flagd": "0.12.9",
   "flagd-proxy": "0.8.0",
-  "core": "0.12.0"
+  "core": "0.12.1"
 }
```
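The `core` bump to 0.12.1 here corresponds to the core changelog hunk further down in this compare, dated 2025-07-28.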
```diff
@@ -3,4 +3,4 @@
 #
 # Managed by Peribolos: https://github.com/open-feature/community/blob/main/config/open-feature/cloud-native/workgroup.yaml
 #
-* @open-feature/cloud-native-maintainers @open-feature/maintainers
+* @open-feature/flagd-maintainers @open-feature/maintainers
```
```diff
@@ -42,7 +42,7 @@ export GOPRIVATE=buf.build/gen/go
 
 ### Manual testing
 
-flagd has a number of interfaces (you can read more about than at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
+flagd has a number of interfaces (you can read more about them at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
 
 You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
 
```
````diff
@@ -69,7 +69,7 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
 grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
 ```
 
-#### Remote bulk evaluation via via HTTP1.1/OFREP
+#### Remote bulk evaluation via HTTP1.1/OFREP
 
 ```sh
 # evaluates flags in bulk
````
````diff
@@ -83,6 +83,13 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
 grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
 ```
 
+#### Remote event streaming via gRPC
+
+```sh
+# notifies of flag changes (but does not evaluate)
+grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/EventStream
+```
+
 #### Flag configuration fetch via gRPC
 
 ```sh
````
````diff
@@ -93,7 +100,7 @@ grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintex
 #### Flag synchronization stream via gRPC
 
 ```sh
-# will open a persistent stream which sends flag changes when the watched source is modified
+# will open a persistent stream which sends flag changes when the watched source is modified
 grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
 ```
 
````
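The same OFREP bulk endpoint the docs exercise with curl can be hit from Go; a minimal sketch, assuming flagd is running locally with the default OFREP port 8016 as in the commands above:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Bulk-evaluate all flags via OFREP, mirroring the curl command above.
	resp, err := http.Post(
		"http://localhost:8016/ofrep/v1/evaluate/flags",
		"application/json",
		strings.NewReader(`{"context":{}}`),
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```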
Makefile
```diff
@@ -47,12 +47,19 @@ test-flagd:
 	go test -race -covermode=atomic -cover -short ./flagd/pkg/... -coverprofile=flagd-coverage.out
 test-flagd-proxy:
 	go test -race -covermode=atomic -cover -short ./flagd-proxy/pkg/... -coverprofile=flagd-proxy-coverage.out
-flagd-integration-test: # dependent on ./bin/flagd start -f file:test-harness/flags/testing-flags.json -f file:test-harness/flags/custom-ops.json -f file:test-harness/flags/evaluator-refs.json -f file:test-harness/flags/zero-flags.json
-	go test -cover ./test/integration $(ARGS)
 flagd-benchmark-test:
 	go test -bench=Bench -short -benchtime=5s -benchmem ./core/... | tee benchmark.txt
+flagd-integration-test-harness:
+	# target used to start a locally built flagd with the e2e flags
+	cd flagd; go run main.go start -f file:../test-harness/flags/testing-flags.json -f file:../test-harness/flags/custom-ops.json -f file:../test-harness/flags/evaluator-refs.json -f file:../test-harness/flags/zero-flags.json -f file:../test-harness/flags/edge-case-flags.json
+flagd-integration-test: # dependent on flagd-e2e-test-harness if not running in github actions
+	go test -count=1 -cover ./test/integration $(ARGS)
 run: # default to flagd
 	make run-flagd
 run-flagd:
-	cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
+	cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
+run-flagd-selector-demo:
+	cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json -f file:../config/samples/example_flags.flagd.2.json
 install:
 	cp systemd/flagd.service /etc/systemd/system/flagd.service
 	mkdir -p /etc/flagd
```
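With these targets, running `make flagd-integration-test-harness` in one terminal and `make flagd-integration-test` in another reproduces the CI flow locally, and `make flagd-benchmark-test` regenerates the benchmark output shown next (it is teed into benchmark.txt, which the .gitignore change above excludes from version control).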
```
@@ -0,0 +1,72 @@
PASS
ok  	github.com/open-feature/flagd/core/pkg/certreloader	15.986s
goos: linux
goarch: amd64
pkg: github.com/open-feature/flagd/core/pkg/evaluator
cpu: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz
BenchmarkFractionalEvaluation/test_a@faas.com-16         	  423930	  13316 ns/op	 7229 B/op	 135 allocs/op
BenchmarkFractionalEvaluation/test_b@faas.com-16         	  469594	  13677 ns/op	 7229 B/op	 135 allocs/op
BenchmarkFractionalEvaluation/test_c@faas.com-16         	  569103	  13286 ns/op	 7229 B/op	 135 allocs/op
BenchmarkFractionalEvaluation/test_d@faas.com-16         	  412386	  13023 ns/op	 7229 B/op	 135 allocs/op
BenchmarkResolveBooleanValue/test_staticBoolFlag-16      	 3106903	   1792 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveBooleanValue/test_targetingBoolFlag-16   	  448164	  11250 ns/op	 6065 B/op	  87 allocs/op
BenchmarkResolveBooleanValue/test_staticObjectFlag-16    	 3958750	   1476 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveBooleanValue/test_missingFlag-16         	 5331808	   1353 ns/op	  784 B/op	  12 allocs/op
BenchmarkResolveBooleanValue/test_disabledFlag-16        	 4530751	   1301 ns/op	 1072 B/op	  13 allocs/op
BenchmarkResolveStringValue/test_staticStringFlag-16     	 4583056	   1525 ns/op	 1040 B/op	  13 allocs/op
BenchmarkResolveStringValue/test_targetingStringFlag-16  	  839954	  10388 ns/op	 6097 B/op	  89 allocs/op
BenchmarkResolveStringValue/test_staticObjectFlag-16     	 4252830	   1677 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveStringValue/test_missingFlag-16          	 3743324	   1495 ns/op	  784 B/op	  12 allocs/op
BenchmarkResolveStringValue/test_disabledFlag-16         	 3495699	   1709 ns/op	 1072 B/op	  13 allocs/op
BenchmarkResolveFloatValue/test:_staticFloatFlag-16      	 4382868	   1511 ns/op	 1024 B/op	  13 allocs/op
BenchmarkResolveFloatValue/test:_targetingFloatFlag-16   	  867987	  10344 ns/op	 6081 B/op	  89 allocs/op
BenchmarkResolveFloatValue/test:_staticObjectFlag-16     	 3913120	   1695 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveFloatValue/test:_missingFlag-16          	 3910468	   1349 ns/op	  784 B/op	  12 allocs/op
BenchmarkResolveFloatValue/test:_disabledFlag-16         	 3642919	   1666 ns/op	 1072 B/op	  13 allocs/op
BenchmarkResolveIntValue/test_staticIntFlag-16           	 4077288	   1349 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveIntValue/test_targetingNumberFlag-16     	  922383	   7601 ns/op	 6065 B/op	  87 allocs/op
BenchmarkResolveIntValue/test_staticObjectFlag-16        	 4995128	   1229 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveIntValue/test_missingFlag-16             	 5574153	   1274 ns/op	  768 B/op	  12 allocs/op
BenchmarkResolveIntValue/test_disabledFlag-16            	 3633708	   1734 ns/op	 1072 B/op	  13 allocs/op
BenchmarkResolveObjectValue/test_staticObjectFlag-16     	 1624102	   4559 ns/op	 2243 B/op	  37 allocs/op
BenchmarkResolveObjectValue/test_targetingObjectFlag-16  	  443880	  11995 ns/op	 7283 B/op	 109 allocs/op
BenchmarkResolveObjectValue/test_staticBoolFlag-16       	 3462445	   1665 ns/op	 1008 B/op	  11 allocs/op
BenchmarkResolveObjectValue/test_missingFlag-16          	 4207567	   1458 ns/op	  784 B/op	  12 allocs/op
BenchmarkResolveObjectValue/test_disabledFlag-16         	 3407262	   1848 ns/op	 1072 B/op	  13 allocs/op
PASS
ok  	github.com/open-feature/flagd/core/pkg/evaluator	239.506s
?   	github.com/open-feature/flagd/core/pkg/evaluator/mock	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/logger	0.003s
?   	github.com/open-feature/flagd/core/pkg/model	[no test files]
?   	github.com/open-feature/flagd/core/pkg/service	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/service/ofrep	0.002s
PASS
ok  	github.com/open-feature/flagd/core/pkg/store	0.003s
?   	github.com/open-feature/flagd/core/pkg/sync	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/blob	0.016s
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/builder	0.018s
?   	github.com/open-feature/flagd/core/pkg/sync/builder/mock	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/file	1.007s
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/grpc	8.011s
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials	0.008s
?   	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock	[no test files]
?   	github.com/open-feature/flagd/core/pkg/sync/grpc/mock	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/nameresolvers	0.002s
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/http	4.006s
?   	github.com/open-feature/flagd/core/pkg/sync/http/mock	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/sync/kubernetes	0.016s
?   	github.com/open-feature/flagd/core/pkg/sync/testing	[no test files]
PASS
ok  	github.com/open-feature/flagd/core/pkg/telemetry	0.016s
PASS
ok  	github.com/open-feature/flagd/core/pkg/utils	0.002s
```
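Each benchmark row reads: name with the GOMAXPROCS suffix (-16), iteration count, average time per operation (ns/op), bytes allocated per operation (B/op), and heap allocations per operation (allocs/op). A minimal sketch of how such a table is produced with the standard testing package; `evalStaticFlag` is a hypothetical stand-in for the code under test:

```go
package evaluator_test

import "testing"

// evalStaticFlag is a hypothetical stand-in for the code under test.
func evalStaticFlag() string { return "on" }

// Run with: go test -bench=Bench -short -benchtime=5s -benchmem ./...
func BenchmarkResolveStaticFlag(b *testing.B) {
	b.ReportAllocs() // emit the B/op and allocs/op columns (also enabled by -benchmem)
	for i := 0; i < b.N; i++ {
		_ = evalStaticFlag()
	}
}
```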
```
@@ -0,0 +1,17 @@
{
  "$schema": "https://flagd.dev/schema/v0/flags.json",
  "metadata": {
    "flagSetId": "other",
    "version": "v1"
  },
  "flags": {
    "myStringFlag": {
      "state": "ENABLED",
      "variants": {
        "dupe1": "dupe1",
        "dupe2": "dupe2"
      },
      "defaultVariant": "dupe1"
    }
  }
}
```
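This appears to be the second sample flag file referenced by the new `run-flagd-selector-demo` Makefile target; its `flagSetId: "other"` metadata gives the set a distinct identity, which the selector-aware store changes later in this compare key on.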
```diff
@@ -1,5 +1,13 @@
 # Changelog
 
+## [0.12.1](https://github.com/open-feature/flagd/compare/core/v0.12.0...core/v0.12.1) (2025-07-28)
+
+
+### 🧹 Chore
+
+* add back file-delete test ([#1694](https://github.com/open-feature/flagd/issues/1694)) ([750aa17](https://github.com/open-feature/flagd/commit/750aa176b5a8dd24a9daaff985ff6efeb084c758))
+* fix benchmark ([#1698](https://github.com/open-feature/flagd/issues/1698)) ([5e2d7d7](https://github.com/open-feature/flagd/commit/5e2d7d7176ba05e667cd92acd7decb531a8de2f6))
+
 ## [0.12.0](https://github.com/open-feature/flagd/compare/core/v0.11.8...core/v0.12.0) (2025-07-21)
 
 
```
```diff
@@ -11,6 +11,8 @@ import (
 )
 
 func TestFractionalEvaluation(t *testing.T) {
+	const source = "testSource"
+	var sources = []string{source}
 	ctx := context.Background()
 
 	commonFlags := Flags{
```
```diff
@@ -458,8 +460,13 @@ func TestFractionalEvaluation(t *testing.T) {
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {
 			log := logger.NewLogger(nil, false)
-			je := NewJSON(log, store.NewFlags())
-			je.store.Flags = tt.flags.Flags
+			s, err := store.NewStore(log, sources)
+			if err != nil {
+				t.Fatalf("NewStore failed: %v", err)
+			}
+
+			je := NewJSON(log, s)
+			je.store.Update(source, tt.flags.Flags, model.Metadata{})
 
 			value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
 
```
```diff
@@ -486,6 +493,8 @@ func TestFractionalEvaluation(t *testing.T) {
 }
 
 func BenchmarkFractionalEvaluation(b *testing.B) {
+	const source = "testSource"
+	var sources = []string{source}
 	ctx := context.Background()
 
 	flags := Flags{
```
```diff
@@ -508,7 +517,7 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
 				},
 				{
 					"fractional": [
-						"email",
+						{"var": "email"},
 						[
 							"red",
 							25
```
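The bucketing key in the `fractional` operator is now written as a jsonlogic `var` expression rather than a bare attribute name; the deprecated `fractionalEvaluation` form removed later in this compare still used the bare string.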
```diff
@@ -542,41 +551,41 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
 		expectedReason    string
 		expectedErrorCode string
 	}{
-		"test@faas.com": {
+		"test_a@faas.com": {
 			flags:   flags,
 			flagKey: "headerColor",
 			context: map[string]any{
-				"email": "test@faas.com",
+				"email": "test_a@faas.com",
 			},
 			expectedVariant: "blue",
 			expectedValue:   "#0000FF",
 			expectedReason:  model.TargetingMatchReason,
 		},
+		"test_b@faas.com": {
+			flags:   flags,
+			flagKey: "headerColor",
+			context: map[string]any{
+				"email": "test_b@faas.com",
+			},
+			expectedVariant: "red",
+			expectedValue:   "#FF0000",
+			expectedReason:  model.TargetingMatchReason,
+		},
-		"test2@faas.com": {
+		"test_c@faas.com": {
 			flags:   flags,
 			flagKey: "headerColor",
 			context: map[string]any{
-				"email": "test2@faas.com",
+				"email": "test_c@faas.com",
 			},
-			expectedVariant: "yellow",
-			expectedValue:   "#FFFF00",
+			expectedVariant: "green",
+			expectedValue:   "#00FF00",
 			expectedReason:  model.TargetingMatchReason,
 		},
-		"test3@faas.com": {
+		"test_d@faas.com": {
 			flags:   flags,
 			flagKey: "headerColor",
 			context: map[string]any{
-				"email": "test3@faas.com",
-			},
-			expectedVariant: "red",
-			expectedValue:   "#FF0000",
-			expectedReason:  model.TargetingMatchReason,
-		},
-		"test4@faas.com": {
-			flags:   flags,
-			flagKey: "headerColor",
-			context: map[string]any{
-				"email": "test4@faas.com",
+				"email": "test_d@faas.com",
 			},
 			expectedVariant: "blue",
 			expectedValue:   "#0000FF",
```
```diff
@@ -587,7 +596,13 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
 	for name, tt := range tests {
 		b.Run(name, func(b *testing.B) {
 			log := logger.NewLogger(nil, false)
-			je := NewJSON(log, &store.State{Flags: tt.flags.Flags})
+			s, err := store.NewStore(log, sources)
+			if err != nil {
+				b.Fatalf("NewStore failed: %v", err)
+			}
+			je := NewJSON(log, s)
+			je.store.Update(source, tt.flags.Flags, model.Metadata{})
 
 			for i := 0; i < b.N; i++ {
 				value, variant, reason, _, err := resolve[string](
 					ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
```
```diff
@@ -35,7 +35,7 @@ IEvaluator is an extension of IResolver, allowing storage updates and retrievals
 */
 type IEvaluator interface {
 	GetState() (string, error)
-	SetState(payload sync.DataSync) (model.Metadata, bool, error)
+	SetState(payload sync.DataSync) (map[string]interface{}, bool, error)
 	IResolver
 }
 
```
```diff
@@ -64,13 +64,13 @@ func WithEvaluator(name string, evalFunc func(interface{}, interface{}) interfac
 
 // JSON evaluator
 type JSON struct {
-	store          *store.State
+	store          *store.Store
 	Logger         *logger.Logger
 	jsonEvalTracer trace.Tracer
 	Resolver
 }
 
-func NewJSON(logger *logger.Logger, s *store.State, opts ...JSONEvaluatorOption) *JSON {
+func NewJSON(logger *logger.Logger, s *store.Store, opts ...JSONEvaluatorOption) *JSON {
 	logger = logger.WithFields(
 		zap.String("component", "evaluator"),
 		zap.String("evaluator", "json"),
```
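Construction sites therefore change from `NewJSON(log, store.NewFlags())` to creating a store up front with `store.NewStore(log, sources)` and feeding flag definitions through `store.Update(source, flags, metadata)`, as every updated in-package test in this compare does.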
```diff
@@ -118,7 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
 	var events map[string]interface{}
 	var reSync bool
 
-	events, reSync = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
+	events, reSync = je.store.Update(payload.Source, definition.Flags, definition.Metadata)
 
 	// Number of events correlates to the number of flags changed through this sync, record it
 	span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))
```
```diff
@@ -139,7 +139,6 @@ func NewResolver(store store.IStore, logger *logger.Logger, jsonEvalTracer trace
 	jsonlogic.AddOperator(StartsWithEvaluationName, NewStringComparisonEvaluator(logger).StartsWithEvaluation)
 	jsonlogic.AddOperator(EndsWithEvaluationName, NewStringComparisonEvaluator(logger).EndsWithEvaluation)
 	jsonlogic.AddOperator(SemVerEvaluationName, NewSemVerComparison(logger).SemVerEvaluation)
-	jsonlogic.AddOperator(LegacyFractionEvaluationName, NewLegacyFractional(logger).LegacyFractionalEvaluation)
 
 	return Resolver{store: store, Logger: logger, tracer: jsonEvalTracer}
 }
```
```diff
@@ -150,8 +149,12 @@ func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context
 	_, span := je.tracer.Start(ctx, "resolveAll")
 	defer span.End()
 
-	var err error
-	allFlags, flagSetMetadata, err := je.store.GetAll(ctx)
+	var selector store.Selector
+	s := ctx.Value(store.SelectorContextKey{})
+	if s != nil {
+		selector = s.(store.Selector)
+	}
+	allFlags, flagSetMetadata, err := je.store.GetAll(ctx, &selector)
 	if err != nil {
 		return nil, flagSetMetadata, fmt.Errorf("error retreiving flags from the store: %w", err)
 	}
```
```diff
@@ -302,19 +305,19 @@ func resolve[T constraints](ctx context.Context, reqID string, key string, conte
 func (je *Resolver) evaluateVariant(ctx context.Context, reqID string, flagKey string, evalCtx map[string]any) (
 	variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, err error,
 ) {
-	flag, metadata, ok := je.store.Get(ctx, flagKey)
-	if !ok {
+	var selector store.Selector
+	s := ctx.Value(store.SelectorContextKey{})
+	if s != nil {
+		selector = s.(store.Selector)
+	}
+	flag, metadata, err := je.store.Get(ctx, flagKey, &selector)
+	if err != nil {
 		// flag not found
 		je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
 		return "", map[string]interface{}{}, model.ErrorReason, metadata, errors.New(model.FlagNotFoundErrorCode)
 	}
 
-	// add selector to evaluation metadata
-	selector := je.store.SelectorForFlag(ctx, flag)
-	if selector != "" {
-		metadata[SelectorMetadataKey] = selector
-	}
-
 	for key, value := range flag.Metadata {
 		// If value is not nil or empty, copy to metadata
 		if value != nil {
```
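Both resolver paths now read an optional `store.Selector` out of the request context under `store.SelectorContextKey{}`. A caller would attach one roughly like this (a sketch; how a `store.Selector` value is constructed is not shown in this compare, so the zero value stands in):

```go
// Attach a selector for downstream resolution; the zero value means "no selector".
var sel store.Selector // construction not shown in this diff

ctx := context.WithValue(context.Background(), store.SelectorContextKey{}, sel)

// Resolver methods such as ResolveAllValues will then pick it up via
// ctx.Value(store.SelectorContextKey{}) as in the hunks above.
```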
```diff
@@ -383,7 +383,7 @@ var Flags = fmt.Sprintf(`{
 
 func TestGetState_Valid_ContainsFlag(t *testing.T) {
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("Expected no error")
 	}
```
```diff
@@ -405,7 +405,7 @@ func TestSetState_Invalid_Error(t *testing.T) {
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
 	// set state with an invalid flag definition
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("unexpected error")
 	}
```
```diff
@@ -415,7 +415,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
 	// set state with a valid flag definition
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -423,7 +423,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
 
 func TestResolveAllValues(t *testing.T) {
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -476,62 +476,6 @@ func TestResolveAllValues(t *testing.T) {
 	}
 }
 
-func TestMetadataResolveType(t *testing.T) {
-	tests := []struct {
-		flagKey  string
-		metadata model.Metadata
-	}{
-		{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
-		{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
-	}
-	const reqID = "default"
-	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
-	if err != nil {
-		t.Fatalf("expected no error")
-	}
-
-	for _, test := range tests {
-		_, _, _, metadata, _ := evaluator.ResolveBooleanValue(context.TODO(), reqID, test.flagKey, nil)
-		if !reflect.DeepEqual(test.metadata, metadata) {
-			t.Errorf("expected metadata to be %v, but got %v", test.metadata, metadata)
-		}
-	}
-}
-
-func TestMetadataResolveAll(t *testing.T) {
-	expectedFlagSetMetadata := model.Metadata{"flagSetId": FlagSetID, "version": Version}
-
-	tests := []struct {
-		flagKey  string
-		metadata model.Metadata
-	}{
-		{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
-		{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
-	}
-	const reqID = "default"
-	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
-	if err != nil {
-		t.Fatalf("expected no error")
-	}
-
-	for _, test := range tests {
-		resolutions, flagSetMetadata, _ := evaluator.ResolveAllValues(context.TODO(), reqID, nil)
-
-		for _, resolved := range resolutions {
-			if resolved.FlagKey == test.flagKey {
-				if !reflect.DeepEqual(test.metadata, resolved.Metadata) {
-					t.Errorf("expected flag metadata to be %v, but got %v", test.metadata, resolved.Metadata)
-				}
-			}
-		}
-		if !reflect.DeepEqual(expectedFlagSetMetadata, flagSetMetadata) {
-			t.Errorf("expected flag set metadata to be %v, but got %v", expectedFlagSetMetadata, flagSetMetadata)
-		}
-	}
-}
-
 func TestResolveBooleanValue(t *testing.T) {
 	tests := []struct {
 		flagKey string
```
```diff
@@ -548,7 +492,7 @@ func TestResolveBooleanValue(t *testing.T) {
 	}
 	const reqID = "default"
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -583,7 +527,7 @@ func BenchmarkResolveBooleanValue(b *testing.B) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		b.Fatalf("expected no error")
 	}
```
```diff
@@ -623,7 +567,7 @@ func TestResolveStringValue(t *testing.T) {
 	}
 	const reqID = "default"
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -659,7 +603,7 @@ func BenchmarkResolveStringValue(b *testing.B) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		b.Fatalf("expected no error")
 	}
```
```diff
@@ -699,7 +643,7 @@ func TestResolveFloatValue(t *testing.T) {
 	}
 	const reqID = "default"
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -735,7 +679,7 @@ func BenchmarkResolveFloatValue(b *testing.B) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		b.Fatalf("expected no error")
 	}
```
```diff
@@ -775,7 +719,7 @@ func TestResolveIntValue(t *testing.T) {
 	}
 	const reqID = "default"
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -811,7 +755,7 @@ func BenchmarkResolveIntValue(b *testing.B) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		b.Fatalf("expected no error")
 	}
```
```diff
@@ -851,7 +795,7 @@ func TestResolveObjectValue(t *testing.T) {
 	}
 	const reqID = "default"
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -890,7 +834,7 @@ func BenchmarkResolveObjectValue(b *testing.B) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		b.Fatalf("expected no error")
 	}
```
```diff
@@ -935,7 +879,7 @@ func TestResolveAsAnyValue(t *testing.T) {
 	}
 
 	evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
+	_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 	if err != nil {
 		t.Fatalf("expected no error")
 	}
```
```diff
@@ -971,7 +915,7 @@ func TestResolve_DefaultVariant(t *testing.T) {
 	for _, test := range tests {
 		t.Run("", func(t *testing.T) {
 			evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
-			_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags})
+			_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags, Source: "testSource"})
 
 			if err != nil {
 				t.Fatalf("expected no error")
```
|
|||
t.Run(name, func(t *testing.T) {
|
||||
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
|
||||
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags})
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags, Source: "testSource"})
|
||||
|
||||
if tt.valid && err != nil {
|
||||
t.Error(err)
|
||||
|
```diff
@@ -1097,7 +1041,7 @@ func TestState_Evaluator(t *testing.T) {
 				},
 				"defaultVariant": "recursive",
 				"state": "ENABLED",
-				"source":"",
+				"source":"testSource",
 				"selector":"",
 				"targeting": {
 					"if": [
```
```diff
@@ -1157,7 +1101,7 @@ func TestState_Evaluator(t *testing.T) {
 				},
 				"defaultVariant": "recursive",
 				"state": "ENABLED",
-				"source":"",
+				"source":"testSource",
 				"selector":"",
 				"targeting": {
 					"if": [
```
```diff
@@ -1233,7 +1177,7 @@ func TestState_Evaluator(t *testing.T) {
 					"off": false
 				},
 				"defaultVariant": "off",
-				"source":"",
+				"source":"testSource",
 				"targeting": {
 					"if": [
 						{
```
```diff
@@ -1267,7 +1211,7 @@ func TestState_Evaluator(t *testing.T) {
 				},
 				"defaultVariant": "recursive",
 				"state": "ENABLED",
-				"source":"",
+				"source":"testSource",
 				"selector":"",
 				"targeting": {
 					"if": [
```
```diff
@@ -1286,7 +1230,7 @@ func TestState_Evaluator(t *testing.T) {
 					"off": false
 				},
 				"defaultVariant": "off",
-				"source":"",
+				"source":"testSource",
 				"selector":"",
 				"targeting": {
 					"if": [
```
```diff
@@ -1344,7 +1288,7 @@ func TestState_Evaluator(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-			_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState})
+			_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState, Source: "testSource"})
 			if err != nil {
 				if !tt.expectedError {
 					t.Error(err)
```
```diff
@@ -1377,8 +1321,8 @@ func TestState_Evaluator(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON["flags"]) {
-				t.Errorf("expected state: %v got state: %v", expectedOutputJSON, gotOutputJSON)
+			if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON) {
+				t.Errorf("expected state: %v got state: %v", expectedOutputJSON["flags"], gotOutputJSON)
 			}
 		})
 	}
```
```diff
@@ -1451,7 +1395,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-			_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
+			_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 			if err != nil {
 				t.Fatal(err)
 			}
```
```diff
@@ -1474,7 +1418,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
 					errChan <- nil
 					return
 				default:
-					_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
+					_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
 					if err != nil {
 						errChan <- err
 						return
```
```diff
@@ -1516,7 +1460,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
 	t.Run("flagKeyIsInTheContext", func(t *testing.T) {
 		evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-		_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
+		_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
 			"flags": {
 				"welcome-banner": {
 					"state": "ENABLED",
```
```diff
@@ -1556,7 +1500,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
 	t.Run("timestampIsInTheContext", func(t *testing.T) {
 		evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-		_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
+		_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
 			"flags": {
 				"welcome-banner": {
 					"state": "ENABLED",
```
```diff
@@ -1590,7 +1534,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
 	t.Run("missing variant error", func(t *testing.T) {
 		evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-		_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
+		_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
 			"flags": {
 				"missing-variant": {
 					"state": "ENABLED",
```
```diff
@@ -1618,7 +1562,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
 	t.Run("null fallback", func(t *testing.T) {
 		evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
-		_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
+		_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
 			"flags": {
 				"null-fallback": {
 					"state": "ENABLED",
```
```diff
@@ -1651,7 +1595,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
 		evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
 
 		//nolint:dupword
-		_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
+		_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
 			"flags": {
 				"match-boolean": {
 					"state": "ENABLED",
```
```
@@ -1,145 +0,0 @@
// This evaluation type is deprecated and will be removed before v1.
// Do not enhance it or use it for reference.

package evaluator

import (
	"errors"
	"fmt"
	"math"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/zeebo/xxh3"
)

const (
	LegacyFractionEvaluationName = "fractionalEvaluation"
	LegacyFractionEvaluationLink = "https://flagd.dev/concepts/#migrating-from-legacy-fractionalevaluation"
)

// Deprecated: LegacyFractional is deprecated. This will be removed prior to v1 release.
type LegacyFractional struct {
	Logger *logger.Logger
}

type legacyFractionalEvaluationDistribution struct {
	variant    string
	percentage int
}

func NewLegacyFractional(logger *logger.Logger) *LegacyFractional {
	return &LegacyFractional{Logger: logger}
}

func (fe *LegacyFractional) LegacyFractionalEvaluation(values, data interface{}) interface{} {
	fe.Logger.Warn(
		fmt.Sprintf("%s is deprecated, please use %s, see: %s",
			LegacyFractionEvaluationName,
			FractionEvaluationName,
			LegacyFractionEvaluationLink))

	valueToDistribute, feDistributions, err := parseLegacyFractionalEvaluationData(values, data)
	if err != nil {
		fe.Logger.Error(fmt.Sprintf("parse fractional evaluation data: %v", err))
		return nil
	}

	return distributeLegacyValue(valueToDistribute, feDistributions)
}

func parseLegacyFractionalEvaluationData(values, data interface{}) (string,
	[]legacyFractionalEvaluationDistribution, error,
) {
	valuesArray, ok := values.([]interface{})
	if !ok {
		return "", nil, errors.New("fractional evaluation data is not an array")
	}
	if len(valuesArray) < 2 {
		return "", nil, errors.New("fractional evaluation data has length under 2")
	}

	bucketBy, ok := valuesArray[0].(string)
	if !ok {
		return "", nil, errors.New("first element of fractional evaluation data isn't of type string")
	}

	dataMap, ok := data.(map[string]interface{})
	if !ok {
		return "", nil, errors.New("data isn't of type map[string]interface{}")
	}

	v, ok := dataMap[bucketBy]
	if !ok {
		return "", nil, nil
	}

	valueToDistribute, ok := v.(string)
	if !ok {
		return "", nil, fmt.Errorf("var: %s isn't of type string", bucketBy)
	}

	feDistributions, err := parseLegacyFractionalEvaluationDistributions(valuesArray)
	if err != nil {
		return "", nil, err
	}

	return valueToDistribute, feDistributions, nil
}

func parseLegacyFractionalEvaluationDistributions(values []interface{}) (
	[]legacyFractionalEvaluationDistribution, error,
) {
	sumOfPercentages := 0
	var feDistributions []legacyFractionalEvaluationDistribution
	for i := 1; i < len(values); i++ {
		distributionArray, ok := values[i].([]interface{})
		if !ok {
			return nil, errors.New("distribution elements aren't of type []interface{}")
		}

		if len(distributionArray) != 2 {
			return nil, errors.New("distribution element isn't length 2")
		}

		variant, ok := distributionArray[0].(string)
		if !ok {
			return nil, errors.New("first element of distribution element isn't string")
		}

		percentage, ok := distributionArray[1].(float64)
		if !ok {
			return nil, errors.New("second element of distribution element isn't float")
		}

		sumOfPercentages += int(percentage)

		feDistributions = append(feDistributions, legacyFractionalEvaluationDistribution{
			variant:    variant,
			percentage: int(percentage),
		})
	}

	if sumOfPercentages != 100 {
		return nil, fmt.Errorf("percentages must sum to 100, got: %d", sumOfPercentages)
	}

	return feDistributions, nil
}

func distributeLegacyValue(value string, feDistribution []legacyFractionalEvaluationDistribution) string {
	hashValue := xxh3.HashString(value)

	hashRatio := float64(hashValue) / math.Pow(2, 64) // divide the hash value by the largest possible value, integer 2^64

	bucket := int(hashRatio * 100) // integer in range [0, 99]

	rangeEnd := 0
	for _, dist := range feDistribution {
		rangeEnd += dist.percentage
		if bucket < rangeEnd {
			return dist.variant
		}
	}

	return ""
}
```
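The deleted evaluator's bucketing math bears a worked sketch: the xxh3 hash is normalized into [0, 1) and scaled to a bucket in [0, 99], which the cumulative percentages then carve up. A sketch using the same library the deleted file imported:

```go
package main

import (
	"fmt"
	"math"

	"github.com/zeebo/xxh3"
)

func main() {
	// Same normalization as the deleted distributeLegacyValue:
	// hash / 2^64 yields a ratio in [0, 1), scaled to a bucket in [0, 99].
	hash := xxh3.HashString("test@faas.com")
	bucket := int(float64(hash) / math.Pow(2, 64) * 100)

	// With distributions red:25, blue:25, green:25, yellow:25 the cumulative
	// range ends are 25, 50, 75, 100; the first end exceeding the bucket wins.
	fmt.Println("bucket:", bucket)
}
```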
```
@@ -1,300 +0,0 @@
package evaluator

import (
	"context"
	"testing"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/store"
)

func TestLegacyFractionalEvaluation(t *testing.T) {
	ctx := context.Background()

	flags := Flags{
		Flags: map[string]model.Flag{
			"headerColor": {
				State:          "ENABLED",
				DefaultVariant: "red",
				Variants: map[string]any{
					"red":    "#FF0000",
					"blue":   "#0000FF",
					"green":  "#00FF00",
					"yellow": "#FFFF00",
				},
				Targeting: []byte(`{
					"if": [
						{
							"in": ["@faas.com", {
								"var": ["email"]
							}]
						},
						{
							"fractionalEvaluation": [
								"email",
								[
									"red",
									25
								],
								[
									"blue",
									25
								],
								[
									"green",
									25
								],
								[
									"yellow",
									25
								]
							]
						}, null
					]
				}`),
			},
		},
	}

	tests := map[string]struct {
		flags             Flags
		flagKey           string
		context           map[string]any
		expectedValue     string
		expectedVariant   string
		expectedReason    string
		expectedErrorCode string
	}{
		"test@faas.com": {
			flags:   flags,
			flagKey: "headerColor",
			context: map[string]any{
				"email": "test@faas.com",
			},
			expectedVariant: "red",
			expectedValue:   "#FF0000",
			expectedReason:  model.TargetingMatchReason,
		},
		"test2@faas.com": {
			flags:   flags,
			flagKey: "headerColor",
			context: map[string]any{
				"email": "test2@faas.com",
			},
			expectedVariant: "yellow",
			expectedValue:   "#FFFF00",
			expectedReason:  model.TargetingMatchReason,
		},
		"test3@faas.com": {
			flags:   flags,
			flagKey: "headerColor",
			context: map[string]any{
				"email": "test3@faas.com",
			},
			expectedVariant: "red",
			expectedValue:   "#FF0000",
			expectedReason:  model.TargetingMatchReason,
		},
		"test4@faas.com": {
			flags:   flags,
			flagKey: "headerColor",
			context: map[string]any{
				"email": "test4@faas.com",
			},
			expectedVariant: "blue",
			expectedValue:   "#0000FF",
			expectedReason:  model.TargetingMatchReason,
		},
		"non even split": {
			flags: Flags{
				Flags: map[string]model.Flag{
					"headerColor": {
						State:          "ENABLED",
						DefaultVariant: "red",
						Variants: map[string]any{
							"red":    "#FF0000",
							"blue":   "#0000FF",
							"green":  "#00FF00",
							"yellow": "#FFFF00",
						},
						Targeting: []byte(`{
							"if": [
								{
									"in": ["@faas.com", {
										"var": ["email"]
									}]
								},
								{
									"fractionalEvaluation": [
										"email",
										[
											"red",
											50
										],
										[
											"blue",
											25
										],
										[
											"green",
											25
										]
									]
								}, null
							]
						}`),
					},
				},
			},
			flagKey: "headerColor",
			context: map[string]any{
				"email": "test4@faas.com",
			},
			expectedVariant: "red",
			expectedValue:   "#FF0000",
			expectedReason:  model.TargetingMatchReason,
		},
		"fallback to default variant if no email provided": {
			flags: Flags{
				Flags: map[string]model.Flag{
					"headerColor": {
						State:          "ENABLED",
						DefaultVariant: "red",
						Variants: map[string]any{
							"red":    "#FF0000",
							"blue":   "#0000FF",
							"green":  "#00FF00",
							"yellow": "#FFFF00",
						},
						Targeting: []byte(`{
							"fractionalEvaluation": [
								"email",
								[
									"red",
									25
								],
								[
									"blue",
									25
								],
								[
									"green",
									25
								],
								[
									"yellow",
									25
								]
							]
						}`),
					},
				},
			},
			flagKey:           "headerColor",
			context:           map[string]any{},
			expectedVariant:   "",
			expectedValue:     "",
			expectedReason:    model.ErrorReason,
			expectedErrorCode: model.GeneralErrorCode,
		},
		"fallback to default variant if invalid variant as result of fractional evaluation": {
			flags: Flags{
				Flags: map[string]model.Flag{
					"headerColor": {
						State:          "ENABLED",
						DefaultVariant: "red",
						Variants: map[string]any{
							"red":    "#FF0000",
							"blue":   "#0000FF",
							"green":  "#00FF00",
							"yellow": "#FFFF00",
						},
						Targeting: []byte(`{
							"fractionalEvaluation": [
								"email",
								[
									"black",
									100
								]
							]
						}`),
					},
				},
			},
			flagKey: "headerColor",
			context: map[string]any{
				"email": "foo@foo.com",
			},
			expectedVariant:   "",
			expectedValue:     "",
			expectedReason:    model.ErrorReason,
			expectedErrorCode: model.GeneralErrorCode,
		},
		"fallback to default variant if percentages don't sum to 100": {
			flags: Flags{
				Flags: map[string]model.Flag{
					"headerColor": {
						State:          "ENABLED",
						DefaultVariant: "red",
						Variants: map[string]any{
							"red":    "#FF0000",
							"blue":   "#0000FF",
							"green":  "#00FF00",
							"yellow": "#FFFF00",
						},
						Targeting: []byte(`{
							"fractionalEvaluation": [
								"email",
								[
									"red",
									25
								],
								[
									"blue",
									25
								]
							]
						}`),
					},
				},
			},
			flagKey: "headerColor",
			context: map[string]any{
				"email": "foo@foo.com",
			},
			expectedVariant: "red",
			expectedValue:   "#FF0000",
			expectedReason:  model.DefaultReason,
		},
	}
	const reqID = "default"
	for name, tt := range tests {
		t.Run(name, func(t *testing.T) {
			log := logger.NewLogger(nil, false)
			je := NewJSON(log, store.NewFlags())
			je.store.Flags = tt.flags.Flags

			value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

			if value != tt.expectedValue {
				t.Errorf("expected value '%s', got '%s'", tt.expectedValue, value)
			}

			if variant != tt.expectedVariant {
				t.Errorf("expected variant '%s', got '%s'", tt.expectedVariant, variant)
			}

			if reason != tt.expectedReason {
				t.Errorf("expected reason '%s', got '%s'", tt.expectedReason, reason)
			}

			if err != nil {
				errorCode := err.Error()
				if errorCode != tt.expectedErrorCode {
					t.Errorf("expected err '%v', got '%v'", tt.expectedErrorCode, err)
				}
			}
		})
	}
}
```
```diff
@@ -316,6 +316,8 @@ func TestSemVerOperator_Compare(t *testing.T) {
 }
 
 func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
+	const source = "testSource"
+	var sources = []string{source}
 	ctx := context.Background()
 
 	tests := map[string]struct {
```
```diff
@@ -922,8 +924,12 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {
 			log := logger.NewLogger(nil, false)
-			je := NewJSON(log, store.NewFlags())
-			je.store.Flags = tt.flags.Flags
+			s, err := store.NewStore(log, sources)
+			if err != nil {
+				t.Fatalf("NewStore failed: %v", err)
+			}
+			je := NewJSON(log, s)
+			je.store.Update(source, tt.flags.Flags, model.Metadata{})
 
 			value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
 
```
```diff
@@ -13,6 +13,8 @@ import (
 )
 
 func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
+	const source = "testSource"
+	var sources = []string{source}
 	ctx := context.Background()
 
 	tests := map[string]struct {
```
```diff
@@ -185,8 +187,12 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {
 			log := logger.NewLogger(nil, false)
-			je := NewJSON(log, store.NewFlags())
-			je.store.Flags = tt.flags.Flags
+			s, err := store.NewStore(log, sources)
+			if err != nil {
+				t.Fatalf("NewStore failed: %v", err)
+			}
+			je := NewJSON(log, s)
+			je.store.Update(source, tt.flags.Flags, model.Metadata{})
 
 			value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
 
```
```diff
@@ -210,6 +216,8 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
 }
 
 func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
+	const source = "testSource"
+	var sources = []string{source}
 	ctx := context.Background()
 
 	tests := map[string]struct {
```
```diff
@@ -382,9 +390,12 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {
 			log := logger.NewLogger(nil, false)
-			je := NewJSON(log, store.NewFlags())
-
-			je.store.Flags = tt.flags.Flags
+			s, err := store.NewStore(log, sources)
+			if err != nil {
+				t.Fatalf("NewStore failed: %v", err)
+			}
+			je := NewJSON(log, s)
+			je.store.Update(source, tt.flags.Flags, model.Metadata{})
 
 			value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
 
```
```diff
@@ -2,7 +2,15 @@ package model
 
 import "encoding/json"
 
+const Key = "Key"
+const FlagSetId = "FlagSetId"
+const Source = "Source"
+const Priority = "Priority"
+
 type Flag struct {
+	Key            string         `json:"-"` // not serialized, used only for indexing
+	FlagSetId      string         `json:"-"` // not serialized, used only for indexing
+	Priority       int            `json:"-"` // not serialized, used only for indexing
 	State          string         `json:"state"`
 	DefaultVariant string         `json:"defaultVariant"`
 	Variants       map[string]any `json:"variants"`
```
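The three new fields carry the `json:"-"` tag, so they are index-only and never round-trip through serialization; a minimal sketch of that standard-library behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the tagging in model.Flag: fields marked json:"-" are
// skipped by encoding/json in both directions.
type flag struct {
	Key   string `json:"-"`
	State string `json:"state"`
}

func main() {
	b, _ := json.Marshal(flag{Key: "myStringFlag", State: "ENABLED"})
	fmt.Println(string(b)) // {"state":"ENABLED"}
}
```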
```
@@ -0,0 +1,52 @@
package notifications

import (
	"reflect"

	"github.com/open-feature/flagd/core/pkg/model"
)

const typeField = "type"

// Use to represent change notifications for mode PROVIDER_CONFIGURATION_CHANGE events.
type Notifications map[string]any

// Generate notifications (deltas) from old and new flag sets for use in RPC mode PROVIDER_CONFIGURATION_CHANGE events.
func NewFromFlags(oldFlags, newFlags map[string]model.Flag) Notifications {
	notifications := map[string]interface{}{}

	// flags removed
	for key := range oldFlags {
		if _, ok := newFlags[key]; !ok {
			notifications[key] = map[string]interface{}{
				typeField: string(model.NotificationDelete),
			}
		}
	}

	// flags added or modified
	for key, newFlag := range newFlags {
		oldFlag, exists := oldFlags[key]
		if !exists {
			notifications[key] = map[string]interface{}{
				typeField: string(model.NotificationCreate),
			}
		} else if !flagsEqual(oldFlag, newFlag) {
			notifications[key] = map[string]interface{}{
				typeField: string(model.NotificationUpdate),
			}
		}
	}

	return notifications
}

func flagsEqual(a, b model.Flag) bool {
	return a.State == b.State &&
		a.DefaultVariant == b.DefaultVariant &&
		reflect.DeepEqual(a.Variants, b.Variants) &&
		reflect.DeepEqual(a.Targeting, b.Targeting) &&
		a.Source == b.Source &&
		a.Selector == b.Selector &&
		reflect.DeepEqual(a.Metadata, b.Metadata)
}
```
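Note that `flagsEqual` compares fields explicitly rather than applying `reflect.DeepEqual` to the whole struct, plausibly so the new index-only fields on `model.Flag` (Key, FlagSetId, Priority) never register as changes; the table-driven test that follows pins down the create, update, and delete semantics.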
```
@@ -0,0 +1,102 @@
package notifications

import (
	"testing"

	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/stretchr/testify/assert"
)

func TestNewFromFlags(t *testing.T) {
	flagA := model.Flag{
		Key:            "flagA",
		State:          "ENABLED",
		DefaultVariant: "on",
		Source:         "source1",
	}
	flagAUpdated := model.Flag{
		Key:            "flagA",
		State:          "DISABLED",
		DefaultVariant: "on",
		Source:         "source1",
	}
	flagB := model.Flag{
		Key:            "flagB",
		State:          "ENABLED",
		DefaultVariant: "off",
		Source:         "source1",
	}

	tests := []struct {
		name     string
		oldFlags map[string]model.Flag
		newFlags map[string]model.Flag
		want     Notifications
	}{
		{
			name:     "flag added",
			oldFlags: map[string]model.Flag{},
			newFlags: map[string]model.Flag{"flagA": flagA},
			want: Notifications{
				"flagA": map[string]interface{}{
					"type": string(model.NotificationCreate),
				},
			},
		},
		{
			name:     "flag deleted",
			oldFlags: map[string]model.Flag{"flagA": flagA},
			newFlags: map[string]model.Flag{},
			want: Notifications{
				"flagA": map[string]interface{}{
					"type": string(model.NotificationDelete),
				},
			},
		},
		{
			name:     "flag changed",
			oldFlags: map[string]model.Flag{"flagA": flagA},
			newFlags: map[string]model.Flag{"flagA": flagAUpdated},
			want: Notifications{
				"flagA": map[string]interface{}{
					"type": string(model.NotificationUpdate),
				},
			},
		},
		{
			name:     "flag unchanged",
			oldFlags: map[string]model.Flag{"flagA": flagA},
			newFlags: map[string]model.Flag{"flagA": flagA},
			want:     Notifications{},
		},
		{
			name: "mixed changes",
			oldFlags: map[string]model.Flag{
				"flagA": flagA,
				"flagB": flagB,
			},
			newFlags: map[string]model.Flag{
				"flagA": flagAUpdated, // updated
				"flagC": flagA,        // added
			},
			want: Notifications{
				"flagA": map[string]interface{}{
					"type": string(model.NotificationUpdate),
				},
				"flagB": map[string]interface{}{
					"type": string(model.NotificationDelete),
				},
				"flagC": map[string]interface{}{
					"type": string(model.NotificationCreate),
				},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := NewFromFlags(tt.oldFlags, tt.newFlags)
			assert.Equal(t, tt.want, got)
		})
	}
}
```
@ -1,253 +0,0 @@
package store

import (
    "context"
    "encoding/json"
    "fmt"
    "maps"
    "reflect"
    "sync"

    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
)

type del = struct{}

var deleteMarker *del

type IStore interface {
    GetAll(ctx context.Context) (map[string]model.Flag, model.Metadata, error)
    Get(ctx context.Context, key string) (model.Flag, model.Metadata, bool)
    SelectorForFlag(ctx context.Context, flag model.Flag) string
}

type State struct {
    mx                sync.RWMutex
    Flags             map[string]model.Flag `json:"flags"`
    FlagSources       []string
    SourceDetails     map[string]SourceDetails  `json:"sourceMetadata,omitempty"`
    MetadataPerSource map[string]model.Metadata `json:"metadata,omitempty"`
}

type SourceDetails struct {
    Source   string
    Selector string
}

func (f *State) hasPriority(stored string, new string) bool {
    if stored == new {
        return true
    }
    for i := len(f.FlagSources) - 1; i >= 0; i-- {
        switch f.FlagSources[i] {
        case stored:
            return false
        case new:
            return true
        }
    }
    return true
}

func NewFlags() *State {
    return &State{
        Flags:             map[string]model.Flag{},
        SourceDetails:     map[string]SourceDetails{},
        MetadataPerSource: map[string]model.Metadata{},
    }
}

func (f *State) Set(key string, flag model.Flag) {
    f.mx.Lock()
    defer f.mx.Unlock()
    f.Flags[key] = flag
}

func (f *State) Get(_ context.Context, key string) (model.Flag, model.Metadata, bool) {
    f.mx.RLock()
    defer f.mx.RUnlock()
    metadata := f.getMetadata()
    flag, ok := f.Flags[key]
    if ok {
        metadata = f.GetMetadataForSource(flag.Source)
    }

    return flag, metadata, ok
}

func (f *State) SelectorForFlag(_ context.Context, flag model.Flag) string {
    f.mx.RLock()
    defer f.mx.RUnlock()

    return f.SourceDetails[flag.Source].Selector
}

func (f *State) Delete(key string) {
    f.mx.Lock()
    defer f.mx.Unlock()
    delete(f.Flags, key)
}

func (f *State) String() (string, error) {
    f.mx.RLock()
    defer f.mx.RUnlock()
    bytes, err := json.Marshal(f)
    if err != nil {
        return "", fmt.Errorf("unable to marshal flags: %w", err)
    }

    return string(bytes), nil
}

// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (f *State) GetAll(_ context.Context) (map[string]model.Flag, model.Metadata, error) {
    f.mx.RLock()
    defer f.mx.RUnlock()
    flags := make(map[string]model.Flag, len(f.Flags))

    for key, flag := range f.Flags {
        flags[key] = flag
    }

    return flags, f.getMetadata(), nil
}

// Add new flags from source.
func (f *State) Add(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
    notifications := map[string]interface{}{}

    for k, newFlag := range flags {
        storedFlag, _, ok := f.Get(context.Background(), k)
        if ok && !f.hasPriority(storedFlag.Source, source) {
            logger.Debug(
                fmt.Sprintf(
                    "not overwriting: flag %s from source %s does not have priority over %s",
                    k,
                    source,
                    storedFlag.Source,
                ),
            )
            continue
        }

        notifications[k] = map[string]interface{}{
            "type":   string(model.NotificationCreate),
            "source": source,
        }

        // Store the new version of the flag
        newFlag.Source = source
        newFlag.Selector = selector
        f.Set(k, newFlag)
    }

    return notifications
}

// Update the flag state with the provided flags.
func (f *State) Update(
    logger *logger.Logger,
    source string,
    selector string,
    flags map[string]model.Flag,
    metadata model.Metadata,
) (map[string]interface{}, bool) {
    notifications := map[string]interface{}{}
    resyncRequired := false
    f.mx.Lock()
    f.setSourceMetadata(source, metadata)

    for k, v := range f.Flags {
        if v.Source == source && v.Selector == selector {
            if _, ok := flags[k]; !ok {
                // flag has been deleted
                delete(f.Flags, k)
                notifications[k] = map[string]interface{}{
                    "type":   string(model.NotificationDelete),
                    "source": source,
                }
                resyncRequired = true
                logger.Debug(
                    fmt.Sprintf(
                        "store resync triggered: flag %s has been deleted from source %s",
                        k, source,
                    ),
                )
                continue
            }
        }
    }
    f.mx.Unlock()
    for k, newFlag := range flags {
        newFlag.Source = source
        newFlag.Selector = selector
        storedFlag, _, ok := f.Get(context.Background(), k)
        if ok {
            if !f.hasPriority(storedFlag.Source, source) {
                logger.Debug(
                    fmt.Sprintf(
                        "not merging: flag %s from source %s does not have priority over %s",
                        k, source, storedFlag.Source,
                    ),
                )
                continue
            }
            if reflect.DeepEqual(storedFlag, newFlag) {
                continue
            }
        }
        if !ok {
            notifications[k] = map[string]interface{}{
                "type":   string(model.NotificationCreate),
                "source": source,
            }
        } else {
            notifications[k] = map[string]interface{}{
                "type":   string(model.NotificationUpdate),
                "source": source,
            }
        }
        // Store the new version of the flag
        f.Set(k, newFlag)
    }
    return notifications, resyncRequired
}

func (f *State) GetMetadataForSource(source string) model.Metadata {
    perSource, ok := f.MetadataPerSource[source]
    if ok && perSource != nil {
        return maps.Clone(perSource)
    }
    return model.Metadata{}
}

func (f *State) getMetadata() model.Metadata {
    metadata := model.Metadata{}
    for _, perSource := range f.MetadataPerSource {
        for key, entry := range perSource {
            _, exists := metadata[key]
            if !exists {
                metadata[key] = entry
            } else {
                metadata[key] = deleteMarker
            }
        }
    }

    // keys that exist across multiple sources are deleted
    maps.DeleteFunc(metadata, func(key string, _ interface{}) bool {
        return metadata[key] == deleteMarker
    })

    return metadata
}

func (f *State) setSourceMetadata(source string, metadata model.Metadata) {
    if f.MetadataPerSource == nil {
        f.MetadataPerSource = map[string]model.Metadata{}
    }

    f.MetadataPerSource[source] = metadata
}
@ -1,326 +0,0 @@
package store

import (
    "reflect"
    "testing"

    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/stretchr/testify/require"
)

func TestHasPriority(t *testing.T) {
    tests := []struct {
        name         string
        currentState *State
        storedSource string
        newSource    string
        hasPriority  bool
    }{
        {
            name:         "same source",
            currentState: &State{},
            storedSource: "A",
            newSource:    "A",
            hasPriority:  true,
        },
        {
            name: "no priority",
            currentState: &State{
                FlagSources: []string{
                    "B",
                    "A",
                },
            },
            storedSource: "A",
            newSource:    "B",
            hasPriority:  false,
        },
        {
            name: "priority",
            currentState: &State{
                FlagSources: []string{
                    "A",
                    "B",
                },
            },
            storedSource: "A",
            newSource:    "B",
            hasPriority:  true,
        },
        {
            name: "not in sources",
            currentState: &State{
                FlagSources: []string{
                    "A",
                    "B",
                },
            },
            storedSource: "C",
            newSource:    "D",
            hasPriority:  true,
        },
    }
    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()
            p := tt.currentState.hasPriority(tt.storedSource, tt.newSource)
            require.Equal(t, p, tt.hasPriority)
        })
    }
}

func TestMergeFlags(t *testing.T) {
    t.Parallel()
    tests := []struct {
        name        string
        current     *State
        new         map[string]model.Flag
        newSource   string
        newSelector string
        want        *State
        wantNotifs  map[string]interface{}
        wantResync  bool
    }{
        {
            name:       "both nil",
            current:    &State{Flags: nil},
            new:        nil,
            want:       &State{Flags: nil},
            wantNotifs: map[string]interface{}{},
        },
        {
            name:       "both empty flags",
            current:    &State{Flags: map[string]model.Flag{}},
            new:        map[string]model.Flag{},
            want:       &State{Flags: map[string]model.Flag{}},
            wantNotifs: map[string]interface{}{},
        },
        {
            name:       "empty new",
            current:    &State{Flags: map[string]model.Flag{}},
            new:        nil,
            want:       &State{Flags: map[string]model.Flag{}},
            wantNotifs: map[string]interface{}{},
        },
        {
            name: "merging with new source",
            current: &State{
                Flags: map[string]model.Flag{
                    "waka": {
                        DefaultVariant: "off",
                        Source:         "1",
                    },
                },
            },
            new: map[string]model.Flag{
                "paka": {
                    DefaultVariant: "on",
                },
            },
            newSource: "2",
            want: &State{Flags: map[string]model.Flag{
                "waka": {
                    DefaultVariant: "off",
                    Source:         "1",
                },
                "paka": {
                    DefaultVariant: "on",
                    Source:         "2",
                },
            }},
            wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write", "source": "2"}},
        },
        {
            name: "override by new update",
            current: &State{Flags: map[string]model.Flag{
                "waka": {DefaultVariant: "off"},
                "paka": {DefaultVariant: "off"},
            }},
            new: map[string]model.Flag{
                "waka": {DefaultVariant: "on"},
                "paka": {DefaultVariant: "on"},
            },
            want: &State{Flags: map[string]model.Flag{
                "waka": {DefaultVariant: "on"},
                "paka": {DefaultVariant: "on"},
            }},
            wantNotifs: map[string]interface{}{
                "waka": map[string]interface{}{"type": "update", "source": ""},
                "paka": map[string]interface{}{"type": "update", "source": ""},
            },
        },
        {
            name: "identical update so empty notifications",
            current: &State{
                Flags: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
            },
            new: map[string]model.Flag{
                "hello": {DefaultVariant: "off"},
            },
            want: &State{Flags: map[string]model.Flag{
                "hello": {DefaultVariant: "off"},
            }},
            wantNotifs: map[string]interface{}{},
        },
        {
            name:       "deleted flag & trigger resync for same source",
            current:    &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}},
            new:        map[string]model.Flag{},
            newSource:  "A",
            want:       &State{Flags: map[string]model.Flag{}},
            wantNotifs: map[string]interface{}{"hello": map[string]interface{}{"type": "delete", "source": "A"}},
            wantResync: true,
        },
        {
            name:        "no deleted & no resync for same source but different selector",
            current:     &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
            new:         map[string]model.Flag{},
            newSource:   "A",
            newSelector: "Y",
            want:        &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
            wantResync:  false,
            wantNotifs:  map[string]interface{}{},
        },
        {
            name: "no merge due to low priority",
            current: &State{
                FlagSources: []string{
                    "B",
                    "A",
                },
                Flags: map[string]model.Flag{
                    "hello": {
                        DefaultVariant: "off",
                        Source:         "A",
                    },
                },
            },
            new:       map[string]model.Flag{"hello": {DefaultVariant: "off"}},
            newSource: "B",
            want: &State{
                FlagSources: []string{
                    "B",
                    "A",
                },
                Flags: map[string]model.Flag{
                    "hello": {
                        DefaultVariant: "off",
                        Source:         "A",
                    },
                },
            },
            wantNotifs: map[string]interface{}{},
        },
    }

    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()
            gotNotifs, resyncRequired := tt.current.Update(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})

            require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
            require.Equal(t, tt.wantNotifs, gotNotifs)
            require.Equal(t, tt.wantResync, resyncRequired)
        })
    }
}

func TestFlags_Add(t *testing.T) {
    mockLogger := logger.NewLogger(nil, false)
    mockSource := "source"
    mockOverrideSource := "source-2"

    type request struct {
        source   string
        selector string
        flags    map[string]model.Flag
    }

    tests := []struct {
        name                     string
        storedState              *State
        addRequest               request
        expectedState            *State
        expectedNotificationKeys []string
    }{
        {
            name: "Add success",
            storedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockSource},
                },
            },
            addRequest: request{
                source: mockSource,
                flags: map[string]model.Flag{
                    "B": {Source: mockSource},
                },
            },
            expectedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockSource},
                    "B": {Source: mockSource},
                },
            },
            expectedNotificationKeys: []string{"B"},
        },
        {
            name: "Add multiple success",
            storedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockSource},
                },
            },
            addRequest: request{
                source: mockSource,
                flags: map[string]model.Flag{
                    "B": {Source: mockSource},
                    "C": {Source: mockSource},
                },
            },
            expectedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockSource},
                    "B": {Source: mockSource},
                    "C": {Source: mockSource},
                },
            },
            expectedNotificationKeys: []string{"B", "C"},
        },
        {
            name: "Add success - conflict and override",
            storedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockSource},
                },
            },
            addRequest: request{
                source: mockOverrideSource,
                flags: map[string]model.Flag{
                    "A": {Source: mockOverrideSource},
                },
            },
            expectedState: &State{
                Flags: map[string]model.Flag{
                    "A": {Source: mockOverrideSource},
                },
            },
            expectedNotificationKeys: []string{"A"},
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.selector, tt.addRequest.flags)

            require.Equal(t, tt.storedState, tt.expectedState)

            for k := range messages {
                require.Containsf(t, tt.expectedNotificationKeys, k,
                    "Message key %s not present in the expected key list", k)
            }
        })
    }
}
@ -0,0 +1,133 @@
package store

import (
    "maps"
    "sort"
    "strings"

    uuid "github.com/google/uuid"
    "github.com/open-feature/flagd/core/pkg/model"
)

// flags table and index constants
const flagsTable = "flags"

const idIndex = "id"
const keyIndex = "key"
const sourceIndex = "source"
const priorityIndex = "priority"
const flagSetIdIndex = "flagSetId"

// compound indices; maintain sub-indexes alphabetically; order matters; these must match what's generated in the ToQuery func.
const flagSetIdSourceCompoundIndex = flagSetIdIndex + "+" + sourceIndex
const keySourceCompoundIndex = keyIndex + "+" + sourceIndex
const flagSetIdKeySourceCompoundIndex = flagSetIdIndex + "+" + keyIndex + "+" + sourceIndex

// flagSetId defaults to a UUID generated at startup to make our queries consistent;
// any flag without a "flagSetId" is assigned this one; it's never exposed externally
var nilFlagSetId = uuid.New().String()

// A Selector represents a set of constraints used to query the store.
type Selector struct {
    indexMap map[string]string
}

// NewSelector creates a new Selector from a selector expression string.
// For example, to select flags from source "./mySource" and flagSetId "1234", use the expression:
// "source=./mySource,flagSetId=1234"
func NewSelector(selectorExpression string) Selector {
    return Selector{
        indexMap: expressionToMap(selectorExpression),
    }
}

func expressionToMap(sExp string) map[string]string {
    selectorMap := make(map[string]string)
    if sExp == "" {
        return selectorMap
    }

    if strings.Index(sExp, "=") == -1 {
        // if no '=' is found, treat the whole string as a source (backwards compatibility)
        // we may support interpreting this as a flagSetId in the future as an option
        selectorMap[sourceIndex] = sExp
        return selectorMap
    }

    // Split the selector by commas
    pairs := strings.Split(sExp, ",")
    for _, pair := range pairs {
        // Split each pair by the first equal sign
        parts := strings.Split(pair, "=")
        if len(parts) == 2 {
            key := parts[0]
            value := parts[1]
            selectorMap[key] = value
        }
    }
    return selectorMap
}

func (s Selector) WithIndex(key string, value string) Selector {
    m := maps.Clone(s.indexMap)
    m[key] = value
    return Selector{
        indexMap: m,
    }
}

func (s *Selector) IsEmpty() bool {
    return s == nil || len(s.indexMap) == 0
}

// ToQuery converts the selector map to an indexId and constraints for querying the store.
// For a given index, a specific order and number of constraints are required.
// Both the indexId and constraints are generated based on the keys present in the selector's internal map.
func (s Selector) ToQuery() (indexId string, constraints []interface{}) {

    if len(s.indexMap) == 2 && s.indexMap[flagSetIdIndex] != "" && s.indexMap[keyIndex] != "" {
        // special case for flagSetId and key (this is the "id" index)
        return idIndex, []interface{}{s.indexMap[flagSetIdIndex], s.indexMap[keyIndex]}
    }

    qs := []string{}
    keys := make([]string, 0, len(s.indexMap))

    for key := range s.indexMap {
        keys = append(keys, key)
    }
    sort.Strings(keys)

    for _, key := range keys {
        indexId += key + "+"
        qs = append(qs, s.indexMap[key])
    }

    indexId = strings.TrimSuffix(indexId, "+")
    // Convert []string to []interface{}
    c := make([]interface{}, 0, len(qs))
    for _, v := range qs {
        c = append(c, v)
    }
    constraints = c

    return indexId, constraints
}

// ToMetadata converts the selector's internal map to metadata for logging or tracing purposes.
// It only includes known indices to avoid leaking sensitive information, and is usually returned as the "top level" metadata.
func (s *Selector) ToMetadata() model.Metadata {
    meta := model.Metadata{}

    if s == nil || s.indexMap == nil {
        return meta
    }

    if s.indexMap[flagSetIdIndex] != "" {
        meta[flagSetIdIndex] = s.indexMap[flagSetIdIndex]
    }
    if s.indexMap[sourceIndex] != "" {
        meta[sourceIndex] = s.indexMap[sourceIndex]
    }
    return meta
}
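To make the index-selection rules above concrete, here is a small, hypothetical usage sketch (not part of the changeset); the import path of the `store` package is assumed, and the behavior shown follows from `NewSelector`, `WithIndex`, and `ToQuery` above:

```go
package main

import (
    "fmt"

    // assumed import path for the store package shown in this diff
    "github.com/open-feature/flagd/core/pkg/store"
)

func main() {
    // Two constraints: keys are sorted alphabetically, so this resolves to the
    // "flagSetId+source" compound index with constraints ["1234", "./mySource"].
    sel := store.NewSelector("source=./mySource,flagSetId=1234")
    indexId, constraints := sel.ToQuery()
    fmt.Println(indexId, constraints) // flagSetId+source [1234 ./mySource]

    // Exactly flagSetId+key hits the special-cased unique "id" (primary) index.
    idSel := store.NewSelector("flagSetId=1234").WithIndex("key", "myBoolFlag")
    indexId, constraints = idSel.ToQuery()
    fmt.Println(indexId, constraints) // id [1234 myBoolFlag]

    // A bare expression with no '=' is treated as a source (backwards compatibility).
    legacy := store.NewSelector("./mySource")
    fmt.Println(legacy.ToQuery()) // source [./mySource]
}
```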
@ -0,0 +1,193 @@
package store

import (
    "reflect"
    "testing"

    "github.com/open-feature/flagd/core/pkg/model"
)

func TestSelector_IsEmpty(t *testing.T) {
    tests := []struct {
        name      string
        selector  *Selector
        wantEmpty bool
    }{
        {
            name:      "nil selector",
            selector:  nil,
            wantEmpty: true,
        },
        {
            name:      "nil indexMap",
            selector:  &Selector{indexMap: nil},
            wantEmpty: true,
        },
        {
            name:      "empty indexMap",
            selector:  &Selector{indexMap: map[string]string{}},
            wantEmpty: true,
        },
        {
            name:      "non-empty indexMap",
            selector:  &Selector{indexMap: map[string]string{"source": "abc"}},
            wantEmpty: false,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got := tt.selector.IsEmpty()
            if got != tt.wantEmpty {
                t.Errorf("IsEmpty() = %v, want %v", got, tt.wantEmpty)
            }
        })
    }
}

func TestSelector_WithIndex(t *testing.T) {
    oldS := Selector{indexMap: map[string]string{"source": "abc"}}
    newS := oldS.WithIndex("flagSetId", "1234")

    if newS.indexMap["source"] != "abc" {
        t.Errorf("WithIndex did not preserve existing keys")
    }
    if newS.indexMap["flagSetId"] != "1234" {
        t.Errorf("WithIndex did not add new key")
    }
    // Ensure original is unchanged
    if _, ok := oldS.indexMap["flagSetId"]; ok {
        t.Errorf("WithIndex mutated original selector")
    }
}

func TestSelector_ToQuery(t *testing.T) {
    tests := []struct {
        name       string
        selector   Selector
        wantIndex  string
        wantConstr []interface{}
    }{
        {
            name:       "flagSetId and key primary index special case",
            selector:   Selector{indexMap: map[string]string{"flagSetId": "fsid", "key": "myKey"}},
            wantIndex:  "id",
            wantConstr: []interface{}{"fsid", "myKey"},
        },
        {
            name:       "multiple keys sorted",
            selector:   Selector{indexMap: map[string]string{"source": "src", "flagSetId": "fsid"}},
            wantIndex:  "flagSetId+source",
            wantConstr: []interface{}{"fsid", "src"},
        },
        {
            name:       "single key",
            selector:   Selector{indexMap: map[string]string{"source": "src"}},
            wantIndex:  "source",
            wantConstr: []interface{}{"src"},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            gotIndex, gotConstr := tt.selector.ToQuery()
            if gotIndex != tt.wantIndex {
                t.Errorf("ToQuery() index = %v, want %v", gotIndex, tt.wantIndex)
            }
            if !reflect.DeepEqual(gotConstr, tt.wantConstr) {
                t.Errorf("ToQuery() constraints = %v, want %v", gotConstr, tt.wantConstr)
            }
        })
    }
}

func TestSelector_ToMetadata(t *testing.T) {
    tests := []struct {
        name     string
        selector *Selector
        want     model.Metadata
    }{
        {
            name:     "nil selector",
            selector: nil,
            want:     model.Metadata{},
        },
        {
            name:     "nil indexMap",
            selector: &Selector{indexMap: nil},
            want:     model.Metadata{},
        },
        {
            name:     "empty indexMap",
            selector: &Selector{indexMap: map[string]string{}},
            want:     model.Metadata{},
        },
        {
            name:     "flagSetId only",
            selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid"}},
            want:     model.Metadata{"flagSetId": "fsid"},
        },
        {
            name:     "source only",
            selector: &Selector{indexMap: map[string]string{"source": "src"}},
            want:     model.Metadata{"source": "src"},
        },
        {
            name:     "flagSetId and source",
            selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src"}},
            want:     model.Metadata{"flagSetId": "fsid", "source": "src"},
        },
        {
            name:     "flagSetId, source, and key (key should be ignored)",
            selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src", "key": "myKey"}},
            want:     model.Metadata{"flagSetId": "fsid", "source": "src"},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got := tt.selector.ToMetadata()
            if !reflect.DeepEqual(got, tt.want) {
                t.Errorf("ToMetadata() = %v, want %v", got, tt.want)
            }
        })
    }
}

func TestNewSelector(t *testing.T) {
    tests := []struct {
        name    string
        input   string
        wantMap map[string]string
    }{
        {
            name:    "source and flagSetId",
            input:   "source=abc,flagSetId=1234",
            wantMap: map[string]string{"source": "abc", "flagSetId": "1234"},
        },
        {
            name:    "source",
            input:   "source=abc",
            wantMap: map[string]string{"source": "abc"},
        },
        {
            name:    "no equals, treat as source",
            input:   "mysource",
            wantMap: map[string]string{"source": "mysource"},
        },
        {
            name:    "empty string",
            input:   "",
            wantMap: map[string]string{},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            s := NewSelector(tt.input)
            if !reflect.DeepEqual(s.indexMap, tt.wantMap) {
                t.Errorf("NewSelector(%q) indexMap = %v, want %v", tt.input, s.indexMap, tt.wantMap)
            }
        })
    }
}
@ -0,0 +1,396 @@
package store

import (
    "context"
    "encoding/json"
    "fmt"
    "slices"
    "sync"

    "github.com/hashicorp/go-memdb"
    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/open-feature/flagd/core/pkg/notifications"
)

var noValidatedSources = []string{}

type SelectorContextKey struct{}

type FlagQueryResult struct {
    Flags map[string]model.Flag
}

type IStore interface {
    Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error)
    GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error)
    Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult)
}

var _ IStore = (*Store)(nil)

type Store struct {
    mx      sync.RWMutex
    db      *memdb.MemDB
    logger  *logger.Logger
    sources []string
    // deprecated: has no effect and will be removed soon.
    FlagSources []string
}

type SourceDetails struct {
    Source   string
    Selector string
}

// NewStore creates a new in-memory store with the given sources.
// The order of sources in the slice determines their priority; when queries result in duplicate flags (queries without source or flagSetId), the higher priority source "wins".
func NewStore(logger *logger.Logger, sources []string) (*Store, error) {

    // a unique index must exist for each set of constraints - for example, to look up by key and source, we need a compound index on key+source, etc
    // we may want to generate these dynamically in the future to support more robust querying, but for now we hardcode the ones we need
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            flagsTable: {
                Name: flagsTable,
                Indexes: map[string]*memdb.IndexSchema{
                    // primary index; must be unique and named "id"
                    idIndex: {
                        Name:   idIndex,
                        Unique: true,
                        Indexer: &memdb.CompoundIndex{
                            Indexes: []memdb.Indexer{
                                &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
                                &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
                            },
                        },
                    },
                    // for looking up by source
                    sourceIndex: {
                        Name:    sourceIndex,
                        Unique:  false,
                        Indexer: &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
                    },
                    // for looking up by priority, used to maintain the highest priority flag when there are duplicates and no selector is provided
                    priorityIndex: {
                        Name:    priorityIndex,
                        Unique:  false,
                        Indexer: &memdb.IntFieldIndex{Field: model.Priority},
                    },
                    // for looking up by flagSetId
                    flagSetIdIndex: {
                        Name:    flagSetIdIndex,
                        Unique:  false,
                        Indexer: &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
                    },
                    keyIndex: {
                        Name:    keyIndex,
                        Unique:  false,
                        Indexer: &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
                    },
                    flagSetIdSourceCompoundIndex: {
                        Name:   flagSetIdSourceCompoundIndex,
                        Unique: false,
                        Indexer: &memdb.CompoundIndex{
                            Indexes: []memdb.Indexer{
                                &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
                                &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
                            },
                        },
                    },
                    keySourceCompoundIndex: {
                        Name:   keySourceCompoundIndex,
                        Unique: false, // duplicates from a single source ARE allowed (they just must have different flag sets)
                        Indexer: &memdb.CompoundIndex{
                            Indexes: []memdb.Indexer{
                                &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
                                &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
                            },
                        },
                    },
                    // used to query all flags from a specific source so we know which flags to delete if a flag is missing from a source
                    flagSetIdKeySourceCompoundIndex: {
                        Name:   flagSetIdKeySourceCompoundIndex,
                        Unique: true,
                        Indexer: &memdb.CompoundIndex{
                            Indexes: []memdb.Indexer{
                                &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
                                &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
                                &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
                            },
                        },
                    },
                },
            },
        },
    }

    // Create a new database
    db, err := memdb.NewMemDB(schema)
    if err != nil {
        return nil, fmt.Errorf("unable to initialize flag database: %w", err)
    }

    // clone the sources to avoid modifying the original slice
    s := slices.Clone(sources)

    return &Store{
        sources: s,
        db:      db,
        logger:  logger,
    }, nil
}

// Deprecated: use NewStore instead - will be removed very soon.
func NewFlags() *Store {
    state, err := NewStore(logger.NewLogger(nil, false), noValidatedSources)

    if err != nil {
        panic(fmt.Sprintf("unable to create flag store: %v", err))
    }
    return state
}

func (s *Store) Get(_ context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error) {
    s.logger.Debug(fmt.Sprintf("getting flag %s", key))
    txn := s.db.Txn(false)
    queryMeta := selector.ToMetadata()

    // if present, use the selector to query the flags
    if !selector.IsEmpty() {
        selector := selector.WithIndex("key", key)
        indexId, constraints := selector.ToQuery()
        s.logger.Debug(fmt.Sprintf("getting flag with query: %s, %v", indexId, constraints))
        raw, err := txn.First(flagsTable, indexId, constraints...)
        flag, ok := raw.(model.Flag)
        if err != nil {
            return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
        }
        if !ok {
            return model.Flag{}, queryMeta, fmt.Errorf("flag %s is not a valid flag", key)
        }
        return flag, queryMeta, nil

    }
    // otherwise, get all flags with the given key, and keep the last one with the highest priority
    s.logger.Debug(fmt.Sprintf("getting highest priority flag with key: %s", key))
    it, err := txn.Get(flagsTable, keyIndex, key)
    if err != nil {
        return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
    }
    flag := model.Flag{}
    found := false
    for raw := it.Next(); raw != nil; raw = it.Next() {
        nextFlag, ok := raw.(model.Flag)
        if !ok {
            continue
        }
        found = true
        if nextFlag.Priority >= flag.Priority {
            flag = nextFlag
        } else {
            s.logger.Debug(fmt.Sprintf("discarding flag %s from lower priority source %s in favor of flag from source %s", nextFlag.Key, s.sources[nextFlag.Priority], s.sources[flag.Priority]))
        }
    }

    if !found {
        return flag, queryMeta, fmt.Errorf("flag %s not found", key)
    }
    return flag, queryMeta, nil
}

func (f *Store) String() (string, error) {
    f.logger.Debug("dumping flags to string")
    f.mx.RLock()
    defer f.mx.RUnlock()

    state, _, err := f.GetAll(context.Background(), nil)
    if err != nil {
        return "", fmt.Errorf("unable to get all flags: %w", err)
    }

    bytes, err := json.Marshal(state)
    if err != nil {
        return "", fmt.Errorf("unable to marshal flags: %w", err)
    }

    return string(bytes), nil
}

// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (s *Store) GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error) {
    flags := make(map[string]model.Flag)
    queryMeta := selector.ToMetadata()
    it, err := s.selectOrAll(selector)

    if err != nil {
        s.logger.Error(fmt.Sprintf("flag query error: %v", err))
        return flags, queryMeta, err
    }
    flags = s.collect(it)
    return flags, queryMeta, nil
}

// Update the flag state with the provided flags.
func (s *Store) Update(
    source string,
    flags map[string]model.Flag,
    metadata model.Metadata,
) (map[string]interface{}, bool) {
    resyncRequired := false

    if source == "" {
        panic("source cannot be empty")
    }

    priority := slices.Index(s.sources, source)
    if priority == -1 {
        // this is a hack to allow old constructors that didn't pass sources; remove when we remove the "NewFlags" constructor
        if !slices.Equal(s.sources, noValidatedSources) {
            panic(fmt.Sprintf("source %s is not registered in the store", source))
        }
        // same as above - remove when we remove the "NewFlags" constructor
        priority = 0
    }

    txn := s.db.Txn(true)
    defer txn.Abort()

    // get all flags for the source we are updating
    selector := NewSelector(sourceIndex + "=" + source)
    oldFlags, _, _ := s.GetAll(context.Background(), &selector)

    s.mx.Lock()
    for key := range oldFlags {
        if _, ok := flags[key]; !ok {
            // flag has been deleted
            s.logger.Debug(fmt.Sprintf("flag %s has been deleted from source %s", key, source))

            count, err := txn.DeleteAll(flagsTable, keySourceCompoundIndex, key, source)
            s.logger.Debug(fmt.Sprintf("deleted %d flags with key %s from source %s", count, key, source))

            if err != nil {
                s.logger.Error(fmt.Sprintf("error deleting flag: %s, %v", key, err))
            }
            continue
        }
    }
    s.mx.Unlock()
    for key, newFlag := range flags {
        s.logger.Debug(fmt.Sprintf("got metadata %v", metadata))

        newFlag.Key = key
        newFlag.Source = source
        newFlag.Priority = priority
        newFlag.Metadata = patchMetadata(metadata, newFlag.Metadata)

        // flagSetId defaults to a UUID generated at startup to make our queries isomorphic
        flagSetId := nilFlagSetId
        // flagSetId is inherited from the set, but can be overridden by the flag
        setFlagSetId, ok := newFlag.Metadata["flagSetId"].(string)
        if ok {
            flagSetId = setFlagSetId
        }
        newFlag.FlagSetId = flagSetId

        raw, err := txn.First(flagsTable, keySourceCompoundIndex, key, source)
        if err != nil {
            s.logger.Error(fmt.Sprintf("unable to get flag %s from source %s: %v", key, source, err))
            continue
        }
        oldFlag, ok := raw.(model.Flag)
        // If we already have a flag with the same key and source, we need to check if it has the same flagSetId
        if ok {
            if oldFlag.FlagSetId != newFlag.FlagSetId {
                // If the flagSetId is different, we need to delete the entry, since flagSetId+key represents the primary index, and it's now been changed.
                // This is important especially for clients listening to flagSetId changes, as they expect the flag to be removed from the set in this case.
                _, err = txn.DeleteAll(flagsTable, idIndex, oldFlag.FlagSetId, key)
                if err != nil {
                    s.logger.Error(fmt.Sprintf("unable to delete flags with key %s and flagSetId %s: %v", key, oldFlag.FlagSetId, err))
                    continue
                }
            }
        }
        // Store the new version of the flag
        s.logger.Debug(fmt.Sprintf("storing flag: %v", newFlag))
        err = txn.Insert(flagsTable, newFlag)
        if err != nil {
            s.logger.Error(fmt.Sprintf("unable to insert flag %s: %v", key, err))
            continue
        }
    }

    txn.Commit()
    return notifications.NewFromFlags(oldFlags, flags), resyncRequired
}

// Watch the result-set of a selector for changes, sending updates to the watcher channel.
func (s *Store) Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult) {
    go func() {
        for {
            ws := memdb.NewWatchSet()
            it, err := s.selectOrAll(selector)
            if err != nil {
                s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
                close(watcher)
                return
            }
            ws.Add(it.WatchCh())

            flags := s.collect(it)
            watcher <- FlagQueryResult{
                Flags: flags,
            }

            if err = ws.WatchCtx(ctx); err != nil {
                s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
                close(watcher)
                return
            }
        }
    }()
}

// returns an iterator for the given selector, or all flags if the selector is nil or empty
func (s *Store) selectOrAll(selector *Selector) (it memdb.ResultIterator, err error) {
    txn := s.db.Txn(false)
    if !selector.IsEmpty() {
        indexId, constraints := selector.ToQuery()
        s.logger.Debug(fmt.Sprintf("getting all flags with query: %s, %v", indexId, constraints))
        return txn.Get(flagsTable, indexId, constraints...)
    } else {
        // no selector, get all flags
        return txn.Get(flagsTable, idIndex)
    }
}

// collects flags from an iterator, ensuring that only the highest priority flag is kept when there are duplicates
func (s *Store) collect(it memdb.ResultIterator) map[string]model.Flag {
    flags := make(map[string]model.Flag)
    for raw := it.Next(); raw != nil; raw = it.Next() {
        flag := raw.(model.Flag)
        if existing, ok := flags[flag.Key]; ok {
            if flag.Priority < existing.Priority {
                s.logger.Debug(fmt.Sprintf("discarding duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[flag.Priority], s.sources[existing.Priority]))
                continue // we already have a higher priority flag
            }
            s.logger.Debug(fmt.Sprintf("overwriting duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[existing.Priority], s.sources[flag.Priority]))
        }
        flags[flag.Key] = flag
    }
    return flags
}

func patchMetadata(original, patch model.Metadata) model.Metadata {
    patched := make(model.Metadata)
    if original == nil && patch == nil {
        return nil
    }
    for key, value := range original {
        patched[key] = value
    }
    for key, value := range patch { // patch values overwrite original values on key conflict
        patched[key] = value
    }
    return patched
}
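For orientation, here is a minimal, hypothetical sketch (not part of the changeset) of how the new store, selectors, and watcher fit together; the import path of the `store` package is assumed, and the rest follows the exported API shown in this diff:

```go
package main

import (
    "context"
    "fmt"

    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
    // assumed import path for the store package shown in this diff
    "github.com/open-feature/flagd/core/pkg/store"
)

func main() {
    log := logger.NewLogger(nil, false)

    // source order defines priority: "override" wins on duplicate keys
    s, err := store.NewStore(log, []string{"base", "override"})
    if err != nil {
        panic(err)
    }

    // each Update replaces the source's flags; flagSetId is inherited from set-level metadata
    s.Update("base", map[string]model.Flag{
        "myBoolFlag": {DefaultVariant: "off"},
    }, model.Metadata{"flagSetId": "project-42"})

    // query by flag set rather than by source
    sel := store.NewSelector("flagSetId=project-42")
    flags, meta, _ := s.GetAll(context.Background(), &sel)
    fmt.Println(flags, meta)

    // stream the result-set of the same selector as it changes
    watcher := make(chan store.FlagQueryResult, 1)
    s.Watch(context.Background(), &sel, watcher)
    fmt.Println((<-watcher).Flags)
}
```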
@ -0,0 +1,487 @@
package store

import (
    "context"
    "testing"
    "time"

    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestUpdateFlags(t *testing.T) {

    const source1 = "source1"
    const source2 = "source2"
    var sources = []string{source1, source2}

    t.Parallel()
    tests := []struct {
        name        string
        setup       func(t *testing.T) *Store
        newFlags    map[string]model.Flag
        source      string
        wantFlags   map[string]model.Flag
        setMetadata model.Metadata
        wantNotifs  map[string]interface{}
        wantResync  bool
    }{
        {
            name: "both nil",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                return s
            },
            source:     source1,
            newFlags:   nil,
            wantFlags:  map[string]model.Flag{},
            wantNotifs: map[string]interface{}{},
        },
        {
            name: "both empty flags",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                return s
            },
            source:     source1,
            newFlags:   map[string]model.Flag{},
            wantFlags:  map[string]model.Flag{},
            wantNotifs: map[string]interface{}{},
        },
        {
            name: "empty new",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                return s
            },
            source:     source1,
            newFlags:   nil,
            wantFlags:  map[string]model.Flag{},
            wantNotifs: map[string]interface{}{},
        },
        {
            name: "update from source 1 (old flag removed)",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                s.Update(source1, map[string]model.Flag{
                    "waka": {DefaultVariant: "off"},
                }, nil)
                return s
            },
            newFlags: map[string]model.Flag{
                "paka": {DefaultVariant: "on"},
            },
            source: source1,
            wantFlags: map[string]model.Flag{
                "paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
            },
            wantNotifs: map[string]interface{}{
                "paka": map[string]interface{}{"type": "write"},
                "waka": map[string]interface{}{"type": "delete"},
            },
        },
        {
            name: "update from source 1 (new flag added)",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                s.Update(source1, map[string]model.Flag{
                    "waka": {DefaultVariant: "off"},
                }, nil)
                return s
            },
            newFlags: map[string]model.Flag{
                "paka": {DefaultVariant: "on"},
            },
            source: source2,
            wantFlags: map[string]model.Flag{
                "waka": {Key: "waka", DefaultVariant: "off", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
                "paka": {Key: "paka", DefaultVariant: "on", Source: source2, FlagSetId: nilFlagSetId, Priority: 1},
            },
            wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write"}},
        },
        {
            name: "flag set inheritance",
            setup: func(t *testing.T) *Store {
                s, err := NewStore(logger.NewLogger(nil, false), sources)
                if err != nil {
                    t.Fatalf("NewStore failed: %v", err)
                }
                s.Update(source1, map[string]model.Flag{}, model.Metadata{})
                return s
            },
            setMetadata: model.Metadata{
                "flagSetId": "topLevelSet", // top level set metadata, including flagSetId
            },
            newFlags: map[string]model.Flag{
                "waka": {DefaultVariant: "on"},
                "paka": {DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": "flagLevelSet"}}, // overrides set level flagSetId
            },
            source: source1,
            wantFlags: map[string]model.Flag{
                "waka": {Key: "waka", DefaultVariant: "on", Source: source1, FlagSetId: "topLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "topLevelSet"}},
                "paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: "flagLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "flagLevelSet"}},
            },
            wantNotifs: map[string]interface{}{
                "paka": map[string]interface{}{"type": "write"},
                "waka": map[string]interface{}{"type": "write"},
            },
        },
    }

    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()
            store := tt.setup(t)
            gotNotifs, resyncRequired := store.Update(tt.source, tt.newFlags, tt.setMetadata)
            gotFlags, _, _ := store.GetAll(context.Background(), nil)

            require.Equal(t, tt.wantFlags, gotFlags)
            require.Equal(t, tt.wantNotifs, gotNotifs)
            require.Equal(t, tt.wantResync, resyncRequired)
        })
    }
}

func TestGet(t *testing.T) {

    sourceA := "sourceA"
    sourceB := "sourceB"
    sourceC := "sourceC"
    flagSetIdB := "flagSetIdA"
    flagSetIdC := "flagSetIdC"
    var sources = []string{sourceA, sourceB, sourceC}

    sourceASelector := NewSelector("source=" + sourceA)
    flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)

    t.Parallel()
    tests := []struct {
        name     string
        key      string
        selector *Selector
        wantFlag model.Flag
        wantErr  bool
    }{
        {
            name:     "nil selector",
            key:      "flagA",
            selector: nil,
            wantFlag: model.Flag{Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
            wantErr:  false,
        },
        {
            name:     "flagSetId selector",
            key:      "dupe",
            selector: &flagSetIdCSelector,
            wantFlag: model.Flag{Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
            wantErr:  false,
        },
        {
            name:     "source selector",
            key:      "dupe",
            selector: &sourceASelector,
            wantFlag: model.Flag{Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
            wantErr:  false,
        },
        {
            name:     "flag not found with source selector",
            key:      "flagB",
            selector: &sourceASelector,
            wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
            wantErr:  true,
        },
        {
            name:     "flag not found with flagSetId selector",
            key:      "flagB",
            selector: &flagSetIdCSelector,
            wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
            wantErr:  true,
        },
    }

    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()

            sourceAFlags := map[string]model.Flag{
                "flagA": {Key: "flagA", DefaultVariant: "off"},
                "dupe":  {Key: "dupe", DefaultVariant: "on"},
            }
            sourceBFlags := map[string]model.Flag{
                "flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
            }
            sourceCFlags := map[string]model.Flag{
                "flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
                "dupe":  {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
            }

            store, err := NewStore(logger.NewLogger(nil, false), sources)
            if err != nil {
                t.Fatalf("NewStore failed: %v", err)
            }

            store.Update(sourceA, sourceAFlags, nil)
            store.Update(sourceB, sourceBFlags, nil)
            store.Update(sourceC, sourceCFlags, nil)
            gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector)

            if !tt.wantErr {
                require.Equal(t, tt.wantFlag, gotFlag)
            } else {
                require.Error(t, err, "expected an error for key %s with selector %v", tt.key, tt.selector)
            }
        })
    }
}

func TestGetAllNoWatcher(t *testing.T) {

    sourceA := "sourceA"
    sourceB := "sourceB"
    sourceC := "sourceC"
    flagSetIdB := "flagSetIdA"
    flagSetIdC := "flagSetIdC"
    sources := []string{sourceA, sourceB, sourceC}

    sourceASelector := NewSelector("source=" + sourceA)
    flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)

    t.Parallel()
    tests := []struct {
        name      string
        selector  *Selector
        wantFlags map[string]model.Flag
    }{
        {
            name:     "nil selector",
            selector: nil,
            wantFlags: map[string]model.Flag{
                // "dupe" should be overwritten by higher priority flag
                "flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
                "flagB": {Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
                "flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
                "dupe":  {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
            },
        },
        {
            name:     "source selector",
            selector: &sourceASelector,
            wantFlags: map[string]model.Flag{
                // we should get the "dupe" from sourceA
                "flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
                "dupe":  {Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
            },
        },
        {
            name:     "flagSetId selector",
            selector: &flagSetIdCSelector,
            wantFlags: map[string]model.Flag{
                // we should get the "dupe" from flagSetIdC
                "flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
                "dupe":  {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
            },
        },
    }

    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()

            sourceAFlags := map[string]model.Flag{
                "flagA": {Key: "flagA", DefaultVariant: "off"},
                "dupe":  {Key: "dupe", DefaultVariant: "on"},
            }
            sourceBFlags := map[string]model.Flag{
                "flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
            }
            sourceCFlags := map[string]model.Flag{
                "flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
                "dupe":  {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
            }

            store, err := NewStore(logger.NewLogger(nil, false), sources)
            if err != nil {
                t.Fatalf("NewStore failed: %v", err)
            }

            store.Update(sourceA, sourceAFlags, nil)
            store.Update(sourceB, sourceBFlags, nil)
            store.Update(sourceC, sourceCFlags, nil)
            gotFlags, _, _ := store.GetAll(context.Background(), tt.selector)

            require.Equal(t, len(tt.wantFlags), len(gotFlags))
            require.Equal(t, tt.wantFlags, gotFlags)
        })
    }
}

func TestWatch(t *testing.T) {

    sourceA := "sourceA"
    sourceB := "sourceB"
    sourceC := "sourceC"
    myFlagSetId := "myFlagSet"
    var sources = []string{sourceA, sourceB, sourceC}
    pauseTime := 100 * time.Millisecond // time for updates to settle
    timeout := 1000 * time.Millisecond  // time to make sure we get enough updates, and no extras

    sourceASelector := NewSelector("source=" + sourceA)
    flagSetIdCSelector := NewSelector("flagSetId=" + myFlagSetId)
    emptySelector := NewSelector("")
    sourceCSelector := NewSelector("source=" + sourceC)

    tests := []struct {
        name        string
        selector    *Selector
        wantUpdates int
    }{
        {
            name:        "flag source selector (initial, plus 1 update)",
            selector:    &sourceASelector,
            wantUpdates: 2,
        },
        {
            name:        "flag set selector (initial, plus 3 updates)",
            selector:    &flagSetIdCSelector,
            wantUpdates: 4,
        },
        {
            name:        "no selector (all updates)",
            selector:    &emptySelector,
            wantUpdates: 5,
        },
        {
            name:        "flag source selector for unchanged source (initial, plus no updates)",
            selector:    &sourceCSelector,
            wantUpdates: 1,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()

            sourceAFlags := map[string]model.Flag{
                "flagA": {Key: "flagA", DefaultVariant: "off"},
            }
            sourceBFlags := map[string]model.Flag{
                "flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
            }
            sourceCFlags := map[string]model.Flag{
                "flagC": {Key: "flagC", DefaultVariant: "off"},
            }

            store, err := NewStore(logger.NewLogger(nil, false), sources)
            if err != nil {
                t.Fatalf("NewStore failed: %v", err)
            }

            // setup initial flags
            store.Update(sourceA, sourceAFlags, model.Metadata{})
            store.Update(sourceB, sourceBFlags, model.Metadata{})
            store.Update(sourceC, sourceCFlags, model.Metadata{})
            watcher := make(chan FlagQueryResult, 1)
            time.Sleep(pauseTime)

            ctx, cancel := context.WithCancel(context.Background())
            store.Watch(ctx, tt.selector, watcher)

            // perform updates
            go func() {

                time.Sleep(pauseTime)

                // changing a flag default variant should trigger an update
                store.Update(sourceA, map[string]model.Flag{
                    "flagA": {Key: "flagA", DefaultVariant: "on"},
                }, model.Metadata{})

                time.Sleep(pauseTime)

                // changing a flag default variant should trigger an update
                store.Update(sourceB, map[string]model.Flag{
                    "flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
                }, model.Metadata{})

                time.Sleep(pauseTime)

                // removing a flag set id should trigger an update (even for flag set id selectors; it should remove the flag from the set)
                store.Update(sourceB, map[string]model.Flag{
                    "flagB": {Key: "flagB", DefaultVariant: "on"},
                }, model.Metadata{})

                time.Sleep(pauseTime)

                // adding a flag set id should trigger an update
                store.Update(sourceB, map[string]model.Flag{
                    "flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
                }, model.Metadata{})
            }()

            updates := 0

            for {
                select {
                case <-time.After(timeout):
                    assert.Equal(t, tt.wantUpdates, updates, "expected %d updates, got %d", tt.wantUpdates, updates)
                    cancel()
                    _, open := <-watcher
                    assert.False(t, open, "watcher channel should be closed after cancel")
                    return
                case q := <-watcher:
                    if q.Flags != nil {
                        updates++
                    }
                }
            }
        })
    }
}

func TestQueryMetadata(t *testing.T) {

    sourceA := "sourceA"
    otherSource := "otherSource"
    nonExistingFlagSetId := "nonExistingFlagSetId"
    var sources = []string{sourceA}
    sourceAFlags := map[string]model.Flag{
        "flagA": {Key: "flagA", DefaultVariant: "off"},
        "flagB": {Key: "flagB", DefaultVariant: "on"},
    }

    store, err := NewStore(logger.NewLogger(nil, false), sources)
    if err != nil {
        t.Fatalf("NewStore failed: %v", err)
    }

    // setup initial flags
    store.Update(sourceA, sourceAFlags, model.Metadata{})

    selector := NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
    _, metadata, _ := store.GetAll(context.Background(), &selector)
    assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")

    selector = NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
    _, metadata, _ = store.Get(context.Background(), "key", &selector)
    assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")
}
@ -58,6 +58,7 @@ func TestSimpleReSync(t *testing.T) {
 func TestSimpleSync(t *testing.T) {
     readDirName := t.TempDir()
     updateDirName := t.TempDir()
+    deleteDirName := t.TempDir()
     tests := map[string]struct {
         manipulationFuncs []func(t *testing.T)
         expectedDataSync  []sync.DataSync

@ -98,6 +99,27 @@ func TestSimpleSync(t *testing.T) {
                 },
             },
         },
+        "delete-event": {
+            fetchDirName: deleteDirName,
+            manipulationFuncs: []func(t *testing.T){
+                func(t *testing.T) {
+                    writeToFile(t, deleteDirName, fetchFileContents)
+                },
+                func(t *testing.T) {
+                    deleteFile(t, deleteDirName, fetchFileName)
+                },
+            },
+            expectedDataSync: []sync.DataSync{
+                {
+                    FlagData: fetchFileContents,
+                    Source:   fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
+                },
+                {
+                    FlagData: defaultState,
+                    Source:   fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
+                },
+            },
+        },
     }

     for test, tt := range tests {
@ -290,16 +290,16 @@ func TestHTTPSync_Fetch(t *testing.T) {
                 newETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`

                 return &http.Response{
-                    Header: map[string][]string{
+                    Header: map[string][]string{
                         "Content-Type": {"application/json"},
-                        "Etag": {newETag},
+                        "Etag":         {newETag},
                     },
                     Body:       io.NopCloser(strings.NewReader(newContent)),
                     StatusCode: http.StatusOK,
                 }, nil
             })
         },
-        uri: "http://localhost",
+        uri:        "http://localhost",
         eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
         handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
             if err != nil {

@ -370,7 +370,7 @@ func TestSync_Init(t *testing.T) {
 func TestHTTPSync_Resync(t *testing.T) {
     ctrl := gomock.NewController(t)
     source := "http://localhost"
-    emptyeFlagData := "{}"
+    emptyFlagData := "{}"

     tests := map[string]struct {
         setup func(t *testing.T, client *syncmock.MockClient)

@ -385,7 +385,7 @@ func TestHTTPSync_Resync(t *testing.T) {
         setup: func(_ *testing.T, client *syncmock.MockClient) {
             client.EXPECT().Do(gomock.Any()).Return(&http.Response{
                 Header:     map[string][]string{"Content-Type": {"application/json"}},
-                Body:       io.NopCloser(strings.NewReader(emptyeFlagData)),
+                Body:       io.NopCloser(strings.NewReader(emptyFlagData)),
                 StatusCode: http.StatusOK,
             }, nil)
         },

@ -402,7 +402,7 @@ func TestHTTPSync_Resync(t *testing.T) {
         wantErr: false,
         wantNotifications: []sync.DataSync{
             {
-                FlagData: emptyeFlagData,
+                FlagData: emptyFlagData,
                 Source:   source,
             },
         },
@ -0,0 +1,77 @@
---
status: accepted
author: @tangenti
created: 2025-06-16
updated: 2025-06-16
---

# Decouple flag sync sources and flag sets

The goal is to support dynamic flag sets for flagd providers and to decouple sources from flag sets.

## Background

The flagd daemon syncs flag configurations from multiple sources. A single source provides a single config, which has an optional flag set ID that may or may not change in subsequent syncs of the same source.

The in-process provider uses `selector` to specify the desired source. To get a desired flag set, a provider has to stick to a source that provides that flag set. As a result, the flagd daemon cannot remove a source without breaking the dependent flagd providers.

Assumptions of the current model:

- `flagSetId`s must be unique across different sources, or the configuration is considered invalid.
- In-process providers request at most one flag set.

## Requirements

- The flagd daemon can remove a source without breaking in-process providers that depend on the flag set the source provides.
- In-process providers can select flags based on flag sets.
- No breaking changes for the current usage of `selector`.

## Proposal

### API change

#### Flag Configuration Schema

Add an optional field `flagsetID` under `flag` or `flag.metadata`. A flag-level flag set ID cannot be specified if a flag set ID is already specified for the config as a whole.
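For illustration, a per-flag flag set ID could look as follows in a flag configuration (a hypothetical sketch; the exact field name and placement are precisely what this proposal leaves open):

```json
{
  "flags": {
    "myBoolFlag": {
      "state": "ENABLED",
      "variants": { "on": true, "off": false },
      "defaultVariant": "on",
      "metadata": { "flagSetId": "project-42" }
    }
  }
}
```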
### Flagd Sync Selector

The selector will be extended for generic flag selection, starting with equality checks on the `source` and `flagsetID` of flags.

Example:

```yaml
# Flags from the source `override`
selector: override

# Flags from the source `override`
selector: source=override

# Flags from the flag set `project-42`
selector: flagsetID=project-42
```

The semantics can later be extended with a more complex design, such as AIP-160 filters or Kubernetes-style selectors. This is out of the scope of this ADR.
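To make the grammar concrete, here is a minimal parsing sketch in Go, assuming the comma-separated `key=value` form above, with a bare value falling back to `source` for backward compatibility (names are illustrative, not the actual flagd implementation):

```go
package store

import "strings"

// Selector holds the parsed key=value constraints of a selector expression.
type Selector struct {
	Constraints map[string]string
}

// NewSelector parses expressions like "override", "source=override",
// or "source=override,flagsetID=project-42". A bare value is treated
// as a source name for backward compatibility.
func NewSelector(expression string) Selector {
	constraints := map[string]string{}
	if expression == "" {
		return Selector{Constraints: constraints}
	}
	for _, part := range strings.Split(expression, ",") {
		if key, value, found := strings.Cut(part, "="); found {
			constraints[strings.TrimSpace(key)] = strings.TrimSpace(value)
		} else {
			// legacy form: the whole expression names a source
			constraints["source"] = strings.TrimSpace(part)
		}
	}
	return Selector{Constraints: constraints}
}
```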
### Flagd Daemon Storage

1. Flagd will have separate stores for `flags` and `sources`.

2. `selector` will be removed from the store.

3. `flagSetID` will be added as part of `model.Flag`, or under `model.Flag.Metadata` for better consistency with the API.

### Flags Sync

The sync server will honor the extended `selector` syntax and filter the list of flags on the fly when answering requests from providers; a sketch of such a filter follows below.

The existing source-based conflict resolution remains the same. Resyncs on flag removal remain unchanged as well.
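A rough sketch of the on-the-fly filtering, reusing the hypothetical `Selector` from the previous sketch (the real store types differ):

```go
// Flag is a minimal stand-in for flagd's model.Flag in this sketch.
type Flag struct {
	Key      string
	Source   string
	Metadata map[string]string
}

// filterFlags returns only the flags that satisfy every selector constraint,
// matching on the flag's source and, when present, its flag set ID.
func filterFlags(flags map[string]Flag, sel Selector) map[string]Flag {
	out := map[string]Flag{}
	for key, flag := range flags {
		if source, ok := sel.Constraints["source"]; ok && flag.Source != source {
			continue
		}
		if setID, ok := sel.Constraints["flagsetID"]; ok && flag.Metadata["flagSetId"] != setID {
			continue
		}
		out[key] = flag
	}
	return out
}
```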
## Consequences

### The good

- One source can have multiple flag sets.
- `selector` works on a more granular level.
- No breaking changes.
- Sync servers and clients now hold the same understanding of the `selector` semantics.
@ -1,5 +1,5 @@
---
status: proposed
status: accepted
author: @tangenti
created: 2025-06-27
updated: 2025-06-27
@ -14,6 +14,7 @@ flagd start [flags]
  -H, --context-from-header stringToString   add key-value pairs to map header values to context values, where key is Header name, value is context key (default [])
  -X, --context-value stringToString         add arbitrary key value pairs to the flag evaluation context (default [])
  -C, --cors-origin strings                  CORS allowed origins, * will allow all origins
      --disable-sync-metadata                Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.
  -h, --help                                 help for start
  -z, --log-format string                    Set the logging format, e.g. console or json (default "console")
  -m, --management-port int32                Port for management operations (default 8014)
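For example, the new toggle above can be combined with any other start flags (the flag source path here is illustrative):

```sh
flagd start -f file:flags.flagd.json --disable-sync-metadata
```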
@ -1,5 +1,12 @@
# Changelog

## [0.12.9](https://github.com/open-feature/flagd/compare/flagd/v0.12.8...flagd/v0.12.9) (2025-07-28)

### ✨ New Features

* Add toggle for disabling getMetadata request ([#1693](https://github.com/open-feature/flagd/issues/1693)) ([e8fd680](https://github.com/open-feature/flagd/commit/e8fd6808608caa7ff7e54792fe97d88e7c294486))

## [0.12.8](https://github.com/open-feature/flagd/compare/flagd/v0.12.7...flagd/v0.12.8) (2025-07-21)
@ -36,6 +36,7 @@ const (
	syncPortFlagName           = "sync-port"
	syncSocketPathFlagName     = "sync-socket-path"
	uriFlagName                = "uri"
	disableSyncMetadata        = "disable-sync-metadata"
	contextValueFlagName       = "context-value"
	headerToContextKeyFlagName = "context-from-header"
	streamDeadlineFlagName     = "stream-deadline"

@ -89,6 +90,7 @@ func init() {
	flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
		"header values to context values, where key is Header name, value is context key")
	flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
	flags.Bool(disableSyncMetadata, false, "Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.")

	bindFlags(flags)
}

@ -114,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
	_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
	_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
	_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
	_ = viper.BindPFlag(disableSyncMetadata, flags.Lookup(disableSyncMetadata))
}

// startCmd represents the start command

@ -186,6 +189,7 @@ var startCmd = &cobra.Command{
		SyncServicePort:            viper.GetUint16(syncPortFlagName),
		SyncServiceSocketPath:      viper.GetString(syncSocketPathFlagName),
		StreamDeadline:             viper.GetDuration(streamDeadlineFlagName),
		DisableSyncMetadata:        viper.GetBool(disableSyncMetadata),
		SyncProviders:              syncProviders,
		ContextValues:              contextValuesToMap,
		HeaderToContextKeyMappings: headerToContextKeyMappings,
@ -110,6 +110,9 @@ require (
	github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
	github.com/googleapis/gax-go/v2 v2.14.2 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
	github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
	github.com/hashicorp/go-memdb v1.3.5 // indirect
	github.com/hashicorp/golang-lru v0.5.4 // indirect
	github.com/imdario/mergo v0.3.16 // indirect
	github.com/inconshreveable/mousetrap v1.1.0 // indirect
	github.com/jmespath/go-jmespath v0.4.0 // indirect

@ -313,6 +313,14 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0Ntos
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.5 h1:b3taDMxCBCBVgyRrS1AZVHO14ubMYZB++QpNhBg+Nyo=
github.com/hashicorp/go-memdb v1.3.5/go.mod h1:8IVKKBkVe+fxFgdFOYxzQQNjz+sWCyHCdIC/+5+Vy1Y=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@ -39,6 +39,7 @@ type Config struct {
	SyncServicePort       uint16
	SyncServiceSocketPath string
	StreamDeadline        time.Duration
	DisableSyncMetadata   bool

	SyncProviders []sync.SourceConfig
	CORS          []string

@ -78,21 +79,20 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
		logger.Error(fmt.Sprintf("error building metrics recorder: %v", err))
	}

	// build flag store, collect flag sources & fill sources details
	s := store.NewFlags()
	sources := []string{}

	for _, provider := range config.SyncProviders {
		s.FlagSources = append(s.FlagSources, provider.URI)
		s.SourceDetails[provider.URI] = store.SourceDetails{
			Source:   provider.URI,
			Selector: provider.Selector,
		}
		sources = append(sources, provider.URI)
	}

	// build flag store, collect flag sources & fill sources details
	store, err := store.NewStore(logger, sources)
	if err != nil {
		return nil, fmt.Errorf("error creating flag store: %w", err)
	}

	// derive evaluator
	jsonEvaluator := evaluator.NewJSON(logger, s)
	jsonEvaluator := evaluator.NewJSON(logger, store)

	// derive services

@ -100,6 +100,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
	connectService := flageval.NewConnectService(
		logger.WithFields(zap.String("component", "service")),
		jsonEvaluator,
		store,
		recorder)

	// ofrep service

@ -111,20 +112,21 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
		config.HeaderToContextKeyMappings,
	)
	if err != nil {
		return nil, fmt.Errorf("error creating ofrep service")
		return nil, fmt.Errorf("error creating OFREP service: %w", err)
	}

	// flag sync service
	flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
		Logger:         logger.WithFields(zap.String("component", "FlagSyncService")),
		Port:           config.SyncServicePort,
		Sources:        sources,
		Store:          s,
		ContextValues:  config.ContextValues,
		KeyPath:        config.ServiceKeyPath,
		CertPath:       config.ServiceCertPath,
		SocketPath:     config.SyncServiceSocketPath,
		StreamDeadline: config.StreamDeadline,
		Logger:              logger.WithFields(zap.String("component", "FlagSyncService")),
		Port:                config.SyncServicePort,
		Sources:             sources,
		Store:               store,
		ContextValues:       config.ContextValues,
		KeyPath:             config.ServiceKeyPath,
		CertPath:            config.ServiceCertPath,
		SocketPath:          config.SyncServiceSocketPath,
		StreamDeadline:      config.StreamDeadline,
		DisableSyncMetadata: config.DisableSyncMetadata,
	})
	if err != nil {
		return nil, fmt.Errorf("error creating sync service: %w", err)

@ -144,11 +146,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
	}

	return &Runtime{
		Logger:       logger.WithFields(zap.String("component", "runtime")),
		Evaluator:    jsonEvaluator,
		FlagSync:     flagSyncService,
		OfrepService: ofrepService,
		Service:      connectService,
		Logger:            logger.WithFields(zap.String("component", "runtime")),
		Evaluator:         jsonEvaluator,
		SyncService:       flagSyncService,
		OfrepService:      ofrepService,
		EvaluationService: connectService,
		ServiceConfig: service.Configuration{
			Port:           config.ServicePort,
			ManagementPort: config.ManagementPort,

@ -162,7 +164,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
			HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
			StreamDeadline:             config.StreamDeadline,
		},
		SyncImpl: iSyncs,
		Syncs: iSyncs,
	}, nil
}
@ -19,23 +19,23 @@ import (
)

type Runtime struct {
	Evaluator     evaluator.IEvaluator
	Logger        *logger.Logger
	FlagSync      flagsync.ISyncService
	OfrepService  ofrep.IOfrepService
	Service       service.IFlagEvaluationService
	ServiceConfig service.Configuration
	SyncImpl      []sync.ISync
	Evaluator         evaluator.IEvaluator
	Logger            *logger.Logger
	SyncService       flagsync.ISyncService
	OfrepService      ofrep.IOfrepService
	EvaluationService service.IFlagEvaluationService
	ServiceConfig     service.Configuration
	Syncs             []sync.ISync

	mu msync.Mutex
}

//nolint:funlen
func (r *Runtime) Start() error {
	if r.Service == nil {
	if r.EvaluationService == nil {
		return errors.New("no service set")
	}
	if len(r.SyncImpl) == 0 {
	if len(r.Syncs) == 0 {
		return errors.New("no sync implementation set")
	}
	if r.Evaluator == nil {

@ -44,40 +44,26 @@ func (r *Runtime) Start() error {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()
	g, gCtx := errgroup.WithContext(ctx)
	dataSync := make(chan sync.DataSync, len(r.SyncImpl))
	dataSync := make(chan sync.DataSync, len(r.Syncs))
	// Initialize DataSync channel watcher
	g.Go(func() error {
		for {
			select {
			case data := <-dataSync:
				// resync events are triggered when a delete occurs during flag merges in the store
				// resync events may trigger further resync events, however for a flag to be deleted from the store
				// its source must match, preventing the opportunity for resync events to snowball
				if resyncRequired := r.updateAndEmit(data); resyncRequired {
					for _, s := range r.SyncImpl {
						p := s
						g.Go(func() error {
							err := p.ReSync(gCtx, dataSync)
							if err != nil {
								return fmt.Errorf("error resyncing sources: %w", err)
							}
							return nil
						})
					}
				}
				r.updateAndEmit(data)
			case <-gCtx.Done():
				return nil
			}
		}
	})
	// Init sync providers
	for _, s := range r.SyncImpl {
	for _, s := range r.Syncs {
		if err := s.Init(gCtx); err != nil {
			return fmt.Errorf("sync provider Init returned error: %w", err)
		}
	}
	// Start sync provider
	for _, s := range r.SyncImpl {
	for _, s := range r.Syncs {
		p := s
		g.Go(func() error {
			if err := p.Sync(gCtx, dataSync); err != nil {

@ -89,14 +75,14 @@ func (r *Runtime) Start() error {

	defer func() {
		r.Logger.Info("Shutting down server...")
		r.Service.Shutdown()
		r.EvaluationService.Shutdown()
		r.Logger.Info("Server successfully shutdown.")
	}()

	g.Go(func() error {
		// The readiness probe relies on the runtime
		r.ServiceConfig.ReadinessProbe = r.isReady
		if err := r.Service.Serve(gCtx, r.ServiceConfig); err != nil {
		if err := r.EvaluationService.Serve(gCtx, r.ServiceConfig); err != nil {
			return fmt.Errorf("error returned from serving flag evaluation service: %w", err)
		}
		return nil

@ -112,7 +98,7 @@ func (r *Runtime) Start() error {
	})

	g.Go(func() error {
		err := r.FlagSync.Start(gCtx)
		err := r.SyncService.Start(gCtx)
		if err != nil {
			return fmt.Errorf("error from sync server: %w", err)
		}

@ -128,7 +114,7 @@ func (r *Runtime) Start() error {

func (r *Runtime) isReady() bool {
	// if all providers can watch for flag changes, we are ready.
	for _, p := range r.SyncImpl {
	for _, p := range r.Syncs {
		if !p.IsReady() {
			return false
		}

@ -137,24 +123,14 @@ func (r *Runtime) isReady() bool {
}

// updateAndEmit helps to update state, notify changes and trigger sync updates
func (r *Runtime) updateAndEmit(payload sync.DataSync) bool {
func (r *Runtime) updateAndEmit(payload sync.DataSync) {
	r.mu.Lock()
	defer r.mu.Unlock()

	notifications, resyncRequired, err := r.Evaluator.SetState(payload)
	_, _, err := r.Evaluator.SetState(payload)
	if err != nil {
		r.Logger.Error(err.Error())
		return false
		r.Logger.Error(fmt.Sprintf("error setting state: %v", err))
		return
	}

	r.Service.Notify(service.Notification{
		Type: service.ConfigurationChange,
		Data: map[string]interface{}{
			"flags": notifications,
		},
	})

	r.FlagSync.Emit(resyncRequired, payload.Source)

	return resyncRequired
	r.SyncService.Emit(payload.Source)
}
@ -0,0 +1,3 @@
package service

const FLAGD_SELECTOR_HEADER = "Flagd-Selector"
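Clients can then scope a request to a source or flag set by sending this header. A hedged example against the OFREP bulk endpoint (selector syntax per the ADR above; the flag set name is hypothetical):

```sh
curl -X POST -d '{"context":{}}' -H 'Flagd-Selector: flagsetID=project-42' 'http://localhost:8016/ofrep/v1/evaluate/flags'
```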
@ -16,6 +16,7 @@ import (
	"github.com/open-feature/flagd/core/pkg/evaluator"
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/open-feature/flagd/core/pkg/telemetry"
	"github.com/open-feature/flagd/flagd/pkg/service/middleware"
	corsmw "github.com/open-feature/flagd/flagd/pkg/service/middleware/cors"

@ -71,15 +72,17 @@ type ConnectService struct {

// NewConnectService creates a ConnectService with provided parameters
func NewConnectService(
	logger *logger.Logger, evaluator evaluator.IEvaluator, mRecorder telemetry.IMetricsRecorder,
	logger *logger.Logger, evaluator evaluator.IEvaluator, store store.IStore, mRecorder telemetry.IMetricsRecorder,
) *ConnectService {
	cs := &ConnectService{
		logger:  logger,
		eval:    evaluator,
		metrics: &telemetry.NoopMetricsRecorder{},
		eventingConfiguration: &eventingConfiguration{
			subs: make(map[interface{}]chan service.Notification),
			mu:   &sync.RWMutex{},
			subs:   make(map[interface{}]chan service.Notification),
			mu:     &sync.RWMutex{},
			store:  store,
			logger: logger,
		},
	}
	if mRecorder != nil {

@ -6,6 +6,7 @@ import (
	"fmt"
	"net/http"
	"os"
	"sync"
	"testing"
	"time"

@ -14,9 +15,11 @@ import (
	mock "github.com/open-feature/flagd/core/pkg/evaluator/mock"
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/notifications"
	iservice "github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/open-feature/flagd/core/pkg/telemetry"
	"github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
	middlewaremock "github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"

@ -81,7 +84,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
			exp := metric.NewManualReader()
			rs := resource.NewWithAttributes("testSchema")
			metricRecorder := telemetry.NewOTelRecorder(exp, rs, tt.name)
			svc := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
			svc := NewConnectService(logger.NewLogger(nil, false), eval, &store.Store{}, metricRecorder)
			serveConf := iservice.Configuration{
				ReadinessProbe: func() bool {
					return true

@ -136,7 +139,7 @@ func TestAddMiddleware(t *testing.T) {
	rs := resource.NewWithAttributes("testSchema")
	metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")

	svc := NewConnectService(logger.NewLogger(nil, false), nil, metricRecorder)
	svc := NewConnectService(logger.NewLogger(nil, false), nil, &store.Store{}, metricRecorder)

	serveConf := iservice.Configuration{
		ReadinessProbe: func() bool {

@ -173,16 +176,22 @@ func TestConnectServiceNotify(t *testing.T) {
	// given
	ctrl := gomock.NewController(t)
	eval := mock.NewMockIEvaluator(ctrl)
	sources := []string{"source1", "source2"}
	log := logger.NewLogger(nil, false)
	s, err := store.NewStore(log, sources)
	if err != nil {
		t.Fatalf("NewStore failed: %v", err)
	}

	exp := metric.NewManualReader()
	rs := resource.NewWithAttributes("testSchema")
	metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")

	service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
	service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)

	sChan := make(chan iservice.Notification, 1)
	eventing := service.eventingConfiguration
	eventing.Subscribe("key", sChan)
	eventing.Subscribe(context.Background(), "key", nil, sChan)

	// notification type
	ofType := iservice.ConfigurationChange

@ -207,20 +216,73 @@ func TestConnectServiceNotify(t *testing.T) {
	}
}

func TestConnectServiceWatcher(t *testing.T) {
	sources := []string{"source1", "source2"}
	log := logger.NewLogger(nil, false)
	s, err := store.NewStore(log, sources)

	if err != nil {
		t.Fatalf("NewStore failed: %v", err)
	}

	sChan := make(chan iservice.Notification, 1)
	eventing := eventingConfiguration{
		store:  s,
		logger: log,
		mu:     &sync.RWMutex{},
		subs:   make(map[any]chan iservice.Notification),
	}

	// subscribe and wait for the sub to be active
	eventing.Subscribe(context.Background(), "anything", nil, sChan)
	time.Sleep(100 * time.Millisecond)

	// make a change
	s.Update(sources[0], map[string]model.Flag{
		"flag1": {
			Key:            "flag1",
			DefaultVariant: "off",
		},
	}, model.Metadata{})

	// notification type
	ofType := iservice.ConfigurationChange

	timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	select {
	case n := <-sChan:
		require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
		notifications := n.Data["flags"].(notifications.Notifications)
		flag1, ok := notifications["flag1"].(map[string]interface{})
		require.True(t, ok, "flag1 notification should be a map[string]interface{}")
		require.Equal(t, flag1["type"], string(model.NotificationCreate), "expected notification type: %s, but received %s", model.NotificationCreate, flag1["type"])
	case <-timeout.Done():
		t.Error("timeout while waiting for notifications")
	}
}

func TestConnectServiceShutdown(t *testing.T) {
	// given
	ctrl := gomock.NewController(t)
	eval := mock.NewMockIEvaluator(ctrl)
	sources := []string{"source1", "source2"}
	log := logger.NewLogger(nil, false)
	s, err := store.NewStore(log, sources)
	if err != nil {
		t.Fatalf("NewStore failed: %v", err)
	}

	exp := metric.NewManualReader()
	rs := resource.NewWithAttributes("testSchema")
	metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")

	service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
	service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)

	sChan := make(chan iservice.Notification, 1)
	eventing := service.eventingConfiguration
	eventing.Subscribe("key", sChan)
	eventing.Subscribe(context.Background(), "key", nil, sChan)

	// notification type
	ofType := iservice.Shutdown
@ -1,29 +1,65 @@
package service

import (
	"context"
	"fmt"
	"sync"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/notifications"
	iservice "github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
)

// IEvents is an interface for event subscriptions
type IEvents interface {
	Subscribe(id any, notifyChan chan iservice.Notification)
	Subscribe(ctx context.Context, id any, selector *store.Selector, notifyChan chan iservice.Notification)
	Unsubscribe(id any)
	EmitToAll(n iservice.Notification)
}

var _ IEvents = &eventingConfiguration{}

// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
	mu   *sync.RWMutex
	subs map[any]chan iservice.Notification
	mu     *sync.RWMutex
	subs   map[any]chan iservice.Notification
	store  store.IStore
	logger *logger.Logger
}

func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
func (eventing *eventingConfiguration) Subscribe(ctx context.Context, id any, selector *store.Selector, notifier chan iservice.Notification) {
	eventing.mu.Lock()
	defer eventing.mu.Unlock()

	eventing.subs[id] = notifyChan
	// proxy events from our store watcher to the notify channel, so that RPC mode event streams receive change notifications
	watcher := make(chan store.FlagQueryResult, 1)
	go func() {
		// store the previous flags to compare against new notifications, to compute proper diffs for RPC mode
		var oldFlags map[string]model.Flag
		for result := range watcher {
			newFlags := result.Flags

			// ignore the first notification (nil old flags); the watcher emits on initialization, but for RPC we don't care until there's a change
			if oldFlags != nil {
				notifications := notifications.NewFromFlags(oldFlags, newFlags)
				notifier <- iservice.Notification{
					Type: iservice.ConfigurationChange,
					Data: map[string]interface{}{
						"flags": notifications,
					},
				}
			}
			oldFlags = result.Flags
		}

		eventing.logger.Debug(fmt.Sprintf("closing notify channel for id %v", id))
		close(notifier)
	}()

	eventing.store.Watch(ctx, selector, watcher)
	eventing.subs[id] = notifier
}

func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
@ -1,18 +1,29 @@
package service

import (
	"context"
	"sync"
	"testing"

	"github.com/open-feature/flagd/core/pkg/logger"
	iservice "github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/stretchr/testify/require"
)

func TestSubscribe(t *testing.T) {
	// given
	sources := []string{"source1", "source2"}
	log := logger.NewLogger(nil, false)
	s, err := store.NewStore(log, sources)
	if err != nil {
		t.Fatalf("NewStore failed: %v", err)
	}

	eventing := &eventingConfiguration{
		subs: make(map[interface{}]chan iservice.Notification),
		mu:   &sync.RWMutex{},
		subs:  make(map[interface{}]chan iservice.Notification),
		mu:    &sync.RWMutex{},
		store: s,
	}

	idA := "a"

@ -22,8 +33,8 @@ func TestSubscribe(t *testing.T) {
	chanB := make(chan iservice.Notification, 1)

	// when
	eventing.Subscribe(idA, chanA)
	eventing.Subscribe(idB, chanB)
	eventing.Subscribe(context.Background(), idA, nil, chanA)
	eventing.Subscribe(context.Background(), idB, nil, chanB)

	// then
	require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")

@ -32,9 +43,16 @@ func TestSubscribe(t *testing.T) {

func TestUnsubscribe(t *testing.T) {
	// given
	sources := []string{"source1", "source2"}
	log := logger.NewLogger(nil, false)
	s, err := store.NewStore(log, sources)
	if err != nil {
		t.Fatalf("NewStore failed: %v", err)
	}
	eventing := &eventingConfiguration{
		subs: make(map[interface{}]chan iservice.Notification),
		mu:   &sync.RWMutex{},
		subs:  make(map[interface{}]chan iservice.Notification),
		mu:    &sync.RWMutex{},
		store: s,
	}

	idA := "a"

@ -43,8 +61,8 @@ func TestUnsubscribe(t *testing.T) {
	chanB := make(chan iservice.Notification, 1)

	// when
	eventing.Subscribe(idA, chanA)
	eventing.Subscribe(idB, chanB)
	eventing.Subscribe(context.Background(), idA, nil, chanA)
	eventing.Subscribe(context.Background(), idB, nil, chanB)

	eventing.Unsubscribe(idA)
@ -12,7 +12,9 @@ import (
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/open-feature/flagd/core/pkg/telemetry"
	flagdService "github.com/open-feature/flagd/flagd/pkg/service"
	"github.com/rs/xid"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"

@ -67,13 +69,17 @@ func (s *OldFlagEvaluationService) ResolveAll(
) (*connect.Response[schemaV1.ResolveAllResponse], error) {
	reqID := xid.New().String()
	defer s.logger.ClearFields(reqID)
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()
	res := &schemaV1.ResolveAllResponse{
		Flags: make(map[string]*schemaV1.AnyFlag),
	}

	values, _, err := s.eval.ResolveAllValues(sCtx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	values, _, err := s.eval.ResolveAllValues(ctx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
	if err != nil {
		s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
		return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)

@ -82,7 +88,7 @@ func (s *OldFlagEvaluationService) ResolveAll(
	span.SetAttributes(attribute.Int("feature_flag.count", len(values)))
	for _, value := range values {
		// register the impression and reason for each flag evaluated
		s.metrics.RecordEvaluation(sCtx, value.Error, value.Reason, value.Variant, value.FlagKey)
		s.metrics.RecordEvaluation(ctx, value.Error, value.Reason, value.Variant, value.FlagKey)

		switch v := value.Value.(type) {
		case bool:

@ -133,8 +139,12 @@ func (s *OldFlagEvaluationService) EventStream(
	req *connect.Request[schemaV1.EventStreamRequest],
	stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
	s.logger.Debug("starting event stream for request")

	requestNotificationChan := make(chan service.Notification, 1)
	s.eventingConfiguration.Subscribe(req, requestNotificationChan)
	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
	defer s.eventingConfiguration.Unsubscribe(req)

	requestNotificationChan <- service.Notification{

@ -172,12 +182,15 @@ func (s *OldFlagEvaluationService) ResolveBoolean(
	ctx context.Context,
	req *connect.Request[schemaV1.ResolveBooleanRequest],
) (*connect.Response[schemaV1.ResolveBooleanResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()
	res := connect.NewResponse(&schemaV1.ResolveBooleanResponse{})
	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	err := resolve[bool](
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveBooleanValue,
		req.Header(),

@ -201,12 +214,16 @@ func (s *OldFlagEvaluationService) ResolveString(
	ctx context.Context,
	req *connect.Request[schemaV1.ResolveStringRequest],
) (*connect.Response[schemaV1.ResolveStringResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&schemaV1.ResolveStringResponse{})
	err := resolve[string](
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveStringValue,
		req.Header(),

@ -230,12 +247,16 @@ func (s *OldFlagEvaluationService) ResolveInt(
	ctx context.Context,
	req *connect.Request[schemaV1.ResolveIntRequest],
) (*connect.Response[schemaV1.ResolveIntResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&schemaV1.ResolveIntResponse{})
	err := resolve[int64](
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveIntValue,
		req.Header(),

@ -259,12 +280,16 @@ func (s *OldFlagEvaluationService) ResolveFloat(
	ctx context.Context,
	req *connect.Request[schemaV1.ResolveFloatRequest],
) (*connect.Response[schemaV1.ResolveFloatResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&schemaV1.ResolveFloatResponse{})
	err := resolve[float64](
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveFloatValue,
		req.Header(),

@ -288,12 +313,16 @@ func (s *OldFlagEvaluationService) ResolveObject(
	ctx context.Context,
	req *connect.Request[schemaV1.ResolveObjectRequest],
) (*connect.Response[schemaV1.ResolveObjectResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&schemaV1.ResolveObjectResponse{})
	err := resolve[map[string]any](
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveObjectValue,
		req.Header(),

@ -312,7 +341,7 @@ func (s *OldFlagEvaluationService) ResolveObject(
	return res, err
}

// mergeContexts combines context values from headers, static context (from cli) and request context.
// mergeContexts combines context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func mergeContexts(reqCtx, configFlagsCtx map[string]any, headers http.Header, headerToContextKeyMappings map[string]string) map[string]any {
	merged := make(map[string]any)

@ -338,7 +367,7 @@ func resolve[T constraints](ctx context.Context, logger *logger.Logger, resolver
	reqID := xid.New().String()
	defer logger.ClearFields(reqID)

	mergedContext := mergeContexts(evaluationContext.AsMap(), configContextValues, header, configHeaderToContextKeyMappings)
	mergedContext := mergeContexts(evaluationContext.AsMap(), configContextValues, header, configHeaderToContextKeyMappings)

	logger.WriteFields(
		reqID,
@ -11,7 +11,9 @@ import (
	"github.com/open-feature/flagd/core/pkg/evaluator"
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/service"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/open-feature/flagd/core/pkg/telemetry"
	flagdService "github.com/open-feature/flagd/flagd/pkg/service"
	"github.com/rs/xid"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"

@ -66,16 +68,19 @@ func (s *FlagEvaluationService) ResolveAll(
	reqID := xid.New().String()
	defer s.logger.ClearFields(reqID)

	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	res := &evalV1.ResolveAllResponse{
		Flags: make(map[string]*evalV1.AnyFlag),
	}

	context := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	evaluationContext := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(sCtx, reqID, context)
	resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(ctx, reqID, evaluationContext)
	if err != nil {
		s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
		return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)

@ -84,7 +89,7 @@ func (s *FlagEvaluationService) ResolveAll(
	span.SetAttributes(attribute.Int("feature_flag.count", len(resolutions)))
	for _, resolved := range resolutions {
		// register the impression and reason for each flag evaluated
		s.metrics.RecordEvaluation(sCtx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
		s.metrics.RecordEvaluation(ctx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
		switch v := resolved.Value.(type) {
		case bool:
			res.Flags[resolved.FlagKey] = &evalV1.AnyFlag{

@ -147,17 +152,21 @@ func (s *FlagEvaluationService) EventStream(
	req *connect.Request[evalV1.EventStreamRequest],
	stream *connect.ServerStream[evalV1.EventStreamResponse],
) error {
	serviceCtx := ctx
	// attach server-side stream deadline to context
	s.logger.Debug("starting event stream for request")

	if s.deadline != 0 {
		streamDeadline := time.Now().Add(s.deadline)
		deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
		serviceCtx = deadlineCtx
		ctx = deadlineCtx
		defer cancel()
	}

	s.logger.Debug("starting event stream for request")
	requestNotificationChan := make(chan service.Notification, 1)
	s.eventingConfiguration.Subscribe(req, requestNotificationChan)
	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
	defer s.eventingConfiguration.Unsubscribe(req)

	requestNotificationChan <- service.Notification{

@ -184,8 +193,8 @@ func (s *FlagEvaluationService) EventStream(
		if err != nil {
			s.logger.Error(err.Error())
		}
	case <-serviceCtx.Done():
		if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
	case <-ctx.Done():
		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
			s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
			return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
		}

@ -198,12 +207,16 @@ func (s *FlagEvaluationService) ResolveBoolean(
	ctx context.Context,
	req *connect.Request[evalV1.ResolveBooleanRequest],
) (*connect.Response[evalV1.ResolveBooleanResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&evalV1.ResolveBooleanResponse{})
	err := resolve(
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveBooleanValue,
		req.Header(),

@ -226,12 +239,16 @@ func (s *FlagEvaluationService) ResolveString(
	ctx context.Context,
	req *connect.Request[evalV1.ResolveStringRequest],
) (*connect.Response[evalV1.ResolveStringResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&evalV1.ResolveStringResponse{})
	err := resolve(
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveStringValue,
		req.Header(),

@ -254,12 +271,16 @@ func (s *FlagEvaluationService) ResolveInt(
	ctx context.Context,
	req *connect.Request[evalV1.ResolveIntRequest],
) (*connect.Response[evalV1.ResolveIntResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&evalV1.ResolveIntResponse{})
	err := resolve(
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveIntValue,
		req.Header(),

@ -282,12 +303,16 @@ func (s *FlagEvaluationService) ResolveFloat(
	ctx context.Context,
	req *connect.Request[evalV1.ResolveFloatRequest],
) (*connect.Response[evalV1.ResolveFloatResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&evalV1.ResolveFloatResponse{})
	err := resolve(
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveFloatValue,
		req.Header(),

@ -310,12 +335,16 @@ func (s *FlagEvaluationService) ResolveObject(
	ctx context.Context,
	req *connect.Request[evalV1.ResolveObjectRequest],
) (*connect.Response[evalV1.ResolveObjectResponse], error) {
	sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
	ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)

	res := connect.NewResponse(&evalV1.ResolveObjectResponse{})
	err := resolve(
		sCtx,
		ctx,
		s.logger,
		s.eval.ResolveObjectValue,
		req.Header(),
@ -1,6 +1,7 @@
package ofrep

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"

@ -10,6 +11,8 @@ import (
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/service/ofrep"
	"github.com/open-feature/flagd/core/pkg/store"
	"github.com/open-feature/flagd/flagd/pkg/service"
	"github.com/rs/xid"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"

@ -64,9 +67,12 @@ func (h *handler) HandleFlagEvaluation(w http.ResponseWriter, r *http.Request) {
		h.writeJSONToResponse(http.StatusBadRequest, ofrep.ContextErrorResponseFrom(flagKey), w)
		return
	}
	context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
	evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
	selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)

	evaluation := h.evaluator.ResolveAsAnyValue(r.Context(), requestID, flagKey, context)
	evaluation := h.evaluator.ResolveAsAnyValue(ctx, requestID, flagKey, evaluationContext)
	if evaluation.Error != nil {
		status, evaluationError := ofrep.EvaluationErrorResponseFrom(evaluation)
		h.writeJSONToResponse(status, evaluationError, w)

@ -85,9 +91,12 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
		return
	}

	context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
	evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
	selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
	selector := store.NewSelector(selectorExpression)
	ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)

	evaluations, metadata, err := h.evaluator.ResolveAllValues(r.Context(), requestID, context)
	evaluations, metadata, err := h.evaluator.ResolveAllValues(ctx, requestID, evaluationContext)
	if err != nil {
		h.Logger.WarnWithID(requestID, fmt.Sprintf("error from resolver: %v", err))

@ -127,7 +136,7 @@ func extractOfrepRequest(req *http.Request) (ofrep.Request, error) {
	return request, nil
}

// flagdContext returns combined context values from headers, static context (from cli) and request context.
// flagdContext returns combined context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func flagdContext(
	log *logger.Logger, requestID string, request ofrep.Request, staticContextValues map[string]any, headers http.Header, headerToContextKeyMappings map[string]string,

@ -152,4 +161,4 @@ func flagdContext(
	}

	return context
}
}
@ -2,73 +2,74 @@ package sync
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"time"
|
||||
|
||||
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
|
||||
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
|
||||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/store"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
// syncHandler implements the sync contract
|
||||
type syncHandler struct {
|
||||
mux *Multiplexer
|
||||
log *logger.Logger
|
||||
contextValues map[string]any
|
||||
deadline time.Duration
|
||||
//mux *Multiplexer
|
||||
store *store.Store
|
||||
log *logger.Logger
|
||||
contextValues map[string]any
|
||||
deadline time.Duration
|
||||
disableSyncMetadata bool
|
||||
}
|
||||
|
||||
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
|
||||
muxPayload := make(chan payload, 1)
|
||||
selector := req.GetSelector()
|
||||
watcher := make(chan store.FlagQueryResult, 1)
|
||||
selectorExpression := req.GetSelector()
|
||||
selector := store.NewSelector(selectorExpression)
|
||||
ctx := server.Context()
|
||||
|
||||
syncContextMap := make(map[string]any)
|
||||
maps.Copy(syncContextMap, s.contextValues)
|
||||
syncContext, err := structpb.NewStruct(syncContextMap)
|
||||
if err != nil {
|
||||
return status.Error(codes.DataLoss, "error constructing sync context")
|
||||
}
|
||||
|
||||
// attach server-side stream deadline to context
|
||||
if s.deadline != 0 {
|
||||
streamDeadline := time.Now().Add(s.deadline)
|
||||
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
|
||||
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
|
||||
ctx = deadlineCtx
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
err := s.mux.Register(ctx, selector, muxPayload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.store.Watch(ctx, &selector, watcher)
|
||||
|
||||
for {
|
||||
select {
|
||||
case payload := <-muxPayload:
|
||||
|
||||
metadataSrc := make(map[string]any)
|
||||
maps.Copy(metadataSrc, s.contextValues)
|
||||
|
||||
if sources := s.mux.SourcesAsMetadata(); sources != "" {
|
||||
metadataSrc["sources"] = sources
|
||||
}
|
||||
|
||||
metadata, err := structpb.NewStruct(metadataSrc)
|
||||
case payload := <-watcher:
|
||||
if err != nil {
|
||||
s.log.Error(fmt.Sprintf("error from struct creation: %v", err))
|
||||
return fmt.Errorf("error constructing metadata response")
|
||||
}
|
||||
flags, err := json.Marshal(payload.Flags)
|
||||
if err != nil {
|
||||
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
|
||||
return status.Error(codes.DataLoss, "error marshalling flags")
|
||||
}
|
||||
|
||||
err = server.Send(&syncv1.SyncFlagsResponse{
|
||||
FlagConfiguration: payload.flags,
|
||||
SyncContext: metadata,
|
||||
})
|
||||
err = server.Send(&syncv1.SyncFlagsResponse{FlagConfiguration: string(flags), SyncContext: syncContext})
|
||||
if err != nil {
|
||||
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
|
||||
return fmt.Errorf("error sending stream response: %w", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
s.mux.Unregister(ctx, selector)
|
||||
|
||||
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
||||
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
|
||||
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
|
||||
|
@ -79,16 +80,25 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
|
|||
}
|
||||
}
|
||||
|
||||
func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsRequest) (
|
||||
func (s syncHandler) FetchAllFlags(ctx context.Context, req *syncv1.FetchAllFlagsRequest) (
|
||||
*syncv1.FetchAllFlagsResponse, error,
|
||||
) {
|
||||
flags, err := s.mux.GetAllFlags(req.GetSelector())
|
||||
selectorExpression := req.GetSelector()
|
||||
selector := store.NewSelector(selectorExpression)
|
||||
flags, _, err := s.store.GetAll(ctx, &selector)
|
||||
if err != nil {
|
||||
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
|
||||
return nil, status.Error(codes.Internal, "error retrieving flags from store")
|
||||
}
|
||||
|
||||
flagsString, err := json.Marshal(flags)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &syncv1.FetchAllFlagsResponse{
|
||||
FlagConfiguration: flags,
|
||||
FlagConfiguration: string(flagsString),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -97,13 +107,13 @@ func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsR
func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (
    *syncv1.GetMetadataResponse, error,
) {
    if s.disableSyncMetadata {
        return nil, status.Error(codes.Unimplemented, "metadata endpoint disabled")
    }
    metadataSrc := make(map[string]any)
    for k, v := range s.contextValues {
        metadataSrc[k] = v
    }
    if sources := s.mux.SourcesAsMetadata(); sources != "" {
        metadataSrc["sources"] = sources
    }

    metadata, err := structpb.NewStruct(metadataSrc)
    if err != nil {
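Both the sync context and the metadata response above are built by converting plain Go maps into protobuf structs via structpb.NewStruct, which fails on values it cannot represent (hence the error checks). A standalone, runnable illustration of that conversion (values are arbitrary examples):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	metadataSrc := map[string]any{
		"sources": "A,B,C",
		"env":     "prod",
	}
	// NewStruct returns an error for unsupported value types (channels, funcs, etc.)
	metadata, err := structpb.NewStruct(metadataSrc)
	if err != nil {
		panic(fmt.Sprintf("error constructing metadata: %v", err))
	}
	fmt.Println(metadata.AsMap()) // round-trips back to map[string]any
}
```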
@ -22,21 +22,18 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
        wantMetadata map[string]any
    }{
        {
            name:    "with sources and context",
            sources: []string{"A, B, C"},
            name:          "with sources and context",
            contextValues: map[string]any{
                "env":    "prod",
                "region": "us-west",
            },
            wantMetadata: map[string]any{
                "sources": "A, B, C",
                "env":     "prod",
                "region":  "us-west",
                "env":    "prod",
                "region": "us-west",
            },
        },
        {
            name:    "with empty sources",
            sources: []string{},
            name:          "with empty sources",
            contextValues: map[string]any{
                "env": "dev",
            },
@ -46,62 +43,59 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
            },
        },
        {
            name:          "with empty context",
            sources:       []string{"A,B,C"},
            contextValues: map[string]any{},
            wantMetadata: map[string]any{
                "sources": "A,B,C",
            },
            wantMetadata: map[string]any{},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Shared handler for testing both GetMetadata & SyncFlags methods
            flagStore := store.NewFlags()
            mp, err := NewMux(flagStore, tt.sources)
            require.NoError(t, err)
    for _, disableSyncMetadata := range []bool{true, false} {
        for _, tt := range tests {
            t.Run(tt.name, func(t *testing.T) {
                // Shared handler for testing both GetMetadata & SyncFlags methods
                flagStore, err := store.NewStore(logger.NewLogger(nil, false), tt.sources)
                require.NoError(t, err)

            handler := syncHandler{
                mux:           mp,
                contextValues: tt.contextValues,
                log:           logger.NewLogger(nil, false),
            }
                handler := syncHandler{
                    store:               flagStore,
                    contextValues:       tt.contextValues,
                    log:                 logger.NewLogger(nil, false),
                    disableSyncMetadata: disableSyncMetadata,
                }

            // Test getting metadata from `GetMetadata` (deprecated)
            // remove when `GetMetadata` is fully removed and deprecated
            metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
            require.NoError(t, err)
            respMetadata := metaResp.GetMetadata().AsMap()
            assert.Equal(t, tt.wantMetadata, respMetadata)

            // Test metadata from sync_context
            stream := &mockSyncFlagsServer{
                ctx:       context.Background(),
                mu:        sync.Mutex{},
                respReady: make(chan struct{}, 1),
            }

            go func() {
                err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
                assert.NoError(t, err)
            }()

            select {
            case <-stream.respReady:
                syncResp := stream.GetLastResponse()
                assert.NotNil(t, syncResp)

                syncMetadata := syncResp.GetSyncContext().AsMap()
                assert.Equal(t, tt.wantMetadata, syncMetadata)

                // Check the two metadatas are equal
                // Test getting metadata from `GetMetadata` (deprecated)
                // remove when `GetMetadata` is fully removed and deprecated
                assert.Equal(t, respMetadata, syncMetadata)
            case <-time.After(time.Second):
                t.Fatal("timeout waiting for response")
            }
                metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
                if !disableSyncMetadata {
                    require.NoError(t, err)
                    respMetadata := metaResp.GetMetadata().AsMap()
                    assert.Equal(t, tt.wantMetadata, respMetadata)
                } else {
                    assert.NotNil(t, err)
                }

        })
                // Test metadata from sync_context
                stream := &mockSyncFlagsServer{
                    ctx:       context.Background(),
                    mu:        sync.Mutex{},
                    respReady: make(chan struct{}, 1),
                }

                go func() {
                    err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
                    assert.NoError(t, err)
                }()

                select {
                case <-stream.respReady:
                    syncResp := stream.GetLastResponse()
                    assert.NotNil(t, syncResp)
                    syncMetadata := syncResp.GetSyncContext().AsMap()
                    assert.Equal(t, tt.wantMetadata, syncMetadata)
                case <-time.After(time.Second):
                    t.Fatal("timeout waiting for response")
                }
            })
        }
    }
}
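The test drives SyncFlags through a mockSyncFlagsServer, which is not shown in this diff. A plausible sketch of such a mock, inferred from how the test uses it (field and method names match the test; the real helper may differ):

```go
package sync

import (
	"context"
	"sync"

	syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
	"google.golang.org/grpc"
)

// mockSyncFlagsServer satisfies the server-stream interface SyncFlags writes to;
// embedding grpc.ServerStream covers the methods the test never calls.
type mockSyncFlagsServer struct {
	grpc.ServerStream

	ctx       context.Context
	mu        sync.Mutex
	last      *syncv1.SyncFlagsResponse
	respReady chan struct{}
}

func (m *mockSyncFlagsServer) Context() context.Context { return m.ctx }

// Send records the latest response and signals the waiting test.
func (m *mockSyncFlagsServer) Send(rsp *syncv1.SyncFlagsResponse) error {
	m.mu.Lock()
	m.last = rsp
	m.mu.Unlock()
	select {
	case m.respReady <- struct{}{}:
	default: // drop the signal if the test has not consumed the previous one
	}
	return nil
}

func (m *mockSyncFlagsServer) GetLastResponse() *syncv1.SyncFlagsResponse {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.last
}
```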
@ -1,213 +0,0 @@
package sync

import (
    "context"
    "encoding/json"
    "fmt"
    "slices"
    "strings"
    "sync"

    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/open-feature/flagd/core/pkg/store"
)

//nolint:errchkjson
var emptyConfigBytes, _ = json.Marshal(map[string]map[string]string{
    "flags": {},
})

// Multiplexer abstracts subscription handling and storage processing.
// Flag configurations will be lazily loaded using reFill logic upon calls to publish.
type Multiplexer struct {
    store   *store.State
    sources []string

    subs         map[interface{}]subscription            // subscriptions on all sources
    selectorSubs map[string]map[interface{}]subscription // source specific subscriptions

    allFlags      string            // pre-calculated all flags in store as a string
    selectorFlags map[string]string // pre-calculated selector scoped flags in store as strings

    mu sync.RWMutex
}

type subscription struct {
    id      interface{}
    channel chan payload
}

type payload struct {
    flags string
}

// NewMux creates a new sync multiplexer
func NewMux(store *store.State, sources []string) (*Multiplexer, error) {
    m := &Multiplexer{
        store:         store,
        sources:       sources,
        subs:          map[interface{}]subscription{},
        selectorSubs:  map[string]map[interface{}]subscription{},
        selectorFlags: map[string]string{},
    }

    return m, m.reFill()
}

// Register a subscription
func (r *Multiplexer) Register(id interface{}, source string, con chan payload) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    if source != "" && !slices.Contains(r.sources, source) {
        return fmt.Errorf("no flag watcher setup for source %s", source)
    }

    var initSync string

    if source == "" {
        // subscribe for flags from all sources
        r.subs[id] = subscription{
            id:      id,
            channel: con,
        }

        initSync = r.allFlags
    } else {
        // subscribe for specific source
        s, ok := r.selectorSubs[source]
        if ok {
            s[id] = subscription{
                id:      id,
                channel: con,
            }
        } else {
            r.selectorSubs[source] = map[interface{}]subscription{
                id: {
                    id:      id,
                    channel: con,
                },
            }
        }

        initSync = r.selectorFlags[source]
    }

    // Initial sync
    con <- payload{flags: initSync}
    return nil
}

// Publish sync updates to subscriptions
func (r *Multiplexer) Publish() error {
    r.mu.Lock()
    defer r.mu.Unlock()

    // perform a refill prior to publishing
    err := r.reFill()
    if err != nil {
        return err
    }

    // push to all source subs
    for _, sub := range r.subs {
        sub.channel <- payload{r.allFlags}
    }

    // push to selector subs
    for source, flags := range r.selectorFlags {
        for _, s := range r.selectorSubs[source] {
            s.channel <- payload{flags}
        }
    }

    return nil
}

// Unregister a subscription
func (r *Multiplexer) Unregister(id interface{}, selector string) {
    r.mu.Lock()
    defer r.mu.Unlock()

    var from map[interface{}]subscription

    if selector == "" {
        from = r.subs
    } else {
        from = r.selectorSubs[selector]
    }

    delete(from, id)
}

// GetAllFlags per specific source
func (r *Multiplexer) GetAllFlags(source string) (string, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    if source == "" {
        return r.allFlags, nil
    }

    if !slices.Contains(r.sources, source) {
        return "", fmt.Errorf("no flag watcher setup for source %s", source)
    }

    return r.selectorFlags[source], nil
}

// SourcesAsMetadata returns all known sources, comma separated, to be used as service metadata
func (r *Multiplexer) SourcesAsMetadata() string {
    r.mu.RLock()
    defer r.mu.RUnlock()

    return strings.Join(r.sources, ",")
}

// reFill local configuration values
func (r *Multiplexer) reFill() error {
    clear(r.selectorFlags)
    // start all sources with empty config
    for _, source := range r.sources {
        r.selectorFlags[source] = string(emptyConfigBytes)
    }

    all, metadata, err := r.store.GetAll(context.Background())
    if err != nil {
        return fmt.Errorf("error retrieving flags from the store: %w", err)
    }

    bytes, err := json.Marshal(map[string]interface{}{"flags": all, "metadata": metadata})
    if err != nil {
        return fmt.Errorf("error marshalling: %w", err)
    }

    r.allFlags = string(bytes)

    collector := map[string]map[string]model.Flag{}

    for key, flag := range all {
        c, ok := collector[flag.Source]
        if ok {
            c[key] = flag
        } else {
            collector[flag.Source] = map[string]model.Flag{
                key: flag,
            }
        }
    }

    // for all flags, sort them into their correct selector
    for source, flags := range collector {
        // store the corresponding metadata
        metadata := r.store.GetMetadataForSource(source)
        bytes, err := json.Marshal(map[string]interface{}{"flags": flags, "metadata": metadata})
        if err != nil {
            return fmt.Errorf("unable to marshal flags: %w", err)
        }

        r.selectorFlags[source] = string(bytes)
    }

    return nil
}
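The file removed above implemented a guarded fan-out: per-subscriber channels, a write lock around Publish, and lazily recomputed payloads via reFill; its responsibilities now live in the store's Watch path. As a reference for the pattern being retired, a stripped-down, self-contained sketch of the same fan-out (all names here are illustrative, not flagd APIs):

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster fans one payload out to every registered subscriber channel,
// guarding the subscription map with a RWMutex as the Multiplexer did.
type broadcaster struct {
	mu   sync.RWMutex
	subs map[int]chan string
	next int
}

func newBroadcaster() *broadcaster {
	return &broadcaster{subs: map[int]chan string{}}
}

func (b *broadcaster) register(buf int) (int, <-chan string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	ch := make(chan string, buf)
	b.subs[id] = ch
	return id, ch
}

func (b *broadcaster) unregister(id int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.subs, id)
}

func (b *broadcaster) publish(msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs {
		ch <- msg // assumes buffered channels or live consumers, as the Multiplexer did
	}
}

func main() {
	b := newBroadcaster()
	id, ch := b.register(1)
	b.publish(`{"flags":{}}`)
	fmt.Println(<-ch)
	b.unregister(id)
}
```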
@ -1,261 +0,0 @@
package sync

import (
    "context"
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

const emptyConfigString = "{\"flags\":{}}"

func TestRegistration(t *testing.T) {
    // given
    mux, err := NewMux(getSimpleFlagStore())
    if err != nil {
        t.Fatal("error during flag extraction")
        return
    }

    tests := []struct {
        testName            string
        id                  interface{}
        source              string
        flagStringValidator func(flagString string, testSource string, testName string)
        connection          chan payload
        expectError         bool
    }{
        {
            testName:   "subscribe to all flags",
            id:         context.Background(),
            connection: make(chan payload, 1),
        },
        {
            testName: "subscribe to source A",
            id:       context.Background(),
            source:   "A",
            flagStringValidator: func(flagString string, testSource string, testName string) {
                assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
            },
            connection: make(chan payload, 1),
        },
        {
            testName: "subscribe to source B",
            id:       context.Background(),
            source:   "B",
            flagStringValidator: func(flagString string, testSource string, testName string) {
                assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
            },
            connection: make(chan payload, 1),
        },

        {
            testName:   "subscribe to empty",
            id:         context.Background(),
            source:     "C",
            connection: make(chan payload, 1),
            flagStringValidator: func(flagString string, testSource string, testName string) {
                assert.Equal(t, flagString, emptyConfigString)
            },
            expectError: false,
        },
        {
            testName:    "subscribe to non-existing",
            id:          context.Background(),
            source:      "D",
            connection:  make(chan payload, 1),
            expectError: true,
        },
    }

    // validate registration
    for _, test := range tests {
        t.Run(test.testName, func(t *testing.T) {
            // when
            err := mux.Register(test.id, test.source, test.connection)

            // then
            if !test.expectError && err != nil {
                t.Fatal("expected no errors, but got error")
            }

            if test.expectError && err != nil {
                // pass
                return
            }

            // validate subscription
            var initSync payload
            select {
            case <-time.After(2 * time.Second):
                t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")

            case initSync = <-test.connection:
                break
            }

            if test.flagStringValidator != nil {
                test.flagStringValidator(initSync.flags, test.source, test.testName)
            }
        })
    }
}

func TestUpdateAndRemoval(t *testing.T) {
    // given
    mux, err := NewMux(getSimpleFlagStore())
    if err != nil {
        t.Fatal("error during flag extraction")
        return
    }

    identifier := context.Background()
    channel := make(chan payload, 1)
    err = mux.Register(identifier, "", channel)
    if err != nil {
        t.Fatal("error during subscription registration")
        return
    }

    select {
    case <-time.After(2 * time.Second):
        t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
    case <-channel:
        break
    }

    // when - updates are triggered
    err = mux.Publish()
    if err != nil {
        t.Fatal("failure to trigger update request on multiplexer")
        return
    }

    // then
    select {
    case <-time.After(2 * time.Second):
        t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
    case <-channel:
        break
    }

    // when - subscription removed & update triggered
    mux.Unregister(identifier, "")
    err = mux.Publish()
    if err != nil {
        t.Fatal("failure to trigger update request on multiplexer")
        return
    }

    // then
    select {
    case <-time.After(2 * time.Second):
        break
    case <-channel:
        t.Fatal("expected no sync but got an update as removal was not performed")
    }
}

func TestGetAllFlags(t *testing.T) {
    // given
    mux, err := NewMux(getSimpleFlagStore())
    if err != nil {
        t.Fatal("error during flag extraction")
        return
    }

    // when - get all with open scope
    flagConfig, err := mux.GetAllFlags("")
    if err != nil {
        t.Fatal("error when retrieving all flags")
        return
    }

    if len(flagConfig) == 0 {
        t.Fatal("expected no empty flags")
        return
    }

    // when - get all with a scope
    flagConfig, err = mux.GetAllFlags("A")
    if err != nil {
        t.Fatal("error when retrieving all flags")
        return
    }

    if len(flagConfig) == 0 || !strings.Contains(flagConfig, fmt.Sprintf("\"source\":\"%s\"", "A")) {
        t.Fatal("expected flags to be scoped")
        return
    }

    // when - get all for a flagless scope
    flagConfig, err = mux.GetAllFlags("C")
    if err != nil {
        t.Fatal("error when retrieving all flags")
        return
    }

    assert.Equal(t, flagConfig, emptyConfigString)
}

func TestGetAllFlagsMetadata(t *testing.T) {
    // given
    mux, err := NewMux(getSimpleFlagStore())
    if err != nil {
        t.Fatal("error during flag extraction")
        return
    }

    // when - get all with open scope
    flagConfig, err := mux.GetAllFlags("")
    if err != nil {
        t.Fatal("error when retrieving all flags")
        return
    }

    if len(flagConfig) == 0 {
        t.Fatal("expected no empty flags")
        return
    }

    if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
        t.Fatal("expected unique metadata key for A to be present")
        return
    }

    if !strings.Contains(flagConfig, "\"keyB\":\"valueB\"") {
        t.Fatal("expected unique metadata key for B to be present")
        return
    }

    // duplicated keys are removed
    if strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
        t.Fatal("expected duplicated metadata key NOT to be present")
        return
    }

    // when - get all with a scope
    flagConfig, err = mux.GetAllFlags("A")
    if err != nil {
        t.Fatal("error when retrieving all flags")
        return
    }

    if len(flagConfig) == 0 {
        t.Fatal("expected no empty flags")
        return
    }

    if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
        t.Fatal("expected unique metadata key to be present")
        return
    }

    if !strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
        t.Fatal("expected duplicated metadata key to be present")
        return
    }
}
@ -21,25 +21,25 @@ type ISyncService interface {
    Start(context.Context) error

    // Emit updates for sync listeners
    Emit(isResync bool, source string)
    Emit(source string)
}

type SvcConfigurations struct {
    Logger         *logger.Logger
    Port           uint16
    Sources        []string
    Store          *store.State
    ContextValues  map[string]any
    CertPath       string
    KeyPath        string
    SocketPath     string
    StreamDeadline time.Duration
    Logger              *logger.Logger
    Port                uint16
    Sources             []string
    Store               *store.Store
    ContextValues       map[string]any
    CertPath            string
    KeyPath             string
    SocketPath          string
    StreamDeadline      time.Duration
    DisableSyncMetadata bool
}

type Service struct {
    listener net.Listener
    logger   *logger.Logger
    mux      *Multiplexer
    server   *grpc.Server

    startupTracker syncTracker
@ -65,7 +65,6 @@ func loadTLSCredentials(certPath string, keyPath string) (credentials.TransportC
func NewSyncService(cfg SvcConfigurations) (*Service, error) {
    var err error
    l := cfg.Logger
    mux, err := NewMux(cfg.Store, cfg.Sources)
    if err != nil {
        return nil, fmt.Errorf("error initializing multiplexer: %w", err)
    }
@ -82,10 +81,11 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
    }

    syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
        mux:           mux,
        log:           l,
        contextValues: cfg.ContextValues,
        deadline:      cfg.StreamDeadline,
        store:               cfg.Store,
        log:                 l,
        contextValues:       cfg.ContextValues,
        deadline:            cfg.StreamDeadline,
        disableSyncMetadata: cfg.DisableSyncMetadata,
    })

    var lis net.Listener
@ -103,7 +103,6 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
    return &Service{
        listener: lis,
        logger:   l,
        mux:      mux,
        server:   server,
        startupTracker: syncTracker{
            sources: slices.Clone(cfg.Sources),
@ -149,16 +148,8 @@ func (s *Service) Start(ctx context.Context) error {
    return nil
}

func (s *Service) Emit(isResync bool, source string) {
func (s *Service) Emit(source string) {
    s.startupTracker.trackAndRemove(source)

    if !isResync {
        err := s.mux.Publish()
        if err != nil {
            s.logger.Warn(fmt.Sprintf("error while publishing sync streams: %v", err))
            return
        }
    }
}

func (s *Service) shutdown() {
@ -3,16 +3,16 @@ package sync
import (
    "context"
    "fmt"
    "github.com/open-feature/flagd/core/pkg/store"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "log"
    "reflect"
    "sort"
    "strings"
    "testing"
    "time"

    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/open-feature/flagd/core/pkg/store"
    "github.com/stretchr/testify/assert"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
    v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
    "github.com/open-feature/flagd/core/pkg/logger"
@ -36,143 +36,129 @@ func TestSyncServiceEndToEnd(t *testing.T) {
        {title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantStartErr: false},
    }

    for _, tc := range testCases {
        t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
            // given
            port := 18016
            flagStore, sources := getSimpleFlagStore()
    for _, disableSyncMetadata := range []bool{true, false} {
        for _, tc := range testCases {
            t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
                // given
                port := 18016
                flagStore, sources := getSimpleFlagStore(t)

            ctx, cancelFunc := context.WithCancel(context.Background())
            defer cancelFunc()
                ctx, cancelFunc := context.WithCancel(context.Background())
                defer cancelFunc()

            service, doneChan, err := createAndStartSyncService(port, sources, flagStore, tc.certPath, tc.keyPath, tc.socketPath, ctx, 0)
                _, doneChan, err := createAndStartSyncService(
                    port,
                    sources,
                    flagStore,
                    tc.certPath,
                    tc.keyPath,
                    tc.socketPath,
                    ctx,
                    0,
                    disableSyncMetadata,
                )

            if tc.wantStartErr {
                if err == nil {
                    t.Fatal("expected error creating the service!")
                }
                return
            } else if err != nil {
                t.Fatal("unexpected error creating the service: %w", err)
                return
            }

            // when - derive a client for sync service
            serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)

            // then

            // sync flags request
            flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
            if err != nil {
                t.Fatal(fmt.Printf("error from sync request: %v", err))
                return
            }

            syncRsp, err := flags.Recv()
            if err != nil {
                t.Fatal(fmt.Printf("stream error: %v", err))
                return
            }

            if len(syncRsp.GetFlagConfiguration()) == 0 {
                t.Error("expected non empty sync response, but got empty")
            }

            // checks sync context actually set
            syncContext := syncRsp.GetSyncContext()
            if syncContext == nil {
                t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
            }

            syncAsMap := syncContext.AsMap()
            if syncAsMap["sources"] == nil {
                t.Fatalf("expected sources in sync_context, but got nil")
            }

            sourcesStr := syncAsMap["sources"].(string)
            sourcesArray := strings.Split(sourcesStr, ",")
            sort.Strings(sourcesArray)

            expectedSources := []string{"A", "B", "C"}
            if !reflect.DeepEqual(sourcesArray, expectedSources) {
                t.Fatalf("sources entry in sync_context does not match expected: got %v, want %v", sourcesArray, expectedSources)
            }

            // validate emits
            dataReceived := make(chan interface{})
            go func() {
                _, err := flags.Recv()
                if err != nil {
                if tc.wantStartErr {
                    if err == nil {
                        t.Fatal("expected error creating the service!")
                    }
                    return
                } else if err != nil {
                    t.Fatal("unexpected error creating the service: %w", err)
                    return
                }

                dataReceived <- nil
            }()
                // when - derive a client for sync service
                serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)

            // Emit as a resync
            service.Emit(true, "A")
                // then

            select {
            case <-dataReceived:
                t.Fatal("expected no data as this is a resync")
            case <-time.After(1 * time.Second):
                break
            }
                // sync flags request
                flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
                if err != nil {
                    t.Fatal(fmt.Printf("error from sync request: %v", err))
                    return
                }

            // Emit as a resync
            service.Emit(false, "A")
                syncRsp, err := flags.Recv()
                if err != nil {
                    t.Fatal(fmt.Printf("stream error: %v", err))
                    return
                }

            select {
            case <-dataReceived:
                break
            case <-time.After(1 * time.Second):
                t.Fatal("expected data but timeout waiting for sync")
            }
                if len(syncRsp.GetFlagConfiguration()) == 0 {
                    t.Error("expected non empty sync response, but got empty")
                }

            // fetch all flags
            allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
            if err != nil {
                t.Fatal(fmt.Printf("fetch all error: %v", err))
                return
            }
                // checks sync context actually set
                syncContext := syncRsp.GetSyncContext()
                if syncContext == nil {
                    t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
                }

            if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
                t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
                    syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
            }
                // validate emits
                dataReceived := make(chan interface{})
                go func() {
                    _, err := flags.Recv()
                    if err != nil {
                        return
                    }

            // metadata request
            metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
            if err != nil {
                t.Fatal(fmt.Printf("metadata error: %v", err))
                return
            }
                    dataReceived <- nil
                }()

            asMap := metadataRsp.GetMetadata().AsMap()
                // make a change
                flagStore.Update(testSource1, testSource1Flags, model.Metadata{
                    "keyDuped": "value",
                    "keyA":     "valueA",
                })

            // expect `sources` to be present
            if asMap["sources"] == nil {
                t.Fatal("expected sources entry in the metadata, but got nil")
            }
                select {
                case <-dataReceived:
                    break
                case <-time.After(1 * time.Second):
                    t.Fatal("expected data but timeout waiting for sync")
                }

            if asMap["sources"] != "A,B,C" {
                t.Fatal("incorrect sources entry in metadata")
            }
                // fetch all flags
                allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
                if err != nil {
                    t.Fatal(fmt.Printf("fetch all error: %v", err))
                    return
                }

            // validate shutdown from context cancellation
            go func() {
                cancelFunc()
            }()
                if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
                    t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
                        syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
                }

            select {
            case <-doneChan:
                // exit successful
                return
            case <-time.After(2 * time.Second):
                t.Fatal("service did not exit within a sufficient timeframe")
            }
        })
                // metadata request
                metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})

                if disableSyncMetadata {
                    if err == nil {
                        t.Fatal(fmt.Printf("getMetadata disabled, error should not be nil"))
                        return
                    }
                } else {
                    asMap := metadataRsp.GetMetadata().AsMap()
                    assert.NotNil(t, asMap, "expected metadata to be non-nil")
                }

                // validate shutdown from context cancellation
                go func() {
                    cancelFunc()
                }()

                select {
                case <-doneChan:
                    // exit successful
                    return
                case <-time.After(2 * time.Second):
                    t.Fatal("service did not exit within a sufficient timeframe")
                }
            })
        }
    }
}
@ -190,7 +176,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {

    // given
    port := 18016
    flagStore, sources := getSimpleFlagStore()
    flagStore, sources := getSimpleFlagStore(t)
    certPath := "./test-cert/server-cert.pem"
    keyPath := "./test-cert/server-key.pem"
    socketPath := ""
@ -198,7 +184,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
    ctx, cancelFunc := context.WithCancel(context.Background())
    defer cancelFunc()

    _, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline)
    _, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline, false)
    if err != nil {
        t.Fatal("error creating sync service")
    }
@ -256,16 +242,27 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
    }
}

func createAndStartSyncService(port int, sources []string, store *store.State, certPath string, keyPath string, socketPath string, ctx context.Context, deadline time.Duration) (*Service, chan interface{}, error) {
func createAndStartSyncService(
    port int,
    sources []string,
    store *store.Store,
    certPath string,
    keyPath string,
    socketPath string,
    ctx context.Context,
    deadline time.Duration,
    disableSyncMetadata bool,
) (*Service, chan interface{}, error) {
    service, err := NewSyncService(SvcConfigurations{
        Logger:         logger.NewLogger(nil, false),
        Port:           uint16(port),
        Sources:        sources,
        Store:          store,
        CertPath:       certPath,
        KeyPath:        keyPath,
        SocketPath:     socketPath,
        StreamDeadline: deadline,
        Logger:              logger.NewLogger(nil, false),
        Port:                uint16(port),
        Sources:             sources,
        Store:               store,
        CertPath:            certPath,
        KeyPath:             keyPath,
        SocketPath:          socketPath,
        StreamDeadline:      deadline,
        DisableSyncMetadata: disableSyncMetadata,
    })
    if err != nil {
        return nil, nil, err

@ -279,7 +276,7 @@ func createAndStartSyncService(port int, sources []string, store *store.State, c
    }()
    // trigger manual emits matching sources, so that service can start
    for _, source := range sources {
        service.Emit(false, source)
        service.Emit(source)
    }
    return service, doneChan, err
}
@ -5,46 +5,57 @@ import (
    "crypto/x509"
    "fmt"
    "os"
    "testing"

    "github.com/open-feature/flagd/core/pkg/logger"
    "github.com/open-feature/flagd/core/pkg/model"
    "github.com/open-feature/flagd/core/pkg/store"
    "google.golang.org/grpc/credentials"
)

// getSimpleFlagStore returns a flag store pre-filled with flags from sources A & B & C, with C empty
func getSimpleFlagStore() (*store.State, []string) {
    variants := map[string]any{
        "true":  true,
        "false": false,
    }

    flagStore := store.NewFlags()

    flagStore.Set("flagA", model.Flag{
var testSource1 = "testSource1"
var testSource2 = "testSource2"
var testVariants = map[string]any{
    "true":  true,
    "false": false,
}
var testSource1Flags = map[string]model.Flag{
    "flagA": {
        State:          "ENABLED",
        DefaultVariant: "false",
        Variants:       variants,
        Source:         "A",
    })

    flagStore.Set("flagB", model.Flag{
        Variants: testVariants,
    },
}
var testSource2Flags = map[string]model.Flag{
    "flagB": {
        State:          "ENABLED",
        DefaultVariant: "true",
        Variants:       variants,
        Source:         "B",
    })
        Variants: testVariants,
    },
}

    flagStore.MetadataPerSource["A"] = model.Metadata{
// getSimpleFlagStore is a test util which returns a flag store pre-filled with flags from sources testSource1 and testSource2.
func getSimpleFlagStore(t testing.TB) (*store.Store, []string) {
    t.Helper()

    sources := []string{testSource1, testSource2}

    flagStore, err := store.NewStore(logger.NewLogger(nil, false), sources)
    if err != nil {
        t.Fatalf("error creating flag store: %v", err)
    }

    flagStore.Update(testSource1, testSource1Flags, model.Metadata{
        "keyDuped": "value",
        "keyA":     "valueA",
    }
    })

    flagStore.MetadataPerSource["B"] = model.Metadata{
    flagStore.Update(testSource2, testSource2Flags, model.Metadata{
        "keyDuped": "value",
        "keyB":     "valueB",
    }
    })

    return flagStore, []string{"A", "B", "C"}
    return flagStore, sources
}

func loadTLSClientCredentials(certPath string) (credentials.TransportCredentials, error) {
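The rewritten helper shows the new store surface used throughout this diff: store.NewStore is constructed with a logger and a fixed source list, Update replaces one source's flags together with its metadata, and reads go through a Selector. A hedged sketch of that usage, with signatures inferred from the surrounding changes (treating the empty selector expression as selecting all sources is an assumption):

```go
package sync

import (
	"context"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/store"
)

// storeUsageSketch exercises the store API as it appears in this diff.
func storeUsageSketch() error {
	flagStore, err := store.NewStore(logger.NewLogger(nil, false), []string{testSource1})
	if err != nil {
		return err
	}

	// Update replaces testSource1's flags and its per-source metadata.
	flagStore.Update(testSource1, testSource1Flags, model.Metadata{"keyA": "valueA"})

	selector := store.NewSelector("") // assumption: empty expression selects every source
	flags, _, err := flagStore.GetAll(context.Background(), &selector)
	if err != nil {
		return err
	}
	_ = flags // map of flag key to model.Flag, per the marshalling in FetchAllFlags
	return nil
}
```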
@ -16,13 +16,13 @@ make build
then run the `flagd` binary

```shell
./bin/flagd start -f file:test-harness/symlink_testing-flags.json
make flagd-integration-test-harness
```

and finally run

```shell
make integration-test
make flagd-integration-test
```

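Taken together, the updated local flow is a sketch like the following, assuming the Makefile targets behave as the surrounding instructions describe:

```shell
make build                          # build the flagd binary
make flagd-integration-test-harness # start the test harness (replaces running ./bin/flagd manually)
make flagd-integration-test         # run the integration tests against the harness
```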
## TLS