GCP Storage Bucket binding: Bulk file transfer (#3811)
Signed-off-by: nelson.parente <nelson_parente@live.com.pt> Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
parent
132f562e48
commit
026f99710a
|
@ -25,10 +25,12 @@ import (
|
|||
"net/url"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/multierr"
|
||||
"google.golang.org/api/googleapi"
|
||||
"google.golang.org/api/iterator"
|
||||
"google.golang.org/api/option"
|
||||
|
@ -49,8 +51,9 @@ const (
|
|||
metadataKey = "key"
|
||||
maxResults = 1000
|
||||
|
||||
metadataKeyBC = "name"
|
||||
signOperation = "sign"
|
||||
metadataKeyBC = "name"
|
||||
signOperation = "sign"
|
||||
bulkGetOperation = "bulkGet"
|
||||
)
|
||||
|
||||
// GCPStorage allows saving data to GCP bucket storage.
|
||||
|
@ -138,6 +141,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
|
|||
bindings.DeleteOperation,
|
||||
bindings.ListOperation,
|
||||
signOperation,
|
||||
bulkGetOperation,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -155,6 +159,8 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
|
|||
return g.list(ctx, req)
|
||||
case signOperation:
|
||||
return g.sign(ctx, req)
|
||||
case bulkGetOperation:
|
||||
return g.bulkGet(ctx, req)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
|
||||
}
|
||||
|
@ -404,3 +410,91 @@ func (g *GCPStorage) signObject(bucket, object, ttl string) (string, error) {
|
|||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
type objectData struct {
|
||||
Name string `json:"name"`
|
||||
Data []byte `json:"data"`
|
||||
Attrs storage.ObjectAttrs `json:"attrs"`
|
||||
}
|
||||
|
||||
func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
metadata, err := g.metadata.mergeWithRequestMetadata(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("gcp binding error while merging metadata : %w", err)
|
||||
}
|
||||
|
||||
if g.metadata.Bucket == "" {
|
||||
return nil, errors.New("gcp bucket binding error: bucket is required")
|
||||
}
|
||||
|
||||
var allObjs []*storage.ObjectAttrs
|
||||
it := g.client.Bucket(g.metadata.Bucket).Objects(ctx, nil)
|
||||
for {
|
||||
attrs, err2 := it.Next()
|
||||
if err2 == iterator.Done {
|
||||
break
|
||||
}
|
||||
allObjs = append(allObjs, attrs)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
objectsCh := make(chan objectData, len(allObjs))
|
||||
errCh := make(chan error, len(allObjs))
|
||||
|
||||
for i, obj := range allObjs {
|
||||
wg.Add(1)
|
||||
go func(idx int, object *storage.ObjectAttrs) {
|
||||
defer wg.Done()
|
||||
|
||||
rc, err3 := g.client.Bucket(g.metadata.Bucket).Object(object.Name).NewReader(ctx)
|
||||
if err3 != nil {
|
||||
errCh <- err3
|
||||
return
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
data, readErr := io.ReadAll(rc)
|
||||
if readErr != nil {
|
||||
errCh <- readErr
|
||||
return
|
||||
}
|
||||
|
||||
if metadata.EncodeBase64 {
|
||||
encoded := b64.StdEncoding.EncodeToString(data)
|
||||
data = []byte(encoded)
|
||||
}
|
||||
|
||||
objectsCh <- objectData{
|
||||
Name: object.Name,
|
||||
Data: data,
|
||||
Attrs: *object,
|
||||
}
|
||||
}(i, obj)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
var multiErr error
|
||||
for err := range errCh {
|
||||
multierr.AppendInto(&multiErr, err)
|
||||
}
|
||||
|
||||
if multiErr != nil {
|
||||
return nil, multiErr
|
||||
}
|
||||
|
||||
response := make([]objectData, 0, len(allObjs))
|
||||
for obj := range objectsCh {
|
||||
response = append(response, obj)
|
||||
}
|
||||
|
||||
jsonResponse, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while marshalling bulk get response: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: jsonResponse,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -254,3 +254,14 @@ func TestDeleteOption(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBulkGetOption(t *testing.T) {
|
||||
gs := GCPStorage{logger: logger.NewLogger("test")}
|
||||
gs.metadata = &gcpMetadata{}
|
||||
|
||||
t.Run("return error if bucket is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{}
|
||||
_, err := gs.bulkGet(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue