Enable experimental drift detection

This enables experimental detection of drift between the cluster state and
the current manifest data in the Helm storage's manifest blob.

Drift detection works based on the already proven approach of the
kustomize-controller's SSA package, and utilizes the managed field
configured by the controller since `v0.12.2`.

This feature is planned to graduate from its experimental status once the
further rewrite of the controller has been finished, and the state of the
Helm storage itself has become more fault tolerant.

Signed-off-by: Hidde Beydals <hidde@hhh.computer>
This commit is contained in:
Hidde Beydals 2023-02-27 10:44:27 +01:00
parent 16ce900b20
commit 1240f20183
No known key found for this signature in database
GPG Key ID: 979F380FC2341744
8 changed files with 381 additions and 10 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/client-go/rest"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -53,13 +54,15 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/acl"
fluxClient "github.com/fluxcd/pkg/runtime/client"
runtimeClient "github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/pkg/runtime/transform"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
v2 "github.com/fluxcd/helm-controller/api/v2beta1"
"github.com/fluxcd/helm-controller/internal/diff"
"github.com/fluxcd/helm-controller/internal/features"
"github.com/fluxcd/helm-controller/internal/kube"
"github.com/fluxcd/helm-controller/internal/runner"
"github.com/fluxcd/helm-controller/internal/util"
@ -83,8 +86,11 @@ type HelmReleaseReconciler struct {
MetricsRecorder *metrics.Recorder
DefaultServiceAccount string
NoCrossNamespaceRef bool
ClientOpts fluxClient.Options
KubeConfigOpts fluxClient.KubeConfigOptions
ClientOpts runtimeClient.Options
KubeConfigOpts runtimeClient.KubeConfigOptions
StatusPoller *polling.StatusPoller
PollingOpts polling.Options
ControllerName string
}
func (r *HelmReleaseReconciler) SetupWithManager(mgr ctrl.Manager, opts HelmReleaseReconcilerOptions) error {
@ -103,7 +109,7 @@ func (r *HelmReleaseReconciler) SetupWithManager(mgr ctrl.Manager, opts HelmRele
r.requeueDependency = opts.DependencyRequeueInterval
// Configure the retryable http client used for fetching artifacts.
// By default it retries 10 times within a 3.5 minutes window.
// By default, it retries 10 times within a 3.5 minutes window.
httpClient := retryablehttp.NewClient()
httpClient.RetryWaitMin = 5 * time.Second
httpClient.RetryWaitMax = 30 * time.Second
@ -319,6 +325,44 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context,
releaseRevision := util.ReleaseRevision(rel)
valuesChecksum := util.ValuesChecksum(values)
hr, hasNewState := v2.HelmReleaseAttempted(hr, revision, releaseRevision, valuesChecksum)
// Run diff against current cluster state.
if !hasNewState {
if ok, _ := features.Enabled(features.DetectDrift); ok {
differ := diff.NewDiffer(runtimeClient.NewImpersonator(
r.Client,
r.StatusPoller,
r.PollingOpts,
hr.Spec.KubeConfig,
r.KubeConfigOpts,
r.DefaultServiceAccount,
hr.Spec.ServiceAccountName,
hr.GetNamespace(),
), r.ControllerName)
changeSet, drift, err := differ.Diff(ctx, rel)
if err != nil {
if changeSet == nil {
msg := "failed to diff release against cluster resources"
r.event(ctx, hr, rel.Chart.Metadata.Version, eventv1.EventSeverityError, err.Error())
return v2.HelmReleaseNotReady(hr, "DiffFailed", fmt.Sprintf("%s: %s", msg, err.Error())), err
}
log.Error(err, "diff of release against cluster resources finished with error")
}
msg := "no diff in cluster resources compared to release"
if drift {
hasNewState = true
msg = "diff in cluster resources compared to release"
}
if changeSet != nil {
msg = fmt.Sprintf("%s:\n\n%s", msg, changeSet.String())
r.event(ctx, hr, rel.Chart.Metadata.Version, eventv1.EventSeverityInfo, msg)
}
log.Info(msg)
}
}
if hasNewState {
hr = v2.HelmReleaseProgressing(hr)
if updateStatusErr := r.patchStatus(ctx, &hr); updateStatusErr != nil {

5
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/fluxcd/pkg/apis/kustomize v0.8.0
github.com/fluxcd/pkg/apis/meta v0.19.0
github.com/fluxcd/pkg/runtime v0.29.0
github.com/fluxcd/pkg/ssa v0.23.0
github.com/fluxcd/source-controller/api v0.35.1
github.com/go-logr/logr v1.2.3
github.com/hashicorp/go-retryablehttp v0.7.2
@ -23,6 +24,7 @@ require (
k8s.io/cli-runtime v0.26.1
k8s.io/client-go v0.26.1
k8s.io/utils v0.0.0-20230209194617-a36077c30491
sigs.k8s.io/cli-utils v0.34.0
sigs.k8s.io/controller-runtime v0.14.4
sigs.k8s.io/kustomize/api v0.12.1
sigs.k8s.io/yaml v1.3.0
@ -50,7 +52,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/containerd/containerd v1.6.15 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/cli v20.10.21+incompatible // indirect
@ -161,7 +163,6 @@ require (
k8s.io/kube-openapi v0.0.0-20221110221610-a28e98eb7c70 // indirect
k8s.io/kubectl v0.26.0 // indirect
oras.land/oras-go v1.2.2 // indirect
sigs.k8s.io/cli-utils v0.34.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect

6
go.sum
View File

@ -104,8 +104,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/containerd v1.6.18 h1:qZbsLvmyu+Vlty0/Ex5xc0z2YtKpIsb5n45mAMI+2Ns=
github.com/containerd/containerd v1.6.18/go.mod h1:1RdCUu95+gc2v9t3IL+zIlpClSmew7/0YS8O5eQZrOw=
github.com/containerd/containerd v1.6.15 h1:4wWexxzLNHNE46aIETc6ge4TofO550v+BlLoANrbses=
github.com/containerd/containerd v1.6.15/go.mod h1:U2NnBPIhzJDm59xF7xB2MMHnKtggpZ+phKg8o2TKj2c=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
@ -168,6 +168,8 @@ github.com/fluxcd/pkg/apis/meta v0.19.0 h1:CX75e/eaRWZDTzNdMSWomY1InlssLKcS8GQDS
github.com/fluxcd/pkg/apis/meta v0.19.0/go.mod h1:7b6prDPsViyAzoY7eRfSPS0/MbXpGGsOMvRq2QrTKa4=
github.com/fluxcd/pkg/runtime v0.29.0 h1:/BDitj/y5shWqczECCiZFsEm9FH7do4VBgMHBiRiol0=
github.com/fluxcd/pkg/runtime v0.29.0/go.mod h1:NrBONYHO5Piuzm6Y7QTS3cJRlgkgsDPn2EKB6gJ4BQw=
github.com/fluxcd/pkg/ssa v0.23.0 h1:e51n2642tyl8iytYQ68geg8E/6tLJcYqXl83HFrJcr4=
github.com/fluxcd/pkg/ssa v0.23.0/go.mod h1:fbnulY5zeKBC6dXwNIgMc9DfPjEgjfhweL031/9ZFKQ=
github.com/fluxcd/source-controller/api v0.35.1 h1:IHlbN7giz5kY4z9oWZ9QLNKtHAaxHdk9RbIurUPS1aI=
github.com/fluxcd/source-controller/api v0.35.1/go.mod h1:TImPMy/MEwNpDu6qHsw9LlCznXaB8bSO8mnxBSFsX4Q=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=

112
internal/diff/differ.go Normal file
View File

@ -0,0 +1,112 @@
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"context"
"fmt"
"strings"
"helm.sh/helm/v3/pkg/release"
"k8s.io/apimachinery/pkg/util/errors"
"github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/ssa"
"github.com/fluxcd/helm-controller/internal/util"
)
// Differ compares the manifest of a Helm release against the resources
// reachable through an impersonated client.
type Differ struct {
// impersonator supplies the client and poller used to build the
// ssa.ResourceManager.
impersonator *client.Impersonator
// controllerName is used as the Field of the ssa.Owner of the resource
// manager.
controllerName string
}
// NewDiffer returns a Differ which obtains its client from the given
// client.Impersonator and uses controllerName as the owner field of the
// resource manager it constructs.
func NewDiffer(impersonator *client.Impersonator, controllerName string) *Differ {
	differ := new(Differ)
	differ.impersonator = impersonator
	differ.controllerName = controllerName
	return differ
}
// Manager builds an ssa.ResourceManager on top of the client and poller
// handed out by the Differ's client.Impersonator. The controller name of the
// Differ is configured as the owner field of the manager.
//
// An error is returned when the client can not be obtained.
func (d *Differ) Manager(ctx context.Context) (*ssa.ResourceManager, error) {
	kubeClient, statusPoller, err := d.impersonator.GetClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get client to configure resource manager: %w", err)
	}
	return ssa.NewResourceManager(kubeClient, statusPoller, ssa.Owner{Field: d.controllerName}), nil
}
// Diff reads the objects from the manifest of the given release and diffs
// each of them using an ssa.ResourceManager.
//
// It returns:
//   - a change set holding the entries with a "created" or "configured"
//     action, or nil when there are no such entries;
//   - a boolean which is true when the change set contains at least one
//     entry;
//   - an aggregate of the errors collected while processing the individual
//     objects. Errors for a single object do not abort the run, which means
//     a non-nil change set and a non-nil error can be returned together.
//
// A nil release, an unreadable manifest, or a failure to construct the
// resource manager results in a nil change set and an error.
func (d *Differ) Diff(ctx context.Context, rel *release.Release) (*ssa.ChangeSet, bool, error) {
	// Guard against a nil release, dereferencing it below would panic.
	if rel == nil {
		return nil, false, fmt.Errorf("failed to read objects from release manifest: release is nil")
	}

	objects, err := ssa.ReadObjects(strings.NewReader(rel.Manifest))
	if err != nil {
		return nil, false, fmt.Errorf("failed to read objects from release manifest: %w", err)
	}

	if err := ssa.SetNativeKindsDefaults(objects); err != nil {
		return nil, false, fmt.Errorf("failed to set native kind defaults on release objects: %w", err)
	}

	resourceManager, err := d.Manager(ctx)
	if err != nil {
		return nil, false, err
	}

	var (
		changeSet       = ssa.NewChangeSet()
		isNamespacedGVK = map[string]bool{}
		errs            []error
		// The client does not change between objects, look it up once.
		kubeClient = resourceManager.Client()
	)

	for _, obj := range objects {
		if obj.GetNamespace() == "" {
			// Manifest does not contain the namespace of the release.
			// Figure out if the object is namespaced if the namespace is not
			// explicitly set, and configure the namespace accordingly.
			objGVK := obj.GetObjectKind().GroupVersionKind().String()
			namespaced, known := isNamespacedGVK[objGVK]
			if !known {
				var scopeErr error
				namespaced, scopeErr = util.IsAPINamespaced(obj, kubeClient.Scheme(), kubeClient.RESTMapper())
				if scopeErr != nil {
					errs = append(errs, fmt.Errorf("failed to determine if %s is namespace scoped: %w",
						obj.GetObjectKind().GroupVersionKind().Kind, scopeErr))
					continue
				}
				// Cache the result, so we don't have to do this for every object
				isNamespacedGVK[objGVK] = namespaced
			}
			if namespaced {
				obj.SetNamespace(rel.Namespace)
			}
		}

		entry, _, _, err := resourceManager.Diff(ctx, obj, ssa.DiffOptions{})
		if err != nil {
			// Collect the error and keep going, an entry may still have
			// been returned for this object.
			errs = append(errs, err)
		}

		// Only entries describing an object which would be created or
		// configured are recorded in the change set.
		if entry != nil && (entry.Action == string(ssa.CreatedAction) || entry.Action == string(ssa.ConfiguredAction)) {
			changeSet.Add(*entry)
		}
	}

	err = errors.Reduce(errors.Flatten(errors.NewAggregate(errs)))
	if len(changeSet.Entries) == 0 {
		return nil, false, err
	}
	return changeSet, true, err
}

View File

@ -0,0 +1,137 @@
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"context"
"testing"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
runtimeClient "github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/ssa"
"helm.sh/helm/v3/pkg/release"
)
// TestDiffer_Diff runs Differ.Diff against a table of releases, each paired
// with a fake controller-runtime client, and verifies the returned change
// set, drift indicator and error.
func TestDiffer_Diff(t *testing.T) {
scheme, mapper := testSchemeWithMapper()
// We do not test all the possible scenarios here, as the ssa package is
// already tested in depth. We only test the integration with the ssa package.
tests := []struct {
name string
client client.Client
rel *release.Release
want *ssa.ChangeSet
wantDrift bool
wantErr string
}{
// The manifest of this release is expected to be rejected while the
// objects are read from it.
{
name: "manifest read error",
client: fake.NewClientBuilder().Build(),
rel: &release.Release{
Manifest: "invalid",
},
wantErr: "failed to read objects from release manifest",
},
// The client is built without the scheme and RESTMapper of
// testSchemeWithMapper, and the Secret has no namespace set: the scope
// lookup is expected to fail.
{
name: "error on failure to determine namespace scope",
client: fake.NewClientBuilder().Build(),
rel: &release.Release{
Namespace: "release",
Manifest: `apiVersion: v1
kind: Secret
metadata:
name: test
stringData:
foo: bar
`,
},
wantErr: "failed to determine if Secret is namespace scoped",
},
// The client knows about the Secret kind but holds no objects. The Secret
// is expected to be reported as created, in the namespace of the release.
{
name: "detects changes",
client: fake.NewClientBuilder().
WithScheme(scheme).
WithRESTMapper(mapper).
Build(),
rel: &release.Release{
Namespace: "release",
Manifest: `---
apiVersion: v1
kind: Secret
metadata:
name: test
stringData:
foo: bar
`,
},
want: &ssa.ChangeSet{
Entries: []ssa.ChangeSetEntry{
{
ObjMetadata: object.ObjMetadata{
Namespace: "release",
Name: "test",
GroupKind: schema.GroupKind{
Kind: "Secret",
},
},
GroupVersion: "v1",
Subject: "Secret/release/test",
Action: string(ssa.CreatedAction),
},
},
},
wantDrift: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
// The impersonator is given only the client of the test case; the
// remaining arguments are nil or zero values.
d := NewDiffer(runtimeClient.NewImpersonator(tt.client, nil, polling.Options{}, nil, runtimeClient.KubeConfigOptions{}, "", "", ""), "test-controller")
got, drift, err := d.Diff(context.TODO(), tt.rel)
if tt.wantErr != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
} else {
g.Expect(err).NotTo(HaveOccurred())
}
// The change set and drift indicator are asserted for failing cases as
// well, in which they are expected to be nil and false.
g.Expect(got).To(Equal(tt.want))
g.Expect(drift).To(Equal(tt.wantDrift))
})
}
}
// testSchemeWithMapper returns a runtime.Scheme with the core/v1 types
// registered, together with a REST mapper for the core/v1 group version in
// which the Secret kind is registered as namespace scoped.
func testSchemeWithMapper() (*runtime.Scheme, meta.RESTMapper) {
	testScheme := runtime.NewScheme()
	_ = corev1.AddToScheme(testScheme)

	coreGV := corev1.SchemeGroupVersion
	testMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{coreGV})
	testMapper.Add(coreGV.WithKind("Secret"), meta.RESTScopeNamespace)

	return testScheme, testMapper
}

View File

@ -24,15 +24,23 @@ const (
// CacheSecretsAndConfigMaps configures the caching of Secrets and ConfigMaps
// by the controller-runtime client.
//
// When enabled, it will cache both object types, resulting in increased memory usage
// and cluster-wide RBAC permissions (list and watch).
// When enabled, it will cache both object types, resulting in increased memory
// usage and cluster-wide RBAC permissions (list and watch).
CacheSecretsAndConfigMaps = "CacheSecretsAndConfigMaps"
// DetectDrift configures the detection of cluster state drift compared to
// the desired state as described in the manifest of the Helm release
// storage object.
DetectDrift = "DetectDrift"
)
// features maps the name of each feature gate declared above to a boolean,
// which is false for both gates (presumably their default state, see the
// "opt-in" notes).
var features = map[string]bool{
// CacheSecretsAndConfigMaps
// opt-in from v0.28
CacheSecretsAndConfigMaps: false,
// DetectDrift
// opt-in from v0.31
DetectDrift: false,
}
// FeatureGates contains a list of all supported feature gates and

62
internal/util/object.go Normal file
View File

@ -0,0 +1,62 @@
/*
Copyright 2023 The Flux authors
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// TODO: Remove this when
// https://github.com/kubernetes-sigs/controller-runtime/blob/c783d2527a7da76332a2d8d563a6ca0b80c12122/pkg/client/apiutil/apimachinery.go#L76-L104
// is included in a semver release.
package util
import (
"errors"
"fmt"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// IsAPINamespaced reports whether the given object is namespace scoped.
// The GVK of the object is resolved using the provided scheme, after which
// the scope is looked up through the REST mapper.
func IsAPINamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
	objGVK, gvkErr := apiutil.GVKForObject(obj, scheme)
	if gvkErr == nil {
		return IsAPINamespacedWithGVK(objGVK, restmapper)
	}
	return false, gvkErr
}
// IsAPINamespacedWithGVK reports whether objects of the provided GVK are
// namespace scoped. Only the group and kind are used to look up the REST
// mapping.
//
// An error is returned when the mapping can not be found, or when the scope
// of the mapping has an empty name.
func IsAPINamespacedWithGVK(gk schema.GroupVersionKind, restmapper apimeta.RESTMapper) (bool, error) {
	mapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gk.Group, Kind: gk.Kind})
	if err != nil {
		return false, fmt.Errorf("failed to get restmapping: %w", err)
	}

	switch scopeName := mapping.Scope.Name(); scopeName {
	case "":
		return false, errors.New("scope cannot be identified, empty scope returned")
	case apimeta.RESTScopeNameRoot:
		// Root scoped means cluster scoped, i.e. not namespaced.
		return false, nil
	default:
		return true, nil
	}
}

View File

@ -27,6 +27,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
@ -165,6 +166,7 @@ func main() {
os.Exit(1)
}
pollingOpts := polling.Options{}
if err = (&controllers.HelmReleaseReconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
@ -174,6 +176,9 @@ func main() {
NoCrossNamespaceRef: aclOptions.NoCrossNamespaceRefs,
ClientOpts: clientOptions,
KubeConfigOpts: kubeConfigOpts,
PollingOpts: pollingOpts,
StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), pollingOpts),
ControllerName: controllerName,
}).SetupWithManager(mgr, controllers.HelmReleaseReconcilerOptions{
MaxConcurrentReconciles: concurrent,
DependencyRequeueInterval: requeueDependency,