Merge pull request #210 from lburgazzoli/gc

Rework release leftovers gc algorithm
This commit is contained in:
Luca Burgazzoli 2024-08-27 10:32:42 +02:00 committed by GitHub
commit d460da3772
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 305 additions and 69 deletions

View File

@ -21,7 +21,7 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

View File

@ -78,6 +78,7 @@ func NewReconciler(ctx context.Context, manager ctrlRt.Manager, o helm.Options)
rec.actions = append(rec.actions, NewApplyCRDsAction(rec.l))
rec.actions = append(rec.actions, NewApplyResourcesAction(rec.l))
rec.actions = append(rec.actions, NewConditionsAction(rec.l))
rec.actions = append(rec.actions, NewGCAction(rec.l))
err = rec.init(ctx)
if err != nil {

View File

@ -59,6 +59,7 @@ func (a *ApplyCRDsAction) Run(ctx context.Context, rc *ReconciliationRequest) er
helm.ReleaseGeneration: strconv.FormatInt(rc.Resource.Generation, 10),
helm.ReleaseName: rc.Resource.Name,
helm.ReleaseNamespace: rc.Resource.Namespace,
helm.ReleaseVersion: c.Version(),
})
apply := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration

View File

@ -6,8 +6,9 @@ import (
"sort"
"strconv"
"github.com/dapr/kubernetes-operator/pkg/controller"
"github.com/dapr/kubernetes-operator/pkg/controller/predicates"
"github.com/dapr/kubernetes-operator/pkg/controller"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@ -18,7 +19,6 @@ import (
daprApi "github.com/dapr/kubernetes-operator/api/operator/v1alpha1"
"github.com/dapr/kubernetes-operator/pkg/controller/client"
"github.com/dapr/kubernetes-operator/pkg/controller/gc"
"github.com/dapr/kubernetes-operator/pkg/helm"
"github.com/dapr/kubernetes-operator/pkg/pointer"
"github.com/dapr/kubernetes-operator/pkg/resources"
@ -28,14 +28,12 @@ func NewApplyResourcesAction(l logr.Logger) Action {
action := ApplyResourcesAction{
l: l.WithName("action").WithName("apply").WithName("resources"),
subscriptions: make(map[string]struct{}),
gc: gc.New(),
}
return &action
}
type ApplyResourcesAction struct {
gc *gc.GC
l logr.Logger
subscriptions map[string]struct{}
}
@ -63,16 +61,23 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
return istr < jstr
})
reinstall := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration
installedVersion := ""
if rc.Resource.Status.Chart != nil {
installedVersion = rc.Resource.Status.Chart.Version
}
reinstall := rc.Resource.Generation != rc.Resource.Status.ObservedGeneration || c.Version() != installedVersion
if reinstall {
rc.Reconciler.Event(
rc.Resource,
corev1.EventTypeNormal,
"RenderFullHelmTemplate",
fmt.Sprintf("Render full Helm template as Dapr spec changed (observedGeneration: %d, generation: %d)",
fmt.Sprintf("Render full Helm template (observedGeneration: %d, generation: %d, installedChartVersion: %s, chartVersion: %s)",
rc.Resource.Status.ObservedGeneration,
rc.Resource.Generation),
rc.Resource.Generation,
installedVersion,
c.Version()),
)
}
@ -94,6 +99,7 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
helm.ReleaseGeneration: strconv.FormatInt(rc.Resource.Generation, 10),
helm.ReleaseName: rc.Resource.Name,
helm.ReleaseNamespace: rc.Resource.Namespace,
helm.ReleaseVersion: c.Version(),
})
switch dc.(type) {
@ -115,9 +121,10 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
&obj,
rc.Reconciler.EnqueueRequestForOwner(&daprApi.DaprInstance{}, handler.OnlyControllerOwner()),
dependantWithLabels(
a.watchForUpdates(gvk),
true,
a.watchStatus(gvk)),
predicates.WithWatchUpdate(a.watchForUpdates(gvk)),
predicates.WithWatchDeleted(true),
predicates.WithWatchStatus(a.watchStatus(gvk)),
),
)
if err != nil {
@ -146,9 +153,10 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
&obj,
rc.Reconciler.EnqueueRequestsFromMapFunc(labelsToRequest),
dependantWithLabels(
a.watchForUpdates(gvk),
true,
a.watchStatus(gvk)),
predicates.WithWatchUpdate(a.watchForUpdates(gvk)),
predicates.WithWatchDeleted(true),
predicates.WithWatchStatus(a.watchStatus(gvk)),
),
)
if err != nil {
@ -207,31 +215,6 @@ func (a *ApplyResourcesAction) Run(ctx context.Context, rc *ReconciliationReques
"ref", resources.Ref(&obj))
}
//
// in case of a re-installation all the resources get re-rendered which means some of them
// may become obsolete (i.e. if some resources are moved from cluster to namespace scope)
// hence a sort of "garbage collector task" must be executed.
//
// The logic of the task it to delete all the resources that have a generation older than
// current CR one, which is propagated by the controller to all the rendered resources in
// the for of a label:
//
// - helm.operator.dapr.io/release.generation
//
if reinstall {
s, err := gcSelector(rc)
if err != nil {
return fmt.Errorf("cannot compute gc selector: %w", err)
}
deleted, err := a.gc.Run(ctx, rc.Resource.Namespace, rc.Client, s)
if err != nil {
return fmt.Errorf("cannot run gc: %w", err)
}
a.l.Info("gc", "deleted", deleted)
}
return nil
}

View File

@ -30,11 +30,13 @@ func (a *ConditionsAction) Configure(_ context.Context, _ *client.Client, b *bui
}
func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) error {
crs, err := CurrentReleaseSelector(rc)
crs, err := currentReleaseSelector(ctx, rc)
if err != nil {
return fmt.Errorf("cannot compute current release selector: %w", err)
}
// Deployments
deployments, err := rc.Client.AppsV1().Deployments(rc.Resource.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: crs.String(),
})
@ -43,32 +45,58 @@ func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) e
return fmt.Errorf("cannot list deployments: %w", err)
}
ready := 0
readyDeployments := 0
for i := range deployments.Items {
if conditions.ConditionStatus(deployments.Items[i], appsv1.DeploymentAvailable) == corev1.ConditionTrue {
ready++
readyDeployments++
}
}
// StatefulSets
statefulSets, err := rc.Client.AppsV1().StatefulSets(rc.Resource.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: crs.String(),
})
if err != nil {
return fmt.Errorf("cannot list stateful sets: %w", err)
}
readyReplicaSets := 0
for i := range statefulSets.Items {
if statefulSets.Items[i].Status.Replicas == 0 {
continue
}
if statefulSets.Items[i].Status.Replicas == statefulSets.Items[i].Status.ReadyReplicas {
readyReplicaSets++
}
}
var readyCondition metav1.Condition
if len(deployments.Items) > 0 {
if ready == len(deployments.Items) {
if len(deployments.Items)+len(statefulSets.Items) > 0 {
if readyDeployments+readyReplicaSets == len(deployments.Items)+len(statefulSets.Items) {
readyCondition = metav1.Condition{
Type: conditions.TypeReady,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: fmt.Sprintf("%d/%d deployments ready", ready, len(deployments.Items)),
ObservedGeneration: rc.Resource.Generation,
Message: fmt.Sprintf("%d/%d deployments ready, statefulSets ready %d/%d",
readyDeployments, len(deployments.Items),
readyReplicaSets, len(statefulSets.Items)),
}
} else {
readyCondition = metav1.Condition{
Type: conditions.TypeReady,
Status: metav1.ConditionFalse,
Reason: "InProgress",
Message: fmt.Sprintf("%d/%d deployments ready", ready, len(deployments.Items)),
ObservedGeneration: rc.Resource.Generation,
Message: fmt.Sprintf("%d/%d deployments ready, statefulSets ready %d/%d",
readyDeployments, len(deployments.Items),
readyReplicaSets, len(statefulSets.Items)),
}
}
} else {
@ -76,7 +104,7 @@ func (a *ConditionsAction) Run(ctx context.Context, rc *ReconciliationRequest) e
Type: conditions.TypeReady,
Status: metav1.ConditionFalse,
Reason: "InProgress",
Message: "no deployments",
Message: "no deployments/replicasets",
ObservedGeneration: rc.Resource.Generation,
}
}

View File

@ -0,0 +1,93 @@
package instance
import (
"context"
"fmt"
"strconv"
"github.com/dapr/kubernetes-operator/pkg/controller/gc"
"github.com/dapr/kubernetes-operator/pkg/helm"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/dapr/kubernetes-operator/pkg/controller/client"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/builder"
)
func NewGCAction(l logr.Logger) Action {
return &GCAction{
l: l.WithName("action").WithName("gc"),
gc: gc.New(),
}
}
// GCAction cleanup leftover release resources.
//
// If the HelmInstance spec changes, all the resources get re-rendered which means some of
// them may become obsolete (i.e. if some resources are moved from cluster to namespace
// scope) hence a sort of "garbage collector task" must be executed.
//
// The logic of the task it to delete all the resources that have a generation older than
// current CR one or rendered out of a release version different from the current one. The
// related values are propagated by the controller to all the rendered resources in as a
// set of labels (
//
// - helm.operator.dapr.io/release.generation
// - helm.operator.dapr.io/release.version
//
// The action MUST be executed as the latest action in the reconciliation loop.
type GCAction struct {
l logr.Logger
gc *gc.GC
}
func (a *GCAction) Configure(_ context.Context, _ *client.Client, b *builder.Builder) (*builder.Builder, error) {
return b, nil
}
func (a *GCAction) Run(ctx context.Context, rc *ReconciliationRequest) error {
c, err := rc.Chart(ctx)
if err != nil {
return fmt.Errorf("cannot load chart: %w", err)
}
s, err := gcSelector(ctx, rc)
if err != nil {
return fmt.Errorf("cannot compute gc selector: %w", err)
}
deleted, err := a.gc.Run(ctx, rc.Client, rc.Resource.Namespace, s, func(ctx context.Context, obj unstructured.Unstructured) (bool, error) {
if obj.GetLabels() == nil {
return false, nil
}
gen := obj.GetLabels()[helm.ReleaseGeneration]
ver := obj.GetLabels()[helm.ReleaseVersion]
if gen == "" || ver == "" {
return false, nil
}
if ver != c.Version() {
return true, nil
}
g, err := strconv.Atoi(gen)
if err != nil {
return false, fmt.Errorf("cannot determine release generation: %w", err)
}
return rc.Resource.Generation > int64(g), nil
})
if err != nil {
return fmt.Errorf("cannot run gc: %w", err)
}
a.l.Info("gc", "deleted", deleted)
return nil
}
func (a *GCAction) Cleanup(_ context.Context, _ *ReconciliationRequest) error {
return nil
}

View File

@ -17,7 +17,12 @@ import (
"github.com/dapr/kubernetes-operator/pkg/controller/predicates"
)
func gcSelector(rc *ReconciliationRequest) (labels.Selector, error) {
func gcSelector(ctx context.Context, rc *ReconciliationRequest) (labels.Selector, error) {
c, err := rc.Chart(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load chart: %w", err)
}
namespace, err := labels.NewRequirement(
helm.ReleaseNamespace,
selection.Equals,
@ -38,17 +43,27 @@ func gcSelector(rc *ReconciliationRequest) (labels.Selector, error) {
generation, err := labels.NewRequirement(
helm.ReleaseGeneration,
selection.LessThan,
[]string{strconv.FormatInt(rc.Resource.Generation, 10)})
selection.Exists,
[]string{})
if err != nil {
return nil, fmt.Errorf("cannot determine generation requirement: %w", err)
}
version, err := labels.NewRequirement(
helm.ReleaseVersion,
selection.Equals,
[]string{c.Version()})
if err != nil {
return nil, fmt.Errorf("cannot determine release version requirement: %w", err)
}
selector := labels.NewSelector().
Add(*namespace).
Add(*name).
Add(*generation)
Add(*generation).
Add(*version)
return selector, nil
}
@ -77,7 +92,13 @@ func labelsToRequest(_ context.Context, object ctrlCli.Object) []reconcile.Reque
}}
}
func dependantWithLabels(watchUpdate bool, watchDelete bool, watchStatus bool) predicate.Predicate {
func dependantWithLabels(opts ...predicates.DependentPredicateOption) predicate.Predicate {
dp := &predicates.DependentPredicate{}
for i := range opts {
dp = opts[i](dp)
}
return predicate.And(
&predicates.HasLabel{
Name: helm.ReleaseName,
@ -85,15 +106,16 @@ func dependantWithLabels(watchUpdate bool, watchDelete bool, watchStatus bool) p
&predicates.HasLabel{
Name: helm.ReleaseNamespace,
},
&predicates.DependentPredicate{
WatchUpdate: watchUpdate,
WatchDelete: watchDelete,
WatchStatus: watchStatus,
},
dp,
)
}
func CurrentReleaseSelector(rc *ReconciliationRequest) (labels.Selector, error) {
func currentReleaseSelector(ctx context.Context, rc *ReconciliationRequest) (labels.Selector, error) {
c, err := rc.Chart(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load chart: %w", err)
}
namespace, err := labels.NewRequirement(
helm.ReleaseNamespace,
selection.Equals,
@ -121,10 +143,20 @@ func CurrentReleaseSelector(rc *ReconciliationRequest) (labels.Selector, error)
return nil, fmt.Errorf("cannot determine generation requirement: %w", err)
}
version, err := labels.NewRequirement(
helm.ReleaseVersion,
selection.Equals,
[]string{c.Version()})
if err != nil {
return nil, fmt.Errorf("cannot determine release version requirement: %w", err)
}
selector := labels.NewSelector().
Add(*namespace).
Add(*name).
Add(*generation)
Add(*generation).
Add(*version)
return selector, nil
}

View File

@ -4,9 +4,12 @@ import (
"context"
"fmt"
"slices"
"strings"
"sync"
"time"
"golang.org/x/exp/maps"
"github.com/go-logr/logr"
"golang.org/x/time/rate"
authorization "k8s.io/api/authorization/v1"
@ -27,7 +30,7 @@ func New() *GC {
return &GC{
l: ctrl.Log.WithName("gc"),
limiter: rate.NewLimiter(rate.Every(time.Minute), 1),
collectableGVKs: make(map[schema.GroupVersionKind]struct{}),
collectableGVKs: make([]schema.GroupVersionKind, 0),
}
}
@ -35,25 +38,36 @@ type GC struct {
l logr.Logger
lock sync.Mutex
limiter *rate.Limiter
collectableGVKs map[schema.GroupVersionKind]struct{}
collectableGVKs []schema.GroupVersionKind
}
func (gc *GC) Run(ctx context.Context, ns string, c *client.Client, selector labels.Selector) (int, error) {
func (gc *GC) Run(
ctx context.Context,
c *client.Client,
ns string,
selector labels.Selector,
predicate func(context.Context, unstructured.Unstructured) (bool, error),
) (int, error) {
gc.lock.Lock()
defer gc.lock.Unlock()
err := gc.computeDeletableTypes(ctx, ns, c)
err := gc.computeDeletableTypes(ctx, c, ns)
if err != nil {
return 0, fmt.Errorf("cannot discover GVK types: %w", err)
}
return gc.deleteEachOf(ctx, c, selector)
return gc.deleteEachOf(ctx, c, selector, predicate)
}
func (gc *GC) deleteEachOf(ctx context.Context, c *client.Client, selector labels.Selector) (int, error) {
func (gc *GC) deleteEachOf(
ctx context.Context,
c *client.Client,
selector labels.Selector,
predicate func(context.Context, unstructured.Unstructured) (bool, error),
) (int, error) {
deleted := 0
for GVK := range gc.collectableGVKs {
for _, GVK := range gc.collectableGVKs {
items := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": GVK.GroupVersion().String(),
@ -85,9 +99,18 @@ func (gc *GC) deleteEachOf(ctx context.Context, c *client.Client, selector label
continue
}
canBeDeleted, err := predicate(ctx, resource)
if err != nil {
return 0, err
}
if !canBeDeleted {
continue
}
gc.l.Info("deleting", "ref", resources.Ref(&resource))
err := c.Delete(ctx, &resource, ctrlCli.PropagationPolicy(metav1.DeletePropagationForeground))
err = c.Delete(ctx, &resource, ctrlCli.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
@ -120,7 +143,7 @@ func (gc *GC) canBeDeleted(_ context.Context, gvk schema.GroupVersionKind) bool
return true
}
func (gc *GC) computeDeletableTypes(ctx context.Context, ns string, c *client.Client) error {
func (gc *GC) computeDeletableTypes(ctx context.Context, c *client.Client, ns string) error {
// Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
if !gc.limiter.Allow() {
// Return the cached set of garbage collectable GVKs.
@ -193,7 +216,10 @@ func (gc *GC) computeDeletableTypes(ctx context.Context, ns string, c *client.Cl
}
}
gc.collectableGVKs = GVKs
gc.collectableGVKs = maps.Keys(GVKs)
slices.SortFunc(gc.collectableGVKs, func(a, b schema.GroupVersionKind) int {
return strings.Compare(a.String(), b.String())
})
return nil
}

View File

@ -12,6 +12,29 @@ import (
var _ predicate.Predicate = DependentPredicate{}
type DependentPredicateOption func(*DependentPredicate) *DependentPredicate
func WithWatchDeleted(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchDelete = val
return in
}
}
func WithWatchUpdate(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchUpdate = val
return in
}
}
func WithWatchStatus(val bool) DependentPredicateOption {
return func(in *DependentPredicate) *DependentPredicate {
in.WatchStatus = val
return in
}
}
type DependentPredicate struct {
WatchDelete bool
WatchUpdate bool

View File

@ -9,6 +9,7 @@ const (
ReleaseGeneration = "helm.operator.dapr.io/release.generation"
ReleaseName = "helm.operator.dapr.io/release.name"
ReleaseNamespace = "helm.operator.dapr.io/release.namespace"
ReleaseVersion = "helm.operator.dapr.io/release.version"
ChartsDir = "helm-charts/dapr"
)

View File

@ -64,6 +64,16 @@ func Labels(target *unstructured.Unstructured, labels map[string]string) {
target.SetLabels(m)
}
func Label(target *unstructured.Unstructured, key string) string {
m := target.GetLabels()
if m == nil {
return ""
}
return m[key]
}
func Ref(obj client.Object) string {
name := obj.GetName()
if obj.GetNamespace() == "" {

View File

@ -54,6 +54,44 @@ func TestDaprInstanceDeployWithDefaults(t *testing.T) {
)
}
func TestDaprInstanceGC(t *testing.T) {
test := With(t)
{
instance := dapr.DeployInstance(
test,
daprAc.DaprInstanceSpec().
WithValues(nil),
)
test.Eventually(Deployment(test, "dapr-operator", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sentry", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sidecar-injector", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
}
{
instance := dapr.DeployInstance(
test,
daprAc.DaprInstanceSpec().
WithValues(dapr.Values(test, map[string]any{
"dapr_sidecar_injector": map[string]any{
"enabled": false,
},
})),
)
test.Eventually(Deployment(test, "dapr-operator", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sentry", instance.Namespace), TestTimeoutLong).Should(
WithTransform(ConditionStatus(appsv1.DeploymentAvailable), Equal(corev1.ConditionTrue)))
test.Eventually(Deployment(test, "dapr-sidecar-injector", instance.Namespace), TestTimeoutLong).Should(
BeNil())
}
}
func TestDaprInstanceDeployWithCustomChart(t *testing.T) {
test := With(t)