Rework release leftovers gc algorithm

If the HelmInstance spec changes, all the resources get re-rendered which means some of
them may become obsolete (i.e. if some resources are moved from cluster to namespace
scope) hence a sort of "garbage collector task" must be executed.

As today, the GC logic would take into account only the resource generation, however,
when the operator is updated, the generation won't change hence a new algorithm should
be implemented to take into account also the release version.

Signed-off-by: Luca Burgazzoli <lburgazzoli@gmail.com>
This commit is contained in:
Luca Burgazzoli 2024-08-19 12:18:54 +02:00
parent b6a28f1f86
commit 0682236568
12 changed files with 305 additions and 69 deletions

View File

@ -21,7 +21,7 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

View File

@ -78,6 +78,7 @@ func NewReconciler(ctx context.Context, manager ctrlRt.Manager, o helm.Options)
rec.actions = append(rec.actions, NewApplyCRDsAction(rec.l))
rec.actions = append(rec.actions, NewApplyResourcesAction(rec.l))
rec.actions = append(rec.actions, NewConditionsAction(rec.l))
rec.actions = append(rec.actions, NewGCAction(rec.l))
err = rec.init(ctx)
if err != nil {

View File

@ -59,6 +59,7 @@ func (a *ApplyCRDsAction) Run(ctx context.Context, rc *ReconciliationRequest) er
helm.ReleaseGeneration: strconv.FormatInt(rc.Resource.Generation, 10),
helm.ReleaseName: rc.Resource.Name,
helm.ReleaseNamespace: rc.Resource.Namespace,
helm.ReleaseVersion: c.Version(),
})
apply := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration

View File

@ -6,8 +6,9 @@ import (
"sort"
"strconv"
"github.com/dapr/kubernetes-operator/pkg/controller"
"github.com/dapr/kubernetes-operator/pkg/controller/predicates"
"github.com/dapr/kubernetes-operator/pkg/controller"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@ -18,7 +19,6 @@ import (
daprApi "github.com/dapr/kubernetes-operator/api/operator/v1alpha1"
"github.com/dapr/kubernetes-operator/pkg/controller/client"
"github.com/dapr/kubernetes-operator/pkg/controller/gc"
"github.com/dapr/kubernetes-operator/pkg/helm"
"github.com/dapr/kubernetes-operator/pkg/pointer"
"github.com/dapr/kubernetes-operator/pkg/resources"
@ -28,14 +28,12 @@ func NewApplyResourcesAction(l logr.Logger) Action {
action := ApplyResourcesAction{
l: l.WithName("action").WithName("apply").WithName("resources"),
subscriptions: make(map[string]struct{}),
gc: gc.New(),
}
return &action
}
type ApplyResourcesAction struct {
gc *gc.GC
l logr.Logger
subscriptions map[string]struct{}
}
@ -63,16 +61,23 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
return istr < jstr
})
reinstall := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration
installedVersion := ""
if rc.Resource.Status.Chart != nil {
installedVersion = rc.Resource.Status.Chart.Version
}
reinstall := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration || c.Version() != installedVersion
if reinstall {
rc.Reconciler.Event(
rc.Resource,
corev1.EventTypeNormal,
"RenderFullHelmTemplate",
fmt.Sprintf("Render full Helm template as Dapr spec changed (observedGeneration: %d, generation: %d)",
fmt.Sprintf("Render full Helm template (observedGeneration: %d, generation: %d, installedChartVersion: %s, chartVersion: %s)",
rc.Resource.Status.ObservedGeneration,
rc.Resource.Generation),
rc.Resource.Generation,
installedVersion,
c.Version()),
)
}
@ -94,6 +99,7 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
helm.ReleaseGeneration: strconv.FormatInt(rc.Resource.Generation, 10),
helm.ReleaseName: rc.Resource.Name,
helm.ReleaseNamespace: rc.Resource.Namespace,
helm.ReleaseVersion: c.Version(),
})
switch dc.(type) {
@ -115,9 +121,10 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
&obj,
rc.Reconciler.EnqueueRequestForOwner(&daprApi.DaprInstance{}, handler.OnlyControllerOwner()),
dependantWithLabels(
a.watchForUpdates(gvk),
true,
a.watchStatus(gvk)),
predicates.WithWatchUpdate(a.watchForUpdates(gvk)),
predicates.WithWatchDeleted(true),
predicates.WithWatchStatus(a.watchStatus(gvk)),
),
)
if err != nil {
@ -146,9 +153,10 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
&obj,
rc.Reconciler.EnqueueRequestsFromMapFunc(labelsToRequest),
dependantWithLabels(
a.watchForUpdates(gvk),
true,
a.watchStatus(gvk)),
predicates.WithWatchUpdate(a.watchForUpdates(gvk)),
predicates.WithWatchDeleted(true),
predicates.WithWatchStatus(a.watchStatus(gvk)),
),
)
if err != nil {
@ -207,31 +215,6 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
"ref", resources.Ref(&obj))
}
//
// in case of a re-installation all the resources get re-rendered which means some of them
// may become obsolete (i.e. if some resources are moved from cluster to namespace scope)
// hence a sort of "garbage collector task" must be executed.
//
// The logic of the task it to delete all the resources that have a generation older than
// current CR one, which is propagated by the controller to all the rendered resources in
// the for of a label:
//
// - helm.operator.dapr.io/release.generation
//
if reinstall {
s, err := gcSelector(rc)
if err != nil {
return fmt.Errorf("cannot compute gc selector: %w", err)
}
deleted, err := a.gc.Run(ctx, rc.Resource.Namespace, rc.Client, s)
if err != nil {
return fmt.Errorf("cannot run gc: %w", err)
}
a.l.Info("gc", "deleted", deleted)
}
return nil
}

