Merge pull request #1228 from fluxcd/bucket-prefix
bucket: Add prefix filtering capability
This commit is contained in:
commit
f2a1814aea
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/fluxcd/pkg/apis/acl"
|
"github.com/fluxcd/pkg/apis/acl"
|
||||||
"github.com/fluxcd/pkg/apis/meta"
|
"github.com/fluxcd/pkg/apis/meta"
|
||||||
|
|
||||||
apiv1 "github.com/fluxcd/source-controller/api/v1"
|
apiv1 "github.com/fluxcd/source-controller/api/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -73,6 +74,10 @@ type BucketSpec struct {
|
||||||
// +optional
|
// +optional
|
||||||
Region string `json:"region,omitempty"`
|
Region string `json:"region,omitempty"`
|
||||||
|
|
||||||
|
// Prefix to use for server-side filtering of files in the Bucket.
|
||||||
|
// +optional
|
||||||
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
|
||||||
// SecretRef specifies the Secret containing authentication credentials
|
// SecretRef specifies the Secret containing authentication credentials
|
||||||
// for the Bucket.
|
// for the Bucket.
|
||||||
// +optional
|
// +optional
|
||||||
|
|
|
@ -331,6 +331,10 @@ spec:
|
||||||
to ensure efficient use of resources.
|
to ensure efficient use of resources.
|
||||||
pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$
|
pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$
|
||||||
type: string
|
type: string
|
||||||
|
prefix:
|
||||||
|
description: Prefix to use for server-side filtering of files in the
|
||||||
|
Bucket.
|
||||||
|
type: string
|
||||||
provider:
|
provider:
|
||||||
default: generic
|
default: generic
|
||||||
description: Provider of the object storage bucket. Defaults to 'generic',
|
description: Provider of the object storage bucket. Defaults to 'generic',
|
||||||
|
|
|
@ -138,6 +138,18 @@ string
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>
|
<td>
|
||||||
|
<code>prefix</code><br>
|
||||||
|
<em>
|
||||||
|
string
|
||||||
|
</em>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
<em>(Optional)</em>
|
||||||
|
<p>Prefix to use for server-side filtering of files in the Bucket.</p>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>
|
||||||
<code>secretRef</code><br>
|
<code>secretRef</code><br>
|
||||||
<em>
|
<em>
|
||||||
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
||||||
|
@ -1422,6 +1434,18 @@ string
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>
|
<td>
|
||||||
|
<code>prefix</code><br>
|
||||||
|
<em>
|
||||||
|
string
|
||||||
|
</em>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
<em>(Optional)</em>
|
||||||
|
<p>Prefix to use for server-side filtering of files in the Bucket.</p>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>
|
||||||
<code>secretRef</code><br>
|
<code>secretRef</code><br>
|
||||||
<em>
|
<em>
|
||||||
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
||||||
|
|
|
@ -785,6 +785,15 @@ credentials for the object storage. For some `.spec.provider` implementations
|
||||||
the presence of the field is required, see [Provider](#provider) for more
|
the presence of the field is required, see [Provider](#provider) for more
|
||||||
details and examples.
|
details and examples.
|
||||||
|
|
||||||
|
### Prefix
|
||||||
|
|
||||||
|
`.spec.prefix` is an optional field to enable server-side filtering
|
||||||
|
of files in the Bucket.
|
||||||
|
|
||||||
|
**Note:** The server-side filtering works only with the `generic`, `aws`
|
||||||
|
and `gcp` [provider](#provider) and is preferred over [`.spec.ignore`](#ignore)
|
||||||
|
as a more efficient way of excluding files.
|
||||||
|
|
||||||
### Ignore
|
### Ignore
|
||||||
|
|
||||||
`.spec.ignore` is an optional field to specify rules in [the `.gitignore`
|
`.spec.ignore` is an optional field to specify rules in [the `.gitignore`
|
||||||
|
|
|
@ -145,7 +145,7 @@ type BucketProvider interface {
|
||||||
// bucket, calling visit for every item.
|
// bucket, calling visit for every item.
|
||||||
// If the underlying client or the visit callback returns an error,
|
// If the underlying client or the visit callback returns an error,
|
||||||
// it returns early.
|
// it returns early.
|
||||||
VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error
|
VisitObjects(ctx context.Context, bucketName string, prefix string, visit func(key, etag string) error) error
|
||||||
// ObjectIsNotFound returns true if the given error indicates an object
|
// ObjectIsNotFound returns true if the given error indicates an object
|
||||||
// could not be found.
|
// could not be found.
|
||||||
ObjectIsNotFound(error) bool
|
ObjectIsNotFound(error) bool
|
||||||
|
@ -742,7 +742,7 @@ func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *bucketv1.
|
||||||
matcher := sourceignore.NewMatcher(ps)
|
matcher := sourceignore.NewMatcher(ps)
|
||||||
|
|
||||||
// Build up index
|
// Build up index
|
||||||
err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, func(key, etag string) error {
|
err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, obj.Spec.Prefix, func(key, etag string) error {
|
||||||
if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile {
|
if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ func (m mockBucketClient) ObjectIsNotFound(e error) bool {
|
||||||
return e == errMockNotFound
|
return e == errMockNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockBucketClient) VisitObjects(_ context.Context, _ string, f func(key, etag string) error) error {
|
func (m mockBucketClient) VisitObjects(_ context.Context, _ string, _ string, f func(key, etag string) error) error {
|
||||||
for key, obj := range m.objects {
|
for key, obj := range m.objects {
|
||||||
if err := f(key, obj.etag); err != nil {
|
if err := f(key, obj.etag); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -265,7 +265,7 @@ func (c *BlobClient) FGetObject(ctx context.Context, bucketName, objectName, loc
|
||||||
// bucket, calling visit for every item.
|
// bucket, calling visit for every item.
|
||||||
// If the underlying client or the visit callback returns an error,
|
// If the underlying client or the visit callback returns an error,
|
||||||
// it returns early.
|
// it returns early.
|
||||||
func (c *BlobClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error {
|
func (c *BlobClient) VisitObjects(ctx context.Context, bucketName string, prefix string, visit func(path, etag string) error) error {
|
||||||
items := c.NewListBlobsFlatPager(bucketName, nil)
|
items := c.NewListBlobsFlatPager(bucketName, nil)
|
||||||
for items.More() {
|
for items.More() {
|
||||||
resp, err := items.NextPage(ctx)
|
resp, err := items.NextPage(ctx)
|
||||||
|
|
|
@ -165,8 +165,10 @@ func (c *GCSClient) FGetObject(ctx context.Context, bucketName, objectName, loca
|
||||||
// bucket, calling visit for every item.
|
// bucket, calling visit for every item.
|
||||||
// If the underlying client or the visit callback returns an error,
|
// If the underlying client or the visit callback returns an error,
|
||||||
// it returns early.
|
// it returns early.
|
||||||
func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error {
|
func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, prefix string, visit func(path, etag string) error) error {
|
||||||
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
|
items := c.Client.Bucket(bucketName).Objects(ctx, &gcpstorage.Query{
|
||||||
|
Prefix: prefix,
|
||||||
|
})
|
||||||
for {
|
for {
|
||||||
object, err := items.Next()
|
object, err := items.Next()
|
||||||
if err == IteratorDone {
|
if err == IteratorDone {
|
||||||
|
|
|
@ -170,7 +170,7 @@ func TestVisitObjects(t *testing.T) {
|
||||||
}
|
}
|
||||||
keys := []string{}
|
keys := []string{}
|
||||||
etags := []string{}
|
etags := []string{}
|
||||||
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
|
err := gcpClient.VisitObjects(context.Background(), bucketName, "", func(key, etag string) error {
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
etags = append(etags, etag)
|
etags = append(etags, etag)
|
||||||
return nil
|
return nil
|
||||||
|
@ -185,7 +185,7 @@ func TestVisitObjectsErr(t *testing.T) {
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
badBucketName := "bad-bucket"
|
badBucketName := "bad-bucket"
|
||||||
err := gcpClient.VisitObjects(context.Background(), badBucketName, func(key, etag string) error {
|
err := gcpClient.VisitObjects(context.Background(), badBucketName, "", func(key, etag string) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
|
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
|
||||||
|
@ -196,7 +196,7 @@ func TestVisitObjectsCallbackErr(t *testing.T) {
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
mockErr := fmt.Errorf("mock")
|
mockErr := fmt.Errorf("mock")
|
||||||
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
|
err := gcpClient.VisitObjects(context.Background(), bucketName, "", func(key, etag string) error {
|
||||||
return mockErr
|
return mockErr
|
||||||
})
|
})
|
||||||
assert.Error(t, err, mockErr.Error())
|
assert.Error(t, err, mockErr.Error())
|
||||||
|
|
|
@ -105,9 +105,10 @@ func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, lo
|
||||||
// bucket, calling visit for every item.
|
// bucket, calling visit for every item.
|
||||||
// If the underlying client or the visit callback returns an error,
|
// If the underlying client or the visit callback returns an error,
|
||||||
// it returns early.
|
// it returns early.
|
||||||
func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error {
|
func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, prefix string, visit func(key, etag string) error) error {
|
||||||
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
|
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
|
Prefix: prefix,
|
||||||
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
|
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
|
||||||
}) {
|
}) {
|
||||||
if object.Err != nil {
|
if object.Err != nil {
|
||||||
|
|
|
@ -36,6 +36,7 @@ import (
|
||||||
|
|
||||||
"github.com/fluxcd/pkg/apis/meta"
|
"github.com/fluxcd/pkg/apis/meta"
|
||||||
"github.com/fluxcd/pkg/sourceignore"
|
"github.com/fluxcd/pkg/sourceignore"
|
||||||
|
|
||||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,6 +63,7 @@ var (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bucketName = "test-bucket-minio" + uuid.New().String()
|
bucketName = "test-bucket-minio" + uuid.New().String()
|
||||||
|
prefix = ""
|
||||||
secret = corev1.Secret{
|
secret = corev1.Secret{
|
||||||
ObjectMeta: v1.ObjectMeta{
|
ObjectMeta: v1.ObjectMeta{
|
||||||
Name: "minio-secret",
|
Name: "minio-secret",
|
||||||
|
@ -228,7 +230,7 @@ func TestFGetObjectNotExists(t *testing.T) {
|
||||||
func TestVisitObjects(t *testing.T) {
|
func TestVisitObjects(t *testing.T) {
|
||||||
keys := []string{}
|
keys := []string{}
|
||||||
etags := []string{}
|
etags := []string{}
|
||||||
err := testMinioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
|
err := testMinioClient.VisitObjects(context.TODO(), bucketName, prefix, func(key, etag string) error {
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
etags = append(etags, etag)
|
etags = append(etags, etag)
|
||||||
return nil
|
return nil
|
||||||
|
@ -241,7 +243,7 @@ func TestVisitObjects(t *testing.T) {
|
||||||
func TestVisitObjectsErr(t *testing.T) {
|
func TestVisitObjectsErr(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
badBucketName := "bad-bucket"
|
badBucketName := "bad-bucket"
|
||||||
err := testMinioClient.VisitObjects(ctx, badBucketName, func(string, string) error {
|
err := testMinioClient.VisitObjects(ctx, badBucketName, prefix, func(string, string) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
|
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
|
||||||
|
@ -249,7 +251,7 @@ func TestVisitObjectsErr(t *testing.T) {
|
||||||
|
|
||||||
func TestVisitObjectsCallbackErr(t *testing.T) {
|
func TestVisitObjectsCallbackErr(t *testing.T) {
|
||||||
mockErr := fmt.Errorf("mock")
|
mockErr := fmt.Errorf("mock")
|
||||||
err := testMinioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
|
err := testMinioClient.VisitObjects(context.TODO(), bucketName, prefix, func(key, etag string) error {
|
||||||
return mockErr
|
return mockErr
|
||||||
})
|
})
|
||||||
assert.Error(t, err, mockErr.Error())
|
assert.Error(t, err, mockErr.Error())
|
||||||
|
|
Loading…
Reference in New Issue