diff --git a/go.mod b/go.mod index 2938c3b36d..ec503f464e 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( helm.sh/helm/v3 v3.10.2 k8s.io/api v0.25.4 k8s.io/apimachinery v0.25.4 + k8s.io/apiserver v0.25.4 k8s.io/cli-runtime v0.25.4 k8s.io/client-go v0.25.4 k8s.io/cloud-provider-aws v1.25.1 @@ -69,6 +70,7 @@ require ( k8s.io/mount-utils v0.25.4 k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 sigs.k8s.io/controller-runtime v0.13.1 + sigs.k8s.io/structured-merge-diff/v4 v4.2.3 sigs.k8s.io/yaml v1.3.0 ) @@ -248,5 +250,4 @@ require ( sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/kustomize/api v0.12.1 // indirect sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/go.sum b/go.sum index 016772c0b7..82d8b658ef 100644 --- a/go.sum +++ b/go.sum @@ -1345,6 +1345,8 @@ k8s.io/apiextensions-apiserver v0.25.2 h1:8uOQX17RE7XL02ngtnh3TgifY7EhekpK+/piwz k8s.io/apiextensions-apiserver v0.25.2/go.mod h1:iRwwRDlWPfaHhuBfQ0WMa5skdQfrE18QXJaJvIDLvE8= k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc= k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= +k8s.io/apiserver v0.25.4 h1:/3TwZcgLqX7wUxq7TtXOUqXeBTwXIblVMQdhR5XZ7yo= +k8s.io/apiserver v0.25.4/go.mod h1:rPcm567XxjOnnd7jedDUnGJGmDGAo+cT6H7QHAN+xV0= k8s.io/cli-runtime v0.25.4 h1:GTSBN7aKBrc2LqpdO30CmHQqJtRmotxV7XsMSP+QZIk= k8s.io/cli-runtime v0.25.4/go.mod h1:JGOw1CR8v4Mcz6cEKA7bFQe0bPrNn1l5sGAX1/Ke4Eg= k8s.io/client-go v0.25.4 h1:3RNRDffAkNU56M/a7gUfXaEzdhZlYhoW8dgViGy5fn8= diff --git a/pkg/applylib/applyset/applyset.go b/pkg/applylib/applyset/applyset.go index 86560ea172..4c9e3b0e5a 100644 --- a/pkg/applylib/applyset/applyset.go +++ b/pkg/applylib/applyset/applyset.go @@ -22,11 +22,9 @@ import ( "fmt" "sync" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" ) @@ -102,6 +100,11 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) { trackers := a.trackers a.mutex.Unlock() + client := &UnstructuredClient{ + client: a.client, + restMapper: a.restMapper, + } + results := &ApplyResults{total: len(trackers.items)} for i := range trackers.items { @@ -113,54 +116,28 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) { gvk := expectedObject.GroupVersionKind() nn := types.NamespacedName{Namespace: ns, Name: name} - restMapping, err := a.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + currentObj, err := client.Get(ctx, gvk, nn) if err != nil { - results.applyError(gvk, nn, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err)) - continue - } - gvr := restMapping.Resource - - var dynamicResource dynamic.ResourceInterface - - switch restMapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - if ns == "" { - // TODO: Differentiate between server-fixable vs client-fixable errors? - results.applyError(gvk, nn, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk)) + if !apierrors.IsNotFound(err) { + results.applyError(gvk, nn, err) continue } - dynamicResource = a.client.Resource(gvr).Namespace(ns) + } - case meta.RESTScopeNameRoot: - if ns != "" { - // TODO: Differentiate between server-fixable vs client-fixable errors? - results.applyError(gvk, nn, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", expectedObject.GetNamespace(), gvk)) + // If the object exists, we need to update any client-side-apply field-managers + // Otherwise we often end up with old and new objects combined, which + // is unexpected and can be invalid. + if currentObj != nil { + managedFields := &ManagedFieldsMigrator{ + NewManager: "kops", + Client: client, + } + if err := managedFields.Migrate(ctx, currentObj); err != nil { + results.applyError(gvk, nn, err) continue } - dynamicResource = a.client.Resource(gvr) - - default: - // Internal error ... this is panic-level - return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name()) } - currentObj, err := dynamicResource.Get(ctx, name, v1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - return nil, fmt.Errorf("could not get existing object: %w", err) - } - } - jsonData, err := createManagedFieldPatch(currentObj) - if err != nil { - return nil, fmt.Errorf("failed to create json patch: %w", err) - } - - if jsonData != nil { - _, err := dynamicResource.Patch(ctx, name, types.MergePatchType, jsonData, v1.PatchOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to patch object %q: %w", name, err) - } - } j, err := json.Marshal(expectedObject) if err != nil { // TODO: Differentiate between server-fixable vs client-fixable errors? @@ -168,7 +145,7 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) { continue } - lastApplied, err := dynamicResource.Patch(ctx, name, types.ApplyPatchType, j, a.patchOptions) + lastApplied, err := client.Patch(ctx, gvk, nn, types.ApplyPatchType, j, a.patchOptions) if err != nil { results.applyError(gvk, nn, fmt.Errorf("error from apply: %w", err)) continue @@ -181,36 +158,3 @@ func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) { } return results, nil } - -func createManagedFieldPatch(currentObject *unstructured.Unstructured) ([]byte, error) { - if currentObject == nil { - return nil, nil - } - fixedManagedFields := []v1.ManagedFieldsEntry{} - for _, managedField := range currentObject.GetManagedFields() { - fixedManagedField := managedField.DeepCopy() - if managedField.Manager == "kubectl-edit" || managedField.Manager == "kubectl-client-side-apply" { - fixedManagedField.Manager = "kops" - fixedManagedField.Operation = "Apply" - } - // In case we have a kops & Update manager - if fixedManagedField.Manager == "kops" && fixedManagedField.Operation == "Update" { - fixedManagedField.Operation = "Apply" - } - fixedManagedFields = append(fixedManagedFields, *fixedManagedField) - } - if len(fixedManagedFields) == 0 { - return nil, nil - } - meta := &v1.ObjectMeta{} - meta.SetManagedFields(fixedManagedFields) - patchObject := map[string]interface{}{ - "metadata": meta, - } - - jsonData, err := json.Marshal(patchObject) - if err != nil { - return nil, fmt.Errorf("failed to marsal %q into json: %w", currentObject.GetName(), err) - } - return jsonData, nil -} diff --git a/pkg/applylib/applyset/managedfields.go b/pkg/applylib/applyset/managedfields.go new file mode 100644 index 0000000000..ba3fbb6977 --- /dev/null +++ b/pkg/applylib/applyset/managedfields.go @@ -0,0 +1,182 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "context" + "encoding/json" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "sigs.k8s.io/structured-merge-diff/v4/fieldpath" +) + +// ManagedFieldsMigrator manages the migration of field managers from client-side managers to the server-side manager. +type ManagedFieldsMigrator struct { + Client *UnstructuredClient + NewManager string +} + +// Migrate migrates from client-side field managers to the NewManager (with an Apply operation). +// This is needed to move from client-side apply to server-side apply. +func (m *ManagedFieldsMigrator) Migrate(ctx context.Context, obj *unstructured.Unstructured) error { + managedFieldPatch, err := m.createManagedFieldPatch(obj) + if err != nil { + return fmt.Errorf("failed to create managed-fields patch: %w", err) + } + if managedFieldPatch != nil { + gvk := obj.GroupVersionKind() + nn := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + _, err := m.Client.Patch(ctx, gvk, nn, types.MergePatchType, managedFieldPatch, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("failed to patch object managed-fields for %q: %w", obj.GetName(), err) + } + } + return nil +} + +// createManagedFieldPatch constructs a patch to combine managed fields. +// It returns nil if no patch is needed. +func (m *ManagedFieldsMigrator) createManagedFieldPatch(currentObject *unstructured.Unstructured) ([]byte, error) { + if currentObject == nil { + return nil, nil + } + needPatch := false + fixedManagedFields := []metav1.ManagedFieldsEntry{} + for _, managedField := range currentObject.GetManagedFields() { + fixedManagedField := managedField.DeepCopy() + if managedField.Manager == "kubectl-edit" || managedField.Manager == "kubectl-client-side-apply" { + needPatch = true + fixedManagedField.Manager = m.NewManager + fixedManagedField.Operation = metav1.ManagedFieldsOperationApply + } + // In case we have an existing Update operation + if fixedManagedField.Manager == m.NewManager && fixedManagedField.Operation == "Update" { + needPatch = true + fixedManagedField.Operation = metav1.ManagedFieldsOperationApply + } + fixedManagedFields = append(fixedManagedFields, *fixedManagedField) + } + if !needPatch { + return nil, nil + } + + merged, err := mergeFieldManagers(fixedManagedFields) + if err != nil { + return nil, err + } + fixedManagedFields = merged + + meta := &metav1.ObjectMeta{} + meta.SetManagedFields(fixedManagedFields) + patchObject := map[string]interface{}{ + "metadata": meta, + } + + jsonData, err := json.Marshal(patchObject) + if err != nil { + return nil, fmt.Errorf("failed to marsal %q into json: %w", currentObject.GetName(), err) + } + return jsonData, nil +} + +// fieldManagerKey is the primary key for a ManagedFieldEntry +type fieldManagerKey struct { + Manager string + Operation metav1.ManagedFieldsOperationType + Subresource string +} + +// mergeFieldManagers merges the managed fields from identical field managers. +// If we don't do this, the apiserver will not currently construct the union for duplicate keys. +func mergeFieldManagers(managedFields []metav1.ManagedFieldsEntry) ([]metav1.ManagedFieldsEntry, error) { + byKey := make(map[fieldManagerKey][]metav1.ManagedFieldsEntry) + for _, f := range managedFields { + k := fieldManagerKey{ + Manager: f.Manager, + Operation: f.Operation, + Subresource: f.Subresource, + } + + byKey[k] = append(byKey[k], f) + } + + var result []metav1.ManagedFieldsEntry + for k := range byKey { + managers := byKey[k] + if len(managers) > 1 { + fieldSet, err := mergeManagedFields(managers) + if err != nil { + return nil, err + } + encoded, err := fieldSet.ToJSON() + if err != nil { + return nil, err + } + managers[0].FieldsV1.Raw = encoded + } + result = append(result, managers[0]) + } + return result, nil +} + +// mergeManagedFields merges a set of ManagedFieldEntry managed fields, that are expected to have the same key. +func mergeManagedFields(managedFields []metav1.ManagedFieldsEntry) (*fieldpath.Set, error) { + if len(managedFields) == 0 { + return nil, fmt.Errorf("no managed fields supplied") + } + + union, err := toFieldPathSet(&managedFields[0]) + if err != nil { + return nil, err + } + + for i := range managedFields { + if i == 0 { + continue + } + m := &managedFields[i] + if managedFields[0].APIVersion != m.APIVersion { + return nil, fmt.Errorf("cannot merge ManagedFieldsEntry apiVersion %q with apiVersion %q", managedFields[0].APIVersion, m.APIVersion) + } + + set, err := toFieldPathSet(m) + if err != nil { + return nil, err + } + union = union.Union(set) + } + return union, nil +} + +// toFieldPathSet converts an encoded ManagedFieldsEntry to a set of managed fields (a fieldpath.Set) +func toFieldPathSet(fields *metav1.ManagedFieldsEntry) (*fieldpath.Set, error) { + decoded, err := fieldmanager.DecodeManagedFields([]metav1.ManagedFieldsEntry{*fields}) + if err != nil { + return nil, err + } + if len(decoded.Fields()) != 1 { + return nil, fmt.Errorf("expected a single managed fields entry, but got %d", len(decoded.Fields())) + } + for _, fieldSet := range decoded.Fields() { + return fieldSet.Set(), nil + } + return nil, fmt.Errorf("no fields were decoded") +} diff --git a/pkg/applylib/applyset/unstructuredclient.go b/pkg/applylib/applyset/unstructuredclient.go new file mode 100644 index 0000000000..38a7cedb07 --- /dev/null +++ b/pkg/applylib/applyset/unstructuredclient.go @@ -0,0 +1,122 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" +) + +// UnstructuredClient is a client that makes it easier to work with unstructured objects. +// It is similar to client.Client in controller-runtime, but is never cached. +type UnstructuredClient struct { + // client is the dynamic kubernetes client used to apply objects to the k8s cluster. + client dynamic.Interface + // restMapper is used to map object kind to resources, and to know if objects are cluster-scoped. + restMapper meta.RESTMapper +} + +// NewUnstructuredClient constructs an UnstructuredClient +func NewUnstructuredClient(options Options) *UnstructuredClient { + return &UnstructuredClient{ + client: options.Client, + restMapper: options.RESTMapper, + } +} + +// dynamicResource is a helper to get the resource for a gvk (with the namespace) +// It returns an error if a namespace is provided for a cluster-scoped resource, +// or no namespace is provided for a namespace-scoped resource. +func (c *UnstructuredClient) dynamicResource(ctx context.Context, gvk schema.GroupVersionKind, ns string) (dynamic.ResourceInterface, error) { + restMapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err) + } + gvr := restMapping.Resource + + switch restMapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + if ns == "" { + // TODO: Differentiate between server-fixable vs client-fixable errors? + return nil, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk) + } + return c.client.Resource(gvr).Namespace(ns), nil + + case meta.RESTScopeNameRoot: + if ns != "" { + // TODO: Differentiate between server-fixable vs client-fixable errors? + return nil, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", ns, gvk) + } + return c.client.Resource(gvr), nil + + default: + // Internal error ... this is panic-level + return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name()) + } +} + +// Patch performs a Patch operation, used for server-side apply and client-side patch. +func (c *UnstructuredClient) Patch(ctx context.Context, gvk schema.GroupVersionKind, nn types.NamespacedName, patchType types.PatchType, data []byte, opt metav1.PatchOptions) (*unstructured.Unstructured, error) { + dynamicResource, err := c.dynamicResource(ctx, gvk, nn.Namespace) + if err != nil { + return nil, err + } + + name := nn.Name + patched, err := dynamicResource.Patch(ctx, name, patchType, data, opt) + if err != nil { + return nil, fmt.Errorf("error patching object: %w", err) + } + return patched, nil +} + +// Update performs an Update operation on the object. Generally we should prefer server-side-apply. +func (c *UnstructuredClient) Update(ctx context.Context, obj *unstructured.Unstructured, opt metav1.UpdateOptions) (*unstructured.Unstructured, error) { + gvk := obj.GroupVersionKind() + dynamicResource, err := c.dynamicResource(ctx, gvk, obj.GetNamespace()) + if err != nil { + return nil, err + } + + updated, err := dynamicResource.Update(ctx, obj, opt) + if err != nil { + return nil, fmt.Errorf("error updating object: %w", err) + } + return updated, nil +} + +// Get reads the specified object. +func (c *UnstructuredClient) Get(ctx context.Context, gvk schema.GroupVersionKind, nn types.NamespacedName) (*unstructured.Unstructured, error) { + dynamicResource, err := c.dynamicResource(ctx, gvk, nn.Namespace) + if err != nil { + return nil, err + } + + obj, err := dynamicResource.Get(ctx, nn.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("could not get existing object: %w", err) + } + + return obj, nil +}