chore: refactoring and improve coverage for K8s Sync (#466)
Signed-off-by: Giovanni Liva <giovanni.liva@dynatrace.com>
This commit is contained in:
parent
5b85b2a611
commit
6dc441e2f2
1
go.mod
1
go.mod
|
|
@ -52,6 +52,7 @@ require (
|
||||||
github.com/cucumber/messages-go/v16 v16.0.1 // indirect
|
github.com/cucumber/messages-go/v16 v16.0.1 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
|
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
|
||||||
|
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
|
||||||
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
|
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
|
||||||
github.com/go-logr/logr v1.2.3 // indirect
|
github.com/go-logr/logr v1.2.3 // indirect
|
||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
|
|
|
||||||
1
go.sum
1
go.sum
|
|
@ -105,6 +105,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
|
||||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
|
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
|
||||||
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
||||||
|
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||||
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
|
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
|
||||||
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
|
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
|
||||||
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
|
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
|
||||||
|
|
|
||||||
|
|
@ -77,9 +77,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
|
||||||
)
|
)
|
||||||
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
|
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
|
||||||
case regCrd.Match(uriB):
|
case regCrd.Match(uriB):
|
||||||
|
k, err := r.newK8s(uri, logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
r.SyncImpl = append(
|
r.SyncImpl = append(
|
||||||
r.SyncImpl,
|
r.SyncImpl,
|
||||||
r.newK8s(uri, logger),
|
k,
|
||||||
)
|
)
|
||||||
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
|
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
|
||||||
case regURL.Match(uriB):
|
case regURL.Match(uriB):
|
||||||
|
|
@ -127,15 +131,21 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync {
|
func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
|
||||||
return &kubernetes.Sync{
|
reader, dynamic, err := kubernetes.GetClients()
|
||||||
Logger: logger.WithFields(
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return kubernetes.NewK8sSync(
|
||||||
|
logger.WithFields(
|
||||||
zap.String("component", "sync"),
|
zap.String("component", "sync"),
|
||||||
zap.String("sync", "kubernetes"),
|
zap.String("sync", "kubernetes"),
|
||||||
),
|
),
|
||||||
URI: regCrd.ReplaceAllString(uri, ""),
|
regCrd.ReplaceAllString(uri, ""),
|
||||||
ProviderArgs: r.config.ProviderArgs,
|
r.config.ProviderArgs,
|
||||||
}
|
reader,
|
||||||
|
dynamic,
|
||||||
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
|
func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
|
||||||
|
|
|
||||||
|
|
@ -23,20 +23,56 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
resyncPeriod = 1 * time.Minute
|
resyncPeriod = 1 * time.Minute
|
||||||
apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
|
apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
|
||||||
|
featureFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations")
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sync struct {
|
type Sync struct {
|
||||||
Logger *logger.Logger
|
URI string
|
||||||
ProviderArgs sync.ProviderArgs
|
|
||||||
URI string
|
|
||||||
|
|
||||||
ready bool
|
ready bool
|
||||||
namespace string
|
namespace string
|
||||||
crdName string
|
crdName string
|
||||||
readClient client.Reader
|
logger *logger.Logger
|
||||||
informer cache.SharedInformer
|
providerArgs sync.ProviderArgs
|
||||||
|
readClient client.Reader
|
||||||
|
dynamicClient dynamic.Interface
|
||||||
|
informer cache.SharedInformer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewK8sSync(
|
||||||
|
logger *logger.Logger,
|
||||||
|
uri string,
|
||||||
|
providerArgs sync.ProviderArgs,
|
||||||
|
reader client.Reader,
|
||||||
|
dynamic dynamic.Interface,
|
||||||
|
) *Sync {
|
||||||
|
return &Sync{
|
||||||
|
logger: logger,
|
||||||
|
URI: uri,
|
||||||
|
providerArgs: providerArgs,
|
||||||
|
readClient: reader,
|
||||||
|
dynamicClient: dynamic,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetClients() (client.Reader, dynamic.Interface, error) {
|
||||||
|
clusterConfig, err := k8sClusterConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return readClient, dynamicClient, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
||||||
|
|
@ -59,28 +95,12 @@ func (k *Sync) Init(ctx context.Context) error {
|
||||||
if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
|
if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
clusterConfig, err := k8sClusterConfig()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
k.readClient, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations")
|
|
||||||
|
|
||||||
// The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero.
|
// The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero.
|
||||||
// For more details on resync implications refer to tools/cache/shared_informer.go
|
// For more details on resync implications refer to tools/cache/shared_informer.go
|
||||||
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, k.namespace, nil)
|
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.dynamicClient, resyncPeriod, k.namespace, nil)
|
||||||
|
|
||||||
k.informer = factory.ForResource(resource).Informer()
|
k.informer = factory.ForResource(featureFlagConfigurationResource).Informer()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -90,12 +110,12 @@ func (k *Sync) IsReady() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
|
||||||
k.Logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))
|
k.logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))
|
||||||
|
|
||||||
// Initial fetch
|
// Initial fetch
|
||||||
fetch, err := k.fetch(ctx)
|
fetch, err := k.fetch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.Logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
|
k.logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -131,27 +151,27 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
|
||||||
case w := <-notifies:
|
case w := <-notifies:
|
||||||
switch w.GetEvent().EventType {
|
switch w.GetEvent().EventType {
|
||||||
case DefaultEventTypeCreate:
|
case DefaultEventTypeCreate:
|
||||||
k.Logger.Debug("new configuration created")
|
k.logger.Debug("new configuration created")
|
||||||
msg, err := k.fetch(ctx)
|
msg, err := k.fetch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.Logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
|
k.logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
||||||
case DefaultEventTypeModify:
|
case DefaultEventTypeModify:
|
||||||
k.Logger.Debug("Configuration modified")
|
k.logger.Debug("Configuration modified")
|
||||||
msg, err := k.fetch(ctx)
|
msg, err := k.fetch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.Logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
|
k.logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
|
||||||
case DefaultEventTypeDelete:
|
case DefaultEventTypeDelete:
|
||||||
k.Logger.Debug("configuration deleted")
|
k.logger.Debug("configuration deleted")
|
||||||
case DefaultEventTypeReady:
|
case DefaultEventTypeReady:
|
||||||
k.Logger.Debug("notifier ready")
|
k.logger.Debug("notifier ready")
|
||||||
k.ready = true
|
k.ready = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -172,7 +192,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
k.Logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
|
k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
|
||||||
return configuration.Spec.FeatureFlagSpec, nil
|
return configuration.Spec.FeatureFlagSpec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -186,7 +206,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
k.Logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
|
k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
|
||||||
return ff.Spec.FeatureFlagSpec, nil
|
return ff.Spec.FeatureFlagSpec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -197,25 +217,25 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) {
|
||||||
}
|
}
|
||||||
if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
k.Logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
|
k.logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
|
||||||
if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil {
|
if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil {
|
||||||
k.Logger.Warn(err.Error())
|
k.logger.Warn(err.Error())
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
k.Logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
|
k.logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
|
||||||
if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil {
|
if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil {
|
||||||
k.Logger.Warn(err.Error())
|
k.logger.Warn(err.Error())
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
DeleteFunc: func(obj interface{}) {
|
DeleteFunc: func(obj interface{}) {
|
||||||
k.Logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
|
k.logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
|
||||||
if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil {
|
if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil {
|
||||||
k.Logger.Warn(err.Error())
|
k.logger.Warn(err.Error())
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
k.Logger.Fatal(err.Error())
|
k.logger.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
c <- &Notifier{
|
c <- &Notifier{
|
||||||
|
|
|
||||||
|
|
@ -4,22 +4,25 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/open-feature/flagd/pkg/sync"
|
|
||||||
|
|
||||||
"github.com/open-feature/flagd/pkg/logger"
|
"github.com/open-feature/flagd/pkg/logger"
|
||||||
"k8s.io/client-go/tools/cache"
|
"github.com/open-feature/flagd/pkg/sync"
|
||||||
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
|
|
||||||
|
|
||||||
"github.com/open-feature/open-feature-operator/apis/core/v1alpha1"
|
"github.com/open-feature/open-feature-operator/apis/core/v1alpha1"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/client-go/dynamic/fake"
|
||||||
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Metadata = v1.TypeMeta{
|
var Metadata = v1.TypeMeta{
|
||||||
|
|
@ -446,7 +449,7 @@ func TestSync_fetch(t *testing.T) {
|
||||||
getResponse: tt.args.ClientResponse,
|
getResponse: tt.args.ClientResponse,
|
||||||
clientErr: tt.args.ClientError,
|
clientErr: tt.args.ClientError,
|
||||||
},
|
},
|
||||||
Logger: logger.NewLogger(nil, false),
|
logger: logger.NewLogger(nil, false),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test fetch
|
// Test fetch
|
||||||
|
|
@ -527,7 +530,7 @@ func TestSync_watcher(t *testing.T) {
|
||||||
GetByKeyFunc: tt.args.InformerGetFunc,
|
GetByKeyFunc: tt.args.InformerGetFunc,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Logger: logger.NewLogger(nil, false),
|
logger: logger.NewLogger(nil, false),
|
||||||
}
|
}
|
||||||
|
|
||||||
// create communication channels with buffer to so that calls are non-blocking
|
// create communication channels with buffer to so that calls are non-blocking
|
||||||
|
|
@ -556,6 +559,308 @@ func TestSync_watcher(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInit(t *testing.T) {
|
||||||
|
t.Run("expect error with wrong URI format", func(t *testing.T) {
|
||||||
|
k := Sync{URI: ""}
|
||||||
|
e := k.Init(context.TODO())
|
||||||
|
if e == nil {
|
||||||
|
t.Errorf("Expected error but got none")
|
||||||
|
}
|
||||||
|
if k.IsReady() {
|
||||||
|
t.Errorf("Expected NOT to be ready")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("expect informer registration", func(t *testing.T) {
|
||||||
|
const name = "myFF"
|
||||||
|
const ns = "myNS"
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
ff := &unstructured.Unstructured{}
|
||||||
|
ff.SetUnstructuredContent(getCFG(name, ns))
|
||||||
|
fakeClient := fake.NewSimpleDynamicClient(scheme, ff)
|
||||||
|
k := Sync{
|
||||||
|
URI: fmt.Sprintf("%s/%s", ns, name),
|
||||||
|
dynamicClient: fakeClient,
|
||||||
|
namespace: ns,
|
||||||
|
}
|
||||||
|
e := k.Init(context.TODO())
|
||||||
|
if e != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", e)
|
||||||
|
}
|
||||||
|
if k.informer == nil {
|
||||||
|
t.Errorf("Informer not initialized")
|
||||||
|
}
|
||||||
|
if k.IsReady() {
|
||||||
|
t.Errorf("The Sync should not be ready")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSync_ReSync(t *testing.T) {
|
||||||
|
const name = "myFF"
|
||||||
|
const ns = "myNS"
|
||||||
|
s := runtime.NewScheme()
|
||||||
|
ff := &unstructured.Unstructured{}
|
||||||
|
ff.SetUnstructuredContent(getCFG(name, ns))
|
||||||
|
fakeDynamicClient := fake.NewSimpleDynamicClient(s, ff)
|
||||||
|
validFFCfg := &v1alpha1.FeatureFlagConfiguration{
|
||||||
|
TypeMeta: Metadata,
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: ns,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
fakeReadClient := newFakeReadClient(validFFCfg)
|
||||||
|
l, err := logger.NewZapLogger(zapcore.FatalLevel, "console")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
k Sync
|
||||||
|
countMsg int
|
||||||
|
async bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Happy Path",
|
||||||
|
k: Sync{
|
||||||
|
URI: fmt.Sprintf("%s/%s", ns, name),
|
||||||
|
dynamicClient: fakeDynamicClient,
|
||||||
|
readClient: fakeReadClient,
|
||||||
|
namespace: ns,
|
||||||
|
logger: logger.NewLogger(l, true),
|
||||||
|
},
|
||||||
|
countMsg: 2, // one for sync and one for resync
|
||||||
|
async: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "CRD not found",
|
||||||
|
k: Sync{
|
||||||
|
URI: fmt.Sprintf("doesnt%s/exist%s", ns, name),
|
||||||
|
dynamicClient: fakeDynamicClient,
|
||||||
|
readClient: fakeReadClient,
|
||||||
|
namespace: ns,
|
||||||
|
logger: logger.NewLogger(l, true),
|
||||||
|
},
|
||||||
|
countMsg: 0,
|
||||||
|
async: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
e := tt.k.Init(context.TODO())
|
||||||
|
if e != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", e)
|
||||||
|
}
|
||||||
|
if tt.k.IsReady() {
|
||||||
|
t.Errorf("The Sync should not be ready")
|
||||||
|
}
|
||||||
|
dataChannel := make(chan sync.DataSync, tt.countMsg)
|
||||||
|
if tt.async {
|
||||||
|
go func() {
|
||||||
|
if err := tt.k.Sync(context.TODO(), dataChannel); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", e)
|
||||||
|
}
|
||||||
|
if err := tt.k.ReSync(context.TODO(), dataChannel); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", e)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
i := tt.countMsg
|
||||||
|
for i > 0 {
|
||||||
|
d := <-dataChannel
|
||||||
|
if d.Type != sync.ALL {
|
||||||
|
t.Errorf("Expected %v, got %v", sync.ALL, d)
|
||||||
|
}
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := tt.k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if err := tt.k.ReSync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNotify(t *testing.T) {
|
||||||
|
const name = "myFF"
|
||||||
|
const ns = "myNS"
|
||||||
|
s := runtime.NewScheme()
|
||||||
|
ff := &unstructured.Unstructured{}
|
||||||
|
cfg := getCFG(name, ns)
|
||||||
|
ff.SetUnstructuredContent(cfg)
|
||||||
|
fc := fake.NewSimpleDynamicClient(s, ff)
|
||||||
|
l, err := logger.NewZapLogger(zapcore.FatalLevel, "console")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
k := Sync{
|
||||||
|
URI: fmt.Sprintf("%s/%s", ns, name),
|
||||||
|
dynamicClient: fc,
|
||||||
|
namespace: ns,
|
||||||
|
logger: logger.NewLogger(l, true),
|
||||||
|
}
|
||||||
|
err = k.Init(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if k.informer == nil {
|
||||||
|
t.Errorf("Informer not initialized")
|
||||||
|
}
|
||||||
|
c := make(chan INotify)
|
||||||
|
go func() { k.notify(context.TODO(), c) }()
|
||||||
|
|
||||||
|
if k.IsReady() {
|
||||||
|
t.Errorf("The Sync should not be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for informer callbacks to be set
|
||||||
|
msg := <-c
|
||||||
|
if msg.GetEvent().EventType != DefaultEventTypeReady {
|
||||||
|
t.Errorf("Expected message %v, got %v", DefaultEventTypeReady, msg)
|
||||||
|
}
|
||||||
|
// create
|
||||||
|
cfg["status"] = map[string]interface{}{
|
||||||
|
"empty": "",
|
||||||
|
}
|
||||||
|
ff.SetUnstructuredContent(cfg)
|
||||||
|
_, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
msg = <-c
|
||||||
|
if msg.GetEvent().EventType != DefaultEventTypeCreate {
|
||||||
|
t.Errorf("Expected message %v, got %v", DefaultEventTypeCreate, msg)
|
||||||
|
}
|
||||||
|
// update
|
||||||
|
old := cfg["metadata"].(map[string]interface{})
|
||||||
|
old["resourceVersion"] = "newVersion"
|
||||||
|
cfg["metadata"] = old
|
||||||
|
ff.SetUnstructuredContent(cfg)
|
||||||
|
_, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
msg = <-c
|
||||||
|
if msg.GetEvent().EventType != DefaultEventTypeModify {
|
||||||
|
t.Errorf("Expected message %v, got %v", DefaultEventTypeModify, msg)
|
||||||
|
}
|
||||||
|
// delete
|
||||||
|
err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
msg = <-c
|
||||||
|
if msg.GetEvent().EventType != DefaultEventTypeDelete {
|
||||||
|
t.Errorf("Expected message %v, got %v", DefaultEventTypeDelete, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate we don't crash parsing wrong spec
|
||||||
|
cfg["spec"] = map[string]interface{}{
|
||||||
|
"featureFlagSpec": int64(12), // we expect string here
|
||||||
|
}
|
||||||
|
ff.SetUnstructuredContent(cfg)
|
||||||
|
_, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Create(context.TODO(), ff, v1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
cfg["status"] = map[string]interface{}{
|
||||||
|
"bump": "1",
|
||||||
|
}
|
||||||
|
ff.SetUnstructuredContent(cfg)
|
||||||
|
_, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_k8sClusterConfig(t *testing.T) {
|
||||||
|
t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) {
|
||||||
|
tt.Setenv("KUBECONFIG", "")
|
||||||
|
_, err := k8sClusterConfig()
|
||||||
|
if err == nil {
|
||||||
|
tt.Error("Expected error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("KUBECONFIG file not existing", func(tt *testing.T) {
|
||||||
|
tt.Setenv("KUBECONFIG", "value")
|
||||||
|
_, err := k8sClusterConfig()
|
||||||
|
if err == nil {
|
||||||
|
tt.Error("Expected error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("Default REST Config and missing svc account", func(tt *testing.T) {
|
||||||
|
tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1")
|
||||||
|
tt.Setenv("KUBERNETES_SERVICE_PORT", "8080")
|
||||||
|
_, err := k8sClusterConfig()
|
||||||
|
if err == nil {
|
||||||
|
tt.Error("Expected error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_NewK8sSync(t *testing.T) {
|
||||||
|
l, err := logger.NewZapLogger(zapcore.FatalLevel, "console")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
const uri = "myURI"
|
||||||
|
log := logger.NewLogger(l, true)
|
||||||
|
const key, value = "myKey", "myValue"
|
||||||
|
args := map[string]string{key: value}
|
||||||
|
rc := newFakeReadClient()
|
||||||
|
dc := fake.NewSimpleDynamicClient(runtime.NewScheme())
|
||||||
|
k := NewK8sSync(
|
||||||
|
log,
|
||||||
|
uri,
|
||||||
|
args,
|
||||||
|
rc,
|
||||||
|
dc,
|
||||||
|
)
|
||||||
|
if k == nil {
|
||||||
|
t.Errorf("Object not initialized properly")
|
||||||
|
}
|
||||||
|
if k.URI != uri {
|
||||||
|
t.Errorf("Object not initialized with the right URI")
|
||||||
|
}
|
||||||
|
if k.logger != log {
|
||||||
|
t.Errorf("Object not initialized with the right logger")
|
||||||
|
}
|
||||||
|
if k.providerArgs[key] != value {
|
||||||
|
t.Errorf("Object not initialized with the right arguments")
|
||||||
|
}
|
||||||
|
if k.readClient != rc {
|
||||||
|
t.Errorf("Object not initialized with the right K8s client")
|
||||||
|
}
|
||||||
|
if k.dynamicClient != dc {
|
||||||
|
t.Errorf("Object not initialized with the right K8s dynamic client")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFakeReadClient(objs ...client.Object) client.Client {
|
||||||
|
_ = v1alpha1.AddToScheme(scheme.Scheme)
|
||||||
|
return fakeClient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build()
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCFG(name, namespace string) map[string]interface{} {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"apiVersion": "core.openfeature.dev/v1alpha1",
|
||||||
|
"kind": "FeatureFlagConfiguration",
|
||||||
|
"metadata": map[string]interface{}{
|
||||||
|
"name": name,
|
||||||
|
"namespace": namespace,
|
||||||
|
},
|
||||||
|
"spec": map[string]interface{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// toUnstructured helper to convert an interface to unstructured.Unstructured
|
// toUnstructured helper to convert an interface to unstructured.Unstructured
|
||||||
func toUnstructured(t *testing.T, obj interface{}) interface{} {
|
func toUnstructured(t *testing.T, obj interface{}) interface{} {
|
||||||
bytes, err := json.Marshal(obj)
|
bytes, err := json.Marshal(obj)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue