Retry downloading the artifact on not found errors
- Extract the artifact operations such as download, verify, untar into a dedicated struct - Introduce a dedicated type for artifact not found errors - On artifact not found errors, log the error, update the ready status message and requeue the object - Retry the artifact download at the interval set with `--requeue-dependency` (defaults to 30s) Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
parent
db3c321163
commit
cfd5200fbb
|
|
@ -19,19 +19,13 @@ package controllers
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
securejoin "github.com/cyphar/filepath-securejoin"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
|
|
@ -62,7 +56,6 @@ import (
|
|||
"github.com/fluxcd/pkg/runtime/metrics"
|
||||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
"github.com/fluxcd/pkg/ssa"
|
||||
"github.com/fluxcd/pkg/untar"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
|
||||
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2"
|
||||
|
|
@ -79,7 +72,7 @@ import (
|
|||
// KustomizationReconciler reconciles a Kustomization object
|
||||
type KustomizationReconciler struct {
|
||||
client.Client
|
||||
httpClient *retryablehttp.Client
|
||||
artifactFetcher *ArtifactFetcher
|
||||
requeueDependency time.Duration
|
||||
Scheme *runtime.Scheme
|
||||
EventRecorder kuberecorder.EventRecorder
|
||||
|
|
@ -122,15 +115,7 @@ func (r *KustomizationReconciler) SetupWithManager(mgr ctrl.Manager, opts Kustom
|
|||
|
||||
r.requeueDependency = opts.DependencyRequeueInterval
|
||||
r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName)
|
||||
|
||||
// Configure the retryable http client used for fetching artifacts.
|
||||
// By default it retries 10 times within a 3.5 minutes window.
|
||||
httpClient := retryablehttp.NewClient()
|
||||
httpClient.RetryWaitMin = 5 * time.Second
|
||||
httpClient.RetryWaitMax = 30 * time.Second
|
||||
httpClient.RetryMax = opts.HTTPRetry
|
||||
httpClient.Logger = nil
|
||||
r.httpClient = httpClient
|
||||
r.artifactFetcher = NewArtifactFetcher(opts.HTTPRetry)
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&kustomizev1.Kustomization{}, builder.WithPredicates(
|
||||
|
|
@ -268,6 +253,18 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
|
|||
|
||||
// reconcile kustomization by applying the latest revision
|
||||
reconciledKustomization, reconcileErr := r.reconcile(ctx, *kustomization.DeepCopy(), source)
|
||||
|
||||
// requeue if the artifact is not found
|
||||
if reconcileErr == ArtifactNotFoundError {
|
||||
msg := fmt.Sprintf("Source is not ready, artifact not found, retrying in %s", r.requeueDependency.String())
|
||||
log.Info(msg)
|
||||
if err := r.patchStatus(ctx, req, kustomizev1.KustomizationProgressing(kustomization, msg).Status); err != nil {
|
||||
log.Error(err, "unable to update status for artifact not found")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
|
||||
}
|
||||
|
||||
if err := r.patchStatus(ctx, req, reconciledKustomization.Status); err != nil {
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
|
|
@ -320,7 +317,7 @@ func (r *KustomizationReconciler) reconcile(
|
|||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// download artifact and extract files
|
||||
err = r.download(source.GetArtifact(), tmpDir)
|
||||
err = r.artifactFetcher.Fetch(source.GetArtifact(), tmpDir)
|
||||
if err != nil {
|
||||
return kustomizev1.KustomizationNotReady(
|
||||
kustomization,
|
||||
|
|
@ -526,70 +523,6 @@ func (r *KustomizationReconciler) checkDependencies(source sourcev1.Source, kust
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *KustomizationReconciler) download(artifact *sourcev1.Artifact, tmpDir string) error {
|
||||
artifactURL := artifact.URL
|
||||
if hostname := os.Getenv("SOURCE_CONTROLLER_LOCALHOST"); hostname != "" {
|
||||
u, err := url.Parse(artifactURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
u.Host = hostname
|
||||
artifactURL = u.String()
|
||||
}
|
||||
|
||||
req, err := retryablehttp.NewRequest(http.MethodGet, artifactURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create a new request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := r.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download artifact, error: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// check response
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("failed to download artifact from %s, status: %s", artifactURL, resp.Status)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
// verify checksum matches origin
|
||||
if err := r.verifyArtifact(artifact, &buf, resp.Body); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// extract
|
||||
if _, err = untar.Untar(&buf, tmpDir); err != nil {
|
||||
return fmt.Errorf("failed to untar artifact, error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *KustomizationReconciler) verifyArtifact(artifact *sourcev1.Artifact, buf *bytes.Buffer, reader io.Reader) error {
|
||||
hasher := sha256.New()
|
||||
|
||||
// for backwards compatibility with source-controller v0.17.2 and older
|
||||
if len(artifact.Checksum) == 40 {
|
||||
hasher = sha1.New()
|
||||
}
|
||||
|
||||
// compute checksum
|
||||
mw := io.MultiWriter(hasher, buf)
|
||||
if _, err := io.Copy(mw, reader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if checksum := fmt.Sprintf("%x", hasher.Sum(nil)); checksum != artifact.Checksum {
|
||||
return fmt.Errorf("failed to verify artifact: computed checksum '%s' doesn't match advertised '%s'",
|
||||
checksum, artifact.Checksum)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *KustomizationReconciler) getSource(ctx context.Context, kustomization kustomizev1.Kustomization) (sourcev1.Source, error) {
|
||||
var source sourcev1.Source
|
||||
sourceNamespace := kustomization.GetNamespace()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
Copyright 2022 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/fluxcd/pkg/untar"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
)
|
||||
|
||||
// ArtifactFetcher holds the HTTP client that reties with back off when
|
||||
// the artifact server is offline.
|
||||
type ArtifactFetcher struct {
|
||||
httpClient *retryablehttp.Client
|
||||
}
|
||||
|
||||
// ArtifactNotFoundError is an error type used to signal 404 HTTP status code responses.
|
||||
var ArtifactNotFoundError = errors.New("artifact not found")
|
||||
|
||||
// NewArtifactFetcher configures the retryable http client used for fetching artifacts.
|
||||
// By default, it retries 10 times within a 3.5 minutes window.
|
||||
func NewArtifactFetcher(retries int) *ArtifactFetcher {
|
||||
httpClient := retryablehttp.NewClient()
|
||||
httpClient.RetryWaitMin = 5 * time.Second
|
||||
httpClient.RetryWaitMax = 30 * time.Second
|
||||
httpClient.RetryMax = retries
|
||||
httpClient.Logger = nil
|
||||
|
||||
return &ArtifactFetcher{httpClient: httpClient}
|
||||
}
|
||||
|
||||
// Fetch downloads, verifies and extracts the artifact content to the specified directory.
|
||||
// If the artifact server responds with 5xx errors, the download operation is retried.
|
||||
// If the artifact server responds with 404, the returned error is of type ArtifactNotFoundError.
|
||||
// If the artifact server is unavailable for more than 3 minutes, the returned error contains the original status code.
|
||||
func (r *ArtifactFetcher) Fetch(artifact *sourcev1.Artifact, dir string) error {
|
||||
artifactURL := artifact.URL
|
||||
if hostname := os.Getenv("SOURCE_CONTROLLER_LOCALHOST"); hostname != "" {
|
||||
u, err := url.Parse(artifactURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
u.Host = hostname
|
||||
artifactURL = u.String()
|
||||
}
|
||||
|
||||
req, err := retryablehttp.NewRequest(http.MethodGet, artifactURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create a new request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := r.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download artifact, error: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if code := resp.StatusCode; code != http.StatusOK {
|
||||
if code == http.StatusNotFound {
|
||||
return ArtifactNotFoundError
|
||||
}
|
||||
return fmt.Errorf("failed to download artifact from %s, status: %s", artifactURL, resp.Status)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
// verify checksum matches origin
|
||||
if err := r.Verify(artifact, &buf, resp.Body); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// extract
|
||||
if _, err = untar.Untar(&buf, dir); err != nil {
|
||||
return fmt.Errorf("failed to untar artifact, error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify computes the checksum of the tarball and returns an error if the computed value
|
||||
// does not match the artifact advertised checksum.
|
||||
func (r *ArtifactFetcher) Verify(artifact *sourcev1.Artifact, buf *bytes.Buffer, reader io.Reader) error {
|
||||
hasher := sha256.New()
|
||||
|
||||
// for backwards compatibility with source-controller v0.17.2 and older
|
||||
if len(artifact.Checksum) == 40 {
|
||||
hasher = sha1.New()
|
||||
}
|
||||
|
||||
// compute checksum
|
||||
mw := io.MultiWriter(hasher, buf)
|
||||
if _, err := io.Copy(mw, reader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if checksum := fmt.Sprintf("%x", hasher.Sum(nil)); checksum != artifact.Checksum {
|
||||
return fmt.Errorf("failed to verify artifact: computed checksum '%s' doesn't match advertised '%s'",
|
||||
checksum, artifact.Checksum)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
Copyright 2022 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/testserver"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
. "github.com/onsi/gomega"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2"
|
||||
)
|
||||
|
||||
func TestKustomizationReconciler_ArtifactDownload(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
id := "fetch-" + randStringRunes(5)
|
||||
revision := "v1.0.0"
|
||||
|
||||
err := createNamespace(id)
|
||||
g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace")
|
||||
|
||||
err = createKubeConfigSecret(id)
|
||||
g.Expect(err).NotTo(HaveOccurred(), "failed to create kubeconfig secret")
|
||||
|
||||
manifests := func(name string, data string) []testserver.File {
|
||||
return []testserver.File{
|
||||
{
|
||||
Name: "secret.yaml",
|
||||
Body: fmt.Sprintf(`---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: %[1]s
|
||||
stringData:
|
||||
key: "%[2]s"
|
||||
`, name, data),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
artifact, err := testServer.ArtifactFromFiles(manifests(id, randStringRunes(5)))
|
||||
g.Expect(err).NotTo(HaveOccurred(), "failed to create artifact from files")
|
||||
|
||||
repositoryName := types.NamespacedName{
|
||||
Name: fmt.Sprintf("fetch-%s", randStringRunes(5)),
|
||||
Namespace: id,
|
||||
}
|
||||
|
||||
err = applyGitRepository(repositoryName, artifact, revision)
|
||||
g.Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
kustomizationKey := types.NamespacedName{
|
||||
Name: fmt.Sprintf("fetch-%s", randStringRunes(5)),
|
||||
Namespace: id,
|
||||
}
|
||||
kustomization := &kustomizev1.Kustomization{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: kustomizationKey.Name,
|
||||
Namespace: kustomizationKey.Namespace,
|
||||
},
|
||||
Spec: kustomizev1.KustomizationSpec{
|
||||
Interval: metav1.Duration{Duration: reconciliationInterval},
|
||||
Path: "./",
|
||||
KubeConfig: &kustomizev1.KubeConfig{
|
||||
SecretRef: meta.SecretKeyReference{
|
||||
Name: "kubeconfig",
|
||||
},
|
||||
},
|
||||
SourceRef: kustomizev1.CrossNamespaceSourceReference{
|
||||
Name: repositoryName.Name,
|
||||
Namespace: repositoryName.Namespace,
|
||||
Kind: sourcev1.GitRepositoryKind,
|
||||
},
|
||||
HealthChecks: []meta.NamespacedObjectKindReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "Secret",
|
||||
Name: id,
|
||||
Namespace: id,
|
||||
},
|
||||
},
|
||||
TargetNamespace: id,
|
||||
Force: false,
|
||||
},
|
||||
}
|
||||
|
||||
g.Expect(k8sClient.Create(context.Background(), kustomization)).To(Succeed())
|
||||
|
||||
resultK := &kustomizev1.Kustomization{}
|
||||
repo := &sourcev1.GitRepository{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: sourcev1.GitRepositoryKind,
|
||||
APIVersion: sourcev1.GroupVersion.String(),
|
||||
},
|
||||
}
|
||||
g.Expect(k8sClient.Get(context.Background(), repositoryName, repo)).Should(Succeed())
|
||||
repoURL := repo.Status.Artifact.URL
|
||||
|
||||
t.Run("downloads artifact", func(t *testing.T) {
|
||||
g.Eventually(func() bool {
|
||||
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
|
||||
return apimeta.IsStatusConditionTrue(resultK.Status.Conditions, meta.ReadyCondition) &&
|
||||
resultK.Status.LastAppliedRevision == revision
|
||||
}, timeout, time.Second).Should(BeTrue())
|
||||
})
|
||||
|
||||
t.Run("retries on not found errors", func(t *testing.T) {
|
||||
repo.Status.Artifact.URL = repoURL + "not-found"
|
||||
repo.ManagedFields = nil
|
||||
g.Expect(k8sClient.Status().Update(context.Background(), repo)).To(Succeed())
|
||||
|
||||
g.Eventually(func() bool {
|
||||
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
|
||||
ready := apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition)
|
||||
return ready.Reason == meta.ProgressingReason
|
||||
}, timeout, time.Second).Should(BeTrue())
|
||||
|
||||
g.Expect(apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition).Message).To(ContainSubstring("artifact not found"))
|
||||
})
|
||||
|
||||
t.Run("recovers after not found errors", func(t *testing.T) {
|
||||
g.Expect(k8sClient.Get(context.Background(), repositoryName, repo)).Should(Succeed())
|
||||
repo.Status.Artifact.URL = repoURL
|
||||
repo.ManagedFields = nil
|
||||
g.Expect(k8sClient.Status().Update(context.Background(), repo)).To(Succeed())
|
||||
|
||||
g.Eventually(func() bool {
|
||||
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
|
||||
return apimeta.IsStatusConditionTrue(resultK.Status.Conditions, meta.ReadyCondition)
|
||||
}, timeout, time.Second).Should(BeTrue())
|
||||
})
|
||||
}
|
||||
|
|
@ -167,7 +167,10 @@ func TestMain(m *testing.M) {
|
|||
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
|
||||
MetricsRecorder: testMetricsH.MetricsRecorder,
|
||||
}
|
||||
if err := (reconciler).SetupWithManager(testEnv, KustomizationReconcilerOptions{MaxConcurrentReconciles: 4}); err != nil {
|
||||
if err := (reconciler).SetupWithManager(testEnv, KustomizationReconcilerOptions{
|
||||
MaxConcurrentReconciles: 4,
|
||||
DependencyRequeueInterval: 2 * time.Second,
|
||||
}); err != nil {
|
||||
panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err))
|
||||
}
|
||||
}, func() error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue