Rewrite `BucketReconciler` to new standards

This commit rewrites the `BucketReconciler` to new standards, while
implementing the newly introduced Condition types, and trying to
adhere better to Kubernetes API conventions.

More specifically it introduces:

- Implementation of more explicit Condition types to highlight
  abnormalities.
- Extensive usage of the `conditions` subpackage from `runtime`.
- Better and more conflict-resilient (status)patching of reconciled
  objects using the `patch` subpackage from runtime.
- Proper implementation of kstatus' `Reconciling` and `Stalled`
  conditions.
- Refactor of reconciler logic, including more efficient detection of
  changes to bucket objects by making use of the etag data available,
  and downloading of object files in parallel with a limited number of
  workers (4).
- Integration tests that solely rely on `testenv` and do not
  use Ginkgo.

There are a couple of TODOs marked in-code, these are suggestions for
the future and should be non-blocking.
In addition to the TODOs, more complex and/or edge-case test scenarios
may be added as well.

Signed-off-by: Hidde Beydals <hello@hidde.co>
This commit is contained in:
Hidde Beydals 2021-07-31 03:58:43 +02:00
parent 52f4a2a800
commit 89ba8374b6
5 changed files with 1288 additions and 517 deletions

View File

@ -19,12 +19,10 @@ package v1beta2
import ( import (
"time" "time"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/apis/acl" "github.com/fluxcd/pkg/apis/acl"
"github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
) )
const ( const (
@ -32,6 +30,19 @@ const (
BucketKind = "Bucket" BucketKind = "Bucket"
) )
const (
GenericBucketProvider string = "generic"
AmazonBucketProvider string = "aws"
GoogleBucketProvider string = "gcp"
)
const (
// DownloadFailedCondition indicates a transient or persistent download failure. If True, observations on the
// upstream Source revision are not possible, and the Artifact available for the Source may be outdated.
// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
DownloadFailedCondition string = "DownloadFailed"
)
// BucketSpec defines the desired state of an S3 compatible bucket // BucketSpec defines the desired state of an S3 compatible bucket
type BucketSpec struct { type BucketSpec struct {
// The S3 compatible storage provider name, default ('generic'). // The S3 compatible storage provider name, default ('generic').
@ -85,12 +96,6 @@ type BucketSpec struct {
AccessFrom *acl.AccessFrom `json:"accessFrom,omitempty"` AccessFrom *acl.AccessFrom `json:"accessFrom,omitempty"`
} }
const (
GenericBucketProvider string = "generic"
AmazonBucketProvider string = "aws"
GoogleBucketProvider string = "gcp"
)
// BucketStatus defines the observed state of a bucket // BucketStatus defines the observed state of a bucket
type BucketStatus struct { type BucketStatus struct {
// ObservedGeneration is the last observed generation. // ObservedGeneration is the last observed generation.
@ -122,45 +127,6 @@ const (
BucketOperationFailedReason string = "BucketOperationFailed" BucketOperationFailedReason string = "BucketOperationFailed"
) )
// BucketProgressing resets the conditions of the Bucket to metav1.Condition of
// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason
// reason and message. It returns the modified Bucket.
func BucketProgressing(bucket Bucket) Bucket {
bucket.Status.ObservedGeneration = bucket.Generation
bucket.Status.URL = ""
bucket.Status.Conditions = []metav1.Condition{}
conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
return bucket
}
// BucketReady sets the given Artifact and URL on the Bucket and sets the
// meta.ReadyCondition to 'True', with the given reason and message. It returns
// the modified Bucket.
func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket {
bucket.Status.Artifact = &artifact
bucket.Status.URL = url
conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message)
return bucket
}
// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with
// the given reason and message. It returns the modified Bucket.
func BucketNotReady(bucket Bucket, reason, message string) Bucket {
conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message)
return bucket
}
// BucketReadyMessage returns the message of the metav1.Condition of type
// meta.ReadyCondition with status 'True' if present, or an empty string.
func BucketReadyMessage(bucket Bucket) string {
if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil {
if c.Status == metav1.ConditionTrue {
return c.Message
}
}
return ""
}
// GetConditions returns the status conditions of the object. // GetConditions returns the status conditions of the object.
func (in Bucket) GetConditions() []metav1.Condition { func (in Bucket) GetConditions() []metav1.Condition {
return in.Status.Conditions return in.Status.Conditions

File diff suppressed because it is too large Load Diff

View File

@ -17,59 +17,573 @@ limitations under the License.
package controllers package controllers
import ( import (
"context"
"crypto/md5"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os" "os"
"path"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
"time"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
) )
func TestBucketReconciler_checksum(t *testing.T) { func TestBucketReconciler_Reconcile(t *testing.T) {
g := NewWithT(t)
s3Server := newS3Server("test-bucket")
s3Server.Objects = []*s3MockObject{
{
Key: "test.yaml",
Content: []byte("test"),
ContentType: "text/plain",
LastModified: time.Now(),
},
}
s3Server.Start()
defer s3Server.Stop()
g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty())
u, err := url.Parse(s3Server.HTTPAddress())
g.Expect(err).NotTo(HaveOccurred())
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "bucket-reconcile-",
Namespace: "default",
},
Data: map[string][]byte{
"accesskey": []byte("key"),
"secretkey": []byte("secret"),
},
}
g.Expect(testEnv.Create(ctx, secret)).To(Succeed())
defer testEnv.Delete(ctx, secret)
obj := &sourcev1.Bucket{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "bucket-reconcile-",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
Provider: "generic",
BucketName: s3Server.BucketName,
Endpoint: u.Host,
Insecure: true,
Interval: metav1.Duration{Duration: interval},
Timeout: &metav1.Duration{Duration: timeout},
SecretRef: &meta.LocalObjectReference{
Name: secret.Name,
},
},
}
g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
// Wait for finalizer to be set
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return len(obj.Finalizers) > 0
}, timeout).Should(BeTrue())
// Wait for Bucket to be Ready
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
if !conditions.IsReady(obj) || obj.Status.Artifact == nil {
return false
}
readyCondition := conditions.Get(obj, meta.ReadyCondition)
return obj.Generation == readyCondition.ObservedGeneration &&
obj.Generation == obj.Status.ObservedGeneration
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for Bucket to be deleted
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return apierrors.IsNotFound(err)
}
return false
}, timeout).Should(BeTrue())
}
func TestBucketReconciler_reconcileStorage(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
beforeFunc func(root string) beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
want string want ctrl.Result
wantErr bool wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{ }{
{ {
name: "empty root", name: "garbage collects",
want: "da39a3ee5e6b4b0d3255bfef95601890afd80709", beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
revisions := []string{"a", "b", "c"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
},
assertPaths: []string{
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
}, },
{ {
name: "with file", name: "notices missing artifact in storage",
beforeFunc: func(root string) { beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
mockFile(root, "a/b/c.txt", "a dummy string") obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/invalid.txt"),
Revision: "d",
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
want: ctrl.Result{Requeue: true},
assertPaths: []string{
"!/reconcile-storage/invalid.txt",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
}, },
want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0",
}, },
{ {
name: "with file in different path", name: "updates hostname on diff from current",
beforeFunc: func(root string) { beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
mockFile(root, "a/b.txt", "a dummy string") obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/hostname.txt"),
Revision: "f",
Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
URL: "http://outdated.com/reconcile-storage/hostname.txt",
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
return err
}
return nil
},
assertPaths: []string{
"/reconcile-storage/hostname.txt",
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
}, },
want: "e28c62b5cc488849950c4355dddc5523712616d4",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
root, err := os.MkdirTemp("", "bucket-checksum-") g := NewWithT(t)
if err != nil {
t.Fatal(err) r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
obj := &sourcev1.Bucket{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
} }
defer os.RemoveAll(root)
if tt.beforeFunc != nil { if tt.beforeFunc != nil {
tt.beforeFunc(root) g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
} }
got, err := (&BucketReconciler{}).checksum(root)
if (err != nil) != tt.wantErr { got, err := r.reconcileStorage(context.TODO(), obj)
t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr) g.Expect(err != nil).To(Equal(tt.wantErr))
return g.Expect(got).To(Equal(tt.want))
g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
} }
if got != tt.want { g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
t.Errorf("checksum() got = %v, want %v", got, tt.want)
for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
} }
}) })
} }
} }
func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
tests := []struct {
name string
bucketName string
bucketObjects []*s3MockObject
middleware http.Handler
secret *corev1.Secret
beforeFunc func(obj *sourcev1.Bucket)
want ctrl.Result
wantErr bool
assertArtifact sourcev1.Artifact
assertConditions []metav1.Condition
}{
{
name: "reconciles source",
bucketName: "dummy",
bucketObjects: []*s3MockObject{
{
Key: "test.txt",
Content: []byte("test"),
ContentType: "text/plain",
LastModified: time.Now(),
},
},
assertArtifact: sourcev1.Artifact{
Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
},
},
// TODO(hidde): middleware for mock server
//{
// name: "authenticates using secretRef",
// bucketName: "dummy",
//},
{
name: "observes non-existing secretRef",
bucketName: "dummy",
beforeFunc: func(obj *sourcev1.Bucket) {
obj.Spec.SecretRef = &meta.LocalObjectReference{
Name: "dummy",
}
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
},
},
{
name: "observes invalid secretRef",
bucketName: "dummy",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
},
},
beforeFunc: func(obj *sourcev1.Bucket) {
obj.Spec.SecretRef = &meta.LocalObjectReference{
Name: "dummy",
}
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
},
},
{
name: "observes non-existing bucket name",
bucketName: "dummy",
beforeFunc: func(obj *sourcev1.Bucket) {
obj.Spec.BucketName = "invalid"
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
},
},
{
name: "transient bucket name API failure",
beforeFunc: func(obj *sourcev1.Bucket) {
obj.Spec.Endpoint = "transient.example.com"
obj.Spec.BucketName = "unavailable"
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
},
},
{
// TODO(hidde): test the lesser happy paths
name: ".sourceignore",
bucketName: "dummy",
bucketObjects: []*s3MockObject{
{
Key: ".sourceignore",
Content: []byte("ignored/file.txt"),
ContentType: "text/plain",
LastModified: time.Now(),
},
{
Key: "ignored/file.txt",
Content: []byte("ignored/file.txt"),
ContentType: "text/plain",
LastModified: time.Now(),
},
{
Key: "included/file.txt",
Content: []byte("included/file.txt"),
ContentType: "text/plain",
LastModified: time.Now(),
},
},
assertArtifact: sourcev1.Artifact{
Path: "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme())
if tt.secret != nil {
builder.WithObjects(tt.secret)
}
r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Client: builder.Build(),
Storage: testStorage,
}
tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-")
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
obj := &sourcev1.Bucket{
TypeMeta: metav1.TypeMeta{
Kind: sourcev1.BucketKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-bucket",
},
Spec: sourcev1.BucketSpec{
Timeout: &metav1.Duration{Duration: timeout},
},
}
var server *s3MockServer
if tt.bucketName != "" {
server = newS3Server(tt.bucketName)
server.Objects = tt.bucketObjects
server.Start()
defer server.Stop()
g.Expect(server.HTTPAddress()).ToNot(BeEmpty())
u, err := url.Parse(server.HTTPAddress())
g.Expect(err).NotTo(HaveOccurred())
obj.Spec.BucketName = tt.bucketName
obj.Spec.Endpoint = u.Host
// TODO(hidde): also test TLS
obj.Spec.Insecure = true
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
artifact := &sourcev1.Artifact{}
got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
})
}
}
func TestBucketReconciler_reconcileArtifact(t *testing.T) {
tests := []struct {
name string
artifact sourcev1.Artifact
beforeFunc func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string)
want ctrl.Result
wantErr bool
assertConditions []metav1.Condition
}{
{
name: "artifact revision up-to-date",
artifact: sourcev1.Artifact{
Revision: "existing",
},
beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
obj.Status.Artifact = &artifact
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
{
name: "dir path deleted",
beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
_ = os.RemoveAll(dir)
},
wantErr: true,
},
//{
// name: "dir path empty",
//},
//{
// name: "success",
// artifact: sourcev1.Artifact{
// Revision: "existing",
// },
// beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
// obj.Status.Artifact = &artifact
// },
// assertConditions: []metav1.Condition{
// *conditions.TrueCondition(sourcev1.ArtifactAvailableCondition, meta.SucceededReason, "Compressed source to artifact with revision 'existing'"),
// },
//},
//{
// name: "symlink",
//},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
tmpDir, err := os.MkdirTemp("", "reconcile-bucket-artifact-")
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
obj := &sourcev1.Bucket{
TypeMeta: metav1.TypeMeta{
Kind: sourcev1.BucketKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-bucket",
},
Spec: sourcev1.BucketSpec{
Timeout: &metav1.Duration{Duration: timeout},
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj, tt.artifact, tmpDir)
}
r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
dlog := log.NewDelegatingLogSink(log.NullLogSink{})
nullLogger := logr.New(dlog)
got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, tt.artifact, tmpDir)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
//g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
})
}
}
func Test_etagIndex_Revision(t *testing.T) {
tests := []struct {
name string
list etagIndex
want string
wantErr bool
}{
{
name: "index with items",
list: map[string]string{
"one": "one",
"two": "two",
"three": "three",
},
want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
},
{
name: "index with items in different order",
list: map[string]string{
"three": "three",
"one": "one",
"two": "two",
},
want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
},
{
name: "empty index",
list: map[string]string{},
want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
},
{
name: "nil index",
list: nil,
want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.list.Revision()
if (err != nil) != tt.wantErr {
t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("revision() got = %v, want %v", got, tt.want)
}
})
}
}
// helpers
func mockFile(root, path, content string) error { func mockFile(root, path, content string) error {
filePath := filepath.Join(root, path) filePath := filepath.Join(root, path)
if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
@ -80,3 +594,120 @@ func mockFile(root, path, content string) error {
} }
return nil return nil
} }
type s3MockObject struct {
Key string
LastModified time.Time
ContentType string
Content []byte
}
type s3MockServer struct {
srv *httptest.Server
mux *http.ServeMux
BucketName string
Objects []*s3MockObject
}
func newS3Server(bucketName string) *s3MockServer {
s := &s3MockServer{BucketName: bucketName}
s.mux = http.NewServeMux()
s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
s.srv = httptest.NewUnstartedServer(s.mux)
return s
}
func (s *s3MockServer) Start() {
s.srv.Start()
}
func (s *s3MockServer) Stop() {
s.srv.Close()
}
func (s *s3MockServer) HTTPAddress() string {
return s.srv.URL
}
func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
key := path.Base(r.URL.Path)
switch key {
case s.BucketName:
w.Header().Add("Content-Type", "application/xml")
if r.Method == http.MethodHead {
return
}
q := r.URL.Query()
if q["location"] != nil {
fmt.Fprint(w, `
<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
`)
return
}
contents := ""
for _, o := range s.Objects {
etag := md5.Sum(o.Content)
contents += fmt.Sprintf(`
<Contents>
<Key>%s</Key>
<LastModified>%s</LastModified>
<Size>%d</Size>
<ETag>&quot;%b&quot;</ETag>
<StorageClass>STANDARD</StorageClass>
</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
}
fmt.Fprintf(w, `
<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>%s</Name>
<Prefix/>
<Marker/>
<KeyCount>%d</KeyCount>
<MaxKeys>1000</MaxKeys>
<IsTruncated>false</IsTruncated>
%s
</ListBucketResult>
`, s.BucketName, len(s.Objects), contents)
default:
key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
if err != nil {
w.WriteHeader(500)
return
}
var found *s3MockObject
for _, o := range s.Objects {
if key == o.Key {
found = o
}
}
if found == nil {
w.WriteHeader(404)
return
}
etag := md5.Sum(found.Content)
lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
w.Header().Add("Content-Type", found.ContentType)
w.Header().Add("Last-Modified", lastModified)
w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
if r.Method == http.MethodHead {
return
}
w.Write(found.Content)
}
}

View File

@ -97,6 +97,15 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err)) panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
} }
if err := (&BucketReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
Storage: testStorage,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
}
go func() { go func() {
fmt.Println("Starting the test environment") fmt.Println("Starting the test environment")
if err := testEnv.Start(ctx); err != nil { if err := testEnv.Start(ctx); err != nil {

View File

@ -203,11 +203,10 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if err = (&controllers.BucketReconciler{ if err = (&controllers.BucketReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Scheme: mgr.GetScheme(), EventRecorder: eventRecorder,
Storage: storage, Metrics: metricsH,
EventRecorder: eventRecorder, Storage: storage,
MetricsRecorder: metricsH.MetricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{ }).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent, MaxConcurrentReconciles: concurrent,
}); err != nil { }); err != nil {