bucket: Make NewArtifact event more informative
Use the etagIndex to provide more information about the artifact in NewArtifact events and remove the revision from the event message. The revision is still kept in the event annotations. Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
parent
acda998150
commit
47d09581df
|
|
@ -104,7 +104,7 @@ type BucketReconcilerOptions struct {
|
||||||
|
|
||||||
// bucketReconcilerFunc is the function type for all the bucket reconciler
|
// bucketReconcilerFunc is the function type for all the bucket reconciler
|
||||||
// functions.
|
// functions.
|
||||||
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error)
|
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error)
|
||||||
|
|
||||||
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||||
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
|
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
|
||||||
|
|
@ -199,6 +199,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
|
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
index := make(etagIndex)
|
||||||
var artifact sourcev1.Artifact
|
var artifact sourcev1.Artifact
|
||||||
|
|
||||||
// Create temp working dir
|
// Create temp working dir
|
||||||
|
|
@ -215,7 +216,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
var res sreconcile.Result
|
var res sreconcile.Result
|
||||||
var resErr error
|
var resErr error
|
||||||
for _, rec := range reconcilers {
|
for _, rec := range reconcilers {
|
||||||
recResult, err := rec(ctx, obj, &artifact, tmpDir)
|
recResult, err := rec(ctx, obj, index, &artifact, tmpDir)
|
||||||
// Exit immediately on ResultRequeue.
|
// Exit immediately on ResultRequeue.
|
||||||
if recResult == sreconcile.ResultRequeue {
|
if recResult == sreconcile.ResultRequeue {
|
||||||
return sreconcile.ResultRequeue, nil
|
return sreconcile.ResultRequeue, nil
|
||||||
|
|
@ -238,7 +239,8 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
||||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
||||||
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
|
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
|
||||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
func (r *BucketReconciler) reconcileStorage(ctx context.Context,
|
||||||
|
obj *sourcev1.Bucket, _ etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||||
// Garbage collect previous advertised artifact(s) from storage
|
// Garbage collect previous advertised artifact(s) from storage
|
||||||
_ = r.garbageCollect(ctx, obj)
|
_ = r.garbageCollect(ctx, obj)
|
||||||
|
|
||||||
|
|
@ -266,7 +268,8 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
|
||||||
// result.
|
// result.
|
||||||
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
|
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
|
||||||
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
|
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
|
||||||
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
func (r *BucketReconciler) reconcileSource(ctx context.Context,
|
||||||
|
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||||
var secret *corev1.Secret
|
var secret *corev1.Secret
|
||||||
if obj.Spec.SecretRef != nil {
|
if obj.Spec.SecretRef != nil {
|
||||||
secretName := types.NamespacedName{
|
secretName := types.NamespacedName{
|
||||||
|
|
@ -287,9 +290,9 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
|
||||||
|
|
||||||
switch obj.Spec.Provider {
|
switch obj.Spec.Provider {
|
||||||
case sourcev1.GoogleBucketProvider:
|
case sourcev1.GoogleBucketProvider:
|
||||||
return r.reconcileGCPSource(ctx, obj, artifact, secret, dir)
|
return r.reconcileGCPSource(ctx, obj, index, artifact, secret, dir)
|
||||||
default:
|
default:
|
||||||
return r.reconcileMinioSource(ctx, obj, artifact, secret, dir)
|
return r.reconcileMinioSource(ctx, obj, index, artifact, secret, dir)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -302,8 +305,8 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
|
||||||
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
|
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
|
||||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
||||||
func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
|
func (r *BucketReconciler) reconcileMinioSource(ctx context.Context,
|
||||||
secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
||||||
// Build the client with the configuration from the object and secret
|
// Build the client with the configuration from the object and secret
|
||||||
s3Client, err := r.buildMinioClient(obj, secret)
|
s3Client, err := r.buildMinioClient(obj, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -367,7 +370,6 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
||||||
// Build up an index of object keys and their etags
|
// Build up an index of object keys and their etags
|
||||||
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
||||||
// detect both structural and file changes
|
// detect both structural and file changes
|
||||||
var index = make(etagIndex)
|
|
||||||
for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
|
for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
|
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
|
||||||
|
|
@ -438,8 +440,6 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.BucketOperationSucceededReason,
|
|
||||||
"fetched %d files with revision '%s' from '%s'", len(index), revision, obj.Spec.BucketName)
|
|
||||||
}
|
}
|
||||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
||||||
|
|
||||||
|
|
@ -457,8 +457,8 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
||||||
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
|
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
|
||||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
||||||
func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
|
func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
|
||||||
secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
||||||
gcpClient, err := r.buildGCPClient(ctx, secret)
|
gcpClient, err := r.buildGCPClient(ctx, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e := &serror.Event{
|
e := &serror.Event{
|
||||||
|
|
@ -522,7 +522,6 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
||||||
// Build up an index of object keys and their etags
|
// Build up an index of object keys and their etags
|
||||||
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
||||||
// detect both structural and file changes
|
// detect both structural and file changes
|
||||||
var index = make(etagIndex)
|
|
||||||
objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil)
|
objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil)
|
||||||
for {
|
for {
|
||||||
object, err := objects.Next()
|
object, err := objects.Next()
|
||||||
|
|
@ -593,8 +592,6 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.BucketOperationSucceededReason,
|
|
||||||
"fetched %d files from bucket '%s'", len(index), obj.Spec.BucketName)
|
|
||||||
}
|
}
|
||||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
||||||
|
|
||||||
|
|
@ -610,7 +607,8 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
||||||
// If the given artifact does not differ from the object's current, it returns early.
|
// If the given artifact does not differ from the object's current, it returns early.
|
||||||
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
||||||
// updated to its path.
|
// updated to its path.
|
||||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
|
||||||
|
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||||
// Always restore the Ready condition in case it got removed due to a transient error
|
// Always restore the Ready condition in case it got removed due to a transient error
|
||||||
defer func() {
|
defer func() {
|
||||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||||
|
|
@ -666,10 +664,10 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
|
||||||
Reason: sourcev1.StorageOperationFailedReason,
|
Reason: sourcev1.StorageOperationFailedReason,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.AnnotatedEventf(obj, map[string]string{
|
r.annotatedEventLogf(ctx, obj, map[string]string{
|
||||||
"revision": artifact.Revision,
|
"revision": artifact.Revision,
|
||||||
"checksum": artifact.Checksum,
|
"checksum": artifact.Checksum,
|
||||||
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for revision '%s'", artifact.Revision)
|
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", len(index), obj.Spec.BucketName)
|
||||||
|
|
||||||
// Record it on the object
|
// Record it on the object
|
||||||
obj.Status.Artifact = artifact.DeepCopy()
|
obj.Status.Artifact = artifact.DeepCopy()
|
||||||
|
|
@ -803,10 +801,17 @@ func (i etagIndex) Revision() (string, error) {
|
||||||
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// eventLog records event and logs at the same time. This log is different from
|
// eventLogf records event and logs at the same time.
|
||||||
// the debug log in the event recorder in the sense that this is a simple log,
|
|
||||||
// the event recorder debug log contains complete details about the event.
|
|
||||||
func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
|
func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
|
||||||
|
r.annotatedEventLogf(ctx, obj, nil, eventType, reason, messageFmt, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// annotatedEventLogf records annotated event and logs at the same time. This
|
||||||
|
// log is different from the debug log in the event recorder in the sense that
|
||||||
|
// this is a simple log, the event recorder debug log contains complete details
|
||||||
|
// about the event.
|
||||||
|
func (r *BucketReconciler) annotatedEventLogf(ctx context.Context,
|
||||||
|
obj runtime.Object, annotations map[string]string, eventType string, reason string, messageFmt string, args ...interface{}) {
|
||||||
msg := fmt.Sprintf(messageFmt, args...)
|
msg := fmt.Sprintf(messageFmt, args...)
|
||||||
// Log and emit event.
|
// Log and emit event.
|
||||||
if eventType == corev1.EventTypeWarning {
|
if eventType == corev1.EventTypeWarning {
|
||||||
|
|
@ -814,5 +819,5 @@ func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, ev
|
||||||
} else {
|
} else {
|
||||||
ctrl.LoggerFrom(ctx).Info(msg)
|
ctrl.LoggerFrom(ctx).Info(msg)
|
||||||
}
|
}
|
||||||
r.Eventf(obj, eventType, reason, msg)
|
r.AnnotatedEventf(obj, annotations, eventType, reason, msg)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -264,9 +264,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
||||||
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
|
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
index := make(etagIndex)
|
||||||
var artifact sourcev1.Artifact
|
var artifact sourcev1.Artifact
|
||||||
|
|
||||||
got, err := r.reconcileStorage(context.TODO(), obj, &artifact, "")
|
got, err := r.reconcileStorage(context.TODO(), obj, index, &artifact, "")
|
||||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||||
g.Expect(got).To(Equal(tt.want))
|
g.Expect(got).To(Equal(tt.want))
|
||||||
|
|
||||||
|
|
@ -549,7 +550,8 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
artifact := &sourcev1.Artifact{}
|
artifact := &sourcev1.Artifact{}
|
||||||
got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
|
index := make(etagIndex)
|
||||||
|
got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir)
|
||||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||||
g.Expect(got).To(Equal(tt.want))
|
g.Expect(got).To(Equal(tt.want))
|
||||||
|
|
||||||
|
|
@ -828,7 +830,8 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
artifact := &sourcev1.Artifact{}
|
artifact := &sourcev1.Artifact{}
|
||||||
got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
|
index := make(etagIndex)
|
||||||
|
got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir)
|
||||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||||
g.Expect(got).To(Equal(tt.want))
|
g.Expect(got).To(Equal(tt.want))
|
||||||
|
|
||||||
|
|
@ -965,6 +968,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
index := make(etagIndex)
|
||||||
artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz")
|
artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz")
|
||||||
artifact.Checksum = testChecksum
|
artifact.Checksum = testChecksum
|
||||||
|
|
||||||
|
|
@ -972,7 +976,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
||||||
tt.beforeFunc(g, obj, artifact, tmpDir)
|
tt.beforeFunc(g, obj, artifact, tmpDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := r.reconcileArtifact(context.TODO(), obj, &artifact, tmpDir)
|
got, err := r.reconcileArtifact(context.TODO(), obj, index, &artifact, tmpDir)
|
||||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||||
g.Expect(got).To(Equal(tt.want))
|
g.Expect(got).To(Equal(tt.want))
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue