flagd/core/pkg/sync/blob/blob_sync.go

137 lines
3.6 KiB
Go

package blob
import (
"bytes"
"context"
"errors"
"fmt"
"time"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // needed to initialize Azure Blob Storage driver
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
_ "gocloud.dev/blob/s3blob" // needed to initialize s3 driver
)
type Sync struct {
Bucket string
Object string
BlobURLMux *blob.URLMux
Cron Cron
Logger *logger.Logger
Interval uint32
ready bool
lastUpdated time.Time
}
// Cron defines the behaviour required of a cron
type Cron interface {
AddFunc(spec string, cmd func()) error
Start()
Stop()
}
func (hs *Sync) Init(_ context.Context) error {
if hs.Bucket == "" {
return errors.New("no bucket string set")
}
if hs.Object == "" {
return errors.New("no object string set")
}
return nil
}
func (hs *Sync) IsReady() bool {
return hs.ready
}
func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %ds", hs.Bucket, hs.Object, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
err := hs.sync(ctx, dataSync, false)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("sync failed: %v", err))
}
})
// Initial fetch
hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object))
err := hs.sync(ctx, dataSync, false)
if err != nil {
return err
}
hs.ready = true
hs.Cron.Start()
<-ctx.Done()
hs.Cron.Stop()
return nil
}
func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return hs.sync(ctx, dataSync, true)
}
func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipCheckingModTime bool) error {
bucket, err := hs.getBucket(ctx)
if err != nil {
return fmt.Errorf("couldn't get bucket: %v", err)
}
defer bucket.Close()
var updated time.Time
if !skipCheckingModTime {
updated, err = hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
return fmt.Errorf("couldn't get object attributes: %v", err)
}
if hs.lastUpdated == updated {
hs.Logger.Debug("configuration hasn't changed, skipping fetching full object")
return nil
}
if hs.lastUpdated.After(updated) {
hs.Logger.Warn("configuration changed but the modification time decreased instead of increasing")
}
}
msg, err := hs.fetchObject(ctx, bucket)
if err != nil {
return fmt.Errorf("couldn't get object: %v", err)
}
hs.Logger.Debug(fmt.Sprintf("configuration updated: %s", msg))
if !skipCheckingModTime {
hs.lastUpdated = updated
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
return nil
}
func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) {
b, err := hs.BlobURLMux.OpenBucket(ctx, hs.Bucket)
if err != nil {
return nil, fmt.Errorf("error opening bucket %s: %v", hs.Bucket, err)
}
return b, nil
}
func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) {
if hs.Object == "" {
return time.Time{}, errors.New("no object string set")
}
attrs, err := bucket.Attributes(ctx, hs.Object)
if err != nil {
return time.Time{}, fmt.Errorf("error fetching attributes for object %s/%s: %w", hs.Bucket, hs.Object, err)
}
return attrs.ModTime, nil
}
func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) {
buf := bytes.NewBuffer(nil)
err := bucket.Download(ctx, hs.Object, buf, nil)
if err != nil {
return "", fmt.Errorf("error downloading object %s/%s: %w", hs.Bucket, hs.Object, err)
}
return buf.String(), nil
}