Merge pull request #1484 from fluxcd/ssa-staged

Refactor reconciler to use `ssa.ApplyAllStaged`
This commit is contained in:
Stefan Prodan 2025-07-08 20:25:06 +03:00 committed by GitHub
commit 52170876d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 95 deletions

8
go.mod
View File

@ -23,13 +23,13 @@ require (
github.com/fluxcd/pkg/apis/acl v0.7.0
github.com/fluxcd/pkg/apis/event v0.17.0
github.com/fluxcd/pkg/apis/kustomize v1.10.0
github.com/fluxcd/pkg/apis/meta v1.12.0
github.com/fluxcd/pkg/apis/meta v1.13.0
github.com/fluxcd/pkg/auth v0.16.0
github.com/fluxcd/pkg/cache v0.9.0
github.com/fluxcd/pkg/http/fetch v0.16.0
github.com/fluxcd/pkg/kustomize v1.18.0
github.com/fluxcd/pkg/runtime v0.60.0
github.com/fluxcd/pkg/ssa v0.49.0
github.com/fluxcd/pkg/runtime v0.63.0
github.com/fluxcd/pkg/ssa v0.50.1
github.com/fluxcd/pkg/tar v0.12.0
github.com/fluxcd/pkg/testserver v0.11.0
github.com/fluxcd/source-controller/api v1.6.0
@ -46,7 +46,7 @@ require (
k8s.io/client-go v0.33.0
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e
sigs.k8s.io/controller-runtime v0.21.0
sigs.k8s.io/kustomize/api v0.19.0
sigs.k8s.io/kustomize/api v0.20.0
sigs.k8s.io/yaml v1.5.0
)

12
go.sum
View File

@ -188,8 +188,8 @@ github.com/fluxcd/pkg/apis/event v0.17.0 h1:foEINE++pCJlWVhWjYDXfkVmGKu8mQ4BDBlb
github.com/fluxcd/pkg/apis/event v0.17.0/go.mod h1:0fLhLFiHlRTDKPDXdRnv+tS7mCMIQ0fJxnEfmvGM/5A=
github.com/fluxcd/pkg/apis/kustomize v1.10.0 h1:47EeSzkQvlQZdH92vHMe2lK2iR8aOSEJq95avw5idts=
github.com/fluxcd/pkg/apis/kustomize v1.10.0/go.mod h1:UsqMV4sqNa1Yg0pmTsdkHRJr7bafBOENIJoAN+3ezaQ=
github.com/fluxcd/pkg/apis/meta v1.12.0 h1:XW15TKZieC2b7MN8VS85stqZJOx+/b8jATQ/xTUhVYg=
github.com/fluxcd/pkg/apis/meta v1.12.0/go.mod h1:+son1Va60x2eiDcTwd7lcctbI6C+K3gM7R+ULmEq1SI=
github.com/fluxcd/pkg/apis/meta v1.13.0 h1:KKYdXFzEmSuM8CP1Zqpt68IF+jqQILeiDiTmXI7DMTM=
github.com/fluxcd/pkg/apis/meta v1.13.0/go.mod h1:+son1Va60x2eiDcTwd7lcctbI6C+K3gM7R+ULmEq1SI=
github.com/fluxcd/pkg/auth v0.16.0 h1:YEjSaNqlpYoXfoFAGhU/Z8y0322nGsT24W6zCh+sbGw=
github.com/fluxcd/pkg/auth v0.16.0/go.mod h1:+BRnAO61Nr6fACEjJS6eNRdOk1nXhX/FCPylYn1ypNc=
github.com/fluxcd/pkg/cache v0.9.0 h1:EGKfOLMG3fOwWnH/4Axl5xd425mxoQbZzlZoLfd8PDk=
@ -200,12 +200,12 @@ github.com/fluxcd/pkg/http/fetch v0.16.0 h1:XzhBTSK5HNdAPEnEGMJHwtoN2LfqQ9QFDsu3
github.com/fluxcd/pkg/http/fetch v0.16.0/go.mod h1:+A+yrOzwA5436ufD8NPeCCQFNzk4metoPUgRVCozvzw=
github.com/fluxcd/pkg/kustomize v1.18.0 h1:wWK+qYwmBmba3N3VAqZ9ijnfVGGaIjcaHWo033URZTw=
github.com/fluxcd/pkg/kustomize v1.18.0/go.mod h1:Ij9722MdWIE6B1EPg2ZJUf6npycgfRfN4Lohi7D/Kic=
github.com/fluxcd/pkg/runtime v0.60.0 h1:d++EkV3FlycB+bzakB5NumwY4J8xts8i7lbvD6jBLeU=
github.com/fluxcd/pkg/runtime v0.60.0/go.mod h1:UeU0/eZLErYC/1bTmgzBfNXhiHy9fuQzjfLK0HxRgxY=
github.com/fluxcd/pkg/runtime v0.63.0 h1:55J7ascGmXyTXWGwhD21N9fU7jC1l5rhdzjgNXs6aZg=
github.com/fluxcd/pkg/runtime v0.63.0/go.mod h1:7pxGvaU0Yy1cDIUhiHAHhCx2yCLnkcVsplbYZG6j4JY=
github.com/fluxcd/pkg/sourceignore v0.12.0 h1:jCIe6d50rQ3wdXPF0+PhhqN0XrTRIq3upMomPelI8Mw=
github.com/fluxcd/pkg/sourceignore v0.12.0/go.mod h1:dc0zvkuXM5OgL/b3IkrVuwvPjj1zJn4NBUMH45uJ4Y0=
github.com/fluxcd/pkg/ssa v0.49.0 h1:3xBMxWQIpmKu+zUmyuKQ9M4f+ALhbMJIkiLXeGkhig4=
github.com/fluxcd/pkg/ssa v0.49.0/go.mod h1:T50TO0U2obLodZnrFgOrxollfBEy4V673OkM2aTUF1c=
github.com/fluxcd/pkg/ssa v0.50.1 h1:ESyHtd0B5vyrnKunfHfUesT8ZtdMHBRPKtoxpxOGIYM=
github.com/fluxcd/pkg/ssa v0.50.1/go.mod h1:T50TO0U2obLodZnrFgOrxollfBEy4V673OkM2aTUF1c=
github.com/fluxcd/pkg/tar v0.12.0 h1:og6F+ivnWNRbNJSq0ukCTVs7YrGIlzjxSVZU+E8NprM=
github.com/fluxcd/pkg/tar v0.12.0/go.mod h1:Ra5Cj++MD5iCy7bZGKJJX3GpOeMPv+ZDkPO9bBwpDeU=
github.com/fluxcd/pkg/testserver v0.11.0 h1:a/kxpFqv7XQxZjwVPP3voooRmSd/3ipLVolK0xUIxXQ=

View File

@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"os"
"sort"
"strings"
"time"
@ -807,120 +806,44 @@ func (r *KustomizationReconciler) apply(ctx context.Context,
},
}
// contains only CRDs and Namespaces
var defStage []*unstructured.Unstructured
// contains only Kubernetes Class types e.g.: RuntimeClass, PriorityClass,
// StorageClass, VolumeSnapshotClass, IngressClass, GatewayClass, ClusterClass, etc
var classStage []*unstructured.Unstructured
// contains all objects except for CRDs, Namespaces and Class type objects
var resStage []*unstructured.Unstructured
// contains the objects' metadata after apply
resultSet := ssa.NewChangeSet()
for _, u := range objects {
if decryptor.IsEncryptedSecret(u) {
return false, nil,
fmt.Errorf("%s is SOPS encrypted, configuring decryption is required for this secret to be reconciled",
ssautil.FmtUnstructured(u))
}
switch {
case ssautil.IsClusterDefinition(u):
defStage = append(defStage, u)
case strings.HasSuffix(u.GetKind(), "Class"):
classStage = append(classStage, u)
default:
resStage = append(resStage, u)
}
}
// contains the objects' metadata after apply
resultSet := ssa.NewChangeSet()
var changeSetLog strings.Builder
// validate, apply and wait for CRDs and Namespaces to register
if len(defStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, defStage, applyOpts)
if err != nil {
return false, nil, err
}
if len(objects) > 0 {
changeSet, err := manager.ApplyAllStaged(ctx, objects, applyOpts)
if changeSet != nil && len(changeSet.Entries) > 0 {
resultSet.Append(changeSet.Entries)
if r.GroupChangeLog {
log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap())
} else {
log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToMap())
}
// filter out the objects that have not changed
for _, change := range changeSet.Entries {
if HasChanged(change.Action) {
changeSetLog.WriteString(change.String() + "\n")
}
}
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
Interval: 2 * time.Second,
Timeout: obj.GetTimeout(),
}); err != nil {
return false, nil, err
}
}
}
// validate, apply and wait for Class type objects to register
if len(classStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, classStage, applyOpts)
if err != nil {
return false, nil, err
}
if changeSet != nil && len(changeSet.Entries) > 0 {
resultSet.Append(changeSet.Entries)
if r.GroupChangeLog {
log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap())
} else {
log.Info("server-side apply for cluster class types completed", "output", changeSet.ToMap())
}
for _, change := range changeSet.Entries {
if HasChanged(change.Action) {
changeSetLog.WriteString(change.String() + "\n")
}
}
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
Interval: 2 * time.Second,
Timeout: obj.GetTimeout(),
}); err != nil {
return false, nil, err
}
}
}
// sort by kind, validate and apply all the others objects
sort.Sort(ssa.SortableUnstructureds(resStage))
if len(resStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, resStage, applyOpts)
// include the change log in the error message in case af a partial apply
if err != nil {
return false, nil, fmt.Errorf("%w\n%s", err, changeSetLog.String())
}
// log all applied objects
if changeSet != nil && len(changeSet.Entries) > 0 {
resultSet.Append(changeSet.Entries)
if r.GroupChangeLog {
log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap())
log.Info("server-side apply completed", "output", changeSet.ToGroupedMap(), "revision", revision)
} else {
log.Info("server-side apply completed", "output", changeSet.ToMap(), "revision", revision)
}
for _, change := range changeSet.Entries {
if HasChanged(change.Action) {
changeSetLog.WriteString(change.String() + "\n")
}
}
}
}