Refactor garbage collector

This commit is contained in:
stefanprodan 2020-09-10 14:26:14 +03:00
parent 76cd349b6b
commit 087be46136
3 changed files with 109 additions and 87 deletions

View File

@ -19,9 +19,11 @@ package v1alpha1
import (
"bytes"
"io"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
)
@ -84,7 +86,7 @@ func (s *Snapshot) addEntry(item *unstructured.Unstructured) {
found := false
for _, tracker := range s.Entries {
if tracker.Namespace == item.GetNamespace() {
tracker.Kinds[item.GetKind()] = item.GetAPIVersion()
tracker.Kinds[item.GroupVersionKind().String()] = item.GetKind()
found = true
break
}
@ -93,31 +95,42 @@ func (s *Snapshot) addEntry(item *unstructured.Unstructured) {
s.Entries = append(s.Entries, SnapshotEntry{
Namespace: item.GetNamespace(),
Kinds: map[string]string{
item.GetKind(): item.GetAPIVersion(),
item.GroupVersionKind().String(): item.GetKind(),
},
})
}
}
func (s *Snapshot) NonNamespacedKinds() []string {
kinds := make([]string, 0)
func (s *Snapshot) NonNamespacedKinds() []schema.GroupVersionKind {
kinds := make([]schema.GroupVersionKind, 0)
for _, tracker := range s.Entries {
if tracker.Namespace == "" {
for k, _ := range tracker.Kinds {
kinds = append(kinds, k)
for gvk, kind := range tracker.Kinds {
if strings.Contains(gvk, ",") {
gv, err := schema.ParseGroupVersion(strings.Split(gvk, ",")[0])
if err == nil {
kinds = append(kinds, gv.WithKind(kind))
}
}
}
}
}
return kinds
}
func (s *Snapshot) NamespacedKinds() map[string][]string {
nsk := make(map[string][]string)
func (s *Snapshot) NamespacedKinds() map[string][]schema.GroupVersionKind {
nsk := make(map[string][]schema.GroupVersionKind)
for _, tracker := range s.Entries {
if tracker.Namespace != "" {
var kinds []string
for k, _ := range tracker.Kinds {
kinds = append(kinds, k)
var kinds []schema.GroupVersionKind
for gvk, kind := range tracker.Kinds {
if strings.Contains(gvk, ",") {
gv, err := schema.ParseGroupVersion(strings.Split(gvk, ",")[0])
if err == nil {
kinds = append(kinds, gv.WithKind(kind))
}
}
}
nsk[tracker.Namespace] = kinds
}

View File

@ -592,24 +592,20 @@ func (r *KustomizationReconciler) prune(kustomization kustomizev1.Kustomization,
}
}
gc := NewGarbageCollector(*kustomization.Status.Snapshot, r.Log)
gc := NewGarbageCollector(r.Client, *kustomization.Status.Snapshot, r.Log)
if output, ok := gc.Prune(kustomization.GetTimeout(),
kustomization.GetName(),
kustomization.GetNamespace(),
); !ok {
return fmt.Errorf("pruning failed")
return fmt.Errorf("garbage collection failed: %s", output)
} else {
changeSet := ""
input := strings.Split(output, "\n")
for _, action := range input {
if strings.Contains(action, "deleted") {
changeSet += action + "\n"
}
}
if changeSet != "" {
r.event(kustomization, snapshot.Revision, recorder.EventSeverityInfo, changeSet)
if output != "" {
r.Log.WithValues(
strings.ToLower(kustomization.Kind),
fmt.Sprintf("%s/%s", kustomization.GetNamespace(), kustomization.GetName()),
).Info(fmt.Sprintf("garbage collection completed: %s", output))
r.event(kustomization, snapshot.Revision, recorder.EventSeverityInfo, output)
}
}
return nil

View File

@ -4,10 +4,13 @@ import (
"context"
"crypto/sha1"
"fmt"
"os/exec"
"strings"
"time"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/go-logr/logr"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1alpha1"
@ -16,89 +19,99 @@ import (
type KustomizeGarbageCollector struct {
snapshot kustomizev1.Snapshot
log logr.Logger
client.Client
}
func NewGarbageCollector(snapshot kustomizev1.Snapshot, log logr.Logger) *KustomizeGarbageCollector {
func NewGarbageCollector(kubeClient client.Client, snapshot kustomizev1.Snapshot, log logr.Logger) *KustomizeGarbageCollector {
return &KustomizeGarbageCollector{
Client: kubeClient,
snapshot: snapshot,
log: log,
}
}
// Prune deletes Kubernetes objects removed from source.
// Namespaced objects are removed before global ones, as in CRs before CRDs.
// The garbage collector determines what objects to prune based on
// a label selector that contains the previously applied revision.
// The garbage collector ignores objects that are no longer present
// on the cluster or if they are marked for deleting using Kubernetes finalizers.
func (kgc *KustomizeGarbageCollector) Prune(timeout time.Duration, name string, namespace string) (string, bool) {
selector := kgc.selectors(name, namespace, kgc.snapshot.Revision)
ok := true
changeSet := ""
outInfo := ""
outErr := ""
for ns, kinds := range kgc.snapshot.NamespacedKinds() {
for _, kind := range kinds {
if output, err := kgc.deleteByKind(timeout, kind, ns, selector); err != nil {
outErr += " " + err.Error()
ok = false
} else {
outInfo += " " + output + "\n"
}
}
}
if outErr == "" {
kgc.log.Info("Garbage collection for namespaced objects completed",
"kustomization", fmt.Sprintf("%s/%s", namespace, name),
"output", outInfo)
changeSet += outInfo
} else {
kgc.log.Error(fmt.Errorf(outErr), "Garbage collection for namespaced objects failed",
"kustomization", fmt.Sprintf("%s/%s", namespace, name))
}
outInfo = ""
outErr = ""
for _, kind := range kgc.snapshot.NonNamespacedKinds() {
if output, err := kgc.deleteByKind(timeout, kind, "", selector); err != nil {
outErr += " " + err.Error()
ok = false
} else {
outInfo += " " + output + "\n"
}
}
if outErr == "" {
kgc.log.Info("Garbage collection for non-namespaced objects completed",
"kustomization", fmt.Sprintf("%s/%s", namespace, name),
"output", outInfo)
changeSet += outInfo
} else {
kgc.log.Error(fmt.Errorf(outErr), "Garbage collection for non-namespaced objects failed",
"kustomization", fmt.Sprintf("%s/%s", namespace, name))
}
return changeSet, ok
}
func (kgc *KustomizeGarbageCollector) deleteByKind(timeout time.Duration, kind, namespace, selector string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout+time.Second)
defer cancel()
cmd := fmt.Sprintf("kubectl delete %s -l %s", kind, selector)
if namespace != "" {
cmd = fmt.Sprintf("%s -n=%s", cmd, namespace)
for ns, gvks := range kgc.snapshot.NamespacedKinds() {
for _, gvk := range gvks {
ulist := &unstructured.UnstructuredList{}
ulist.SetGroupVersionKind(schema.GroupVersionKind{
Group: gvk.Group,
Kind: gvk.Kind + "List",
Version: gvk.Version,
})
err := kgc.List(ctx, ulist, client.InNamespace(ns), kgc.matchingLabels(name, namespace, kgc.snapshot.Revision))
if err == nil {
for _, item := range ulist.Items {
if item.GetDeletionTimestamp().IsZero() {
name := fmt.Sprintf("%s/%s/%s", item.GetKind(), item.GetNamespace(), item.GetName())
err = kgc.Delete(ctx, &item)
if err != nil {
outErr += fmt.Sprintf("delete failed for %s: %v\n", name, err)
} else {
if len(item.GetFinalizers()) > 0 {
changeSet += fmt.Sprintf("%s/%s marked for deletion\n", item.GetNamespace(), item.GetName())
} else {
changeSet += fmt.Sprintf("%s/%s deleted\n", item.GetNamespace(), item.GetName())
}
}
}
}
}
}
}
command := exec.CommandContext(ctx, "/bin/sh", "-c", cmd)
if output, err := command.CombinedOutput(); err != nil {
// ignore unknown resource kind
if strings.Contains(string(output), "the server doesn't have a resource type") {
return strings.TrimSuffix(string(output), "\n"), nil
} else {
return "", fmt.Errorf("%s", strings.TrimSuffix(string(output), "\n"))
for _, gvk := range kgc.snapshot.NonNamespacedKinds() {
ulist := &unstructured.UnstructuredList{}
ulist.SetGroupVersionKind(schema.GroupVersionKind{
Group: gvk.Group,
Kind: gvk.Kind + "List",
Version: gvk.Version,
})
err := kgc.List(ctx, ulist, kgc.matchingLabels(name, namespace, kgc.snapshot.Revision))
if err == nil {
for _, item := range ulist.Items {
if item.GetDeletionTimestamp().IsZero() {
name := fmt.Sprintf("%s/%s", item.GetKind(), item.GetName())
err = kgc.Delete(ctx, &item)
if err != nil {
outErr += fmt.Sprintf("delete failed for %s: %v\n", name, err)
} else {
if len(item.GetFinalizers()) > 0 {
changeSet += fmt.Sprintf("%s/%s marked for deletion\n", item.GetKind(), item.GetName())
} else {
changeSet += fmt.Sprintf("%s/%s deleted\n", item.GetKind(), item.GetName())
}
}
}
}
}
} else {
return strings.TrimSuffix(string(output), "\n"), nil
}
if outErr != "" {
return outErr, false
}
return changeSet, true
}
func (kgc *KustomizeGarbageCollector) selectors(name, namespace, revision string) string {
return fmt.Sprintf("kustomization/name=%s-%s,kustomization/revision=%s", name, namespace, checksum(revision))
func (kgc *KustomizeGarbageCollector) matchingLabels(name, namespace, revision string) client.MatchingLabels {
return client.MatchingLabels{
"kustomization/name": fmt.Sprintf("%s-%s", name, namespace),
"kustomization/revision": checksum(revision),
}
}
func gcLabels(name, namespace, revision string) map[string]string {