// Package builder constructs flag sync providers (file, kubernetes, http,
// grpc and blob storage) from URIs and source configurations.
package builder
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/open-feature/flagd/core/pkg/logger"
|
|
"github.com/open-feature/flagd/core/pkg/sync"
|
|
blobSync "github.com/open-feature/flagd/core/pkg/sync/blob"
|
|
"github.com/open-feature/flagd/core/pkg/sync/file"
|
|
"github.com/open-feature/flagd/core/pkg/sync/grpc"
|
|
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
|
|
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
|
|
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
|
|
"github.com/robfig/cron"
|
|
"go.uber.org/zap"
|
|
"gocloud.dev/blob"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
)
|
|
|
|
// Provider identifiers compared against sync.SourceConfig.Provider in
// syncFromConfig to select which sync implementation is built.
const (
	syncProviderFile       = "file"
	syncProviderFsNotify   = "fsnotify"
	syncProviderFileInfo   = "fileinfo"
	syncProviderGrpc       = "grpc"
	syncProviderKubernetes = "kubernetes"
	syncProviderHTTP       = "http"
	syncProviderGcs        = "gcs"
	syncProviderAzblob     = "azblob"
	syncProviderS3         = "s3"
)
|
|
|
|
var (
|
|
regCrd *regexp.Regexp
|
|
regURL *regexp.Regexp
|
|
regGRPC *regexp.Regexp
|
|
regGRPCSecure *regexp.Regexp
|
|
regGRPCCustomResolver *regexp.Regexp
|
|
regFile *regexp.Regexp
|
|
regGcs *regexp.Regexp
|
|
regAzblob *regexp.Regexp
|
|
regS3 *regexp.Regexp
|
|
)
|
|
|
|
func init() {
|
|
regCrd = regexp.MustCompile("^core.openfeature.dev/")
|
|
regURL = regexp.MustCompile("^https?://")
|
|
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
|
|
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
|
|
regGRPCCustomResolver = regexp.MustCompile("^" + grpc.SupportedScheme)
|
|
regFile = regexp.MustCompile("^file:")
|
|
regGcs = regexp.MustCompile("^gs://.+?/")
|
|
regAzblob = regexp.MustCompile("^azblob://.+?/")
|
|
regS3 = regexp.MustCompile("^s3://.+?/")
|
|
}
|
|
|
|
// ISyncBuilder describes a factory for sync.ISync implementations.
type ISyncBuilder interface {
	// SyncFromURI builds a single sync implementation from a URI.
	SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error)
	// SyncsFromConfig builds sync implementations from a list of source configurations.
	SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error)
}
|
|
|
|
// SyncBuilder builds sync implementations from URIs and source configurations.
type SyncBuilder struct {
	// k8sClientBuilder supplies the dynamic Kubernetes client used by newK8s.
	k8sClientBuilder IK8sClientBuilder
}
|
|
|
|
func NewSyncBuilder() *SyncBuilder {
|
|
return &SyncBuilder{
|
|
k8sClientBuilder: &KubernetesClientBuilder{},
|
|
}
|
|
}
|
|
|
|
func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) {
|
|
switch uriB := []byte(uri); {
|
|
// filepath may be used for debugging, not recommended in deployment
|
|
case regFile.Match(uriB):
|
|
return sb.newFile(uri, logger), nil
|
|
case regCrd.Match(uriB):
|
|
return sb.newK8s(uri, logger)
|
|
}
|
|
return nil, fmt.Errorf("unrecognized URI: %s", uri)
|
|
}
|
|
|
|
func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) {
|
|
syncImpls := make([]sync.ISync, len(sourceConfigs))
|
|
for i, syncProvider := range sourceConfigs {
|
|
syncImpl, err := sb.syncFromConfig(syncProvider, logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not create sync provider: %w", err)
|
|
}
|
|
syncImpls[i] = syncImpl
|
|
}
|
|
return syncImpls, nil
|
|
}
|
|
|
|
func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) {
|
|
switch sourceConfig.Provider {
|
|
case syncProviderFile:
|
|
return sb.newFile(sourceConfig.URI, logger), nil
|
|
case syncProviderFsNotify:
|
|
logger.Debug(fmt.Sprintf("using fsnotify sync-provider for: %q", sourceConfig.URI))
|
|
return sb.newFsNotify(sourceConfig.URI, logger), nil
|
|
case syncProviderFileInfo:
|
|
logger.Debug(fmt.Sprintf("using fileinfo sync-provider for: %q", sourceConfig.URI))
|
|
return sb.newFileInfo(sourceConfig.URI, logger), nil
|
|
case syncProviderKubernetes:
|
|
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI))
|
|
return sb.newK8s(sourceConfig.URI, logger)
|
|
case syncProviderHTTP:
|
|
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", sourceConfig.URI))
|
|
return sb.newHTTP(sourceConfig, logger), nil
|
|
case syncProviderGrpc:
|
|
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", sourceConfig.URI))
|
|
return sb.newGRPC(sourceConfig, logger), nil
|
|
case syncProviderGcs:
|
|
logger.Debug(fmt.Sprintf("using blob sync-provider with gcs driver for: %s", sourceConfig.URI))
|
|
return sb.newGcs(sourceConfig, logger), nil
|
|
case syncProviderAzblob:
|
|
logger.Debug(fmt.Sprintf("using blob sync-provider with azblob driver for: %s", sourceConfig.URI))
|
|
return sb.newAzblob(sourceConfig, logger)
|
|
case syncProviderS3:
|
|
logger.Debug(fmt.Sprintf("using blob sync-provider with s3 driver for: %s", sourceConfig.URI))
|
|
return sb.newS3(sourceConfig, logger), nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with "+
|
|
"'%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' or '%s'",
|
|
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
|
|
syncProviderKubernetes, syncProviderHTTP, syncProviderGrpc, syncProviderGcs, syncProviderAzblob, syncProviderS3)
|
|
}
|
|
}
|
|
|
|
// newFile returns an fsinfo sync if we are in k8s or fileinfo if not
|
|
func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync {
|
|
switch os.Getenv("KUBERNETES_SERVICE_HOST") {
|
|
case "":
|
|
// no k8s service host env; use fileinfo
|
|
return sb.newFileInfo(uri, logger)
|
|
default:
|
|
// default to fsnotify
|
|
return sb.newFsNotify(uri, logger)
|
|
}
|
|
}
|
|
|
|
// return a new file.Sync that uses fsnotify under the hood
|
|
func (sb *SyncBuilder) newFsNotify(uri string, logger *logger.Logger) *file.Sync {
|
|
return file.NewFileSync(
|
|
regFile.ReplaceAllString(uri, ""),
|
|
file.FSNOTIFY,
|
|
logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", syncProviderFsNotify),
|
|
),
|
|
)
|
|
}
|
|
|
|
// return a new file.Sync that uses os.Stat/fs.FileInfo under the hood
|
|
func (sb *SyncBuilder) newFileInfo(uri string, logger *logger.Logger) *file.Sync {
|
|
return file.NewFileSync(
|
|
regFile.ReplaceAllString(uri, ""),
|
|
file.FILEINFO,
|
|
logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", syncProviderFileInfo),
|
|
),
|
|
)
|
|
}
|
|
|
|
func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
|
|
dynamicClient, err := sb.k8sClientBuilder.GetK8sClient()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
|
|
}
|
|
|
|
return kubernetes.NewK8sSync(
|
|
logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "kubernetes"),
|
|
),
|
|
regCrd.ReplaceAllString(uri, ""),
|
|
dynamicClient,
|
|
), nil
|
|
}
|
|
|
|
func (sb *SyncBuilder) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync {
|
|
// Default to 5 seconds
|
|
var interval uint32 = 5
|
|
if config.Interval != 0 {
|
|
interval = config.Interval
|
|
}
|
|
|
|
return &httpSync.Sync{
|
|
URI: config.URI,
|
|
Client: &http.Client{
|
|
Timeout: time.Second * 10,
|
|
},
|
|
Logger: logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "remote"),
|
|
),
|
|
BearerToken: config.BearerToken,
|
|
AuthHeader: config.AuthHeader,
|
|
Interval: interval,
|
|
Cron: cron.New(),
|
|
}
|
|
}
|
|
|
|
func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
|
|
return &grpc.Sync{
|
|
URI: config.URI,
|
|
Logger: logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "grpc"),
|
|
),
|
|
CredentialBuilder: &credentials.CredentialBuilder{},
|
|
CertPath: config.CertPath,
|
|
ProviderID: config.ProviderID,
|
|
Secure: config.TLS,
|
|
Selector: config.Selector,
|
|
MaxMsgSize: config.MaxMsgSize,
|
|
}
|
|
}
|
|
|
|
func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *blobSync.Sync {
|
|
// Extract bucket uri and object name from the full URI:
|
|
// gs://bucket/path/to/object results in gs://bucket/ as bucketUri and
|
|
// path/to/object as an object name.
|
|
bucketURI := regGcs.FindString(config.URI)
|
|
objectName := regGcs.ReplaceAllString(config.URI, "")
|
|
|
|
// Defaults to 5 seconds if interval is not set.
|
|
var interval uint32 = 5
|
|
if config.Interval != 0 {
|
|
interval = config.Interval
|
|
}
|
|
|
|
return &blobSync.Sync{
|
|
Bucket: bucketURI,
|
|
Object: objectName,
|
|
|
|
BlobURLMux: blob.DefaultURLMux(),
|
|
|
|
Logger: logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "gcs"),
|
|
),
|
|
Interval: interval,
|
|
Cron: cron.New(),
|
|
}
|
|
}
|
|
|
|
func (sb *SyncBuilder) newAzblob(config sync.SourceConfig, logger *logger.Logger) (*blobSync.Sync, error) {
|
|
// Required to generate the azblob service URL
|
|
storageAccountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
|
|
if storageAccountName == "" {
|
|
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT not set or is blank")
|
|
}
|
|
if regexp.MustCompile(`\s`).MatchString(storageAccountName) {
|
|
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT contains whitespace")
|
|
}
|
|
|
|
// Extract bucket uri and object name from the full URI:
|
|
// azblob://bucket/path/to/object results in azblob://bucket/ as bucketUri and
|
|
// path/to/object as an object name.
|
|
bucketURI := regAzblob.FindString(config.URI)
|
|
objectName := regAzblob.ReplaceAllString(config.URI, "")
|
|
|
|
// Defaults to 5 seconds if interval is not set.
|
|
var interval uint32 = 5
|
|
if config.Interval != 0 {
|
|
interval = config.Interval
|
|
}
|
|
|
|
return &blobSync.Sync{
|
|
Bucket: bucketURI,
|
|
Object: objectName,
|
|
|
|
BlobURLMux: blob.DefaultURLMux(),
|
|
|
|
Logger: logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "azblob"),
|
|
),
|
|
Interval: interval,
|
|
Cron: cron.New(),
|
|
}, nil
|
|
}
|
|
|
|
func (sb *SyncBuilder) newS3(config sync.SourceConfig, logger *logger.Logger) *blobSync.Sync {
|
|
// Extract bucket uri and object name from the full URI:
|
|
// gs://bucket/path/to/object results in gs://bucket/ as bucketUri and
|
|
// path/to/object as an object name.
|
|
bucketURI := regS3.FindString(config.URI)
|
|
objectName := regS3.ReplaceAllString(config.URI, "")
|
|
|
|
// Defaults to 5 seconds if interval is not set.
|
|
var interval uint32 = 5
|
|
if config.Interval != 0 {
|
|
interval = config.Interval
|
|
}
|
|
|
|
return &blobSync.Sync{
|
|
Bucket: bucketURI,
|
|
Object: objectName,
|
|
|
|
BlobURLMux: blob.DefaultURLMux(),
|
|
|
|
Logger: logger.WithFields(
|
|
zap.String("component", "sync"),
|
|
zap.String("sync", "s3"),
|
|
),
|
|
Interval: interval,
|
|
Cron: cron.New(),
|
|
}
|
|
}
|
|
|
|
// IK8sClientBuilder abstracts creation of the dynamic Kubernetes client so
// that SyncBuilder does not depend on a concrete client source.
type IK8sClientBuilder interface {
	// GetK8sClient returns a dynamic Kubernetes client or an error.
	GetK8sClient() (dynamic.Interface, error)
}
|
|
|
|
// KubernetesClientBuilder is the IK8sClientBuilder used by NewSyncBuilder; it
// builds clients from the cluster configuration found by k8sClusterConfig.
type KubernetesClientBuilder struct{}
|
|
|
|
func (kcb KubernetesClientBuilder) GetK8sClient() (dynamic.Interface, error) {
|
|
clusterConfig, err := k8sClusterConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create dynamicClient: %w", err)
|
|
}
|
|
return dynamicClient, nil
|
|
}
|
|
|
|
// k8sClusterConfig build K8s connection config based available configurations
|
|
func k8sClusterConfig() (*rest.Config, error) {
|
|
cfg := os.Getenv("KUBECONFIG")
|
|
|
|
var clusterConfig *rest.Config
|
|
var err error
|
|
|
|
if cfg != "" {
|
|
clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error building cluster config from flags: %w", err)
|
|
}
|
|
} else {
|
|
clusterConfig, err = rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error fetching cluster config: %w", err)
|
|
}
|
|
}
|
|
|
|
return clusterConfig, nil
|
|
}
|