Normalise error messages

Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
Stefan Prodan 2022-07-08 14:06:50 +03:00
parent 942d92834b
commit 9a6ff19487
No known key found for this signature in database
GPG Key ID: 3299AEB0E4085BAF
1 changed files with 101 additions and 66 deletions

View File

@ -297,43 +297,57 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
// Generates registry credential keychain
// Generate the registry credential keychain
keychain, err := r.keychain(ctx, obj)
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to get credential: %w", err),
sourcev1.AuthenticationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Generates transport for remote operations
// Generate the transport for remote operations
transport, err := r.transport(ctx, obj)
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to generate transport for '%s': %w", obj.Spec.URL, err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Determine which artifact revision to pull
url, err := r.getArtifactURL(ctxTimeout, obj, keychain, transport)
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to determine the artifact address for '%s': %w", obj.Spec.URL, err),
sourcev1.URLInvalidReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Pull artifact from the remote container registry
img, err := crane.Pull(url, r.craneOptions(ctxTimeout, keychain, transport)...)
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Determine the artifact SHA256 digest
imgDigest, err := img.Digest()
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to determine artifact digest: %w", err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
@ -344,7 +358,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
// Mark observations about the revision on the object
defer func() {
if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new upstream revision '%s' for '%s'", revision, url)
message := fmt.Sprintf("new digest '%s' for '%s'", revision, url)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
}
@ -354,28 +368,39 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
if !obj.GetArtifact().HasRevision(revision) {
layers, err := img.Layers()
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to parse artifact layers: %w", err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
if len(layers) < 1 {
err = fmt.Errorf("no layers found in artifact")
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("no layers found in artifact"),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
blob, err := layers[0].Compressed()
if err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to extract the first layer from artifact: %w", err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
if _, err = untar.Untar(blob, dir); err != nil {
e := serror.NewGeneric(err, sourcev1.OCIOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
e := serror.NewGeneric(
fmt.Errorf("failed to untar the first layer from artifact: %w", err),
sourcev1.OCIOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
}
@ -384,7 +409,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
return sreconcile.ResultSuccess, nil
}
// parseRepositoryURL extracts the repository URL.
// parseRepositoryURL validates and extracts the repository URL.
func (r *OCIRepositoryReconciler) parseRepositoryURL(obj *sourcev1.OCIRepository) (string, error) {
if !strings.HasPrefix(obj.Spec.URL, sourcev1.OCIRepositoryPrefix) {
return "", fmt.Errorf("URL must be in format 'oci://<domain>/<org>/<repo>'")
@ -393,14 +418,15 @@ func (r *OCIRepositoryReconciler) parseRepositoryURL(obj *sourcev1.OCIRepository
url := strings.TrimPrefix(obj.Spec.URL, sourcev1.OCIRepositoryPrefix)
ref, err := name.ParseReference(url)
if err != nil {
return "", fmt.Errorf("'%s' invalid URL: %w", obj.Spec.URL, err)
return "", err
}
return ref.Context().Name(), nil
}
// getArtifactURL determines which tag or digest should be used and returns the OCI artifact FQN.
func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourcev1.OCIRepository, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context,
obj *sourcev1.OCIRepository, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
url, err := r.parseRepositoryURL(obj)
if err != nil {
return "", err
@ -429,7 +455,8 @@ func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourc
// getTagBySemver call the remote container registry, fetches all the tags from the repository,
// and returns the latest tag according to the semver expression.
func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context, url, exp string, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context,
url, exp string, keychain authn.Keychain, transport http.RoundTripper) (string, error) {
tags, err := crane.ListTags(url, r.craneOptions(ctx, keychain, transport)...)
if err != nil {
return "", err
@ -495,7 +522,8 @@ func (r *OCIRepositoryReconciler) keychain(ctx context.Context, obj *sourcev1.OC
imagePullSecret := corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{Namespace: obj.Namespace, Name: imagePullSecretName}, &imagePullSecret)
if err != nil {
r.eventLogf(ctx, obj, events.EventSeverityTrace, "secret %q not found", imagePullSecretName)
r.eventLogf(ctx, obj, events.EventSeverityTrace, sourcev1.AuthenticationFailedReason,
"auth secret '%s' not found", imagePullSecretName)
return nil, err
}
imagePullSecrets[i] = imagePullSecret
@ -504,51 +532,54 @@ func (r *OCIRepositoryReconciler) keychain(ctx context.Context, obj *sourcev1.OC
return k8schain.NewFromPullSecrets(ctx, imagePullSecrets)
}
// transport clones the default transport from remote.
// If certSecretRef is configured in the resource configuration,
// returned transport will iclude client and/or CA certifactes
// transport clones the default transport from remote and when a certSecretRef is specified,
// the returned transport will include the TLS client and/or CA certificates.
func (r *OCIRepositoryReconciler) transport(ctx context.Context, obj *sourcev1.OCIRepository) (http.RoundTripper, error) {
if obj.Spec.CertSecretRef != nil {
var certSecret corev1.Secret
err := r.Get(ctx,
types.NamespacedName{Namespace: obj.Namespace, Name: obj.Spec.CertSecretRef.Name},
&certSecret)
if obj.Spec.CertSecretRef == nil || obj.Spec.CertSecretRef.Name == "" {
return nil, nil
}
if err != nil {
r.eventLogf(ctx, obj, events.EventSeverityTrace, "secret %q not found", obj.Spec.CertSecretRef.Name)
return nil, err
}
certSecretName := types.NamespacedName{
Namespace: obj.Namespace,
Name: obj.Spec.CertSecretRef.Name,
}
var certSecret corev1.Secret
if err := r.Get(ctx, certSecretName, &certSecret); err != nil {
return nil, err
}
transport := remote.DefaultTransport.Clone()
tlsConfig := transport.TLSClientConfig
transport := remote.DefaultTransport.Clone()
tlsConfig := transport.TLSClientConfig
if clientCert, ok := certSecret.Data[ClientCert]; ok {
// parse and set client cert and secret
if clientKey, ok := certSecret.Data[ClientKey]; ok {
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, err
}
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
} else {
return nil, fmt.Errorf("client certificate found, but no key")
}
}
if caCert, ok := certSecret.Data[CACert]; ok {
syscerts, err := x509.SystemCertPool()
if clientCert, ok := certSecret.Data[ClientCert]; ok {
// parse and set client cert and secret
if clientKey, ok := certSecret.Data[ClientKey]; ok {
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, err
}
syscerts.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = syscerts
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
} else {
return nil, fmt.Errorf("'%s' found in secret, but no %s", ClientCert, ClientKey)
}
return transport, nil
}
return nil, nil
if caCert, ok := certSecret.Data[CACert]; ok {
syscerts, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
syscerts.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = syscerts
}
return transport, nil
}
// craneOptions sets the timeout and user agent for all operations against remote container registries.
func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context, keychain authn.Keychain, transport http.RoundTripper) []crane.Option {
// craneOptions sets the auth headers, timeout and user agent
// for all operations against remote container registries.
func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context,
keychain authn.Keychain, transport http.RoundTripper) []crane.Option {
options := []crane.Option{
crane.WithContext(ctx),
crane.WithUserAgent("flux/v2"),
@ -574,7 +605,8 @@ func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context, keychain aut
// condition is added.
// The hostname of any URL in the Status of the object are updated, to ensure
// they match the Storage server hostname of current runtime.
func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.OCIRepository, _ *gcrv1.Hash, _ string) (sreconcile.Result, error) {
func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context,
obj *sourcev1.OCIRepository, _ *gcrv1.Hash, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)
@ -609,7 +641,8 @@ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sou
// early.
// On a successful archive, the Artifact in the Status of the object is set,
// and the symlink in the Storage is updated to its path.
func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) {
func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context,
obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) {
// Calculate revision
revision := digest.Hex
@ -628,7 +661,7 @@ func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *so
// The artifact is up-to-date
if obj.GetArtifact().HasRevision(artifact.Revision) {
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.ArtifactUpToDateReason,
"artifact up-to-date with remote revision: '%s'", artifact.Revision)
"artifact up-to-date with remote digest: '%s'", artifact.Revision)
return sreconcile.ResultSuccess, nil
}
@ -751,7 +784,8 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
// This log is different from the debug log in the EventRecorder, in the sense
// that this is a simple log. While the debug log contains complete details
// about the event.
func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context,
obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
msg := fmt.Sprintf(messageFmt, args...)
// Log and emit event.
if eventType == corev1.EventTypeWarning {
@ -763,7 +797,8 @@ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Obj
}
// notify emits notification related to the reconciliation.
func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
func (r *OCIRepositoryReconciler) notify(ctx context.Context,
oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {