applylib: combine field managers

This ensures that we don't lose managed fields as we update them.
This commit is contained in:
justinsb 2022-11-22 10:58:45 -05:00
parent 32cb151419
commit 8288515eb4
5 changed files with 329 additions and 78 deletions

3
go.mod
View File

@ -57,6 +57,7 @@ require (
helm.sh/helm/v3 v3.10.2
k8s.io/api v0.25.4
k8s.io/apimachinery v0.25.4
k8s.io/apiserver v0.25.4
k8s.io/cli-runtime v0.25.4
k8s.io/client-go v0.25.4
k8s.io/cloud-provider-aws v1.25.1
@ -69,6 +70,7 @@ require (
k8s.io/mount-utils v0.25.4
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
sigs.k8s.io/controller-runtime v0.13.1
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
sigs.k8s.io/yaml v1.3.0
)
@ -248,5 +250,4 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/kustomize/api v0.12.1 // indirect
sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

2
go.sum
View File

@ -1345,6 +1345,8 @@ k8s.io/apiextensions-apiserver v0.25.2 h1:8uOQX17RE7XL02ngtnh3TgifY7EhekpK+/piwz
k8s.io/apiextensions-apiserver v0.25.2/go.mod h1:iRwwRDlWPfaHhuBfQ0WMa5skdQfrE18QXJaJvIDLvE8=
k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc=
k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo=
k8s.io/apiserver v0.25.4 h1:/3TwZcgLqX7wUxq7TtXOUqXeBTwXIblVMQdhR5XZ7yo=
k8s.io/apiserver v0.25.4/go.mod h1:rPcm567XxjOnnd7jedDUnGJGmDGAo+cT6H7QHAN+xV0=
k8s.io/cli-runtime v0.25.4 h1:GTSBN7aKBrc2LqpdO30CmHQqJtRmotxV7XsMSP+QZIk=
k8s.io/cli-runtime v0.25.4/go.mod h1:JGOw1CR8v4Mcz6cEKA7bFQe0bPrNn1l5sGAX1/Ke4Eg=
k8s.io/client-go v0.25.4 h1:3RNRDffAkNU56M/a7gUfXaEzdhZlYhoW8dgViGy5fn8=

View File

@ -22,11 +22,9 @@ import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
)
@ -102,6 +100,11 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) {
trackers := a.trackers
a.mutex.Unlock()
client := &UnstructuredClient{
client: a.client,
restMapper: a.restMapper,
}
results := &ApplyResults{total: len(trackers.items)}
for i := range trackers.items {
@ -113,54 +116,28 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) {
gvk := expectedObject.GroupVersionKind()
nn := types.NamespacedName{Namespace: ns, Name: name}
restMapping, err := a.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
currentObj, err := client.Get(ctx, gvk, nn)
if err != nil {
results.applyError(gvk, nn, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err))
continue
}
gvr := restMapping.Resource
var dynamicResource dynamic.ResourceInterface
switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace:
if ns == "" {
// TODO: Differentiate between server-fixable vs client-fixable errors?
results.applyError(gvk, nn, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk))
if !apierrors.IsNotFound(err) {
results.applyError(gvk, nn, err)
continue
}
dynamicResource = a.client.Resource(gvr).Namespace(ns)
}
case meta.RESTScopeNameRoot:
if ns != "" {
// TODO: Differentiate between server-fixable vs client-fixable errors?
results.applyError(gvk, nn, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", expectedObject.GetNamespace(), gvk))
// If the object exists, we need to update any client-side-apply field-managers
// Otherwise we often end up with old and new objects combined, which
// is unexpected and can be invalid.
if currentObj != nil {
managedFields := &ManagedFieldsMigrator{
NewManager: "kops",
Client: client,
}
if err := managedFields.Migrate(ctx, currentObj); err != nil {
results.applyError(gvk, nn, err)
continue
}
dynamicResource = a.client.Resource(gvr)
default:
// Internal error ... this is panic-level
return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name())
}
currentObj, err := dynamicResource.Get(ctx, name, v1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("could not get existing object: %w", err)
}
}
jsonData, err := createManagedFieldPatch(currentObj)
if err != nil {
return nil, fmt.Errorf("failed to create json patch: %w", err)
}
if jsonData != nil {
_, err := dynamicResource.Patch(ctx, name, types.MergePatchType, jsonData, v1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("failed to patch object %q: %w", name, err)
}
}
j, err := json.Marshal(expectedObject)
if err != nil {
// TODO: Differentiate between server-fixable vs client-fixable errors?
@ -168,7 +145,7 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) {
continue
}
lastApplied, err := dynamicResource.Patch(ctx, name, types.ApplyPatchType, j, a.patchOptions)
lastApplied, err := client.Patch(ctx, gvk, nn, types.ApplyPatchType, j, a.patchOptions)
if err != nil {
results.applyError(gvk, nn, fmt.Errorf("error from apply: %w", err))
continue
@ -181,36 +158,3 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) {
}
return results, nil
}
// createManagedFieldPatch constructs a merge patch that rewrites the object's
// managedFields so that client-side-apply managers (kubectl-edit /
// kubectl-client-side-apply) — and any existing "kops" manager with an Update
// operation — become a single "kops" manager with an Apply operation.
// It returns nil (no patch needed) when there is no current object or it has
// no managed fields.
func createManagedFieldPatch(currentObject *unstructured.Unstructured) ([]byte, error) {
	if currentObject == nil {
		return nil, nil
	}
	fixedManagedFields := []v1.ManagedFieldsEntry{}
	for _, managedField := range currentObject.GetManagedFields() {
		fixedManagedField := managedField.DeepCopy()
		if managedField.Manager == "kubectl-edit" || managedField.Manager == "kubectl-client-side-apply" {
			fixedManagedField.Manager = "kops"
			fixedManagedField.Operation = "Apply"
		}
		// In case we have a kops & Update manager
		if fixedManagedField.Manager == "kops" && fixedManagedField.Operation == "Update" {
			fixedManagedField.Operation = "Apply"
		}
		fixedManagedFields = append(fixedManagedFields, *fixedManagedField)
	}
	if len(fixedManagedFields) == 0 {
		return nil, nil
	}
	meta := &v1.ObjectMeta{}
	meta.SetManagedFields(fixedManagedFields)
	patchObject := map[string]interface{}{
		"metadata": meta,
	}
	jsonData, err := json.Marshal(patchObject)
	if err != nil {
		// Fixed typo: "marsal" -> "marshal"
		return nil, fmt.Errorf("failed to marshal %q into json: %w", currentObject.GetName(), err)
	}
	return jsonData, nil
}

View File

@ -0,0 +1,182 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
)
// ManagedFieldsMigrator manages the migration of field managers from client-side managers to the server-side manager.
type ManagedFieldsMigrator struct {
	// Client is used to patch the object's managedFields on the apiserver.
	Client *UnstructuredClient
	// NewManager is the field-manager name that migrated client-side entries
	// are rewritten to (with an Apply operation).
	NewManager string
}
// Migrate migrates from client-side field managers to the NewManager (with an Apply operation).
// This is needed to move from client-side apply to server-side apply.
func (m *ManagedFieldsMigrator) Migrate(ctx context.Context, obj *unstructured.Unstructured) error {
	patch, err := m.createManagedFieldPatch(obj)
	if err != nil {
		return fmt.Errorf("failed to create managed-fields patch: %w", err)
	}
	if patch == nil {
		// Nothing to migrate.
		return nil
	}

	nn := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
	if _, err := m.Client.Patch(ctx, obj.GroupVersionKind(), nn, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("failed to patch object managed-fields for %q: %w", obj.GetName(), err)
	}
	return nil
}
// createManagedFieldPatch constructs a merge patch that combines managed fields:
// client-side-apply managers (kubectl-edit / kubectl-client-side-apply), and any
// entry already owned by NewManager with an Update operation, are rewritten to
// an Apply entry for NewManager; entries that then share the same
// (manager, operation, subresource) key are merged into one.
// It returns nil if no patch is needed.
func (m *ManagedFieldsMigrator) createManagedFieldPatch(currentObject *unstructured.Unstructured) ([]byte, error) {
	if currentObject == nil {
		return nil, nil
	}

	needPatch := false
	fixedManagedFields := []metav1.ManagedFieldsEntry{}
	for _, managedField := range currentObject.GetManagedFields() {
		fixedManagedField := managedField.DeepCopy()
		if managedField.Manager == "kubectl-edit" || managedField.Manager == "kubectl-client-side-apply" {
			needPatch = true
			fixedManagedField.Manager = m.NewManager
			fixedManagedField.Operation = metav1.ManagedFieldsOperationApply
		}
		// In case we have an existing Update operation owned by NewManager.
		if fixedManagedField.Manager == m.NewManager && fixedManagedField.Operation == metav1.ManagedFieldsOperationUpdate {
			needPatch = true
			fixedManagedField.Operation = metav1.ManagedFieldsOperationApply
		}
		fixedManagedFields = append(fixedManagedFields, *fixedManagedField)
	}
	if !needPatch {
		return nil, nil
	}

	// Rewriting managers can produce duplicate keys; merge them so the
	// apiserver does not lose fields.
	merged, err := mergeFieldManagers(fixedManagedFields)
	if err != nil {
		return nil, err
	}

	meta := &metav1.ObjectMeta{}
	meta.SetManagedFields(merged)
	patchObject := map[string]interface{}{
		"metadata": meta,
	}

	jsonData, err := json.Marshal(patchObject)
	if err != nil {
		// Fixed typo: "marsal" -> "marshal"
		return nil, fmt.Errorf("failed to marshal %q into json: %w", currentObject.GetName(), err)
	}
	return jsonData, nil
}
// fieldManagerKey is the primary key for a ManagedFieldsEntry: two entries
// with the same (manager, operation, subresource) tuple describe the same
// field manager and are candidates for merging.
type fieldManagerKey struct {
	Manager     string
	Operation   metav1.ManagedFieldsOperationType
	Subresource string
}
// mergeFieldManagers merges the managed fields from identical field managers.
// If we don't do this, the apiserver will not currently construct the union for duplicate keys.
//
// Note: map iteration order is not deterministic, so the order of the returned
// entries is not stable across calls.
func mergeFieldManagers(managedFields []metav1.ManagedFieldsEntry) ([]metav1.ManagedFieldsEntry, error) {
	// Group entries by their (manager, operation, subresource) key.
	byKey := make(map[fieldManagerKey][]metav1.ManagedFieldsEntry)
	for _, f := range managedFields {
		k := fieldManagerKey{
			Manager:     f.Manager,
			Operation:   f.Operation,
			Subresource: f.Subresource,
		}
		byKey[k] = append(byKey[k], f)
	}

	// Keep the first entry for each key; when there are duplicates, replace
	// its fields with the union of all duplicates' field sets.
	// (Was: `for k := range byKey { managers := byKey[k] }` — one lookup, not two.)
	result := make([]metav1.ManagedFieldsEntry, 0, len(byKey))
	for _, managers := range byKey {
		if len(managers) > 1 {
			fieldSet, err := mergeManagedFields(managers)
			if err != nil {
				return nil, err
			}
			encoded, err := fieldSet.ToJSON()
			if err != nil {
				return nil, err
			}
			managers[0].FieldsV1.Raw = encoded
		}
		result = append(result, managers[0])
	}
	return result, nil
}
// mergeManagedFields merges a set of ManagedFieldsEntry managed fields, that are expected to have the same key.
// The result is the union of every entry's field set; all entries must share
// the same apiVersion.
func mergeManagedFields(managedFields []metav1.ManagedFieldsEntry) (*fieldpath.Set, error) {
	if len(managedFields) == 0 {
		return nil, fmt.Errorf("no managed fields supplied")
	}

	// Seed the union with the first entry, then fold in the rest.
	first := &managedFields[0]
	union, err := toFieldPathSet(first)
	if err != nil {
		return nil, err
	}

	for i := 1; i < len(managedFields); i++ {
		entry := &managedFields[i]
		if entry.APIVersion != first.APIVersion {
			return nil, fmt.Errorf("cannot merge ManagedFieldsEntry apiVersion %q with apiVersion %q", first.APIVersion, entry.APIVersion)
		}
		set, err := toFieldPathSet(entry)
		if err != nil {
			return nil, err
		}
		union = union.Union(set)
	}
	return union, nil
}
// toFieldPathSet converts an encoded ManagedFieldsEntry to a set of managed fields (a fieldpath.Set)
func toFieldPathSet(fields *metav1.ManagedFieldsEntry) (*fieldpath.Set, error) {
	decoded, err := fieldmanager.DecodeManagedFields([]metav1.ManagedFieldsEntry{*fields})
	if err != nil {
		return nil, err
	}

	decodedFields := decoded.Fields()
	if got := len(decodedFields); got != 1 {
		return nil, fmt.Errorf("expected a single managed fields entry, but got %d", got)
	}
	// Return the (single) decoded field set.
	for _, fieldSet := range decodedFields {
		return fieldSet.Set(), nil
	}
	return nil, fmt.Errorf("no fields were decoded")
}