View File

@ -30,11 +30,13 @@ func (a *ConditionsAction) Configure(_ context.Context, _ *client.Client, b *bui
}
func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) error {
crs, err := CurrentReleaseSelector(rc)
crs, err := currentReleaseSelector(ctx, rc)
if err != nil {
return fmt.Errorf("cannot compute current release selector: %w", err)
}
// Deployments
deployments, err := rc.Client.AppsV1().Deployments(rc.Resource.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: crs.String(),
})
@ -43,32 +45,58 @@ func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) e
return fmt.Errorf("cannot list deployments: %w", err)
}
ready := 0
readyDeployments := 0
for i := range deployments.Items {
if conditions.ConditionStatus(deployments.Items[i], appsv1.DeploymentAvailable) == corev1.ConditionTrue {
ready++
readyDeployments++
}
}
// StatefulSets
statefulSets, err := rc.Client.AppsV1().StatefulSets(rc.Resource.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: crs.String(),
})
if err != nil {
return fmt.Errorf("cannot list stateful sets: %w", err)
}
readyReplicaSets := 0
for i := range statefulSets.Items {
if statefulSets.Items[i].Status.Replicas == 0 {
continue
}
if statefulSets.Items[i].Status.Replicas == statefulSets.Items[i].Status.ReadyReplicas {
readyReplicaSets++
}
}
var readyCondition metav1.Condition
if len(deployments.Items) > 0 {
if ready == len(deployments.Items) {
if len(deployments.Items)+len(statefulSets.Items) > 0 {
if readyDeployments+readyReplicaSets == len(deployments.Items)+len(statefulSets.Items) {
readyCondition = metav1.Condition{
Type: conditions.TypeReady,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: fmt.Sprintf("%d/%d deployments ready", ready, len(deployments.Items)),
ObservedGeneration: rc.Resource.Generation,
Message: fmt.Sprintf("%d/%d deployments ready, statefulSets ready %d/%d",
readyDeployments, len(deployments.Items),
readyReplicaSets, len(statefulSets.Items)),
}
} else {
readyCondition = metav1.Condition{
Type: conditions.TypeReady,
Status: metav1.ConditionFalse,
Reason: "InProgress",
Message: fmt.Sprintf("%d/%d deployments ready", ready, len(deployments.Items)),
ObservedGeneration: rc.Resource.Generation,
Message: fmt.Sprintf("%d/%d deployments ready, statefulSets ready %d/%d",
readyDeployments, len(deployments.Items),
readyReplicaSets, len(statefulSets.Items)),
}
}
} else {
@ -76,7 +104,7 @@ func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) e
Type: conditions.TypeReady,
Status: metav1.ConditionFalse,
Reason: "InProgress",
Message: "no deployments",
Message: "no deployments/replicasets",
ObservedGeneration: rc.Resource.Generation,
}
}

View File

@ -0,0 +1,93 @@
package instance
import (
"context"
"fmt"
"strconv"
"github.com/dapr/kubernetes-operator/pkg/controller/gc"
"github.com/dapr/kubernetes-operator/pkg/helm"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/dapr/kubernetes-operator/pkg/controller/client"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/builder"
)
func NewGCAction(l logr.Logger) Action {
return &GCAction{
l: l.WithName("action").WithName("gc"),
gc: gc.New(),
}
}
// GCAction cleanup leftover release resources.
//
// If the HelmInstance spec changes, all the resources get re-rendered which means some of
// them may become obsolete (i.e. if some resources are moved from cluster to namespace
// scope) hence a sort of "garbage collector task" must be executed.
//
// The logic of the task it to delete all the resources that have a generation older than
// current CR one or rendered out of a release version different from the current one. The
// related values are propagated by the controller to all the rendered resources in as a
// set of labels (
//
// - helm.operator.dapr.io/release.generation
// - helm.operator.dapr.io/release.version
//
// The action MUST be executed as the latest action in the reconciliation loop.
type GCAction struct {
l logr.Logger
gc *gc.GC
}
func (a *GCAction) Configure(_ context.Context, _ *client.Client, b *builder.Builder) (*builder.Builder, error) {
return b, nil
}
func (a *GCAction) Run(ctx context.Context, rc *ReconciliationRequest) error {
c, err := rc.Chart(ctx)
if err != nil {
return fmt.Errorf("cannot load chart: %w", err)
}
s, err := gcSelector(ctx, rc)
if err != nil {
return fmt.Errorf("cannot compute gc selector: %w", err)
}
deleted, err := a.gc.Run(ctx, rc.Client, rc.Resource.Namespace, s, func(ctx context.Context, obj unstructured.Unstructured) (bool, error) {
if obj.GetLabels() == nil {
return false, nil
}
gen := obj.GetLabels()[helm.ReleaseGeneration]
ver := obj.GetLabels()[helm.ReleaseVersion]
if gen == "" || ver == "" {
return false, nil
}
if ver != c.Version() {
return true, nil
}
g, err := strconv.Atoi(gen)
if err != nil {
return false, fmt.Errorf("cannot determine release generation: %w", err)
}
return rc.Resource.Generation > int64(g), nil
})
if err != nil {
return fmt.Errorf("cannot run gc: %w", err)
}
a.l.Info("gc", "deleted", deleted)
return nil
}
func (a *GCAction) Cleanup(_ context.Context, _ *ReconciliationRequest) error {
return nil
}

View File

@ -17,7 +17,12 @@ import (
"github.com/dapr/kubernetes-operator/pkg/controller/predicates"
)
func gcSelector(rc *ReconciliationRequest) (labels.Selector, error) {
func gcSelector(ctx context.Context, rc *ReconciliationRequest) (labels.Selector, error) {
c, err := rc.Chart(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load chart: %w", err)
}
namespace, err := labels.NewRequirement(
helm.ReleaseNamespace,
selection.Equals,
@ -38,17 +43,27 @@ func gcSelector(rc *ReconciliationRequest) (labels.Selector, error) {
generation, err := labels.NewRequirement(
helm.ReleaseGeneration,
selection.LessThan,
[]string{strconv.FormatInt(rc.Resource.Generation, 10)})
selection.Exists,
[]string{})
if err != nil {
return nil, fmt.Errorf("cannot determine generation requirement: %w", err)
}
version, err := labels.NewRequirement(
helm.ReleaseVersion,
selection.Equals,
[]string{c.Version()})
if err != nil {
return nil, fmt.Errorf("cannot determine release version requirement: %w", err)
}
selector := labels.NewSelector().
Add(*namespace).
Add(*name).
Add(*generation)
Add(*generation).
Add(*version)
return selector, nil
}
@ -77,7 +92,13 @@ func labelsToRequest(_ context.Context, object ctrlCli.Object) []reconcile.Reque
}}
}
func dependantWithLabels(watchUpdate bool, watchDelete bool, watchStatus bool) predicate.Predicate {
func dependantWithLabels(opts ...predicates.DependentPredicateOption) predicate.Predicate {
dp := &predicates.DependentPredicate{}
for i := range opts {
dp = opts[i](dp)
}
return predicate.And(
&predicates.HasLabel{
Name: helm.ReleaseName,
@ -85,15 +106,16 @@ func dependantWithLabels(watchUpdate bool, watchDelete bool, watchStatus bool) p
&predicates.HasLabel{
Name: helm.ReleaseNamespace,
},
&predicates.DependentPredicate{
WatchUpdate: watchUpdate,
WatchDelete: watchDelete,
WatchStatus: watchStatus,
},
dp,
)
}
func CurrentReleaseSelector(rc *ReconciliationRequest) (labels.Selector, error) {
func currentReleaseSelector(ctx context.Context, rc *ReconciliationRequest) (labels.Selector, error) {
c, err := rc.Chart(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load chart: %w", err)
}
namespace, err := labels.NewRequirement(
helm.ReleaseNamespace,
selection.Equals,
@ -121,10 +143,20 @@ func CurrentReleaseSelector(rc *ReconciliationRequest) (labels.Selector, error)
return nil, fmt.Errorf("cannot determine generation requirement: %w", err)
}
version, err := labels.NewRequirement(
helm.ReleaseVersion,
selection.Equals,
[]string{c.Version()})
if err != nil {
return nil, fmt.Errorf("cannot determine release version requirement: %w", err)
}
selector := labels.NewSelector().
Add(*namespace).
Add(*name).
Add(*generation)
Add(*generation).
Add(*version)
return selector, nil
}

View File

@ -4,9 +4,12 @@ import (
"context"
"fmt"
"slices"
"strings"
"sync"
"time"
"golang.org/x/exp/maps"
"github.com/go-logr/logr"
"golang.org/x/time/rate"
authorization "k8s.io/api/authorization/v1"
@ -27,7 +30,7 @@ func New() *GC {
return &GC{
l: ctrl.Log.WithName("gc"),
limiter: rate.NewLimiter(rate.Every(time.Minute), 1),
collectableGVKs: make(map[schema.GroupVersionKind]struct{}),
collectableGVKs: make([]schema.GroupVersionKind, 0),
}
}
@ -35,25 +38,36 @@ type GC struct {
l logr.Logger
lock sync.Mutex
limiter *rate.Limiter
collectableGVKs map[schema.GroupVersionKind]struct{}
collectableGVKs []schema.GroupVersionKind
}
func (gc *GC) Run(ctx context.Context, ns string, c *client.Client, selector labels.Selector) (int, error) {
func (gc *GC) Run(
ctx context.Context,
c *client.Client,
ns string,
selector labels.Selector,
predicate func(context.Context, unstructured.Unstructured) (bool, error),
) (int, error) {
gc.lock.Lock()
defer gc.lock.Unlock()
err := gc.computeDeletableTypes(ctx, ns, c)
err := gc.computeDeletableTypes(ctx, c, ns)
if err != nil {
return 0, fmt.Errorf("cannot discover GVK types: %w", err)
}
return gc.deleteEachOf(ctx, c, selector)
return gc.deleteEachOf(ctx, c, selector, predicate)
}
func (gc *GC) deleteEachOf(ctx context.Context, c *client.Client, selector labels.Selector) (int, error) {
func (gc *GC) deleteEachOf(
ctx context.Context,
c *client.Client,
selector labels.Selector,
predicate func(context.Context, unstructured.Unstructured) (bool, error),
) (int, error) {
deleted := 0
for GVK := range gc.collectableGVKs {
for _, GVK := range gc.collectableGVKs {
items := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": GVK.GroupVersion().String(),
@ -85,9 +99,18 @@ func (gc *GC) deleteEachOf(ctx context.Context, c *client.Client, selector label
continue
}
canBeDeleted, err := predicate(ctx, resource)
if err != nil {
return 0, err
}
if !canBeDeleted {
continue
}
gc.l.Info("deleting", "ref", resources.Ref(&resource))
err := c.Delete(ctx, &resource, ctrlCli.PropagationPolicy(metav1.DeletePropagationForeground))
err = c.Delete(ctx, &resource, ctrlCli.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
@ -120,7 +143,7 @@ func (gc *GC) canBeDeleted(_ context.Context, gvk schema.GroupVersionKind) bool
return true
}
func (gc *GC) computeDeletableTypes(ctx context.Context, ns string, c *client.Client) error {
func (gc *GC) computeDeletableTypes(ctx context.Context, c *client.Client, ns string) error {
// Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
if !gc.limiter.Allow() {
// Return the cached set of garbage collectable GVKs.
@ -193,7 +216,10 @@ func (gc *GC) computeDeletableTypes(ctx context.Context, ns string, c *client.Cl
}
}
gc.collectableGVKs = GVKs
gc.collectableGVKs = maps.Keys(GVKs)
slices.SortFunc(gc.collectableGVKs, func(a, b schema.GroupVersionKind) int {
return strings.Compare(a.String(), b.String())
})
return nil
}

View File

@ -12,6 +12,29 @@ import (
var _ predicate.Predicate = DependentPredicate{}
type DependentPredicateOption func(*DependentPredicate) *DependentPredicate
func WithWatchDeleted(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchDelete = val
return in
}
}
func WithWatchUpdate(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchUpdate = val
return in
}
}
func WithWatchStatus(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchStatus = val
return in
}
}
type DependentPredicate struct {
WatchDelete bool
WatchUpdate bool

View File

@ -9,6 +9,7 @@ const (
ReleaseGeneration = "helm.operator.dapr.io/release.generation"
ReleaseName = "helm.operator.dapr.io/release.name"
ReleaseNamespace = "helm.operator.dapr.io/release.namespace"
ReleaseVersion = "helm.operator.dapr.io/release.version"
ChartsDir = "helm-charts/dapr"
)

View File

@ -64,6 +64,16 @@ func Labels(target *unstructured.Unstructured, labels map[string]string) {
target.SetLabels(m)
}
func Label(target *unstructured.Unstructured, key string) string {
m := target.GetLabels()
if m == nil {
return ""
}
return m[key]
}
func Ref(obj client.Object) string {
name := obj.GetName()
if obj.GetNamespace() == "" {

View File

@ -54,6 +54,44 @@ func TestDaprInstanceDeployWithDefaults(t *testing.T) {
)
}
func TestDaprInstanceGC(t *testing.T) {
test := With(t)
{
instance := dapr.DeployInstance(
test,
daprAc.DaprInstanceSpec().
WithValues(nil),
)
test.Eventually(Deployment(test, "dapr-operator", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sentry", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sidecar-injector", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
}
{
instance := dapr.DeployInstance(
test,
daprAc.DaprInstanceSpec().
WithValues(dapr.Values(test, map[string]any{
"dapr_sidecar_injector": map[string]any{
"enabled": false,
},
})),
)
test.Eventually(Deployment(test, "dapr-operator", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sentry", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sidecar-injector", instance.Namespace), TestTimeoutLong).Should(
BeNil())
}
}
func TestDaprInstanceDeployWithCustomChart(t *testing.T) {
test := With(t)