feat!: remove sync.Type (#1691)

The PR removes the obsolete `sync.Type`. This was intended to be used to
support partial updates from sync sources, but it's never been used and
it's lagging behind on features and would not work fully if implemented
by a source (`metadata` is not properly implemented). Moreover, we're
not convinced the value is worth the complexity it adds.

Specifically:

- removes `sync.Type` and all references
- removes tests for `sync.Type` and updates assertions for unrelated
tests which previously asserted on `sync.Type` (now we use the
payload/source in assertions)
- (unrelated) updates `CONTRIBUTING.md` to include a bunch of helpful
manual testing steps

Note that this is a breaking change, but only for the `flagd/core`
library, NOT for flagd or any sync source itself in terms of their
behavior. Since there was no change in `flagd/`, this will not show up
in the CHANGELOG.

Resolves: https://github.com/open-feature/flagd/issues/1678

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
This commit is contained in:
Todd Baert 2025-07-21 08:01:56 -04:00 committed by GitHub
parent 81855d76f9
commit ac647e0656
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 127 additions and 618 deletions

View File

@ -8,6 +8,24 @@ TLDR: be respectful.
Any contributions are expected to include unit tests.
These can be validated with `make test` or the automated github workflow will run them on PR creation.
## Development
### Prerequisites
You'll need:
- Go
- make
- docker
You'll want:
- curl (for calling HTTP endpoints)
- [grpcurl](https://github.com/fullstorydev/grpcurl) (for making gRPC calls)
- jq (for pretty printing responses)
### Workspace Initialization
This project uses a go workspace, to setup the project run
```shell
@ -22,6 +40,63 @@ The project uses remote buf packages, changing the remote generation source will
export GOPRIVATE=buf.build/gen/go
```
### Manual testing
flagd has a number of interfaces (you can read more about than at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
NOTE: you will need `go, curl`
#### Remote single flag evaluation via HTTP1.1/Connect
```sh
# evaluates a single boolean flag
curl -X POST -d '{"flagKey":"myBoolFlag","context":{}}' -H "Content-Type: application/json" "http://localhost:8013/flagd.evaluation.v1.Service/ResolveBoolean" | jq
```
#### Remote single flag evaluation via HTTP1.1/OFREP
```sh
# evaluates a single boolean flag
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags/myBoolFlag' | jq
```
#### Remote single flag evaluation via gRPC
```sh
# evaluates a single boolean flag
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
```
#### Remote bulk evaluation via via HTTP1.1/OFREP
```sh
# evaluates flags in bulk
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
```
#### Remote bulk evaluation via gRPC
```sh
# evaluates flags in bulk
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
```
#### Flag configuration fetch via gRPC
```sh
# sends back a representation of all flags
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/FetchAllFlags | jq
```
#### Flag synchronization stream via gRPC
```sh
# will open a persistent stream which sends flag changes when the watched source is modified
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```
## DCO Sign-Off
A DCO (Developer Certificate of Origin) sign-off is a line placed at the end of

View File

@ -103,8 +103,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
_, span := je.jsonEvalTracer.Start(
context.Background(),
"flagSync",
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)),
trace.WithAttributes(attribute.String("feature_flag.sync_type", payload.String())))
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)))
defer span.End()
var definition Definition
@ -119,19 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
var events map[string]interface{}
var reSync bool
// TODO: We do not handle metadata in ADD/UPDATE operations. These are only relevant for grpc sync implementations.
switch payload.Type {
case sync.ALL:
events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
case sync.ADD:
events = je.store.Add(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.UPDATE:
events = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.DELETE:
events = je.store.DeleteFlags(je.Logger, payload.Source, definition.Flags)
default:
return nil, false, fmt.Errorf("unsupported sync type: %d", payload.Type)
}
events, reSync = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
// Number of events correlates to the number of flags changed through this sync, record it
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))

View File

@ -127,7 +127,7 @@ const UndefinedDefaultWithTargetting = `{
const (
FlagSetID = "testSetId"
Version = "v33"
ValidFlag = "validFlag"
ValidFlag = "validFlag"
MissingFlag = "missingFlag"
StaticBoolFlag = "staticBoolFlag"
StaticBoolValue = true
@ -956,7 +956,7 @@ func TestResolveAsAnyValue(t *testing.T) {
func TestResolve_DefaultVariant(t *testing.T) {
tests := []struct {
flags string
flags string
flagKey string
context map[string]interface{}
reason string
@ -1050,7 +1050,6 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
func TestState_Evaluator(t *testing.T) {
tests := map[string]struct {
inputState string
inputSyncType sync.Type
expectedOutputState string
expectedError bool
expectedResync bool
@ -1086,7 +1085,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedOutputState: `
{
"flags": {
@ -1147,7 +1145,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedOutputState: `
{
"flags": {
@ -1204,7 +1201,6 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedError: true,
},
"invalid targeting": {
@ -1258,7 +1254,6 @@ func TestState_Evaluator(t *testing.T) {
"flagSources":null
}
`,
inputSyncType: sync.ALL,
expectedError: false,
expectedOutputState: `
{
@ -1341,40 +1336,8 @@ func TestState_Evaluator(t *testing.T) {
}
}
`,
inputSyncType: sync.ALL,
expectedError: true,
},
"unexpected sync type": {
inputState: `
{
"flags": {
"fibAlgo": {
"variants": {
"recursive": "recursive",
"memo": "memo",
"loop": "loop",
"binet": "binet"
},
"defaultVariant": "recursive",
"state": "ENABLED",
"targeting": {
"if": [
{
"$ref": "emailWithFaas"
}, "binet", null
]
}
}
},
"$evaluators": {
"emailWithFaas": ""
}
}
`,
inputSyncType: 999,
expectedError: true,
expectedResync: false,
},
}
for name, tt := range tests {
@ -1423,11 +1386,9 @@ func TestState_Evaluator(t *testing.T) {
func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
tests := map[string]struct {
dataSyncType sync.Type
flagResolution func(evaluator *evaluator.JSON) error
}{
"Add_ResolveAllValues": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1437,7 +1398,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Update_ResolveAllValues": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1447,7 +1407,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Delete_ResolveAllValues": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
@ -1457,35 +1416,30 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
},
},
"Add_ResolveBooleanValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticBoolFlag, nil)
return err
},
},
"Update_ResolveStringValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticStringValue, nil)
return err
},
},
"Delete_ResolveIntValue": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveIntValue(context.TODO(), "", StaticIntFlag, nil)
return err
},
},
"Add_ResolveFloatValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveFloatValue(context.TODO(), "", StaticFloatFlag, nil)
return err
},
},
"Update_ResolveObjectValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveObjectValue(context.TODO(), "", StaticObjectFlag, nil)
return err
@ -1497,7 +1451,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: sync.ADD})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
t.Fatal(err)
}
@ -1520,7 +1474,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
errChan <- nil
return
default:
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: tt.dataSyncType})
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil {
errChan <- err
return

View File

@ -146,112 +146,8 @@ func (f *State) Add(logger *logger.Logger, source string, selector string, flags
return notifications
}
// Update existing flags from source.
func (f *State) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}
for k, flag := range flags {
storedFlag, _, ok := f.Get(context.Background(), k)
if !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
k,
source))
continue
}
if !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not updating: flag %s from source %s does not have priority over %s",
k,
source,
storedFlag.Source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
flag.Source = source
flag.Selector = selector
f.Set(k, flag)
}
return notifications
}
// DeleteFlags matching flags from source.
func (f *State) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
logger.Debug(
fmt.Sprintf(
"store resync triggered: delete event from source %s",
source,
),
)
ctx := context.Background()
_, ok := f.MetadataPerSource[source]
if ok {
delete(f.MetadataPerSource, source)
}
notifications := map[string]interface{}{}
if len(flags) == 0 {
allFlags, _, err := f.GetAll(ctx)
if err != nil {
logger.Error(fmt.Sprintf("error while retrieving flags from the store: %v", err))
return notifications
}
for key, flag := range allFlags {
if flag.Source != source {
continue
}
notifications[key] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(key)
}
}
for k := range flags {
flag, _, ok := f.Get(ctx, k)
if ok {
if !f.hasPriority(flag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not deleting: flag %s from source %s cannot be deleted by %s",
k,
flag.Source,
source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.",
k,
source))
}
}
return notifications
}
// Merge provided flags from source with currently stored flags.
// nolint: funlen
func (f *State) Merge(
// Update the flag state with the provided flags.
func (f *State) Update(
logger *logger.Logger,
source string,
selector string,

View File

@ -219,7 +219,7 @@ func TestMergeFlags(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
gotNotifs, resyncRequired := tt.current.Update(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
require.Equal(t, tt.wantNotifs, gotNotifs)
@ -324,222 +324,3 @@ func TestFlags_Add(t *testing.T) {
})
}
}
func TestFlags_Update(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockOverrideSource := "source-2"
type request struct {
source string
selector string
flags map[string]model.Flag
}
tests := []struct {
name string
storedState *State
UpdateRequest request
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Update success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update multiple success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
"B": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
{
name: "Update success - conflict and override",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockOverrideSource,
flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update fail",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
expectedNotificationKeys: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source,
tt.UpdateRequest.selector, tt.UpdateRequest.flags)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}
func TestFlags_Delete(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockSource2 := "source2"
tests := []struct {
name string
storedState *State
deleteRequest map[string]model.Flag
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Remove success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"A": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Nothing to remove",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"C": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{},
},
{
name: "Remove all",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{},
expectedState: &State{
Flags: map[string]model.Flag{
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.DeleteFlags(mockLogger, mockSource, tt.deleteRequest)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}

View File

@ -104,7 +104,7 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe
if !skipCheckingModTime {
hs.lastUpdated = updated
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object}
return nil
}

View File

@ -52,7 +52,7 @@ func NewFileSync(uri string, watchType string, logger *logger.Logger) *Sync {
const defaultState = "{}"
func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
return nil
}
@ -94,7 +94,7 @@ func (fs *Sync) setReady(val bool) {
//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
defer fs.watcher.Close()
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
fs.setReady(true)
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for {
@ -108,7 +108,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String()))
switch {
case event.Has(fsnotify.Create) || event.Has(fsnotify.Write):
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
@ -116,20 +116,20 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if err != nil {
// the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
fs.sendDataSync(ctx, dataSync)
continue
}
// Counterintuitively, remove events are the only meaningful ones seen in K8s.
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
// At the point the remove event is fired, we have our new data, so we can send it down the channel.
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Chmod):
// on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen
// while the file is being watched, os.Stat is used here to infer deletion
if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) {
fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
fs.sendDataSync(ctx, dataSync)
}
}
@ -147,14 +147,8 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}
func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String()))
if syncType == sync.DELETE {
// Skip fetching and emit default state to avoid EOF errors
dataSync <- sync.DataSync{FlagData: defaultState, Source: fs.URI, Type: syncType}
return
}
func (fs *Sync) sendDataSync(ctx context.Context, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Data sync received for %s", fs.URI))
msg := defaultState
m, err := fs.fetch(ctx)
@ -167,7 +161,7 @@ func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync c
msg = m
}
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType}
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
}
func (fs *Sync) fetch(_ context.Context) (string, error) {

View File

@ -26,7 +26,6 @@ func TestSimpleReSync(t *testing.T) {
expectedDataSync := sync.DataSync{
FlagData: "hello",
Source: source,
Type: sync.ALL,
}
handler := Sync{
URI: source,
@ -59,7 +58,6 @@ func TestSimpleReSync(t *testing.T) {
func TestSimpleSync(t *testing.T) {
readDirName := t.TempDir()
updateDirName := t.TempDir()
deleteDirName := t.TempDir()
tests := map[string]struct {
manipulationFuncs []func(t *testing.T)
expectedDataSync []sync.DataSync
@ -76,7 +74,6 @@ func TestSimpleSync(t *testing.T) {
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", readDirName, fetchFileName),
Type: sync.ALL,
},
},
},
@ -94,35 +91,10 @@ func TestSimpleSync(t *testing.T) {
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
},
{
FlagData: "new content",
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
},
},
},
"delete-event": {
fetchDirName: deleteDirName,
manipulationFuncs: []func(t *testing.T){
func(t *testing.T) {
writeToFile(t, deleteDirName, fetchFileContents)
},
func(t *testing.T) {
deleteFile(t, deleteDirName, fetchFileName)
},
},
expectedDataSync: []sync.DataSync{
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.ALL,
},
{
FlagData: defaultState,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.DELETE,
},
},
},
@ -172,9 +144,6 @@ func TestSimpleSync(t *testing.T) {
if data.Source != syncEvent.Source {
t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source)
}
if data.Type != syncEvent.Type {
t.Errorf("expected type: %b, but received type: %b", syncEvent.Type, data.Type)
}
case <-time.After(10 * time.Second):
t.Errorf("event not found, timeout out after 10 seconds")
}

View File

@ -106,7 +106,6 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
dataSync <- sync.DataSync{
FlagData: res.GetFlagConfiguration(),
Source: g.URI,
Type: sync.ALL,
}
return nil
}
@ -200,11 +199,10 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
}
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
FlagData: data.FlagConfiguration,
SyncContext: data.SyncContext,
Source: g.URI,
Selector: g.Selector,
Type: sync.ALL,
Source: g.URI,
Selector: g.Selector,
}
g.Logger.Debug("received full configuration payload")

View File

@ -16,7 +16,6 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
credendialsmock "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock"
grpcmock "github.com/open-feature/flagd/core/pkg/sync/grpc/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
@ -124,7 +123,6 @@ func Test_ReSyncTests(t *testing.T) {
notifications: []sync.DataSync{
{
FlagData: "success",
Type: sync.ALL,
},
},
shouldError: false,
@ -181,9 +179,6 @@ func Test_ReSyncTests(t *testing.T) {
for _, expected := range test.notifications {
out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
@ -197,100 +192,6 @@ func Test_ReSyncTests(t *testing.T) {
}
}
func TestSync_BasicFlagSyncStates(t *testing.T) {
grpcSyncImpl := Sync{
URI: "grpc://test",
ProviderID: "",
Logger: logger.NewLogger(nil, false),
}
mockError := errors.New("could not sync")
tests := []struct {
name string
stream syncv1grpc.FlagSyncService_SyncFlagsClient
setup func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse)
want sync.Type
wantError error
ready bool
}{
{
name: "State All maps to Sync All",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
metadata, err := structpb.NewStruct(map[string]any{"sources": "A,B,C"})
if err != nil {
t.Fatalf("Failed to create sync context: %v", err)
}
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
SyncContext: metadata,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.ALL,
ready: true,
},
{
name: "Error during flag sync",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
clientResponse.EXPECT().Recv().Return(
nil,
mockError,
)
},
ready: true,
want: -1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
syncChan := make(chan sync.DataSync, 1)
mockClient := grpcmock.NewMockFlagSyncServiceClient(ctrl)
mockClientResponse := grpcmock.NewMockFlagSyncServiceClientResponse(ctrl)
test.setup(t, mockClient, mockClientResponse)
waitChan := make(chan struct{})
go func() {
grpcSyncImpl.client = mockClient
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := grpcSyncImpl.Sync(ctx, syncChan)
if err != nil {
t.Errorf("Error handling flag sync: %v", err)
}
close(waitChan)
}()
<-waitChan
if test.want < 0 {
require.Empty(t, syncChan)
return
}
data := <-syncChan
if grpcSyncImpl.IsReady() != test.ready {
t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready)
}
if data.Type != test.want {
t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want)
}
})
}
}
func Test_StreamListener(t *testing.T) {
const target = "localBufCon"
@ -315,7 +216,6 @@ func Test_StreamListener(t *testing.T) {
{
FlagData: "{\"flags\": {}}",
SyncContext: metadata,
Type: sync.ALL,
},
},
},
@ -331,14 +231,12 @@ func Test_StreamListener(t *testing.T) {
},
output: []sync.DataSync{
{
FlagData: "{}",
FlagData: "{}",
SyncContext: metadata,
Type: sync.ALL,
},
{
FlagData: "{\"flags\": {}}",
SyncContext: metadata,
Type: sync.ALL,
},
},
},
@ -391,10 +289,6 @@ func Test_StreamListener(t *testing.T) {
for _, expected := range test.output {
out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
}
@ -485,8 +379,7 @@ func Test_SyncRetry(t *testing.T) {
// Setup
target := "grpc://local"
bufListener := bufconn.Listen(1)
expectType := sync.ALL
emptyFlagData := "{}"
// buffer based server. response ignored purposefully
bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{
@ -540,7 +433,7 @@ func Test_SyncRetry(t *testing.T) {
t.Errorf("timeout waiting for conditions to fulfil")
break
case data := <-syncChan:
if data.Type != expectType {
if data.FlagData != emptyFlagData {
t.Errorf("sync start error: %s", err.Error())
}
}
@ -560,9 +453,9 @@ func Test_SyncRetry(t *testing.T) {
case <-tCtx.Done():
cancelFunc()
t.Error("timeout waiting for conditions to fulfil")
case rsp := <-syncChan:
if rsp.Type != expectType {
t.Errorf("expected response: %s, but got: %s", expectType, rsp.Type)
case data := <-syncChan:
if data.FlagData != emptyFlagData {
t.Errorf("sync start error: %s", err.Error())
}
}
}

View File

@ -46,7 +46,7 @@ func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
if err != nil {
return err
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI}
return nil
}
@ -89,10 +89,10 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if hs.LastBodySHA == "" {
hs.Logger.Debug("new configuration created")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
} else if hs.LastBodySHA != currentSHA {
hs.Logger.Debug("configuration modified")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
}
hs.LastBodySHA = currentSHA
@ -100,7 +100,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Cron.Start()
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI}
<-ctx.Done()
hs.Cron.Stop()

View File

@ -5,7 +5,6 @@ import (
"io"
"log"
"net/http"
"reflect"
"strings"
"testing"
"time"
@ -289,6 +288,8 @@ func TestSync_Init(t *testing.T) {
func TestHTTPSync_Resync(t *testing.T) {
ctrl := gomock.NewController(t)
source := "http://localhost"
emptyeFlagData := "{}"
tests := map[string]struct {
setup func(t *testing.T, client *syncmock.MockClient)
@ -303,11 +304,11 @@ func TestHTTPSync_Resync(t *testing.T) {
setup: func(_ *testing.T, client *syncmock.MockClient) {
client.EXPECT().Do(gomock.Any()).Return(&http.Response{
Header: map[string][]string{"Content-Type": {"application/json"}},
Body: io.NopCloser(strings.NewReader("")),
Body: io.NopCloser(strings.NewReader(emptyeFlagData)),
StatusCode: http.StatusOK,
}, nil)
},
uri: "http://localhost",
uri: source,
handleResponse: func(t *testing.T, _ Sync, fetched string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
@ -320,9 +321,8 @@ func TestHTTPSync_Resync(t *testing.T) {
wantErr: false,
wantNotifications: []sync.DataSync{
{
Type: sync.ALL,
FlagData: "",
Source: "",
FlagData: emptyeFlagData,
Source: source,
},
},
},
@ -364,8 +364,8 @@ func TestHTTPSync_Resync(t *testing.T) {
for _, dataSync := range tt.wantNotifications {
select {
case x := <-d:
if !reflect.DeepEqual(x.String(), dataSync.String()) {
t.Error("unexpected datasync received", x, dataSync)
if x.FlagData != dataSync.FlagData || x.Source != dataSync.Source {
t.Errorf("unexpected datasync received %v vs %v", x, dataSync)
}
case <-time.After(2 * time.Second):
t.Error("expected datasync not received", dataSync)

View File

@ -6,35 +6,6 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)
type Type int
// Type of the sync operation
const (
// ALL - All flags of sync provider. This is the default if unset due to primitive default
ALL Type = iota
// ADD - Additional flags from sync provider
ADD
// UPDATE - Update for flag(s) previously provided
UPDATE
// DELETE - Delete for flag(s) previously provided
DELETE
)
func (t Type) String() string {
switch t {
case ALL:
return "ALL"
case ADD:
return "ADD"
case UPDATE:
return "UPDATE"
case DELETE:
return "DELETE"
default:
return "UNKNOWN"
}
}
/*
ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest
value and communicate to the Runtime with DataSync channel
@ -57,11 +28,10 @@ type ISync interface {
// DataSync is the data contract between Runtime and sync implementations
type DataSync struct {
FlagData string
FlagData string
SyncContext *structpb.Struct
Source string
Selector string
Type
Source string
Selector string
}
// SourceConfig is configuration option for flagd. This maps to startup parameter sources

View File

@ -57,7 +57,7 @@ func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
if err != nil {
return fmt.Errorf("unable to fetch flag configuration: %w", err)
}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
return nil
}
@ -97,7 +97,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return err
}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
notifies := make(chan INotify)
@ -136,7 +136,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue
}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeModify:
k.logger.Debug("Configuration modified")
msg, err := k.fetch(ctx)
@ -145,7 +145,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue
}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeDelete:
k.logger.Debug("configuration deleted")
case DefaultEventTypeReady:

View File

@ -607,6 +607,7 @@ func TestInit(t *testing.T) {
func TestSync_ReSync(t *testing.T) {
const name = "myFF"
const ns = "myNS"
const payload = "{\"flags\":null}"
s := runtime.NewScheme()
ff := &unstructured.Unstructured{}
ff.SetUnstructuredContent(getCFG(name, ns))
@ -668,8 +669,8 @@ func TestSync_ReSync(t *testing.T) {
i := tt.countMsg
for i > 0 {
d := <-dataChannel
if d.Type != sync.ALL {
t.Errorf("Expected %v, got %v", sync.ALL, d)
if d.FlagData != payload {
t.Errorf("Expected %v, got %v", payload, d.FlagData)
}
i--
}

View File

@ -104,7 +104,6 @@ func (l *oldHandler) SyncFlags(
case d := <-dataSync:
if err := stream.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: d.FlagData,
State: dataSyncToGrpcState(d),
}); err != nil {
return fmt.Errorf("error sending configuration change event: %w", err)
}
@ -115,8 +114,3 @@ func (l *oldHandler) SyncFlags(
}
}
}
//nolint:staticcheck
func dataSyncToGrpcState(s sync.DataSync) syncv1.SyncState {
return syncv1.SyncState(s.Type + 1)
}

View File

@ -110,7 +110,6 @@ func Test_watchResource(t *testing.T) {
in := isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
}
syncMock.dataSyncChanIn <- in
@ -335,7 +334,6 @@ func Test_FetchAllFlags(t *testing.T) {
mockData: &isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
},
setHandler: true,
},
@ -402,7 +400,6 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
data: &isync.DataSync{
FlagData: "im a flag",
Source: "im a flag source",
Type: isync.ALL,
},
expectErr: false,
},