[RFC-0010] Add multi-tenant workload identity support for AWS Bucket

Signed-off-by: cappyzawa <cappyzawa@gmail.com>
This commit is contained in:
cappyzawa 2025-08-13 23:42:11 +09:00
parent 48da00dba2
commit 041aa6c993
No known key found for this signature in database
7 changed files with 152 additions and 115 deletions

View File

@ -33,7 +33,8 @@ const (
// BucketProviderGeneric for any S3 API compatible storage Bucket.
BucketProviderGeneric string = "generic"
// BucketProviderAmazon for an AWS S3 object storage Bucket.
// Provides support for retrieving credentials from the AWS EC2 service.
// Provides support for retrieving credentials from the AWS EC2 service
// and workload identity authentication.
BucketProviderAmazon string = "aws"
// BucketProviderGoogle for a Google Cloud Storage Bucket.
// Provides support for authentication using a workload identity.
@ -51,7 +52,7 @@ const (
// +kubebuilder:validation:XValidation:rule="self.provider != 'generic' || !has(self.sts) || self.sts.provider == 'ldap'", message="'ldap' is the only supported STS provider for the 'generic' Bucket provider"
// +kubebuilder:validation:XValidation:rule="!has(self.sts) || self.sts.provider != 'aws' || !has(self.sts.secretRef)", message="spec.sts.secretRef is not required for the 'aws' STS provider"
// +kubebuilder:validation:XValidation:rule="!has(self.sts) || self.sts.provider != 'aws' || !has(self.sts.certSecretRef)", message="spec.sts.certSecretRef is not required for the 'aws' STS provider"
// +kubebuilder:validation:XValidation:rule="self.provider == 'gcp' || !has(self.serviceAccountName)", message="ServiceAccountName is only supported for the 'gcp' Bucket provider"
// +kubebuilder:validation:XValidation:rule="self.provider == 'gcp' || self.provider == 'aws' || !has(self.serviceAccountName)", message="ServiceAccountName is only supported for the 'gcp' and 'aws' Bucket providers"
// +kubebuilder:validation:XValidation:rule="!has(self.secretRef) || !has(self.serviceAccountName)", message="cannot set both .spec.secretRef and .spec.serviceAccountName"
type BucketSpec struct {
// Provider of the object storage bucket.
@ -96,7 +97,8 @@ type BucketSpec struct {
SecretRef *meta.LocalObjectReference `json:"secretRef,omitempty"`
// ServiceAccountName is the name of the Kubernetes ServiceAccount used to authenticate
// the bucket. For more information about workload identity:
// the bucket. This field is only supported for the 'gcp' and 'aws' providers.
// For more information about workload identity:
// https://fluxcd.io/flux/components/source/buckets/#workload-identity
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`

View File

@ -145,7 +145,8 @@ spec:
serviceAccountName:
description: |-
ServiceAccountName is the name of the Kubernetes ServiceAccount used to authenticate
the bucket. For more information about workload identity:
the bucket. This field is only supported for the 'gcp' and 'aws' providers.
For more information about workload identity:
https://fluxcd.io/flux/components/source/buckets/#workload-identity
type: string
sts:
@ -238,8 +239,9 @@ spec:
rule: '!has(self.sts) || self.sts.provider != ''aws'' || !has(self.sts.secretRef)'
- message: spec.sts.certSecretRef is not required for the 'aws' STS provider
rule: '!has(self.sts) || self.sts.provider != ''aws'' || !has(self.sts.certSecretRef)'
- message: ServiceAccountName is only supported for the 'gcp' Bucket provider
rule: self.provider == 'gcp' || !has(self.serviceAccountName)
- message: ServiceAccountName is only supported for the 'gcp' and 'aws'
Bucket providers
rule: self.provider == 'gcp' || self.provider == 'aws' || !has(self.serviceAccountName)
- message: cannot set both .spec.secretRef and .spec.serviceAccountName
rule: '!has(self.secretRef) || !has(self.serviceAccountName)'
status:

View File

@ -190,7 +190,8 @@ string
<td>
<em>(Optional)</em>
<p>ServiceAccountName is the name of the Kubernetes ServiceAccount used to authenticate
the bucket. For more information about workload identity:
the bucket. This field is only supported for the &lsquo;gcp&rsquo; and &lsquo;aws&rsquo; providers.
For more information about workload identity:
<a href="https://fluxcd.io/flux/components/source/buckets/#workload-identity">https://fluxcd.io/flux/components/source/buckets/#workload-identity</a></p>
</td>
</tr>
@ -1646,7 +1647,8 @@ string
<td>
<em>(Optional)</em>
<p>ServiceAccountName is the name of the Kubernetes ServiceAccount used to authenticate
the bucket. For more information about workload identity:
the bucket. This field is only supported for the &lsquo;gcp&rsquo; and &lsquo;aws&rsquo; providers.
For more information about workload identity:
<a href="https://fluxcd.io/flux/components/source/buckets/#workload-identity">https://fluxcd.io/flux/components/source/buckets/#workload-identity</a></p>
</td>
</tr>

View File

@ -199,6 +199,8 @@ The Provider allows for specifying the
[Amazon AWS Region](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions)
using the [`.spec.region` field](#region).
For detailed setup instructions, see: https://fluxcd.io/flux/integrations/aws/#for-amazon-simple-storage-service
##### AWS EC2 example
**Note:** On EKS you have to create an [IAM role](#aws-iam-role-example) for
@ -273,6 +275,55 @@ data:
secretkey: <BASE64>
```
##### AWS Controller-Level Workload Identity example
```yaml
---
apiVersion: source.toolkit.fluxcd.io/v1
kind: Bucket
metadata:
name: aws-controller-level-workload-identity
namespace: default
spec:
interval: 5m0s
provider: aws
bucketName: podinfo
endpoint: s3.amazonaws.com
region: us-east-1
timeout: 30s
```
##### AWS Object-Level Workload Identity example
**Note:** To use Object-Level Workload Identity (`.spec.serviceAccountName` with
cloud providers), the controller feature gate `ObjectLevelWorkloadIdentity` must
be enabled.
```yaml
---
apiVersion: source.toolkit.fluxcd.io/v1
kind: Bucket
metadata:
name: aws-object-level-workload-identity
namespace: default
spec:
interval: 5m0s
provider: aws
bucketName: podinfo
endpoint: s3.amazonaws.com
region: us-east-1
serviceAccountName: aws-workload-identity-sa
timeout: 30s
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: aws-workload-identity-sa
namespace: default
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/flux-bucket-role
```
#### Azure
When a Bucket's `.spec.provider` is set to `azure`, the source-controller will

View File

@ -30,6 +30,9 @@ import (
"github.com/minio/minio-go/v7/pkg/s3utils"
corev1 "k8s.io/api/core/v1"
"github.com/fluxcd/pkg/auth"
awsauth "github.com/fluxcd/pkg/auth/aws"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
)
@ -46,6 +49,7 @@ type options struct {
tlsConfig *tls.Config
stsTLSConfig *tls.Config
proxyURL *url.URL
authOpts []auth.Option
}
// Option is a function that configures the Minio client.
@ -86,8 +90,15 @@ func WithSTSTLSConfig(tlsConfig *tls.Config) Option {
}
}
// WithAuth sets the auth options for workload identity authentication.
func WithAuth(authOpts ...auth.Option) Option {
return func(o *options) {
o.authOpts = authOpts
}
}
// NewClient creates a new Minio storage client.
func NewClient(bucket *sourcev1.Bucket, opts ...Option) (*MinioClient, error) {
func NewClient(ctx context.Context, bucket *sourcev1.Bucket, opts ...Option) (*MinioClient, error) {
var o options
for _, opt := range opts {
opt(&o)
@ -105,7 +116,11 @@ func NewClient(bucket *sourcev1.Bucket, opts ...Option) (*MinioClient, error) {
case o.secret != nil:
minioOpts.Creds = newCredsFromSecret(o.secret)
case bucketProvider == sourcev1.BucketProviderAmazon:
minioOpts.Creds = newAWSCreds(bucket, o.proxyURL)
creds, err := newAWSCreds(ctx, &o)
if err != nil {
return nil, err
}
minioOpts.Creds = creds
case bucketProvider == sourcev1.BucketProviderGeneric:
minioOpts.Creds = newGenericCreds(bucket, &o)
}
@ -159,23 +174,30 @@ func newCredsFromSecret(secret *corev1.Secret) *credentials.Credentials {
}
// newAWSCreds creates a new Minio credentials object for `aws` bucket provider.
func newAWSCreds(bucket *sourcev1.Bucket, proxyURL *url.URL) *credentials.Credentials {
stsEndpoint := ""
if sts := bucket.Spec.STS; sts != nil {
stsEndpoint = sts.Endpoint
//
// This function is only called when Secret authentication is not available.
//
// Uses AWS SDK's config.LoadDefaultConfig() which supports:
// - Workload Identity (IRSA/EKS Pod Identity)
// - EC2 instance profiles
// - Environment variables
// - Shared credentials files
// - All other AWS SDK authentication methods
func newAWSCreds(ctx context.Context, o *options) (*credentials.Credentials, error) {
var opts auth.Options
opts.Apply(o.authOpts...)
awsCredsProvider := awsauth.NewCredentialsProvider(ctx, o.authOpts...)
awsCreds, err := awsCredsProvider.Retrieve(ctx)
if err != nil {
return nil, fmt.Errorf("AWS authentication failed: %w", err)
}
creds := credentials.NewIAM(stsEndpoint)
if proxyURL != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.Proxy = http.ProxyURL(proxyURL)
client := &http.Client{Transport: transport}
creds = credentials.New(&credentials.IAM{
Client: client,
Endpoint: stsEndpoint,
})
}
return creds
return credentials.NewStaticV4(
awsCreds.AccessKeyID,
awsCreds.SecretAccessKey,
awsCreds.SessionToken,
), nil
}
// newGenericCreds creates a new Minio credentials object for the `generic` bucket provider.

View File

@ -20,7 +20,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@ -76,6 +75,8 @@ var (
testServerCert string
// testServerKey is the path to the server key used to start the Minio and STS servers.
testServerKey string
// ctx is the common context used in tests.
ctx context.Context
)
var (
@ -126,6 +127,9 @@ var (
)
func TestMain(m *testing.M) {
// Initialize common test context
ctx = context.Background()
// Uses a sensible default on Windows (TCP/HTTP) and Linux/MacOS (socket)
pool, err := dockertest.NewPool("")
if err != nil {
@ -173,7 +177,7 @@ func TestMain(m *testing.M) {
testMinioAddress = fmt.Sprintf("127.0.0.1:%v", resource.GetPort("9000/tcp"))
// Construct a Minio client using the address of the Minio server.
testMinioClient, err = NewClient(bucketStub(bucket, testMinioAddress),
testMinioClient, err = NewClient(ctx, bucketStub(bucket, testMinioAddress),
WithSecret(secret.DeepCopy()),
WithTLSConfig(testTLSConfig))
if err != nil {
@ -197,7 +201,6 @@ func TestMain(m *testing.M) {
log.Fatalf("could not connect to docker: %s", err)
}
ctx := context.Background()
createBucket(ctx)
addObjectToBucket(ctx)
run := m.Run()
@ -208,7 +211,7 @@ func TestMain(m *testing.M) {
}
func TestNewClient(t *testing.T) {
minioClient, err := NewClient(bucketStub(bucket, testMinioAddress),
minioClient, err := NewClient(ctx, bucketStub(bucket, testMinioAddress),
WithSecret(secret.DeepCopy()),
WithTLSConfig(testTLSConfig))
assert.NilError(t, err)
@ -216,35 +219,54 @@ func TestNewClient(t *testing.T) {
}
func TestNewClientEmptySecret(t *testing.T) {
minioClient, err := NewClient(bucketStub(bucket, testMinioAddress),
minioClient, err := NewClient(ctx, bucketStub(bucket, testMinioAddress),
WithSecret(emptySecret.DeepCopy()),
WithTLSConfig(testTLSConfig))
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestNewClientAwsProvider(t *testing.T) {
minioClient, err := NewClient(bucketStub(bucketAwsProvider, testMinioAddress))
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
func TestNewClientAWSProvider(t *testing.T) {
t.Run("with secret", func(t *testing.T) {
validSecret := corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "valid-secret",
Namespace: "default",
},
Data: map[string][]byte{
"accesskey": []byte(testMinioRootUser),
"secretkey": []byte(testMinioRootPassword),
},
Type: "Opaque",
}
bucket := bucketStub(bucketAwsProvider, testMinioAddress)
minioClient, err := NewClient(ctx, bucket, WithSecret(&validSecret))
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
})
t.Run("without secret", func(t *testing.T) {
bucket := bucketStub(bucketAwsProvider, testMinioAddress)
minioClient, err := NewClient(ctx, bucket)
assert.ErrorContains(t, err, "AWS authentication failed")
assert.Assert(t, minioClient == nil)
})
}
func TestBucketExists(t *testing.T) {
ctx := context.Background()
exists, err := testMinioClient.BucketExists(ctx, bucketName)
assert.NilError(t, err)
assert.Assert(t, exists)
}
func TestBucketNotExists(t *testing.T) {
ctx := context.Background()
exists, err := testMinioClient.BucketExists(ctx, "notexistsbucket")
assert.NilError(t, err)
assert.Assert(t, !exists)
}
func TestFGetObject(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
_, err := testMinioClient.FGetObject(ctx, bucketName, objectName, path)
@ -252,41 +274,7 @@ func TestFGetObject(t *testing.T) {
}
func TestNewClientAndFGetObjectWithSTSEndpoint(t *testing.T) {
// start a mock AWS STS server
awsSTSListener, awsSTSAddr, awsSTSPort := testlistener.New(t)
awsSTSEndpoint := fmt.Sprintf("http://%s", awsSTSAddr)
awsSTSHandler := http.NewServeMux()
awsSTSHandler.HandleFunc("PUT "+credentials.TokenPath,
func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("mock-token"))
assert.NilError(t, err)
})
awsSTSHandler.HandleFunc("GET "+credentials.DefaultIAMSecurityCredsPath,
func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get(credentials.TokenRequestHeader)
assert.Equal(t, token, "mock-token")
_, err := w.Write([]byte("mock-role"))
assert.NilError(t, err)
})
var credsRetrieved bool
awsSTSHandler.HandleFunc("GET "+credentials.DefaultIAMSecurityCredsPath+"mock-role",
func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get(credentials.TokenRequestHeader)
assert.Equal(t, token, "mock-token")
err := json.NewEncoder(w).Encode(map[string]any{
"Code": "Success",
"AccessKeyID": testMinioRootUser,
"SecretAccessKey": testMinioRootPassword,
})
assert.NilError(t, err)
credsRetrieved = true
})
awsSTSServer := &http.Server{
Addr: awsSTSAddr,
Handler: awsSTSHandler,
}
go awsSTSServer.Serve(awsSTSListener)
defer awsSTSServer.Shutdown(context.Background())
// start a mock LDAP STS server
ldapSTSListener, ldapSTSAddr, ldapSTSPort := testlistener.New(t)
@ -313,7 +301,7 @@ func TestNewClientAndFGetObjectWithSTSEndpoint(t *testing.T) {
Handler: ldapSTSHandler,
}
go ldapSTSServer.ServeTLS(ldapSTSListener, testServerCert, testServerKey)
defer ldapSTSServer.Shutdown(context.Background())
defer ldapSTSServer.Shutdown(ctx)
// start proxy
proxyAddr, proxyPort := testproxy.New(t)
@ -327,42 +315,6 @@ func TestNewClientAndFGetObjectWithSTSEndpoint(t *testing.T) {
ldapPassword string
err string
}{
{
name: "with correct aws endpoint",
provider: "aws",
stsSpec: &sourcev1.BucketSTSSpec{
Provider: "aws",
Endpoint: awsSTSEndpoint,
},
},
{
name: "with incorrect aws endpoint",
provider: "aws",
stsSpec: &sourcev1.BucketSTSSpec{
Provider: "aws",
Endpoint: fmt.Sprintf("http://localhost:%d", awsSTSPort+1),
},
err: "connection refused",
},
{
name: "with correct aws endpoint and proxy",
provider: "aws",
stsSpec: &sourcev1.BucketSTSSpec{
Provider: "aws",
Endpoint: awsSTSEndpoint,
},
opts: []Option{WithProxyURL(&url.URL{Scheme: "http", Host: proxyAddr})},
},
{
name: "with correct aws endpoint and incorrect proxy",
provider: "aws",
stsSpec: &sourcev1.BucketSTSSpec{
Provider: "aws",
Endpoint: awsSTSEndpoint,
},
opts: []Option{WithProxyURL(&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", proxyPort+1)})},
err: "connection refused",
},
{
name: "with correct ldap endpoint",
provider: "generic",
@ -448,11 +400,10 @@ func TestNewClientAndFGetObjectWithSTSEndpoint(t *testing.T) {
opts := tt.opts
opts = append(opts, WithTLSConfig(testTLSConfig))
minioClient, err := NewClient(bucket, opts...)
minioClient, err := NewClient(ctx, bucket, opts...)
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
ctx := context.Background()
path := filepath.Join(t.TempDir(), sourceignore.IgnoreFile)
_, err = minioClient.FGetObject(ctx, bucketName, objectName, path)
if tt.err != "" {
@ -487,13 +438,12 @@ func TestNewClientAndFGetObjectWithProxy(t *testing.T) {
// run test
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
minioClient, err := NewClient(bucketStub(bucket, testMinioAddress),
minioClient, err := NewClient(ctx, bucketStub(bucket, testMinioAddress),
WithSecret(secret.DeepCopy()),
WithTLSConfig(testTLSConfig),
WithProxyURL(tt.proxyURL))
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
ctx := context.Background()
tempDir := t.TempDir()
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
_, err = minioClient.FGetObject(ctx, bucketName, objectName, path)
@ -507,7 +457,6 @@ func TestNewClientAndFGetObjectWithProxy(t *testing.T) {
}
func TestFGetObjectNotExists(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
badKey := "invalid.txt"
path := filepath.Join(tempDir, badKey)
@ -530,7 +479,6 @@ func TestVisitObjects(t *testing.T) {
}
func TestVisitObjectsErr(t *testing.T) {
ctx := context.Background()
badBucketName := "bad-bucket"
err := testMinioClient.VisitObjects(ctx, badBucketName, prefix, func(string, string) error {
return nil

View File

@ -884,6 +884,14 @@ func (r *BucketReconciler) createBucketProvider(ctx context.Context, obj *source
authOpts = append(authOpts, auth.WithProxyURL(*creds.proxyURL))
}
if obj.Spec.Region != "" {
authOpts = append(authOpts, auth.WithSTSRegion(obj.Spec.Region))
}
if sts := obj.Spec.STS; sts != nil {
authOpts = append(authOpts, auth.WithSTSEndpoint(sts.Endpoint))
}
switch obj.Spec.Provider {
case sourcev1.BucketProviderGoogle:
var opts []gcp.Option
@ -933,6 +941,8 @@ func (r *BucketReconciler) createBucketProvider(ctx context.Context, obj *source
var opts []minio.Option
if creds.secret != nil {
opts = append(opts, minio.WithSecret(creds.secret))
} else if obj.Spec.Provider == sourcev1.BucketProviderAmazon {
opts = append(opts, minio.WithAuth(authOpts...))
}
if creds.tlsConfig != nil {
opts = append(opts, minio.WithTLSConfig(creds.tlsConfig))
@ -946,7 +956,7 @@ func (r *BucketReconciler) createBucketProvider(ctx context.Context, obj *source
if creds.stsTLSConfig != nil {
opts = append(opts, minio.WithSTSTLSConfig(creds.stsTLSConfig))
}
return minio.NewClient(obj, opts...)
return minio.NewClient(ctx, obj, opts...)
}
}