Add proxy support for GCS buckets
Signed-off-by: Matheus Pimenta <matheuscscp@gmail.com>
This commit is contained in:
parent
c41c2d6f09
commit
31ed900a90
|
@ -113,7 +113,7 @@ type BucketSpec struct {
|
|||
// ProxySecretRef specifies the Secret containing the proxy configuration
|
||||
// to use while communicating with the Bucket server.
|
||||
//
|
||||
// Only supported for the generic provider.
|
||||
// Only supported for the `generic` and `gcp` providers.
|
||||
// +optional
|
||||
ProxySecretRef *meta.LocalObjectReference `json:"proxySecretRef,omitempty"`
|
||||
|
||||
|
|
|
@ -397,7 +397,7 @@ spec:
|
|||
to use while communicating with the Bucket server.
|
||||
|
||||
|
||||
Only supported for the generic provider.
|
||||
Only supported for the `generic` and `gcp` providers.
|
||||
properties:
|
||||
name:
|
||||
description: Name of the referent.
|
||||
|
|
|
@ -219,7 +219,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
|
|||
<em>(Optional)</em>
|
||||
<p>ProxySecretRef specifies the Secret containing the proxy configuration
|
||||
to use while communicating with the Bucket server.</p>
|
||||
<p>Only supported for the generic provider.</p>
|
||||
<p>Only supported for the <code>generic</code> and <code>gcp</code> providers.</p>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
@ -1648,7 +1648,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
|
|||
<em>(Optional)</em>
|
||||
<p>ProxySecretRef specifies the Secret containing the proxy configuration
|
||||
to use while communicating with the Bucket server.</p>
|
||||
<p>Only supported for the generic provider.</p>
|
||||
<p>Only supported for the <code>generic</code> and <code>gcp</code> providers.</p>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
|
@ -854,7 +854,7 @@ The Secret can contain three keys:
|
|||
- `password`, to specify the password to use if the proxy server is protected by
|
||||
basic authentication. This is an optional key.
|
||||
|
||||
This API is only supported for the `generic` [provider](#provider).
|
||||
This API is only supported for the `generic` and `gcp` [providers](#provider).
|
||||
|
||||
Example:
|
||||
|
||||
|
|
4
go.mod
4
go.mod
|
@ -9,6 +9,7 @@ replace github.com/fluxcd/source-controller/api => ./api
|
|||
replace github.com/opencontainers/go-digest => github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be
|
||||
|
||||
require (
|
||||
cloud.google.com/go/compute/metadata v0.3.0
|
||||
cloud.google.com/go/storage v1.39.1
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1
|
||||
|
@ -60,6 +61,7 @@ require (
|
|||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
golang.org/x/crypto v0.22.0
|
||||
golang.org/x/oauth2 v0.19.0
|
||||
golang.org/x/sync v0.7.0
|
||||
google.golang.org/api v0.177.0
|
||||
gotest.tools v2.2.0+incompatible
|
||||
|
@ -77,7 +79,6 @@ require (
|
|||
cloud.google.com/go v0.112.2 // indirect
|
||||
cloud.google.com/go/auth v0.3.0 // indirect
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.3.0 // indirect
|
||||
cloud.google.com/go/iam v1.1.6 // indirect
|
||||
dario.cat/mergo v1.0.0 // indirect
|
||||
filippo.io/edwards25519 v1.1.0 // indirect
|
||||
|
@ -360,7 +361,6 @@ require (
|
|||
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
|
||||
golang.org/x/mod v0.17.0 // indirect
|
||||
golang.org/x/net v0.24.0 // indirect
|
||||
golang.org/x/oauth2 v0.19.0 // indirect
|
||||
golang.org/x/sys v0.19.0 // indirect
|
||||
golang.org/x/term v0.19.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
|
|
|
@ -431,6 +431,12 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
|
|||
// Return error as the world as observed may change
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
proxyURL, err := r.getProxyURL(ctx, obj)
|
||||
if err != nil {
|
||||
e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason)
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
// Construct provider client
|
||||
var provider BucketProvider
|
||||
|
@ -441,7 +447,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
|
|||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
if provider, err = gcp.NewClient(ctx, secret); err != nil {
|
||||
var opts []gcp.Option
|
||||
if secret != nil {
|
||||
opts = append(opts, gcp.WithSecret(secret))
|
||||
}
|
||||
if proxyURL != nil {
|
||||
opts = append(opts, gcp.WithProxyURL(proxyURL))
|
||||
}
|
||||
if provider, err = gcp.NewClient(ctx, opts...); err != nil {
|
||||
e := serror.NewGeneric(err, "ClientError")
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
|
||||
return sreconcile.ResultEmpty, e
|
||||
|
@ -482,12 +495,6 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
|
|||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
proxyURL, err := r.getProxyURL(ctx, obj)
|
||||
if err != nil {
|
||||
e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason)
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
var opts []minio.Option
|
||||
if secret != nil {
|
||||
opts = append(opts, minio.WithSecret(secret))
|
||||
|
|
|
@ -445,7 +445,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
|
|||
assertConditions []metav1.Condition
|
||||
}{
|
||||
{
|
||||
name: "Reconciles GCS source",
|
||||
name: "Reconciles generic source",
|
||||
bucketName: "dummy",
|
||||
bucketObjects: []*s3mock.Object{
|
||||
{
|
||||
|
@ -972,6 +972,49 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
|
|||
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Observes non-existing proxySecretRef",
|
||||
bucketName: "dummy",
|
||||
beforeFunc: func(obj *bucketv1.Bucket) {
|
||||
obj.Spec.ProxySecretRef = &meta.LocalObjectReference{
|
||||
Name: "dummy",
|
||||
}
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
|
||||
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertIndex: index.NewDigester(),
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
|
||||
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Observes invalid proxySecretRef",
|
||||
bucketName: "dummy",
|
||||
secret: &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "dummy",
|
||||
},
|
||||
},
|
||||
beforeFunc: func(obj *bucketv1.Bucket) {
|
||||
obj.Spec.ProxySecretRef = &meta.LocalObjectReference{
|
||||
Name: "dummy",
|
||||
}
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
|
||||
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertIndex: index.NewDigester(),
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid proxy secret '/dummy': key 'address' is missing"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
|
||||
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Observes non-existing bucket name",
|
||||
bucketName: "dummy",
|
||||
|
@ -1217,7 +1260,11 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
|
|||
sp := patch.NewSerialPatcher(obj, r.Client)
|
||||
|
||||
got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir)
|
||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||
if tt.wantErr {
|
||||
g.Expect(err).To(HaveOccurred())
|
||||
} else {
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
g.Expect(got).To(Equal(tt.want))
|
||||
|
||||
g.Expect(index.Index()).To(Equal(tt.assertIndex.Index()))
|
||||
|
|
110
pkg/gcp/gcp.go
110
pkg/gcp/gcp.go
|
@ -21,13 +21,17 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
gcpstorage "cloud.google.com/go/storage"
|
||||
"github.com/go-logr/logr"
|
||||
"golang.org/x/oauth2/google"
|
||||
"google.golang.org/api/iterator"
|
||||
"google.golang.org/api/option"
|
||||
htransport "google.golang.org/api/transport/http"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
)
|
||||
|
@ -48,24 +52,96 @@ type GCSClient struct {
|
|||
*gcpstorage.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
|
||||
// Credential environment variable or look for the Google Application Credential file.
|
||||
func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) {
|
||||
c := &GCSClient{}
|
||||
if secret != nil {
|
||||
client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Client = client
|
||||
} else {
|
||||
client, err := gcpstorage.NewClient(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Client = client
|
||||
// Option is a functional option for configuring the GCS client.
|
||||
type Option func(*options)
|
||||
|
||||
// WithSecret sets the secret to use for authenticating with GCP.
|
||||
func WithSecret(secret *corev1.Secret) Option {
|
||||
return func(o *options) {
|
||||
o.secret = secret
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// WithProxyURL sets the proxy URL to use for the GCS client.
|
||||
func WithProxyURL(proxyURL *url.URL) Option {
|
||||
return func(o *options) {
|
||||
o.proxyURL = proxyURL
|
||||
}
|
||||
}
|
||||
|
||||
type options struct {
|
||||
secret *corev1.Secret
|
||||
proxyURL *url.URL
|
||||
|
||||
// newCustomHTTPClient should create a new HTTP client for interacting with the GCS API.
|
||||
// This is a test-only option required for mocking the real logic, which requires either
|
||||
// a valid Google Service Account Key or ADC. Both are not available in tests.
|
||||
// The real logic is implemented in the newHTTPClient function, which is used when
|
||||
// constructing the default options object.
|
||||
newCustomHTTPClient func(context.Context, *options) (*http.Client, error)
|
||||
}
|
||||
|
||||
func newOptions() *options {
|
||||
return &options{
|
||||
newCustomHTTPClient: newHTTPClient,
|
||||
}
|
||||
}
|
||||
|
||||
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
|
||||
// Credential environment variable or look for the Google Application Credential file.
|
||||
func NewClient(ctx context.Context, opts ...Option) (*GCSClient, error) {
|
||||
o := newOptions()
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
var clientOpts []option.ClientOption
|
||||
|
||||
switch {
|
||||
case o.secret != nil && o.proxyURL == nil:
|
||||
clientOpts = append(clientOpts, option.WithCredentialsJSON(o.secret.Data["serviceaccount"]))
|
||||
case o.proxyURL != nil:
|
||||
httpClient, err := o.newCustomHTTPClient(ctx, o)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clientOpts = append(clientOpts, option.WithHTTPClient(httpClient))
|
||||
}
|
||||
|
||||
client, err := gcpstorage.NewClient(ctx, clientOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GCSClient{client}, nil
|
||||
}
|
||||
|
||||
// newHTTPClient creates a new HTTP client for interacting with Google Cloud APIs.
|
||||
func newHTTPClient(ctx context.Context, o *options) (*http.Client, error) {
|
||||
baseTransport := http.DefaultTransport.(*http.Transport).Clone()
|
||||
if o.proxyURL != nil {
|
||||
baseTransport.Proxy = http.ProxyURL(o.proxyURL)
|
||||
}
|
||||
|
||||
var opts []option.ClientOption
|
||||
|
||||
if o.secret != nil {
|
||||
// Here we can't use option.WithCredentialsJSON() because htransport.NewTransport()
|
||||
// won't know what scopes to use and yield a 400 Bad Request error when retrieving
|
||||
// the OAuth token. Instead we use google.CredentialsFromJSON(), which allows us to
|
||||
// specify the GCS read-only scope.
|
||||
creds, err := google.CredentialsFromJSON(ctx, o.secret.Data["serviceaccount"], gcpstorage.ScopeReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Google credentials from secret: %w", err)
|
||||
}
|
||||
opts = append(opts, option.WithCredentials(creds))
|
||||
}
|
||||
|
||||
transport, err := htransport.NewTransport(ctx, baseTransport, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Google HTTP transport: %w", err)
|
||||
}
|
||||
return &http.Client{Transport: transport}, nil
|
||||
}
|
||||
|
||||
// ValidateSecret validates the credential secret. The provided Secret may
|
||||
|
|
|
@ -26,19 +26,22 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/compute/metadata"
|
||||
gcpstorage "cloud.google.com/go/storage"
|
||||
"google.golang.org/api/googleapi"
|
||||
"google.golang.org/api/option"
|
||||
raw "google.golang.org/api/storage/v1"
|
||||
"gotest.tools/assert"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"google.golang.org/api/option"
|
||||
testproxy "github.com/fluxcd/source-controller/tests/proxy"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -46,10 +49,13 @@ const (
|
|||
objectName string = "test.yaml"
|
||||
objectGeneration int64 = 3
|
||||
objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk="
|
||||
envGCSHost string = "STORAGE_EMULATOR_HOST"
|
||||
envADC string = "GOOGLE_APPLICATION_CREDENTIALS"
|
||||
)
|
||||
|
||||
var (
|
||||
hc *http.Client
|
||||
host string
|
||||
client *gcpstorage.Client
|
||||
close func()
|
||||
err error
|
||||
|
@ -76,7 +82,7 @@ var (
|
|||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||
hc, host, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||
io.Copy(io.Discard, r.Body)
|
||||
switch r.RequestURI {
|
||||
case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName):
|
||||
|
@ -140,12 +146,98 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func TestNewClientWithSecretErr(t *testing.T) {
|
||||
gcpClient, err := NewClient(context.Background(), secret.DeepCopy())
|
||||
gcpClient, err := NewClient(context.Background(), WithSecret(secret.DeepCopy()))
|
||||
t.Log(err)
|
||||
assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value")
|
||||
assert.Assert(t, gcpClient == nil)
|
||||
}
|
||||
|
||||
func TestNewClientWithProxyErr(t *testing.T) {
|
||||
_, envADCIsSet := os.LookupEnv(envADC)
|
||||
assert.Assert(t, !envADCIsSet)
|
||||
assert.Assert(t, !metadata.OnGCE())
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
opts []Option
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "invalid secret",
|
||||
opts: []Option{WithSecret(secret.DeepCopy())},
|
||||
err: "failed to create Google credentials from secret: invalid character 'e' looking for beginning of value",
|
||||
},
|
||||
{
|
||||
name: "attempts default credentials",
|
||||
err: "failed to create Google HTTP transport: google: could not find default credentials. See https://cloud.google.com/docs/authentication/external/set-up-adc for more information",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
opts := append([]Option{WithProxyURL(&url.URL{})}, tt.opts...)
|
||||
gcpClient, err := NewClient(context.Background(), opts...)
|
||||
assert.Error(t, err, tt.err)
|
||||
assert.Assert(t, gcpClient == nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy(t *testing.T) {
|
||||
proxyAddr, proxyPort := testproxy.New(t)
|
||||
|
||||
err := os.Setenv(envGCSHost, fmt.Sprintf("https://%s", host))
|
||||
assert.NilError(t, err)
|
||||
defer func() {
|
||||
err := os.Unsetenv(envGCSHost)
|
||||
assert.NilError(t, err)
|
||||
}()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
proxyURL *url.URL
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "with correct address",
|
||||
proxyURL: &url.URL{Scheme: "http", Host: proxyAddr},
|
||||
},
|
||||
{
|
||||
name: "with incorrect address",
|
||||
proxyURL: &url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", proxyPort+1)},
|
||||
err: "connection refused",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
opts := []Option{WithProxyURL(tt.proxyURL)}
|
||||
opts = append(opts, func(o *options) {
|
||||
o.newCustomHTTPClient = func(ctx context.Context, o *options) (*http.Client, error) {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
Proxy: http.ProxyURL(o.proxyURL),
|
||||
}
|
||||
return &http.Client{Transport: transport}, nil
|
||||
}
|
||||
})
|
||||
gcpClient, err := NewClient(context.Background(), opts...)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, gcpClient != nil)
|
||||
gcpClient.Client.SetRetry(gcpstorage.WithMaxAttempts(1))
|
||||
exists, err := gcpClient.BucketExists(context.Background(), bucketName)
|
||||
if tt.err != "" {
|
||||
assert.ErrorContains(t, err, tt.err)
|
||||
} else {
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, exists)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBucketExists(t *testing.T) {
|
||||
gcpClient := &GCSClient{
|
||||
Client: client,
|
||||
|
@ -272,16 +364,17 @@ func TestValidateSecret(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, func()) {
|
||||
func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, string, func()) {
|
||||
ts := httptest.NewTLSServer(http.HandlerFunc(handler))
|
||||
host := ts.Listener.Addr().String()
|
||||
tlsConf := &tls.Config{InsecureSkipVerify: true}
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: tlsConf,
|
||||
DialTLS: func(netw, addr string) (net.Conn, error) {
|
||||
return tls.Dial("tcp", ts.Listener.Addr().String(), tlsConf)
|
||||
return tls.Dial("tcp", host, tlsConf)
|
||||
},
|
||||
}
|
||||
return &http.Client{Transport: tr}, func() {
|
||||
return &http.Client{Transport: tr}, host, func() {
|
||||
tr.CloseIdleConnections()
|
||||
ts.Close()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue