feat!: remove sync.Type (#1691)
The PR removes the obsolete `sync.Type`. This was intended to be used to support partial updates from sync sources, but it's never been used and it's lagging behind on features and would not work fully if implemented by a source (`metadata` is not properly implemented). Moreover, we're not convinced the value is worth the complexity it adds. Specifically: - removes `sync.Type` and all references - removes tests for `sync.Type` and updates assertions for unrelated tests which previously asserted on `sync.Type` (now we use the payload/source in assertions) - (unrelated) updates `CONTRIBUTING.md` to include a bunch of helpful manual testing steps Note that this is a breaking change, but only for the `flagd/core` library, NOT for flagd or any sync source itself in terms of their behavior. Since there was no change in `flagd/`, this will not show up in the CHANGELOG. Resolves: https://github.com/open-feature/flagd/issues/1678 Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
This commit is contained in:
parent
81855d76f9
commit
ac647e0656
|
|
@ -8,6 +8,24 @@ TLDR: be respectful.
|
|||
Any contributions are expected to include unit tests.
|
||||
These can be validated with `make test` or the automated github workflow will run them on PR creation.
|
||||
|
||||
## Development
|
||||
|
||||
### Prerequisites
|
||||
|
||||
You'll need:
|
||||
|
||||
- Go
|
||||
- make
|
||||
- docker
|
||||
|
||||
You'll want:
|
||||
|
||||
- curl (for calling HTTP endpoints)
|
||||
- [grpcurl](https://github.com/fullstorydev/grpcurl) (for making gRPC calls)
|
||||
- jq (for pretty printing responses)
|
||||
|
||||
### Workspace Initialization
|
||||
|
||||
This project uses a go workspace, to setup the project run
|
||||
|
||||
```shell
|
||||
|
|
@ -22,6 +40,63 @@ The project uses remote buf packages, changing the remote generation source will
|
|||
export GOPRIVATE=buf.build/gen/go
|
||||
```
|
||||
|
||||
### Manual testing
|
||||
|
||||
flagd has a number of interfaces (you can read more about than at [flagd.dev](https://flagd.dev/)) which can be used to evaluate flags, or deliver flag configurations so that they can be evaluated by _in-process_ providers.
|
||||
|
||||
You can manually test this functionality by starting flagd (from the flagd/ directory) with `go run main.go start -f file:../config/samples/example_flags.flagd.json`.
|
||||
|
||||
NOTE: you will need `go, curl`
|
||||
|
||||
#### Remote single flag evaluation via HTTP1.1/Connect
|
||||
|
||||
```sh
|
||||
# evaluates a single boolean flag
|
||||
curl -X POST -d '{"flagKey":"myBoolFlag","context":{}}' -H "Content-Type: application/json" "http://localhost:8013/flagd.evaluation.v1.Service/ResolveBoolean" | jq
|
||||
```
|
||||
|
||||
#### Remote single flag evaluation via HTTP1.1/OFREP
|
||||
|
||||
```sh
|
||||
# evaluates a single boolean flag
|
||||
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags/myBoolFlag' | jq
|
||||
```
|
||||
|
||||
#### Remote single flag evaluation via gRPC
|
||||
|
||||
```sh
|
||||
# evaluates a single boolean flag
|
||||
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{"flagKey":"myBoolFlag"}' localhost:8013 flagd.evaluation.v1.Service/ResolveBoolean | jq
|
||||
```
|
||||
|
||||
#### Remote bulk evaluation via via HTTP1.1/OFREP
|
||||
|
||||
```sh
|
||||
# evaluates flags in bulk
|
||||
curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq
|
||||
```
|
||||
|
||||
#### Remote bulk evaluation via gRPC
|
||||
|
||||
```sh
|
||||
# evaluates flags in bulk
|
||||
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
|
||||
```
|
||||
|
||||
#### Flag configuration fetch via gRPC
|
||||
|
||||
```sh
|
||||
# sends back a representation of all flags
|
||||
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/FetchAllFlags | jq
|
||||
```
|
||||
|
||||
#### Flag synchronization stream via gRPC
|
||||
|
||||
```sh
|
||||
# will open a persistent stream which sends flag changes when the watched source is modified
|
||||
grpcurl -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq
|
||||
```
|
||||
|
||||
## DCO Sign-Off
|
||||
|
||||
A DCO (Developer Certificate of Origin) sign-off is a line placed at the end of
|
||||
|
|
|
|||
|
|
@ -103,8 +103,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
|
|||
_, span := je.jsonEvalTracer.Start(
|
||||
context.Background(),
|
||||
"flagSync",
|
||||
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)),
|
||||
trace.WithAttributes(attribute.String("feature_flag.sync_type", payload.String())))
|
||||
trace.WithAttributes(attribute.String("feature_flag.source", payload.Source)))
|
||||
defer span.End()
|
||||
|
||||
var definition Definition
|
||||
|
|
@ -119,19 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
|
|||
var events map[string]interface{}
|
||||
var reSync bool
|
||||
|
||||
// TODO: We do not handle metadata in ADD/UPDATE operations. These are only relevant for grpc sync implementations.
|
||||
switch payload.Type {
|
||||
case sync.ALL:
|
||||
events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
|
||||
case sync.ADD:
|
||||
events = je.store.Add(je.Logger, payload.Source, payload.Selector, definition.Flags)
|
||||
case sync.UPDATE:
|
||||
events = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags)
|
||||
case sync.DELETE:
|
||||
events = je.store.DeleteFlags(je.Logger, payload.Source, definition.Flags)
|
||||
default:
|
||||
return nil, false, fmt.Errorf("unsupported sync type: %d", payload.Type)
|
||||
}
|
||||
events, reSync = je.store.Update(je.Logger, payload.Source, payload.Selector, definition.Flags, definition.Metadata)
|
||||
|
||||
// Number of events correlates to the number of flags changed through this sync, record it
|
||||
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ const UndefinedDefaultWithTargetting = `{
|
|||
const (
|
||||
FlagSetID = "testSetId"
|
||||
Version = "v33"
|
||||
ValidFlag = "validFlag"
|
||||
ValidFlag = "validFlag"
|
||||
MissingFlag = "missingFlag"
|
||||
StaticBoolFlag = "staticBoolFlag"
|
||||
StaticBoolValue = true
|
||||
|
|
@ -956,7 +956,7 @@ func TestResolveAsAnyValue(t *testing.T) {
|
|||
|
||||
func TestResolve_DefaultVariant(t *testing.T) {
|
||||
tests := []struct {
|
||||
flags string
|
||||
flags string
|
||||
flagKey string
|
||||
context map[string]interface{}
|
||||
reason string
|
||||
|
|
@ -972,13 +972,13 @@ func TestResolve_DefaultVariant(t *testing.T) {
|
|||
t.Run("", func(t *testing.T) {
|
||||
evaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
|
||||
_, _, err := evaluator.SetState(sync.DataSync{FlagData: test.flags})
|
||||
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error")
|
||||
}
|
||||
|
||||
anyResult := evaluator.ResolveAsAnyValue(context.TODO(), "", test.flagKey, test.context)
|
||||
|
||||
|
||||
assert.Equal(t, model.ErrorReason, anyResult.Reason)
|
||||
assert.EqualError(t, anyResult.Error, test.errorCode)
|
||||
})
|
||||
|
|
@ -1050,7 +1050,6 @@ func TestSetState_DefaultVariantValidation(t *testing.T) {
|
|||
func TestState_Evaluator(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
inputState string
|
||||
inputSyncType sync.Type
|
||||
expectedOutputState string
|
||||
expectedError bool
|
||||
expectedResync bool
|
||||
|
|
@ -1086,7 +1085,6 @@ func TestState_Evaluator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
`,
|
||||
inputSyncType: sync.ALL,
|
||||
expectedOutputState: `
|
||||
{
|
||||
"flags": {
|
||||
|
|
@ -1147,7 +1145,6 @@ func TestState_Evaluator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
`,
|
||||
inputSyncType: sync.ALL,
|
||||
expectedOutputState: `
|
||||
{
|
||||
"flags": {
|
||||
|
|
@ -1204,7 +1201,6 @@ func TestState_Evaluator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
`,
|
||||
inputSyncType: sync.ALL,
|
||||
expectedError: true,
|
||||
},
|
||||
"invalid targeting": {
|
||||
|
|
@ -1258,7 +1254,6 @@ func TestState_Evaluator(t *testing.T) {
|
|||
"flagSources":null
|
||||
}
|
||||
`,
|
||||
inputSyncType: sync.ALL,
|
||||
expectedError: false,
|
||||
expectedOutputState: `
|
||||
{
|
||||
|
|
@ -1341,40 +1336,8 @@ func TestState_Evaluator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
`,
|
||||
inputSyncType: sync.ALL,
|
||||
expectedError: true,
|
||||
},
|
||||
"unexpected sync type": {
|
||||
inputState: `
|
||||
{
|
||||
"flags": {
|
||||
"fibAlgo": {
|
||||
"variants": {
|
||||
"recursive": "recursive",
|
||||
"memo": "memo",
|
||||
"loop": "loop",
|
||||
"binet": "binet"
|
||||
},
|
||||
"defaultVariant": "recursive",
|
||||
"state": "ENABLED",
|
||||
"targeting": {
|
||||
"if": [
|
||||
{
|
||||
"$ref": "emailWithFaas"
|
||||
}, "binet", null
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"$evaluators": {
|
||||
"emailWithFaas": ""
|
||||
}
|
||||
}
|
||||
`,
|
||||
inputSyncType: 999,
|
||||
expectedError: true,
|
||||
expectedResync: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tt := range tests {
|
||||
|
|
@ -1423,11 +1386,9 @@ func TestState_Evaluator(t *testing.T) {
|
|||
|
||||
func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
dataSyncType sync.Type
|
||||
flagResolution func(evaluator *evaluator.JSON) error
|
||||
}{
|
||||
"Add_ResolveAllValues": {
|
||||
dataSyncType: sync.ADD,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
|
||||
if err != nil {
|
||||
|
|
@ -1437,7 +1398,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
|||
},
|
||||
},
|
||||
"Update_ResolveAllValues": {
|
||||
dataSyncType: sync.UPDATE,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
|
||||
if err != nil {
|
||||
|
|
@ -1447,7 +1407,6 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
|||
},
|
||||
},
|
||||
"Delete_ResolveAllValues": {
|
||||
dataSyncType: sync.DELETE,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
|
||||
if err != nil {
|
||||
|
|
@ -1457,35 +1416,30 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
|||
},
|
||||
},
|
||||
"Add_ResolveBooleanValue": {
|
||||
dataSyncType: sync.ADD,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticBoolFlag, nil)
|
||||
return err
|
||||
},
|
||||
},
|
||||
"Update_ResolveStringValue": {
|
||||
dataSyncType: sync.UPDATE,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, _, _, err := evaluator.ResolveBooleanValue(context.TODO(), "", StaticStringValue, nil)
|
||||
return err
|
||||
},
|
||||
},
|
||||
"Delete_ResolveIntValue": {
|
||||
dataSyncType: sync.DELETE,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, _, _, err := evaluator.ResolveIntValue(context.TODO(), "", StaticIntFlag, nil)
|
||||
return err
|
||||
},
|
||||
},
|
||||
"Add_ResolveFloatValue": {
|
||||
dataSyncType: sync.ADD,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, _, _, err := evaluator.ResolveFloatValue(context.TODO(), "", StaticFloatFlag, nil)
|
||||
return err
|
||||
},
|
||||
},
|
||||
"Update_ResolveObjectValue": {
|
||||
dataSyncType: sync.UPDATE,
|
||||
flagResolution: func(evaluator *evaluator.JSON) error {
|
||||
_, _, _, _, err := evaluator.ResolveObjectValue(context.TODO(), "", StaticObjectFlag, nil)
|
||||
return err
|
||||
|
|
@ -1497,7 +1451,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
|||
t.Run(name, func(t *testing.T) {
|
||||
jsonEvaluator := evaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags())
|
||||
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: sync.ADD})
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
@ -1520,7 +1474,7 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
|
|||
errChan <- nil
|
||||
return
|
||||
default:
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags, Type: tt.dataSyncType})
|
||||
_, _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: Flags})
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
|
|
|
|||
|
|
@ -146,112 +146,8 @@ func (f *State) Add(logger *logger.Logger, source string, selector string, flags
|
|||
return notifications
|
||||
}
|
||||
|
||||
// Update existing flags from source.
|
||||
func (f *State) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
|
||||
) map[string]interface{} {
|
||||
notifications := map[string]interface{}{}
|
||||
|
||||
for k, flag := range flags {
|
||||
storedFlag, _, ok := f.Get(context.Background(), k)
|
||||
if !ok {
|
||||
logger.Warn(
|
||||
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
|
||||
k,
|
||||
source))
|
||||
|
||||
continue
|
||||
}
|
||||
if !f.hasPriority(storedFlag.Source, source) {
|
||||
logger.Debug(
|
||||
fmt.Sprintf(
|
||||
"not updating: flag %s from source %s does not have priority over %s",
|
||||
k,
|
||||
source,
|
||||
storedFlag.Source,
|
||||
),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
notifications[k] = map[string]interface{}{
|
||||
"type": string(model.NotificationUpdate),
|
||||
"source": source,
|
||||
}
|
||||
|
||||
flag.Source = source
|
||||
flag.Selector = selector
|
||||
f.Set(k, flag)
|
||||
}
|
||||
|
||||
return notifications
|
||||
}
|
||||
|
||||
// DeleteFlags matching flags from source.
|
||||
func (f *State) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
|
||||
logger.Debug(
|
||||
fmt.Sprintf(
|
||||
"store resync triggered: delete event from source %s",
|
||||
source,
|
||||
),
|
||||
)
|
||||
ctx := context.Background()
|
||||
|
||||
_, ok := f.MetadataPerSource[source]
|
||||
if ok {
|
||||
delete(f.MetadataPerSource, source)
|
||||
}
|
||||
|
||||
notifications := map[string]interface{}{}
|
||||
if len(flags) == 0 {
|
||||
allFlags, _, err := f.GetAll(ctx)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("error while retrieving flags from the store: %v", err))
|
||||
return notifications
|
||||
}
|
||||
for key, flag := range allFlags {
|
||||
if flag.Source != source {
|
||||
continue
|
||||
}
|
||||
notifications[key] = map[string]interface{}{
|
||||
"type": string(model.NotificationDelete),
|
||||
"source": source,
|
||||
}
|
||||
f.Delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
for k := range flags {
|
||||
flag, _, ok := f.Get(ctx, k)
|
||||
if ok {
|
||||
if !f.hasPriority(flag.Source, source) {
|
||||
logger.Debug(
|
||||
fmt.Sprintf(
|
||||
"not deleting: flag %s from source %s cannot be deleted by %s",
|
||||
k,
|
||||
flag.Source,
|
||||
source,
|
||||
),
|
||||
)
|
||||
continue
|
||||
}
|
||||
notifications[k] = map[string]interface{}{
|
||||
"type": string(model.NotificationDelete),
|
||||
"source": source,
|
||||
}
|
||||
f.Delete(k)
|
||||
} else {
|
||||
logger.Warn(
|
||||
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.",
|
||||
k,
|
||||
source))
|
||||
}
|
||||
}
|
||||
return notifications
|
||||
}
|
||||
|
||||
// Merge provided flags from source with currently stored flags.
|
||||
// nolint: funlen
|
||||
func (f *State) Merge(
|
||||
// Update the flag state with the provided flags.
|
||||
func (f *State) Update(
|
||||
logger *logger.Logger,
|
||||
source string,
|
||||
selector string,
|
||||
|
|
|
|||
|
|
@ -219,7 +219,7 @@ func TestMergeFlags(t *testing.T) {
|
|||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
|
||||
gotNotifs, resyncRequired := tt.current.Update(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new, model.Metadata{})
|
||||
|
||||
require.True(t, reflect.DeepEqual(tt.want.Flags, tt.current.Flags))
|
||||
require.Equal(t, tt.wantNotifs, gotNotifs)
|
||||
|
|
@ -324,222 +324,3 @@ func TestFlags_Add(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlags_Update(t *testing.T) {
|
||||
mockLogger := logger.NewLogger(nil, false)
|
||||
mockSource := "source"
|
||||
mockOverrideSource := "source-2"
|
||||
|
||||
type request struct {
|
||||
source string
|
||||
selector string
|
||||
flags map[string]model.Flag
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
storedState *State
|
||||
UpdateRequest request
|
||||
expectedState *State
|
||||
expectedNotificationKeys []string
|
||||
}{
|
||||
{
|
||||
name: "Update success",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "True"},
|
||||
},
|
||||
},
|
||||
UpdateRequest: request{
|
||||
source: mockSource,
|
||||
flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "False"},
|
||||
},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "False"},
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{"A"},
|
||||
},
|
||||
{
|
||||
name: "Update multiple success",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "True"},
|
||||
"B": {Source: mockSource, DefaultVariant: "True"},
|
||||
},
|
||||
},
|
||||
UpdateRequest: request{
|
||||
source: mockSource,
|
||||
flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "False"},
|
||||
"B": {Source: mockSource, DefaultVariant: "False"},
|
||||
},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "False"},
|
||||
"B": {Source: mockSource, DefaultVariant: "False"},
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{"A", "B"},
|
||||
},
|
||||
{
|
||||
name: "Update success - conflict and override",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource, DefaultVariant: "True"},
|
||||
},
|
||||
},
|
||||
UpdateRequest: request{
|
||||
source: mockOverrideSource,
|
||||
flags: map[string]model.Flag{
|
||||
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
|
||||
},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockOverrideSource, DefaultVariant: "True"},
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{"A"},
|
||||
},
|
||||
{
|
||||
name: "Update fail",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
},
|
||||
},
|
||||
UpdateRequest: request{
|
||||
source: mockSource,
|
||||
flags: map[string]model.Flag{
|
||||
"B": {Source: mockSource},
|
||||
},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source,
|
||||
tt.UpdateRequest.selector, tt.UpdateRequest.flags)
|
||||
|
||||
require.Equal(t, tt.storedState, tt.expectedState)
|
||||
|
||||
for k := range messages {
|
||||
require.Containsf(t, tt.expectedNotificationKeys, k,
|
||||
"Message key %s not present in the expected key list", k)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlags_Delete(t *testing.T) {
|
||||
mockLogger := logger.NewLogger(nil, false)
|
||||
mockSource := "source"
|
||||
mockSource2 := "source2"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
storedState *State
|
||||
deleteRequest map[string]model.Flag
|
||||
expectedState *State
|
||||
expectedNotificationKeys []string
|
||||
}{
|
||||
{
|
||||
name: "Remove success",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
"B": {Source: mockSource},
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
FlagSources: []string{
|
||||
mockSource,
|
||||
mockSource2,
|
||||
},
|
||||
},
|
||||
deleteRequest: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"B": {Source: mockSource},
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
FlagSources: []string{
|
||||
mockSource,
|
||||
mockSource2,
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{"A"},
|
||||
},
|
||||
{
|
||||
name: "Nothing to remove",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
"B": {Source: mockSource},
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
FlagSources: []string{
|
||||
mockSource,
|
||||
mockSource2,
|
||||
},
|
||||
},
|
||||
deleteRequest: map[string]model.Flag{
|
||||
"C": {Source: mockSource},
|
||||
},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
"B": {Source: mockSource},
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
FlagSources: []string{
|
||||
mockSource,
|
||||
mockSource2,
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{},
|
||||
},
|
||||
{
|
||||
name: "Remove all",
|
||||
storedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"A": {Source: mockSource},
|
||||
"B": {Source: mockSource},
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
},
|
||||
deleteRequest: map[string]model.Flag{},
|
||||
expectedState: &State{
|
||||
Flags: map[string]model.Flag{
|
||||
"C": {Source: mockSource2},
|
||||
},
|
||||
},
|
||||
expectedNotificationKeys: []string{"A", "B"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
messages := tt.storedState.DeleteFlags(mockLogger, mockSource, tt.deleteRequest)
|
||||
|
||||
require.Equal(t, tt.storedState, tt.expectedState)
|
||||
|
||||
for k := range messages {
|
||||
require.Containsf(t, tt.expectedNotificationKeys, k,
|
||||
"Message key %s not present in the expected key list", k)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe
|
|||
if !skipCheckingModTime {
|
||||
hs.lastUpdated = updated
|
||||
}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ func NewFileSync(uri string, watchType string, logger *logger.Logger) *Sync {
|
|||
const defaultState = "{}"
|
||||
|
||||
func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
||||
fs.sendDataSync(ctx, sync.ALL, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -94,7 +94,7 @@ func (fs *Sync) setReady(val bool) {
|
|||
//nolint:funlen
|
||||
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
||||
defer fs.watcher.Close()
|
||||
fs.sendDataSync(ctx, sync.ALL, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
fs.setReady(true)
|
||||
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
|
||||
for {
|
||||
|
|
@ -108,7 +108,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String()))
|
||||
switch {
|
||||
case event.Has(fsnotify.Create) || event.Has(fsnotify.Write):
|
||||
fs.sendDataSync(ctx, sync.ALL, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
case event.Has(fsnotify.Remove):
|
||||
// K8s exposes config maps as symlinks.
|
||||
// Updates cause a remove event, we need to re-add the watcher in this case.
|
||||
|
|
@ -116,20 +116,20 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
if err != nil {
|
||||
// the watcher could not be re-added, so the file must have been deleted
|
||||
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
|
||||
fs.sendDataSync(ctx, sync.DELETE, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
continue
|
||||
}
|
||||
|
||||
// Counterintuitively, remove events are the only meaningful ones seen in K8s.
|
||||
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
|
||||
// At the point the remove event is fired, we have our new data, so we can send it down the channel.
|
||||
fs.sendDataSync(ctx, sync.ALL, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
case event.Has(fsnotify.Chmod):
|
||||
// on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen
|
||||
// while the file is being watched, os.Stat is used here to infer deletion
|
||||
if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) {
|
||||
fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error()))
|
||||
fs.sendDataSync(ctx, sync.DELETE, dataSync)
|
||||
fs.sendDataSync(ctx, dataSync)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -147,14 +147,8 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) {
|
||||
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String()))
|
||||
|
||||
if syncType == sync.DELETE {
|
||||
// Skip fetching and emit default state to avoid EOF errors
|
||||
dataSync <- sync.DataSync{FlagData: defaultState, Source: fs.URI, Type: syncType}
|
||||
return
|
||||
}
|
||||
func (fs *Sync) sendDataSync(ctx context.Context, dataSync chan<- sync.DataSync) {
|
||||
fs.Logger.Debug(fmt.Sprintf("Data sync received for %s", fs.URI))
|
||||
|
||||
msg := defaultState
|
||||
m, err := fs.fetch(ctx)
|
||||
|
|
@ -167,7 +161,7 @@ func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync c
|
|||
msg = m
|
||||
}
|
||||
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
|
||||
}
|
||||
|
||||
func (fs *Sync) fetch(_ context.Context) (string, error) {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ func TestSimpleReSync(t *testing.T) {
|
|||
expectedDataSync := sync.DataSync{
|
||||
FlagData: "hello",
|
||||
Source: source,
|
||||
Type: sync.ALL,
|
||||
}
|
||||
handler := Sync{
|
||||
URI: source,
|
||||
|
|
@ -59,7 +58,6 @@ func TestSimpleReSync(t *testing.T) {
|
|||
func TestSimpleSync(t *testing.T) {
|
||||
readDirName := t.TempDir()
|
||||
updateDirName := t.TempDir()
|
||||
deleteDirName := t.TempDir()
|
||||
tests := map[string]struct {
|
||||
manipulationFuncs []func(t *testing.T)
|
||||
expectedDataSync []sync.DataSync
|
||||
|
|
@ -76,7 +74,6 @@ func TestSimpleSync(t *testing.T) {
|
|||
{
|
||||
FlagData: fetchFileContents,
|
||||
Source: fmt.Sprintf("%s/%s", readDirName, fetchFileName),
|
||||
Type: sync.ALL,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -94,35 +91,10 @@ func TestSimpleSync(t *testing.T) {
|
|||
{
|
||||
FlagData: fetchFileContents,
|
||||
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
|
||||
Type: sync.ALL,
|
||||
},
|
||||
{
|
||||
FlagData: "new content",
|
||||
Source: fmt.Sprintf("%s/%s", updateDirName, fetchFileName),
|
||||
Type: sync.ALL,
|
||||
},
|
||||
},
|
||||
},
|
||||
"delete-event": {
|
||||
fetchDirName: deleteDirName,
|
||||
manipulationFuncs: []func(t *testing.T){
|
||||
func(t *testing.T) {
|
||||
writeToFile(t, deleteDirName, fetchFileContents)
|
||||
},
|
||||
func(t *testing.T) {
|
||||
deleteFile(t, deleteDirName, fetchFileName)
|
||||
},
|
||||
},
|
||||
expectedDataSync: []sync.DataSync{
|
||||
{
|
||||
FlagData: fetchFileContents,
|
||||
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
|
||||
Type: sync.ALL,
|
||||
},
|
||||
{
|
||||
FlagData: defaultState,
|
||||
Source: fmt.Sprintf("%s/%s", deleteDirName, fetchFileName),
|
||||
Type: sync.DELETE,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -172,9 +144,6 @@ func TestSimpleSync(t *testing.T) {
|
|||
if data.Source != syncEvent.Source {
|
||||
t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source)
|
||||
}
|
||||
if data.Type != syncEvent.Type {
|
||||
t.Errorf("expected type: %b, but received type: %b", syncEvent.Type, data.Type)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Errorf("event not found, timeout out after 10 seconds")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -106,7 +106,6 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
|
|||
dataSync <- sync.DataSync{
|
||||
FlagData: res.GetFlagConfiguration(),
|
||||
Source: g.URI,
|
||||
Type: sync.ALL,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -200,11 +199,10 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
|
|||
}
|
||||
|
||||
dataSync <- sync.DataSync{
|
||||
FlagData: data.FlagConfiguration,
|
||||
FlagData: data.FlagConfiguration,
|
||||
SyncContext: data.SyncContext,
|
||||
Source: g.URI,
|
||||
Selector: g.Selector,
|
||||
Type: sync.ALL,
|
||||
Source: g.URI,
|
||||
Selector: g.Selector,
|
||||
}
|
||||
|
||||
g.Logger.Debug("received full configuration payload")
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/open-feature/flagd/core/pkg/logger"
|
||||
"github.com/open-feature/flagd/core/pkg/sync"
|
||||
credendialsmock "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials/mock"
|
||||
grpcmock "github.com/open-feature/flagd/core/pkg/sync/grpc/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -124,7 +123,6 @@ func Test_ReSyncTests(t *testing.T) {
|
|||
notifications: []sync.DataSync{
|
||||
{
|
||||
FlagData: "success",
|
||||
Type: sync.ALL,
|
||||
},
|
||||
},
|
||||
shouldError: false,
|
||||
|
|
@ -181,9 +179,6 @@ func Test_ReSyncTests(t *testing.T) {
|
|||
|
||||
for _, expected := range test.notifications {
|
||||
out := <-syncChan
|
||||
if expected.Type != out.Type {
|
||||
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
|
||||
}
|
||||
|
||||
if expected.FlagData != out.FlagData {
|
||||
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
|
||||
|
|
@ -197,100 +192,6 @@ func Test_ReSyncTests(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSync_BasicFlagSyncStates(t *testing.T) {
|
||||
grpcSyncImpl := Sync{
|
||||
URI: "grpc://test",
|
||||
ProviderID: "",
|
||||
Logger: logger.NewLogger(nil, false),
|
||||
}
|
||||
|
||||
mockError := errors.New("could not sync")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
stream syncv1grpc.FlagSyncService_SyncFlagsClient
|
||||
setup func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse)
|
||||
want sync.Type
|
||||
wantError error
|
||||
ready bool
|
||||
}{
|
||||
{
|
||||
name: "State All maps to Sync All",
|
||||
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
|
||||
metadata, err := structpb.NewStruct(map[string]any{"sources": "A,B,C"})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create sync context: %v", err)
|
||||
}
|
||||
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
|
||||
gomock.InOrder(
|
||||
clientResponse.EXPECT().Recv().Return(
|
||||
&v1.SyncFlagsResponse{
|
||||
FlagConfiguration: "{}",
|
||||
SyncContext: metadata,
|
||||
},
|
||||
nil,
|
||||
),
|
||||
clientResponse.EXPECT().Recv().Return(
|
||||
nil, io.EOF,
|
||||
),
|
||||
)
|
||||
},
|
||||
want: sync.ALL,
|
||||
ready: true,
|
||||
},
|
||||
{
|
||||
name: "Error during flag sync",
|
||||
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
|
||||
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
|
||||
clientResponse.EXPECT().Recv().Return(
|
||||
nil,
|
||||
mockError,
|
||||
)
|
||||
},
|
||||
ready: true,
|
||||
want: -1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
syncChan := make(chan sync.DataSync, 1)
|
||||
|
||||
mockClient := grpcmock.NewMockFlagSyncServiceClient(ctrl)
|
||||
mockClientResponse := grpcmock.NewMockFlagSyncServiceClientResponse(ctrl)
|
||||
test.setup(t, mockClient, mockClientResponse)
|
||||
|
||||
waitChan := make(chan struct{})
|
||||
go func() {
|
||||
grpcSyncImpl.client = mockClient
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
err := grpcSyncImpl.Sync(ctx, syncChan)
|
||||
if err != nil {
|
||||
t.Errorf("Error handling flag sync: %v", err)
|
||||
}
|
||||
close(waitChan)
|
||||
}()
|
||||
<-waitChan
|
||||
|
||||
if test.want < 0 {
|
||||
require.Empty(t, syncChan)
|
||||
return
|
||||
}
|
||||
data := <-syncChan
|
||||
|
||||
if grpcSyncImpl.IsReady() != test.ready {
|
||||
t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready)
|
||||
}
|
||||
|
||||
if data.Type != test.want {
|
||||
t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_StreamListener(t *testing.T) {
|
||||
const target = "localBufCon"
|
||||
|
||||
|
|
@ -315,7 +216,6 @@ func Test_StreamListener(t *testing.T) {
|
|||
{
|
||||
FlagData: "{\"flags\": {}}",
|
||||
SyncContext: metadata,
|
||||
Type: sync.ALL,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -331,14 +231,12 @@ func Test_StreamListener(t *testing.T) {
|
|||
},
|
||||
output: []sync.DataSync{
|
||||
{
|
||||
FlagData: "{}",
|
||||
FlagData: "{}",
|
||||
SyncContext: metadata,
|
||||
Type: sync.ALL,
|
||||
},
|
||||
{
|
||||
FlagData: "{\"flags\": {}}",
|
||||
SyncContext: metadata,
|
||||
Type: sync.ALL,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -391,10 +289,6 @@ func Test_StreamListener(t *testing.T) {
|
|||
for _, expected := range test.output {
|
||||
out := <-syncChan
|
||||
|
||||
if expected.Type != out.Type {
|
||||
t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type)
|
||||
}
|
||||
|
||||
if expected.FlagData != out.FlagData {
|
||||
t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData)
|
||||
}
|
||||
|
|
@ -485,8 +379,7 @@ func Test_SyncRetry(t *testing.T) {
|
|||
// Setup
|
||||
target := "grpc://local"
|
||||
bufListener := bufconn.Listen(1)
|
||||
|
||||
expectType := sync.ALL
|
||||
emptyFlagData := "{}"
|
||||
|
||||
// buffer based server. response ignored purposefully
|
||||
bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{
|
||||
|
|
@ -540,7 +433,7 @@ func Test_SyncRetry(t *testing.T) {
|
|||
t.Errorf("timeout waiting for conditions to fulfil")
|
||||
break
|
||||
case data := <-syncChan:
|
||||
if data.Type != expectType {
|
||||
if data.FlagData != emptyFlagData {
|
||||
t.Errorf("sync start error: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
|
@ -560,9 +453,9 @@ func Test_SyncRetry(t *testing.T) {
|
|||
case <-tCtx.Done():
|
||||
cancelFunc()
|
||||
t.Error("timeout waiting for conditions to fulfil")
|
||||
case rsp := <-syncChan:
|
||||
if rsp.Type != expectType {
|
||||
t.Errorf("expected response: %s, but got: %s", expectType, rsp.Type)
|
||||
case data := <-syncChan:
|
||||
if data.FlagData != emptyFlagData {
|
||||
t.Errorf("sync start error: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -89,10 +89,10 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
|
||||
if hs.LastBodySHA == "" {
|
||||
hs.Logger.Debug("new configuration created")
|
||||
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
|
||||
} else if hs.LastBodySHA != currentSHA {
|
||||
hs.Logger.Debug("configuration modified")
|
||||
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
|
||||
}
|
||||
|
||||
hs.LastBodySHA = currentSHA
|
||||
|
|
@ -100,7 +100,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
|
||||
hs.Cron.Start()
|
||||
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI}
|
||||
|
||||
<-ctx.Done()
|
||||
hs.Cron.Stop()
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -289,6 +288,8 @@ func TestSync_Init(t *testing.T) {
|
|||
|
||||
func TestHTTPSync_Resync(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
source := "http://localhost"
|
||||
emptyeFlagData := "{}"
|
||||
|
||||
tests := map[string]struct {
|
||||
setup func(t *testing.T, client *syncmock.MockClient)
|
||||
|
|
@ -303,11 +304,11 @@ func TestHTTPSync_Resync(t *testing.T) {
|
|||
setup: func(_ *testing.T, client *syncmock.MockClient) {
|
||||
client.EXPECT().Do(gomock.Any()).Return(&http.Response{
|
||||
Header: map[string][]string{"Content-Type": {"application/json"}},
|
||||
Body: io.NopCloser(strings.NewReader("")),
|
||||
Body: io.NopCloser(strings.NewReader(emptyeFlagData)),
|
||||
StatusCode: http.StatusOK,
|
||||
}, nil)
|
||||
},
|
||||
uri: "http://localhost",
|
||||
uri: source,
|
||||
handleResponse: func(t *testing.T, _ Sync, fetched string, err error) {
|
||||
if err != nil {
|
||||
t.Fatalf("fetch: %v", err)
|
||||
|
|
@ -320,9 +321,8 @@ func TestHTTPSync_Resync(t *testing.T) {
|
|||
wantErr: false,
|
||||
wantNotifications: []sync.DataSync{
|
||||
{
|
||||
Type: sync.ALL,
|
||||
FlagData: "",
|
||||
Source: "",
|
||||
FlagData: emptyeFlagData,
|
||||
Source: source,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -364,8 +364,8 @@ func TestHTTPSync_Resync(t *testing.T) {
|
|||
for _, dataSync := range tt.wantNotifications {
|
||||
select {
|
||||
case x := <-d:
|
||||
if !reflect.DeepEqual(x.String(), dataSync.String()) {
|
||||
t.Error("unexpected datasync received", x, dataSync)
|
||||
if x.FlagData != dataSync.FlagData || x.Source != dataSync.Source {
|
||||
t.Errorf("unexpected datasync received %v vs %v", x, dataSync)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("expected datasync not received", dataSync)
|
||||
|
|
|
|||
|
|
@ -6,35 +6,6 @@ import (
|
|||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
type Type int
|
||||
|
||||
// Type of the sync operation
|
||||
const (
|
||||
// ALL - All flags of sync provider. This is the default if unset due to primitive default
|
||||
ALL Type = iota
|
||||
// ADD - Additional flags from sync provider
|
||||
ADD
|
||||
// UPDATE - Update for flag(s) previously provided
|
||||
UPDATE
|
||||
// DELETE - Delete for flag(s) previously provided
|
||||
DELETE
|
||||
)
|
||||
|
||||
func (t Type) String() string {
|
||||
switch t {
|
||||
case ALL:
|
||||
return "ALL"
|
||||
case ADD:
|
||||
return "ADD"
|
||||
case UPDATE:
|
||||
return "UPDATE"
|
||||
case DELETE:
|
||||
return "DELETE"
|
||||
default:
|
||||
return "UNKNOWN"
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest
|
||||
value and communicate to the Runtime with DataSync channel
|
||||
|
|
@ -57,11 +28,10 @@ type ISync interface {
|
|||
|
||||
// DataSync is the data contract between Runtime and sync implementations
|
||||
type DataSync struct {
|
||||
FlagData string
|
||||
FlagData string
|
||||
SyncContext *structpb.Struct
|
||||
Source string
|
||||
Selector string
|
||||
Type
|
||||
Source string
|
||||
Selector string
|
||||
}
|
||||
|
||||
// SourceConfig is configuration option for flagd. This maps to startup parameter sources
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
|
|||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch flag configuration: %w", err)
|
||||
}
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -97,7 +97,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
|||
return err
|
||||
}
|
||||
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}
|
||||
|
||||
notifies := make(chan INotify)
|
||||
|
||||
|
|
@ -136,7 +136,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
|
|||
continue
|
||||
}
|
||||
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
|
||||
case DefaultEventTypeModify:
|
||||
k.logger.Debug("Configuration modified")
|
||||
msg, err := k.fetch(ctx)
|
||||
|
|
@ -145,7 +145,7 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
|
|||
continue
|
||||
}
|
||||
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI}
|
||||
case DefaultEventTypeDelete:
|
||||
k.logger.Debug("configuration deleted")
|
||||
case DefaultEventTypeReady:
|
||||
|
|
|
|||
|
|
@ -607,6 +607,7 @@ func TestInit(t *testing.T) {
|
|||
func TestSync_ReSync(t *testing.T) {
|
||||
const name = "myFF"
|
||||
const ns = "myNS"
|
||||
const payload = "{\"flags\":null}"
|
||||
s := runtime.NewScheme()
|
||||
ff := &unstructured.Unstructured{}
|
||||
ff.SetUnstructuredContent(getCFG(name, ns))
|
||||
|
|
@ -668,8 +669,8 @@ func TestSync_ReSync(t *testing.T) {
|
|||
i := tt.countMsg
|
||||
for i > 0 {
|
||||
d := <-dataChannel
|
||||
if d.Type != sync.ALL {
|
||||
t.Errorf("Expected %v, got %v", sync.ALL, d)
|
||||
if d.FlagData != payload {
|
||||
t.Errorf("Expected %v, got %v", payload, d.FlagData)
|
||||
}
|
||||
i--
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,6 @@ func (l *oldHandler) SyncFlags(
|
|||
case d := <-dataSync:
|
||||
if err := stream.Send(&syncv1.SyncFlagsResponse{
|
||||
FlagConfiguration: d.FlagData,
|
||||
State: dataSyncToGrpcState(d),
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error sending configuration change event: %w", err)
|
||||
}
|
||||
|
|
@ -115,8 +114,3 @@ func (l *oldHandler) SyncFlags(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:staticcheck
|
||||
func dataSyncToGrpcState(s sync.DataSync) syncv1.SyncState {
|
||||
return syncv1.SyncState(s.Type + 1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -110,7 +110,6 @@ func Test_watchResource(t *testing.T) {
|
|||
in := isync.DataSync{
|
||||
FlagData: "im a flag",
|
||||
Source: "im a flag source",
|
||||
Type: isync.ALL,
|
||||
}
|
||||
syncMock.dataSyncChanIn <- in
|
||||
|
||||
|
|
@ -335,7 +334,6 @@ func Test_FetchAllFlags(t *testing.T) {
|
|||
mockData: &isync.DataSync{
|
||||
FlagData: "im a flag",
|
||||
Source: "im a flag source",
|
||||
Type: isync.ALL,
|
||||
},
|
||||
setHandler: true,
|
||||
},
|
||||
|
|
@ -402,7 +400,6 @@ func Test_registerSubscriptionResyncPath(t *testing.T) {
|
|||
data: &isync.DataSync{
|
||||
FlagData: "im a flag",
|
||||
Source: "im a flag source",
|
||||
Type: isync.ALL,
|
||||
},
|
||||
expectErr: false,
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue