feat!: remove sync.Type (#1691)

This PR removes the obsolete `sync.Type`. It was intended to support
partial updates from sync sources, but it has never been used, it lags
behind on features, and it would not work fully if a source implemented
it (`metadata` is not properly handled). Moreover, we're not convinced
the value is worth the complexity it adds.

Specifically:

- removes `sync.Type` and all references
- removes tests for `sync.Type` and updates assertions in unrelated
tests that previously asserted on `sync.Type` (assertions now use the
payload/source)
- (unrelated) updates `CONTRIBUTING.md` to include a bunch of helpful
manual testing steps

Note that this is a breaking change, but only for the `flagd/core`
library, NOT for the flagd binary or the behavior of any sync source.
Since nothing changed in `flagd/`, this will not show up in the
CHANGELOG.

Resolves: https://github.com/open-feature/flagd/issues/1678

Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Todd Baert 2025-07-21 08:01:56 -04:00 committed by GitHub
parent 81855d76f9
commit ac647e0656
GPG Key ID: B5690EEEBB952194
17 changed files with 127 additions and 618 deletions
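For consumers of the `flagd/core` sync package, the practical effect is that `sync.DataSync` now describes a payload only by its content and origin (`FlagData`, `SyncContext`, `Source`, `Selector`); there is no operation type to choose, and the evaluator always routes the payload through a single store update. Below is a minimal sketch of what a source emits after this change, assuming the `flagd/core` module is available; the flag JSON and source URI are placeholders:

```go
package main

import (
	"fmt"

	"github.com/open-feature/flagd/core/pkg/sync"
)

func main() {
	dataSync := make(chan sync.DataSync, 1)

	// A sync source now sends only the payload and where it came from;
	// the former Type field (ALL/ADD/UPDATE/DELETE) no longer exists.
	dataSync <- sync.DataSync{
		FlagData: `{"flags":{}}`,      // placeholder flag configuration
		Source:   "file:example.json", // placeholder source URI
	}

	// Tests that previously asserted on Type now compare FlagData and Source.
	got := <-dataSync
	fmt.Println(got.FlagData, got.Source)
}
```

The diffs below apply this same pattern to the file, gRPC, HTTP, blob, and Kubernetes syncs.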

View File

@ -8,6 +8,24 @@ TLDR: be respectful.
Any contributions are expected to include unit tests.
These can be validated with `make test` or the automated github workflow will run them on PR creation.
## Development
### Prerequisites
You'll need:
- Go
- make
- docker
You'll want:
- curl (for calling HTTP endpoints)
- [grpcurl](https://github.com/fullstorydev/grpcurl) (for making gRPC calls)
- jq (for pretty printing responses)
### Workspace Initialization
This project uses a go workspace, to setup the project run
```shell
@ -22,6 +40,63 @@ The project uses remote buf packages, changing the remote generation source will
export GOPRIVATE=buf.build/gen/go
```
### Manual testing
flagd has a number of interfaces (you can read more about them at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or to deliver flag configurations so that they can be evaluated by _in-process_ providers.
You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
NOTE: you will need `go` and `curl`, plus `grpcurl` and `jq` for the examples below.
#### Remote single flag evaluation via HTTP1.1/Connect
```sh
# evaluates a single boolean flag
curl -X POST -d '{"flagKey":"myBoolFlag","context":{}}' -H "Content-Type: application/json" "http://localhost:8013/flagd.evaluation.v1.Service/ResolveBoolean" | jq
```
#### Remote single flag evaluation via HTTP1.1/OFREP
```sh
# evaluates a single boolean flag
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags/myBoolFlag' | jq
```
#### Remote single flag evaluation via gRPC
```sh
# evaluates a single boolean flag
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
```
#### Remote bulk evaluation via HTTP1.1/OFREP
```sh
# evaluates flags in bulk
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
```
#### Remote bulk evaluation via gRPC
```sh
# evaluates flags in bulk
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
```
#### Flag configuration fetch via gRPC
```sh
# sends back a representation of all flags
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/FetchAllFlags | jq
```
#### Flag synchronization stream via gRPC
```sh
# will open a persistent stream which sends flag changes when the watched source is modified
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
```
## DCO Sign-Off
A DCO (Developer Certificate of Origin) sign-off is a line placed at the end of

View File

@ -103,8 +103,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
_, span := je.jsonEvalTracer.Start( _, span := je.jsonEvalTracer.Start(
context.Background(), context.Background(),
"flagSync", "flagSync",
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)), trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)))
trace.WithAttributes(attribute.String("feature_flag.sync_type", payload.String())))
defer span.End() defer span.End()
var definition Definition var definition Definition
@ -119,19 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
var events map[string]interface{} var events map[string]interface{}
var reSync bool var reSync bool
// TODO: We do not handle metadata in ADD/UPDATE operations. These are only relevant for grpc sync implementations. events, reSync = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
switch payload.Type {
case sync.ALL:
events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
case sync.ADD:
events = je.store.Add(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.UPDATE:
events = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags)
case sync.DELETE:
events = je.store.DeleteFlags(je.Logger, payload.Source, definition.Flags)
default:
return nil, false, fmt.Errorf("unsupported sync type: %d", payload.Type)
}
// Number of events correlates to the number of flags changed through this sync, record it // Number of events correlates to the number of flags changed through this sync, record it
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events))) span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))

View File

@ -127,7 +127,7 @@ const UndefinedDefaultWithTargetting = `{
const ( const (
FlagSetID = "testSetId" FlagSetID = "testSetId"
Version = "v33" Version = "v33"
ValidFlag = "validFlag" ValidFlag = "validFlag"
MissingFlag = "missingFlag" MissingFlag = "missingFlag"
StaticBoolFlag = "staticBoolFlag" StaticBoolFlag = "staticBoolFlag"
StaticBoolValue = true StaticBoolValue = true
@ -956,7 +956,7 @@ func TestResolveAsAnyValue(t *testing.T) {
func TestResolve_DefaultVariant(t *testing.T) { func TestResolve_DefaultVariant(t *testing.T) {
tests := []struct { tests := []struct {
flags string flags string
flagKey string flagKey string
context map[string]interface{} context map[string]interface{}
reason string reason string
@ -1050,7 +1050,6 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
func TestState_Evaluator(t *testing.T) { func TestState_Evaluator(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
inputState string inputState string
inputSyncType sync.Type
expectedOutputState string expectedOutputState string
expectedError bool expectedError bool
expectedResync bool expectedResync bool
@ -1086,7 +1085,6 @@ func TestState_Evaluator(t *testing.T) {
} }
} }
`, `,
inputSyncType: sync.ALL,
expectedOutputState: ` expectedOutputState: `
{ {
"flags": { "flags": {
@ -1147,7 +1145,6 @@ func TestState_Evaluator(t *testing.T) {
} }
} }
`, `,
inputSyncType: sync.ALL,
expectedOutputState: ` expectedOutputState: `
{ {
"flags": { "flags": {
@ -1204,7 +1201,6 @@ func TestState_Evaluator(t *testing.T) {
} }
} }
`, `,
inputSyncType: sync.ALL,
expectedError: true, expectedError: true,
}, },
"invalid targeting": { "invalid targeting": {
@ -1258,7 +1254,6 @@ func TestState_Evaluator(t *testing.T) {
"flagSources":null "flagSources":null
} }
`, `,
inputSyncType: sync.ALL,
expectedError: false, expectedError: false,
expectedOutputState: ` expectedOutputState: `
{ {
@ -1341,40 +1336,8 @@ func TestState_Evaluator(t *testing.T) {
} }
} }
`, `,
inputSyncType: sync.ALL,
expectedError: true, expectedError: true,
}, },
"unexpected sync type": {
inputState: `
{
"flags": {
"fibAlgo": {
"variants": {
"recursive": "recursive",
"memo": "memo",
"loop": "loop",
"binet": "binet"
},
"defaultVariant": "recursive",
"state": "ENABLED",
"targeting": {
"if": [
{
"$ref": "emailWithFaas"
}, "binet", null
]
}
}
},
"$evaluators": {
"emailWithFaas": ""
}
}
`,
inputSyncType: 999,
expectedError: true,
expectedResync: false,
},
} }
for name, tt := range tests { for name, tt := range tests {
@ -1423,11 +1386,9 @@ func TestState_Evaluator(t *testing.T) {
func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) { func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
dataSyncType sync.Type
flagResolution func(evaluator *evaluator.JSON) error flagResolution func(evaluator *evaluator.JSON) error
}{ }{
"Add_ResolveAllValues": { "Add_ResolveAllValues": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil) _, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil { if err != nil {
@ -1437,7 +1398,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
}, },
}, },
"Update_ResolveAllValues": { "Update_ResolveAllValues": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil) _, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil { if err != nil {
@ -1447,7 +1407,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
}, },
}, },
"Delete_ResolveAllValues": { "Delete_ResolveAllValues": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil) _, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil { if err != nil {
@ -1457,35 +1416,30 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
}, },
}, },
"Add_ResolveBooleanValue": { "Add_ResolveBooleanValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticBoolFlag, nil) _, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticBoolFlag, nil)
return err return err
}, },
}, },
"Update_ResolveStringValue": { "Update_ResolveStringValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticStringValue, nil) _, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticStringValue, nil)
return err return err
}, },
}, },
"Delete_ResolveIntValue": { "Delete_ResolveIntValue": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveIntValue(context.TODO(), "", StaticIntFlag, nil) _, _, _, _, err := evaluator.ResolveIntValue(context.TODO(), "", StaticIntFlag, nil)
return err return err
}, },
}, },
"Add_ResolveFloatValue": { "Add_ResolveFloatValue": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveFloatValue(context.TODO(), "", StaticFloatFlag, nil) _, _, _, _, err := evaluator.ResolveFloatValue(context.TODO(), "", StaticFloatFlag, nil)
return err return err
}, },
}, },
"Update_ResolveObjectValue": { "Update_ResolveObjectValue": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error { flagResolution: func(evaluator *evaluator.JSON) error {
_, _, _, _, err := evaluator.ResolveObjectValue(context.TODO(), "", StaticObjectFlag, nil) _, _, _, _, err := evaluator.ResolveObjectValue(context.TODO(), "", StaticObjectFlag, nil)
return err return err
@ -1497,7 +1451,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags()) jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: sync.ADD}) _, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1520,7 +1474,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
errChan <- nil errChan <- nil
return return
default: default:
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: tt.dataSyncType}) _, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
if err != nil { if err != nil {
errChan <- err errChan <- err
return return

View File

@ -146,112 +146,8 @@ func (f *State) Add(logger *logger.Logger, source string, selector string, flags
return notifications return notifications
} }
// Update existing flags from source. // Update the flag state with the provided flags.
func (f *State) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag, func (f *State) Update(
) map[string]interface{} {
notifications := map[string]interface{}{}
for k, flag := range flags {
storedFlag, _, ok := f.Get(context.Background(), k)
if !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
k,
source))
continue
}
if !f.hasPriority(storedFlag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not updating: flag %s from source %s does not have priority over %s",
k,
source,
storedFlag.Source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
flag.Source = source
flag.Selector = selector
f.Set(k, flag)
}
return notifications
}
// DeleteFlags matching flags from source.
func (f *State) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
logger.Debug(
fmt.Sprintf(
"store resync triggered: delete event from source %s",
source,
),
)
ctx := context.Background()
_, ok := f.MetadataPerSource[source]
if ok {
delete(f.MetadataPerSource, source)
}
notifications := map[string]interface{}{}
if len(flags) == 0 {
allFlags, _, err := f.GetAll(ctx)
if err != nil {
logger.Error(fmt.Sprintf("error while retrieving flags from the store: %v", err))
return notifications
}
for key, flag := range allFlags {
if flag.Source != source {
continue
}
notifications[key] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(key)
}
}
for k := range flags {
flag, _, ok := f.Get(ctx, k)
if ok {
if !f.hasPriority(flag.Source, source) {
logger.Debug(
fmt.Sprintf(
"not deleting: flag %s from source %s cannot be deleted by %s",
k,
flag.Source,
source,
),
)
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.",
k,
source))
}
}
return notifications
}
// Merge provided flags from source with currently stored flags.
// nolint: funlen
func (f *State) Merge(
logger *logger.Logger, logger *logger.Logger,
source string, source string,
selector string, selector string,

View File

@ -219,7 +219,7 @@ func TestMergeFlags(t *testing.T) {
tt := tt tt := tt
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
t.Parallel() t.Parallel()
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{}) gotNotifs, resyncRequired := tt.current.Update(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags)) require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
require.Equal(t, tt.wantNotifs, gotNotifs) require.Equal(t, tt.wantNotifs, gotNotifs)
@ -324,222 +324,3 @@ func TestFlags_Add(t *testing.T) {
}) })
} }
} }
func TestFlags_Update(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockOverrideSource := "source-2"
type request struct {
source string
selector string
flags map[string]model.Flag
}
tests := []struct {
name string
storedState *State
UpdateRequest request
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Update success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update multiple success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
"B": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "False"},
"B": {Source: mockSource, DefaultVariant: "False"},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
{
name: "Update success - conflict and override",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource, DefaultVariant: "True"},
},
},
UpdateRequest: request{
source: mockOverrideSource,
flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Update fail",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
UpdateRequest: request{
source: mockSource,
flags: map[string]model.Flag{
"B": {Source: mockSource},
},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
},
},
expectedNotificationKeys: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source,
tt.UpdateRequest.selector, tt.UpdateRequest.flags)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}
func TestFlags_Delete(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockSource2 := "source2"
tests := []struct {
name string
storedState *State
deleteRequest map[string]model.Flag
expectedState *State
expectedNotificationKeys []string
}{
{
name: "Remove success",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"A": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{"A"},
},
{
name: "Nothing to remove",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
deleteRequest: map[string]model.Flag{
"C": {Source: mockSource},
},
expectedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
FlagSources: []string{
mockSource,
mockSource2,
},
},
expectedNotificationKeys: []string{},
},
{
name: "Remove all",
storedState: &State{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{},
expectedState: &State{
Flags: map[string]model.Flag{
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.DeleteFlags(mockLogger, mockSource, tt.deleteRequest)
require.Equal(t, tt.storedState, tt.expectedState)
for k := range messages {
require.Containsf(t, tt.expectedNotificationKeys, k,
"Message key %s not present in the expected key list", k)
}
})
}
}

View File

@ -104,7 +104,7 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe
if !skipCheckingModTime { if !skipCheckingModTime {
hs.lastUpdated = updated hs.lastUpdated = updated
} }
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object}
return nil return nil
} }

View File

@ -52,7 +52,7 @@ func NewFileSync(uri string, watchType string, logger *logger.Logger) *Sync {
const defaultState = "{}" const defaultState = "{}"
func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.sendDataSync(ctx, sync.ALL, dataSync) fs.sendDataSync(ctx, dataSync)
return nil return nil
} }
@ -94,7 +94,7 @@ func (fs *Sync) setReady(val bool) {
//nolint:funlen //nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
defer fs.watcher.Close() defer fs.watcher.Close()
fs.sendDataSync(ctx, sync.ALL, dataSync) fs.sendDataSync(ctx, dataSync)
fs.setReady(true) fs.setReady(true)
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for { for {
@ -108,7 +108,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String())) fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String()))
switch { switch {
case event.Has(fsnotify.Create) || event.Has(fsnotify.Write): case event.Has(fsnotify.Create) || event.Has(fsnotify.Write):
fs.sendDataSync(ctx, sync.ALL, dataSync) fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Remove): case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks. // K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case. // Updates cause a remove event, we need to re-add the watcher in this case.
@ -116,20 +116,20 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if err != nil { if err != nil {
// the watcher could not be re-added, so the file must have been deleted // the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error())) fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync) fs.sendDataSync(ctx, dataSync)
continue continue
} }
// Counterintuitively, remove events are the only meaningful ones seen in K8s. // Counterintuitively, remove events are the only meaningful ones seen in K8s.
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation. // K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
// At the point the remove event is fired, we have our new data, so we can send it down the channel. // At the point the remove event is fired, we have our new data, so we can send it down the channel.
fs.sendDataSync(ctx, sync.ALL, dataSync) fs.sendDataSync(ctx, dataSync)
case event.Has(fsnotify.Chmod): case event.Has(fsnotify.Chmod):
// on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen // on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen
// while the file is being watched, os.Stat is used here to infer deletion // while the file is being watched, os.Stat is used here to infer deletion
if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) {
fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error())) fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync) fs.sendDataSync(ctx, dataSync)
} }
} }
@ -147,14 +147,8 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
} }
} }
func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) { func (fs *Sync) sendDataSync(ctx context.Context, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String())) fs.Logger.Debug(fmt.Sprintf("Data sync received for %s", fs.URI))
if syncType == sync.DELETE {
// Skip fetching and emit default state to avoid EOF errors
dataSync <- sync.DataSync{FlagData: defaultState, Source: fs.URI, Type: syncType}
return
}
msg := defaultState msg := defaultState
m, err := fs.fetch(ctx) m, err := fs.fetch(ctx)
@ -167,7 +161,7 @@ func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync c
msg = m msg = m
} }
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType} dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
} }
func (fs *Sync) fetch(_ context.Context) (string, error) { func (fs *Sync) fetch(_ context.Context) (string, error) {

View File

@ -26,7 +26,6 @@ func TestSimpleReSync(t *testing.T) {
expectedDataSync := sync.DataSync{ expectedDataSync := sync.DataSync{
FlagData: "hello", FlagData: "hello",
Source: source, Source: source,
Type: sync.ALL,
} }
handler := Sync{ handler := Sync{
URI: source, URI: source,
@ -59,7 +58,6 @@ func TestSimpleReSync(t *testing.T) {
func TestSimpleSync(t *testing.T) { func TestSimpleSync(t *testing.T) {
readDirName := t.TempDir() readDirName := t.TempDir()
updateDirName := t.TempDir() updateDirName := t.TempDir()
deleteDirName := t.TempDir()
tests := map[string]struct { tests := map[string]struct {
manipulationFuncs []func(t *testing.T) manipulationFuncs []func(t *testing.T)
expectedDataSync []sync.DataSync expectedDataSync []sync.DataSync
@ -76,7 +74,6 @@ func TestSimpleSync(t *testing.T) {
{ {
FlagData: fetchFileContents, FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", readDirName, fetchFileName), Source: fmt.Sprintf("%s/%s", readDirName, fetchFileName),
Type: sync.ALL,
}, },
}, },
}, },
@ -94,35 +91,10 @@ func TestSimpleSync(t *testing.T) {
{ {
FlagData: fetchFileContents, FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName), Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
}, },
{ {
FlagData: "new content", FlagData: "new content",
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName), Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
Type: sync.ALL,
},
},
},
"delete-event": {
fetchDirName: deleteDirName,
manipulationFuncs: []func(t *testing.T){
func(t *testing.T) {
writeToFile(t, deleteDirName, fetchFileContents)
},
func(t *testing.T) {
deleteFile(t, deleteDirName, fetchFileName)
},
},
expectedDataSync: []sync.DataSync{
{
FlagData: fetchFileContents,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.ALL,
},
{
FlagData: defaultState,
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
Type: sync.DELETE,
}, },
}, },
}, },
@ -172,9 +144,6 @@ func TestSimpleSync(t *testing.T) {
if data.Source != syncEvent.Source { if data.Source != syncEvent.Source {
t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source) t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source)
} }
if data.Type != syncEvent.Type {
t.Errorf("expected type: %b, but received type: %b", syncEvent.Type, data.Type)
}
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Errorf("event not found, timeout out after 10 seconds") t.Errorf("event not found, timeout out after 10 seconds")
} }

View File

@ -106,7 +106,6 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
dataSync <- sync.DataSync{ dataSync <- sync.DataSync{
FlagData: res.GetFlagConfiguration(), FlagData: res.GetFlagConfiguration(),
Source: g.URI, Source: g.URI,
Type: sync.ALL,
} }
return nil return nil
} }
@ -200,11 +199,10 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
} }
dataSync <- sync.DataSync{ dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration, FlagData: data.FlagConfiguration,
SyncContext: data.SyncContext, SyncContext: data.SyncContext,
Source: g.URI, Source: g.URI,
Selector: g.Selector, Selector: g.Selector,
Type: sync.ALL,
} }
g.Logger.Debug("received full configuration payload") g.Logger.Debug("received full configuration payload")

View File

@ -16,7 +16,6 @@ import (
"github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/flagd/core/pkg/sync"
credendialsmock "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock" credendialsmock "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock"
grpcmock "github.com/open-feature/flagd/core/pkg/sync/grpc/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
"go.uber.org/zap" "go.uber.org/zap"
@ -124,7 +123,6 @@ func Test_ReSyncTests(t *testing.T) {
notifications: []sync.DataSync{ notifications: []sync.DataSync{
{ {
FlagData: "success", FlagData: "success",
Type: sync.ALL,
}, },
}, },
shouldError: false, shouldError: false,
@ -181,9 +179,6 @@ func Test_ReSyncTests(t *testing.T) {
for _, expected := range test.notifications { for _, expected := range test.notifications {
out := <-syncChan out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData { if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData) t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
@ -197,100 +192,6 @@ func Test_ReSyncTests(t *testing.T) {
} }
} }
func TestSync_BasicFlagSyncStates(t *testing.T) {
grpcSyncImpl := Sync{
URI: "grpc://test",
ProviderID: "",
Logger: logger.NewLogger(nil, false),
}
mockError := errors.New("could not sync")
tests := []struct {
name string
stream syncv1grpc.FlagSyncService_SyncFlagsClient
setup func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse)
want sync.Type
wantError error
ready bool
}{
{
name: "State All maps to Sync All",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
metadata, err := structpb.NewStruct(map[string]any{"sources": "A,B,C"})
if err != nil {
t.Fatalf("Failed to create sync context: %v", err)
}
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
SyncContext: metadata,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.ALL,
ready: true,
},
{
name: "Error during flag sync",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
clientResponse.EXPECT().Recv().Return(
nil,
mockError,
)
},
ready: true,
want: -1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
syncChan := make(chan sync.DataSync, 1)
mockClient := grpcmock.NewMockFlagSyncServiceClient(ctrl)
mockClientResponse := grpcmock.NewMockFlagSyncServiceClientResponse(ctrl)
test.setup(t, mockClient, mockClientResponse)
waitChan := make(chan struct{})
go func() {
grpcSyncImpl.client = mockClient
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := grpcSyncImpl.Sync(ctx, syncChan)
if err != nil {
t.Errorf("Error handling flag sync: %v", err)
}
close(waitChan)
}()
<-waitChan
if test.want < 0 {
require.Empty(t, syncChan)
return
}
data := <-syncChan
if grpcSyncImpl.IsReady() != test.ready {
t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready)
}
if data.Type != test.want {
t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want)
}
})
}
}
func Test_StreamListener(t *testing.T) { func Test_StreamListener(t *testing.T) {
const target = "localBufCon" const target = "localBufCon"
@ -315,7 +216,6 @@ func Test_StreamListener(t *testing.T) {
{ {
FlagData: "{\"flags\": {}}", FlagData: "{\"flags\": {}}",
SyncContext: metadata, SyncContext: metadata,
Type: sync.ALL,
}, },
}, },
}, },
@ -331,14 +231,12 @@ func Test_StreamListener(t *testing.T) {
}, },
output: []sync.DataSync{ output: []sync.DataSync{
{ {
FlagData: "{}", FlagData: "{}",
SyncContext: metadata, SyncContext: metadata,
Type: sync.ALL,
}, },
{ {
FlagData: "{\"flags\": {}}", FlagData: "{\"flags\": {}}",
SyncContext: metadata, SyncContext: metadata,
Type: sync.ALL,
}, },
}, },
}, },
@ -391,10 +289,6 @@ func Test_StreamListener(t *testing.T) {
for _, expected := range test.output { for _, expected := range test.output {
out := <-syncChan out := <-syncChan
if expected.Type != out.Type {
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
}
if expected.FlagData != out.FlagData { if expected.FlagData != out.FlagData {
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData) t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
} }
@ -485,8 +379,7 @@ func Test_SyncRetry(t *testing.T) {
// Setup // Setup
target := "grpc://local" target := "grpc://local"
bufListener := bufconn.Listen(1) bufListener := bufconn.Listen(1)
emptyFlagData := "{}"
expectType := sync.ALL
// buffer based server. response ignored purposefully // buffer based server. response ignored purposefully
bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{ bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{
@ -540,7 +433,7 @@ func Test_SyncRetry(t *testing.T) {
t.Errorf("timeout waiting for conditions to fulfil") t.Errorf("timeout waiting for conditions to fulfil")
break break
case data := <-syncChan: case data := <-syncChan:
if data.Type != expectType { if data.FlagData != emptyFlagData {
t.Errorf("sync start error: %s", err.Error()) t.Errorf("sync start error: %s", err.Error())
} }
} }
@ -560,9 +453,9 @@ func Test_SyncRetry(t *testing.T) {
case <-tCtx.Done(): case <-tCtx.Done():
cancelFunc() cancelFunc()
t.Error("timeout waiting for conditions to fulfil") t.Error("timeout waiting for conditions to fulfil")
case rsp := <-syncChan: case data := <-syncChan:
if rsp.Type != expectType { if data.FlagData != emptyFlagData {
t.Errorf("expected response: %s, but got: %s", expectType, rsp.Type) t.Errorf("sync start error: %s", err.Error())
} }
} }
} }

View File

@ -46,7 +46,7 @@ func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
if err != nil { if err != nil {
return err return err
} }
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI}
return nil return nil
} }
@ -89,10 +89,10 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if hs.LastBodySHA == "" { if hs.LastBodySHA == "" {
hs.Logger.Debug("new configuration created") hs.Logger.Debug("new configuration created")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
} else if hs.LastBodySHA != currentSHA { } else if hs.LastBodySHA != currentSHA {
hs.Logger.Debug("configuration modified") hs.Logger.Debug("configuration modified")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
} }
hs.LastBodySHA = currentSHA hs.LastBodySHA = currentSHA
@ -100,7 +100,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Cron.Start() hs.Cron.Start()
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI}
<-ctx.Done() <-ctx.Done()
hs.Cron.Stop() hs.Cron.Stop()

View File

@ -5,7 +5,6 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -289,6 +288,8 @@ func TestSync_Init(t *testing.T) {
func TestHTTPSync_Resync(t *testing.T) { func TestHTTPSync_Resync(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
source := "http://localhost"
emptyeFlagData := "{}"
tests := map[string]struct { tests := map[string]struct {
setup func(t *testing.T, client *syncmock.MockClient) setup func(t *testing.T, client *syncmock.MockClient)
@ -303,11 +304,11 @@ func TestHTTPSync_Resync(t *testing.T) {
setup: func(_ *testing.T, client *syncmock.MockClient) { setup: func(_ *testing.T, client *syncmock.MockClient) {
client.EXPECT().Do(gomock.Any()).Return(&http.Response{ client.EXPECT().Do(gomock.Any()).Return(&http.Response{
Header: map[string][]string{"Content-Type": {"application/json"}}, Header: map[string][]string{"Content-Type": {"application/json"}},
Body: io.NopCloser(strings.NewReader("")), Body: io.NopCloser(strings.NewReader(emptyeFlagData)),
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
}, nil) }, nil)
}, },
uri: "http://localhost", uri: source,
handleResponse: func(t *testing.T, _ Sync, fetched string, err error) { handleResponse: func(t *testing.T, _ Sync, fetched string, err error) {
if err != nil { if err != nil {
t.Fatalf("fetch: %v", err) t.Fatalf("fetch: %v", err)
@ -320,9 +321,8 @@ func TestHTTPSync_Resync(t *testing.T) {
wantErr: false, wantErr: false,
wantNotifications: []sync.DataSync{ wantNotifications: []sync.DataSync{
{ {
Type: sync.ALL, FlagData: emptyeFlagData,
FlagData: "", Source: source,
Source: "",
}, },
}, },
}, },
@ -364,8 +364,8 @@ func TestHTTPSync_Resync(t *testing.T) {
for _, dataSync := range tt.wantNotifications { for _, dataSync := range tt.wantNotifications {
select { select {
case x := <-d: case x := <-d:
if !reflect.DeepEqual(x.String(), dataSync.String()) { if x.FlagData != dataSync.FlagData || x.Source != dataSync.Source {
t.Error("unexpected datasync received", x, dataSync) t.Errorf("unexpected datasync received %v vs %v", x, dataSync)
} }
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Error("expected datasync not received", dataSync) t.Error("expected datasync not received", dataSync)

View File

@ -6,35 +6,6 @@ import (
"google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/structpb"
) )
type Type int
// Type of the sync operation
const (
// ALL - All flags of sync provider. This is the default if unset due to primitive default
ALL Type = iota
// ADD - Additional flags from sync provider
ADD
// UPDATE - Update for flag(s) previously provided
UPDATE
// DELETE - Delete for flag(s) previously provided
DELETE
)
func (t Type) String() string {
switch t {
case ALL:
return "ALL"
case ADD:
return "ADD"
case UPDATE:
return "UPDATE"
case DELETE:
return "DELETE"
default:
return "UNKNOWN"
}
}
/* /*
ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest
value and communicate to the Runtime with DataSync channel value and communicate to the Runtime with DataSync channel
@ -57,11 +28,10 @@ type ISync interface {
// DataSync is the data contract between Runtime and sync implementations // DataSync is the data contract between Runtime and sync implementations
type DataSync struct { type DataSync struct {
FlagData string FlagData string
SyncContext *structpb.Struct SyncContext *structpb.Struct
Source string Source string
Selector string Selector string
Type
} }
// SourceConfig is configuration option for flagd. This maps to startup parameter sources // SourceConfig is configuration option for flagd. This maps to startup parameter sources

View File

@ -57,7 +57,7 @@ func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
if err != nil { if err != nil {
return fmt.Errorf("unable to fetch flag configuration: %w", err) return fmt.Errorf("unable to fetch flag configuration: %w", err)
} }
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
return nil return nil
} }
@ -97,7 +97,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return err return err
} }
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
notifies := make(chan INotify) notifies := make(chan INotify)
@ -136,7 +136,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue continue
} }
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeModify: case DefaultEventTypeModify:
k.logger.Debug("Configuration modified") k.logger.Debug("Configuration modified")
msg, err := k.fetch(ctx) msg, err := k.fetch(ctx)
@ -145,7 +145,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
continue continue
} }
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
case DefaultEventTypeDelete: case DefaultEventTypeDelete:
k.logger.Debug("configuration deleted") k.logger.Debug("configuration deleted")
case DefaultEventTypeReady: case DefaultEventTypeReady:

View File

@ -607,6 +607,7 @@ func TestInit(t *testing.T) {
func TestSync_ReSync(t *testing.T) { func TestSync_ReSync(t *testing.T) {
const name = "myFF" const name = "myFF"
const ns = "myNS" const ns = "myNS"
const payload = "{\"flags\":null}"
s := runtime.NewScheme() s := runtime.NewScheme()
ff := &unstructured.Unstructured{} ff := &unstructured.Unstructured{}
ff.SetUnstructuredContent(getCFG(name, ns)) ff.SetUnstructuredContent(getCFG(name, ns))
@ -668,8 +669,8 @@ func TestSync_ReSync(t *testing.T) {
i := tt.countMsg i := tt.countMsg
for i > 0 { for i > 0 {
d := <-dataChannel d := <-dataChannel
if d.Type != sync.ALL { if d.FlagData != payload {
t.Errorf("Expected %v, got %v", sync.ALL, d) t.Errorf("Expected %v, got %v", payload, d.FlagData)
} }
i-- i--
} }

View File

@ -104,7 +104,6 @@ func (l *oldHandler) SyncFlags(
case d := <-dataSync: case d := <-dataSync:
if err := stream.Send(&syncv1.SyncFlagsResponse{ if err := stream.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: d.FlagData, FlagConfiguration: d.FlagData,
State: dataSyncToGrpcState(d),
}); err != nil { }); err != nil {
return fmt.Errorf("error sending configuration change event: %w", err) return fmt.Errorf("error sending configuration change event: %w", err)
} }
@ -115,8 +114,3 @@ func (l *oldHandler) SyncFlags(
} }
} }
} }
//nolint:staticcheck
func dataSyncToGrpcState(s sync.DataSync) syncv1.SyncState {
return syncv1.SyncState(s.Type + 1)
}

View File

@ -110,7 +110,6 @@ func Test_watchResource(t *testing.T) {
in := isync.DataSync{ in := isync.DataSync{
FlagData: "im a flag", FlagData: "im a flag",
Source: "im a flag source", Source: "im a flag source",
Type: isync.ALL,
} }
syncMock.dataSyncChanIn <- in syncMock.dataSyncChanIn <- in
@ -335,7 +334,6 @@ func Test_FetchAllFlags(t *testing.T) {
mockData: &isync.DataSync{ mockData: &isync.DataSync{
FlagData: "im a flag", FlagData: "im a flag",
Source: "im a flag source", Source: "im a flag source",
Type: isync.ALL,
}, },
setHandler: true, setHandler: true,
}, },
@ -402,7 +400,6 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
data: &isync.DataSync{ data: &isync.DataSync{
FlagData: "im a flag", FlagData: "im a flag",
Source: "im a flag source", Source: "im a flag source",
Type: isync.ALL,
}, },
expectErr: false, expectErr: false,
}, },