Reinstate fetching bucket contents in parallel

This commit reintroduces the use of goroutines for fetching objects,
but in the caller of the client interface rather than in a particular
client implementation.

Signed-off-by: Michael Bridgen <michael@weave.works>
This commit is contained in:
Michael Bridgen 2021-11-11 17:09:34 +00:00
parent d51bddc263
commit 404df50ba9
2 changed files with 198 additions and 7 deletions

View File

@ -26,6 +26,7 @@ import (
"time"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -50,6 +51,18 @@ import (
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
// maxConcurrentFetches is the upper bound on the goroutines used to
// fetch bucket objects. It's important to have a bound, to avoid
// using arbitrary amounts of memory; the actual number is chosen
// according to the queueing rule of thumb with some conservative
// parameters:
// s > Nr / T
// N (number of requestors, i.e., objects to fetch) = 10000
// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s)
// T (total time available) = 1s
// -> s > 100
const maxConcurrentFetches = 100
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@ -318,6 +331,14 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
}
matcher := sourceignore.NewMatcher(ps)
// Download in parallel, but bound the concurrency. According to
// AWS and GCP docs, rate limits are either soft or don't exist:
// - https://cloud.google.com/storage/quotas
// - https://docs.aws.amazon.com/general/latest/gr/s3.html
// .. so, the limiting factor is this process keeping a small footprint.
semaphore := make(chan struct{}, maxConcurrentFetches)
group, ctx := errgroup.WithContext(ctx)
err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error {
if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile {
return nil
@ -327,15 +348,23 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
return nil
}
localPath := filepath.Join(tempDir, path)
err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return err
}
// block until there's capacity
semaphore <- struct{}{}
group.Go(func() error {
defer func() { <-semaphore }()
localPath := filepath.Join(tempDir, path)
err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return err
}
return nil
})
return nil
})
if err != nil {
// VisitObjects won't return an error, but the errgroup might.
if err = group.Wait(); err != nil {
err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}

View File

@ -0,0 +1,162 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
)
type mockBucketClient struct {
bucketName string
objects map[string]string
}
var mockNotFound = fmt.Errorf("not found")
func (m mockBucketClient) BucketExists(c context.Context, name string) (bool, error) {
return name == m.bucketName, nil
}
func (m mockBucketClient) ObjectExists(c context.Context, bucket, obj string) (bool, error) {
if bucket != m.bucketName {
return false, fmt.Errorf("bucket does not exist")
}
_, ok := m.objects[obj]
return ok, nil
}
func (m mockBucketClient) FGetObject(c context.Context, bucket, obj, path string) error {
if bucket != m.bucketName {
return fmt.Errorf("bucket does not exist")
}
// tiny bit of protocol, for convenience: if asked for an object "error", then return an error.
if obj == "error" {
return fmt.Errorf("I was asked to report an error")
}
object, ok := m.objects[obj]
if !ok {
return mockNotFound
}
return os.WriteFile(path, []byte(object), os.FileMode(0660))
}
func (m mockBucketClient) ObjectIsNotFound(e error) bool {
return e == mockNotFound
}
func (m mockBucketClient) VisitObjects(c context.Context, bucket string, f func(string) error) error {
for path := range m.objects {
if err := f(path); err != nil {
return err
}
}
return nil
}
func (m mockBucketClient) Close(c context.Context) {
return
}
// Since the algorithm for fetching files uses concurrency and has some complications around error
// reporting, it's worth testing by itself.
func TestFetchFiles(t *testing.T) {
files := map[string]string{
"foo.yaml": "foo: 1",
"bar.yaml": "bar: 2",
"baz.yaml": "baz: 3",
}
bucketName := "all-my-config"
bucket := sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
},
}
client := mockBucketClient{
objects: files,
bucketName: bucketName,
}
t.Run("fetch files happy path", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
_, err = fetchFiles(context.TODO(), client, bucket, tmp)
if err != nil {
t.Fatal(err)
}
for path := range files {
p := filepath.Join(tmp, path)
_, err := os.Stat(p)
if err != nil {
t.Error(err)
}
}
})
t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
files["error"] = "this one causes an error"
_, err = fetchFiles(context.TODO(), client, bucket, tmp)
if err == nil {
t.Fatal("expected error but got nil")
}
})
t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) {
// this will fail if, for example, the semaphore is not used correctly and blocks
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
lotsOfFiles := map[string]string{}
for i := 0; i < 2*maxConcurrentFetches; i++ {
f := fmt.Sprintf("file-%d", i)
lotsOfFiles[f] = f
}
lotsOfFilesClient := mockBucketClient{
bucketName: bucketName,
objects: lotsOfFiles,
}
_, err = fetchFiles(context.TODO(), lotsOfFilesClient, bucket, tmp)
if err != nil {
t.Fatal(err)
}
})
}