View File

@ -0,0 +1,122 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
)
// UnstructuredClient is a client that makes it easier to work with unstructured objects.
// It is similar to client.Client in controller-runtime, but is never cached.
type UnstructuredClient struct {
	// client is the dynamic kubernetes client used to apply objects to the k8s cluster.
	client dynamic.Interface

	// restMapper is used to map object kind to resources, and to know if objects are cluster-scoped.
	restMapper meta.RESTMapper
}
// NewUnstructuredClient constructs an UnstructuredClient
// from the dynamic client and RESTMapper carried in options.
func NewUnstructuredClient(options Options) *UnstructuredClient {
	c := &UnstructuredClient{
		client:     options.Client,
		restMapper: options.RESTMapper,
	}
	return c
}
// dynamicResource is a helper to get the resource for a gvk (with the namespace)
// It returns an error if a namespace is provided for a cluster-scoped resource,
// or no namespace is provided for a namespace-scoped resource.
func (c *UnstructuredClient) dynamicResource(ctx context.Context, gvk schema.GroupVersionKind, ns string) (dynamic.ResourceInterface, error) {
	mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err)
	}

	scope := mapping.Scope.Name()
	switch scope {
	case meta.RESTScopeNameNamespace:
		// Namespace-scoped: a namespace is required.
		if ns == "" {
			// TODO: Differentiate between server-fixable vs client-fixable errors?
			return nil, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk)
		}
		return c.client.Resource(mapping.Resource).Namespace(ns), nil

	case meta.RESTScopeNameRoot:
		// Cluster-scoped: a namespace must not be supplied.
		if ns != "" {
			// TODO: Differentiate between server-fixable vs client-fixable errors?
			return nil, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", ns, gvk)
		}
		return c.client.Resource(mapping.Resource), nil

	default:
		// Internal error ... this is panic-level
		return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, scope)
	}
}
// Patch performs a Patch operation, used for server-side apply and client-side patch.
func (c *UnstructuredClient) Patch(ctx context.Context, gvk schema.GroupVersionKind, nn types.NamespacedName, patchType types.PatchType, data []byte, opt metav1.PatchOptions) (*unstructured.Unstructured, error) {
	resource, err := c.dynamicResource(ctx, gvk, nn.Namespace)
	if err != nil {
		return nil, err
	}

	patched, err := resource.Patch(ctx, nn.Name, patchType, data, opt)
	if err != nil {
		return nil, fmt.Errorf("error patching object: %w", err)
	}
	return patched, nil
}
// Update performs an Update operation on the object. Generally we should prefer server-side-apply.
func (c *UnstructuredClient) Update(ctx context.Context, obj *unstructured.Unstructured, opt metav1.UpdateOptions) (*unstructured.Unstructured, error) {
	resource, err := c.dynamicResource(ctx, obj.GroupVersionKind(), obj.GetNamespace())
	if err != nil {
		return nil, err
	}

	updated, err := resource.Update(ctx, obj, opt)
	if err != nil {
		return nil, fmt.Errorf("error updating object: %w", err)
	}
	return updated, nil
}
// Get reads the specified object.
func (c *UnstructuredClient) Get(ctx context.Context, gvk schema.GroupVersionKind, nn types.NamespacedName) (*unstructured.Unstructured, error) {
	resource, err := c.dynamicResource(ctx, gvk, nn.Namespace)
	if err != nil {
		return nil, err
	}

	obj, err := resource.Get(ctx, nn.Name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("could not get existing object: %w", err)
	}
	return obj, nil
}