Compare commits

...

11 Commits

Author SHA1 Message Date
Todd Baert 52f344fa01
chore: fix CODEOWNERS
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-08-18 11:34:22 -04:00
Todd Baert f9ce46f103
feat: multi-project support via selectors and flagSetId namespacing (#1702)
Sorry for how big this PR seems (a lot of the change lines are not
significant (DB schema or tests), so I've tried to highlight the
important parts. This is a substantial PR that builds on our recent
changes to use go-memdb for storage. It primarily supports 2 new crucial
features:

- support for duplicate flag keys from multiple sources (namespaced by
`flagSetId`) as described in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/duplicate-flag-keys.md)
- support for a robust query syntax in the "selector" fields and
headers, as proposed by @tangenti in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/decouple-flag-source-and-set.md)

Both of these were accomplished using the new go-memdb module. **The
built-in "watcher" functionality also allows us to _completely delete_
our "mux" layer, which was responsible for fanning out changes from
sync-sources to listeners**; this is something the go-memdb module
providers for free (the ability to watch a query for changes). Now, we
completely rely on this feature for all change notifications.
Additionally, unlike before, change notifications are now scoped to
particular _selectors_ (ie: if a client is only interested in changes
for flags from `flagSetId: x` or `source: y`, they will only get change
notifications pertaining to that selection. Currently, the only
supported query fields for the `selector` are `"source"` and
`"flagSetId"`, but this functionality can easily be extended. By
default, if no `selector` is specified, the previous key-overwrite by
source priority apples (this logic has also been simplified using the
new database). Most of the new functionality is tested
[here](https://github.com/open-feature/flagd/pull/1702/files#diff-0cdd4fe716f4b8a94466279fd1b11187fcf4d74e6434727c33d57ed78c89fe27R163-R478).

Selector in action for `Sync` API:


![demo](https://github.com/user-attachments/assets/9a4fd33e-80ef-4b5b-80d1-4fa3fc92b8e7)

Selector in action for `Evaluation` API:


![demo2](https://github.com/user-attachments/assets/abf49f9c-2247-4a03-9b8f-15b6f540aad1)

To test, run the new make target `make run-flagd-selector-demo`, then
use the OFREP or gRPC endpoints to experiment. This new functionality is
available on all APIs and endpoints. The command I ran in the gifs above
are:

streaming:
```shell
grpcurl -d '{"selector":"flagSetId=example"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
grpcurl -d '{"selector":"flagSetId=example,source=../config/samples/example_flags.flagd.json"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
grpcurl -d '{"selector":"flagSetId=other"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```

single-evaluations:
```shell
curl -X POST  -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
curl -X POST -H 'flagd-selector:flagSetId=other'  -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
```

⚠️ There's no breaking changes here. Besides the new features,
there is one behavioral change - the top level "metadata" object
returned for bulk evaluations (and failed evaluations) was previously
very nonsensical in it's behavior (we basically just aggregated the
metadata from all sources, discarding duplicates, and sent it back. This
field was used by providers for telemetry purposes. Now, since flag
evaluations and subscripts are "query based" and can aggregate data from
multiple sources, we've opted to simply reflect the selector queries
contents here.

So if you used a selector like `"flagSetId=1234,source=../my/source"`,
the top-level metadata object in the response would be:

```
"metadata": {
  "flagSetId": 1234,
  "source": "../my/source"
}
```

This is useful for the provider's ability to report errors, etc in
telemetry.

Fixes: https://github.com/open-feature/flagd/issues/1675
Fixes: https://github.com/open-feature/flagd/issues/1695
Fixes: https://github.com/open-feature/flagd/issues/1611
Fixes: https://github.com/open-feature/flagd/issues/1700
Fixes: https://github.com/open-feature/flagd/issues/1610

---------

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: chrfwow <christian.lutnik@dynatrace.com>
Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
2025-08-13 16:03:07 -04:00
NeaguGeorgiana23 3228ad8951
chore!: removes the `fractionalEvaluation` operator since it has been replaced with `fractional`. (#1704)
## This PR
<!-- add the description of the PR here -->

- adds this new feature

### Related Issues
- removes the `fractionalEvaluation` operator since it has been replaced
with `fractional`.

Fixes #1677

---------

Signed-off-by: NeaguGeorgiana23 <115723925+NeaguGeorgiana23@users.noreply.github.com>
2025-08-06 17:50:43 -04:00
Todd Baert 5c5c1cfe84
chore(refactor): use memdb for flag storage (#1697)
This PR contains no behavioral changes, and no breaking changes.

It lays the groundwork for some of the upcoming improvements we want in
flagd, as specified in recent ADRs. It does this by re-implementing our
storage layer to use [go-memdb](https://github.com/hashicorp/go-memdb),
an open source in-memory database developed by Hashicorp and used in
[Consul](https://developer.hashicorp.com/consul) (as well as MANY other
projects).

### Why?

This will allow us to easily support:

- duplicate flag keys (namespaced by flagSetId), as mentioned in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/duplicate-flag-keys.md)
- robust flag selectors, as mentioned in [this
ADR](https://github.com/open-feature/flagd/pull/1644), by supporting
"watchers" which allow us to "listen" to change in flags returned by a
given query 🕺
- robust (and possibly, in the future, even customizable) indexing on
arbitrary attributes (to easily support fetching "all boolean flags", or
"flags with metadata = xyz", etc)
- cross-"table" transactions

I have already PoC'd that these are all practical.

### Changes in implementation

- the `store` package's `State` is no longer just a struct; it's a
object with methods wrapping the internal db
- the `store` package's `State` was renamed to `Store` and the file was
renamed from `flags.go` to `store.go`, since it's ceased to be a simple
stateful object, and for consistency
- a non-serialized (used only internally) `Key` field was added to the
flag type (for indexing)
- a new constructor for the `Store` was added which takes a logger and
returns an error, the old was deprecated to avoid breaking changes in
consumers (the Go flagd provider, mostly)

Note that the go-memdb dependency is MPL2, which is not allowed by the
CNCF, however, go-memdb is already used in CNCF projects and has a
[special
exception](347a55dc0a/license-exceptions/cncf-exceptions-2023-08-31.json (L7-L11)).

### Perfromance

There was no significant change in performance, see benchmark diff vs
main below:

<details>
  <summary>Benchmark diff vs main</summary>

```diff
-BenchmarkFractionalEvaluation/test_c@faas.com-16         	  559051	     13832 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_d@faas.com-16         	  611665	     13106 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_a@faas.com-16         	  383074	     13433 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_b@faas.com-16         	  529185	     12190 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkResolveBooleanValue/test_staticBoolFlag-16      	 3929409	      1712 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveBooleanValue/test_targetingBoolFlag-16   	  812671	     10276 ns/op	    6065 B/op	      87 allocs/op
-BenchmarkResolveBooleanValue/test_staticObjectFlag-16    	 4398327	      1700 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveBooleanValue/test_missingFlag-16         	 4541409	      1494 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveBooleanValue/test_disabledFlag-16        	 2998599	      1815 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveStringValue/test_staticStringFlag-16     	 4378830	      1698 ns/op	    1040 B/op	      13 allocs/op
-BenchmarkResolveStringValue/test_targetingStringFlag-16  	  849668	      9165 ns/op	    6097 B/op	      89 allocs/op
-BenchmarkResolveStringValue/test_staticObjectFlag-16     	 4560192	      1363 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveStringValue/test_missingFlag-16          	 5283511	      1196 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveStringValue/test_disabledFlag-16         	 4393116	      1446 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveFloatValue/test:_staticFloatFlag-16      	 4264772	      1501 ns/op	    1024 B/op	      13 allocs/op
-BenchmarkResolveFloatValue/test:_targetingFloatFlag-16   	  776436	      8191 ns/op	    6081 B/op	      89 allocs/op
-BenchmarkResolveFloatValue/test:_staticObjectFlag-16     	 4685841	      1285 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveFloatValue/test:_missingFlag-16          	 5001636	      1376 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveFloatValue/test:_disabledFlag-16         	 3707120	      1897 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveIntValue/test_staticIntFlag-16           	 3770362	      1677 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveIntValue/test_targetingNumberFlag-16     	  739861	     11142 ns/op	    6065 B/op	      87 allocs/op
-BenchmarkResolveIntValue/test_staticObjectFlag-16        	 4221418	      1913 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveIntValue/test_missingFlag-16             	 4289516	      1488 ns/op	     768 B/op	      12 allocs/op
-BenchmarkResolveIntValue/test_disabledFlag-16            	 4027533	      1829 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveObjectValue/test_staticObjectFlag-16     	 1588855	      3880 ns/op	    2243 B/op	      37 allocs/op
-BenchmarkResolveObjectValue/test_targetingObjectFlag-16  	  562364	     11580 ns/op	    7283 B/op	     109 allocs/op
-BenchmarkResolveObjectValue/test_staticBoolFlag-16       	 5026976	      1791 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveObjectValue/test_missingFlag-16          	 4254043	      1553 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveObjectValue/test_disabledFlag-16         	 3051976	      2250 ns/op	    1072 B/op	      13 allocs/op
+BenchmarkFractionalEvaluation/test_a@faas.com-16         	  478593	     14527 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_b@faas.com-16         	  429560	     14728 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_c@faas.com-16         	  574078	     14230 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_d@faas.com-16         	  411690	     15296 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkResolveBooleanValue/test_staticBoolFlag-16      	 4133443	      1973 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveBooleanValue/test_targetingBoolFlag-16   	  822934	     10981 ns/op	    6033 B/op	      94 allocs/op
+BenchmarkResolveBooleanValue/test_staticObjectFlag-16    	 3955728	      1964 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveBooleanValue/test_missingFlag-16         	 3068791	      2294 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveBooleanValue/test_disabledFlag-16        	 3500334	      2225 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveStringValue/test_staticStringFlag-16     	 3935048	      1781 ns/op	    1008 B/op	      20 allocs/op
+BenchmarkResolveStringValue/test_targetingStringFlag-16  	  770565	     10765 ns/op	    6065 B/op	      96 allocs/op
+BenchmarkResolveStringValue/test_staticObjectFlag-16     	 3896060	      2084 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveStringValue/test_missingFlag-16          	 3103950	      2141 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveStringValue/test_disabledFlag-16         	 3717013	      2092 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveFloatValue/test:_staticFloatFlag-16      	 3971438	      2003 ns/op	     976 B/op	      20 allocs/op
+BenchmarkResolveFloatValue/test:_targetingFloatFlag-16   	  782996	     10153 ns/op	    6049 B/op	      96 allocs/op
+BenchmarkResolveFloatValue/test:_staticObjectFlag-16     	 3469644	      2115 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveFloatValue/test:_missingFlag-16          	 3376167	      2157 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveFloatValue/test:_disabledFlag-16         	 3610095	      2032 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveIntValue/test_staticIntFlag-16           	 3883299	      1941 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveIntValue/test_targetingNumberFlag-16     	  823038	     10725 ns/op	    6033 B/op	      94 allocs/op
+BenchmarkResolveIntValue/test_staticObjectFlag-16        	 3697454	      2028 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveIntValue/test_missingFlag-16             	 3326895	      1986 ns/op	    1048 B/op	      21 allocs/op
+BenchmarkResolveIntValue/test_disabledFlag-16            	 3327046	      2142 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveObjectValue/test_staticObjectFlag-16     	 1534627	      4885 ns/op	    2211 B/op	      44 allocs/op
+BenchmarkResolveObjectValue/test_targetingObjectFlag-16  	  509614	     14640 ns/op	    7251 B/op	     116 allocs/op
+BenchmarkResolveObjectValue/test_staticBoolFlag-16       	 3871867	      1978 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveObjectValue/test_missingFlag-16          	 3484065	      2080 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveObjectValue/test_disabledFlag-16         	 4013230	      2158 ns/op	    1024 B/op	      20 allocs/op
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/evaluator	233.286s
+ok  	github.com/open-feature/flagd/core/pkg/evaluator	261.212s
 ?   	github.com/open-feature/flagd/core/pkg/evaluator/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/logger	0.003s
+ok  	github.com/open-feature/flagd/core/pkg/logger	0.002s
 ?   	github.com/open-feature/flagd/core/pkg/model	[no test files]
 ?   	github.com/open-feature/flagd/core/pkg/service	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/service/ofrep	0.003s
+ok  	github.com/open-feature/flagd/core/pkg/service/ofrep	0.002s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/store	0.002s
 ?   	github.com/open-feature/flagd/core/pkg/sync	[no test files]
@@ -51,9 +51,9 @@ PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/builder	0.020s
 ?   	github.com/open-feature/flagd/core/pkg/sync/builder/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/file	1.007s
+ok  	github.com/open-feature/flagd/core/pkg/sync/file	1.006s
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/grpc	8.014s
+ok  	github.com/open-feature/flagd/core/pkg/sync/grpc	8.013s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials	0.004s
 ?   	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock	[no test files]
@@ -61,10 +61,10 @@ ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials	0.004s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/nameresolvers	0.002s
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/http	4.008s
+ok  	github.com/open-feature/flagd/core/pkg/sync/http	4.007s
 ?   	github.com/open-feature/flagd/core/pkg/sync/http/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/kubernetes	0.015s
+ok  	github.com/open-feature/flagd/core/pkg/sync/kubernetes	0.016s
 ?   	github.com/open-feature/flagd/core/pkg/sync/testing	[no test files]
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/telemetry	0.016s
```

</details>

---------

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-30 09:08:47 -04:00
Todd Baert 2f49ea7ed7
Update decouple-flag-source-and-set.md
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-29 12:18:07 -04:00
Todd Baert 2d40e28b92
Update duplicate-flag-keys.md
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-29 12:16:03 -04:00
Hugo Huang 849ce39827
docs(ADR): decouple flag sources and flag sets (#1644)
The ADR proposes a different approach to solve the problems stated in
https://github.com/open-feature/flagd/pull/1634.

It's quite concise and simple at the point, assuming the reviewers
already have the context from the original discussions.

Will add more details after the initial feedback and inputs.

---------

Signed-off-by: Hugo Huang <lorqor@gmail.com>
2025-07-29 12:14:12 -04:00
github-actions[bot] 1ee636aa03
chore: release main (#1696)
🤖 I have created a release *beep* *boop*
---


<details><summary>flagd: 0.12.9</summary>

##
[0.12.9](https://github.com/open-feature/flagd/compare/flagd/v0.12.8...flagd/v0.12.9)
(2025-07-28)


###  New Features

* Add toggle for disabling getMetadata request
([#1693](https://github.com/open-feature/flagd/issues/1693))
([e8fd680](e8fd680860))
</details>

<details><summary>core: 0.12.1</summary>

##
[0.12.1](https://github.com/open-feature/flagd/compare/core/v0.12.0...core/v0.12.1)
(2025-07-28)


### 🧹 Chore

* add back file-delete test
([#1694](https://github.com/open-feature/flagd/issues/1694))
([750aa17](750aa176b5))
* fix benchmark
([#1698](https://github.com/open-feature/flagd/issues/1698))
([5e2d7d7](5e2d7d7176))
</details>

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

Signed-off-by: OpenFeature Bot <109696520+openfeaturebot@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-07-28 16:00:14 -04:00
Todd Baert 5e2d7d7176
chore: fix benchmark (#1698)
This was out of date. I've updated it to use the correct format, and
update the asserts accordingly.

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-28 14:30:13 -04:00
lea konvalinka e8fd680860
feat: Add toggle for disabling getMetadata request (#1693)
<!-- Please use this template for your pull request. -->
<!-- Please use the sections that you need and delete other sections -->

## This PR
<!-- add the description of the PR here -->

- adds a toggle `disable-sync-metadata` for the Sync Service to disable
or enable the deprecated `getMetadata` request

### Related Issues
[#1688 [FEATURE] Temporary Context Enrichment
toggle](https://github.com/open-feature/flagd/issues/1688)

---------

Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com>
2025-07-28 15:33:31 +02:00
Todd Baert 750aa176b5
chore: add back file-delete test (#1694)
Adds back a test that was inappropriately removed (as explained
[here](https://github.com/beeme1mr/flagd/pull/16/files#r2223712589)).

Also fixes some typos.

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-24 07:21:36 -04:00
52 changed files with 2316 additions and 2058 deletions

5
.gitignore vendored
View File

@ -20,4 +20,7 @@ site
.cache/
# coverage results
*coverage.out
*coverage.out
# benchmark results
benchmark.txt

View File

@ -1,5 +1,5 @@
{
"flagd": "0.12.8",
"flagd": "0.12.9",
"flagd-proxy": "0.8.0",
"core": "0.12.0"
"core": "0.12.1"
}

View File

@ -3,4 +3,4 @@
#
# Managed by Peribolos: https://github.com/open-feature/community/blob/main/config/open-feature/cloud-native/workgroup.yaml
#
* @open-feature/cloud-native-maintainers @open-feature/maintainers
* @open-feature/flagd-maintainers @open-feature/maintainers

View File

@ -42,7 +42,7 @@ export GOPRIVATE=buf.build/gen/go
### Manual testing
flagd has a number of interfaces (you can read more about than at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
flagd has a number of interfaces (you can read more about them at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
@ -69,7 +69,7 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
```
#### Remote bulk evaluation via via HTTP1.1/OFREP
#### Remote bulk evaluation via HTTP1.1/OFREP
```sh
# evaluates flags in bulk
@ -83,6 +83,13 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
```
#### Remote event streaming via gRPC
```sh
# notifies of flag changes (but does not evaluate)
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/EventStream
```
#### Flag configuration fetch via gRPC
```sh
@ -93,7 +100,7 @@ grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintex
#### Flag synchronization stream via gRPC
```sh
# will open a persistent stream which sends flag changes when the watched source is modified
# will open a persistent stream which sends flag changes when the watched source is modified
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```

View File

@ -47,12 +47,19 @@ test-flagd:
go test -race -covermode=atomic -cover -short ./flagd/pkg/... -coverprofile=flagd-coverage.out
test-flagd-proxy:
go test -race -covermode=atomic -cover -short ./flagd-proxy/pkg/... -coverprofile=flagd-proxy-coverage.out
flagd-integration-test: # dependent on ./bin/flagd start -f file:test-harness/flags/testing-flags.json -f file:test-harness/flags/custom-ops.json -f file:test-harness/flags/evaluator-refs.json -f file:test-harness/flags/zero-flags.json
go test -cover ./test/integration $(ARGS)
flagd-benchmark-test:
go test -bench=Bench -short -benchtime=5s -benchmem ./core/... | tee benchmark.txt
flagd-integration-test-harness:
# target used to start a locally built flagd with the e2e flags
cd flagd; go run main.go start -f file:../test-harness/flags/testing-flags.json -f file:../test-harness/flags/custom-ops.json -f file:../test-harness/flags/evaluator-refs.json -f file:../test-harness/flags/zero-flags.json -f file:../test-harness/flags/edge-case-flags.json
flagd-integration-test: # dependent on flagd-e2e-test-harness if not running in github actions
go test -count=1 -cover ./test/integration $(ARGS)
run: # default to flagd
make run-flagd
run-flagd:
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
run-flagd-selector-demo:
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json -f file:../config/samples/example_flags.flagd.2.json
install:
cp systemd/flagd.service /etc/systemd/system/flagd.service
mkdir -p /etc/flagd

72
benchmark.txt Normal file
View File

@ -0,0 +1,72 @@
PASS
ok github.com/open-feature/flagd/core/pkg/certreloader 15.986s
goos: linux
goarch: amd64
pkg: github.com/open-feature/flagd/core/pkg/evaluator
cpu: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz
BenchmarkFractionalEvaluation/test_a@faas.com-16 423930 13316 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_b@faas.com-16 469594 13677 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_c@faas.com-16 569103 13286 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_d@faas.com-16 412386 13023 ns/op 7229 B/op 135 allocs/op
BenchmarkResolveBooleanValue/test_staticBoolFlag-16 3106903 1792 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveBooleanValue/test_targetingBoolFlag-16 448164 11250 ns/op 6065 B/op 87 allocs/op
BenchmarkResolveBooleanValue/test_staticObjectFlag-16 3958750 1476 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveBooleanValue/test_missingFlag-16 5331808 1353 ns/op 784 B/op 12 allocs/op
BenchmarkResolveBooleanValue/test_disabledFlag-16 4530751 1301 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveStringValue/test_staticStringFlag-16 4583056 1525 ns/op 1040 B/op 13 allocs/op
BenchmarkResolveStringValue/test_targetingStringFlag-16 839954 10388 ns/op 6097 B/op 89 allocs/op
BenchmarkResolveStringValue/test_staticObjectFlag-16 4252830 1677 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveStringValue/test_missingFlag-16 3743324 1495 ns/op 784 B/op 12 allocs/op
BenchmarkResolveStringValue/test_disabledFlag-16 3495699 1709 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveFloatValue/test:_staticFloatFlag-16 4382868 1511 ns/op 1024 B/op 13 allocs/op
BenchmarkResolveFloatValue/test:_targetingFloatFlag-16 867987 10344 ns/op 6081 B/op 89 allocs/op
BenchmarkResolveFloatValue/test:_staticObjectFlag-16 3913120 1695 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveFloatValue/test:_missingFlag-16 3910468 1349 ns/op 784 B/op 12 allocs/op
BenchmarkResolveFloatValue/test:_disabledFlag-16 3642919 1666 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveIntValue/test_staticIntFlag-16 4077288 1349 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveIntValue/test_targetingNumberFlag-16 922383 7601 ns/op 6065 B/op 87 allocs/op
BenchmarkResolveIntValue/test_staticObjectFlag-16 4995128 1229 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveIntValue/test_missingFlag-16 5574153 1274 ns/op 768 B/op 12 allocs/op
BenchmarkResolveIntValue/test_disabledFlag-16 3633708 1734 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveObjectValue/test_staticObjectFlag-16 1624102 4559 ns/op 2243 B/op 37 allocs/op
BenchmarkResolveObjectValue/test_targetingObjectFlag-16 443880 11995 ns/op 7283 B/op 109 allocs/op
BenchmarkResolveObjectValue/test_staticBoolFlag-16 3462445 1665 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveObjectValue/test_missingFlag-16 4207567 1458 ns/op 784 B/op 12 allocs/op
BenchmarkResolveObjectValue/test_disabledFlag-16 3407262 1848 ns/op 1072 B/op 13 allocs/op
PASS
ok github.com/open-feature/flagd/core/pkg/evaluator 239.506s
? github.com/open-feature/flagd/core/pkg/evaluator/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/logger 0.003s
? github.com/open-feature/flagd/core/pkg/model [no test files]
? github.com/open-feature/flagd/core/pkg/service [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/service/ofrep 0.002s
PASS
ok github.com/open-feature/flagd/core/pkg/store 0.003s
? github.com/open-feature/flagd/core/pkg/sync [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/blob 0.016s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/builder 0.018s
? github.com/open-feature/flagd/core/pkg/sync/builder/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/file 1.007s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc 8.011s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc/credentials 0.008s
? github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock [no test files]
? github.com/open-feature/flagd/core/pkg/sync/grpc/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc/nameresolvers 0.002s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/http 4.006s
? github.com/open-feature/flagd/core/pkg/sync/http/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/kubernetes 0.016s
? github.com/open-feature/flagd/core/pkg/sync/testing [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/telemetry 0.016s
PASS
ok github.com/open-feature/flagd/core/pkg/utils 0.002s

View File

@ -0,0 +1,17 @@
{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"metadata": {
"flagSetId": "other",
"version": "v1"
},
"flags": {
"myStringFlag": {
"state": "ENABLED",
"variants": {
"dupe1": "dupe1",
"dupe2": "dupe2"
},
"defaultVariant": "dupe1"
}
}
}

View File

@ -1,5 +1,13 @@
# Changelog
## [0.12.1](https://github.com/open-feature/flagd/compare/core/v0.12.0...core/v0.12.1) (2025-07-28)
### 🧹 Chore
* add back file-delete test ([#1694](https://github.com/open-feature/flagd/issues/1694)) ([750aa17](https://github.com/open-feature/flagd/commit/750aa176b5a8dd24a9daaff985ff6efeb084c758))
* fix benchmark ([#1698](https://github.com/open-feature/flagd/issues/1698)) ([5e2d7d7](https://github.com/open-feature/flagd/commit/5e2d7d7176ba05e667cd92acd7decb531a8de2f6))
## [0.12.0](https://github.com/open-feature/flagd/compare/core/v0.11.8...core/v0.12.0) (2025-07-21)

View File

@ -11,6 +11,8 @@ import (
)
func TestFractionalEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
commonFlags := Flags{
@ -458,8 +460,13 @@ func TestFractionalEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
@ -486,6 +493,8 @@ func TestFractionalEvaluation(t *testing.T) {
}
func BenchmarkFractionalEvaluation(b *testing.B) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
flags := Flags{
@ -508,7 +517,7 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
},
{
"fractional": [
"email",
{"var": "email"},
[
"red",
25
@ -542,41 +551,41 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
expectedReason string
expectedErrorCode string
}{
"test@faas.com": {
"test_a@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test@faas.com",
"email": "test_a@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
expectedReason: model.TargetingMatchReason,
},
"test_b@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test_b@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test2@faas.com": {
"test_c@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test2@faas.com",
"email": "test_c@faas.com",
},
expectedVariant: "yellow",
expectedValue: "#FFFF00",
expectedVariant: "green",
expectedValue: "#00FF00",
expectedReason: model.TargetingMatchReason,
},
"test3@faas.com": {
"test_d@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test3@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test4@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
"email": "test_d@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
@ -587,7 +596,13 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
for name, tt := range tests {
b.Run(name, func(b *testing.B) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, &store.State{Flags: tt.flags.Flags})
s, err := store.NewStore(log, sources)
if err != nil {
b.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
for i := 0; i < b.N; i++ {
value, variant, reason, _, err := resolve[string](
ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

View File

@ -35,7 +35,7 @@ IEvaluator is an extension of IResolver, allowing storage updates and retrievals
*/
type IEvaluator interface {
GetState() (string, error)
SetState(payload sync.DataSync) (model.Metadata, bool, error)
SetState(payload sync.DataSync) (map[string]interface{}, bool, error)
IResolver
}

View File

@ -64,13 +64,13 @@ func WithEvaluator(name string, evalFunc func(interface{}, interface{}) interfac
// JSON evaluator
type JSON struct {
store *store.State
store *store.Store
Logger *logger.Logger
jsonEvalTracer trace.Tracer
Resolver
}
func NewJSON(logger *logger.Logger, s *store.State, opts ...JSONEvaluatorOption) *JSON {
func NewJSON(logger *logger.Logger, s *store.Store, opts ...JSONEvaluatorOption) *JSON {
logger = logger.WithFields(
zap.String("component", "evaluator"),
zap.String("evaluator", "json"),
@ -118,7 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
var events map[string]interface{}
var reSync bool
events, reSync = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
events, reSync = je.store.Update(payload.Source, definition.Flags, definition.Metadata)
// Number of events correlates to the number of flags changed through this sync, record it
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))
@ -139,7 +139,6 @@ func NewResolver(store store.IStore, logger *logger.Logger, jsonEvalTracer trace
jsonlogic.AddOperator(StartsWithEvaluationName, NewStringComparisonEvaluator(logger).StartsWithEvaluation)
jsonlogic.AddOperator(EndsWithEvaluationName, NewStringComparisonEvaluator(logger).EndsWithEvaluation)
jsonlogic.AddOperator(SemVerEvaluationName, NewSemVerComparison(logger).SemVerEvaluation)
jsonlogic.AddOperator(LegacyFractionEvaluationName, NewLegacyFractional(logger).LegacyFractionalEvaluation)
return Resolver{store: store, Logger: logger, tracer: jsonEvalTracer}
}
@ -150,8 +149,12 @@ func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context
_, span := je.tracer.Start(ctx, "resolveAll")
defer span.End()
var err error
allFlags, flagSetMetadata, err := je.store.GetAll(ctx)
var selector store.Selector
s := ctx.Value(store.SelectorContextKey{})
if s != nil {
selector = s.(store.Selector)
}
allFlags, flagSetMetadata, err := je.store.GetAll(ctx, &selector)
if err != nil {
return nil, flagSetMetadata, fmt.Errorf("error retreiving flags from the store: %w", err)
}
@ -302,19 +305,19 @@ func resolve[T constraints](ctx context.Context, reqID string, key string, conte
func (je *Resolver) evaluateVariant(ctx context.Context, reqID string, flagKey string, evalCtx map[string]any) (
variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, err error,
) {
flag, metadata, ok := je.store.Get(ctx, flagKey)
if !ok {
var selector store.Selector
s := ctx.Value(store.SelectorContextKey{})
if s != nil {
selector = s.(store.Selector)
}
flag, metadata, err := je.store.Get(ctx, flagKey, &selector)
if err != nil {
// flag not found
je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
return "", map[string]interface{}{}, model.ErrorReason, metadata, errors.New(model.FlagNotFoundErrorCode)
}
// add selector to evaluation metadata
selector := je.store.SelectorForFlag(ctx, flag)
if selector != "" {
metadata[SelectorMetadataKey] = selector
}
for key, value := range flag.Metadata {
// If value is not nil or empty, copy to metadata
if value != nil {

View File

@ -383,7 +383,7 @@ var Flags = fmt.Sprintf(`{
func TestGetState_Valid_ContainsFlag(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("Expected no error")
}
@ -405,7 +405,7 @@ func TestSetState_Invalid_Error(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
// set state with an invalid flag definition
_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("unexpected error")
}
@ -415,7 +415,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
// set state with a valid flag definition
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -423,7 +423,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
func TestResolveAllValues(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -476,62 +476,6 @@ func TestResolveAllValues(t *testing.T) {
}
}
func TestMetadataResolveType(t *testing.T) {
tests := []struct {
flagKey string
metadata model.Metadata
}{
{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
t.Fatalf("expected no error")
}
for _, test := range tests {
_, _, _, metadata, _ := evaluator.ResolveBooleanValue(context.TODO(), reqID, test.flagKey, nil)
if !reflect.DeepEqual(test.metadata, metadata) {
t.Errorf("expected metadata to be %v, but got %v", test.metadata, metadata)
}
}
}
func TestMetadataResolveAll(t *testing.T) {
expectedFlagSetMetadata := model.Metadata{"flagSetId": FlagSetID, "version": Version}
tests := []struct {
flagKey string
metadata model.Metadata
}{
{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
t.Fatalf("expected no error")
}
for _, test := range tests {
resolutions, flagSetMetadata, _ := evaluator.ResolveAllValues(context.TODO(), reqID, nil)
for _, resolved := range resolutions {
if resolved.FlagKey == test.flagKey {
if !reflect.DeepEqual(test.metadata, resolved.Metadata) {
t.Errorf("expected flag metadata to be %v, but got %v", test.metadata, resolved.Metadata)
}
}
}
if !reflect.DeepEqual(expectedFlagSetMetadata, flagSetMetadata) {
t.Errorf("expected flag set metadata to be %v, but got %v", expectedFlagSetMetadata, flagSetMetadata)
}
}
}
func TestResolveBooleanValue(t *testing.T) {
tests := []struct {
flagKey string
@ -548,7 +492,7 @@ func TestResolveBooleanValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -583,7 +527,7 @@ func BenchmarkResolveBooleanValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -623,7 +567,7 @@ func TestResolveStringValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -659,7 +603,7 @@ func BenchmarkResolveStringValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -699,7 +643,7 @@ func TestResolveFloatValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -735,7 +679,7 @@ func BenchmarkResolveFloatValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -775,7 +719,7 @@ func TestResolveIntValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -811,7 +755,7 @@ func BenchmarkResolveIntValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -851,7 +795,7 @@ func TestResolveObjectValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -890,7 +834,7 @@ func BenchmarkResolveObjectValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -935,7 +879,7 @@ func TestResolveAsAnyValue(t *testing.T) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -971,7 +915,7 @@ func TestResolve_DefaultVariant(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
@ -1038,7 +982,7 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags, Source: "testSource"})
if tt.valid && err != nil {
t.Error(err)
@ -1097,7 +1041,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1157,7 +1101,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1233,7 +1177,7 @@ func TestState_Evaluator(t *testing.T) {
"off": false
},
"defaultVariant": "off",
"source":"",
"source":"testSource",
"targeting": {
"if": [
{
@ -1267,7 +1211,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1286,7 +1230,7 @@ func TestState_Evaluator(t *testing.T) {
"off": false
},
"defaultVariant": "off",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1344,7 +1288,7 @@ func TestState_Evaluator(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState})
_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState, Source: "testSource"})
if err != nil {
if !tt.expectedError {
t.Error(err)
@ -1377,8 +1321,8 @@ func TestState_Evaluator(t *testing.T) {
t.Fatal(err)
}
if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON["flags"]) {
t.Errorf("expected state: %v got state: %v", expectedOutputJSON, gotOutputJSON)
if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON) {
t.Errorf("expected state: %v got state: %v", expectedOutputJSON["flags"], gotOutputJSON)
}
})
}
@ -1451,7 +1395,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatal(err)
}
@ -1474,7 +1418,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
errChan <- nil
return
default:
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
errChan <- err
return
@ -1516,7 +1460,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
t.Run("flagKeyIsInTheContext", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"welcome-banner": {
"state": "ENABLED",
@ -1556,7 +1500,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
t.Run("timestampIsInTheContext", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"welcome-banner": {
"state": "ENABLED",
@ -1590,7 +1534,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
t.Run("missing variant error", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"missing-variant": {
"state": "ENABLED",
@ -1618,7 +1562,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
t.Run("null fallback", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"null-fallback": {
"state": "ENABLED",
@ -1651,7 +1595,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
//nolint:dupword
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"match-boolean": {
"state": "ENABLED",

View File

@ -1,145 +0,0 @@
// This evaluation type is deprecated and will be removed before v1.
// Do not enhance it or use it for reference.
package evaluator
import (
"errors"
"fmt"
"math"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/zeebo/xxh3"
)
const (
LegacyFractionEvaluationName = "fractionalEvaluation"
LegacyFractionEvaluationLink = "https://flagd.dev/concepts/#migrating-from-legacy-fractionalevaluation"
)
// Deprecated: LegacyFractional is deprecated. This will be removed prior to v1 release.
type LegacyFractional struct {
Logger *logger.Logger
}
type legacyFractionalEvaluationDistribution struct {
variant string
percentage int
}
func NewLegacyFractional(logger *logger.Logger) *LegacyFractional {
return &LegacyFractional{Logger: logger}
}
func (fe *LegacyFractional) LegacyFractionalEvaluation(values, data interface{}) interface{} {
fe.Logger.Warn(
fmt.Sprintf("%s is deprecated, please use %s, see: %s",
LegacyFractionEvaluationName,
FractionEvaluationName,
LegacyFractionEvaluationLink))
valueToDistribute, feDistributions, err := parseLegacyFractionalEvaluationData(values, data)
if err != nil {
fe.Logger.Error(fmt.Sprintf("parse fractional evaluation data: %v", err))
return nil
}
return distributeLegacyValue(valueToDistribute, feDistributions)
}
func parseLegacyFractionalEvaluationData(values, data interface{}) (string,
[]legacyFractionalEvaluationDistribution, error,
) {
valuesArray, ok := values.([]interface{})
if !ok {
return "", nil, errors.New("fractional evaluation data is not an array")
}
if len(valuesArray) < 2 {
return "", nil, errors.New("fractional evaluation data has length under 2")
}
bucketBy, ok := valuesArray[0].(string)
if !ok {
return "", nil, errors.New("first element of fractional evaluation data isn't of type string")
}
dataMap, ok := data.(map[string]interface{})
if !ok {
return "", nil, errors.New("data isn't of type map[string]interface{}")
}
v, ok := dataMap[bucketBy]
if !ok {
return "", nil, nil
}
valueToDistribute, ok := v.(string)
if !ok {
return "", nil, fmt.Errorf("var: %s isn't of type string", bucketBy)
}
feDistributions, err := parseLegacyFractionalEvaluationDistributions(valuesArray)
if err != nil {
return "", nil, err
}
return valueToDistribute, feDistributions, nil
}
func parseLegacyFractionalEvaluationDistributions(values []interface{}) (
[]legacyFractionalEvaluationDistribution, error,
) {
sumOfPercentages := 0
var feDistributions []legacyFractionalEvaluationDistribution
for i := 1; i < len(values); i++ {
distributionArray, ok := values[i].([]interface{})
if !ok {
return nil, errors.New("distribution elements aren't of type []interface{}")
}
if len(distributionArray) != 2 {
return nil, errors.New("distribution element isn't length 2")
}
variant, ok := distributionArray[0].(string)
if !ok {
return nil, errors.New("first element of distribution element isn't string")
}
percentage, ok := distributionArray[1].(float64)
if !ok {
return nil, errors.New("second element of distribution element isn't float")
}
sumOfPercentages += int(percentage)
feDistributions = append(feDistributions, legacyFractionalEvaluationDistribution{
variant: variant,
percentage: int(percentage),
})
}
if sumOfPercentages != 100 {
return nil, fmt.Errorf("percentages must sum to 100, got: %d", sumOfPercentages)
}
return feDistributions, nil
}
func distributeLegacyValue(value string, feDistribution []legacyFractionalEvaluationDistribution) string {
hashValue := xxh3.HashString(value)
hashRatio := float64(hashValue) / math.Pow(2, 64) // divide the hash value by the largest possible value, integer 2^64
bucket := int(hashRatio * 100) // integer in range [0, 99]
rangeEnd := 0
for _, dist := range feDistribution {
rangeEnd += dist.percentage
if bucket < rangeEnd {
return dist.variant
}
}
return ""
}

View File

@ -1,300 +0,0 @@
package evaluator
import (
"context"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
)
func TestLegacyFractionalEvaluation(t *testing.T) {
ctx := context.Background()
flags := Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"if": [
{
"in": ["@faas.com", {
"var": ["email"]
}]
},
{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
],
[
"green",
25
],
[
"yellow",
25
]
]
}, null
]
}`),
},
},
}
tests := map[string]struct {
flags Flags
flagKey string
context map[string]any
expectedValue string
expectedVariant string
expectedReason string
expectedErrorCode string
}{
"test@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test2@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test2@faas.com",
},
expectedVariant: "yellow",
expectedValue: "#FFFF00",
expectedReason: model.TargetingMatchReason,
},
"test3@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test3@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test4@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
expectedReason: model.TargetingMatchReason,
},
"non even split": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"if": [
{
"in": ["@faas.com", {
"var": ["email"]
}]
},
{
"fractionalEvaluation": [
"email",
[
"red",
50
],
[
"blue",
25
],
[
"green",
25
]
]
}, null
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"fallback to default variant if no email provided": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
],
[
"green",
25
],
[
"yellow",
25
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{},
expectedVariant: "",
expectedValue: "",
expectedReason: model.ErrorReason,
expectedErrorCode: model.GeneralErrorCode,
},
"fallback to default variant if invalid variant as result of fractional evaluation": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"black",
100
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "foo@foo.com",
},
expectedVariant: "",
expectedValue: "",
expectedReason: model.ErrorReason,
expectedErrorCode: model.GeneralErrorCode,
},
"fallback to default variant if percentages don't sum to 100": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "foo@foo.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.DefaultReason,
},
}
const reqID = "default"
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
if value != tt.expectedValue {
t.Errorf("expected value '%s', got '%s'", tt.expectedValue, value)
}
if variant != tt.expectedVariant {
t.Errorf("expected variant '%s', got '%s'", tt.expectedVariant, variant)
}
if reason != tt.expectedReason {
t.Errorf("expected reason '%s', got '%s'", tt.expectedReason, reason)
}
if err != nil {
errorCode := err.Error()
if errorCode != tt.expectedErrorCode {
t.Errorf("expected err '%v', got '%v'", tt.expectedErrorCode, err)
}
}
})
}
}

View File

@ -316,6 +316,8 @@ func TestSemVerOperator_Compare(t *testing.T) {
}
func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -922,8 +924,12 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

View File

@ -13,6 +13,8 @@ import (
)
func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -185,8 +187,12 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
@ -210,6 +216,8 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
}
func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -382,9 +390,12 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

View File

@ -2,7 +2,15 @@ package model
import "encoding/json"
const Key = "Key"
const FlagSetId = "FlagSetId"
const Source = "Source"
const Priority = "Priority"
type Flag struct {
Key string `json:"-"` // not serialized, used only for indexing
FlagSetId string `json:"-"` // not serialized, used only for indexing
Priority int `json:"-"` // not serialized, used only for indexing
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`

View File

@ -0,0 +1,52 @@
package notifications
import (
"reflect"
"github.com/open-feature/flagd/core/pkg/model"
)
const typeField = "type"
// Use to represent change notifications for mode PROVIDER_CONFIGURATION_CHANGE events.
type Notifications map[string]any
// Generate notifications (deltas) from old and new flag sets for use in RPC mode PROVIDER_CONFIGURATION_CHANGE events.
func NewFromFlags(oldFlags, newFlags map[string]model.Flag) Notifications {
notifications := map[string]interface{}{}
// flags removed
for key := range oldFlags {
if _, ok := newFlags[key]; !ok {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationDelete),
}
}
}
// flags added or modified
for key, newFlag := range newFlags {
oldFlag, exists := oldFlags[key]
if !exists {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationCreate),
}
} else if !flagsEqual(oldFlag, newFlag) {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationUpdate),
}
}
}
return notifications
}
func flagsEqual(a, b model.Flag) bool {
return a.State == b.State &&
a.DefaultVariant == b.DefaultVariant &&
reflect.DeepEqual(a.Variants, b.Variants) &&
reflect.DeepEqual(a.Targeting, b.Targeting) &&
a.Source == b.Source &&
a.Selector == b.Selector &&
reflect.DeepEqual(a.Metadata, b.Metadata)
}

View File

@ -0,0 +1,102 @@
package notifications
import (
"testing"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/assert"
)
func TestNewFromFlags(t *testing.T) {
flagA := model.Flag{
Key: "flagA",
State: "ENABLED",
DefaultVariant: "on",
Source: "source1",
}
flagAUpdated := model.Flag{
Key: "flagA",
State: "DISABLED",
DefaultVariant: "on",
Source: "source1",
}
flagB := model.Flag{
Key: "flagB",
State: "ENABLED",
DefaultVariant: "off",
Source: "source1",
}
tests := []struct {
name string
oldFlags map[string]model.Flag
newFlags map[string]model.Flag
want Notifications
}{
{
name: "flag added",
oldFlags: map[string]model.Flag{},
newFlags: map[string]model.Flag{"flagA": flagA},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationCreate),
},
},
},
{
name: "flag deleted",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationDelete),
},
},
},
{
name: "flag changed",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{"flagA": flagAUpdated},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationUpdate),
},
},
},
{
name: "flag unchanged",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{"flagA": flagA},
want: Notifications{},
},
{
name: "mixed changes",
oldFlags: map[string]model.Flag{
"flagA": flagA,
"flagB": flagB,
},
newFlags: map[string]model.Flag{
"flagA": flagAUpdated, // updated
"flagC": flagA, // added
},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationUpdate),
},
"flagB": map[string]interface{}{
"type": string(model.NotificationDelete),
},
"flagC": map[string]interface{}{
"type": string(model.NotificationCreate),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewFromFlags(tt.oldFlags, tt.newFlags)
assert.Equal(t, tt.want, got)
})
}
}

View File

@ -1,253 +0,0 @@
package store
import (
"context"
"encoding/json"
"fmt"
"maps"
"reflect"
"sync"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
)
type del = struct{}
var deleteMarker *del
type IStore interface {
GetAll(ctx context.Context) (map[string]model.Flag, model.Metadata, error)
Get(ctx context.Context, key string) (model.Flag, model.Metadata, bool)
SelectorForFlag(ctx context.Context, flag model.Flag) string
}
type State struct {
mx sync.RWMutex
Flags map[string]model.Flag `json:"flags"`
FlagSources []string
SourceDetails map[string]SourceDetails `json:"sourceMetadata,omitempty"`
MetadataPerSource map[string]model.Metadata `json:"metadata,omitempty"`
}
type SourceDetails struct {
Source string
Selector string
}
func (f *State) hasPriority(stored string, new string) bool {
if stored == new {
return true
}
for i := len(f.FlagSources) - 1; i >= 0; i-- {
switch f.FlagSources[i] {
case stored:
return false
case new:
return true
}
}
return true
}
func NewFlags() *State {
return &State{
Flags: map[string]model.Flag{},
SourceDetails: map[string]SourceDetails{},
MetadataPerSource: map[string]model.Metadata{},
}
}
func (f *State) Set(key string, flag model.Flag) {
f.mx.Lock()
defer f.mx.Unlock()
f.Flags[key] = flag
}
func (f *State) Get(_ context.Context, key string) (model.Flag, model.Metadata, bool) {
f.mx.RLock()
defer f.mx.RUnlock()
metadata := f.getMetadata()
flag, ok := f.Flags[key]
if ok {
metadata = f.GetMetadataForSource(flag.Source)
}
return flag, metadata, ok
}
func (f *State) SelectorForFlag(_ context.Context, flag model.Flag) string {
f.mx.RLock()
defer f.mx.RUnlock()
return f.SourceDetails[flag.Source].Selector
}
func (f *State) Delete(key string) {
f.mx.Lock()
defer f.mx.Unlock()
delete(f.Flags, key)
}
func (f *State) String() (string, error) {
f.mx.RLock()
defer f.mx.RUnlock()
bytes, err := json.Marshal(f)
if err != nil {
return "", fmt.Errorf("unable to marshal flags: %w", err)
}
return string(bytes), nil
}
// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (f *State) GetAll(_ context.Context) (map[string]model.Flag, model.Metadata, error) {
f.mx.RLock()
defer f.mx.RUnlock()
flags := make(map[string]model.Flag, len(f.Flags))
for key, flag := range f.Flags {
flags[key] = flag
}
return flags, f.getMetadata(), nil
}
// Add new flags from source.
func (f *State) Add(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}
for k, newFlag := range flags {
storedFlag, _, ok := f.Get(context.Background(), k)
if ok && !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not overwriting: flag %s from source %s does not have priority over %s",
k,
source,
storedFlag.Source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}
// Store the new version of the flag
newFlag.Source = source
newFlag.Selector = selector
f.Set(k, newFlag)
}
return notifications
}
// Update the flag state with the provided flags.
func (f *State) Update(
logger *logger.Logger,
source string,
selector string,
flags map[string]model.Flag,
metadata model.Metadata,
) (map[string]interface{}, bool) {
notifications := map[string]interface{}{}
resyncRequired := false
f.mx.Lock()
f.setSourceMetadata(source, metadata)
for k, v := range f.Flags {
if v.Source == source && v.Selector == selector {
if _, ok := flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
resyncRequired = true
logger.Debug(
fmt.Sprintf(
"store resync triggered: flag %s has been deleted from source %s",
k, source,
),
)
continue
}
}
}
f.mx.Unlock()
for k, newFlag := range flags {
newFlag.Source = source
newFlag.Selector = selector
storedFlag, _, ok := f.Get(context.Background(), k)
if ok {
if !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not merging: flag %s from source %s does not have priority over %s",
k, source, storedFlag.Source,
),
)
continue
}
if reflect.DeepEqual(storedFlag, newFlag) {
continue
}
}
if !ok {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}
} else {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
}
// Store the new version of the flag
f.Set(k, newFlag)
}
return notifications, resyncRequired
}
func (f *State) GetMetadataForSource(source string) model.Metadata {
perSource, ok := f.MetadataPerSource[source]
if ok && perSource != nil {
return maps.Clone(perSource)
}
return model.Metadata{}
}
func (f *State) getMetadata() model.Metadata {
metadata := model.Metadata{}
for _, perSource := range f.MetadataPerSource {
for key, entry := range perSource {
_, exists := metadata[key]
if !exists {
metadata[key] = entry
} else {
metadata[key] = deleteMarker
}
}
}
// keys that exist across multiple sources are deleted
maps.DeleteFunc(metadata, func(key string, _ interface{}) bool {
return metadata[key] == deleteMarker
})
return metadata
}
func (f *State) setSourceMetadata(source string, metadata model.Metadata) {
if f.MetadataPerSource == nil {
f.MetadataPerSource = map[string]model.Metadata{}
}
f.MetadataPerSource[source] = metadata
}

View File

@ -1,326 +0,0 @@
package store
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/require"
)
func TestHasPriority(t *testing.T) {
tests := []struct {
name string
currentState *State
storedSource string
newSource string
hasPriority bool
}{
{
name: "same source",
currentState: &State{},
storedSource: "A",
newSource: "A",
hasPriority: true,
},
{
name: "no priority",
currentState: &State{
FlagSources: []string{
"B",
"A",
},
},
storedSource: "A",
newSource: "B",
hasPriority: false,
},
{
name: "priority",
currentState: &State{
FlagSources: []string{
"A",
"B",
},
},
storedSource: "A",
newSource: "B",
hasPriority: true,
},
{
name: "not in sources",
currentState: &State{
FlagSources: []string{
"A",
"B",
},
},
storedSource: "C",
newSource: "D",
hasPriority: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
p := tt.currentState.hasPriority(tt.storedSource, tt.newSource)
require.Equal(t, p, tt.hasPriority)
})
}
}
func TestMergeFlags(t *testing.T) {
t.Parallel()
tests := []struct {
name string
current *State
new map[string]model.Flag
newSource string
newSelector string
want *State
wantNotifs map[string]interface{}
wantResync bool
}{
{
name: "both nil",
current: &State{Flags: nil},
new: nil,
want: &State{Flags: nil},
wantNotifs: map[string]interface{}{},
},
{
name: "both empty flags",
current: &State{Flags: map[string]model.Flag{}},
new: map[string]model.Flag{},
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "empty new",
current: &State{Flags: map[string]model.Flag{}},
new: nil,
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "merging with new source",
current: &State{
Flags: map[string]model.Flag{
"waka": {
DefaultVariant: "off",
Source: "1",
},
},
},
new: map[string]model.Flag{
"paka": {
DefaultVariant: "on",
},
},
newSource: "2",
want: &State{Flags: map[string]model.Flag{
"waka": {
DefaultVariant: "off",
Source: "1",
},
"paka": {
DefaultVariant: "on",
Source: "2",
},
}},
wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write", "source": "2"}},
},
{
name: "override by new update",
current: &State{Flags: map[string]model.Flag{
"waka": {DefaultVariant: "off"},
"paka": {DefaultVariant: "off"},
}},
new: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on"},
},
want: &State{Flags: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on"},
}},
wantNotifs: map[string]interface{}{
"waka": map[string]interface{}{"type": "update", "source": ""},
"paka": map[string]interface{}{"type": "update", "source": ""},
},
},
{
name: "identical update so empty notifications",
current: &State{
Flags: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
},
new: map[string]model.Flag{
"hello": {DefaultVariant: "off"},
},
want: &State{Flags: map[string]model.Flag{
"hello": {DefaultVariant: "off"},
}},
wantNotifs: map[string]interface{}{},
},
{
name: "deleted flag & trigger resync for same source",
current: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}},
new: map[string]model.Flag{},
newSource: "A",
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{"hello": map[string]interface{}{"type": "delete", "source": "A"}},
wantResync: true,
},
{
name: "no deleted & no resync for same source but different selector",
current: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
new: map[string]model.Flag{},
newSource: "A",
newSelector: "Y",
want: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
wantResync: false,
wantNotifs: map[string]interface{}{},
},
{
name: "no merge due to low priority",
current: &State{
FlagSources: []string{
"B",
"A",
},
Flags: map[string]model.Flag{
"hello": {
DefaultVariant: "off",
Source: "A",
},
},
},
new: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
newSource: "B",
want: &State{
FlagSources: []string{
"B",
"A",
},
Flags: map[string]model.Flag{
"hello": {
DefaultVariant: "off",
Source: "A",
},
},
},
wantNotifs: map[string]interface{}{},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gotNotifs, resyncRequired := tt.current.Update(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
require.Equal(t, tt.wantNotifs, gotNotifs)
require.Equal(t, tt.wantResync, resyncRequired)
})
}
}
func TestFlags_Add(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockOverrideSource := "source-2"
type request struct {
source string
selector string
flags map[string]model.Flag
}
tests := []struct {
name string
storedState *State
addRequest request
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Add success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
},
},
expectedNotificationKeys: []string{"B"},
},
{
name: "Add multiple success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource},
},
},
expectedNotificationKeys: []string{"B", "C"},
},
{
name: "Add success - conflict and override",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockOverrideSource,
flags: map[string]model.Flag{
"A": {Source: mockOverrideSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockOverrideSource},
},
},
expectedNotificationKeys: []string{"A"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.selector, tt.addRequest.flags)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}

133
core/pkg/store/query.go Normal file
View File

@ -0,0 +1,133 @@
package store
import (
"maps"
"sort"
"strings"
uuid "github.com/google/uuid"
"github.com/open-feature/flagd/core/pkg/model"
)
// flags table and index constants
const flagsTable = "flags"
const idIndex = "id"
const keyIndex = "key"
const sourceIndex = "source"
const priorityIndex = "priority"
const flagSetIdIndex = "flagSetId"
// compound indices; maintain sub-indexes alphabetically; order matters; these must match what's generated in the SelectorMapToQuery func.
const flagSetIdSourceCompoundIndex = flagSetIdIndex + "+" + sourceIndex
const keySourceCompoundIndex = keyIndex + "+" + sourceIndex
const flagSetIdKeySourceCompoundIndex = flagSetIdIndex + "+" + keyIndex + "+" + sourceIndex
// flagSetId defaults to a UUID generated at startup to make our queries consistent
// any flag without a "flagSetId" is assigned this one; it's never exposed externally
var nilFlagSetId = uuid.New().String()
// A selector represents a set of constraints used to query the store.
type Selector struct {
indexMap map[string]string
}
// NewSelector creates a new Selector from a selector expression string.
// For example, to select flags from source "./mySource" and flagSetId "1234", use the expression:
// "source=./mySource,flagSetId=1234"
func NewSelector(selectorExpression string) Selector {
return Selector{
indexMap: expressionToMap(selectorExpression),
}
}
func expressionToMap(sExp string) map[string]string {
selectorMap := make(map[string]string)
if sExp == "" {
return selectorMap
}
if strings.Index(sExp, "=") == -1 {
// if no '=' is found, treat the whole string as as source (backwards compatibility)
// we may may support interpreting this as a flagSetId in the future as an option
selectorMap[sourceIndex] = sExp
return selectorMap
}
// Split the selector by commas
pairs := strings.Split(sExp, ",")
for _, pair := range pairs {
// Split each pair by the first equal sign
parts := strings.Split(pair, "=")
if len(parts) == 2 {
key := parts[0]
value := parts[1]
selectorMap[key] = value
}
}
return selectorMap
}
func (s Selector) WithIndex(key string, value string) Selector {
m := maps.Clone(s.indexMap)
m[key] = value
return Selector{
indexMap: m,
}
}
func (s *Selector) IsEmpty() bool {
return s == nil || len(s.indexMap) == 0
}
// SelectorMapToQuery converts the selector map to an indexId and constraints for querying the store.
// For a given index, a specific order and number of constraints are required.
// Both the indexId and constraints are generated based on the keys present in the selector's internal map.
func (s Selector) ToQuery() (indexId string, constraints []interface{}) {
if len(s.indexMap) == 2 && s.indexMap[flagSetIdIndex] != "" && s.indexMap[keyIndex] != "" {
// special case for flagSetId and key (this is the "id" index)
return idIndex, []interface{}{s.indexMap[flagSetIdIndex], s.indexMap[keyIndex]}
}
qs := []string{}
keys := make([]string, 0, len(s.indexMap))
for key := range s.indexMap {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
indexId += key + "+"
qs = append(qs, s.indexMap[key])
}
indexId = strings.TrimSuffix(indexId, "+")
// Convert []string to []interface{}
c := make([]interface{}, 0, len(qs))
for _, v := range qs {
c = append(c, v)
}
constraints = c
return indexId, constraints
}
// SelectorToMetadata converts the selector's internal map to metadata for logging or tracing purposes.
// Only includes known indices to avoid leaking sensitive information, and is usually returned as the "top level" metadata
func (s *Selector) ToMetadata() model.Metadata {
meta := model.Metadata{}
if s == nil || s.indexMap == nil {
return meta
}
if s.indexMap[flagSetIdIndex] != "" {
meta[flagSetIdIndex] = s.indexMap[flagSetIdIndex]
}
if s.indexMap[sourceIndex] != "" {
meta[sourceIndex] = s.indexMap[sourceIndex]
}
return meta
}

View File

@ -0,0 +1,193 @@
package store
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/model"
)
func TestSelector_IsEmpty(t *testing.T) {
tests := []struct {
name string
selector *Selector
wantEmpty bool
}{
{
name: "nil selector",
selector: nil,
wantEmpty: true,
},
{
name: "nil indexMap",
selector: &Selector{indexMap: nil},
wantEmpty: true,
},
{
name: "empty indexMap",
selector: &Selector{indexMap: map[string]string{}},
wantEmpty: true,
},
{
name: "non-empty indexMap",
selector: &Selector{indexMap: map[string]string{"source": "abc"}},
wantEmpty: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.selector.IsEmpty()
if got != tt.wantEmpty {
t.Errorf("IsEmpty() = %v, want %v", got, tt.wantEmpty)
}
})
}
}
func TestSelector_WithIndex(t *testing.T) {
oldS := Selector{indexMap: map[string]string{"source": "abc"}}
newS := oldS.WithIndex("flagSetId", "1234")
if newS.indexMap["source"] != "abc" {
t.Errorf("WithIndex did not preserve existing keys")
}
if newS.indexMap["flagSetId"] != "1234" {
t.Errorf("WithIndex did not add new key")
}
// Ensure original is unchanged
if _, ok := oldS.indexMap["flagSetId"]; ok {
t.Errorf("WithIndex mutated original selector")
}
}
func TestSelector_ToQuery(t *testing.T) {
tests := []struct {
name string
selector Selector
wantIndex string
wantConstr []interface{}
}{
{
name: "flagSetId and key primary index special case",
selector: Selector{indexMap: map[string]string{"flagSetId": "fsid", "key": "myKey"}},
wantIndex: "id",
wantConstr: []interface{}{"fsid", "myKey"},
},
{
name: "multiple keys sorted",
selector: Selector{indexMap: map[string]string{"source": "src", "flagSetId": "fsid"}},
wantIndex: "flagSetId+source",
wantConstr: []interface{}{"fsid", "src"},
},
{
name: "single key",
selector: Selector{indexMap: map[string]string{"source": "src"}},
wantIndex: "source",
wantConstr: []interface{}{"src"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotIndex, gotConstr := tt.selector.ToQuery()
if gotIndex != tt.wantIndex {
t.Errorf("ToQuery() index = %v, want %v", gotIndex, tt.wantIndex)
}
if !reflect.DeepEqual(gotConstr, tt.wantConstr) {
t.Errorf("ToQuery() constraints = %v, want %v", gotConstr, tt.wantConstr)
}
})
}
}
func TestSelector_ToMetadata(t *testing.T) {
tests := []struct {
name string
selector *Selector
want model.Metadata
}{
{
name: "nil selector",
selector: nil,
want: model.Metadata{},
},
{
name: "nil indexMap",
selector: &Selector{indexMap: nil},
want: model.Metadata{},
},
{
name: "empty indexMap",
selector: &Selector{indexMap: map[string]string{}},
want: model.Metadata{},
},
{
name: "flagSetId only",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid"}},
want: model.Metadata{"flagSetId": "fsid"},
},
{
name: "source only",
selector: &Selector{indexMap: map[string]string{"source": "src"}},
want: model.Metadata{"source": "src"},
},
{
name: "flagSetId and source",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src"}},
want: model.Metadata{"flagSetId": "fsid", "source": "src"},
},
{
name: "flagSetId, source, and key (key should be ignored)",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src", "key": "myKey"}},
want: model.Metadata{"flagSetId": "fsid", "source": "src"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.selector.ToMetadata()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ToMetadata() = %v, want %v", got, tt.want)
}
})
}
}
func TestNewSelector(t *testing.T) {
tests := []struct {
name string
input string
wantMap map[string]string
}{
{
name: "source and flagSetId",
input: "source=abc,flagSetId=1234",
wantMap: map[string]string{"source": "abc", "flagSetId": "1234"},
},
{
name: "source",
input: "source=abc",
wantMap: map[string]string{"source": "abc"},
},
{
name: "no equals, treat as source",
input: "mysource",
wantMap: map[string]string{"source": "mysource"},
},
{
name: "empty string",
input: "",
wantMap: map[string]string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewSelector(tt.input)
if !reflect.DeepEqual(s.indexMap, tt.wantMap) {
t.Errorf("NewSelector(%q) indexMap = %v, want %v", tt.input, s.indexMap, tt.wantMap)
}
})
}
}

396
core/pkg/store/store.go Normal file
View File

@ -0,0 +1,396 @@
package store
import (
"context"
"encoding/json"
"fmt"
"slices"
"sync"
"github.com/hashicorp/go-memdb"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
)
var noValidatedSources = []string{}
type SelectorContextKey struct{}
type FlagQueryResult struct {
Flags map[string]model.Flag
}
type IStore interface {
Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error)
GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error)
Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult)
}
var _ IStore = (*Store)(nil)
type Store struct {
mx sync.RWMutex
db *memdb.MemDB
logger *logger.Logger
sources []string
// deprecated: has no effect and will be removed soon.
FlagSources []string
}
type SourceDetails struct {
Source string
Selector string
}
// NewStore creates a new in-memory store with the given sources.
// The order of sources in the slice determines their priority, when queries result in duplicate flags (queries without source or flagSetId), the higher priority source "wins".
func NewStore(logger *logger.Logger, sources []string) (*Store, error) {
// a unique index must exist for each set of constraints - for example, to look up by key and source, we need a compound index on key+source, etc
// we maybe want to generate these dynamically in the future to support more robust querying, but for now we will hardcode the ones we need
schema := &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
flagsTable: {
Name: flagsTable,
Indexes: map[string]*memdb.IndexSchema{
// primary index; must be unique and named "id"
idIndex: {
Name: idIndex,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
},
},
},
// for looking up by source
sourceIndex: {
Name: sourceIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
// for looking up by priority, used to maintain highest priority flag when there are duplicates and no selector is provided
priorityIndex: {
Name: priorityIndex,
Unique: false,
Indexer: &memdb.IntFieldIndex{Field: model.Priority},
},
// for looking up by flagSetId
flagSetIdIndex: {
Name: flagSetIdIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
},
keyIndex: {
Name: keyIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
},
flagSetIdSourceCompoundIndex: {
Name: flagSetIdSourceCompoundIndex,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
keySourceCompoundIndex: {
Name: keySourceCompoundIndex,
Unique: false, // duplicate from a single source ARE allowed (they just must have different flag sets)
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
// used to query all flags from a specific source so we know which flags to delete if a flag is missing from a source
flagSetIdKeySourceCompoundIndex: {
Name: flagSetIdKeySourceCompoundIndex,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
},
},
},
}
// Create a new data base
db, err := memdb.NewMemDB(schema)
if err != nil {
return nil, fmt.Errorf("unable to initialize flag database: %w", err)
}
// clone the sources to avoid modifying the original slice
s := slices.Clone(sources)
return &Store{
sources: s,
db: db,
logger: logger,
}, nil
}
// Deprecated: use NewStore instead - will be removed very soon.
func NewFlags() *Store {
state, err := NewStore(logger.NewLogger(nil, false), noValidatedSources)
if err != nil {
panic(fmt.Sprintf("unable to create flag store: %v", err))
}
return state
}
func (s *Store) Get(_ context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error) {
s.logger.Debug(fmt.Sprintf("getting flag %s", key))
txn := s.db.Txn(false)
queryMeta := selector.ToMetadata()
// if present, use the selector to query the flags
if !selector.IsEmpty() {
selector := selector.WithIndex("key", key)
indexId, constraints := selector.ToQuery()
s.logger.Debug(fmt.Sprintf("getting flag with query: %s, %v", indexId, constraints))
raw, err := txn.First(flagsTable, indexId, constraints...)
flag, ok := raw.(model.Flag)
if err != nil {
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
}
if !ok {
return model.Flag{}, queryMeta, fmt.Errorf("flag %s is not a valid flag", key)
}
return flag, queryMeta, nil
}
// otherwise, get all flags with the given key, and keep the last one with the highest priority
s.logger.Debug(fmt.Sprintf("getting highest priority flag with key: %s", key))
it, err := txn.Get(flagsTable, keyIndex, key)
if err != nil {
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
}
flag := model.Flag{}
found := false
for raw := it.Next(); raw != nil; raw = it.Next() {
nextFlag, ok := raw.(model.Flag)
if !ok {
continue
}
found = true
if nextFlag.Priority >= flag.Priority {
flag = nextFlag
} else {
s.logger.Debug(fmt.Sprintf("discarding flag %s from lower priority source %s in favor of flag from source %s", nextFlag.Key, s.sources[nextFlag.Priority], s.sources[flag.Priority]))
}
}
if !found {
return flag, queryMeta, fmt.Errorf("flag %s not found", key)
}
return flag, queryMeta, nil
}
func (f *Store) String() (string, error) {
f.logger.Debug("dumping flags to string")
f.mx.RLock()
defer f.mx.RUnlock()
state, _, err := f.GetAll(context.Background(), nil)
if err != nil {
return "", fmt.Errorf("unable to get all flags: %w", err)
}
bytes, err := json.Marshal(state)
if err != nil {
return "", fmt.Errorf("unable to marshal flags: %w", err)
}
return string(bytes), nil
}
// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (s *Store) GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error) {
flags := make(map[string]model.Flag)
queryMeta := selector.ToMetadata()
it, err := s.selectOrAll(selector)
if err != nil {
s.logger.Error(fmt.Sprintf("flag query error: %v", err))
return flags, queryMeta, err
}
flags = s.collect(it)
return flags, queryMeta, nil
}
// Update the flag state with the provided flags.
func (s *Store) Update(
source string,
flags map[string]model.Flag,
metadata model.Metadata,
) (map[string]interface{}, bool) {
resyncRequired := false
if source == "" {
panic("source cannot be empty")
}
priority := slices.Index(s.sources, source)
if priority == -1 {
// this is a hack to allow old constructors that didn't pass sources, remove when we remove "NewFlags" constructor
if !slices.Equal(s.sources, noValidatedSources) {
panic(fmt.Sprintf("source %s is not registered in the store", source))
}
// same as above - remove when we remove "NewFlags" constructor
priority = 0
}
txn := s.db.Txn(true)
defer txn.Abort()
// get all flags for the source we are updating
selector := NewSelector(sourceIndex + "=" + source)
oldFlags, _, _ := s.GetAll(context.Background(), &selector)
s.mx.Lock()
for key := range oldFlags {
if _, ok := flags[key]; !ok {
// flag has been deleted
s.logger.Debug(fmt.Sprintf("flag %s has been deleted from source %s", key, source))
count, err := txn.DeleteAll(flagsTable, keySourceCompoundIndex, key, source)
s.logger.Debug(fmt.Sprintf("deleted %d flags with key %s from source %s", count, key, source))
if err != nil {
s.logger.Error(fmt.Sprintf("error deleting flag: %s, %v", key, err))
}
continue
}
}
s.mx.Unlock()
for key, newFlag := range flags {
s.logger.Debug(fmt.Sprintf("got metadata %v", metadata))
newFlag.Key = key
newFlag.Source = source
newFlag.Priority = priority
newFlag.Metadata = patchMetadata(metadata, newFlag.Metadata)
// flagSetId defaults to a UUID generated at startup to make our queries isomorphic
flagSetId := nilFlagSetId
// flagSetId is inherited from the set, but can be overridden by the flag
setFlagSetId, ok := newFlag.Metadata["flagSetId"].(string)
if ok {
flagSetId = setFlagSetId
}
newFlag.FlagSetId = flagSetId
raw, err := txn.First(flagsTable, keySourceCompoundIndex, key, source)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to get flag %s from source %s: %v", key, source, err))
continue
}
oldFlag, ok := raw.(model.Flag)
// If we already have a flag with the same key and source, we need to check if it has the same flagSetId
if ok {
if oldFlag.FlagSetId != newFlag.FlagSetId {
// If the flagSetId is different, we need to delete the entry, since flagSetId+key represents the primary index, and it's now been changed.
// This is important especially for clients listening to flagSetId changes, as they expect the flag to be removed from the set in this case.
_, err = txn.DeleteAll(flagsTable, idIndex, oldFlag.FlagSetId, key)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to delete flags with key %s and flagSetId %s: %v", key, oldFlag.FlagSetId, err))
continue
}
}
}
// Store the new version of the flag
s.logger.Debug(fmt.Sprintf("storing flag: %v", newFlag))
err = txn.Insert(flagsTable, newFlag)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to insert flag %s: %v", key, err))
continue
}
}
txn.Commit()
return notifications.NewFromFlags(oldFlags, flags), resyncRequired
}
// Watch the result-set of a selector for changes, sending updates to the watcher channel.
func (s *Store) Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult) {
go func() {
for {
ws := memdb.NewWatchSet()
it, err := s.selectOrAll(selector)
if err != nil {
s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
close(watcher)
return
}
ws.Add(it.WatchCh())
flags := s.collect(it)
watcher <- FlagQueryResult{
Flags: flags,
}
if err = ws.WatchCtx(ctx); err != nil {
s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
close(watcher)
return
}
}
}()
}
// returns an iterator for the given selector, or all flags if the selector is nil or empty
func (s *Store) selectOrAll(selector *Selector) (it memdb.ResultIterator, err error) {
txn := s.db.Txn(false)
if !selector.IsEmpty() {
indexId, constraints := selector.ToQuery()
s.logger.Debug(fmt.Sprintf("getting all flags with query: %s, %v", indexId, constraints))
return txn.Get(flagsTable, indexId, constraints...)
} else {
// no selector, get all flags
return txn.Get(flagsTable, idIndex)
}
}
// collects flags from an iterator, ensuring that only the highest priority flag is kept when there are duplicates
func (s *Store) collect(it memdb.ResultIterator) map[string]model.Flag {
flags := make(map[string]model.Flag)
for raw := it.Next(); raw != nil; raw = it.Next() {
flag := raw.(model.Flag)
if existing, ok := flags[flag.Key]; ok {
if flag.Priority < existing.Priority {
s.logger.Debug(fmt.Sprintf("discarding duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[flag.Priority], s.sources[existing.Priority]))
continue // we already have a higher priority flag
}
s.logger.Debug(fmt.Sprintf("overwriting duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[existing.Priority], s.sources[flag.Priority]))
}
flags[flag.Key] = flag
}
return flags
}
func patchMetadata(original, patch model.Metadata) model.Metadata {
patched := make(model.Metadata)
if original == nil && patch == nil {
return nil
}
for key, value := range original {
patched[key] = value
}
for key, value := range patch { // patch values overwrite m1 values on key conflict
patched[key] = value
}
return patched
}

View File

@ -0,0 +1,487 @@
package store
import (
"context"
"testing"
"time"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUpdateFlags(t *testing.T) {
const source1 = "source1"
const source2 = "source2"
var sources = []string{source1, source2}
t.Parallel()
tests := []struct {
name string
setup func(t *testing.T) *Store
newFlags map[string]model.Flag
source string
wantFlags map[string]model.Flag
setMetadata model.Metadata
wantNotifs map[string]interface{}
wantResync bool
}{
{
name: "both nil",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: nil,
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "both empty flags",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: map[string]model.Flag{},
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "empty new",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: nil,
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "update from source 1 (old flag removed)",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{
"waka": {DefaultVariant: "off"},
}, nil)
return s
},
newFlags: map[string]model.Flag{
"paka": {DefaultVariant: "on"},
},
source: source1,
wantFlags: map[string]model.Flag{
"paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
},
wantNotifs: map[string]interface{}{
"paka": map[string]interface{}{"type": "write"},
"waka": map[string]interface{}{"type": "delete"},
},
},
{
name: "update from source 1 (new flag added)",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{
"waka": {DefaultVariant: "off"},
}, nil)
return s
},
newFlags: map[string]model.Flag{
"paka": {DefaultVariant: "on"},
},
source: source2,
wantFlags: map[string]model.Flag{
"waka": {Key: "waka", DefaultVariant: "off", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
"paka": {Key: "paka", DefaultVariant: "on", Source: source2, FlagSetId: nilFlagSetId, Priority: 1},
},
wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write"}},
},
{
name: "flag set inheritance",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{}, model.Metadata{})
return s
},
setMetadata: model.Metadata{
"flagSetId": "topLevelSet", // top level set metadata, including flagSetId
},
newFlags: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": "flagLevelSet"}}, // overrides set level flagSetId
},
source: source1,
wantFlags: map[string]model.Flag{
"waka": {Key: "waka", DefaultVariant: "on", Source: source1, FlagSetId: "topLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "topLevelSet"}},
"paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: "flagLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "flagLevelSet"}},
},
wantNotifs: map[string]interface{}{
"paka": map[string]interface{}{"type": "write"},
"waka": map[string]interface{}{"type": "write"},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
store := tt.setup(t)
gotNotifs, resyncRequired := store.Update(tt.source, tt.newFlags, tt.setMetadata)
gotFlags, _, _ := store.GetAll(context.Background(), nil)
require.Equal(t, tt.wantFlags, gotFlags)
require.Equal(t, tt.wantNotifs, gotNotifs)
require.Equal(t, tt.wantResync, resyncRequired)
})
}
}
func TestGet(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
flagSetIdB := "flagSetIdA"
flagSetIdC := "flagSetIdC"
var sources = []string{sourceA, sourceB, sourceC}
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)
t.Parallel()
tests := []struct {
name string
key string
selector *Selector
wantFlag model.Flag
wantErr bool
}{
{
name: "nil selector",
key: "flagA",
selector: nil,
wantFlag: model.Flag{Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
wantErr: false,
},
{
name: "flagSetId selector",
key: "dupe",
selector: &flagSetIdCSelector,
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
wantErr: false,
},
{
name: "source selector",
key: "dupe",
selector: &sourceASelector,
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
wantErr: false,
},
{
name: "flag not found with source selector",
key: "flagB",
selector: &sourceASelector,
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
wantErr: true,
},
{
name: "flag not found with flagSetId selector",
key: "flagB",
selector: &flagSetIdCSelector,
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"dupe": {Key: "dupe", DefaultVariant: "on"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
store.Update(sourceA, sourceAFlags, nil)
store.Update(sourceB, sourceBFlags, nil)
store.Update(sourceC, sourceCFlags, nil)
gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector)
if !tt.wantErr {
require.Equal(t, tt.wantFlag, gotFlag)
} else {
require.Error(t, err, "expected an error for key %s with selector %v", tt.key, tt.selector)
}
})
}
}
func TestGetAllNoWatcher(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
flagSetIdB := "flagSetIdA"
flagSetIdC := "flagSetIdC"
sources := []string{sourceA, sourceB, sourceC}
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)
t.Parallel()
tests := []struct {
name string
selector *Selector
wantFlags map[string]model.Flag
}{
{
name: "nil selector",
selector: nil,
wantFlags: map[string]model.Flag{
// "dupe" should be overwritten by higher priority flag
"flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
"flagB": {Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
"flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
},
},
{
name: "source selector",
selector: &sourceASelector,
wantFlags: map[string]model.Flag{
// we should get the "dupe" from sourceA
"flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
"dupe": {Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
},
},
{
name: "flagSetId selector",
selector: &flagSetIdCSelector,
wantFlags: map[string]model.Flag{
// we should get the "dupe" from flagSetIdC
"flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"dupe": {Key: "dupe", DefaultVariant: "on"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
store.Update(sourceA, sourceAFlags, nil)
store.Update(sourceB, sourceBFlags, nil)
store.Update(sourceC, sourceCFlags, nil)
gotFlags, _, _ := store.GetAll(context.Background(), tt.selector)
require.Equal(t, len(tt.wantFlags), len(gotFlags))
require.Equal(t, tt.wantFlags, gotFlags)
})
}
}
func TestWatch(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
myFlagSetId := "myFlagSet"
var sources = []string{sourceA, sourceB, sourceC}
pauseTime := 100 * time.Millisecond // time for updates to settle
timeout := 1000 * time.Millisecond // time to make sure we get enough updates, and no extras
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + myFlagSetId)
emptySelector := NewSelector("")
sourceCSelector := NewSelector("source=" + sourceC)
tests := []struct {
name string
selector *Selector
wantUpdates int
}{
{
name: "flag source selector (initial, plus 1 update)",
selector: &sourceASelector,
wantUpdates: 2,
},
{
name: "flag set selector (initial, plus 3 updates)",
selector: &flagSetIdCSelector,
wantUpdates: 4,
},
{
name: "no selector (all updates)",
selector: &emptySelector,
wantUpdates: 5,
},
{
name: "flag source selector for unchanged source (initial, plus no updates)",
selector: &sourceCSelector,
wantUpdates: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off"},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
// setup initial flags
store.Update(sourceA, sourceAFlags, model.Metadata{})
store.Update(sourceB, sourceBFlags, model.Metadata{})
store.Update(sourceC, sourceCFlags, model.Metadata{})
watcher := make(chan FlagQueryResult, 1)
time.Sleep(pauseTime)
ctx, cancel := context.WithCancel(context.Background())
store.Watch(ctx, tt.selector, watcher)
// perform updates
go func() {
time.Sleep(pauseTime)
// changing a flag default variant should trigger an update
store.Update(sourceA, map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "on"},
}, model.Metadata{})
time.Sleep(pauseTime)
// changing a flag default variant should trigger an update
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}, model.Metadata{})
time.Sleep(pauseTime)
// removing a flag set id should trigger an update (even for flag set id selectors; it should remove the flag from the set)
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on"},
}, model.Metadata{})
time.Sleep(pauseTime)
// adding a flag set id should trigger an update
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}, model.Metadata{})
}()
updates := 0
for {
select {
case <-time.After(timeout):
assert.Equal(t, tt.wantUpdates, updates, "expected %d updates, got %d", tt.wantUpdates, updates)
cancel()
_, open := <-watcher
assert.False(t, open, "watcher channel should be closed after cancel")
return
case q := <-watcher:
if q.Flags != nil {
updates++
}
}
}
})
}
}
func TestQueryMetadata(t *testing.T) {
sourceA := "sourceA"
otherSource := "otherSource"
nonExistingFlagSetId := "nonExistingFlagSetId"
var sources = []string{sourceA}
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"flagB": {Key: "flagB", DefaultVariant: "on"},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
// setup initial flags
store.Update(sourceA, sourceAFlags, model.Metadata{})
selector := NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
_, metadata, _ := store.GetAll(context.Background(), &selector)
assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")
selector = NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
_, metadata, _ = store.Get(context.Background(), "key", &selector)
assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")
}

View File

@ -58,6 +58,7 @@ func TestSimpleReSync(t *testing.T) {
func TestSimpleSync(t *testing.T) {
readDirName := t.TempDir()
updateDirName := t.TempDir()
deleteDirName := t.TempDir()
tests := map[string]struct {
manipulationFuncs []func(t *testing.T)
expectedDataSync []sync.DataSync
@ -98,6 +99,27 @@ func TestSimpleSync(t *testing.T) {
},
},
},
"delete-event": {
fetchDirName: deleteDirName,
manipulationFuncs: []func(t *testing.T){
func(t *testing.T) {
writeToFile(t, deleteDirName, fetchFileContents)
},
func(t *testing.T) {
deleteFile(t, deleteDirName, fetchFileName)
},
},
expectedDataSync: []sync.DataSync{
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
},
{
FlagData: defaultState,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
},
},
},
}
for test, tt := range tests {

View File

@ -290,16 +290,16 @@ func TestHTTPSync_Fetch(t *testing.T) {
newETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`
return &http.Response{
Header: map[string][]string{
Header: map[string][]string{
"Content-Type": {"application/json"},
"Etag": {newETag},
"Etag": {newETag},
},
Body: io.NopCloser(strings.NewReader(newContent)),
StatusCode: http.StatusOK,
}, nil
})
},
uri: "http://localhost",
uri: "http://localhost",
eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
if err != nil {
@ -370,7 +370,7 @@ func TestSync_Init(t *testing.T) {
func TestHTTPSync_Resync(t *testing.T) {
ctrl := gomock.NewController(t)
source := "http://localhost"
emptyeFlagData := "{}"
emptyFlagData := "{}"
tests := map[string]struct {
setup func(t *testing.T, client *syncmock.MockClient)
@ -385,7 +385,7 @@ func TestHTTPSync_Resync(t *testing.T) {
setup: func(_ *testing.T, client *syncmock.MockClient) {
client.EXPECT().Do(gomock.Any()).Return(&http.Response{
Header: map[string][]string{"Content-Type": {"application/json"}},
Body: io.NopCloser(strings.NewReader(emptyeFlagData)),
Body: io.NopCloser(strings.NewReader(emptyFlagData)),
StatusCode: http.StatusOK,
}, nil)
},
@ -402,7 +402,7 @@ func TestHTTPSync_Resync(t *testing.T) {
wantErr: false,
wantNotifications: []sync.DataSync{
{
FlagData: emptyeFlagData,
FlagData: emptyFlagData,
Source: source,
},
},

View File

@ -0,0 +1,77 @@
---
status: accepted
author: @tangenti
created: 2025-06-16
updated: 2025-06-16
---
# Decouple flag sync sources and flag sets
The goal is to support dynamic flag sets for flagd providers and decouple sources and flag sets.
## Background
Flagd daemon syncs flag configurations from multiple sources. A single source provides a single config, which has an optional flag set ID that may or may not change in the following syncs of the same source.
The in-process provider uses `selector` to specify the desired source. In order to get a desired flag set, a provider has to stick to a source that provides that flag set. In this case, the flagd daemon cannot remove a source without breaking the dependant flagd providers.
Assumptions of the current model
- `flagSetId`s must be unique across different sources or the configuration is considered invalid.
- In-process providers request at most one flag set.
## Requirements
- Flagd daemon can remove a source without breaking in-process providers that depend on the flag set the source provides.
- In-process providers can select based on flag sets.
- No breaking changes for the current usage of `selector`
## Proposal
### API change
#### Flag Configuration Schema
Add an optional field `flagsetID` under `flag` or `flag.metadata`. The flag set ID cannot be specified if a flag set ID is specified for the config.
### Flagd Sync Selector
Selector will be extended for generic flags selection, starting with checking the equivalence of `source` and `flagsetID` of flags.
Example
```yaml
# Flags from the source `override`
selector: override
# Flags from the source `override`
selector: source=override
# Flags from the flag set `project-42`
selector: flagsetID=project-42
```
The semantic can later be extended with a more complex design, such as AIP-160 filter or Kubernetes selections. This is out of the scope of this ADR.
### Flagd Daemon Storage
1. Flagd will have separate stores for `flags` and `sources`
2. `selector` will be removed from the store
3. `flagSetID` will be added as part of `model.Flag` or under `model.Flag.Metadata` for better consistency with the API.
### Flags Sync
Sync server would count the extended syntax of `selector` and filter the list of flags on-the-fly answering the requests from the providers.
The existing conflict resolving based on sources remains the same. Resyncs on removing flags remains unchanged as well.
## Consequences
### The good
- One source can have multiple flag sets.
- `selector` works on a more grandular level.
- No breaking change
- Sync servers and clients now hold the same understanding of the `selector` semantic.

View File

@ -1,5 +1,5 @@
---
status: proposed
status: accepted
author: @tangenti
created: 2025-06-27
updated: 2025-06-27

View File

@ -14,6 +14,7 @@ flagd start [flags]
-H, --context-from-header stringToString add key-value pairs to map header values to context values, where key is Header name, value is context key (default [])
-X, --context-value stringToString add arbitrary key value pairs to the flag evaluation context (default [])
-C, --cors-origin strings CORS allowed origins, * will allow all origins
--disable-sync-metadata Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)

View File

@ -1,5 +1,12 @@
# Changelog
## [0.12.9](https://github.com/open-feature/flagd/compare/flagd/v0.12.8...flagd/v0.12.9) (2025-07-28)
### ✨ New Features
* Add toggle for disabling getMetadata request ([#1693](https://github.com/open-feature/flagd/issues/1693)) ([e8fd680](https://github.com/open-feature/flagd/commit/e8fd6808608caa7ff7e54792fe97d88e7c294486))
## [0.12.8](https://github.com/open-feature/flagd/compare/flagd/v0.12.7...flagd/v0.12.8) (2025-07-21)

View File

@ -36,6 +36,7 @@ const (
syncPortFlagName = "sync-port"
syncSocketPathFlagName = "sync-socket-path"
uriFlagName = "uri"
disableSyncMetadata = "disable-sync-metadata"
contextValueFlagName = "context-value"
headerToContextKeyFlagName = "context-from-header"
streamDeadlineFlagName = "stream-deadline"
@ -89,6 +90,7 @@ func init() {
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
"header values to context values, where key is Header name, value is context key")
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
flags.Bool(disableSyncMetadata, false, "Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.")
bindFlags(flags)
}
@ -114,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
_ = viper.BindPFlag(disableSyncMetadata, flags.Lookup(disableSyncMetadata))
}
// startCmd represents the start command
@ -186,6 +189,7 @@ var startCmd = &cobra.Command{
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
DisableSyncMetadata: viper.GetBool(disableSyncMetadata),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
HeaderToContextKeyMappings: headerToContextKeyMappings,

View File

@ -110,6 +110,9 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-memdb v1.3.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect

View File

@ -313,6 +313,14 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0Ntos
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.5 h1:b3taDMxCBCBVgyRrS1AZVHO14ubMYZB++QpNhBg+Nyo=
github.com/hashicorp/go-memdb v1.3.5/go.mod h1:8IVKKBkVe+fxFgdFOYxzQQNjz+sWCyHCdIC/+5+Vy1Y=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=

View File

@ -39,6 +39,7 @@ type Config struct {
SyncServicePort uint16
SyncServiceSocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool
SyncProviders []sync.SourceConfig
CORS []string
@ -78,21 +79,20 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
logger.Error(fmt.Sprintf("error building metrics recorder: %v", err))
}
// build flag store, collect flag sources & fill sources details
s := store.NewFlags()
sources := []string{}
for _, provider := range config.SyncProviders {
s.FlagSources = append(s.FlagSources, provider.URI)
s.SourceDetails[provider.URI] = store.SourceDetails{
Source: provider.URI,
Selector: provider.Selector,
}
sources = append(sources, provider.URI)
}
// build flag store, collect flag sources & fill sources details
store, err := store.NewStore(logger, sources)
if err != nil {
return nil, fmt.Errorf("error creating flag store: %w", err)
}
// derive evaluator
jsonEvaluator := evaluator.NewJSON(logger, s)
jsonEvaluator := evaluator.NewJSON(logger, store)
// derive services
@ -100,6 +100,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
connectService := flageval.NewConnectService(
logger.WithFields(zap.String("component", "service")),
jsonEvaluator,
store,
recorder)
// ofrep service
@ -111,20 +112,21 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
config.HeaderToContextKeyMappings,
)
if err != nil {
return nil, fmt.Errorf("error creating ofrep service")
return nil, fmt.Errorf("error creating OFREP service: %w", err)
}
// flag sync service
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: store,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
DisableSyncMetadata: config.DisableSyncMetadata,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
@ -144,11 +146,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
}
return &Runtime{
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: jsonEvaluator,
FlagSync: flagSyncService,
OfrepService: ofrepService,
Service: connectService,
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: jsonEvaluator,
SyncService: flagSyncService,
OfrepService: ofrepService,
EvaluationService: connectService,
ServiceConfig: service.Configuration{
Port: config.ServicePort,
ManagementPort: config.ManagementPort,
@ -162,7 +164,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
StreamDeadline: config.StreamDeadline,
},
SyncImpl: iSyncs,
Syncs: iSyncs,
}, nil
}

View File

@ -19,23 +19,23 @@ import (
)
type Runtime struct {
Evaluator evaluator.IEvaluator
Logger *logger.Logger
FlagSync flagsync.ISyncService
OfrepService ofrep.IOfrepService
Service service.IFlagEvaluationService
ServiceConfig service.Configuration
SyncImpl []sync.ISync
Evaluator evaluator.IEvaluator
Logger *logger.Logger
SyncService flagsync.ISyncService
OfrepService ofrep.IOfrepService
EvaluationService service.IFlagEvaluationService
ServiceConfig service.Configuration
Syncs []sync.ISync
mu msync.Mutex
}
//nolint:funlen
func (r *Runtime) Start() error {
if r.Service == nil {
if r.EvaluationService == nil {
return errors.New("no service set")
}
if len(r.SyncImpl) == 0 {
if len(r.Syncs) == 0 {
return errors.New("no sync implementation set")
}
if r.Evaluator == nil {
@ -44,40 +44,26 @@ func (r *Runtime) Start() error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
g, gCtx := errgroup.WithContext(ctx)
dataSync := make(chan sync.DataSync, len(r.SyncImpl))
dataSync := make(chan sync.DataSync, len(r.Syncs))
// Initialize DataSync channel watcher
g.Go(func() error {
for {
select {
case data := <-dataSync:
// resync events are triggered when a delete occurs during flag merges in the store
// resync events may trigger further resync events, however for a flag to be deleted from the store
// its source must match, preventing the opportunity for resync events to snowball
if resyncRequired := r.updateAndEmit(data); resyncRequired {
for _, s := range r.SyncImpl {
p := s
g.Go(func() error {
err := p.ReSync(gCtx, dataSync)
if err != nil {
return fmt.Errorf("error resyncing sources: %w", err)
}
return nil
})
}
}
r.updateAndEmit(data)
case <-gCtx.Done():
return nil
}
}
})
// Init sync providers
for _, s := range r.SyncImpl {
for _, s := range r.Syncs {
if err := s.Init(gCtx); err != nil {
return fmt.Errorf("sync provider Init returned error: %w", err)
}
}
// Start sync provider
for _, s := range r.SyncImpl {
for _, s := range r.Syncs {
p := s
g.Go(func() error {
if err := p.Sync(gCtx, dataSync); err != nil {
@ -89,14 +75,14 @@ func (r *Runtime) Start() error {
defer func() {
r.Logger.Info("Shutting down server...")
r.Service.Shutdown()
r.EvaluationService.Shutdown()
r.Logger.Info("Server successfully shutdown.")
}()
g.Go(func() error {
// Readiness probe rely on the runtime
r.ServiceConfig.ReadinessProbe = r.isReady
if err := r.Service.Serve(gCtx, r.ServiceConfig); err != nil {
if err := r.EvaluationService.Serve(gCtx, r.ServiceConfig); err != nil {
return fmt.Errorf("error returned from serving flag evaluation service: %w", err)
}
return nil
@ -112,7 +98,7 @@ func (r *Runtime) Start() error {
})
g.Go(func() error {
err := r.FlagSync.Start(gCtx)
err := r.SyncService.Start(gCtx)
if err != nil {
return fmt.Errorf("error from sync server: %w", err)
}
@ -128,7 +114,7 @@ func (r *Runtime) Start() error {
func (r *Runtime) isReady() bool {
// if all providers can watch for flag changes, we are ready.
for _, p := range r.SyncImpl {
for _, p := range r.Syncs {
if !p.IsReady() {
return false
}
@ -137,24 +123,14 @@ func (r *Runtime) isReady() bool {
}
// updateAndEmit helps to update state, notify changes and trigger sync updates
func (r *Runtime) updateAndEmit(payload sync.DataSync) bool {
func (r *Runtime) updateAndEmit(payload sync.DataSync) {
r.mu.Lock()
defer r.mu.Unlock()
notifications, resyncRequired, err := r.Evaluator.SetState(payload)
_, _, err := r.Evaluator.SetState(payload)
if err != nil {
r.Logger.Error(err.Error())
return false
r.Logger.Error(fmt.Sprintf("error setting state: %v", err))
return
}
r.Service.Notify(service.Notification{
Type: service.ConfigurationChange,
Data: map[string]interface{}{
"flags": notifications,
},
})
r.FlagSync.Emit(resyncRequired, payload.Source)
return resyncRequired
r.SyncService.Emit(payload.Source)
}

View File

@ -0,0 +1,3 @@
package service
const FLAGD_SELECTOR_HEADER = "Flagd-Selector"

View File

@ -16,6 +16,7 @@ import (
"github.com/open-feature/flagd/core/pkg/evaluator"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/open-feature/flagd/flagd/pkg/service/middleware"
corsmw "github.com/open-feature/flagd/flagd/pkg/service/middleware/cors"
@ -71,15 +72,17 @@ type ConnectService struct {
// NewConnectService creates a ConnectService with provided parameters
func NewConnectService(
logger *logger.Logger, evaluator evaluator.IEvaluator, mRecorder telemetry.IMetricsRecorder,
logger *logger.Logger, evaluator evaluator.IEvaluator, store store.IStore, mRecorder telemetry.IMetricsRecorder,
) *ConnectService {
cs := &ConnectService{
logger: logger,
eval: evaluator,
metrics: &telemetry.NoopMetricsRecorder{},
eventingConfiguration: &eventingConfiguration{
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
store: store,
logger: logger,
},
}
if mRecorder != nil {

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"testing"
"time"
@ -14,9 +15,11 @@ import (
mock "github.com/open-feature/flagd/core/pkg/evaluator/mock"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
middlewaremock "github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
@ -81,7 +84,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, tt.name)
svc := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
svc := NewConnectService(logger.NewLogger(nil, false), eval, &store.Store{}, metricRecorder)
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
return true
@ -136,7 +139,7 @@ func TestAddMiddleware(t *testing.T) {
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
svc := NewConnectService(logger.NewLogger(nil, false), nil, metricRecorder)
svc := NewConnectService(logger.NewLogger(nil, false), nil, &store.Store{}, metricRecorder)
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
@ -173,16 +176,22 @@ func TestConnectServiceNotify(t *testing.T) {
// given
ctrl := gomock.NewController(t)
eval := mock.NewMockIEvaluator(ctrl)
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.Subscribe("key", sChan)
eventing.Subscribe(context.Background(), "key", nil, sChan)
// notification type
ofType := iservice.ConfigurationChange
@ -207,20 +216,73 @@ func TestConnectServiceNotify(t *testing.T) {
}
}
func TestConnectServiceWatcher(t *testing.T) {
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
sChan := make(chan iservice.Notification, 1)
eventing := eventingConfiguration{
store: s,
logger: log,
mu: &sync.RWMutex{},
subs: make(map[any]chan iservice.Notification),
}
// subscribe and wait for for the sub to be active
eventing.Subscribe(context.Background(), "anything", nil, sChan)
time.Sleep(100 * time.Millisecond)
// make a change
s.Update(sources[0], map[string]model.Flag{
"flag1": {
Key: "flag1",
DefaultVariant: "off",
},
}, model.Metadata{})
// notification type
ofType := iservice.ConfigurationChange
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case n := <-sChan:
require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
notifications := n.Data["flags"].(notifications.Notifications)
flag1, ok := notifications["flag1"].(map[string]interface{})
require.True(t, ok, "flag1 notification should be a map[string]interface{}")
require.Equal(t, flag1["type"], string(model.NotificationCreate), "expected notification type: %s, but received %s", model.NotificationCreate, flag1["type"])
case <-timeout.Done():
t.Error("timeout while waiting for notifications")
}
}
func TestConnectServiceShutdown(t *testing.T) {
// given
ctrl := gomock.NewController(t)
eval := mock.NewMockIEvaluator(ctrl)
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.Subscribe("key", sChan)
eventing.Subscribe(context.Background(), "key", nil, sChan)
// notification type
ofType := iservice.Shutdown

View File

@ -1,29 +1,65 @@
package service
import (
"context"
"fmt"
"sync"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
)
// IEvents is an interface for event subscriptions
type IEvents interface {
Subscribe(id any, notifyChan chan iservice.Notification)
Subscribe(ctx context.Context, id any, selector *store.Selector, notifyChan chan iservice.Notification)
Unsubscribe(id any)
EmitToAll(n iservice.Notification)
}
var _ IEvents = &eventingConfiguration{}
// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[any]chan iservice.Notification
mu *sync.RWMutex
subs map[any]chan iservice.Notification
store store.IStore
logger *logger.Logger
}
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
func (eventing *eventingConfiguration) Subscribe(ctx context.Context, id any, selector *store.Selector, notifier chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()
eventing.subs[id] = notifyChan
// proxy events from our store watcher to the notify channel, so that RPC mode event streams
watcher := make(chan store.FlagQueryResult, 1)
go func() {
// store the previous flags to compare against new notifications, to compute proper diffs for RPC mode
var oldFlags map[string]model.Flag
for result := range watcher {
newFlags := result.Flags
// ignore the first notification (nil old flags), the watcher emits on initialization, but for RPC we don't care until there's a change
if oldFlags != nil {
notifications := notifications.NewFromFlags(oldFlags, newFlags)
notifier <- iservice.Notification{
Type: iservice.ConfigurationChange,
Data: map[string]interface{}{
"flags": notifications,
},
}
}
oldFlags = result.Flags
}
eventing.logger.Debug(fmt.Sprintf("closing notify channel for id %v", id))
close(notifier)
}()
eventing.store.Watch(ctx, selector, watcher)
eventing.subs[id] = notifier
}
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {

View File

@ -1,18 +1,29 @@
package service
import (
"context"
"sync"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/stretchr/testify/require"
)
func TestSubscribe(t *testing.T) {
// given
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
store: s,
}
idA := "a"
@ -22,8 +33,8 @@ func TestSubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
eventing.Subscribe(context.Background(), idA, nil, chanA)
eventing.Subscribe(context.Background(), idB, nil, chanB)
// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
@ -32,9 +43,16 @@ func TestSubscribe(t *testing.T) {
func TestUnsubscribe(t *testing.T) {
// given
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
store: s,
}
idA := "a"
@ -43,8 +61,8 @@ func TestUnsubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
eventing.Subscribe(context.Background(), idA, nil, chanA)
eventing.Subscribe(context.Background(), idB, nil, chanB)
eventing.Unsubscribe(idA)

View File

@ -12,7 +12,9 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
flagdService "github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -67,13 +69,17 @@ func (s *OldFlagEvaluationService) ResolveAll(
) (*connect.Response[schemaV1.ResolveAllResponse], error) {
reqID := xid.New().String()
defer s.logger.ClearFields(reqID)
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := &schemaV1.ResolveAllResponse{
Flags: make(map[string]*schemaV1.AnyFlag),
}
values, _, err := s.eval.ResolveAllValues(sCtx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
values, _, err := s.eval.ResolveAllValues(ctx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
if err != nil {
s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)
@ -82,7 +88,7 @@ func (s *OldFlagEvaluationService) ResolveAll(
span.SetAttributes(attribute.Int("feature_flag.count", len(values)))
for _, value := range values {
// register the impression and reason for each flag evaluated
s.metrics.RecordEvaluation(sCtx, value.Error, value.Reason, value.Variant, value.FlagKey)
s.metrics.RecordEvaluation(ctx, value.Error, value.Reason, value.Variant, value.FlagKey)
switch v := value.Value.(type) {
case bool:
@ -133,8 +139,12 @@ func (s *OldFlagEvaluationService) EventStream(
req *connect.Request[schemaV1.EventStreamRequest],
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
s.logger.Debug(fmt.Sprintf("starting event stream for request"))
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
@ -172,12 +182,15 @@ func (s *OldFlagEvaluationService) ResolveBoolean(
ctx context.Context,
req *connect.Request[schemaV1.ResolveBooleanRequest],
) (*connect.Response[schemaV1.ResolveBooleanResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := connect.NewResponse(&schemaV1.ResolveBooleanResponse{})
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
err := resolve[bool](
sCtx,
ctx,
s.logger,
s.eval.ResolveBooleanValue,
req.Header(),
@ -201,12 +214,16 @@ func (s *OldFlagEvaluationService) ResolveString(
ctx context.Context,
req *connect.Request[schemaV1.ResolveStringRequest],
) (*connect.Response[schemaV1.ResolveStringResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveStringResponse{})
err := resolve[string](
sCtx,
ctx,
s.logger,
s.eval.ResolveStringValue,
req.Header(),
@ -230,12 +247,16 @@ func (s *OldFlagEvaluationService) ResolveInt(
ctx context.Context,
req *connect.Request[schemaV1.ResolveIntRequest],
) (*connect.Response[schemaV1.ResolveIntResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveIntResponse{})
err := resolve[int64](
sCtx,
ctx,
s.logger,
s.eval.ResolveIntValue,
req.Header(),
@ -259,12 +280,16 @@ func (s *OldFlagEvaluationService) ResolveFloat(
ctx context.Context,
req *connect.Request[schemaV1.ResolveFloatRequest],
) (*connect.Response[schemaV1.ResolveFloatResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveFloatResponse{})
err := resolve[float64](
sCtx,
ctx,
s.logger,
s.eval.ResolveFloatValue,
req.Header(),
@ -288,12 +313,16 @@ func (s *OldFlagEvaluationService) ResolveObject(
ctx context.Context,
req *connect.Request[schemaV1.ResolveObjectRequest],
) (*connect.Response[schemaV1.ResolveObjectResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveObjectResponse{})
err := resolve[map[string]any](
sCtx,
ctx,
s.logger,
s.eval.ResolveObjectValue,
req.Header(),
@ -312,7 +341,7 @@ func (s *OldFlagEvaluationService) ResolveObject(
return res, err
}
// mergeContexts combines context values from headers, static context (from cli) and request context.
// mergeContexts combines context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func mergeContexts(reqCtx, configFlagsCtx map[string]any, headers http.Header, headerToContextKeyMappings map[string]string) map[string]any {
merged := make(map[string]any)
@ -338,7 +367,7 @@ func resolve[T constraints](ctx context.Context, logger *logger.Logger, resolver
reqID := xid.New().String()
defer logger.ClearFields(reqID)
mergedContext := mergeContexts(evaluationContext.AsMap(), configContextValues, header, configHeaderToContextKeyMappings)
mergedContext := mergeContexts(evaluationContext.AsMap(), configContextValues, header, configHeaderToContextKeyMappings)
logger.WriteFields(
reqID,

View File

@ -11,7 +11,9 @@ import (
"github.com/open-feature/flagd/core/pkg/evaluator"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
flagdService "github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -66,16 +68,19 @@ func (s *FlagEvaluationService) ResolveAll(
reqID := xid.New().String()
defer s.logger.ClearFields(reqID)
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := &evalV1.ResolveAllResponse{
Flags: make(map[string]*evalV1.AnyFlag),
}
context := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
evaluationContext := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(sCtx, reqID, context)
resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(ctx, reqID, evaluationContext)
if err != nil {
s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)
@ -84,7 +89,7 @@ func (s *FlagEvaluationService) ResolveAll(
span.SetAttributes(attribute.Int("feature_flag.count", len(resolutions)))
for _, resolved := range resolutions {
// register the impression and reason for each flag evaluated
s.metrics.RecordEvaluation(sCtx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
s.metrics.RecordEvaluation(ctx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
switch v := resolved.Value.(type) {
case bool:
res.Flags[resolved.FlagKey] = &evalV1.AnyFlag{
@ -147,17 +152,21 @@ func (s *FlagEvaluationService) EventStream(
req *connect.Request[evalV1.EventStreamRequest],
stream *connect.ServerStream[evalV1.EventStreamResponse],
) error {
serviceCtx := ctx
// attach server-side stream deadline to context
s.logger.Debug("starting event stream for request")
if s.deadline != 0 {
streamDeadline := time.Now().Add(s.deadline)
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
serviceCtx = deadlineCtx
ctx = deadlineCtx
defer cancel()
}
s.logger.Debug("starting event stream for request")
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
@ -184,8 +193,8 @@ func (s *FlagEvaluationService) EventStream(
if err != nil {
s.logger.Error(err.Error())
}
case <-serviceCtx.Done():
if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
}
@ -198,12 +207,16 @@ func (s *FlagEvaluationService) ResolveBoolean(
ctx context.Context,
req *connect.Request[evalV1.ResolveBooleanRequest],
) (*connect.Response[evalV1.ResolveBooleanResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveBooleanResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveBooleanValue,
req.Header(),
@ -226,12 +239,16 @@ func (s *FlagEvaluationService) ResolveString(
ctx context.Context,
req *connect.Request[evalV1.ResolveStringRequest],
) (*connect.Response[evalV1.ResolveStringResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveStringResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveStringValue,
req.Header(),
@ -254,12 +271,16 @@ func (s *FlagEvaluationService) ResolveInt(
ctx context.Context,
req *connect.Request[evalV1.ResolveIntRequest],
) (*connect.Response[evalV1.ResolveIntResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveIntResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveIntValue,
req.Header(),
@ -282,12 +303,16 @@ func (s *FlagEvaluationService) ResolveFloat(
ctx context.Context,
req *connect.Request[evalV1.ResolveFloatRequest],
) (*connect.Response[evalV1.ResolveFloatResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveFloatResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveFloatValue,
req.Header(),
@ -310,12 +335,16 @@ func (s *FlagEvaluationService) ResolveObject(
ctx context.Context,
req *connect.Request[evalV1.ResolveObjectRequest],
) (*connect.Response[evalV1.ResolveObjectResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveObjectResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveObjectValue,
req.Header(),

View File

@ -1,6 +1,7 @@
package ofrep
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -10,6 +11,8 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/service/ofrep"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
@ -64,9 +67,12 @@ func (h *handler) HandleFlagEvaluation(w http.ResponseWriter, r *http.Request) {
h.writeJSONToResponse(http.StatusBadRequest, ofrep.ContextErrorResponseFrom(flagKey), w)
return
}
context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)
evaluation := h.evaluator.ResolveAsAnyValue(r.Context(), requestID, flagKey, context)
evaluation := h.evaluator.ResolveAsAnyValue(ctx, requestID, flagKey, evaluationContext)
if evaluation.Error != nil {
status, evaluationError := ofrep.EvaluationErrorResponseFrom(evaluation)
h.writeJSONToResponse(status, evaluationError, w)
@ -85,9 +91,12 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
return
}
context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)
evaluations, metadata, err := h.evaluator.ResolveAllValues(r.Context(), requestID, context)
evaluations, metadata, err := h.evaluator.ResolveAllValues(ctx, requestID, evaluationContext)
if err != nil {
h.Logger.WarnWithID(requestID, fmt.Sprintf("error from resolver: %v", err))
@ -127,7 +136,7 @@ func extractOfrepRequest(req *http.Request) (ofrep.Request, error) {
return request, nil
}
// flagdContext returns combined context values from headers, static context (from cli) and request context.
// flagdContext returns combined context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func flagdContext(
log *logger.Logger, requestID string, request ofrep.Request, staticContextValues map[string]any, headers http.Header, headerToContextKeyMappings map[string]string,
@ -152,4 +161,4 @@ func flagdContext(
}
return context
}
}

View File

@ -2,73 +2,74 @@ package sync
import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/protobuf/types/known/structpb"
)
// syncHandler implements the sync contract
type syncHandler struct {
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
//mux *Multiplexer
store *store.Store
log *logger.Logger
contextValues map[string]any
deadline time.Duration
disableSyncMetadata bool
}
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
muxPayload := make(chan payload, 1)
selector := req.GetSelector()
watcher := make(chan store.FlagQueryResult, 1)
selectorExpression := req.GetSelector()
selector := store.NewSelector(selectorExpression)
ctx := server.Context()
syncContextMap := make(map[string]any)
maps.Copy(syncContextMap, s.contextValues)
syncContext, err := structpb.NewStruct(syncContextMap)
if err != nil {
return status.Error(codes.DataLoss, "error constructing sync context")
}
// attach server-side stream deadline to context
if s.deadline != 0 {
streamDeadline := time.Now().Add(s.deadline)
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
ctx = deadlineCtx
defer cancel()
}
err := s.mux.Register(ctx, selector, muxPayload)
if err != nil {
return err
}
s.store.Watch(ctx, &selector, watcher)
for {
select {
case payload := <-muxPayload:
metadataSrc := make(map[string]any)
maps.Copy(metadataSrc, s.contextValues)
if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
}
metadata, err := structpb.NewStruct(metadataSrc)
case payload := <-watcher:
if err != nil {
s.log.Error(fmt.Sprintf("error from struct creation: %v", err))
return fmt.Errorf("error constructing metadata response")
}
flags, err := json.Marshal(payload.Flags)
if err != nil {
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
return status.Error(codes.DataLoss, "error marshalling flags")
}
err = server.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: payload.flags,
SyncContext: metadata,
})
err = server.Send(&syncv1.SyncFlagsResponse{FlagConfiguration: string(flags), SyncContext: syncContext})
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
}
case <-ctx.Done():
s.mux.Unregister(ctx, selector)
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
@ -79,16 +80,25 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
}
}
func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsRequest) (
func (s syncHandler) FetchAllFlags(ctx context.Context, req *syncv1.FetchAllFlagsRequest) (
*syncv1.FetchAllFlagsResponse, error,
) {
flags, err := s.mux.GetAllFlags(req.GetSelector())
selectorExpression := req.GetSelector()
selector := store.NewSelector(selectorExpression)
flags, _, err := s.store.GetAll(ctx, &selector)
if err != nil {
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
return nil, status.Error(codes.Internal, "error retrieving flags from store")
}
flagsString, err := json.Marshal(flags)
if err != nil {
return nil, err
}
return &syncv1.FetchAllFlagsResponse{
FlagConfiguration: flags,
FlagConfiguration: string(flagsString),
}, nil
}
@ -97,13 +107,13 @@ func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsR
func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (
*syncv1.GetMetadataResponse, error,
) {
if s.disableSyncMetadata {
return nil, status.Error(codes.Unimplemented, "metadata endpoint disabled")
}
metadataSrc := make(map[string]any)
for k, v := range s.contextValues {
metadataSrc[k] = v
}
if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
}
metadata, err := structpb.NewStruct(metadataSrc)
if err != nil {

View File

@ -22,21 +22,18 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
wantMetadata map[string]any
}{
{
name: "with sources and context",
sources: []string{"A, B, C"},
name: "with sources and context",
contextValues: map[string]any{
"env": "prod",
"region": "us-west",
},
wantMetadata: map[string]any{
"sources": "A, B, C",
"env": "prod",
"region": "us-west",
"env": "prod",
"region": "us-west",
},
},
{
name: "with empty sources",
sources: []string{},
name: "with empty sources",
contextValues: map[string]any{
"env": "dev",
},
@ -46,62 +43,59 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
},
{
name: "with empty context",
sources: []string{"A,B,C"},
contextValues: map[string]any{},
wantMetadata: map[string]any{
"sources": "A,B,C",
},
wantMetadata: map[string]any{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)
for _, disableSyncMetadata := range []bool{true, false} {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore, err := store.NewStore(logger.NewLogger(nil, false), tt.sources)
require.NoError(t, err)
handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
}
handler := syncHandler{
store: flagStore,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
disableSyncMetadata: disableSyncMetadata,
}
// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}
go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()
select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
// Check the two metadatas are equal
// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
assert.Equal(t, respMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
if !disableSyncMetadata {
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
} else {
assert.NotNil(t, err)
}
})
// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}
go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()
select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
})
}
}
}

View File

@ -1,213 +0,0 @@
package sync
import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"sync"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
)
//nolint:errchkjson
var emptyConfigBytes, _ = json.Marshal(map[string]map[string]string{
"flags": {},
})
// Multiplexer abstract subscription handling and storage processing.
// Flag configurations will be lazy loaded using reFill logic upon the calls to publish.
type Multiplexer struct {
store *store.State
sources []string
subs map[interface{}]subscription // subscriptions on all sources
selectorSubs map[string]map[interface{}]subscription // source specific subscriptions
allFlags string // pre-calculated all flags in store as a string
selectorFlags map[string]string // pre-calculated selector scoped flags in store as strings
mu sync.RWMutex
}
type subscription struct {
id interface{}
channel chan payload
}
type payload struct {
flags string
}
// NewMux creates a new sync multiplexer
func NewMux(store *store.State, sources []string) (*Multiplexer, error) {
m := &Multiplexer{
store: store,
sources: sources,
subs: map[interface{}]subscription{},
selectorSubs: map[string]map[interface{}]subscription{},
selectorFlags: map[string]string{},
}
return m, m.reFill()
}
// Register a subscription
func (r *Multiplexer) Register(id interface{}, source string, con chan payload) error {
r.mu.Lock()
defer r.mu.Unlock()
if source != "" && !slices.Contains(r.sources, source) {
return fmt.Errorf("no flag watcher setup for source %s", source)
}
var initSync string
if source == "" {
// subscribe for flags from all source
r.subs[id] = subscription{
id: id,
channel: con,
}
initSync = r.allFlags
} else {
// subscribe for specific source
s, ok := r.selectorSubs[source]
if ok {
s[id] = subscription{
id: id,
channel: con,
}
} else {
r.selectorSubs[source] = map[interface{}]subscription{
id: {
id: id,
channel: con,
},
}
}
initSync = r.selectorFlags[source]
}
// Initial sync
con <- payload{flags: initSync}
return nil
}
// Publish sync updates to subscriptions
func (r *Multiplexer) Publish() error {
r.mu.Lock()
defer r.mu.Unlock()
// perform a refill prior to publishing
err := r.reFill()
if err != nil {
return err
}
// push to all source subs
for _, sub := range r.subs {
sub.channel <- payload{r.allFlags}
}
// push to selector subs
for source, flags := range r.selectorFlags {
for _, s := range r.selectorSubs[source] {
s.channel <- payload{flags}
}
}
return nil
}
// Unregister a subscription
func (r *Multiplexer) Unregister(id interface{}, selector string) {
r.mu.Lock()
defer r.mu.Unlock()
var from map[interface{}]subscription
if selector == "" {
from = r.subs
} else {
from = r.selectorSubs[selector]
}
delete(from, id)
}
// GetAllFlags per specific source
func (r *Multiplexer) GetAllFlags(source string) (string, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if source == "" {
return r.allFlags, nil
}
if !slices.Contains(r.sources, source) {
return "", fmt.Errorf("no flag watcher setup for source %s", source)
}
return r.selectorFlags[source], nil
}
// SourcesAsMetadata returns all known sources, comma separated to be used as service metadata
func (r *Multiplexer) SourcesAsMetadata() string {
r.mu.RLock()
defer r.mu.RUnlock()
return strings.Join(r.sources, ",")
}
// reFill local configuration values
func (r *Multiplexer) reFill() error {
clear(r.selectorFlags)
// start all sources with empty config
for _, source := range r.sources {
r.selectorFlags[source] = string(emptyConfigBytes)
}
all, metadata, err := r.store.GetAll(context.Background())
if err != nil {
return fmt.Errorf("error retrieving flags from the store: %w", err)
}
bytes, err := json.Marshal(map[string]interface{}{"flags": all, "metadata": metadata})
if err != nil {
return fmt.Errorf("error marshalling: %w", err)
}
r.allFlags = string(bytes)
collector := map[string]map[string]model.Flag{}
for key, flag := range all {
c, ok := collector[flag.Source]
if ok {
c[key] = flag
} else {
collector[flag.Source] = map[string]model.Flag{
key: flag,
}
}
}
// for all flags, sort them into their correct selector
for source, flags := range collector {
// store the corresponding metadata
metadata := r.store.GetMetadataForSource(source)
bytes, err := json.Marshal(map[string]interface{}{"flags": flags, "metadata": metadata})
if err != nil {
return fmt.Errorf("unable to marshal flags: %w", err)
}
r.selectorFlags[source] = string(bytes)
}
return nil
}

View File

@ -1,261 +0,0 @@
package sync
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const emptyConfigString = "{\"flags\":{}}"
func TestRegistration(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
tests := []struct {
testName string
id interface{}
source string
flagStringValidator func(flagString string, testSource string, testName string)
connection chan payload
expectError bool
}{
{
testName: "subscribe to all flags",
id: context.Background(),
connection: make(chan payload, 1),
},
{
testName: "subscribe to source A",
id: context.Background(),
source: "A",
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
},
connection: make(chan payload, 1),
},
{
testName: "subscribe to source B",
id: context.Background(),
source: "B",
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
},
connection: make(chan payload, 1),
},
{
testName: "subscribe to empty",
id: context.Background(),
source: "C",
connection: make(chan payload, 1),
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Equal(t, flagString, emptyConfigString)
},
expectError: false,
},
{
testName: "subscribe to non-existing",
id: context.Background(),
source: "D",
connection: make(chan payload, 1),
expectError: true,
},
}
// validate registration
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
// when
err := mux.Register(test.id, test.source, test.connection)
// then
if !test.expectError && err != nil {
t.Fatal("expected no errors, but got error")
}
if test.expectError && err != nil {
// pass
return
}
// validate subscription
var initSync payload
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case initSync = <-test.connection:
break
}
if test.flagStringValidator != nil {
test.flagStringValidator(initSync.flags, test.source, test.testName)
}
})
}
}
func TestUpdateAndRemoval(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
identifier := context.Background()
channel := make(chan payload, 1)
err = mux.Register(identifier, "", channel)
if err != nil {
t.Fatal("error during subscription registration")
return
}
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case <-channel:
break
}
// when - updates are triggered
err = mux.Publish()
if err != nil {
t.Fatal("failure to trigger update request on multiplexer")
return
}
// then
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case <-channel:
break
}
// when - subscription removed & update triggered
mux.Unregister(identifier, "")
err = mux.Publish()
if err != nil {
t.Fatal("failure to trigger update request on multiplexer")
return
}
// then
select {
case <-time.After(2 * time.Second):
break
case <-channel:
t.Fatal("expected no sync but got an update as removal was not performed")
}
}
func TestGetAllFlags(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
// when - get all with open scope
flagConfig, err := mux.GetAllFlags("")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
// when - get all with a scope
flagConfig, err = mux.GetAllFlags("A")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 || !strings.Contains(flagConfig, fmt.Sprintf("\"source\":\"%s\"", "A")) {
t.Fatal("expected flags to be scoped")
return
}
// when - get all for a flagless-scope
flagConfig, err = mux.GetAllFlags("C")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
assert.Equal(t, flagConfig, emptyConfigString)
}
func TestGetAllFlagsMetadata(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
// when - get all with open scope
flagConfig, err := mux.GetAllFlags("")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
t.Fatal("expected unique metadata key for A to be present")
return
}
if !strings.Contains(flagConfig, "\"keyB\":\"valueB\"") {
t.Fatal("expected unique metadata key for B to be present")
return
}
// duplicated keys are removed
if strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
t.Fatal("expected duplicated metadata key NOT to be present")
return
}
// when - get all with a scope
flagConfig, err = mux.GetAllFlags("A")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
t.Fatal("expected unique metadata key to be present")
return
}
if !strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
t.Fatal("expected duplicated metadata key to be present")
return
}
}

View File

@ -21,25 +21,25 @@ type ISyncService interface {
Start(context.Context) error
// Emit updates for sync listeners
Emit(isResync bool, source string)
Emit(source string)
}
type SvcConfigurations struct {
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
Logger *logger.Logger
Port uint16
Sources []string
Store *store.Store
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool
}
type Service struct {
listener net.Listener
logger *logger.Logger
mux *Multiplexer
server *grpc.Server
startupTracker syncTracker
@ -65,7 +65,6 @@ func loadTLSCredentials(certPath string, keyPath string) (credentials.TransportC
func NewSyncService(cfg SvcConfigurations) (*Service, error) {
var err error
l := cfg.Logger
mux, err := NewMux(cfg.Store, cfg.Sources)
if err != nil {
return nil, fmt.Errorf("error initializing multiplexer: %w", err)
}
@ -82,10 +81,11 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
}
syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
store: cfg.Store,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
disableSyncMetadata: cfg.DisableSyncMetadata,
})
var lis net.Listener
@ -103,7 +103,6 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
return &Service{
listener: lis,
logger: l,
mux: mux,
server: server,
startupTracker: syncTracker{
sources: slices.Clone(cfg.Sources),
@ -149,16 +148,8 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}
func (s *Service) Emit(isResync bool, source string) {
func (s *Service) Emit(source string) {
s.startupTracker.trackAndRemove(source)
if !isResync {
err := s.mux.Publish()
if err != nil {
s.logger.Warn(fmt.Sprintf("error while publishing sync streams: %v", err))
return
}
}
}
func (s *Service) shutdown() {

View File

@ -3,16 +3,16 @@ package sync
import (
"context"
"fmt"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
@ -36,143 +36,129 @@ func TestSyncServiceEndToEnd(t *testing.T) {
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantStartErr: false},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore()
for _, disableSyncMetadata := range []bool{true, false} {
for _, tc := range testCases {
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore(t)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
service, doneChan, err := createAndStartSyncService(port, sources, flagStore, tc.certPath, tc.keyPath, tc.socketPath, ctx, 0)
_, doneChan, err := createAndStartSyncService(
port,
sources,
flagStore,
tc.certPath,
tc.keyPath,
tc.socketPath,
ctx,
0,
disableSyncMetadata,
)
if tc.wantStartErr {
if err == nil {
t.Fatal("expected error creating the service!")
}
return
} else if err != nil {
t.Fatal("unexpected error creating the service: %w", err)
return
}
// when - derive a client for sync service
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
// then
// sync flags request
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("error from sync request: %v", err))
return
}
syncRsp, err := flags.Recv()
if err != nil {
t.Fatal(fmt.Printf("stream error: %v", err))
return
}
if len(syncRsp.GetFlagConfiguration()) == 0 {
t.Error("expected non empty sync response, but got empty")
}
// checks sync context actually set
syncContext := syncRsp.GetSyncContext()
if syncContext == nil {
t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
}
syncAsMap := syncContext.AsMap()
if syncAsMap["sources"] == nil {
t.Fatalf("expected sources in sync_context, but got nil")
}
sourcesStr := syncAsMap["sources"].(string)
sourcesArray := strings.Split(sourcesStr, ",")
sort.Strings(sourcesArray)
expectedSources := []string{"A", "B", "C"}
if !reflect.DeepEqual(sourcesArray, expectedSources) {
t.Fatalf("sources entry in sync_context does not match expected: got %v, want %v", sourcesArray, expectedSources)
}
// validate emits
dataReceived := make(chan interface{})
go func() {
_, err := flags.Recv()
if err != nil {
if tc.wantStartErr {
if err == nil {
t.Fatal("expected error creating the service!")
}
return
} else if err != nil {
t.Fatal("unexpected error creating the service: %w", err)
return
}
dataReceived <- nil
}()
// when - derive a client for sync service
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
// Emit as a resync
service.Emit(true, "A")
// then
select {
case <-dataReceived:
t.Fatal("expected no data as this is a resync")
case <-time.After(1 * time.Second):
break
}
// sync flags request
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("error from sync request: %v", err))
return
}
// Emit as a resync
service.Emit(false, "A")
syncRsp, err := flags.Recv()
if err != nil {
t.Fatal(fmt.Printf("stream error: %v", err))
return
}
select {
case <-dataReceived:
break
case <-time.After(1 * time.Second):
t.Fatal("expected data but timeout waiting for sync")
}
if len(syncRsp.GetFlagConfiguration()) == 0 {
t.Error("expected non empty sync response, but got empty")
}
// fetch all flags
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("fetch all error: %v", err))
return
}
// checks sync context actually set
syncContext := syncRsp.GetSyncContext()
if syncContext == nil {
t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
}
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
}
// validate emits
dataReceived := make(chan interface{})
go func() {
_, err := flags.Recv()
if err != nil {
return
}
// metadata request
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
if err != nil {
t.Fatal(fmt.Printf("metadata error: %v", err))
return
}
dataReceived <- nil
}()
asMap := metadataRsp.GetMetadata().AsMap()
// make a change
flagStore.Update(testSource1, testSource1Flags, model.Metadata{
"keyDuped": "value",
"keyA": "valueA",
})
// expect `sources` to be present
if asMap["sources"] == nil {
t.Fatal("expected sources entry in the metadata, but got nil")
}
select {
case <-dataReceived:
break
case <-time.After(1 * time.Second):
t.Fatal("expected data but timeout waiting for sync")
}
if asMap["sources"] != "A,B,C" {
t.Fatal("incorrect sources entry in metadata")
}
// fetch all flags
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("fetch all error: %v", err))
return
}
// validate shutdown from context cancellation
go func() {
cancelFunc()
}()
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
}
select {
case <-doneChan:
// exit successful
return
case <-time.After(2 * time.Second):
t.Fatal("service did not exist within sufficient timeframe")
}
})
// metadata request
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
if disableSyncMetadata {
if err == nil {
t.Fatal(fmt.Printf("getMetadata disabled, error should not be nil"))
return
}
} else {
asMap := metadataRsp.GetMetadata().AsMap()
assert.NotNil(t, asMap, "expected metadata to be non-nil")
}
// validate shutdown from context cancellation
go func() {
cancelFunc()
}()
select {
case <-doneChan:
// exit successful
return
case <-time.After(2 * time.Second):
t.Fatal("service did not exist within sufficient timeframe")
}
})
}
}
}
@ -190,7 +176,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore()
flagStore, sources := getSimpleFlagStore(t)
certPath := "./test-cert/server-cert.pem"
keyPath := "./test-cert/server-key.pem"
socketPath := ""
@ -198,7 +184,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline)
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline, false)
if err != nil {
t.Fatal("error creating sync service")
}
@ -256,16 +242,27 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
}
}
func createAndStartSyncService(port int, sources []string, store *store.State, certPath string, keyPath string, socketPath string, ctx context.Context, deadline time.Duration) (*Service, chan interface{}, error) {
func createAndStartSyncService(
port int,
sources []string,
store *store.Store,
certPath string,
keyPath string,
socketPath string,
ctx context.Context,
deadline time.Duration,
disableSyncMetadata bool,
) (*Service, chan interface{}, error) {
service, err := NewSyncService(SvcConfigurations{
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: certPath,
KeyPath: keyPath,
SocketPath: socketPath,
StreamDeadline: deadline,
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: certPath,
KeyPath: keyPath,
SocketPath: socketPath,
StreamDeadline: deadline,
DisableSyncMetadata: disableSyncMetadata,
})
if err != nil {
return nil, nil, err
@ -279,7 +276,7 @@ func createAndStartSyncService(port int, sources []string, store *store.State, c
}()
// trigger manual emits matching sources, so that service can start
for _, source := range sources {
service.Emit(false, source)
service.Emit(source)
}
return service, doneChan, err
}

View File

@ -5,46 +5,57 @@ import (
"crypto/x509"
"fmt"
"os"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/grpc/credentials"
)
// getSimpleFlagStore returns a flag store pre-filled with flags from sources A & B & C, which C empty
func getSimpleFlagStore() (*store.State, []string) {
variants := map[string]any{
"true": true,
"false": false,
}
flagStore := store.NewFlags()
flagStore.Set("flagA", model.Flag{
var testSource1 = "testSource1"
var testSource2 = "testSource2"
var testVariants = map[string]any{
"true": true,
"false": false,
}
var testSource1Flags = map[string]model.Flag{
"flagA": {
State: "ENABLED",
DefaultVariant: "false",
Variants: variants,
Source: "A",
})
flagStore.Set("flagB", model.Flag{
Variants: testVariants,
},
}
var testSource2Flags = map[string]model.Flag{
"flagB": {
State: "ENABLED",
DefaultVariant: "true",
Variants: variants,
Source: "B",
})
Variants: testVariants,
},
}
flagStore.MetadataPerSource["A"] = model.Metadata{
// getSimpleFlagStore is a test util which returns a flag store pre-filled with flags from sources testSource1 and testSource2.
func getSimpleFlagStore(t testing.TB) (*store.Store, []string) {
t.Helper()
sources := []string{testSource1, testSource2}
flagStore, err := store.NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("error creating flag store: %v", err)
}
flagStore.Update(testSource1, testSource1Flags, model.Metadata{
"keyDuped": "value",
"keyA": "valueA",
}
})
flagStore.MetadataPerSource["B"] = model.Metadata{
flagStore.Update(testSource2, testSource2Flags, model.Metadata{
"keyDuped": "value",
"keyB": "valueB",
}
})
return flagStore, []string{"A", "B", "C"}
return flagStore, sources
}
func loadTLSClientCredentials(certPath string) (credentials.TransportCredentials, error) {

View File

@ -16,13 +16,13 @@ make build
then run the `flagd` binary
```shell
./bin/flagd start -f file:test-harness/symlink_testing-flags.json
make flagd-integration-test-harness
```
and finally run
```shell
make integration-test
make flagd-integration-test
```
## TLS