This commit is contained in:
Josh van Leeuwen 2025-05-22 18:02:30 +01:00 committed by GitHub
commit 26926a956b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 17 additions and 28 deletions

View File

@ -30,7 +30,6 @@ import (
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/google/uuid" "github.com/google/uuid"
"go.uber.org/multierr"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
"google.golang.org/api/iterator" "google.golang.org/api/iterator"
"google.golang.org/api/option" "google.golang.org/api/option"
@ -430,32 +429,33 @@ func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (
var allObjs []*storage.ObjectAttrs var allObjs []*storage.ObjectAttrs
it := g.client.Bucket(g.metadata.Bucket).Objects(ctx, nil) it := g.client.Bucket(g.metadata.Bucket).Objects(ctx, nil)
for { for {
attrs, err2 := it.Next() var attrs *storage.ObjectAttrs
if err2 == iterator.Done { attrs, err = it.Next()
if err == iterator.Done {
break break
} }
allObjs = append(allObjs, attrs) allObjs = append(allObjs, attrs)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
objectsCh := make(chan objectData, len(allObjs)) wg.Add(len(allObjs))
errCh := make(chan error, len(allObjs))
objects := make([]objectData, len(allObjs))
errs := make([]error, len(allObjs))
for i, obj := range allObjs { for i, obj := range allObjs {
wg.Add(1)
go func(idx int, object *storage.ObjectAttrs) { go func(idx int, object *storage.ObjectAttrs) {
defer wg.Done() defer wg.Done()
rc, err3 := g.client.Bucket(g.metadata.Bucket).Object(object.Name).NewReader(ctx) rc, gerr := g.client.Bucket(g.metadata.Bucket).Object(object.Name).NewReader(ctx)
if err3 != nil { if gerr != nil {
errCh <- err3 errs[idx] = err
return return
} }
defer rc.Close() defer rc.Close()
data, readErr := io.ReadAll(rc) data, gerr := io.ReadAll(rc)
if readErr != nil { if gerr != nil {
errCh <- readErr errs[idx] = err
return return
} }
@ -464,7 +464,7 @@ func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (
data = []byte(encoded) data = []byte(encoded)
} }
objectsCh <- objectData{ objects[idx] = objectData{
Name: object.Name, Name: object.Name,
Data: data, Data: data,
Attrs: *object, Attrs: *object,
@ -473,28 +473,17 @@ func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (
} }
wg.Wait() wg.Wait()
close(errCh)
var multiErr error if err = errors.Join(errs...); err != nil {
for err := range errCh { return nil, fmt.Errorf("gcp bucket binding error while reading objects: %w", err)
multierr.AppendInto(&multiErr, err)
} }
if multiErr != nil { response, err := json.Marshal(allObjs)
return nil, multiErr
}
response := make([]objectData, 0, len(allObjs))
for obj := range objectsCh {
response = append(response, obj)
}
jsonResponse, err := json.Marshal(response)
if err != nil { if err != nil {
return nil, fmt.Errorf("gcp bucket binding error while marshalling bulk get response: %w", err) return nil, fmt.Errorf("gcp bucket binding error while marshalling bulk get response: %w", err)
} }
return &bindings.InvokeResponse{ return &bindings.InvokeResponse{
Data: jsonResponse, Data: response,
}, nil }, nil
} }