Compare commits

...

16 Commits

Author SHA1 Message Date
Todd Baert 52f344fa01
chore: fix CODEOWNERS
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-08-18 11:34:22 -04:00
Todd Baert f9ce46f103
feat: multi-project support via selectors and flagSetId namespacing (#1702)
Sorry for how big this PR seems (a lot of the changed lines are not
significant, e.g. DB schema or tests), so I've tried to highlight the
important parts. This is a substantial PR that builds on our recent
changes to use go-memdb for storage. It primarily supports two new crucial
features:

- support for duplicate flag keys from multiple sources (namespaced by
`flagSetId`) as described in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/duplicate-flag-keys.md)
- support for a robust query syntax in the "selector" fields and
headers, as proposed by @tangenti in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/decouple-flag-source-and-set.md)

Both of these were accomplished using the new go-memdb module. **The
built-in "watcher" functionality also allows us to _completely delete_
our "mux" layer, which was responsible for fanning out changes from
sync-sources to listeners**; this is something the go-memdb module
provides for free (the ability to watch a query for changes). Now, we
completely rely on this feature for all change notifications.
Additionally, unlike before, change notifications are now scoped to
particular _selectors_ (i.e., if a client is only interested in changes
for flags from `flagSetId: x` or `source: y`, it will only get change
notifications pertaining to that selection). Currently, the only
supported query fields for the `selector` are `"source"` and
`"flagSetId"`, but this functionality can easily be extended. By
default, if no `selector` is specified, the previous key-overwrite by
source priority applies (this logic has also been simplified using the
new database). Most of the new functionality is tested
[here](https://github.com/open-feature/flagd/pull/1702/files#diff-0cdd4fe716f4b8a94466279fd1b11187fcf4d74e6434727c33d57ed78c89fe27R163-R478).

Selector in action for `Sync` API:


![demo](https://github.com/user-attachments/assets/9a4fd33e-80ef-4b5b-80d1-4fa3fc92b8e7)

Selector in action for `Evaluation` API:


![demo2](https://github.com/user-attachments/assets/abf49f9c-2247-4a03-9b8f-15b6f540aad1)

To test, run the new make target `make run-flagd-selector-demo`, then
use the OFREP or gRPC endpoints to experiment. This new functionality is
available on all APIs and endpoints. The commands I ran in the gifs
above are:

streaming:
```shell
grpcurl -d '{"selector":"flagSetId=example"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
grpcurl -d '{"selector":"flagSetId=example,source=../config/samples/example_flags.flagd.json"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
grpcurl -d '{"selector":"flagSetId=other"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```

single-evaluations:
```shell
curl -X POST  -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
curl -X POST -H 'flagd-selector:flagSetId=other'  -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
```

⚠️ There are no breaking changes here. Besides the new features,
there is one behavioral change: the top-level "metadata" object
returned for bulk evaluations (and failed evaluations) was previously
nonsensical in its behavior (we basically just aggregated the
metadata from all sources, discarding duplicates, and sent it back).
This field was used by providers for telemetry purposes. Now, since flag
evaluations and subscriptions are "query based" and can aggregate data
from multiple sources, we've opted to simply reflect the selector
query's contents here.

So if you used a selector like `"flagSetId=1234,source=../my/source"`,
the top-level metadata object in the response would be:

```
"metadata": {
  "flagSetId": 1234,
  "source": "../my/source"
}
```

This is useful for the provider's ability to report errors, etc., in
telemetry.
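
For example, combining the selector header shown above with the new metadata reflection (a hypothetical request/response pair; the response is abbreviated to just the metadata field):

```shell
curl -X POST -H 'flagd-selector:flagSetId=other' -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq '.metadata'
# expected (abbreviated):
# {
#   "flagSetId": "other"
# }
```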

Fixes: https://github.com/open-feature/flagd/issues/1675
Fixes: https://github.com/open-feature/flagd/issues/1695
Fixes: https://github.com/open-feature/flagd/issues/1611
Fixes: https://github.com/open-feature/flagd/issues/1700
Fixes: https://github.com/open-feature/flagd/issues/1610

---------

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: chrfwow <christian.lutnik@dynatrace.com>
Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
2025-08-13 16:03:07 -04:00
NeaguGeorgiana23 3228ad8951
chore!: removes the `fractionalEvaluation` operator since it has been replaced with `fractional`. (#1704)
## This PR

- removes the `fractionalEvaluation` operator since it has been replaced
with `fractional`.

### Related Issues

Fixes #1677

---------

Signed-off-by: NeaguGeorgiana23 <115723925+NeaguGeorgiana23@users.noreply.github.com>
2025-08-06 17:50:43 -04:00
Todd Baert 5c5c1cfe84
chore(refactor): use memdb for flag storage (#1697)
This PR contains no behavioral changes, and no breaking changes.

It lays the groundwork for some of the upcoming improvements we want in
flagd, as specified in recent ADRs. It does this by re-implementing our
storage layer to use [go-memdb](https://github.com/hashicorp/go-memdb),
an open source in-memory database developed by Hashicorp and used in
[Consul](https://developer.hashicorp.com/consul) (as well as MANY other
projects).

### Why?

This will allow us to easily support:

- duplicate flag keys (namespaced by flagSetId), as mentioned in [this
ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/duplicate-flag-keys.md)
- robust flag selectors, as mentioned in [this
ADR](https://github.com/open-feature/flagd/pull/1644), by supporting
"watchers" which allow us to "listen" to change in flags returned by a
given query 🕺
- robust (and possibly, in the future, even customizable) indexing on
arbitrary attributes (to easily support fetching "all boolean flags", or
"flags with metadata = xyz", etc)
- cross-"table" transactions

I have already PoC'd that these are all practical.
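
For context, here is a minimal, self-contained sketch of the go-memdb watcher pattern this work relies on (illustrative table and field names, not flagd's actual schema):

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Flag is an illustrative record type, not flagd's actual flag model.
type Flag struct {
	Key    string
	Source string
}

func main() {
	// Schema: a unique "id" index plus a secondary "source" index.
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"flag": {
				Name: "flag",
				Indexes: map[string]*memdb.IndexSchema{
					"id":     {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "Key"}},
					"source": {Name: "source", Indexer: &memdb.StringFieldIndex{Field: "Source"}},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// Read transaction: run a query and register its watch channel.
	txn := db.Txn(false)
	it, err := txn.Get("flag", "source", "fileA")
	if err != nil {
		panic(err)
	}
	ws := memdb.NewWatchSet()
	ws.Add(it.WatchCh())

	// Any write that changes the watched result set unblocks ws.Watch.
	go func() {
		write := db.Txn(true)
		if err := write.Insert("flag", &Flag{Key: "myBoolFlag", Source: "fileA"}); err != nil {
			panic(err)
		}
		write.Commit()
	}()

	ws.Watch(nil) // blocks until the query's results change
	fmt.Println("change detected for source=fileA")
}
```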

### Changes in implementation

- the `store` package's `State` is no longer just a struct; it's an
object with methods wrapping the internal db
- the `store` package's `State` was renamed to `Store` and the file was
renamed from `flags.go` to `store.go`, since it has ceased to be a simple
stateful object, and for consistency
- a non-serialized (used only internally) `Key` field was added to the
flag type (for indexing)
- a new constructor for the `Store` was added which takes a logger and
returns an error; the old one was deprecated to avoid breaking changes in
consumers (the Go flagd provider, mostly), as sketched below
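
Based on the test updates in this PR's diff, construction migrates roughly as follows (a sketch with illustrative source names, not the provider's exact code):

```go
log := logger.NewLogger(nil, false)

// old (deprecated, kept to avoid breaking consumers):
// s := store.NewFlags()

// new: takes a logger and the known source names, and can fail
s, err := store.NewStore(log, []string{"testSource"})
if err != nil {
	// handle construction error
}
je := evaluator.NewJSON(log, s)
```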

Note that the go-memdb dependency is MPL2, which is not allowed by the
CNCF; however, go-memdb is already used in CNCF projects and has a
[special
exception](347a55dc0a/license-exceptions/cncf-exceptions-2023-08-31.json (L7-L11)).

### Performance

There was no significant change in performance; see the benchmark diff
vs main below:

<details>
  <summary>Benchmark diff vs main</summary>

```diff
-BenchmarkFractionalEvaluation/test_c@faas.com-16         	  559051	     13832 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_d@faas.com-16         	  611665	     13106 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_a@faas.com-16         	  383074	     13433 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkFractionalEvaluation/test_b@faas.com-16         	  529185	     12190 ns/op	    7229 B/op	     135 allocs/op
-BenchmarkResolveBooleanValue/test_staticBoolFlag-16      	 3929409	      1712 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveBooleanValue/test_targetingBoolFlag-16   	  812671	     10276 ns/op	    6065 B/op	      87 allocs/op
-BenchmarkResolveBooleanValue/test_staticObjectFlag-16    	 4398327	      1700 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveBooleanValue/test_missingFlag-16         	 4541409	      1494 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveBooleanValue/test_disabledFlag-16        	 2998599	      1815 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveStringValue/test_staticStringFlag-16     	 4378830	      1698 ns/op	    1040 B/op	      13 allocs/op
-BenchmarkResolveStringValue/test_targetingStringFlag-16  	  849668	      9165 ns/op	    6097 B/op	      89 allocs/op
-BenchmarkResolveStringValue/test_staticObjectFlag-16     	 4560192	      1363 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveStringValue/test_missingFlag-16          	 5283511	      1196 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveStringValue/test_disabledFlag-16         	 4393116	      1446 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveFloatValue/test:_staticFloatFlag-16      	 4264772	      1501 ns/op	    1024 B/op	      13 allocs/op
-BenchmarkResolveFloatValue/test:_targetingFloatFlag-16   	  776436	      8191 ns/op	    6081 B/op	      89 allocs/op
-BenchmarkResolveFloatValue/test:_staticObjectFlag-16     	 4685841	      1285 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveFloatValue/test:_missingFlag-16          	 5001636	      1376 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveFloatValue/test:_disabledFlag-16         	 3707120	      1897 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveIntValue/test_staticIntFlag-16           	 3770362	      1677 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveIntValue/test_targetingNumberFlag-16     	  739861	     11142 ns/op	    6065 B/op	      87 allocs/op
-BenchmarkResolveIntValue/test_staticObjectFlag-16        	 4221418	      1913 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveIntValue/test_missingFlag-16             	 4289516	      1488 ns/op	     768 B/op	      12 allocs/op
-BenchmarkResolveIntValue/test_disabledFlag-16            	 4027533	      1829 ns/op	    1072 B/op	      13 allocs/op
-BenchmarkResolveObjectValue/test_staticObjectFlag-16     	 1588855	      3880 ns/op	    2243 B/op	      37 allocs/op
-BenchmarkResolveObjectValue/test_targetingObjectFlag-16  	  562364	     11580 ns/op	    7283 B/op	     109 allocs/op
-BenchmarkResolveObjectValue/test_staticBoolFlag-16       	 5026976	      1791 ns/op	    1008 B/op	      11 allocs/op
-BenchmarkResolveObjectValue/test_missingFlag-16          	 4254043	      1553 ns/op	     784 B/op	      12 allocs/op
-BenchmarkResolveObjectValue/test_disabledFlag-16         	 3051976	      2250 ns/op	    1072 B/op	      13 allocs/op
+BenchmarkFractionalEvaluation/test_a@faas.com-16         	  478593	     14527 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_b@faas.com-16         	  429560	     14728 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_c@faas.com-16         	  574078	     14230 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkFractionalEvaluation/test_d@faas.com-16         	  411690	     15296 ns/op	    7467 B/op	     143 allocs/op
+BenchmarkResolveBooleanValue/test_staticBoolFlag-16      	 4133443	      1973 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveBooleanValue/test_targetingBoolFlag-16   	  822934	     10981 ns/op	    6033 B/op	      94 allocs/op
+BenchmarkResolveBooleanValue/test_staticObjectFlag-16    	 3955728	      1964 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveBooleanValue/test_missingFlag-16         	 3068791	      2294 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveBooleanValue/test_disabledFlag-16        	 3500334	      2225 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveStringValue/test_staticStringFlag-16     	 3935048	      1781 ns/op	    1008 B/op	      20 allocs/op
+BenchmarkResolveStringValue/test_targetingStringFlag-16  	  770565	     10765 ns/op	    6065 B/op	      96 allocs/op
+BenchmarkResolveStringValue/test_staticObjectFlag-16     	 3896060	      2084 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveStringValue/test_missingFlag-16          	 3103950	      2141 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveStringValue/test_disabledFlag-16         	 3717013	      2092 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveFloatValue/test:_staticFloatFlag-16      	 3971438	      2003 ns/op	     976 B/op	      20 allocs/op
+BenchmarkResolveFloatValue/test:_targetingFloatFlag-16   	  782996	     10153 ns/op	    6049 B/op	      96 allocs/op
+BenchmarkResolveFloatValue/test:_staticObjectFlag-16     	 3469644	      2115 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveFloatValue/test:_missingFlag-16          	 3376167	      2157 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveFloatValue/test:_disabledFlag-16         	 3610095	      2032 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveIntValue/test_staticIntFlag-16           	 3883299	      1941 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveIntValue/test_targetingNumberFlag-16     	  823038	     10725 ns/op	    6033 B/op	      94 allocs/op
+BenchmarkResolveIntValue/test_staticObjectFlag-16        	 3697454	      2028 ns/op	     976 B/op	      18 allocs/op
+BenchmarkResolveIntValue/test_missingFlag-16             	 3326895	      1986 ns/op	    1048 B/op	      21 allocs/op
+BenchmarkResolveIntValue/test_disabledFlag-16            	 3327046	      2142 ns/op	    1024 B/op	      20 allocs/op
+BenchmarkResolveObjectValue/test_staticObjectFlag-16     	 1534627	      4885 ns/op	    2211 B/op	      44 allocs/op
+BenchmarkResolveObjectValue/test_targetingObjectFlag-16  	  509614	     14640 ns/op	    7251 B/op	     116 allocs/op
+BenchmarkResolveObjectValue/test_staticBoolFlag-16       	 3871867	      1978 ns/op	     960 B/op	      18 allocs/op
+BenchmarkResolveObjectValue/test_missingFlag-16          	 3484065	      2080 ns/op	    1064 B/op	      21 allocs/op
+BenchmarkResolveObjectValue/test_disabledFlag-16         	 4013230	      2158 ns/op	    1024 B/op	      20 allocs/op
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/evaluator	233.286s
+ok  	github.com/open-feature/flagd/core/pkg/evaluator	261.212s
 ?   	github.com/open-feature/flagd/core/pkg/evaluator/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/logger	0.003s
+ok  	github.com/open-feature/flagd/core/pkg/logger	0.002s
 ?   	github.com/open-feature/flagd/core/pkg/model	[no test files]
 ?   	github.com/open-feature/flagd/core/pkg/service	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/service/ofrep	0.003s
+ok  	github.com/open-feature/flagd/core/pkg/service/ofrep	0.002s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/store	0.002s
 ?   	github.com/open-feature/flagd/core/pkg/sync	[no test files]
@@ -51,9 +51,9 @@ PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/builder	0.020s
 ?   	github.com/open-feature/flagd/core/pkg/sync/builder/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/file	1.007s
+ok  	github.com/open-feature/flagd/core/pkg/sync/file	1.006s
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/grpc	8.014s
+ok  	github.com/open-feature/flagd/core/pkg/sync/grpc	8.013s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials	0.004s
 ?   	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock	[no test files]
@@ -61,10 +61,10 @@ ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/credentials	0.004s
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/sync/grpc/nameresolvers	0.002s
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/http	4.008s
+ok  	github.com/open-feature/flagd/core/pkg/sync/http	4.007s
 ?   	github.com/open-feature/flagd/core/pkg/sync/http/mock	[no test files]
 PASS
-ok  	github.com/open-feature/flagd/core/pkg/sync/kubernetes	0.015s
+ok  	github.com/open-feature/flagd/core/pkg/sync/kubernetes	0.016s
 ?   	github.com/open-feature/flagd/core/pkg/sync/testing	[no test files]
 PASS
 ok  	github.com/open-feature/flagd/core/pkg/telemetry	0.016s
```

</details>

---------

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-30 09:08:47 -04:00
Todd Baert 2f49ea7ed7
Update decouple-flag-source-and-set.md
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-29 12:18:07 -04:00
Todd Baert 2d40e28b92
Update duplicate-flag-keys.md
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-29 12:16:03 -04:00
Hugo Huang 849ce39827
docs(ADR): decouple flag sources and flag sets (#1644)
The ADR proposes a different approach to solve the problems stated in
https://github.com/open-feature/flagd/pull/1634.

It's quite concise and simple at this point, assuming the reviewers
already have the context from the original discussions.

Will add more details after the initial feedback and inputs.

---------

Signed-off-by: Hugo Huang <lorqor@gmail.com>
2025-07-29 12:14:12 -04:00
github-actions[bot] 1ee636aa03
chore: release main (#1696)
🤖 I have created a release *beep* *boop*
---


<details><summary>flagd: 0.12.9</summary>

##
[0.12.9](https://github.com/open-feature/flagd/compare/flagd/v0.12.8...flagd/v0.12.9)
(2025-07-28)


### ✨ New Features

* Add toggle for disabling getMetadata request
([#1693](https://github.com/open-feature/flagd/issues/1693))
([e8fd680](e8fd680860))
</details>

<details><summary>core: 0.12.1</summary>

##
[0.12.1](https://github.com/open-feature/flagd/compare/core/v0.12.0...core/v0.12.1)
(2025-07-28)


### 🧹 Chore

* add back file-delete test
([#1694](https://github.com/open-feature/flagd/issues/1694))
([750aa17](750aa176b5))
* fix benchmark
([#1698](https://github.com/open-feature/flagd/issues/1698))
([5e2d7d7](5e2d7d7176))
</details>

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

Signed-off-by: OpenFeature Bot <109696520+openfeaturebot@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-07-28 16:00:14 -04:00
Todd Baert 5e2d7d7176
chore: fix benchmark (#1698)
This was out of date. I've updated it to use the correct format and
updated the asserts accordingly.

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-28 14:30:13 -04:00
lea konvalinka e8fd680860
feat: Add toggle for disabling getMetadata request (#1693)
## This PR

- adds a toggle `disable-sync-metadata` for the Sync Service to disable
or enable the deprecated `getMetadata` request

### Related Issues
[#1688 \[FEATURE\] Temporary Context Enrichment toggle](https://github.com/open-feature/flagd/issues/1688)
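
If the toggle is exposed as a start flag like flagd's other sync-service options (an assumption; check `flagd start --help` for the exact form), usage would look like:

```shell
# start flagd with the deprecated getMetadata sync RPC disabled
flagd start -f file:config/samples/example_flags.flagd.json --disable-sync-metadata
```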

---------

Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com>
2025-07-28 15:33:31 +02:00
Todd Baert 750aa176b5
chore: add back file-delete test (#1694)
Adds back a test that was inappropriately removed (as explained
[here](https://github.com/beeme1mr/flagd/pull/16/files#r2223712589)).

Also fixes some typos.

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-24 07:21:36 -04:00
github-actions[bot] c0a2940aef
chore: release main (#1686)
🤖 I have created a release *beep* *boop*
---


<details><summary>flagd: 0.12.8</summary>

##
[0.12.8](https://github.com/open-feature/flagd/compare/flagd/v0.12.7...flagd/v0.12.8)
(2025-07-21)


### 🐛 Bug Fixes

* update to latest otel semconv
([#1668](https://github.com/open-feature/flagd/issues/1668))
([81855d7](81855d76f9))


### 🧹 Chore

* **deps:** update module github.com/open-feature/flagd/core to v0.11.8
([#1685](https://github.com/open-feature/flagd/issues/1685))
([c07ffba](c07ffba554))
</details>

<details><summary>flagd-proxy: 0.8.0</summary>

##
[0.8.0](https://github.com/open-feature/flagd/compare/flagd-proxy/v0.7.6...flagd-proxy/v0.8.0)
(2025-07-21)


### ⚠ BREAKING CHANGES

* remove sync.Type
([#1691](https://github.com/open-feature/flagd/issues/1691))

### ✨ New Features

* remove sync.Type
([#1691](https://github.com/open-feature/flagd/issues/1691))
([ac647e0](ac647e0656))


### 🧹 Chore

* **deps:** update module github.com/open-feature/flagd/core to v0.11.8
([#1685](https://github.com/open-feature/flagd/issues/1685))
([c07ffba](c07ffba554))
</details>

<details><summary>core: 0.12.0</summary>

##
[0.12.0](https://github.com/open-feature/flagd/compare/core/v0.11.8...core/v0.12.0)
(2025-07-21)


### ⚠ BREAKING CHANGES

* remove sync.Type
([#1691](https://github.com/open-feature/flagd/issues/1691))

### 🐛 Bug Fixes

* update to latest otel semconv
([#1668](https://github.com/open-feature/flagd/issues/1668))
([81855d7](81855d76f9))


### ✨ New Features

* Add support for HTTP eTag header and 304 no change response
([#1645](https://github.com/open-feature/flagd/issues/1645))
([ea3be4f](ea3be4f901))
* remove sync.Type
([#1691](https://github.com/open-feature/flagd/issues/1691))
([ac647e0](ac647e0656))
</details>

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

Signed-off-by: OpenFeature Bot <109696520+openfeaturebot@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-07-23 16:39:41 -04:00
Zhiwei Liang ea3be4f901
feat: Add support for HTTP eTag header and 304 no change response (#1645)
## This PR
- adds support for the `eTag` request header and the `304 Not Modified`
response.

### Related Issues
Fixes #1558

### Notes
This proposal includes some significant behavior changes; therefore, any
feedback, opinions, or objections are welcome and appreciated.

### How to test
```bash
make build
cd bin && ./flagd start --port 8013 --uri https://raw.githubusercontent.com/open-feature/flagd/main/samples/example_flags.flagd.json --debug
```

More specific test cases to be added when we all agree on proceeding
with the implementation of the change in this PR.
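
For reference, the conditional-request flow this feature builds on can be exercised by hand against any static host (a sketch; replace `<etag>` with the value returned by the first call):

```bash
# first fetch: note the ETag response header
curl -sI 'https://raw.githubusercontent.com/open-feature/flagd/main/samples/example_flags.flagd.json' | grep -i etag
# re-request with If-None-Match: an unchanged source returns "304 Not Modified",
# letting flagd skip re-downloading and re-parsing the flag definition
curl -sI -H 'If-None-Match: <etag>' 'https://raw.githubusercontent.com/open-feature/flagd/main/samples/example_flags.flagd.json' | head -n 1
```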

---------

Signed-off-by: Zhiwei Liang <zhiwei.liang27@pm.me>
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-21 08:30:22 -04:00
Todd Baert ac647e0656
feat!: remove sync.Type (#1691)
This PR removes the obsolete `sync.Type`. This was intended to be used
to support partial updates from sync sources, but it has never been
used, it lags behind on features, and it would not work fully if
implemented by a source (`metadata` is not properly implemented).
Moreover, we're not convinced the value is worth the complexity it adds.

Specifically:

- removes `sync.Type` and all references
- removes tests for `sync.Type` and updates assertions for unrelated
tests which previously asserted on `sync.Type` (now we use the
payload/source in assertions)
- (unrelated) updates `CONTRIBUTING.md` to include a bunch of helpful
manual testing steps
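
For `flagd/core` consumers, the migration visible in this PR's tests looks roughly like this (a sketch; the payload values are illustrative):

```go
// before: each payload carried a sync.Type (ALL, ADD, UPDATE, DELETE)
// _, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: flags, Type: sync.ALL})

// after: every DataSync is treated as the full flag definition for its source
_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: flags, Source: "testSource"})
```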

Note that this is a breaking change, but only for the `flagd/core`
library, NOT for flagd or any sync source itself in terms of their
behavior. Since there was no change in `flagd/`, this will not show up
in the CHANGELOG.

Resolves: https://github.com/open-feature/flagd/issues/1678

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
2025-07-21 08:01:56 -04:00
Michael Beemer 81855d76f9
fix: update to latest otel semconv (#1668)
Signed-off-by: Michael Beemer <beeme1mr@users.noreply.github.com>
2025-07-17 12:36:02 -04:00
renovate[bot] c07ffba554
chore(deps): update module github.com/open-feature/flagd/core to v0.11.8 (#1685)
This PR contains the following updates:

| Package | Change | Age | Confidence |
|---|---|---|---|
|
[github.com/open-feature/flagd/core](https://redirect.github.com/open-feature/flagd)
| `v0.11.6` -> `v0.11.8` |
[![age](https://developer.mend.io/api/mc/badges/age/go/github.com%2fopen-feature%2fflagd%2fcore/v0.11.8?slim=true)](https://docs.renovatebot.com/merge-confidence/)
|
[![confidence](https://developer.mend.io/api/mc/badges/confidence/go/github.com%2fopen-feature%2fflagd%2fcore/v0.11.6/v0.11.8?slim=true)](https://docs.renovatebot.com/merge-confidence/)
|

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined),
Automerge - At any time (no schedule defined).

🚦 **Automerge**: Enabled.

♻ **Rebasing**: Whenever PR is behind base branch, or you tick the
rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update
again.

---

- [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check
this box

---

This PR was generated by [Mend Renovate](https://mend.io/renovate/).
View the [repository job
log](https://developer.mend.io/github/open-feature/flagd).


Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-16 06:38:56 +00:00
75 changed files with 2663 additions and 2725 deletions

.gitignore

@ -20,4 +20,7 @@ site
.cache/
# coverage results
*coverage.out
*coverage.out
# benchmark results
benchmark.txt


@ -1,5 +1,5 @@
{
"flagd": "0.12.7",
"flagd-proxy": "0.7.6",
"core": "0.11.8"
"flagd": "0.12.9",
"flagd-proxy": "0.8.0",
"core": "0.12.1"
}


@ -3,4 +3,4 @@
#
# Managed by Peribolos: https://github.com/open-feature/community/blob/main/config/open-feature/cloud-native/workgroup.yaml
#
* @open-feature/cloud-native-maintainers @open-feature/maintainers
* @open-feature/flagd-maintainers @open-feature/maintainers


@ -8,6 +8,24 @@ TLDR: be respectful.
Any contributions are expected to include unit tests.
These can be validated with `make test` or the automated github workflow will run them on PR creation.
## Development
### Prerequisites
You'll need:
- Go
- make
- docker
You'll want:
- curl (for calling HTTP endpoints)
- [grpcurl](https://github.com/fullstorydev/grpcurl) (for making gRPC calls)
- jq (for pretty printing responses)
### Workspace Initialization
This project uses a go workspace, to setup the project run
```shell
@ -22,6 +40,70 @@ The project uses remote buf packages, changing the remote generation source will
export GOPRIVATE=buf.build/gen/go
```
### Manual testing
flagd has a number of interfaces (you can read more about them at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
NOTE: you will need `go, curl`
#### Remote single flag evaluation via HTTP1.1/Connect
```sh
# evaluates a single boolean flag
curl -X POST -d '{"flagKey":"myBoolFlag","context":{}}' -H "Content-Type: application/json" "http://localhost:8013/flagd.evaluation.v1.Service/ResolveBoolean" | jq
```
#### Remote single flag evaluation via HTTP1.1/OFREP
```sh
# evaluates a single boolean flag
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags/myBoolFlag' | jq
```
#### Remote single flag evaluation via gRPC
```sh
# evaluates a single boolean flag
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
```
#### Remote bulk evaluation via HTTP1.1/OFREP
```sh
# evaluates flags in bulk
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
```
#### Remote bulk evaluation via gRPC
```sh
# evaluates flags in bulk
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
```
#### Remote event streaming via gRPC
```sh
# notifies of flag changes (but does not evaluate)
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/EventStream
```
#### Flag configuration fetch via gRPC
```sh
# sends back a representation of all flags
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/FetchAllFlags | jq
```
#### Flag synchronization stream via gRPC
```sh
# will open a persistent stream which sends flag changes when the watched source is modified
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```
## DCO Sign-Off
A DCO (Developer Certificate of Origin) sign-off is a line placed at the end of


@ -47,12 +47,19 @@ test-flagd:
go test -race -covermode=atomic -cover -short ./flagd/pkg/... -coverprofile=flagd-coverage.out
test-flagd-proxy:
go test -race -covermode=atomic -cover -short ./flagd-proxy/pkg/... -coverprofile=flagd-proxy-coverage.out
flagd-integration-test: # dependent on ./bin/flagd start -f file:test-harness/flags/testing-flags.json -f file:test-harness/flags/custom-ops.json -f file:test-harness/flags/evaluator-refs.json -f file:test-harness/flags/zero-flags.json
go test -cover ./test/integration $(ARGS)
flagd-benchmark-test:
go test -bench=Bench -short -benchtime=5s -benchmem ./core/... | tee benchmark.txt
flagd-integration-test-harness:
# target used to start a locally built flagd with the e2e flags
cd flagd; go run main.go start -f file:../test-harness/flags/testing-flags.json -f file:../test-harness/flags/custom-ops.json -f file:../test-harness/flags/evaluator-refs.json -f file:../test-harness/flags/zero-flags.json -f file:../test-harness/flags/edge-case-flags.json
flagd-integration-test: # dependent on flagd-e2e-test-harness if not running in github actions
go test -count=1 -cover ./test/integration $(ARGS)
run: # default to flagd
make run-flagd
run-flagd:
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
run-flagd-selector-demo:
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json -f file:../config/samples/example_flags.flagd.2.json
install:
cp systemd/flagd.service /etc/systemd/system/flagd.service
mkdir -p /etc/flagd

benchmark.txt

@ -0,0 +1,72 @@
PASS
ok github.com/open-feature/flagd/core/pkg/certreloader 15.986s
goos: linux
goarch: amd64
pkg: github.com/open-feature/flagd/core/pkg/evaluator
cpu: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz
BenchmarkFractionalEvaluation/test_a@faas.com-16 423930 13316 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_b@faas.com-16 469594 13677 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_c@faas.com-16 569103 13286 ns/op 7229 B/op 135 allocs/op
BenchmarkFractionalEvaluation/test_d@faas.com-16 412386 13023 ns/op 7229 B/op 135 allocs/op
BenchmarkResolveBooleanValue/test_staticBoolFlag-16 3106903 1792 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveBooleanValue/test_targetingBoolFlag-16 448164 11250 ns/op 6065 B/op 87 allocs/op
BenchmarkResolveBooleanValue/test_staticObjectFlag-16 3958750 1476 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveBooleanValue/test_missingFlag-16 5331808 1353 ns/op 784 B/op 12 allocs/op
BenchmarkResolveBooleanValue/test_disabledFlag-16 4530751 1301 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveStringValue/test_staticStringFlag-16 4583056 1525 ns/op 1040 B/op 13 allocs/op
BenchmarkResolveStringValue/test_targetingStringFlag-16 839954 10388 ns/op 6097 B/op 89 allocs/op
BenchmarkResolveStringValue/test_staticObjectFlag-16 4252830 1677 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveStringValue/test_missingFlag-16 3743324 1495 ns/op 784 B/op 12 allocs/op
BenchmarkResolveStringValue/test_disabledFlag-16 3495699 1709 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveFloatValue/test:_staticFloatFlag-16 4382868 1511 ns/op 1024 B/op 13 allocs/op
BenchmarkResolveFloatValue/test:_targetingFloatFlag-16 867987 10344 ns/op 6081 B/op 89 allocs/op
BenchmarkResolveFloatValue/test:_staticObjectFlag-16 3913120 1695 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveFloatValue/test:_missingFlag-16 3910468 1349 ns/op 784 B/op 12 allocs/op
BenchmarkResolveFloatValue/test:_disabledFlag-16 3642919 1666 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveIntValue/test_staticIntFlag-16 4077288 1349 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveIntValue/test_targetingNumberFlag-16 922383 7601 ns/op 6065 B/op 87 allocs/op
BenchmarkResolveIntValue/test_staticObjectFlag-16 4995128 1229 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveIntValue/test_missingFlag-16 5574153 1274 ns/op 768 B/op 12 allocs/op
BenchmarkResolveIntValue/test_disabledFlag-16 3633708 1734 ns/op 1072 B/op 13 allocs/op
BenchmarkResolveObjectValue/test_staticObjectFlag-16 1624102 4559 ns/op 2243 B/op 37 allocs/op
BenchmarkResolveObjectValue/test_targetingObjectFlag-16 443880 11995 ns/op 7283 B/op 109 allocs/op
BenchmarkResolveObjectValue/test_staticBoolFlag-16 3462445 1665 ns/op 1008 B/op 11 allocs/op
BenchmarkResolveObjectValue/test_missingFlag-16 4207567 1458 ns/op 784 B/op 12 allocs/op
BenchmarkResolveObjectValue/test_disabledFlag-16 3407262 1848 ns/op 1072 B/op 13 allocs/op
PASS
ok github.com/open-feature/flagd/core/pkg/evaluator 239.506s
? github.com/open-feature/flagd/core/pkg/evaluator/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/logger 0.003s
? github.com/open-feature/flagd/core/pkg/model [no test files]
? github.com/open-feature/flagd/core/pkg/service [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/service/ofrep 0.002s
PASS
ok github.com/open-feature/flagd/core/pkg/store 0.003s
? github.com/open-feature/flagd/core/pkg/sync [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/blob 0.016s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/builder 0.018s
? github.com/open-feature/flagd/core/pkg/sync/builder/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/file 1.007s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc 8.011s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc/credentials 0.008s
? github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock [no test files]
? github.com/open-feature/flagd/core/pkg/sync/grpc/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/grpc/nameresolvers 0.002s
PASS
ok github.com/open-feature/flagd/core/pkg/sync/http 4.006s
? github.com/open-feature/flagd/core/pkg/sync/http/mock [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/sync/kubernetes 0.016s
? github.com/open-feature/flagd/core/pkg/sync/testing [no test files]
PASS
ok github.com/open-feature/flagd/core/pkg/telemetry 0.016s
PASS
ok github.com/open-feature/flagd/core/pkg/utils 0.002s


@ -0,0 +1,17 @@
{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"metadata": {
"flagSetId": "other",
"version": "v1"
},
"flags": {
"myStringFlag": {
"state": "ENABLED",
"variants": {
"dupe1": "dupe1",
"dupe2": "dupe2"
},
"defaultVariant": "dupe1"
}
}
}


@ -1,5 +1,30 @@
# Changelog
## [0.12.1](https://github.com/open-feature/flagd/compare/core/v0.12.0...core/v0.12.1) (2025-07-28)
### 🧹 Chore
* add back file-delete test ([#1694](https://github.com/open-feature/flagd/issues/1694)) ([750aa17](https://github.com/open-feature/flagd/commit/750aa176b5a8dd24a9daaff985ff6efeb084c758))
* fix benchmark ([#1698](https://github.com/open-feature/flagd/issues/1698)) ([5e2d7d7](https://github.com/open-feature/flagd/commit/5e2d7d7176ba05e667cd92acd7decb531a8de2f6))
## [0.12.0](https://github.com/open-feature/flagd/compare/core/v0.11.8...core/v0.12.0) (2025-07-21)
### ⚠ BREAKING CHANGES
* remove sync.Type ([#1691](https://github.com/open-feature/flagd/issues/1691))
### 🐛 Bug Fixes
* update to latest otel semconv ([#1668](https://github.com/open-feature/flagd/issues/1668)) ([81855d7](https://github.com/open-feature/flagd/commit/81855d76f94a09251a19a05f830cc1d11ab6b566))
### ✨ New Features
* Add support for HTTP eTag header and 304 no change response ([#1645](https://github.com/open-feature/flagd/issues/1645)) ([ea3be4f](https://github.com/open-feature/flagd/commit/ea3be4f9010644132795bb60b36fb7705f901b62))
* remove sync.Type ([#1691](https://github.com/open-feature/flagd/issues/1691)) ([ac647e0](https://github.com/open-feature/flagd/commit/ac647e065636071f5bc065a9a084461cea692166))
## [0.11.8](https://github.com/open-feature/flagd/compare/core/v0.11.7...core/v0.11.8) (2025-07-15)


@ -11,6 +11,8 @@ import (
)
func TestFractionalEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
commonFlags := Flags{
@ -458,8 +460,13 @@ func TestFractionalEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
@ -486,6 +493,8 @@ func TestFractionalEvaluation(t *testing.T) {
}
func BenchmarkFractionalEvaluation(b *testing.B) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
flags := Flags{
@ -508,7 +517,7 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
},
{
"fractional": [
"email",
{"var": "email"},
[
"red",
25
@ -542,41 +551,41 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
expectedReason string
expectedErrorCode string
}{
"test@faas.com": {
"test_a@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test@faas.com",
"email": "test_a@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
expectedReason: model.TargetingMatchReason,
},
"test_b@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test_b@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test2@faas.com": {
"test_c@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test2@faas.com",
"email": "test_c@faas.com",
},
expectedVariant: "yellow",
expectedValue: "#FFFF00",
expectedVariant: "green",
expectedValue: "#00FF00",
expectedReason: model.TargetingMatchReason,
},
"test3@faas.com": {
"test_d@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test3@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test4@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
"email": "test_d@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
@ -587,7 +596,13 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
for name, tt := range tests {
b.Run(name, func(b *testing.B) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, &store.State{Flags: tt.flags.Flags})
s, err := store.NewStore(log, sources)
if err != nil {
b.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
for i := 0; i < b.N; i++ {
value, variant, reason, _, err := resolve[string](
ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)


@ -35,7 +35,7 @@ IEvaluator is an extension of IResolver, allowing storage updates and retrievals
*/
type IEvaluator interface {
GetState() (string, error)
SetState(payload sync.DataSync) (model.Metadata, bool, error)
SetState(payload sync.DataSync) (map[string]interface{}, bool, error)
IResolver
}


@ -64,13 +64,13 @@ func WithEvaluator(name string, evalFunc func(interface{}, interface{}) interfac
// JSON evaluator
type JSON struct {
store *store.State
store *store.Store
Logger *logger.Logger
jsonEvalTracer trace.Tracer
Resolver
}
func NewJSON(logger *logger.Logger, s *store.State, opts ...JSONEvaluatorOption) *JSON {
func NewJSON(logger *logger.Logger, s *store.Store, opts ...JSONEvaluatorOption) *JSON {
logger = logger.WithFields(
zap.String("component", "evaluator"),
zap.String("evaluator", "json"),
@ -103,8 +103,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
_, span := je.jsonEvalTracer.Start(
context.Background(),
"flagSync",
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)),
trace.WithAttributes(attribute.String("feature_flag.sync_type", payload.String())))
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)))
defer span.End()
var definition Definition
@ -119,19 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
var events map[string]interface{}
var reSync bool
// TODO: We do not handle metadata in ADD/UPDATE operations. These are only relevant for grpc sync implementations.
switch payload.Type {
case sync.ALL:
events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
case sync.ADD:
events = je.store.Add(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.UPDATE:
events = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.DELETE:
events = je.store.DeleteFlags(je.Logger, payload.Source, definition.Flags)
default:
return nil, false, fmt.Errorf("unsupported sync type: %d", payload.Type)
}
events, reSync = je.store.Update(payload.Source, definition.Flags, definition.Metadata)
// Number of events correlates to the number of flags changed through this sync, record it
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))
@ -152,7 +139,6 @@ func NewResolver(store store.IStore, logger *logger.Logger, jsonEvalTracer trace
jsonlogic.AddOperator(StartsWithEvaluationName, NewStringComparisonEvaluator(logger).StartsWithEvaluation)
jsonlogic.AddOperator(EndsWithEvaluationName, NewStringComparisonEvaluator(logger).EndsWithEvaluation)
jsonlogic.AddOperator(SemVerEvaluationName, NewSemVerComparison(logger).SemVerEvaluation)
jsonlogic.AddOperator(LegacyFractionEvaluationName, NewLegacyFractional(logger).LegacyFractionalEvaluation)
return Resolver{store: store, Logger: logger, tracer: jsonEvalTracer}
}
@ -163,8 +149,12 @@ func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context
_, span := je.tracer.Start(ctx, "resolveAll")
defer span.End()
var err error
allFlags, flagSetMetadata, err := je.store.GetAll(ctx)
var selector store.Selector
s := ctx.Value(store.SelectorContextKey{})
if s != nil {
selector = s.(store.Selector)
}
allFlags, flagSetMetadata, err := je.store.GetAll(ctx, &selector)
if err != nil {
return nil, flagSetMetadata, fmt.Errorf("error retreiving flags from the store: %w", err)
}
@ -315,19 +305,19 @@ func resolve[T constraints](ctx context.Context, reqID string, key string, conte
func (je *Resolver) evaluateVariant(ctx context.Context, reqID string, flagKey string, evalCtx map[string]any) (
variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, err error,
) {
flag, metadata, ok := je.store.Get(ctx, flagKey)
if !ok {
var selector store.Selector
s := ctx.Value(store.SelectorContextKey{})
if s != nil {
selector = s.(store.Selector)
}
flag, metadata, err := je.store.Get(ctx, flagKey, &selector)
if err != nil {
// flag not found
je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
return "", map[string]interface{}{}, model.ErrorReason, metadata, errors.New(model.FlagNotFoundErrorCode)
}
// add selector to evaluation metadata
selector := je.store.SelectorForFlag(ctx, flag)
if selector != "" {
metadata[SelectorMetadataKey] = selector
}
for key, value := range flag.Metadata {
// If value is not nil or empty, copy to metadata
if value != nil {


@ -127,7 +127,7 @@ const UndefinedDefaultWithTargetting = `{
const (
FlagSetID = "testSetId"
Version = "v33"
ValidFlag = "validFlag"
ValidFlag = "validFlag"
MissingFlag = "missingFlag"
StaticBoolFlag = "staticBoolFlag"
StaticBoolValue = true
@ -383,7 +383,7 @@ var Flags = fmt.Sprintf(`{
func TestGetState_Valid_ContainsFlag(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("Expected no error")
}
@ -405,7 +405,7 @@ func TestSetState_Invalid_Error(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
// set state with an invalid flag definition
_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("unexpected error")
}
@ -415,7 +415,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
// set state with a valid flag definition
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -423,7 +423,7 @@ func TestSetState_Valid_NoError(t *testing.T) {
func TestResolveAllValues(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -476,62 +476,6 @@ func TestResolveAllValues(t *testing.T) {
}
}
func TestMetadataResolveType(t *testing.T) {
tests := []struct {
flagKey string
metadata model.Metadata
}{
{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
t.Fatalf("expected no error")
}
for _, test := range tests {
_, _, _, metadata, _ := evaluator.ResolveBooleanValue(context.TODO(), reqID, test.flagKey, nil)
if !reflect.DeepEqual(test.metadata, metadata) {
t.Errorf("expected metadata to be %v, but got %v", test.metadata, metadata)
}
}
}
func TestMetadataResolveAll(t *testing.T) {
expectedFlagSetMetadata := model.Metadata{"flagSetId": FlagSetID, "version": Version}
tests := []struct {
flagKey string
metadata model.Metadata
}{
{StaticBoolFlag, model.Metadata{"flagSetId": FlagSetID, "version": Version}},
{MetadataFlag, model.Metadata{"flagSetId": FlagSetID, "version": VersionOverride}},
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
t.Fatalf("expected no error")
}
for _, test := range tests {
resolutions, flagSetMetadata, _ := evaluator.ResolveAllValues(context.TODO(), reqID, nil)
for _, resolved := range resolutions {
if resolved.FlagKey == test.flagKey {
if !reflect.DeepEqual(test.metadata, resolved.Metadata) {
t.Errorf("expected flag metadata to be %v, but got %v", test.metadata, resolved.Metadata)
}
}
}
if !reflect.DeepEqual(expectedFlagSetMetadata, flagSetMetadata) {
t.Errorf("expected flag set metadata to be %v, but got %v", expectedFlagSetMetadata, flagSetMetadata)
}
}
}
func TestResolveBooleanValue(t *testing.T) {
tests := []struct {
flagKey string
@ -548,7 +492,7 @@ func TestResolveBooleanValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -583,7 +527,7 @@ func BenchmarkResolveBooleanValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -623,7 +567,7 @@ func TestResolveStringValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -659,7 +603,7 @@ func BenchmarkResolveStringValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -699,7 +643,7 @@ func TestResolveFloatValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -735,7 +679,7 @@ func BenchmarkResolveFloatValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -775,7 +719,7 @@ func TestResolveIntValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -811,7 +755,7 @@ func BenchmarkResolveIntValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -851,7 +795,7 @@ func TestResolveObjectValue(t *testing.T) {
}
const reqID = "default"
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -890,7 +834,7 @@ func BenchmarkResolveObjectValue(b *testing.B) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
b.Fatalf("expected no error")
}
@ -935,7 +879,7 @@ func TestResolveAsAnyValue(t *testing.T) {
}
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
@ -956,7 +900,7 @@ func TestResolveAsAnyValue(t *testing.T) {
func TestResolve_DefaultVariant(t *testing.T) {
tests := []struct {
flags string
flags string
flagKey string
context map[string]interface{}
reason string
@ -971,14 +915,14 @@ func TestResolve_DefaultVariant(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags})
_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags, Source: "testSource"})
if err != nil {
t.Fatalf("expected no error")
}
anyResult := evaluator.ResolveAsAnyValue(context.TODO(), "", test.flagKey, test.context)
assert.Equal(t, model.ErrorReason, anyResult.Reason)
assert.EqualError(t, anyResult.Error, test.errorCode)
})
@ -1038,7 +982,7 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags, Source: "testSource"})
if tt.valid && err != nil {
t.Error(err)
@ -1050,7 +994,6 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
func TestState_Evaluator(t *testing.T) {
tests := map[string]struct {
inputState string
inputSyncType sync.Type
expectedOutputState string
expectedError bool
expectedResync bool
@ -1086,7 +1029,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedOutputState: `
{
"flags": {
@ -1099,7 +1041,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1147,7 +1089,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedOutputState: `
{
"flags": {
@ -1160,7 +1101,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1204,7 +1145,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedError: true,
},
"invalid targeting": {
@ -1237,7 +1177,7 @@ func TestState_Evaluator(t *testing.T) {
"off": false
},
"defaultVariant": "off",
"source":"",
"source":"testSource",
"targeting": {
"if": [
{
@ -1258,7 +1198,6 @@ func TestState_Evaluator(t *testing.T) {
"flagSources":null
}
`,
inputSyncType: sync.ALL,
expectedError: false,
expectedOutputState: `
{
@ -1272,7 +1211,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1291,7 +1230,7 @@ func TestState_Evaluator(t *testing.T) {
"off": false
},
"defaultVariant": "off",
"source":"",
"source":"testSource",
"selector":"",
"targeting": {
"if": [
@ -1341,47 +1280,15 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedError: true,
},
"unexpected sync type": {
inputState: `
{
"flags": {
"fibAlgo": {
"variants": {
"recursive": "recursive",
"memo": "memo",
"loop": "loop",
"binet": "binet"
},
"defaultVariant": "recursive",
"state": "ENABLED",
"targeting": {
"if": [
{
"$ref": "emailWithFaas"
}, "binet", null
]
}
}
},
"$evaluators": {
"emailWithFaas": ""
}
}
`,
inputSyncType: 999,
expectedError: true,
expectedResync: false,
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState})
_, resync, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState, Source: "testSource"})
if err != nil {
if !tt.expectedError {
t.Error(err)
@ -1414,8 +1321,8 @@ func TestState_Evaluator(t *testing.T) {
t.Fatal(err)
}
if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON["flags"]) {
t.Errorf("expected state: %v got state: %v", expectedOutputJSON, gotOutputJSON)
if !reflect.DeepEqual(expectedOutputJSON["flags"], gotOutputJSON) {
t.Errorf("expected state: %v got state: %v", expectedOutputJSON["flags"], gotOutputJSON)
}
})
}
@ -1423,11 +1330,9 @@ func TestState_Evaluator(t *testing.T) {
func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
tests := map[string]struct {
dataSyncType sync.Type
flagResolution func(evaluator *evaluator.JSON) error
}{
"Add_ResolveAllValues": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1437,7 +1342,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Update_ResolveAllValues": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1447,7 +1351,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Delete_ResolveAllValues": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1457,35 +1360,30 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Add_ResolveBooleanValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticBoolFlag, nil)
return err
},
},
"Update_ResolveStringValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticStringValue, nil)
return err
},
},
"Delete_ResolveIntValue": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveIntValue(context.TODO(), "", StaticIntFlag, nil)
return err
},
},
"Add_ResolveFloatValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveFloatValue(context.TODO(), "", StaticFloatFlag, nil)
return err
},
},
"Update_ResolveObjectValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveObjectValue(context.TODO(), "", StaticObjectFlag, nil)
return err
@ -1497,7 +1395,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: sync.ADD})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
t.Fatal(err)
}
@ -1520,7 +1418,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
errChan <- nil
return
default:
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: tt.dataSyncType})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Source: "testSource"})
if err != nil {
errChan <- err
return
@ -1562,7 +1460,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
t.Run("flagKeyIsInTheContext", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"welcome-banner": {
"state": "ENABLED",
@ -1602,7 +1500,7 @@ func TestFlagdAmbientProperties(t *testing.T) {
t.Run("timestampIsInTheContext", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"welcome-banner": {
"state": "ENABLED",
@ -1636,7 +1534,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
t.Run("missing variant error", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"missing-variant": {
"state": "ENABLED",
@ -1664,7 +1562,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
t.Run("null fallback", func(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"null-fallback": {
"state": "ENABLED",
@ -1697,7 +1595,7 @@ func TestTargetingVariantBehavior(t *testing.T) {
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
//nolint:dupword
_, _, err := evaluator.SetState(sync.DataSync{FlagData: `{
_, _, err := evaluator.SetState(sync.DataSync{Source: "testSource", FlagData: `{
"flags": {
"match-boolean": {
"state": "ENABLED",


@ -1,145 +0,0 @@
// This evaluation type is deprecated and will be removed before v1.
// Do not enhance it or use it for reference.
package evaluator
import (
"errors"
"fmt"
"math"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/zeebo/xxh3"
)
const (
LegacyFractionEvaluationName = "fractionalEvaluation"
LegacyFractionEvaluationLink = "https://flagd.dev/concepts/#migrating-from-legacy-fractionalevaluation"
)
// Deprecated: LegacyFractional is deprecated. This will be removed prior to v1 release.
type LegacyFractional struct {
Logger *logger.Logger
}
type legacyFractionalEvaluationDistribution struct {
variant string
percentage int
}
func NewLegacyFractional(logger *logger.Logger) *LegacyFractional {
return &LegacyFractional{Logger: logger}
}
func (fe *LegacyFractional) LegacyFractionalEvaluation(values, data interface{}) interface{} {
fe.Logger.Warn(
fmt.Sprintf("%s is deprecated, please use %s, see: %s",
LegacyFractionEvaluationName,
FractionEvaluationName,
LegacyFractionEvaluationLink))
valueToDistribute, feDistributions, err := parseLegacyFractionalEvaluationData(values, data)
if err != nil {
fe.Logger.Error(fmt.Sprintf("parse fractional evaluation data: %v", err))
return nil
}
return distributeLegacyValue(valueToDistribute, feDistributions)
}
func parseLegacyFractionalEvaluationData(values, data interface{}) (string,
[]legacyFractionalEvaluationDistribution, error,
) {
valuesArray, ok := values.([]interface{})
if !ok {
return "", nil, errors.New("fractional evaluation data is not an array")
}
if len(valuesArray) < 2 {
return "", nil, errors.New("fractional evaluation data has length under 2")
}
bucketBy, ok := valuesArray[0].(string)
if !ok {
return "", nil, errors.New("first element of fractional evaluation data isn't of type string")
}
dataMap, ok := data.(map[string]interface{})
if !ok {
return "", nil, errors.New("data isn't of type map[string]interface{}")
}
v, ok := dataMap[bucketBy]
if !ok {
return "", nil, nil
}
valueToDistribute, ok := v.(string)
if !ok {
return "", nil, fmt.Errorf("var: %s isn't of type string", bucketBy)
}
feDistributions, err := parseLegacyFractionalEvaluationDistributions(valuesArray)
if err != nil {
return "", nil, err
}
return valueToDistribute, feDistributions, nil
}
func parseLegacyFractionalEvaluationDistributions(values []interface{}) (
[]legacyFractionalEvaluationDistribution, error,
) {
sumOfPercentages := 0
var feDistributions []legacyFractionalEvaluationDistribution
for i := 1; i < len(values); i++ {
distributionArray, ok := values[i].([]interface{})
if !ok {
return nil, errors.New("distribution elements aren't of type []interface{}")
}
if len(distributionArray) != 2 {
return nil, errors.New("distribution element isn't length 2")
}
variant, ok := distributionArray[0].(string)
if !ok {
return nil, errors.New("first element of distribution element isn't string")
}
percentage, ok := distributionArray[1].(float64)
if !ok {
return nil, errors.New("second element of distribution element isn't float")
}
sumOfPercentages += int(percentage)
feDistributions = append(feDistributions, legacyFractionalEvaluationDistribution{
variant: variant,
percentage: int(percentage),
})
}
if sumOfPercentages != 100 {
return nil, fmt.Errorf("percentages must sum to 100, got: %d", sumOfPercentages)
}
return feDistributions, nil
}
func distributeLegacyValue(value string, feDistribution []legacyFractionalEvaluationDistribution) string {
hashValue := xxh3.HashString(value)
hashRatio := float64(hashValue) / math.Pow(2, 64) // divide the hash value by the largest possible value, integer 2^64
bucket := int(hashRatio * 100) // integer in range [0, 99]
rangeEnd := 0
for _, dist := range feDistribution {
rangeEnd += dist.percentage
if bucket < rangeEnd {
return dist.variant
}
}
return ""
}


@ -1,300 +0,0 @@
package evaluator
import (
"context"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
)
func TestLegacyFractionalEvaluation(t *testing.T) {
ctx := context.Background()
flags := Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"if": [
{
"in": ["@faas.com", {
"var": ["email"]
}]
},
{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
],
[
"green",
25
],
[
"yellow",
25
]
]
}, null
]
}`),
},
},
}
tests := map[string]struct {
flags Flags
flagKey string
context map[string]any
expectedValue string
expectedVariant string
expectedReason string
expectedErrorCode string
}{
"test@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test2@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test2@faas.com",
},
expectedVariant: "yellow",
expectedValue: "#FFFF00",
expectedReason: model.TargetingMatchReason,
},
"test3@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test3@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"test4@faas.com": {
flags: flags,
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
},
expectedVariant: "blue",
expectedValue: "#0000FF",
expectedReason: model.TargetingMatchReason,
},
"non even split": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"if": [
{
"in": ["@faas.com", {
"var": ["email"]
}]
},
{
"fractionalEvaluation": [
"email",
[
"red",
50
],
[
"blue",
25
],
[
"green",
25
]
]
}, null
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "test4@faas.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.TargetingMatchReason,
},
"fallback to default variant if no email provided": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
],
[
"green",
25
],
[
"yellow",
25
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{},
expectedVariant: "",
expectedValue: "",
expectedReason: model.ErrorReason,
expectedErrorCode: model.GeneralErrorCode,
},
"fallback to default variant if invalid variant as result of fractional evaluation": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"black",
100
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "foo@foo.com",
},
expectedVariant: "",
expectedValue: "",
expectedReason: model.ErrorReason,
expectedErrorCode: model.GeneralErrorCode,
},
"fallback to default variant if percentages don't sum to 100": {
flags: Flags{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
Variants: map[string]any{
"red": "#FF0000",
"blue": "#0000FF",
"green": "#00FF00",
"yellow": "#FFFF00",
},
Targeting: []byte(`{
"fractionalEvaluation": [
"email",
[
"red",
25
],
[
"blue",
25
]
]
}`),
},
},
},
flagKey: "headerColor",
context: map[string]any{
"email": "foo@foo.com",
},
expectedVariant: "red",
expectedValue: "#FF0000",
expectedReason: model.DefaultReason,
},
}
const reqID = "default"
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
if value != tt.expectedValue {
t.Errorf("expected value '%s', got '%s'", tt.expectedValue, value)
}
if variant != tt.expectedVariant {
t.Errorf("expected variant '%s', got '%s'", tt.expectedVariant, variant)
}
if reason != tt.expectedReason {
t.Errorf("expected reason '%s', got '%s'", tt.expectedReason, reason)
}
if err != nil {
errorCode := err.Error()
if errorCode != tt.expectedErrorCode {
t.Errorf("expected err '%v', got '%v'", tt.expectedErrorCode, err)
}
}
})
}
}


@ -316,6 +316,8 @@ func TestSemVerOperator_Compare(t *testing.T) {
}
func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -922,8 +924,12 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)


@ -13,6 +13,8 @@ import (
)
func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -185,8 +187,12 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
@ -210,6 +216,8 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
}
func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
const source = "testSource"
var sources = []string{source}
ctx := context.Background()
tests := map[string]struct {
@ -382,9 +390,12 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
log := logger.NewLogger(nil, false)
je := NewJSON(log, store.NewFlags())
je.store.Flags = tt.flags.Flags
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags.Flags, model.Metadata{})
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)


@ -2,7 +2,15 @@ package model
import "encoding/json"
const Key = "Key"
const FlagSetId = "FlagSetId"
const Source = "Source"
const Priority = "Priority"
type Flag struct {
Key string `json:"-"` // not serialized, used only for indexing
FlagSetId string `json:"-"` // not serialized, used only for indexing
Priority int `json:"-"` // not serialized, used only for indexing
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`


@ -0,0 +1,52 @@
package notifications
import (
"reflect"
"github.com/open-feature/flagd/core/pkg/model"
)
const typeField = "type"
// Notifications represents change notifications for RPC mode PROVIDER_CONFIGURATION_CHANGE events.
type Notifications map[string]any
// NewFromFlags generates notifications (deltas) from old and new flag sets for use in RPC mode PROVIDER_CONFIGURATION_CHANGE events.
func NewFromFlags(oldFlags, newFlags map[string]model.Flag) Notifications {
notifications := map[string]interface{}{}
// flags removed
for key := range oldFlags {
if _, ok := newFlags[key]; !ok {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationDelete),
}
}
}
// flags added or modified
for key, newFlag := range newFlags {
oldFlag, exists := oldFlags[key]
if !exists {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationCreate),
}
} else if !flagsEqual(oldFlag, newFlag) {
notifications[key] = map[string]interface{}{
typeField: string(model.NotificationUpdate),
}
}
}
return notifications
}
func flagsEqual(a, b model.Flag) bool {
return a.State == b.State &&
a.DefaultVariant == b.DefaultVariant &&
reflect.DeepEqual(a.Variants, b.Variants) &&
reflect.DeepEqual(a.Targeting, b.Targeting) &&
a.Source == b.Source &&
a.Selector == b.Selector &&
reflect.DeepEqual(a.Metadata, b.Metadata)
}
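
A minimal usage sketch of the delta generation above; the flag keys and values are illustrative, and only the `model` and `notifications` packages shown in this diff are assumed:

```go
package main

import (
	"fmt"

	"github.com/open-feature/flagd/core/pkg/model"
	"github.com/open-feature/flagd/core/pkg/notifications"
)

func main() {
	oldFlags := map[string]model.Flag{
		"banner": {State: "ENABLED", DefaultVariant: "on"},
		"legacy": {State: "ENABLED", DefaultVariant: "off"},
	}
	newFlags := map[string]model.Flag{
		"banner": {State: "DISABLED", DefaultVariant: "on"}, // changed -> update
		"promo":  {State: "ENABLED", DefaultVariant: "off"}, // new -> create
		// "legacy" is absent from newFlags -> delete
	}
	for key, delta := range notifications.NewFromFlags(oldFlags, newFlags) {
		fmt.Printf("%s: %v\n", key, delta)
	}
}
```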


@ -0,0 +1,102 @@
package notifications
import (
"testing"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/assert"
)
func TestNewFromFlags(t *testing.T) {
flagA := model.Flag{
Key: "flagA",
State: "ENABLED",
DefaultVariant: "on",
Source: "source1",
}
flagAUpdated := model.Flag{
Key: "flagA",
State: "DISABLED",
DefaultVariant: "on",
Source: "source1",
}
flagB := model.Flag{
Key: "flagB",
State: "ENABLED",
DefaultVariant: "off",
Source: "source1",
}
tests := []struct {
name string
oldFlags map[string]model.Flag
newFlags map[string]model.Flag
want Notifications
}{
{
name: "flag added",
oldFlags: map[string]model.Flag{},
newFlags: map[string]model.Flag{"flagA": flagA},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationCreate),
},
},
},
{
name: "flag deleted",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationDelete),
},
},
},
{
name: "flag changed",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{"flagA": flagAUpdated},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationUpdate),
},
},
},
{
name: "flag unchanged",
oldFlags: map[string]model.Flag{"flagA": flagA},
newFlags: map[string]model.Flag{"flagA": flagA},
want: Notifications{},
},
{
name: "mixed changes",
oldFlags: map[string]model.Flag{
"flagA": flagA,
"flagB": flagB,
},
newFlags: map[string]model.Flag{
"flagA": flagAUpdated, // updated
"flagC": flagA, // added
},
want: Notifications{
"flagA": map[string]interface{}{
"type": string(model.NotificationUpdate),
},
"flagB": map[string]interface{}{
"type": string(model.NotificationDelete),
},
"flagC": map[string]interface{}{
"type": string(model.NotificationCreate),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewFromFlags(tt.oldFlags, tt.newFlags)
assert.Equal(t, tt.want, got)
})
}
}


@ -1,357 +0,0 @@
package store
import (
"context"
"encoding/json"
"fmt"
"maps"
"reflect"
"sync"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
)
type del = struct{}
var deleteMarker *del
type IStore interface {
GetAll(ctx context.Context) (map[string]model.Flag, model.Metadata, error)
Get(ctx context.Context, key string) (model.Flag, model.Metadata, bool)
SelectorForFlag(ctx context.Context, flag model.Flag) string
}
type State struct {
mx sync.RWMutex
Flags map[string]model.Flag `json:"flags"`
FlagSources []string
SourceDetails map[string]SourceDetails `json:"sourceMetadata,omitempty"`
MetadataPerSource map[string]model.Metadata `json:"metadata,omitempty"`
}
type SourceDetails struct {
Source string
Selector string
}
func (f *State) hasPriority(stored string, new string) bool {
if stored == new {
return true
}
for i := len(f.FlagSources) - 1; i >= 0; i-- {
switch f.FlagSources[i] {
case stored:
return false
case new:
return true
}
}
return true
}
func NewFlags() *State {
return &State{
Flags: map[string]model.Flag{},
SourceDetails: map[string]SourceDetails{},
MetadataPerSource: map[string]model.Metadata{},
}
}
func (f *State) Set(key string, flag model.Flag) {
f.mx.Lock()
defer f.mx.Unlock()
f.Flags[key] = flag
}
func (f *State) Get(_ context.Context, key string) (model.Flag, model.Metadata, bool) {
f.mx.RLock()
defer f.mx.RUnlock()
metadata := f.getMetadata()
flag, ok := f.Flags[key]
if ok {
metadata = f.GetMetadataForSource(flag.Source)
}
return flag, metadata, ok
}
func (f *State) SelectorForFlag(_ context.Context, flag model.Flag) string {
f.mx.RLock()
defer f.mx.RUnlock()
return f.SourceDetails[flag.Source].Selector
}
func (f *State) Delete(key string) {
f.mx.Lock()
defer f.mx.Unlock()
delete(f.Flags, key)
}
func (f *State) String() (string, error) {
f.mx.RLock()
defer f.mx.RUnlock()
bytes, err := json.Marshal(f)
if err != nil {
return "", fmt.Errorf("unable to marshal flags: %w", err)
}
return string(bytes), nil
}
// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (f *State) GetAll(_ context.Context) (map[string]model.Flag, model.Metadata, error) {
f.mx.RLock()
defer f.mx.RUnlock()
flags := make(map[string]model.Flag, len(f.Flags))
for key, flag := range f.Flags {
flags[key] = flag
}
return flags, f.getMetadata(), nil
}
// Add new flags from source.
func (f *State) Add(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}
for k, newFlag := range flags {
storedFlag, _, ok := f.Get(context.Background(), k)
if ok && !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not overwriting: flag %s from source %s does not have priority over %s",
k,
source,
storedFlag.Source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}
// Store the new version of the flag
newFlag.Source = source
newFlag.Selector = selector
f.Set(k, newFlag)
}
return notifications
}
// Update existing flags from source.
func (f *State) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}
for k, flag := range flags {
storedFlag, _, ok := f.Get(context.Background(), k)
if !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
k,
source))
continue
}
if !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not updating: flag %s from source %s does not have priority over %s",
k,
source,
storedFlag.Source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
flag.Source = source
flag.Selector = selector
f.Set(k, flag)
}
return notifications
}
// DeleteFlags matching flags from source.
func (f *State) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
logger.Debug(
fmt.Sprintf(
"store resync triggered: delete event from source %s",
source,
),
)
ctx := context.Background()
_, ok := f.MetadataPerSource[source]
if ok {
delete(f.MetadataPerSource, source)
}
notifications := map[string]interface{}{}
if len(flags) == 0 {
allFlags, _, err := f.GetAll(ctx)
if err != nil {
logger.Error(fmt.Sprintf("error while retrieving flags from the store: %v", err))
return notifications
}
for key, flag := range allFlags {
if flag.Source != source {
continue
}
notifications[key] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(key)
}
}
for k := range flags {
flag, _, ok := f.Get(ctx, k)
if ok {
if !f.hasPriority(flag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not deleting: flag %s from source %s cannot be deleted by %s",
k,
flag.Source,
source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.",
k,
source))
}
}
return notifications
}
// Merge provided flags from source with currently stored flags.
// nolint: funlen
func (f *State) Merge(
logger *logger.Logger,
source string,
selector string,
flags map[string]model.Flag,
metadata model.Metadata,
) (map[string]interface{}, bool) {
notifications := map[string]interface{}{}
resyncRequired := false
f.mx.Lock()
f.setSourceMetadata(source, metadata)
for k, v := range f.Flags {
if v.Source == source && v.Selector == selector {
if _, ok := flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
resyncRequired = true
logger.Debug(
fmt.Sprintf(
"store resync triggered: flag %s has been deleted from source %s",
k, source,
),
)
continue
}
}
}
f.mx.Unlock()
for k, newFlag := range flags {
newFlag.Source = source
newFlag.Selector = selector
storedFlag, _, ok := f.Get(context.Background(), k)
if ok {
if !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not merging: flag %s from source %s does not have priority over %s",
k, source, storedFlag.Source,
),
)
continue
}
if reflect.DeepEqual(storedFlag, newFlag) {
continue
}
}
if !ok {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}
} else {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
}
// Store the new version of the flag
f.Set(k, newFlag)
}
return notifications, resyncRequired
}
func (f *State) GetMetadataForSource(source string) model.Metadata {
perSource, ok := f.MetadataPerSource[source]
if ok && perSource != nil {
return maps.Clone(perSource)
}
return model.Metadata{}
}
func (f *State) getMetadata() model.Metadata {
metadata := model.Metadata{}
for _, perSource := range f.MetadataPerSource {
for key, entry := range perSource {
_, exists := metadata[key]
if !exists {
metadata[key] = entry
} else {
metadata[key] = deleteMarker
}
}
}
// keys that exist across multiple sources are deleted
maps.DeleteFunc(metadata, func(key string, _ interface{}) bool {
return metadata[key] == deleteMarker
})
return metadata
}
func (f *State) setSourceMetadata(source string, metadata model.Metadata) {
if f.MetadataPerSource == nil {
f.MetadataPerSource = map[string]model.Metadata{}
}
f.MetadataPerSource[source] = metadata
}


@ -1,545 +0,0 @@
package store
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/require"
)
func TestHasPriority(t *testing.T) {
tests := []struct {
name string
currentState *State
storedSource string
newSource string
hasPriority bool
}{
{
name: "same source",
currentState: &State{},
storedSource: "A",
newSource: "A",
hasPriority: true,
},
{
name: "no priority",
currentState: &State{
FlagSources: []string{
"B",
"A",
},
},
storedSource: "A",
newSource: "B",
hasPriority: false,
},
{
name: "priority",
currentState: &State{
FlagSources: []string{
"A",
"B",
},
},
storedSource: "A",
newSource: "B",
hasPriority: true,
},
{
name: "not in sources",
currentState: &State{
FlagSources: []string{
"A",
"B",
},
},
storedSource: "C",
newSource: "D",
hasPriority: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
p := tt.currentState.hasPriority(tt.storedSource, tt.newSource)
require.Equal(t, p, tt.hasPriority)
})
}
}
func TestMergeFlags(t *testing.T) {
t.Parallel()
tests := []struct {
name string
current *State
new map[string]model.Flag
newSource string
newSelector string
want *State
wantNotifs map[string]interface{}
wantResync bool
}{
{
name: "both nil",
current: &State{Flags: nil},
new: nil,
want: &State{Flags: nil},
wantNotifs: map[string]interface{}{},
},
{
name: "both empty flags",
current: &State{Flags: map[string]model.Flag{}},
new: map[string]model.Flag{},
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "empty new",
current: &State{Flags: map[string]model.Flag{}},
new: nil,
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "merging with new source",
current: &State{
Flags: map[string]model.Flag{
"waka": {
DefaultVariant: "off",
Source: "1",
},
},
},
new: map[string]model.Flag{
"paka": {
DefaultVariant: "on",
},
},
newSource: "2",
want: &State{Flags: map[string]model.Flag{
"waka": {
DefaultVariant: "off",
Source: "1",
},
"paka": {
DefaultVariant: "on",
Source: "2",
},
}},
wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write", "source": "2"}},
},
{
name: "override by new update",
current: &State{Flags: map[string]model.Flag{
"waka": {DefaultVariant: "off"},
"paka": {DefaultVariant: "off"},
}},
new: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on"},
},
want: &State{Flags: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on"},
}},
wantNotifs: map[string]interface{}{
"waka": map[string]interface{}{"type": "update", "source": ""},
"paka": map[string]interface{}{"type": "update", "source": ""},
},
},
{
name: "identical update so empty notifications",
current: &State{
Flags: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
},
new: map[string]model.Flag{
"hello": {DefaultVariant: "off"},
},
want: &State{Flags: map[string]model.Flag{
"hello": {DefaultVariant: "off"},
}},
wantNotifs: map[string]interface{}{},
},
{
name: "deleted flag & trigger resync for same source",
current: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}},
new: map[string]model.Flag{},
newSource: "A",
want: &State{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{"hello": map[string]interface{}{"type": "delete", "source": "A"}},
wantResync: true,
},
{
name: "no deleted & no resync for same source but different selector",
current: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
new: map[string]model.Flag{},
newSource: "A",
newSelector: "Y",
want: &State{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
wantResync: false,
wantNotifs: map[string]interface{}{},
},
{
name: "no merge due to low priority",
current: &State{
FlagSources: []string{
"B",
"A",
},
Flags: map[string]model.Flag{
"hello": {
DefaultVariant: "off",
Source: "A",
},
},
},
new: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
newSource: "B",
want: &State{
FlagSources: []string{
"B",
"A",
},
Flags: map[string]model.Flag{
"hello": {
DefaultVariant: "off",
Source: "A",
},
},
},
wantNotifs: map[string]interface{}{},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
require.Equal(t, tt.wantNotifs, gotNotifs)
require.Equal(t, tt.wantResync, resyncRequired)
})
}
}
func TestFlags_Add(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockOverrideSource := "source-2"
type request struct {
source string
selector string
flags map[string]model.Flag
}
tests := []struct {
name string
storedState *State
addRequest request
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Add success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
},
},
expectedNotificationKeys: []string{"B"},
},
{
name: "Add multiple success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource},
},
},
expectedNotificationKeys: []string{"B", "C"},
},
{
name: "Add success - conflict and override",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
addRequest: request{
source: mockOverrideSource,
flags: map[string]model.Flag{
"A": {Source: mockOverrideSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockOverrideSource},
},
},
expectedNotificationKeys: []string{"A"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.selector, tt.addRequest.flags)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}
func TestFlags_Update(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockOverrideSource := "source-2"
type request struct {
source string
selector string
flags map[string]model.Flag
}
tests := []struct {
name string
storedState *State
UpdateRequest request
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Update success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update multiple success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
"B": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
{
name: "Update success - conflict and override",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockOverrideSource,
flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update fail",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
expectedNotificationKeys: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source,
tt.UpdateRequest.selector, tt.UpdateRequest.flags)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}
func TestFlags_Delete(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockSource2 := "source2"
tests := []struct {
name string
storedState *State
deleteRequest map[string]model.Flag
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Remove success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"A": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Nothing to remove",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"C": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{},
},
{
name: "Remove all",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{},
expectedState: &State{
Flags: map[string]model.Flag{
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.DeleteFlags(mockLogger, mockSource, tt.deleteRequest)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}

core/pkg/store/query.go

@ -0,0 +1,133 @@
package store
import (
"maps"
"sort"
"strings"
uuid "github.com/google/uuid"
"github.com/open-feature/flagd/core/pkg/model"
)
// flags table and index constants
const flagsTable = "flags"
const idIndex = "id"
const keyIndex = "key"
const sourceIndex = "source"
const priorityIndex = "priority"
const flagSetIdIndex = "flagSetId"
// compound indices; maintain sub-indexes alphabetically; order matters; these must match what's generated in the Selector's ToQuery func.
const flagSetIdSourceCompoundIndex = flagSetIdIndex + "+" + sourceIndex
const keySourceCompoundIndex = keyIndex + "+" + sourceIndex
const flagSetIdKeySourceCompoundIndex = flagSetIdIndex + "+" + keyIndex + "+" + sourceIndex
// flagSetId defaults to a UUID generated at startup to make our queries consistent
// any flag without a "flagSetId" is assigned this one; it's never exposed externally
var nilFlagSetId = uuid.New().String()
// A selector represents a set of constraints used to query the store.
type Selector struct {
indexMap map[string]string
}
// NewSelector creates a new Selector from a selector expression string.
// For example, to select flags from source "./mySource" and flagSetId "1234", use the expression:
// "source=./mySource,flagSetId=1234"
func NewSelector(selectorExpression string) Selector {
return Selector{
indexMap: expressionToMap(selectorExpression),
}
}
func expressionToMap(sExp string) map[string]string {
selectorMap := make(map[string]string)
if sExp == "" {
return selectorMap
}
if !strings.Contains(sExp, "=") {
// if no '=' is found, treat the whole string as a source (backwards compatibility)
// we may support interpreting this as a flagSetId in the future as an option
selectorMap[sourceIndex] = sExp
return selectorMap
}
// Split the selector by commas
pairs := strings.Split(sExp, ",")
for _, pair := range pairs {
// Split each pair on '='; pairs without exactly one '=' are ignored
parts := strings.Split(pair, "=")
if len(parts) == 2 {
key := parts[0]
value := parts[1]
selectorMap[key] = value
}
}
return selectorMap
}
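
An in-package sketch of the parsing behavior above; it assumes `fmt` is imported into package store, and the source paths are illustrative:

```go
func ExampleNewSelector() {
	sel := NewSelector("source=./mySource,flagSetId=1234")
	fmt.Println(sel.indexMap["source"])    // ./mySource
	fmt.Println(sel.indexMap["flagSetId"]) // 1234

	// No '=' in the expression: the whole string is treated as a source
	// (backwards compatibility).
	legacy := NewSelector("./mySource")
	fmt.Println(legacy.indexMap["source"]) // ./mySource
}
```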
func (s Selector) WithIndex(key string, value string) Selector {
m := maps.Clone(s.indexMap)
m[key] = value
return Selector{
indexMap: m,
}
}
func (s *Selector) IsEmpty() bool {
return s == nil || len(s.indexMap) == 0
}
// ToQuery converts the selector's internal map to an indexId and constraints for querying the store.
// For a given index, a specific order and number of constraints are required.
// Both the indexId and constraints are generated based on the keys present in the selector's internal map.
func (s Selector) ToQuery() (indexId string, constraints []interface{}) {
if len(s.indexMap) == 2 && s.indexMap[flagSetIdIndex] != "" && s.indexMap[keyIndex] != "" {
// special case for flagSetId and key (this is the "id" index)
return idIndex, []interface{}{s.indexMap[flagSetIdIndex], s.indexMap[keyIndex]}
}
qs := []string{}
keys := make([]string, 0, len(s.indexMap))
for key := range s.indexMap {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
indexId += key + "+"
qs = append(qs, s.indexMap[key])
}
indexId = strings.TrimSuffix(indexId, "+")
// Convert []string to []interface{}
c := make([]interface{}, 0, len(qs))
for _, v := range qs {
c = append(c, v)
}
constraints = c
return indexId, constraints
}
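
A worked sketch of how index ids are derived by the logic above (in-package, assuming `fmt` is imported; the values are illustrative):

```go
func ExampleSelector_ToQuery() {
	// flagSetId+key is special-cased onto the unique primary ("id") index.
	idx, cs := NewSelector("flagSetId=fsid,key=myKey").ToQuery()
	fmt.Println(idx, cs) // id [fsid myKey]

	// Otherwise keys are sorted alphabetically and joined with '+',
	// which must line up with the compound index names in the schema.
	idx, cs = NewSelector("flagSetId=fsid,source=src").ToQuery()
	fmt.Println(idx, cs) // flagSetId+source [fsid src]
}
```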
// ToMetadata converts the selector's internal map to metadata for logging or tracing purposes.
// It only includes known indices to avoid leaking sensitive information, and is usually returned as the "top level" metadata.
func (s *Selector) ToMetadata() model.Metadata {
meta := model.Metadata{}
if s == nil || s.indexMap == nil {
return meta
}
if s.indexMap[flagSetIdIndex] != "" {
meta[flagSetIdIndex] = s.indexMap[flagSetIdIndex]
}
if s.indexMap[sourceIndex] != "" {
meta[sourceIndex] = s.indexMap[sourceIndex]
}
return meta
}


@ -0,0 +1,193 @@
package store
import (
"reflect"
"testing"
"github.com/open-feature/flagd/core/pkg/model"
)
func TestSelector_IsEmpty(t *testing.T) {
tests := []struct {
name string
selector *Selector
wantEmpty bool
}{
{
name: "nil selector",
selector: nil,
wantEmpty: true,
},
{
name: "nil indexMap",
selector: &Selector{indexMap: nil},
wantEmpty: true,
},
{
name: "empty indexMap",
selector: &Selector{indexMap: map[string]string{}},
wantEmpty: true,
},
{
name: "non-empty indexMap",
selector: &Selector{indexMap: map[string]string{"source": "abc"}},
wantEmpty: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.selector.IsEmpty()
if got != tt.wantEmpty {
t.Errorf("IsEmpty() = %v, want %v", got, tt.wantEmpty)
}
})
}
}
func TestSelector_WithIndex(t *testing.T) {
oldS := Selector{indexMap: map[string]string{"source": "abc"}}
newS := oldS.WithIndex("flagSetId", "1234")
if newS.indexMap["source"] != "abc" {
t.Errorf("WithIndex did not preserve existing keys")
}
if newS.indexMap["flagSetId"] != "1234" {
t.Errorf("WithIndex did not add new key")
}
// Ensure original is unchanged
if _, ok := oldS.indexMap["flagSetId"]; ok {
t.Errorf("WithIndex mutated original selector")
}
}
func TestSelector_ToQuery(t *testing.T) {
tests := []struct {
name string
selector Selector
wantIndex string
wantConstr []interface{}
}{
{
name: "flagSetId and key primary index special case",
selector: Selector{indexMap: map[string]string{"flagSetId": "fsid", "key": "myKey"}},
wantIndex: "id",
wantConstr: []interface{}{"fsid", "myKey"},
},
{
name: "multiple keys sorted",
selector: Selector{indexMap: map[string]string{"source": "src", "flagSetId": "fsid"}},
wantIndex: "flagSetId+source",
wantConstr: []interface{}{"fsid", "src"},
},
{
name: "single key",
selector: Selector{indexMap: map[string]string{"source": "src"}},
wantIndex: "source",
wantConstr: []interface{}{"src"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotIndex, gotConstr := tt.selector.ToQuery()
if gotIndex != tt.wantIndex {
t.Errorf("ToQuery() index = %v, want %v", gotIndex, tt.wantIndex)
}
if !reflect.DeepEqual(gotConstr, tt.wantConstr) {
t.Errorf("ToQuery() constraints = %v, want %v", gotConstr, tt.wantConstr)
}
})
}
}
func TestSelector_ToMetadata(t *testing.T) {
tests := []struct {
name string
selector *Selector
want model.Metadata
}{
{
name: "nil selector",
selector: nil,
want: model.Metadata{},
},
{
name: "nil indexMap",
selector: &Selector{indexMap: nil},
want: model.Metadata{},
},
{
name: "empty indexMap",
selector: &Selector{indexMap: map[string]string{}},
want: model.Metadata{},
},
{
name: "flagSetId only",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid"}},
want: model.Metadata{"flagSetId": "fsid"},
},
{
name: "source only",
selector: &Selector{indexMap: map[string]string{"source": "src"}},
want: model.Metadata{"source": "src"},
},
{
name: "flagSetId and source",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src"}},
want: model.Metadata{"flagSetId": "fsid", "source": "src"},
},
{
name: "flagSetId, source, and key (key should be ignored)",
selector: &Selector{indexMap: map[string]string{"flagSetId": "fsid", "source": "src", "key": "myKey"}},
want: model.Metadata{"flagSetId": "fsid", "source": "src"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.selector.ToMetadata()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ToMetadata() = %v, want %v", got, tt.want)
}
})
}
}
func TestNewSelector(t *testing.T) {
tests := []struct {
name string
input string
wantMap map[string]string
}{
{
name: "source and flagSetId",
input: "source=abc,flagSetId=1234",
wantMap: map[string]string{"source": "abc", "flagSetId": "1234"},
},
{
name: "source",
input: "source=abc",
wantMap: map[string]string{"source": "abc"},
},
{
name: "no equals, treat as source",
input: "mysource",
wantMap: map[string]string{"source": "mysource"},
},
{
name: "empty string",
input: "",
wantMap: map[string]string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewSelector(tt.input)
if !reflect.DeepEqual(s.indexMap, tt.wantMap) {
t.Errorf("NewSelector(%q) indexMap = %v, want %v", tt.input, s.indexMap, tt.wantMap)
}
})
}
}

core/pkg/store/store.go

@ -0,0 +1,396 @@
package store
import (
"context"
"encoding/json"
"fmt"
"slices"
"sync"
"github.com/hashicorp/go-memdb"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
)
var noValidatedSources = []string{}
type SelectorContextKey struct{}
type FlagQueryResult struct {
Flags map[string]model.Flag
}
type IStore interface {
Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error)
GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error)
Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult)
}
var _ IStore = (*Store)(nil)
type Store struct {
mx sync.RWMutex
db *memdb.MemDB
logger *logger.Logger
sources []string
// Deprecated: has no effect and will be removed soon.
FlagSources []string
}
type SourceDetails struct {
Source string
Selector string
}
// NewStore creates a new in-memory store with the given sources.
// The order of sources in the slice determines their priority; when queries return duplicate flags (queries without a source or flagSetId), the flag from the higher-priority source "wins".
func NewStore(logger *logger.Logger, sources []string) (*Store, error) {
// a unique index must exist for each set of constraints - for example, to look up by key and source, we need a compound index on key+source, etc
// we may want to generate these dynamically in the future to support more robust querying, but for now we hardcode the ones we need
schema := &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
flagsTable: {
Name: flagsTable,
Indexes: map[string]*memdb.IndexSchema{
// primary index; must be unique and named "id"
idIndex: {
Name: idIndex,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
},
},
},
// for looking up by source
sourceIndex: {
Name: sourceIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
// for looking up by priority, used to maintain highest priority flag when there are duplicates and no selector is provided
priorityIndex: {
Name: priorityIndex,
Unique: false,
Indexer: &memdb.IntFieldIndex{Field: model.Priority},
},
// for looking up by flagSetId
flagSetIdIndex: {
Name: flagSetIdIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
},
keyIndex: {
Name: keyIndex,
Unique: false,
Indexer: &memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
},
flagSetIdSourceCompoundIndex: {
Name: flagSetIdSourceCompoundIndex,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
keySourceCompoundIndex: {
Name: keySourceCompoundIndex,
Unique: false, // duplicates from a single source ARE allowed (they just must have different flag sets)
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
// used to query all flags from a specific source so we know which flags to delete if a flag is missing from a source
flagSetIdKeySourceCompoundIndex: {
Name: flagSetIdKeySourceCompoundIndex,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: model.FlagSetId, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Key, Lowercase: false},
&memdb.StringFieldIndex{Field: model.Source, Lowercase: false},
},
},
},
},
},
},
}
// Create a new in-memory database
db, err := memdb.NewMemDB(schema)
if err != nil {
return nil, fmt.Errorf("unable to initialize flag database: %w", err)
}
// clone the sources to avoid modifying the original slice
s := slices.Clone(sources)
return &Store{
sources: s,
db: db,
logger: logger,
}, nil
}
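
For illustration, constructing a store might look like the following; the source names are hypothetical and error handling is reduced to a panic:

```go
package main

import (
	"fmt"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/store"
)

func main() {
	log := logger.NewLogger(nil, false)
	// Later sources have higher priority: on duplicate keys (and no
	// selector), flags from "override.json" shadow those from "base.json".
	s, err := store.NewStore(log, []string{"base.json", "override.json"})
	if err != nil {
		panic(err)
	}
	dump, _ := s.String()
	fmt.Println(dump) // "{}" until sources are updated via Update()
}
```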
// Deprecated: use NewStore instead - will be removed very soon.
func NewFlags() *Store {
state, err := NewStore(logger.NewLogger(nil, false), noValidatedSources)
if err != nil {
panic(fmt.Sprintf("unable to create flag store: %v", err))
}
return state
}
func (s *Store) Get(_ context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error) {
s.logger.Debug(fmt.Sprintf("getting flag %s", key))
txn := s.db.Txn(false)
queryMeta := selector.ToMetadata()
// if present, use the selector to query the flags
if !selector.IsEmpty() {
selector := selector.WithIndex("key", key)
indexId, constraints := selector.ToQuery()
s.logger.Debug(fmt.Sprintf("getting flag with query: %s, %v", indexId, constraints))
raw, err := txn.First(flagsTable, indexId, constraints...)
if err != nil {
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
}
flag, ok := raw.(model.Flag)
if !ok {
// raw is nil when nothing matched the query
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found", key)
}
return flag, queryMeta, nil
}
// otherwise, get all flags with the given key, and keep the one with the highest priority
s.logger.Debug(fmt.Sprintf("getting highest priority flag with key: %s", key))
it, err := txn.Get(flagsTable, keyIndex, key)
if err != nil {
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
}
flag := model.Flag{}
found := false
for raw := it.Next(); raw != nil; raw = it.Next() {
nextFlag, ok := raw.(model.Flag)
if !ok {
continue
}
found = true
if nextFlag.Priority >= flag.Priority {
flag = nextFlag
} else {
s.logger.Debug(fmt.Sprintf("discarding flag %s from lower priority source %s in favor of flag from source %s", nextFlag.Key, s.sources[nextFlag.Priority], s.sources[flag.Priority]))
}
}
if !found {
return flag, queryMeta, fmt.Errorf("flag %s not found", key)
}
return flag, queryMeta, nil
}
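
A sketch of the two lookup paths above; the flag key and flagSetId are hypothetical, and imports of `context`, `model`, and `store` are assumed:

```go
func lookupDemo(ctx context.Context, s *store.Store) (model.Flag, error) {
	// With a selector, the lookup is a single indexed query scoped to
	// one flag set; duplicates across sources are disambiguated by it.
	sel := store.NewSelector("flagSetId=checkout")
	if flag, _, err := s.Get(ctx, "myFlag", &sel); err == nil {
		return flag, nil
	}

	// Without a selector, every flag with this key is scanned and the
	// copy from the highest-priority (latest-registered) source wins.
	flag, _, err := s.Get(ctx, "myFlag", nil)
	return flag, err
}
```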
func (f *Store) String() (string, error) {
f.logger.Debug("dumping flags to string")
f.mx.RLock()
defer f.mx.RUnlock()
state, _, err := f.GetAll(context.Background(), nil)
if err != nil {
return "", fmt.Errorf("unable to get all flags: %w", err)
}
bytes, err := json.Marshal(state)
if err != nil {
return "", fmt.Errorf("unable to marshal flags: %w", err)
}
return string(bytes), nil
}
// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (s *Store) GetAll(ctx context.Context, selector *Selector) (map[string]model.Flag, model.Metadata, error) {
flags := make(map[string]model.Flag)
queryMeta := selector.ToMetadata()
it, err := s.selectOrAll(selector)
if err != nil {
s.logger.Error(fmt.Sprintf("flag query error: %v", err))
return flags, queryMeta, err
}
flags = s.collect(it)
return flags, queryMeta, nil
}
// Update the flag state with the provided flags.
func (s *Store) Update(
source string,
flags map[string]model.Flag,
metadata model.Metadata,
) (map[string]interface{}, bool) {
resyncRequired := false
if source == "" {
panic("source cannot be empty")
}
priority := slices.Index(s.sources, source)
if priority == -1 {
// this is a hack to allow old constructors that didn't pass sources; remove when we remove the "NewFlags" constructor
if !slices.Equal(s.sources, noValidatedSources) {
panic(fmt.Sprintf("source %s is not registered in the store", source))
}
// same as above - remove when we remove "NewFlags" constructor
priority = 0
}
txn := s.db.Txn(true)
defer txn.Abort()
// get all flags for the source we are updating
selector := NewSelector(sourceIndex + "=" + source)
oldFlags, _, _ := s.GetAll(context.Background(), &selector)
s.mx.Lock()
for key := range oldFlags {
if _, ok := flags[key]; !ok {
// flag has been deleted
s.logger.Debug(fmt.Sprintf("flag %s has been deleted from source %s", key, source))
count, err := txn.DeleteAll(flagsTable, keySourceCompoundIndex, key, source)
s.logger.Debug(fmt.Sprintf("deleted %d flags with key %s from source %s", count, key, source))
if err != nil {
s.logger.Error(fmt.Sprintf("error deleting flag: %s, %v", key, err))
}
continue
}
}
s.mx.Unlock()
for key, newFlag := range flags {
s.logger.Debug(fmt.Sprintf("got metadata %v", metadata))
newFlag.Key = key
newFlag.Source = source
newFlag.Priority = priority
newFlag.Metadata = patchMetadata(metadata, newFlag.Metadata)
// flagSetId defaults to a UUID generated at startup to make our queries isomorphic
flagSetId := nilFlagSetId
// flagSetId is inherited from the set, but can be overridden by the flag
setFlagSetId, ok := newFlag.Metadata["flagSetId"].(string)
if ok {
flagSetId = setFlagSetId
}
newFlag.FlagSetId = flagSetId
raw, err := txn.First(flagsTable, keySourceCompoundIndex, key, source)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to get flag %s from source %s: %v", key, source, err))
continue
}
oldFlag, ok := raw.(model.Flag)
// If we already have a flag with the same key and source, we need to check if it has the same flagSetId
if ok {
if oldFlag.FlagSetId != newFlag.FlagSetId {
// If the flagSetId is different, we need to delete the entry, since flagSetId+key represents the primary index, and it's now been changed.
// This is important especially for clients listening to flagSetId changes, as they expect the flag to be removed from the set in this case.
_, err = txn.DeleteAll(flagsTable, idIndex, oldFlag.FlagSetId, key)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to delete flags with key %s and flagSetId %s: %v", key, oldFlag.FlagSetId, err))
continue
}
}
}
// Store the new version of the flag
s.logger.Debug(fmt.Sprintf("storing flag: %v", newFlag))
err = txn.Insert(flagsTable, newFlag)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to insert flag %s: %v", key, err))
continue
}
}
txn.Commit()
return notifications.NewFromFlags(oldFlags, flags), resyncRequired
}
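
A hypothetical update showing how set-level metadata assigns a `flagSetId`; the flag content is illustrative, and `"base.json"` must be one of the sources registered with NewStore:

```go
func updateDemo(s *store.Store) {
	// Every flag in this sync inherits flagSetId "checkout" unless its own
	// metadata overrides it; keys present in earlier syncs of "base.json"
	// but missing here are deleted.
	notifs, _ := s.Update("base.json", map[string]model.Flag{
		"myFlag": {
			State:          "ENABLED",
			DefaultVariant: "on",
			Variants:       map[string]any{"on": true, "off": false},
		},
	}, model.Metadata{"flagSetId": "checkout"})
	fmt.Println(notifs) // e.g. a single model.NotificationCreate entry on first sync
}
```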
// Watch the result-set of a selector for changes, sending updates to the watcher channel.
func (s *Store) Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult) {
go func() {
for {
ws := memdb.NewWatchSet()
it, err := s.selectOrAll(selector)
if err != nil {
s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
close(watcher)
return
}
ws.Add(it.WatchCh())
flags := s.collect(it)
watcher <- FlagQueryResult{
Flags: flags,
}
if err = ws.WatchCtx(ctx); err != nil {
s.logger.Error(fmt.Sprintf("error watching flags: %v", err))
close(watcher)
return
}
}
}()
}
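
A sketch of subscribing to changes for one flag set via Watch; the selector is hypothetical, and imports of `context`, `fmt`, and `store` are assumed:

```go
func watchDemo(ctx context.Context, s *store.Store) {
	sel := store.NewSelector("flagSetId=checkout")
	ch := make(chan store.FlagQueryResult)
	s.Watch(ctx, &sel, ch)

	// The channel receives the current result set immediately, then again
	// after every matching change; it is closed on error or when ctx ends.
	for result := range ch {
		fmt.Printf("flag set has %d flags\n", len(result.Flags))
	}
}
```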
// returns an iterator for the given selector, or all flags if the selector is nil or empty
func (s *Store) selectOrAll(selector *Selector) (it memdb.ResultIterator, err error) {
txn := s.db.Txn(false)
if !selector.IsEmpty() {
indexId, constraints := selector.ToQuery()
s.logger.Debug(fmt.Sprintf("getting all flags with query: %s, %v", indexId, constraints))
return txn.Get(flagsTable, indexId, constraints...)
} else {
// no selector, get all flags
return txn.Get(flagsTable, idIndex)
}
}
// collects flags from an iterator, ensuring that only the highest priority flag is kept when there are duplicates
func (s *Store) collect(it memdb.ResultIterator) map[string]model.Flag {
flags := make(map[string]model.Flag)
for raw := it.Next(); raw != nil; raw = it.Next() {
flag := raw.(model.Flag)
if existing, ok := flags[flag.Key]; ok {
if flag.Priority < existing.Priority {
s.logger.Debug(fmt.Sprintf("discarding duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[flag.Priority], s.sources[existing.Priority]))
continue // we already have a higher priority flag
}
s.logger.Debug(fmt.Sprintf("overwriting duplicate flag %s from lower priority source %s in favor of flag from source %s", flag.Key, s.sources[existing.Priority], s.sources[flag.Priority]))
}
flags[flag.Key] = flag
}
return flags
}
func patchMetadata(original, patch model.Metadata) model.Metadata {
patched := make(model.Metadata)
if original == nil && patch == nil {
return nil
}
for key, value := range original {
patched[key] = value
}
for key, value := range patch { // patch values overwrite original values on key conflict
patched[key] = value
}
return patched
}
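
For orientation, here is a minimal consumer sketch of the `Watch` API above. It is illustrative only: it assumes the package import path `github.com/open-feature/flagd/core/pkg/store` and a store constructed the way the tests below construct one.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/store"
)

func main() {
	// Build a store with one known source, as the tests below do.
	s, err := store.NewStore(logger.NewLogger(nil, false), []string{"sourceA"})
	if err != nil {
		panic(err)
	}
	// Watch only the result-set of one flag set; unrelated changes are filtered out.
	selector := store.NewSelector("flagSetId=example")
	watcher := make(chan store.FlagQueryResult, 1)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	s.Watch(ctx, &selector, watcher)
	// The first receive is the current result-set; subsequent receives are change
	// notifications. The store closes the channel when ctx ends or the query fails.
	for result := range watcher {
		fmt.Printf("result-set now has %d flags\n", len(result.Flags))
	}
}
```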

View File

@ -0,0 +1,487 @@
package store
import (
"context"
"testing"
"time"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUpdateFlags(t *testing.T) {
const source1 = "source1"
const source2 = "source2"
var sources = []string{source1, source2}
t.Parallel()
tests := []struct {
name string
setup func(t *testing.T) *Store
newFlags map[string]model.Flag
source string
wantFlags map[string]model.Flag
setMetadata model.Metadata
wantNotifs map[string]interface{}
wantResync bool
}{
{
name: "both nil",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: nil,
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "both empty flags",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: map[string]model.Flag{},
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "empty new",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
return s
},
source: source1,
newFlags: nil,
wantFlags: map[string]model.Flag{},
wantNotifs: map[string]interface{}{},
},
{
name: "update from source 1 (old flag removed)",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{
"waka": {DefaultVariant: "off"},
}, nil)
return s
},
newFlags: map[string]model.Flag{
"paka": {DefaultVariant: "on"},
},
source: source1,
wantFlags: map[string]model.Flag{
"paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
},
wantNotifs: map[string]interface{}{
"paka": map[string]interface{}{"type": "write"},
"waka": map[string]interface{}{"type": "delete"},
},
},
{
name: "update from source 1 (new flag added)",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{
"waka": {DefaultVariant: "off"},
}, nil)
return s
},
newFlags: map[string]model.Flag{
"paka": {DefaultVariant: "on"},
},
source: source2,
wantFlags: map[string]model.Flag{
"waka": {Key: "waka", DefaultVariant: "off", Source: source1, FlagSetId: nilFlagSetId, Priority: 0},
"paka": {Key: "paka", DefaultVariant: "on", Source: source2, FlagSetId: nilFlagSetId, Priority: 1},
},
wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write"}},
},
{
name: "flag set inheritance",
setup: func(t *testing.T) *Store {
s, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
s.Update(source1, map[string]model.Flag{}, model.Metadata{})
return s
},
setMetadata: model.Metadata{
"flagSetId": "topLevelSet", // top level set metadata, including flagSetId
},
newFlags: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": "flagLevelSet"}}, // overrides set level flagSetId
},
source: source1,
wantFlags: map[string]model.Flag{
"waka": {Key: "waka", DefaultVariant: "on", Source: source1, FlagSetId: "topLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "topLevelSet"}},
"paka": {Key: "paka", DefaultVariant: "on", Source: source1, FlagSetId: "flagLevelSet", Priority: 0, Metadata: model.Metadata{"flagSetId": "flagLevelSet"}},
},
wantNotifs: map[string]interface{}{
"paka": map[string]interface{}{"type": "write"},
"waka": map[string]interface{}{"type": "write"},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
store := tt.setup(t)
gotNotifs, resyncRequired := store.Update(tt.source, tt.newFlags, tt.setMetadata)
gotFlags, _, _ := store.GetAll(context.Background(), nil)
require.Equal(t, tt.wantFlags, gotFlags)
require.Equal(t, tt.wantNotifs, gotNotifs)
require.Equal(t, tt.wantResync, resyncRequired)
})
}
}
func TestGet(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
flagSetIdB := "flagSetIdA"
flagSetIdC := "flagSetIdC"
var sources = []string{sourceA, sourceB, sourceC}
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)
t.Parallel()
tests := []struct {
name string
key string
selector *Selector
wantFlag model.Flag
wantErr bool
}{
{
name: "nil selector",
key: "flagA",
selector: nil,
wantFlag: model.Flag{Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
wantErr: false,
},
{
name: "flagSetId selector",
key: "dupe",
selector: &flagSetIdCSelector,
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
wantErr: false,
},
{
name: "source selector",
key: "dupe",
selector: &sourceASelector,
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
wantErr: false,
},
{
name: "flag not found with source selector",
key: "flagB",
selector: &sourceASelector,
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
wantErr: true,
},
{
name: "flag not found with flagSetId selector",
key: "flagB",
selector: &flagSetIdCSelector,
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"dupe": {Key: "dupe", DefaultVariant: "on"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
store.Update(sourceA, sourceAFlags, nil)
store.Update(sourceB, sourceBFlags, nil)
store.Update(sourceC, sourceCFlags, nil)
gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector)
if !tt.wantErr {
require.Equal(t, tt.wantFlag, gotFlag)
} else {
require.Error(t, err, "expected an error for key %s with selector %v", tt.key, tt.selector)
}
})
}
}
func TestGetAllNoWatcher(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
flagSetIdB := "flagSetIdA"
flagSetIdC := "flagSetIdC"
sources := []string{sourceA, sourceB, sourceC}
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + flagSetIdC)
t.Parallel()
tests := []struct {
name string
selector *Selector
wantFlags map[string]model.Flag
}{
{
name: "nil selector",
selector: nil,
wantFlags: map[string]model.Flag{
// "dupe" should be overwritten by higher priority flag
"flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
"flagB": {Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
"flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
},
},
{
name: "source selector",
selector: &sourceASelector,
wantFlags: map[string]model.Flag{
// we should get the "dupe" from sourceA
"flagA": {Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
"dupe": {Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
},
},
{
name: "flagSetId selector",
selector: &flagSetIdCSelector,
wantFlags: map[string]model.Flag{
// we should get the "dupe" from flagSetIdC
"flagC": {Key: "flagC", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"dupe": {Key: "dupe", DefaultVariant: "on"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdB}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
"dupe": {Key: "dupe", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": flagSetIdC}},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
store.Update(sourceA, sourceAFlags, nil)
store.Update(sourceB, sourceBFlags, nil)
store.Update(sourceC, sourceCFlags, nil)
gotFlags, _, _ := store.GetAll(context.Background(), tt.selector)
require.Equal(t, len(tt.wantFlags), len(gotFlags))
require.Equal(t, tt.wantFlags, gotFlags)
})
}
}
func TestWatch(t *testing.T) {
sourceA := "sourceA"
sourceB := "sourceB"
sourceC := "sourceC"
myFlagSetId := "myFlagSet"
var sources = []string{sourceA, sourceB, sourceC}
pauseTime := 100 * time.Millisecond // time for updates to settle
timeout := 1000 * time.Millisecond // time to make sure we get enough updates, and no extras
sourceASelector := NewSelector("source=" + sourceA)
flagSetIdCSelector := NewSelector("flagSetId=" + myFlagSetId)
emptySelector := NewSelector("")
sourceCSelector := NewSelector("source=" + sourceC)
tests := []struct {
name string
selector *Selector
wantUpdates int
}{
{
name: "flag source selector (initial, plus 1 update)",
selector: &sourceASelector,
wantUpdates: 2,
},
{
name: "flag set selector (initial, plus 3 updates)",
selector: &flagSetIdCSelector,
wantUpdates: 4,
},
{
name: "no selector (all updates)",
selector: &emptySelector,
wantUpdates: 5,
},
{
name: "flag source selector for unchanged source (initial, plus no updates)",
selector: &sourceCSelector,
wantUpdates: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
}
sourceBFlags := map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "off", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}
sourceCFlags := map[string]model.Flag{
"flagC": {Key: "flagC", DefaultVariant: "off"},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
// setup initial flags
store.Update(sourceA, sourceAFlags, model.Metadata{})
store.Update(sourceB, sourceBFlags, model.Metadata{})
store.Update(sourceC, sourceCFlags, model.Metadata{})
watcher := make(chan FlagQueryResult, 1)
time.Sleep(pauseTime)
ctx, cancel := context.WithCancel(context.Background())
store.Watch(ctx, tt.selector, watcher)
// perform updates
go func() {
time.Sleep(pauseTime)
// changing a flag default variant should trigger an update
store.Update(sourceA, map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "on"},
}, model.Metadata{})
time.Sleep(pauseTime)
// changing a flag default variant should trigger an update
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}, model.Metadata{})
time.Sleep(pauseTime)
// removing a flag set id should trigger an update (even for flag set id selectors; it should remove the flag from the set)
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on"},
}, model.Metadata{})
time.Sleep(pauseTime)
// adding a flag set id should trigger an update
store.Update(sourceB, map[string]model.Flag{
"flagB": {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
}, model.Metadata{})
}()
updates := 0
for {
select {
case <-time.After(timeout):
assert.Equal(t, tt.wantUpdates, updates, "expected %d updates, got %d", tt.wantUpdates, updates)
cancel()
_, open := <-watcher
assert.False(t, open, "watcher channel should be closed after cancel")
return
case q := <-watcher:
if q.Flags != nil {
updates++
}
}
}
})
}
}
func TestQueryMetadata(t *testing.T) {
sourceA := "sourceA"
otherSource := "otherSource"
nonExistingFlagSetId := "nonExistingFlagSetId"
var sources = []string{sourceA}
sourceAFlags := map[string]model.Flag{
"flagA": {Key: "flagA", DefaultVariant: "off"},
"flagB": {Key: "flagB", DefaultVariant: "on"},
}
store, err := NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
// setup initial flags
store.Update(sourceA, sourceAFlags, model.Metadata{})
selector := NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
_, metadata, _ := store.GetAll(context.Background(), &selector)
assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")
selector = NewSelector("source=" + otherSource + ",flagSetId=" + nonExistingFlagSetId)
_, metadata, _ = store.Get(context.Background(), "key", &selector)
assert.Equal(t, metadata, model.Metadata{"source": otherSource, "flagSetId": nonExistingFlagSetId}, "metadata did not match expected")
}

View File

@ -104,7 +104,7 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe
if !skipCheckingModTime {
hs.lastUpdated = updated
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object}
return nil
}

View File

@ -52,7 +52,7 @@ func NewFileSync(uri string, watchType string, logger *logger.Logger) *Sync {
const defaultState = "{}"
func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
return nil
}
@ -94,7 +94,7 @@ func (fs *Sync) setReady(val bool) {
//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
defer fs.watcher.Close()
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
fs.setReady(true)
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for {
@ -108,7 +108,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String()))
switch {
case event.Has(fsnotify.Create) || event.Has(fsnotify.Write):
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
@ -116,20 +116,20 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if err != nil {
// the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
fs.sendDataSync(ctx, dataSync)
continue
}
// Counterintuitively, remove events are the only meaningful ones seen in K8s.
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
// At the point the remove event is fired, we have our new data, so we can send it down the channel.
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Chmod):
// on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen
// while the file is being watched, os.Stat is used here to infer deletion
if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) {
fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
fs.sendDataSync(ctx, dataSync)
}
}
@ -147,14 +147,8 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}
func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String()))
if syncType == sync.DELETE {
// Skip fetching and emit default state to avoid EOF errors
dataSync <- sync.DataSync{FlagData: defaultState, Source: fs.URI, Type: syncType}
return
}
func (fs *Sync) sendDataSync(ctx context.Context, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Data sync received for %s", fs.URI))
msg := defaultState
m, err := fs.fetch(ctx)
@ -167,7 +161,7 @@ func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync c
msg = m
}
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType}
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
}
func (fs *Sync) fetch(_ context.Context) (string, error) {

View File

@ -26,7 +26,6 @@ func TestSimpleReSync(t *testing.T) {
expectedDataSync := sync.DataSync{
FlagData: "hello",
Source: source,
Type: sync.ALL,
}
handler := Sync{
URI: source,
@ -76,7 +75,6 @@ func TestSimpleSync(t *testing.T) {
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", readDirName, fetchFileName),
Type: sync.ALL,
},
},
},
@ -94,12 +92,10 @@ func TestSimpleSync(t *testing.T) {
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
},
{
FlagData: "new content",
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
},
},
},
@ -117,12 +113,10 @@ func TestSimpleSync(t *testing.T) {
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.ALL,
},
{
FlagData: defaultState,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.DELETE,
},
},
},
@ -172,9 +166,6 @@ func TestSimpleSync(t *testing.T) {
if data.Source != syncEvent.Source {
t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source)
}
if data.Type != syncEvent.Type {
t.Errorf("expected type: %b, but received type: %b", syncEvent.Type, data.Type)
}
case <-time.After(10 * time.Second):
t.Errorf("event not found, timeout out after 10 seconds")
}

View File

@ -106,7 +106,6 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
dataSync <- sync.DataSync{
FlagData: res.GetFlagConfiguration(),
Source: g.URI,
Type: sync.ALL,
}
return nil
}
@ -200,11 +199,10 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
}
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
SyncContext: data.SyncContext,
Source: g.URI,
Selector: g.Selector,
Type: sync.ALL,
}
g.Logger.Debug("received full configuration payload")

View File

@ -16,7 +16,6 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
credendialsmock "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock"
grpcmock "github.com/open-feature/flagd/core/pkg/sync/grpc/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
@ -124,7 +123,6 @@ func Test_ReSyncTests(t *testing.T) {
notifications: []sync.DataSync{
{
FlagData: "success",
Type: sync.ALL,
},
},
shouldError: false,
@ -181,9 +179,6 @@ func Test_ReSyncTests(t *testing.T) {
for _, expected := range test.notifications {
out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
@ -197,100 +192,6 @@ func Test_ReSyncTests(t *testing.T) {
}
}
func TestSync_BasicFlagSyncStates(t *testing.T) {
grpcSyncImpl := Sync{
URI: "grpc://test",
ProviderID: "",
Logger: logger.NewLogger(nil, false),
}
mockError := errors.New("could not sync")
tests := []struct {
name string
stream syncv1grpc.FlagSyncService_SyncFlagsClient
setup func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse)
want sync.Type
wantError error
ready bool
}{
{
name: "State All maps to Sync All",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
metadata, err := structpb.NewStruct(map[string]any{"sources": "A,B,C"})
if err != nil {
t.Fatalf("Failed to create sync context: %v", err)
}
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
SyncContext: metadata,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.ALL,
ready: true,
},
{
name: "Error during flag sync",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
clientResponse.EXPECT().Recv().Return(
nil,
mockError,
)
},
ready: true,
want: -1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
syncChan := make(chan sync.DataSync, 1)
mockClient := grpcmock.NewMockFlagSyncServiceClient(ctrl)
mockClientResponse := grpcmock.NewMockFlagSyncServiceClientResponse(ctrl)
test.setup(t, mockClient, mockClientResponse)
waitChan := make(chan struct{})
go func() {
grpcSyncImpl.client = mockClient
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := grpcSyncImpl.Sync(ctx, syncChan)
if err != nil {
t.Errorf("Error handling flag sync: %v", err)
}
close(waitChan)
}()
<-waitChan
if test.want < 0 {
require.Empty(t, syncChan)
return
}
data := <-syncChan
if grpcSyncImpl.IsReady() != test.ready {
t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready)
}
if data.Type != test.want {
t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want)
}
})
}
}
func Test_StreamListener(t *testing.T) {
const target = "localBufCon"
@ -315,7 +216,6 @@ func Test_StreamListener(t *testing.T) {
{
FlagData: "{\"flags\": {}}",
SyncContext: metadata,
Type: sync.ALL,
},
},
},
@ -331,14 +231,12 @@ func Test_StreamListener(t *testing.T) {
},
output: []sync.DataSync{
{
FlagData: "{}",
FlagData: "{}",
SyncContext: metadata,
Type: sync.ALL,
},
{
FlagData: "{\"flags\": {}}",
SyncContext: metadata,
Type: sync.ALL,
},
},
},
@ -391,10 +289,6 @@ func Test_StreamListener(t *testing.T) {
for _, expected := range test.output {
out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
}
@ -485,8 +379,7 @@ func Test_SyncRetry(t *testing.T) {
// Setup
target := "grpc://local"
bufListener := bufconn.Listen(1)
expectType := sync.ALL
emptyFlagData := "{}"
// buffer based server. response ignored purposefully
bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{
@ -540,7 +433,7 @@ func Test_SyncRetry(t *testing.T) {
t.Errorf("timeout waiting for conditions to fulfil")
break
case data := <-syncChan:
if data.Type != expectType {
if data.FlagData != emptyFlagData {
t.Errorf("sync start error: %s", err.Error())
}
}
@ -560,9 +453,9 @@ func Test_SyncRetry(t *testing.T) {
case <-tCtx.Done():
cancelFunc()
t.Error("timeout waiting for conditions to fulfil")
case rsp := <-syncChan:
if rsp.Type != expectType {
t.Errorf("expected response: %s, but got: %s", expectType, rsp.Type)
case data := <-syncChan:
if data.FlagData != emptyFlagData {
t.Errorf("sync start error: %s", err.Error())
}
}
}

View File

@ -27,6 +27,7 @@ type Sync struct {
AuthHeader string
Interval uint32
ready bool
eTag string
}
// Client defines the behaviour required of a http client
@ -42,11 +43,11 @@ type Cron interface {
}
func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
msg, err := hs.Fetch(ctx)
msg, _, err := hs.fetchBody(ctx, true)
if err != nil {
return err
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI}
return nil
}
@ -63,7 +64,7 @@ func (hs *Sync) IsReady() bool {
func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// Initial fetch
fetch, err := hs.Fetch(ctx)
fetch, _, err := hs.fetchBody(ctx, true)
if err != nil {
return err
}
@ -74,33 +75,30 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Logger.Debug(fmt.Sprintf("polling %s every %d seconds", hs.URI, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s", hs.URI))
body, err := hs.fetchBodyFromURL(ctx, hs.URI)
previousBodySHA := hs.LastBodySHA
body, noChange, err := hs.fetchBody(ctx, false)
if err != nil {
hs.Logger.Error(err.Error())
hs.Logger.Error(fmt.Sprintf("error fetching: %s", err.Error()))
return
}
if body == "" {
if body == "" && !noChange {
hs.Logger.Debug("configuration deleted")
return
}
currentSHA := hs.generateSha([]byte(body))
if hs.LastBodySHA == "" {
hs.Logger.Debug("new configuration created")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
} else if hs.LastBodySHA != currentSHA {
hs.Logger.Debug("configuration modified")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
if previousBodySHA == "" {
hs.Logger.Debug("configuration created")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
} else if previousBodySHA != hs.LastBodySHA {
hs.Logger.Debug("configuration updated")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
}
hs.LastBodySHA = currentSHA
})
hs.Cron.Start()
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI}
<-ctx.Done()
hs.Cron.Stop()
@ -108,10 +106,14 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return nil
}
func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) (string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, bytes.NewBuffer(nil))
func (hs *Sync) fetchBody(ctx context.Context, fetchAll bool) (string, bool, error) {
if hs.URI == "" {
return "", false, errors.New("no HTTP URL string set")
}
req, err := http.NewRequestWithContext(ctx, "GET", hs.URI, bytes.NewBuffer(nil))
if err != nil {
return "", fmt.Errorf("error creating request for url %s: %w", url, err)
return "", false, fmt.Errorf("error creating request for url %s: %w", hs.URI, err)
}
req.Header.Add("Accept", "application/json")
@ -124,32 +126,50 @@ func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) (string, error
req.Header.Set("Authorization", bearer)
}
if hs.eTag != "" && !fetchAll {
req.Header.Set("If-None-Match", hs.eTag)
}
resp, err := hs.Client.Do(req)
if err != nil {
return "", fmt.Errorf("error calling endpoint %s: %w", url, err)
return "", false, fmt.Errorf("error calling endpoint %s: %w", hs.URI, err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
hs.Logger.Debug(fmt.Sprintf("error closing the response body: %s", err.Error()))
hs.Logger.Error(fmt.Sprintf("error closing the response body: %s", err.Error()))
}
}()
if resp.StatusCode == 304 {
hs.Logger.Debug("no changes detected")
return "", true, nil
}
statusOK := resp.StatusCode >= 200 && resp.StatusCode < 300
if !statusOK {
return "", fmt.Errorf("error fetching from url %s: %s", url, resp.Status)
return "", false, fmt.Errorf("error fetching from url %s: %s", hs.URI, resp.Status)
}
if resp.Header.Get("ETag") != "" {
hs.eTag = resp.Header.Get("ETag")
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("unable to read body to bytes: %w", err)
return "", false, fmt.Errorf("unable to read body to bytes: %w", err)
}
json, err := utils.ConvertToJSON(body, getFileExtensions(url), resp.Header.Get("Content-Type"))
json, err := utils.ConvertToJSON(body, getFileExtensions(hs.URI), resp.Header.Get("Content-Type"))
if err != nil {
return "", fmt.Errorf("error converting response body to json: %w", err)
return "", false, fmt.Errorf("error converting response body to json: %w", err)
}
return json, nil
if json != "" {
hs.LastBodySHA = hs.generateSha([]byte(body))
}
return json, false, nil
}
// getFileExtensions returns the file extension from the URL path
@ -169,17 +189,6 @@ func (hs *Sync) generateSha(body []byte) string {
}
func (hs *Sync) Fetch(ctx context.Context) (string, error) {
if hs.URI == "" {
return "", errors.New("no HTTP URL string set")
}
body, err := hs.fetchBodyFromURL(ctx, hs.URI)
if err != nil {
return "", err
}
if body != "" {
hs.LastBodySHA = hs.generateSha([]byte(body))
}
return body, nil
body, _, err := hs.fetchBody(ctx, false)
return body, err
}

View File

@ -5,7 +5,6 @@ import (
"io"
"log"
"net/http"
"reflect"
"strings"
"testing"
"time"
@ -111,6 +110,7 @@ func TestHTTPSync_Fetch(t *testing.T) {
uri string
bearerToken string
authHeader string
eTagHeader string
lastBodySHA string
handleResponse func(*testing.T, Sync, string, error)
}{
@ -241,6 +241,85 @@ func TestHTTPSync_Fetch(t *testing.T) {
}
},
},
"not modified response etag matched": {
setup: func(t *testing.T, client *syncmock.MockClient) {
expectedIfNoneMatch := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
client.EXPECT().Do(gomock.Any()).DoAndReturn(func(req *http.Request) (*http.Response, error) {
actualIfNoneMatch := req.Header.Get("If-None-Match")
if actualIfNoneMatch != expectedIfNoneMatch {
t.Fatalf("expected If-None-Match header to be '%s', got %s", expectedIfNoneMatch, actualIfNoneMatch)
}
return &http.Response{
Header: map[string][]string{"ETag": {expectedIfNoneMatch}},
Body: io.NopCloser(strings.NewReader("")),
StatusCode: http.StatusNotModified,
}, nil
})
},
uri: "http://localhost",
eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
}
expectedLastBodySHA := ""
expectedETag := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
if httpSync.LastBodySHA != expectedLastBodySHA {
t.Errorf(
"expected last body sha to be: '%s', got: '%s'", expectedLastBodySHA, httpSync.LastBodySHA,
)
}
if httpSync.eTag != expectedETag {
t.Errorf(
"expected last etag to be: '%s', got: '%s'", expectedETag, httpSync.eTag,
)
}
},
},
"modified response etag mismatched": {
setup: func(t *testing.T, client *syncmock.MockClient) {
expectedIfNoneMatch := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
client.EXPECT().Do(gomock.Any()).DoAndReturn(func(req *http.Request) (*http.Response, error) {
actualIfNoneMatch := req.Header.Get("If-None-Match")
if actualIfNoneMatch != expectedIfNoneMatch {
t.Fatalf("expected If-None-Match header to be '%s', got %s", expectedIfNoneMatch, actualIfNoneMatch)
}
newContent := "\"Hey there!\""
newETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`
return &http.Response{
Header: map[string][]string{
"Content-Type": {"application/json"},
"Etag": {newETag},
},
Body: io.NopCloser(strings.NewReader(newContent)),
StatusCode: http.StatusOK,
}, nil
})
},
uri: "http://localhost",
eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
}
expectedLastBodySHA := "wuAc5j2QEJxMf09tzql-0bsrUeNkfzbK9ay-J0E6JLs="
expectedETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`
if httpSync.LastBodySHA != expectedLastBodySHA {
t.Errorf(
"expected last body sha to be: '%s', got: '%s'", expectedLastBodySHA, httpSync.LastBodySHA,
)
}
if httpSync.eTag != expectedETag {
t.Errorf(
"expected last etag to be: '%s', got: '%s'", expectedETag, httpSync.eTag,
)
}
},
},
}
for name, tt := range tests {
@ -256,6 +335,7 @@ func TestHTTPSync_Fetch(t *testing.T) {
AuthHeader: tt.authHeader,
LastBodySHA: tt.lastBodySHA,
Logger: logger.NewLogger(nil, false),
eTag: tt.eTagHeader,
}
fetched, err := httpSync.Fetch(context.Background())
@ -289,6 +369,8 @@ func TestSync_Init(t *testing.T) {
func TestHTTPSync_Resync(t *testing.T) {
ctrl := gomock.NewController(t)
source := "http://localhost"
emptyFlagData := "{}"
tests := map[string]struct {
setup func(t *testing.T, client *syncmock.MockClient)
@ -303,11 +385,11 @@ func TestHTTPSync_Resync(t *testing.T) {
setup: func(_ *testing.T, client *syncmock.MockClient) {
client.EXPECT().Do(gomock.Any()).Return(&http.Response{
Header: map[string][]string{"Content-Type": {"application/json"}},
Body: io.NopCloser(strings.NewReader("")),
Body: io.NopCloser(strings.NewReader(emptyFlagData)),
StatusCode: http.StatusOK,
}, nil)
},
uri: "http://localhost",
uri: source,
handleResponse: func(t *testing.T, _ Sync, fetched string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
@ -320,9 +402,8 @@ func TestHTTPSync_Resync(t *testing.T) {
wantErr: false,
wantNotifications: []sync.DataSync{
{
Type: sync.ALL,
FlagData: "",
Source: "",
FlagData: emptyFlagData,
Source: source,
},
},
},
@ -364,8 +445,8 @@ func TestHTTPSync_Resync(t *testing.T) {
for _, dataSync := range tt.wantNotifications {
select {
case x := <-d:
if !reflect.DeepEqual(x.String(), dataSync.String()) {
t.Error("unexpected datasync received", x, dataSync)
if x.FlagData != dataSync.FlagData || x.Source != dataSync.Source {
t.Errorf("unexpected datasync received %v vs %v", x, dataSync)
}
case <-time.After(2 * time.Second):
t.Error("expected datasync not received", dataSync)

View File

@ -6,35 +6,6 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)
type Type int
// Type of the sync operation
const (
// ALL - All flags of sync provider. This is the default if unset due to primitive default
ALL Type = iota
// ADD - Additional flags from sync provider
ADD
// UPDATE - Update for flag(s) previously provided
UPDATE
// DELETE - Delete for flag(s) previously provided
DELETE
)
func (t Type) String() string {
switch t {
case ALL:
return "ALL"
case ADD:
return "ADD"
case UPDATE:
return "UPDATE"
case DELETE:
return "DELETE"
default:
return "UNKNOWN"
}
}
/*
ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest
value and communicate to the Runtime with DataSync channel
@ -57,11 +28,10 @@ type ISync interface {
// DataSync is the data contract between Runtime and sync implementations
type DataSync struct {
FlagData string
SyncContext *structpb.Struct
Source string
Selector string
Type
}
// SourceConfig is configuration option for flagd. This maps to startup parameter sources

View File

@ -57,7 +57,7 @@ func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
if err != nil {
return fmt.Errorf("unable to fetch flag configuration: %w", err)
}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
return nil
}
@ -97,7 +97,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return err
}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
notifies := make(chan INotify)
@ -136,7 +136,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue
}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeModify:
k.logger.Debug("Configuration modified")
msg, err := k.fetch(ctx)
@ -145,7 +145,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue
}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeDelete:
k.logger.Debug("configuration deleted")
case DefaultEventTypeReady:

View File

@ -607,6 +607,7 @@ func TestInit(t *testing.T) {
func TestSync_ReSync(t *testing.T) {
const name = "myFF"
const ns = "myNS"
const payload = "{\"flags\":null}"
s := runtime.NewScheme()
ff := &unstructured.Unstructured{}
ff.SetUnstructuredContent(getCFG(name, ns))
@ -668,8 +669,8 @@ func TestSync_ReSync(t *testing.T) {
i := tt.countMsg
for i > 0 {
d := <-dataChannel
if d.Type != sync.ALL {
t.Errorf("Expected %v, got %v", sync.ALL, d)
if d.FlagData != payload {
t.Errorf("Expected %v, got %v", payload, d.FlagData)
}
i--
}

View File

@ -21,7 +21,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

View File

@ -12,7 +12,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

View File

@ -10,7 +10,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
msdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
)
const (
@ -19,15 +19,15 @@ const (
FeatureFlagReasonKey = attribute.Key("feature_flag.reason")
ExceptionTypeKey = attribute.Key("ExceptionTypeKeyName")
httpRequestDurationMetric = "http.server.duration"
httpResponseSizeMetric = "http.server.response.size"
httpRequestDurationMetric = "http.server.request.duration"
httpResponseSizeMetric = "http.server.response.body.size"
httpActiveRequestsMetric = "http.server.active_requests"
impressionMetric = "feature_flag." + ProviderName + ".impression"
reasonMetric = "feature_flag." + ProviderName + ".evaluation.reason"
reasonMetric = "feature_flag." + ProviderName + ".result.reason"
)
type IMetricsRecorder interface {
HTTPAttributes(svcName, url, method, code string) []attribute.KeyValue
HTTPAttributes(svcName, url, method, code, scheme string) []attribute.KeyValue
HTTPRequestDuration(ctx context.Context, duration time.Duration, attrs []attribute.KeyValue)
HTTPResponseSize(ctx context.Context, sizeBytes int64, attrs []attribute.KeyValue)
InFlightRequestStart(ctx context.Context, attrs []attribute.KeyValue)
@ -38,7 +38,7 @@ type IMetricsRecorder interface {
type NoopMetricsRecorder struct{}
func (NoopMetricsRecorder) HTTPAttributes(_, _, _, _ string) []attribute.KeyValue {
func (NoopMetricsRecorder) HTTPAttributes(_, _, _, _, _ string) []attribute.KeyValue {
return []attribute.KeyValue{}
}
@ -68,12 +68,13 @@ type MetricsRecorder struct {
reasons metric.Int64Counter
}
func (r MetricsRecorder) HTTPAttributes(svcName, url, method, code string) []attribute.KeyValue {
func (r MetricsRecorder) HTTPAttributes(svcName, url, method, code, scheme string) []attribute.KeyValue {
return []attribute.KeyValue{
semconv.ServiceNameKey.String(svcName),
semconv.HTTPURLKey.String(url),
semconv.HTTPMethodKey.String(method),
semconv.HTTPStatusCodeKey.String(code),
semconv.HTTPRouteKey.String(url),
semconv.HTTPRequestMethodKey.String(method),
semconv.HTTPResponseStatusCodeKey.String(code),
semconv.URLSchemeKey.String(scheme),
}
}

View File

@ -10,7 +10,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.13.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
)
const svcName = "mySvc"
@ -38,9 +38,10 @@ func TestHTTPAttributes(t *testing.T) {
},
want: []attribute.KeyValue{
semconv.ServiceNameKey.String(""),
semconv.HTTPURLKey.String(""),
semconv.HTTPMethodKey.String(""),
semconv.HTTPStatusCodeKey.String(""),
semconv.HTTPRouteKey.String(""),
semconv.HTTPRequestMethodKey.String(""),
semconv.HTTPResponseStatusCodeKey.String(""),
semconv.URLSchemeKey.String("http"),
},
},
{
@ -53,9 +54,10 @@ func TestHTTPAttributes(t *testing.T) {
},
want: []attribute.KeyValue{
semconv.ServiceNameKey.String("myService"),
semconv.HTTPURLKey.String("#123"),
semconv.HTTPMethodKey.String("POST"),
semconv.HTTPStatusCodeKey.String("300"),
semconv.HTTPRouteKey.String("#123"),
semconv.HTTPRequestMethodKey.String("POST"),
semconv.HTTPResponseStatusCodeKey.String("300"),
semconv.URLSchemeKey.String("http"),
},
},
{
@ -68,16 +70,17 @@ func TestHTTPAttributes(t *testing.T) {
},
want: []attribute.KeyValue{
semconv.ServiceNameKey.String("!@#$%^&*()_+|}{[];',./<>"),
semconv.HTTPURLKey.String(""),
semconv.HTTPMethodKey.String(""),
semconv.HTTPStatusCodeKey.String(""),
semconv.HTTPRouteKey.String(""),
semconv.HTTPRequestMethodKey.String(""),
semconv.HTTPResponseStatusCodeKey.String(""),
semconv.URLSchemeKey.String("http"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rec := MetricsRecorder{}
res := rec.HTTPAttributes(tt.req.Service, tt.req.ID, tt.req.Method, tt.req.Code)
res := rec.HTTPAttributes(tt.req.Service, tt.req.ID, tt.req.Method, tt.req.Code, "http")
require.Equal(t, tt.want, res)
})
}
@ -208,7 +211,7 @@ func TestMetrics(t *testing.T) {
// some really simple tests just to make sure all methods are actually implemented and nothing panics
func TestNoopMetricsRecorder_HTTPAttributes(t *testing.T) {
no := NoopMetricsRecorder{}
got := no.HTTPAttributes("", "", "", "")
got := no.HTTPAttributes("", "", "", "", "")
require.Empty(t, got)
}

View File

@ -2,7 +2,7 @@ package telemetry
import (
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
)
// utils contain common utilities to help with telemetry
@ -14,7 +14,7 @@ const provider = "flagd"
func SemConvFeatureFlagAttributes(ffKey string, ffVariant string) []attribute.KeyValue {
return []attribute.KeyValue{
semconv.FeatureFlagKey(ffKey),
semconv.FeatureFlagVariant(ffVariant),
semconv.FeatureFlagResultVariant(ffVariant),
semconv.FeatureFlagProviderName(provider),
}
}

View File

@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
)
func TestSemConvFeatureFlagAttributes(t *testing.T) {
@ -35,7 +35,7 @@ func TestSemConvFeatureFlagAttributes(t *testing.T) {
case semconv.FeatureFlagKeyKey:
require.Equal(t, test.key, attribute.Value.AsString(),
"expected flag key: %s, but received: %s", test.key, attribute.Value.AsString())
case semconv.FeatureFlagVariantKey:
case semconv.FeatureFlagResultVariantKey:
require.Equal(t, test.variant, attribute.Value.AsString(),
"expected flag variant: %s, but received %s", test.variant, attribute.Value.AsString())
case semconv.FeatureFlagProviderNameKey:

View File

@ -0,0 +1,77 @@
---
status: accepted
author: @tangenti
created: 2025-06-16
updated: 2025-06-16
---
# Decouple flag sync sources and flag sets
The goal is to support dynamic flag sets for flagd providers and decouple sources and flag sets.
## Background
Flagd daemon syncs flag configurations from multiple sources. A single source provides a single config, which has an optional flag set ID that may or may not change in the following syncs of the same source.
The in-process provider uses `selector` to specify the desired source. In order to get a desired flag set, a provider has to stick to a source that provides that flag set. In this case, the flagd daemon cannot remove a source without breaking the dependent flagd providers.
Assumptions of the current model
- `flagSetId`s must be unique across different sources or the configuration is considered invalid.
- In-process providers request at most one flag set.
## Requirements
- Flagd daemon can remove a source without breaking in-process providers that depend on the flag set the source provides.
- In-process providers can select based on flag sets.
- No breaking changes for the current usage of `selector`
## Proposal
### API change
#### Flag Configuration Schema
Add an optional field `flagsetID` under `flag` or `flag.metadata`. The flag set ID cannot be specified if a flag set ID is specified for the config.
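For illustration, a hypothetical flag configuration carrying the ID under flag metadata (note that the implementation elsewhere in this PR spells the key `flagSetId`):
```json
{
  "flags": {
    "new-welcome-banner": {
      "state": "ENABLED",
      "defaultVariant": "off",
      "variants": { "on": true, "off": false },
      "metadata": { "flagSetId": "project-42" }
    }
  }
}
```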
### Flagd Sync Selector
Selector will be extended for generic flag selection, starting with equality checks on the `source` and `flagsetID` of flags.
Example
```yaml
# Flags from the source `override`
selector: override
# Flags from the source `override`
selector: source=override
# Flags from the flag set `project-42`
selector: flagsetID=project-42
```
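Constraints can also be combined with commas, as exercised by the store tests in this PR:
```yaml
# Flags from the source `override` that belong to the flag set `project-42`
selector: source=override,flagsetID=project-42
```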
The semantics can later be extended with a more complex design, such as AIP-160 filters or Kubernetes selectors. This is out of scope for this ADR.
### Flagd Daemon Storage
1. Flagd will have separate stores for `flags` and `sources`
2. `selector` will be removed from the store
3. `flagSetID` will be added as part of `model.Flag` or under `model.Flag.Metadata` for better consistency with the API.
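Based on the fields exercised by the store tests in this PR, the stored record could look roughly like this (a sketch, not the authoritative definition of `model.Flag`):
```go
package sketch

// Illustrative shape of a stored flag record; fields inferred from the store
// tests in this PR, not the authoritative model.Flag definition.
type Metadata = map[string]interface{}

type Flag struct {
	Key            string   // flag key, unique within a flag set
	DefaultVariant string   // variant served by default
	Source         string   // sync source that provided the flag
	Priority       int      // index of the source; higher priority wins on duplicate keys
	FlagSetId      string   // inherited from set-level metadata, overridable per flag
	Metadata       Metadata // merged set- and flag-level metadata, e.g. "flagSetId"
}
```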
### Flags Sync
The sync server would take the extended `selector` syntax into account and filter the list of flags on the fly when answering requests from the providers.
The existing source-based conflict resolution remains the same. Resyncs on flag removal remain unchanged as well.
## Consequences
### The good
- One source can have multiple flag sets.
- `selector` works on a more granular level.
- No breaking change
- Sync servers and clients now hold the same understanding of the `selector` semantics.

View File

@ -1,5 +1,5 @@
---
status: proposed
status: accepted
author: @tangenti
created: 2025-06-27
updated: 2025-06-27

View File

@ -34,6 +34,11 @@ invoked with **HTTP GET** request.
The polling interval, port, TLS settings, and authentication information can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.
To optimize network usage, it honors the HTTP ETag protocol: if the server includes an `ETag` header in its response,
flagd will store this value and send it in the `If-None-Match` header on subsequent requests. If the flag data has
not changed, the server responds with 304 Not Modified, and flagd will skip updating its state. If the data has
changed, the server returns the new content and a new ETag, prompting flagd to update its flags.
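An illustrative exchange (headers abbreviated; the ETag value is hypothetical):
```
GET /flags.json HTTP/1.1
Host: example.com
If-None-Match: "686897696a7c876b7e"

HTTP/1.1 304 Not Modified
ETag: "686897696a7c876b7e"
```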
---
### gRPC sync

View File

@ -14,6 +14,7 @@ flagd start [flags]
-H, --context-from-header stringToString add key-value pairs to map header values to context values, where key is Header name, value is context key (default [])
-X, --context-value stringToString add arbitrary key value pairs to the flag evaluation context (default [])
-C, --cors-origin strings CORS allowed origins, * will allow all origins
--disable-sync-metadata Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)

View File

@ -47,15 +47,25 @@ Given below is the current implementation overview of flagd telemetry internals,
flagd exposes the following metrics:
- `http.server.duration`
- `http.server.response.size`
- `http.server.active_requests`
- `feature_flag.flagd.impression`
- `feature_flag.flagd.evaluation.reason`
- `http.server.request.duration` - Measures the duration of inbound HTTP requests
- `http.server.response.body.size` - Measures the size of HTTP response messages
- `http.server.active_requests` - Measures the number of concurrent HTTP requests that are currently in-flight
- `feature_flag.flagd.impression` - Measures the number of evaluations for a given flag
- `feature_flag.flagd.result.reason` - Measures the number of evaluations for a given reason
> Please note that metric names may vary based on the consuming monitoring tool's naming requirements.
> For example, the transformation of OTLP metrics to Prometheus is described [here](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus).
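As a hypothetical example of such a transformation, the impression counter might surface in Prometheus along these lines (the exact name and labels depend on the collector version and configuration):
```
feature_flag_flagd_impression_total{feature_flag_key="new-welcome-banner"} 42
```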
### HTTP Metric Attributes
flagd uses the following OpenTelemetry Semantic Conventions for HTTP metrics:
- `service.name` - The name of the service
- `http.route` - The matched route (path template)
- `http.request.method` - The HTTP request method (GET, POST, etc.)
- `http.response.status_code` - The HTTP response status code
- `url.scheme` - The URI scheme (http or https)
## Traces
flagd creates the following spans as part of a trace:
@ -83,9 +93,8 @@ official [OTEL collector example](https://github.com/open-telemetry/opentelemetr
```yaml
services:
# Jaeger
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
jaeger:
image: cr.jaegertracing.io/jaegertracing/jaeger:2.8.0
restart: always
ports:
- "16686:16686"
@ -93,7 +102,7 @@ services:
- "14250"
# Collector
otel-collector:
image: otel/opentelemetry-collector:latest
image: otel/opentelemetry-collector:0.129.1
restart: always
command: [ "--config=/etc/otel-collector-config.yaml" ]
volumes:
@ -106,10 +115,10 @@ services:
- "4317:4317" # OTLP gRPC receiver
- "55679:55679" # zpages extension
depends_on:
- jaeger-all-in-one
- jaeger
prometheus:
container_name: prometheus
image: prom/prometheus:latest
image: prom/prometheus:v2.53.5
restart: always
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
@ -128,10 +137,8 @@ receivers:
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1
otlp/jaeger:
endpoint: jaeger-all-in-one:4317
endpoint: jaeger:4317
tls:
insecure: true
processors:
@ -148,7 +155,7 @@ service:
exporters: [ prometheus ]
```
#### prometheus.yml
#### prometheus.yaml
```yaml
scrape_configs:
@ -156,10 +163,9 @@ scrape_configs:
scrape_interval: 10s
static_configs:
- targets: [ 'otel-collector:8889' ]
- targets: [ 'otel-collector:8888' ]
```
Once, configuration files are ready, use `docker-compose up` to start the local setup. With successful startup, you can
Once the configuration files are ready, use `docker compose up` to start the local setup. With a successful startup, you can
access metrics through [Prometheus](http://localhost:9090/graph) & traces through [Jaeger](http://localhost:16686/).
## Metadata

View File

@ -1,5 +1,21 @@
# Changelog
## [0.8.0](https://github.com/open-feature/flagd/compare/flagd-proxy/v0.7.6...flagd-proxy/v0.8.0) (2025-07-21)
### ⚠ BREAKING CHANGES
* remove sync.Type ([#1691](https://github.com/open-feature/flagd/issues/1691))
### ✨ New Features
* remove sync.Type ([#1691](https://github.com/open-feature/flagd/issues/1691)) ([ac647e0](https://github.com/open-feature/flagd/commit/ac647e065636071f5bc065a9a084461cea692166))
### 🧹 Chore
* **deps:** update module github.com/open-feature/flagd/core to v0.11.8 ([#1685](https://github.com/open-feature/flagd/issues/1685)) ([c07ffba](https://github.com/open-feature/flagd/commit/c07ffba55426d538224d8564be5f35339d2258d0))
## [0.7.6](https://github.com/open-feature/flagd/compare/flagd-proxy/v0.7.5...flagd-proxy/v0.7.6) (2025-07-15)

View File

@ -9,7 +9,7 @@ require (
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.36.6-20250529171031-ebdc14163473.1
github.com/dimiro1/banner v1.1.0
github.com/mattn/go-colorable v0.1.14
github.com/open-feature/flagd/core v0.11.6
github.com/open-feature/flagd/core v0.11.8
github.com/prometheus/client_golang v1.22.0
github.com/spf13/cobra v1.9.1
github.com/spf13/viper v1.20.1
@ -104,7 +104,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47 // indirect
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86 // indirect
github.com/open-feature/open-feature-operator/apis v0.2.45 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect

View File

@ -336,10 +336,14 @@ github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47 h1:c6nodciz/xeU0xiAcDQ5MBW34DnPoi5/lEgjV5kZeZA=
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86 h1:r3e+qs3QUdf4+lUi2ZZnSHgYkjeLIb5yu5jo+ypA8iw=
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd/core v0.11.5 h1:2zUbXN3Uh/cKZZVOQo9Z+Evjb9s15HcCtdo8vaNiBPY=
github.com/open-feature/flagd/core v0.11.5/go.mod h1:VBCrZ6bs+Occ226391YoIOnNEXmNvpWsQEeYfsLJ23c=
github.com/open-feature/flagd/core v0.11.6 h1:qrGYGUBmFABlvFA46LkLoG/DTwchUm+OLKX+XUytgS4=
github.com/open-feature/flagd/core v0.11.6/go.mod h1:UUhSAf0NSbGmFbdmkOf93YP90DA8O6ahVYn+vJO4BMg=
github.com/open-feature/flagd/core v0.11.8 h1:84uDdSzhtVNBnsjuAnBqBXbFwXC2CQ6aO5cNNKKM7uc=
github.com/open-feature/flagd/core v0.11.8/go.mod h1:3dNe+BX8HUpx/mXrGLD554G6cQB67yvuASVTKVC4TU4=
github.com/open-feature/open-feature-operator/apis v0.2.44 h1:0r4Z+RnJltuHdRBv79NFgAckhna6/M3Wcec6gzNX5vI=
github.com/open-feature/open-feature-operator/apis v0.2.44/go.mod h1:xB2uLzvUkbydieX7q6/NqannBz3bt/e5BS2DeOyyw4Q=
github.com/open-feature/open-feature-operator/apis v0.2.45 h1:URnUf22ZoAx7/W8ek8dXCBYgY8FmnFEuEOSDLROQafY=

View File

@ -104,7 +104,6 @@ func (l *oldHandler) SyncFlags(
case d := <-dataSync:
if err := stream.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: d.FlagData,
State: dataSyncToGrpcState(d),
}); err != nil {
return fmt.Errorf("error sending configuration change event: %w", err)
}
@ -115,8 +114,3 @@ func (l *oldHandler) SyncFlags(
}
}
}
//nolint:staticcheck
func dataSyncToGrpcState(s sync.DataSync) syncv1.SyncState {
return syncv1.SyncState(s.Type + 1)
}

View File

@ -110,7 +110,6 @@ func Test_watchResource(t *testing.T) {
in := isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
}
syncMock.dataSyncChanIn <- in
@ -335,7 +334,6 @@ func Test_FetchAllFlags(t *testing.T) {
mockData: &isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
},
setHandler: true,
},
@ -402,7 +400,6 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
data: &isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
},
expectErr: false,
},

View File

@ -1,5 +1,24 @@
# Changelog
## [0.12.9](https://github.com/open-feature/flagd/compare/flagd/v0.12.8...flagd/v0.12.9) (2025-07-28)
### ✨ New Features
* Add toggle for disabling getMetadata request ([#1693](https://github.com/open-feature/flagd/issues/1693)) ([e8fd680](https://github.com/open-feature/flagd/commit/e8fd6808608caa7ff7e54792fe97d88e7c294486))
## [0.12.8](https://github.com/open-feature/flagd/compare/flagd/v0.12.7...flagd/v0.12.8) (2025-07-21)
### 🐛 Bug Fixes
* update to latest otel semconv ([#1668](https://github.com/open-feature/flagd/issues/1668)) ([81855d7](https://github.com/open-feature/flagd/commit/81855d76f94a09251a19a05f830cc1d11ab6b566))
### 🧹 Chore
* **deps:** update module github.com/open-feature/flagd/core to v0.11.8 ([#1685](https://github.com/open-feature/flagd/issues/1685)) ([c07ffba](https://github.com/open-feature/flagd/commit/c07ffba55426d538224d8564be5f35339d2258d0))
## [0.12.7](https://github.com/open-feature/flagd/compare/flagd/v0.12.6...flagd/v0.12.7) (2025-07-15)


@ -36,6 +36,7 @@ const (
syncPortFlagName = "sync-port"
syncSocketPathFlagName = "sync-socket-path"
uriFlagName = "uri"
disableSyncMetadata = "disable-sync-metadata"
contextValueFlagName = "context-value"
headerToContextKeyFlagName = "context-from-header"
streamDeadlineFlagName = "stream-deadline"
@ -89,6 +90,7 @@ func init() {
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
"header values to context values, where key is Header name, value is context key")
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
flags.Bool(disableSyncMetadata, false, "Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.")
bindFlags(flags)
}
@ -114,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
_ = viper.BindPFlag(disableSyncMetadata, flags.Lookup(disableSyncMetadata))
}
// startCmd represents the start command
@ -186,6 +189,7 @@ var startCmd = &cobra.Command{
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
DisableSyncMetadata: viper.GetBool(disableSyncMetadata),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
HeaderToContextKeyMappings: headerToContextKeyMappings,

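The new `--disable-sync-metadata` toggle reaches the sync service as plain config (see the runtime wiring later in this diff). A minimal client sketch of its on-the-wire effect, assuming the buf.build-generated sync client used elsewhere in this diff and a plaintext local flagd on the default sync port 8015:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
	syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

func main() {
	// assumes a local flagd sync service without TLS on the default port
	conn, err := grpc.NewClient("localhost:8015", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := syncv1grpc.NewFlagSyncServiceClient(conn)
	_, err = client.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
	// when flagd was started with --disable-sync-metadata, the handler
	// responds with gRPC code UNIMPLEMENTED (see the sync handler later in this diff)
	if status.Code(err) == codes.Unimplemented {
		fmt.Println("metadata endpoint disabled on this flagd")
	}
}
```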

@ -12,7 +12,7 @@ require (
github.com/dimiro1/banner v1.1.0
github.com/gorilla/mux v1.8.1
github.com/mattn/go-colorable v0.1.14
github.com/open-feature/flagd/core v0.11.6
github.com/open-feature/flagd/core v0.11.8
github.com/prometheus/client_golang v1.22.0
github.com/rs/cors v1.11.1
github.com/rs/xid v1.6.0
@ -110,6 +110,9 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-memdb v1.3.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
@ -122,7 +125,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47 // indirect
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86 // indirect
github.com/open-feature/open-feature-operator/apis v0.2.45 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect


@ -313,6 +313,14 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0Ntos
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.5 h1:b3taDMxCBCBVgyRrS1AZVHO14ubMYZB++QpNhBg+Nyo=
github.com/hashicorp/go-memdb v1.3.5/go.mod h1:8IVKKBkVe+fxFgdFOYxzQQNjz+sWCyHCdIC/+5+Vy1Y=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@ -359,10 +367,14 @@ github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47 h1:c6nodciz/xeU0xiAcDQ5MBW34DnPoi5/lEgjV5kZeZA=
github.com/open-feature/flagd-schemas v0.2.9-0.20250319190911-9b0ee43ecc47/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86 h1:r3e+qs3QUdf4+lUi2ZZnSHgYkjeLIb5yu5jo+ypA8iw=
github.com/open-feature/flagd-schemas v0.2.9-0.20250707123415-08b4c52d3b86/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd/core v0.11.5 h1:2zUbXN3Uh/cKZZVOQo9Z+Evjb9s15HcCtdo8vaNiBPY=
github.com/open-feature/flagd/core v0.11.5/go.mod h1:VBCrZ6bs+Occ226391YoIOnNEXmNvpWsQEeYfsLJ23c=
github.com/open-feature/flagd/core v0.11.6 h1:qrGYGUBmFABlvFA46LkLoG/DTwchUm+OLKX+XUytgS4=
github.com/open-feature/flagd/core v0.11.6/go.mod h1:UUhSAf0NSbGmFbdmkOf93YP90DA8O6ahVYn+vJO4BMg=
github.com/open-feature/flagd/core v0.11.8 h1:84uDdSzhtVNBnsjuAnBqBXbFwXC2CQ6aO5cNNKKM7uc=
github.com/open-feature/flagd/core v0.11.8/go.mod h1:3dNe+BX8HUpx/mXrGLD554G6cQB67yvuASVTKVC4TU4=
github.com/open-feature/open-feature-operator/apis v0.2.44 h1:0r4Z+RnJltuHdRBv79NFgAckhna6/M3Wcec6gzNX5vI=
github.com/open-feature/open-feature-operator/apis v0.2.44/go.mod h1:xB2uLzvUkbydieX7q6/NqannBz3bt/e5BS2DeOyyw4Q=
github.com/open-feature/open-feature-operator/apis v0.2.45 h1:URnUf22ZoAx7/W8ek8dXCBYgY8FmnFEuEOSDLROQafY=

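The substantive addition in this dependency set is `hashicorp/go-memdb` (with its immutable-radix and LRU transitive dependencies), which supplies the watchable queries the new store is built on. A toy, self-contained sketch of that primitive; the schema here is illustrative and is not flagd's actual store schema:

```go
package main

import (
	"context"
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type flagRow struct {
	Key string
}

func main() {
	// toy single-table schema; flagd's real schema lives in core/pkg/store
	schema := &memdb.DBSchema{Tables: map[string]*memdb.TableSchema{
		"flags": {
			Name: "flags",
			Indexes: map[string]*memdb.IndexSchema{
				"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "Key"}},
			},
		},
	}}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// run a read transaction and register interest in its result set
	txn := db.Txn(false)
	it, err := txn.Get("flags", "id")
	if err != nil {
		panic(err)
	}
	ws := memdb.NewWatchSet()
	ws.Add(it.WatchCh())

	go func() {
		// any write affecting the watched result set wakes the watcher below
		w := db.Txn(true)
		_ = w.Insert("flags", &flagRow{Key: "flag1"})
		w.Commit()
	}()

	// blocks until the result set changes or the context ends
	if err := ws.WatchCtx(context.Background()); err == nil {
		fmt.Println("change observed")
	}
}
```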

@ -39,6 +39,7 @@ type Config struct {
SyncServicePort uint16
SyncServiceSocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool
SyncProviders []sync.SourceConfig
CORS []string
@ -78,21 +79,20 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
logger.Error(fmt.Sprintf("error building metrics recorder: %v", err))
}
// build flag store, collect flag sources & fill sources details
s := store.NewFlags()
sources := []string{}
for _, provider := range config.SyncProviders {
s.FlagSources = append(s.FlagSources, provider.URI)
s.SourceDetails[provider.URI] = store.SourceDetails{
Source: provider.URI,
Selector: provider.Selector,
}
sources = append(sources, provider.URI)
}
// build flag store, collect flag sources & fill sources details
store, err := store.NewStore(logger, sources)
if err != nil {
return nil, fmt.Errorf("error creating flag store: %w", err)
}
// derive evaluator
jsonEvaluator := evaluator.NewJSON(logger, s)
jsonEvaluator := evaluator.NewJSON(logger, store)
// derive services
@ -100,6 +100,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
connectService := flageval.NewConnectService(
logger.WithFields(zap.String("component", "service")),
jsonEvaluator,
store,
recorder)
// ofrep service
@ -111,20 +112,21 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
config.HeaderToContextKeyMappings,
)
if err != nil {
return nil, fmt.Errorf("error creating ofrep service")
return nil, fmt.Errorf("error creating OFREP service: %w", err)
}
// flag sync service
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: store,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
DisableSyncMetadata: config.DisableSyncMetadata,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
@ -144,11 +146,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
}
return &Runtime{
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: jsonEvaluator,
FlagSync: flagSyncService,
OfrepService: ofrepService,
Service: connectService,
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: jsonEvaluator,
SyncService: flagSyncService,
OfrepService: ofrepService,
EvaluationService: connectService,
ServiceConfig: service.Configuration{
Port: config.ServicePort,
ManagementPort: config.ManagementPort,
@ -162,7 +164,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
StreamDeadline: config.StreamDeadline,
},
SyncImpl: iSyncs,
Syncs: iSyncs,
}, nil
}


@ -19,23 +19,23 @@ import (
)
type Runtime struct {
Evaluator evaluator.IEvaluator
Logger *logger.Logger
FlagSync flagsync.ISyncService
OfrepService ofrep.IOfrepService
Service service.IFlagEvaluationService
ServiceConfig service.Configuration
SyncImpl []sync.ISync
Evaluator evaluator.IEvaluator
Logger *logger.Logger
SyncService flagsync.ISyncService
OfrepService ofrep.IOfrepService
EvaluationService service.IFlagEvaluationService
ServiceConfig service.Configuration
Syncs []sync.ISync
mu msync.Mutex
}
//nolint:funlen
func (r *Runtime) Start() error {
if r.Service == nil {
if r.EvaluationService == nil {
return errors.New("no service set")
}
if len(r.SyncImpl) == 0 {
if len(r.Syncs) == 0 {
return errors.New("no sync implementation set")
}
if r.Evaluator == nil {
@ -44,40 +44,26 @@ func (r *Runtime) Start() error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
g, gCtx := errgroup.WithContext(ctx)
dataSync := make(chan sync.DataSync, len(r.SyncImpl))
dataSync := make(chan sync.DataSync, len(r.Syncs))
// Initialize DataSync channel watcher
g.Go(func() error {
for {
select {
case data := <-dataSync:
// resync events are triggered when a delete occurs during flag merges in the store
// resync events may trigger further resync events, however for a flag to be deleted from the store
// its source must match, preventing the opportunity for resync events to snowball
if resyncRequired := r.updateAndEmit(data); resyncRequired {
for _, s := range r.SyncImpl {
p := s
g.Go(func() error {
err := p.ReSync(gCtx, dataSync)
if err != nil {
return fmt.Errorf("error resyncing sources: %w", err)
}
return nil
})
}
}
r.updateAndEmit(data)
case <-gCtx.Done():
return nil
}
}
})
// Init sync providers
for _, s := range r.SyncImpl {
for _, s := range r.Syncs {
if err := s.Init(gCtx); err != nil {
return fmt.Errorf("sync provider Init returned error: %w", err)
}
}
// Start sync provider
for _, s := range r.SyncImpl {
for _, s := range r.Syncs {
p := s
g.Go(func() error {
if err := p.Sync(gCtx, dataSync); err != nil {
@ -89,14 +75,14 @@ func (r *Runtime) Start() error {
defer func() {
r.Logger.Info("Shutting down server...")
r.Service.Shutdown()
r.EvaluationService.Shutdown()
r.Logger.Info("Server successfully shutdown.")
}()
g.Go(func() error {
// Readiness probes rely on the runtime
r.ServiceConfig.ReadinessProbe = r.isReady
if err := r.Service.Serve(gCtx, r.ServiceConfig); err != nil {
if err := r.EvaluationService.Serve(gCtx, r.ServiceConfig); err != nil {
return fmt.Errorf("error returned from serving flag evaluation service: %w", err)
}
return nil
@ -112,7 +98,7 @@ func (r *Runtime) Start() error {
})
g.Go(func() error {
err := r.FlagSync.Start(gCtx)
err := r.SyncService.Start(gCtx)
if err != nil {
return fmt.Errorf("error from sync server: %w", err)
}
@ -128,7 +114,7 @@ func (r *Runtime) Start() error {
func (r *Runtime) isReady() bool {
// if all providers can watch for flag changes, we are ready.
for _, p := range r.SyncImpl {
for _, p := range r.Syncs {
if !p.IsReady() {
return false
}
@ -137,24 +123,14 @@ func (r *Runtime) isReady() bool {
}
// updateAndEmit helps to update state, notify changes and trigger sync updates
func (r *Runtime) updateAndEmit(payload sync.DataSync) bool {
func (r *Runtime) updateAndEmit(payload sync.DataSync) {
r.mu.Lock()
defer r.mu.Unlock()
notifications, resyncRequired, err := r.Evaluator.SetState(payload)
_, _, err := r.Evaluator.SetState(payload)
if err != nil {
r.Logger.Error(err.Error())
return false
r.Logger.Error(fmt.Sprintf("error setting state: %v", err))
return
}
r.Service.Notify(service.Notification{
Type: service.ConfigurationChange,
Data: map[string]interface{}{
"flags": notifications,
},
})
r.FlagSync.Emit(resyncRequired, payload.Source)
return resyncRequired
r.SyncService.Emit(payload.Source)
}

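With resync handling removed, `dataSync` is now a plain fan-in channel: each provider pushes whole-document updates, the runtime writes them to the store, and change notifications happen downstream via store watchers. A sketch of the producer side of that contract, using the `DataSync` field names exercised in the tests above (the helper and the flag document are illustrative; `sync` is `core/pkg/sync`):

```go
// hypothetical helper: what a sync provider pushes into the runtime's fan-in channel
func pushExample(dataSync chan<- sync.DataSync) {
	dataSync <- sync.DataSync{
		// a complete flag document for this source, in flagd's JSON schema
		FlagData: `{"flags":{"myFlag":{"state":"ENABLED","defaultVariant":"on","variants":{"on":true,"off":false}}}}`,
		Source:   "file://flags.json",
	}
}
```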

@ -0,0 +1,3 @@
package service
const FLAGD_SELECTOR_HEADER = "Flagd-Selector"


@ -16,6 +16,7 @@ import (
"github.com/open-feature/flagd/core/pkg/evaluator"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/open-feature/flagd/flagd/pkg/service/middleware"
corsmw "github.com/open-feature/flagd/flagd/pkg/service/middleware/cors"
@ -71,15 +72,17 @@ type ConnectService struct {
// NewConnectService creates a ConnectService with provided parameters
func NewConnectService(
logger *logger.Logger, evaluator evaluator.IEvaluator, mRecorder telemetry.IMetricsRecorder,
logger *logger.Logger, evaluator evaluator.IEvaluator, store store.IStore, mRecorder telemetry.IMetricsRecorder,
) *ConnectService {
cs := &ConnectService{
logger: logger,
eval: evaluator,
metrics: &telemetry.NoopMetricsRecorder{},
eventingConfiguration: &eventingConfiguration{
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
store: store,
logger: logger,
},
}
if mRecorder != nil {


@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"testing"
"time"
@ -14,9 +15,11 @@ import (
mock "github.com/open-feature/flagd/core/pkg/evaluator/mock"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
middlewaremock "github.com/open-feature/flagd/flagd/pkg/service/middleware/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
@ -81,7 +84,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, tt.name)
svc := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
svc := NewConnectService(logger.NewLogger(nil, false), eval, &store.Store{}, metricRecorder)
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
return true
@ -136,7 +139,7 @@ func TestAddMiddleware(t *testing.T) {
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
svc := NewConnectService(logger.NewLogger(nil, false), nil, metricRecorder)
svc := NewConnectService(logger.NewLogger(nil, false), nil, &store.Store{}, metricRecorder)
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
@ -173,16 +176,22 @@ func TestConnectServiceNotify(t *testing.T) {
// given
ctrl := gomock.NewController(t)
eval := mock.NewMockIEvaluator(ctrl)
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.Subscribe("key", sChan)
eventing.Subscribe(context.Background(), "key", nil, sChan)
// notification type
ofType := iservice.ConfigurationChange
@ -207,20 +216,73 @@ func TestConnectServiceNotify(t *testing.T) {
}
}
func TestConnectServiceWatcher(t *testing.T) {
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
sChan := make(chan iservice.Notification, 1)
eventing := eventingConfiguration{
store: s,
logger: log,
mu: &sync.RWMutex{},
subs: make(map[any]chan iservice.Notification),
}
// subscribe and wait for the sub to be active
eventing.Subscribe(context.Background(), "anything", nil, sChan)
time.Sleep(100 * time.Millisecond)
// make a change
s.Update(sources[0], map[string]model.Flag{
"flag1": {
Key: "flag1",
DefaultVariant: "off",
},
}, model.Metadata{})
// notification type
ofType := iservice.ConfigurationChange
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case n := <-sChan:
require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
notifications := n.Data["flags"].(notifications.Notifications)
flag1, ok := notifications["flag1"].(map[string]interface{})
require.True(t, ok, "flag1 notification should be a map[string]interface{}")
require.Equal(t, flag1["type"], string(model.NotificationCreate), "expected notification type: %s, but received %s", model.NotificationCreate, flag1["type"])
case <-timeout.Done():
t.Error("timeout while waiting for notifications")
}
}
func TestConnectServiceShutdown(t *testing.T) {
// given
ctrl := gomock.NewController(t)
eval := mock.NewMockIEvaluator(ctrl)
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")
service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
service := NewConnectService(logger.NewLogger(nil, false), eval, s, metricRecorder)
sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.Subscribe("key", sChan)
eventing.Subscribe(context.Background(), "key", nil, sChan)
// notification type
ofType := iservice.Shutdown

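For reference, a hypothetical consumer of the `ConfigurationChange` payload shape asserted above (names beyond those assertions are illustrative):

```go
// hypothetical consumer of a ConfigurationChange notification, per the test assertions
func handleNotification(n iservice.Notification) {
	flags, ok := n.Data["flags"].(notifications.Notifications)
	if !ok {
		return
	}
	for key, v := range flags {
		if change, ok := v.(map[string]interface{}); ok {
			fmt.Printf("%s: %v\n", key, change["type"]) // e.g. model.NotificationCreate
		}
	}
}
```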

@ -1,29 +1,65 @@
package service
import (
"context"
"fmt"
"sync"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/notifications"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
)
// IEvents is an interface for event subscriptions
type IEvents interface {
Subscribe(id any, notifyChan chan iservice.Notification)
Subscribe(ctx context.Context, id any, selector *store.Selector, notifyChan chan iservice.Notification)
Unsubscribe(id any)
EmitToAll(n iservice.Notification)
}
var _ IEvents = &eventingConfiguration{}
// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[any]chan iservice.Notification
mu *sync.RWMutex
subs map[any]chan iservice.Notification
store store.IStore
logger *logger.Logger
}
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
func (eventing *eventingConfiguration) Subscribe(ctx context.Context, id any, selector *store.Selector, notifier chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()
eventing.subs[id] = notifyChan
// proxy events from our store watcher to the notify channel, so that RPC mode event streams receive change notifications
watcher := make(chan store.FlagQueryResult, 1)
go func() {
// store the previous flags to compare against new notifications, to compute proper diffs for RPC mode
var oldFlags map[string]model.Flag
for result := range watcher {
newFlags := result.Flags
// ignore the first notification (nil old flags); the watcher emits on initialization, but for RPC we don't care until there's a change
if oldFlags != nil {
notifications := notifications.NewFromFlags(oldFlags, newFlags)
notifier <- iservice.Notification{
Type: iservice.ConfigurationChange,
Data: map[string]interface{}{
"flags": notifications,
},
}
}
oldFlags = result.Flags
}
eventing.logger.Debug(fmt.Sprintf("closing notify channel for id %v", id))
close(notifier)
}()
eventing.store.Watch(ctx, selector, watcher)
eventing.subs[id] = notifier
}
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {

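`Subscribe` is now a thin proxy over the store's watch API, which can also be consumed directly. A minimal sketch; per the tests above, a nil selector subscribes to the unscoped, source-priority view (the helper is hypothetical):

```go
// hypothetical direct consumer of the store's watch API
func watchAll(ctx context.Context, s *store.Store) {
	watcher := make(chan store.FlagQueryResult, 1)
	s.Watch(ctx, nil, watcher) // nil selector: all flags, default source priority
	for result := range watcher {
		// the first emission reflects current state; later ones follow changes
		fmt.Printf("tracking %d flags\n", len(result.Flags))
	}
}
```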

@ -1,18 +1,29 @@
package service
import (
"context"
"sync"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/stretchr/testify/require"
)
func TestSubscribe(t *testing.T) {
// given
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
store: s,
}
idA := "a"
@ -22,8 +33,8 @@ func TestSubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
eventing.Subscribe(context.Background(), idA, nil, chanA)
eventing.Subscribe(context.Background(), idB, nil, chanB)
// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
@ -32,9 +43,16 @@ func TestSubscribe(t *testing.T) {
func TestUnsubscribe(t *testing.T) {
// given
sources := []string{"source1", "source2"}
log := logger.NewLogger(nil, false)
s, err := store.NewStore(log, sources)
if err != nil {
t.Fatalf("NewStore failed: %v", err)
}
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
store: s,
}
idA := "a"
@ -43,8 +61,8 @@ func TestUnsubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)
// when
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)
eventing.Subscribe(context.Background(), idA, nil, chanA)
eventing.Subscribe(context.Background(), idB, nil, chanB)
eventing.Unsubscribe(idA)


@ -12,7 +12,9 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
flagdService "github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -67,13 +69,17 @@ func (s *OldFlagEvaluationService) ResolveAll(
) (*connect.Response[schemaV1.ResolveAllResponse], error) {
reqID := xid.New().String()
defer s.logger.ClearFields(reqID)
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := &schemaV1.ResolveAllResponse{
Flags: make(map[string]*schemaV1.AnyFlag),
}
values, _, err := s.eval.ResolveAllValues(sCtx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
values, _, err := s.eval.ResolveAllValues(ctx, reqID, mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), make(map[string]string)))
if err != nil {
s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)
@ -82,7 +88,7 @@ func (s *OldFlagEvaluationService) ResolveAll(
span.SetAttributes(attribute.Int("feature_flag.count", len(values)))
for _, value := range values {
// register the impression and reason for each flag evaluated
s.metrics.RecordEvaluation(sCtx, value.Error, value.Reason, value.Variant, value.FlagKey)
s.metrics.RecordEvaluation(ctx, value.Error, value.Reason, value.Variant, value.FlagKey)
switch v := value.Value.(type) {
case bool:
@ -133,8 +139,12 @@ func (s *OldFlagEvaluationService) EventStream(
req *connect.Request[schemaV1.EventStreamRequest],
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
s.logger.Debug("starting event stream for request")
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
@ -172,12 +182,15 @@ func (s *OldFlagEvaluationService) ResolveBoolean(
ctx context.Context,
req *connect.Request[schemaV1.ResolveBooleanRequest],
) (*connect.Response[schemaV1.ResolveBooleanResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := connect.NewResponse(&schemaV1.ResolveBooleanResponse{})
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
err := resolve[bool](
sCtx,
ctx,
s.logger,
s.eval.ResolveBooleanValue,
req.Header(),
@ -201,12 +214,16 @@ func (s *OldFlagEvaluationService) ResolveString(
ctx context.Context,
req *connect.Request[schemaV1.ResolveStringRequest],
) (*connect.Response[schemaV1.ResolveStringResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveStringResponse{})
err := resolve[string](
sCtx,
ctx,
s.logger,
s.eval.ResolveStringValue,
req.Header(),
@ -230,12 +247,16 @@ func (s *OldFlagEvaluationService) ResolveInt(
ctx context.Context,
req *connect.Request[schemaV1.ResolveIntRequest],
) (*connect.Response[schemaV1.ResolveIntResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveIntResponse{})
err := resolve[int64](
sCtx,
ctx,
s.logger,
s.eval.ResolveIntValue,
req.Header(),
@ -259,12 +280,16 @@ func (s *OldFlagEvaluationService) ResolveFloat(
ctx context.Context,
req *connect.Request[schemaV1.ResolveFloatRequest],
) (*connect.Response[schemaV1.ResolveFloatResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveFloatResponse{})
err := resolve[float64](
sCtx,
ctx,
s.logger,
s.eval.ResolveFloatValue,
req.Header(),
@ -288,12 +313,16 @@ func (s *OldFlagEvaluationService) ResolveObject(
ctx context.Context,
req *connect.Request[schemaV1.ResolveObjectRequest],
) (*connect.Response[schemaV1.ResolveObjectResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&schemaV1.ResolveObjectResponse{})
err := resolve[map[string]any](
sCtx,
ctx,
s.logger,
s.eval.ResolveObjectValue,
req.Header(),
@ -312,7 +341,7 @@ func (s *OldFlagEvaluationService) ResolveObject(
return res, err
}
// mergeContexts combines context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func mergeContexts(reqCtx, configFlagsCtx map[string]any, headers http.Header, headerToContextKeyMappings map[string]string) map[string]any {
merged := make(map[string]any)
@ -338,7 +367,7 @@ func resolve[T constraints](ctx context.Context, logger *logger.Logger, resolver
reqID := xid.New().String()
defer logger.ClearFields(reqID)
mergedContext := mergeContexts(evaluationContext.AsMap(), configContextValues, header, configHeaderToContextKeyMappings)
logger.WriteFields(
reqID,

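A worked example of the documented precedence, with hypothetical values (recall the mapping is header name to context key):

```go
// in-package sketch of the documented merge precedence
func exampleMerge() {
	reqCtx := map[string]any{"env": "from-request", "user": "u1"}
	cliCtx := map[string]any{"env": "from-cli"}
	headers := http.Header{}
	headers.Set("X-Env", "from-header")

	merged := mergeContexts(reqCtx, cliCtx, headers, map[string]string{"X-Env": "env"})
	// per the priority comment: merged["env"] == "from-header" (header-mapped value wins)
	// and merged["user"] == "u1" (request-only keys pass through)
	_ = merged
}
```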

@ -11,7 +11,9 @@ import (
"github.com/open-feature/flagd/core/pkg/evaluator"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/telemetry"
flagdService "github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -66,16 +68,19 @@ func (s *FlagEvaluationService) ResolveAll(
reqID := xid.New().String()
defer s.logger.ClearFields(reqID)
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveAll", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
res := &evalV1.ResolveAllResponse{
Flags: make(map[string]*evalV1.AnyFlag),
}
context := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
evaluationContext := mergeContexts(req.Msg.GetContext().AsMap(), s.contextValues, req.Header(), s.headerToContextKeyMappings)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(sCtx, reqID, context)
resolutions, flagSetMetadata, err := s.eval.ResolveAllValues(ctx, reqID, evaluationContext)
if err != nil {
s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)
@ -84,7 +89,7 @@ func (s *FlagEvaluationService) ResolveAll(
span.SetAttributes(attribute.Int("feature_flag.count", len(resolutions)))
for _, resolved := range resolutions {
// register the impression and reason for each flag evaluated
s.metrics.RecordEvaluation(sCtx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
s.metrics.RecordEvaluation(ctx, resolved.Error, resolved.Reason, resolved.Variant, resolved.FlagKey)
switch v := resolved.Value.(type) {
case bool:
res.Flags[resolved.FlagKey] = &evalV1.AnyFlag{
@ -147,17 +152,21 @@ func (s *FlagEvaluationService) EventStream(
req *connect.Request[evalV1.EventStreamRequest],
stream *connect.ServerStream[evalV1.EventStreamResponse],
) error {
serviceCtx := ctx
// attach server-side stream deadline to context
s.logger.Debug("starting event stream for request")
if s.deadline != 0 {
streamDeadline := time.Now().Add(s.deadline)
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
serviceCtx = deadlineCtx
ctx = deadlineCtx
defer cancel()
}
s.logger.Debug("starting event stream for request")
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
s.eventingConfiguration.Subscribe(ctx, req, &selector, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)
requestNotificationChan <- service.Notification{
@ -184,8 +193,8 @@ func (s *FlagEvaluationService) EventStream(
if err != nil {
s.logger.Error(err.Error())
}
case <-serviceCtx.Done():
if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
}
@ -198,12 +207,16 @@ func (s *FlagEvaluationService) ResolveBoolean(
ctx context.Context,
req *connect.Request[evalV1.ResolveBooleanRequest],
) (*connect.Response[evalV1.ResolveBooleanResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveBoolean", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveBooleanResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveBooleanValue,
req.Header(),
@ -226,12 +239,16 @@ func (s *FlagEvaluationService) ResolveString(
ctx context.Context,
req *connect.Request[evalV1.ResolveStringRequest],
) (*connect.Response[evalV1.ResolveStringResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveString", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveStringResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveStringValue,
req.Header(),
@ -254,12 +271,16 @@ func (s *FlagEvaluationService) ResolveInt(
ctx context.Context,
req *connect.Request[evalV1.ResolveIntRequest],
) (*connect.Response[evalV1.ResolveIntResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveInt", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveIntResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveIntValue,
req.Header(),
@ -282,12 +303,16 @@ func (s *FlagEvaluationService) ResolveFloat(
ctx context.Context,
req *connect.Request[evalV1.ResolveFloatRequest],
) (*connect.Response[evalV1.ResolveFloatResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveFloat", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveFloatResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveFloatValue,
req.Header(),
@ -310,12 +335,16 @@ func (s *FlagEvaluationService) ResolveObject(
ctx context.Context,
req *connect.Request[evalV1.ResolveObjectRequest],
) (*connect.Response[evalV1.ResolveObjectResponse], error) {
sCtx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := s.flagEvalTracer.Start(ctx, "resolveObject", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
selectorExpression := req.Header().Get(flagdService.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx = context.WithValue(ctx, store.SelectorContextKey{}, selector)
res := connect.NewResponse(&evalV1.ResolveObjectResponse{})
err := resolve(
sCtx,
ctx,
s.logger,
s.eval.ResolveObjectValue,
req.Header(),

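Each resolver stamps the parsed selector into the request context under `store.SelectorContextKey{}`, so a store-side consumer can recover it with a type assertion. A sketch, assuming the value type written above (the helper is hypothetical):

```go
// hypothetical helper: recover the request-scoped selector downstream
func selectorFromContext(ctx context.Context) (store.Selector, bool) {
	sel, ok := ctx.Value(store.SelectorContextKey{}).(store.Selector)
	return sel, ok
}
```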

@ -1,6 +1,7 @@
package ofrep
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -10,6 +11,8 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/service/ofrep"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/flagd/pkg/service"
"github.com/rs/xid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
@ -64,9 +67,12 @@ func (h *handler) HandleFlagEvaluation(w http.ResponseWriter, r *http.Request) {
h.writeJSONToResponse(http.StatusBadRequest, ofrep.ContextErrorResponseFrom(flagKey), w)
return
}
context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)
evaluation := h.evaluator.ResolveAsAnyValue(r.Context(), requestID, flagKey, context)
evaluation := h.evaluator.ResolveAsAnyValue(ctx, requestID, flagKey, evaluationContext)
if evaluation.Error != nil {
status, evaluationError := ofrep.EvaluationErrorResponseFrom(evaluation)
h.writeJSONToResponse(status, evaluationError, w)
@ -85,9 +91,12 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
return
}
context := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
selector := store.NewSelector(selectorExpression)
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)
evaluations, metadata, err := h.evaluator.ResolveAllValues(r.Context(), requestID, context)
evaluations, metadata, err := h.evaluator.ResolveAllValues(ctx, requestID, evaluationContext)
if err != nil {
h.Logger.WarnWithID(requestID, fmt.Sprintf("error from resolver: %v", err))
@ -127,7 +136,7 @@ func extractOfrepRequest(req *http.Request) (ofrep.Request, error) {
return request, nil
}
// flagdContext returns combined context values from headers, static context (from cli) and request context.
// highest priority > header-context-from-cli > static-context-from-cli > request-context > lowest priority
func flagdContext(
log *logger.Logger, requestID string, request ofrep.Request, staticContextValues map[string]any, headers http.Header, headerToContextKeyMappings map[string]string,
@ -152,4 +161,4 @@ func flagdContext(
}
return context
}

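End to end, the OFREP path honors the same header. A hedged client sketch; the default OFREP port 8016 and the `key=value` selector syntax are assumptions here:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:8016/ofrep/v1/evaluate/flags", // OFREP bulk evaluation
		strings.NewReader(`{"context":{}}`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	// scope evaluation to one flag set; header name per FLAGD_SELECTOR_HEADER
	req.Header.Set("Flagd-Selector", "flagSetId=other")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```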

@ -2,73 +2,74 @@ package sync
import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/protobuf/types/known/structpb"
)
// syncHandler implements the sync contract
type syncHandler struct {
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
//mux *Multiplexer
store *store.Store
log *logger.Logger
contextValues map[string]any
deadline time.Duration
disableSyncMetadata bool
}
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
muxPayload := make(chan payload, 1)
selector := req.GetSelector()
watcher := make(chan store.FlagQueryResult, 1)
selectorExpression := req.GetSelector()
selector := store.NewSelector(selectorExpression)
ctx := server.Context()
syncContextMap := make(map[string]any)
maps.Copy(syncContextMap, s.contextValues)
syncContext, err := structpb.NewStruct(syncContextMap)
if err != nil {
return status.Error(codes.DataLoss, "error constructing sync context")
}
// attach server-side stream deadline to context
if s.deadline != 0 {
streamDeadline := time.Now().Add(s.deadline)
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
ctx = deadlineCtx
defer cancel()
}
err := s.mux.Register(ctx, selector, muxPayload)
if err != nil {
return err
}
s.store.Watch(ctx, &selector, watcher)
for {
select {
case payload := <-muxPayload:
metadataSrc := make(map[string]any)
maps.Copy(metadataSrc, s.contextValues)
if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
}
metadata, err := structpb.NewStruct(metadataSrc)
case payload := <-watcher:
if err != nil {
s.log.Error(fmt.Sprintf("error from struct creation: %v", err))
return fmt.Errorf("error constructing metadata response")
}
flags, err := json.Marshal(payload.Flags)
if err != nil {
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
return status.Error(codes.DataLoss, "error marshalling flags")
}
err = server.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: payload.flags,
SyncContext: metadata,
})
err = server.Send(&syncv1.SyncFlagsResponse{FlagConfiguration: string(flags), SyncContext: syncContext})
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
}
case <-ctx.Done():
s.mux.Unregister(ctx, selector)
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
@ -79,16 +80,25 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
}
}
func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsRequest) (
func (s syncHandler) FetchAllFlags(ctx context.Context, req *syncv1.FetchAllFlagsRequest) (
*syncv1.FetchAllFlagsResponse, error,
) {
flags, err := s.mux.GetAllFlags(req.GetSelector())
selectorExpression := req.GetSelector()
selector := store.NewSelector(selectorExpression)
flags, _, err := s.store.GetAll(ctx, &selector)
if err != nil {
s.log.Error(fmt.Sprintf("error retrieving flags from store: %v", err))
return nil, status.Error(codes.Internal, "error retrieving flags from store")
}
flagsString, err := json.Marshal(flags)
if err != nil {
return nil, err
}
return &syncv1.FetchAllFlagsResponse{
FlagConfiguration: flags,
FlagConfiguration: string(flagsString),
}, nil
}
@ -97,13 +107,13 @@ func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsR
func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (
*syncv1.GetMetadataResponse, error,
) {
if s.disableSyncMetadata {
return nil, status.Error(codes.Unimplemented, "metadata endpoint disabled")
}
metadataSrc := make(map[string]any)
for k, v := range s.contextValues {
metadataSrc[k] = v
}
if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
}
metadata, err := structpb.NewStruct(metadataSrc)
if err != nil {

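For the streaming side, a hedged client sketch against the generated sync API; the selector expression and the established `*grpc.ClientConn` are assumptions:

```go
// hypothetical streaming consumer of the selector-scoped sync API
func streamFlagSet(ctx context.Context, conn *grpc.ClientConn) error {
	client := syncv1grpc.NewFlagSyncServiceClient(conn)
	stream, err := client.SyncFlags(ctx, &syncv1.SyncFlagsRequest{Selector: "flagSetId=example"})
	if err != nil {
		return err
	}
	for {
		msg, err := stream.Recv()
		if err != nil {
			// includes DEADLINE_EXCEEDED when the server-side stream deadline fires
			return err
		}
		fmt.Println(msg.GetFlagConfiguration()) // JSON document: {"flags": ...}
	}
}
```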

@ -22,21 +22,18 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
wantMetadata map[string]any
}{
{
name: "with sources and context",
sources: []string{"A, B, C"},
name: "with sources and context",
contextValues: map[string]any{
"env": "prod",
"region": "us-west",
},
wantMetadata: map[string]any{
"sources": "A, B, C",
"env": "prod",
"region": "us-west",
"env": "prod",
"region": "us-west",
},
},
{
name: "with empty sources",
sources: []string{},
name: "with empty sources",
contextValues: map[string]any{
"env": "dev",
},
@ -46,62 +43,59 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
},
{
name: "with empty context",
sources: []string{"A,B,C"},
contextValues: map[string]any{},
wantMetadata: map[string]any{
"sources": "A,B,C",
},
wantMetadata: map[string]any{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)
for _, disableSyncMetadata := range []bool{true, false} {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore, err := store.NewStore(logger.NewLogger(nil, false), tt.sources)
require.NoError(t, err)
handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
}
handler := syncHandler{
store: flagStore,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
disableSyncMetadata: disableSyncMetadata,
}
// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is fully deprecated and removed
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}
go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()
select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
// Check the two metadatas are equal
// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is fully deprecated and removed
assert.Equal(t, respMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
if !disableSyncMetadata {
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
} else {
assert.NotNil(t, err)
}
})
// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}
go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()
select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
})
}
}
}


@ -1,213 +0,0 @@
package sync
import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"sync"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
)
//nolint:errchkjson
var emptyConfigBytes, _ = json.Marshal(map[string]map[string]string{
"flags": {},
})
// Multiplexer abstract subscription handling and storage processing.
// Flag configurations will be lazy loaded using reFill logic upon the calls to publish.
type Multiplexer struct {
store *store.State
sources []string
subs map[interface{}]subscription // subscriptions on all sources
selectorSubs map[string]map[interface{}]subscription // source specific subscriptions
allFlags string // pre-calculated all flags in store as a string
selectorFlags map[string]string // pre-calculated selector scoped flags in store as strings
mu sync.RWMutex
}
type subscription struct {
id interface{}
channel chan payload
}
type payload struct {
flags string
}
// NewMux creates a new sync multiplexer
func NewMux(store *store.State, sources []string) (*Multiplexer, error) {
m := &Multiplexer{
store: store,
sources: sources,
subs: map[interface{}]subscription{},
selectorSubs: map[string]map[interface{}]subscription{},
selectorFlags: map[string]string{},
}
return m, m.reFill()
}
// Register a subscription
func (r *Multiplexer) Register(id interface{}, source string, con chan payload) error {
r.mu.Lock()
defer r.mu.Unlock()
if source != "" && !slices.Contains(r.sources, source) {
return fmt.Errorf("no flag watcher setup for source %s", source)
}
var initSync string
if source == "" {
// subscribe for flags from all source
r.subs[id] = subscription{
id: id,
channel: con,
}
initSync = r.allFlags
} else {
// subscribe for specific source
s, ok := r.selectorSubs[source]
if ok {
s[id] = subscription{
id: id,
channel: con,
}
} else {
r.selectorSubs[source] = map[interface{}]subscription{
id: {
id: id,
channel: con,
},
}
}
initSync = r.selectorFlags[source]
}
// Initial sync
con <- payload{flags: initSync}
return nil
}
// Publish sync updates to subscriptions
func (r *Multiplexer) Publish() error {
r.mu.Lock()
defer r.mu.Unlock()
// perform a refill prior to publishing
err := r.reFill()
if err != nil {
return err
}
// push to all source subs
for _, sub := range r.subs {
sub.channel <- payload{r.allFlags}
}
// push to selector subs
for source, flags := range r.selectorFlags {
for _, s := range r.selectorSubs[source] {
s.channel <- payload{flags}
}
}
return nil
}
// Unregister a subscription
func (r *Multiplexer) Unregister(id interface{}, selector string) {
r.mu.Lock()
defer r.mu.Unlock()
var from map[interface{}]subscription
if selector == "" {
from = r.subs
} else {
from = r.selectorSubs[selector]
}
delete(from, id)
}
// GetAllFlags per specific source
func (r *Multiplexer) GetAllFlags(source string) (string, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if source == "" {
return r.allFlags, nil
}
if !slices.Contains(r.sources, source) {
return "", fmt.Errorf("no flag watcher setup for source %s", source)
}
return r.selectorFlags[source], nil
}
// SourcesAsMetadata returns all known sources, comma separated to be used as service metadata
func (r *Multiplexer) SourcesAsMetadata() string {
r.mu.RLock()
defer r.mu.RUnlock()
return strings.Join(r.sources, ",")
}
// reFill local configuration values
func (r *Multiplexer) reFill() error {
clear(r.selectorFlags)
// start all sources with empty config
for _, source := range r.sources {
r.selectorFlags[source] = string(emptyConfigBytes)
}
all, metadata, err := r.store.GetAll(context.Background())
if err != nil {
return fmt.Errorf("error retrieving flags from the store: %w", err)
}
bytes, err := json.Marshal(map[string]interface{}{"flags": all, "metadata": metadata})
if err != nil {
return fmt.Errorf("error marshalling: %w", err)
}
r.allFlags = string(bytes)
collector := map[string]map[string]model.Flag{}
for key, flag := range all {
c, ok := collector[flag.Source]
if ok {
c[key] = flag
} else {
collector[flag.Source] = map[string]model.Flag{
key: flag,
}
}
}
// for all flags, sort them into their correct selector
for source, flags := range collector {
// store the corresponding metadata
metadata := r.store.GetMetadataForSource(source)
bytes, err := json.Marshal(map[string]interface{}{"flags": flags, "metadata": metadata})
if err != nil {
return fmt.Errorf("unable to marshal flags: %w", err)
}
r.selectorFlags[source] = string(bytes)
}
return nil
}


@ -1,261 +0,0 @@
package sync
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const emptyConfigString = "{\"flags\":{}}"
func TestRegistration(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
tests := []struct {
testName string
id interface{}
source string
flagStringValidator func(flagString string, testSource string, testName string)
connection chan payload
expectError bool
}{
{
testName: "subscribe to all flags",
id: context.Background(),
connection: make(chan payload, 1),
},
{
testName: "subscribe to source A",
id: context.Background(),
source: "A",
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
},
connection: make(chan payload, 1),
},
{
testName: "subscribe to source B",
id: context.Background(),
source: "B",
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Contains(t, flagString, fmt.Sprintf("\"source\":\"%s\"", testSource))
},
connection: make(chan payload, 1),
},
{
testName: "subscribe to empty",
id: context.Background(),
source: "C",
connection: make(chan payload, 1),
flagStringValidator: func(flagString string, testSource string, testName string) {
assert.Equal(t, flagString, emptyConfigString)
},
expectError: false,
},
{
testName: "subscribe to non-existing",
id: context.Background(),
source: "D",
connection: make(chan payload, 1),
expectError: true,
},
}
// validate registration
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
// when
err := mux.Register(test.id, test.source, test.connection)
// then
if !test.expectError && err != nil {
t.Fatal("expected no errors, but got error")
}
if test.expectError && err != nil {
// pass
return
}
// validate subscription
var initSync payload
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case initSync = <-test.connection:
break
}
if test.flagStringValidator != nil {
test.flagStringValidator(initSync.flags, test.source, test.testName)
}
})
}
}
func TestUpdateAndRemoval(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
identifier := context.Background()
channel := make(chan payload, 1)
err = mux.Register(identifier, "", channel)
if err != nil {
t.Fatal("error during subscription registration")
return
}
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case <-channel:
break
}
// when - updates are triggered
err = mux.Publish()
if err != nil {
t.Fatal("failure to trigger update request on multiplexer")
return
}
// then
select {
case <-time.After(2 * time.Second):
t.Fatal("data sync did not complete for initial sync within an acceptable timeframe")
case <-channel:
break
}
// when - subscription removed & update triggered
mux.Unregister(identifier, "")
err = mux.Publish()
if err != nil {
t.Fatal("failure to trigger update request on multiplexer")
return
}
// then
select {
case <-time.After(2 * time.Second):
break
case <-channel:
t.Fatal("expected no sync but got an update as removal was not performed")
}
}
func TestGetAllFlags(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
// when - get all with open scope
flagConfig, err := mux.GetAllFlags("")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
// when - get all with a scope
flagConfig, err = mux.GetAllFlags("A")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 || !strings.Contains(flagConfig, fmt.Sprintf("\"source\":\"%s\"", "A")) {
t.Fatal("expected flags to be scoped")
return
}
// when - get all for a flagless-scope
flagConfig, err = mux.GetAllFlags("C")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
assert.Equal(t, flagConfig, emptyConfigString)
}
func TestGetAllFlagsMetadata(t *testing.T) {
// given
mux, err := NewMux(getSimpleFlagStore())
if err != nil {
t.Fatal("error during flag extraction")
return
}
// when - get all with open scope
flagConfig, err := mux.GetAllFlags("")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
t.Fatal("expected unique metadata key for A to be present")
return
}
if !strings.Contains(flagConfig, "\"keyB\":\"valueB\"") {
t.Fatal("expected unique metadata key for B to be present")
return
}
// duplicated keys are removed
if strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
t.Fatal("expected duplicated metadata key NOT to be present")
return
}
// when - get all with a scope
flagConfig, err = mux.GetAllFlags("A")
if err != nil {
t.Fatal("error when retrieving all flags")
return
}
if len(flagConfig) == 0 {
t.Fatal("expected no empty flags")
return
}
if !strings.Contains(flagConfig, "\"keyA\":\"valueA\"") {
t.Fatal("expected unique metadata key to be present")
return
}
if !strings.Contains(flagConfig, "\"keyDuped\":\"value\"") {
t.Fatal("expected duplicated metadata key to be present")
return
}
}

View File

@ -21,25 +21,25 @@ type ISyncService interface {
Start(context.Context) error
// Emit updates for sync listeners
Emit(isResync bool, source string)
Emit(source string)
}
type SvcConfigurations struct {
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
Logger *logger.Logger
Port uint16
Sources []string
Store *store.Store
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool
}
type Service struct {
listener net.Listener
logger *logger.Logger
mux *Multiplexer
server *grpc.Server
startupTracker syncTracker
@ -65,7 +65,6 @@ func loadTLSCredentials(certPath string, keyPath string) (credentials.TransportC
func NewSyncService(cfg SvcConfigurations) (*Service, error) {
var err error
l := cfg.Logger
mux, err := NewMux(cfg.Store, cfg.Sources)
if err != nil {
return nil, fmt.Errorf("error initializing multiplexer: %w", err)
}
@ -82,10 +81,11 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
}
syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
store: cfg.Store,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
disableSyncMetadata: cfg.DisableSyncMetadata,
})
var lis net.Listener
@ -103,7 +103,6 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
return &Service{
listener: lis,
logger: l,
mux: mux,
server: server,
startupTracker: syncTracker{
sources: slices.Clone(cfg.Sources),
@ -149,16 +148,8 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}
func (s *Service) Emit(isResync bool, source string) {
func (s *Service) Emit(source string) {
s.startupTracker.trackAndRemove(source)
if !isResync {
err := s.mux.Publish()
if err != nil {
s.logger.Warn(fmt.Sprintf("error while publishing sync streams: %v", err))
return
}
}
}
func (s *Service) shutdown() {

View File

@ -3,16 +3,16 @@ package sync
import (
"context"
"fmt"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
@ -36,143 +36,129 @@ func TestSyncServiceEndToEnd(t *testing.T) {
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantStartErr: false},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore()
for _, disableSyncMetadata := range []bool{true, false} {
for _, tc := range testCases {
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore(t)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
service, doneChan, err := createAndStartSyncService(port, sources, flagStore, tc.certPath, tc.keyPath, tc.socketPath, ctx, 0)
_, doneChan, err := createAndStartSyncService(
port,
sources,
flagStore,
tc.certPath,
tc.keyPath,
tc.socketPath,
ctx,
0,
disableSyncMetadata,
)
if tc.wantStartErr {
if err == nil {
t.Fatal("expected error creating the service!")
}
return
} else if err != nil {
t.Fatal("unexpected error creating the service: %w", err)
return
}
// when - derive a client for sync service
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
// then
// sync flags request
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("error from sync request: %v", err))
return
}
syncRsp, err := flags.Recv()
if err != nil {
t.Fatal(fmt.Printf("stream error: %v", err))
return
}
if len(syncRsp.GetFlagConfiguration()) == 0 {
t.Error("expected non empty sync response, but got empty")
}
// checks sync context actually set
syncContext := syncRsp.GetSyncContext()
if syncContext == nil {
t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
}
syncAsMap := syncContext.AsMap()
if syncAsMap["sources"] == nil {
t.Fatalf("expected sources in sync_context, but got nil")
}
sourcesStr := syncAsMap["sources"].(string)
sourcesArray := strings.Split(sourcesStr, ",")
sort.Strings(sourcesArray)
expectedSources := []string{"A", "B", "C"}
if !reflect.DeepEqual(sourcesArray, expectedSources) {
t.Fatalf("sources entry in sync_context does not match expected: got %v, want %v", sourcesArray, expectedSources)
}
// validate emits
dataReceived := make(chan interface{})
go func() {
_, err := flags.Recv()
if err != nil {
if tc.wantStartErr {
if err == nil {
t.Fatal("expected error creating the service!")
}
return
} else if err != nil {
t.Fatal("unexpected error creating the service: %w", err)
return
}
dataReceived <- nil
}()
// when - derive a client for sync service
serviceClient := getSyncClient(t, tc.clientCertPath, tc.socketPath, tc.tls, port, ctx)
// Emit as a resync
service.Emit(true, "A")
// then
select {
case <-dataReceived:
t.Fatal("expected no data as this is a resync")
case <-time.After(1 * time.Second):
break
}
// sync flags request
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("error from sync request: %v", err))
return
}
// Emit as a standard update (not a resync)
service.Emit(false, "A")
syncRsp, err := flags.Recv()
if err != nil {
t.Fatal(fmt.Printf("stream error: %v", err))
return
}
select {
case <-dataReceived:
break
case <-time.After(1 * time.Second):
t.Fatal("expected data but timeout waiting for sync")
}
if len(syncRsp.GetFlagConfiguration()) == 0 {
t.Error("expected non empty sync response, but got empty")
}
// fetch all flags
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("fetch all error: %v", err))
return
}
// checks sync context actually set
syncContext := syncRsp.GetSyncContext()
if syncContext == nil {
t.Fatal("expected sync_context in SyncFlagsResponse, but got nil")
}
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
}
// validate emits
dataReceived := make(chan interface{})
go func() {
_, err := flags.Recv()
if err != nil {
return
}
// metadata request
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
if err != nil {
t.Fatal(fmt.Printf("metadata error: %v", err))
return
}
dataReceived <- nil
}()
asMap := metadataRsp.GetMetadata().AsMap()
// make a change
flagStore.Update(testSource1, testSource1Flags, model.Metadata{
"keyDuped": "value",
"keyA": "valueA",
})
// expect `sources` to be present
if asMap["sources"] == nil {
t.Fatal("expected sources entry in the metadata, but got nil")
}
select {
case <-dataReceived:
break
case <-time.After(1 * time.Second):
t.Fatal("expected data but timeout waiting for sync")
}
if asMap["sources"] != "A,B,C" {
t.Fatal("incorrect sources entry in metadata")
}
// fetch all flags
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
if err != nil {
t.Fatal(fmt.Printf("fetch all error: %v", err))
return
}
// validate shutdown from context cancellation
go func() {
cancelFunc()
}()
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
}
select {
case <-doneChan:
// exit successful
return
case <-time.After(2 * time.Second):
t.Fatal("service did not exist within sufficient timeframe")
}
})
// metadata request
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
if disableSyncMetadata {
if err == nil {
t.Fatal(fmt.Printf("getMetadata disabled, error should not be nil"))
return
}
} else {
asMap := metadataRsp.GetMetadata().AsMap()
assert.NotNil(t, asMap, "expected metadata to be non-nil")
}
// validate shutdown from context cancellation
go func() {
cancelFunc()
}()
select {
case <-doneChan:
// exit successful
return
case <-time.After(2 * time.Second):
t.Fatal("service did not exist within sufficient timeframe")
}
})
}
}
}
@ -190,7 +176,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
// given
port := 18016
flagStore, sources := getSimpleFlagStore()
flagStore, sources := getSimpleFlagStore(t)
certPath := "./test-cert/server-cert.pem"
keyPath := "./test-cert/server-key.pem"
socketPath := ""
@ -198,7 +184,7 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline)
_, _, err := createAndStartSyncService(port, sources, flagStore, certPath, keyPath, socketPath, ctx, tc.deadline, false)
if err != nil {
t.Fatal("error creating sync service")
}
@ -256,16 +242,27 @@ func TestSyncServiceDeadlineEndToEnd(t *testing.T) {
}
}
func createAndStartSyncService(port int, sources []string, store *store.State, certPath string, keyPath string, socketPath string, ctx context.Context, deadline time.Duration) (*Service, chan interface{}, error) {
func createAndStartSyncService(
port int,
sources []string,
store *store.Store,
certPath string,
keyPath string,
socketPath string,
ctx context.Context,
deadline time.Duration,
disableSyncMetadata bool,
) (*Service, chan interface{}, error) {
service, err := NewSyncService(SvcConfigurations{
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: certPath,
KeyPath: keyPath,
SocketPath: socketPath,
StreamDeadline: deadline,
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: certPath,
KeyPath: keyPath,
SocketPath: socketPath,
StreamDeadline: deadline,
DisableSyncMetadata: disableSyncMetadata,
})
if err != nil {
return nil, nil, err
@ -279,7 +276,7 @@ func createAndStartSyncService(port int, sources []string, store *store.State, c
}()
// trigger manual emits matching sources, so that service can start
for _, source := range sources {
service.Emit(false, source)
service.Emit(source)
}
return service, doneChan, err
}

View File

@ -5,46 +5,57 @@ import (
"crypto/x509"
"fmt"
"os"
"testing"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
"google.golang.org/grpc/credentials"
)
// getSimpleFlagStore returns a flag store pre-filled with flags from sources A, B & C, with C empty
func getSimpleFlagStore() (*store.State, []string) {
variants := map[string]any{
"true": true,
"false": false,
}
flagStore := store.NewFlags()
flagStore.Set("flagA", model.Flag{
var testSource1 = "testSource1"
var testSource2 = "testSource2"
var testVariants = map[string]any{
"true": true,
"false": false,
}
var testSource1Flags = map[string]model.Flag{
"flagA": {
State: "ENABLED",
DefaultVariant: "false",
Variants: variants,
Source: "A",
})
flagStore.Set("flagB", model.Flag{
Variants: testVariants,
},
}
var testSource2Flags = map[string]model.Flag{
"flagB": {
State: "ENABLED",
DefaultVariant: "true",
Variants: variants,
Source: "B",
})
Variants: testVariants,
},
}
flagStore.MetadataPerSource["A"] = model.Metadata{
// getSimpleFlagStore is a test util which returns a flag store pre-filled with flags from sources testSource1 and testSource2.
func getSimpleFlagStore(t testing.TB) (*store.Store, []string) {
t.Helper()
sources := []string{testSource1, testSource2}
flagStore, err := store.NewStore(logger.NewLogger(nil, false), sources)
if err != nil {
t.Fatalf("error creating flag store: %v", err)
}
flagStore.Update(testSource1, testSource1Flags, model.Metadata{
"keyDuped": "value",
"keyA": "valueA",
}
})
flagStore.MetadataPerSource["B"] = model.Metadata{
flagStore.Update(testSource2, testSource2Flags, model.Metadata{
"keyDuped": "value",
"keyB": "valueB",
}
})
return flagStore, []string{"A", "B", "C"}
return flagStore, sources
}
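
A minimal usage sketch for this helper, assuming the `GetAll(ctx)` signature used earlier in this PR (returning the merged flag map, merged metadata, and an error); the test name is hypothetical:

```go
package sync

import (
	"context"
	"testing"
)

func TestSimpleFlagStoreSmoke(t *testing.T) {
	flagStore, sources := getSimpleFlagStore(t)
	all, metadata, err := flagStore.GetAll(context.Background())
	if err != nil {
		t.Fatalf("error retrieving flags from the store: %v", err)
	}
	// two sources, each contributing one flag (flagA and flagB)
	if len(sources) != 2 || len(all) != 2 {
		t.Fatalf("expected 2 sources and 2 flags, got %d and %d", len(sources), len(all))
	}
	_ = metadata // merged metadata; duplicated keys across sources may be dropped
}
```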
func loadTLSClientCredentials(certPath string) (credentials.TransportCredentials, error) {

View File

@ -68,6 +68,7 @@ func (m Middleware) Measure(ctx context.Context, handlerID string, reporter Repo
hid,
reporter.Method(),
code,
reporter.Scheme(),
)
m.cfg.MetricRecorder.InFlightRequestStart(ctx, httpAttrs)
@ -112,6 +113,7 @@ type Reporter interface {
URLPath() string
StatusCode() int
BytesWritten() int64
Scheme() string
}
type stdReporter struct {
@ -127,6 +129,13 @@ func (s *stdReporter) StatusCode() int { return s.w.statusCode }
func (s *stdReporter) BytesWritten() int64 { return int64(s.w.bytesWritten) }
func (s *stdReporter) Scheme() string {
if s.r.TLS != nil {
return "https"
}
return "http"
}
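
The scheme-detection rule added above is small enough to demonstrate standalone. A minimal sketch (standard library only) showing that a request carrying a TLS connection state is reported as `https`:

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
)

// scheme mirrors the stdReporter logic above: a non-nil TLS
// connection state on the request means the call came in over HTTPS.
func scheme(r *http.Request) string {
	if r.TLS != nil {
		return "https"
	}
	return "http"
}

func main() {
	plain := &http.Request{}
	secure := &http.Request{TLS: &tls.ConnectionState{}}
	fmt.Println(scheme(plain), scheme(secure)) // http https
}
```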
// responseWriterInterceptor is a simple wrapper to intercept set data on a
// ResponseWriter.
type responseWriterInterceptor struct {

View File

@ -190,6 +190,10 @@ func (m *MockReporter) BytesWritten() int64 {
return m.Bytes
}
func (m *MockReporter) Scheme() string {
return "http"
}
func (m *MockReporter) URLCalled() bool { return m.urlCalled }
func (m *MockReporter) MethodCalled() bool { return m.methodCalled }
func (m *MockReporter) StatusCalled() bool { return m.statusCalled }

View File

@ -16,13 +16,13 @@ make build
then start the `flagd` test harness
```shell
./bin/flagd start -f file:test-harness/symlink_testing-flags.json
make flagd-integration-test-harness
```
and finally run
```shell
make integration-test
make flagd-integration-test
```
## TLS