Merge pull request #81816 from jennybuckley/apply-cap-managers

[server-side apply] Cap the number of managedFields entries for updates at 10

Kubernetes-commit: a8e8e54f7a6e3267c7c47bb2037a2dc0ffce8976
This commit is contained in:
Kubernetes Publisher 2019-10-05 03:15:11 -07:00
commit 279a76350f
13 changed files with 961 additions and 303 deletions

View File

@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
)
type buildManagerInfoManager struct {
fieldManager Manager
groupVersion schema.GroupVersion
}
var _ Manager = &buildManagerInfoManager{}
// NewBuildManagerInfoManager creates a new Manager that converts the manager name into a unique identifier
// combining operation and version for update requests, and just operation for apply requests.
func NewBuildManagerInfoManager(f Manager, gv schema.GroupVersion) Manager {
return &buildManagerInfoManager{
fieldManager: f,
groupVersion: gv,
}
}
// Update implements Manager.
func (f *buildManagerInfoManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
manager, err := f.buildManagerInfo(manager, metav1.ManagedFieldsOperationUpdate)
if err != nil {
return nil, nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// Apply implements Manager.
func (f *buildManagerInfoManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
manager, err := f.buildManagerInfo(manager, metav1.ManagedFieldsOperationApply)
if err != nil {
return nil, nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
return f.fieldManager.Apply(liveObj, patch, managed, manager, force)
}
func (f *buildManagerInfoManager) buildManagerInfo(prefix string, operation metav1.ManagedFieldsOperationType) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: prefix,
Operation: operation,
APIVersion: f.groupVersion.String(),
}
if managerInfo.Manager == "" {
managerInfo.Manager = "unknown"
}
return internal.BuildManagerIdentifier(&managerInfo)
}

View File

@ -0,0 +1,135 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"sort"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type capManagersManager struct {
fieldManager Manager
maxUpdateManagers int
oldUpdatesManagerName string
}
var _ Manager = &capManagersManager{}
// NewCapManagersManager creates a new wrapped FieldManager which ensures that the number of managers from updates
// does not exceed maxUpdateManagers, by merging some of the oldest entries on each update.
func NewCapManagersManager(fieldManager Manager, maxUpdateManagers int) Manager {
return &capManagersManager{
fieldManager: fieldManager,
maxUpdateManagers: maxUpdateManagers,
oldUpdatesManagerName: "ancient-changes",
}
}
// Update implements Manager.
func (f *capManagersManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
object, managed, err := f.fieldManager.Update(liveObj, newObj, managed, manager)
if err != nil {
return object, managed, err
}
if managed, err = f.capUpdateManagers(managed); err != nil {
return nil, nil, fmt.Errorf("failed to cap update managers: %v", err)
}
return object, managed, nil
}
// Apply implements Manager.
func (f *capManagersManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error) {
return f.fieldManager.Apply(liveObj, patch, managed, fieldManager, force)
}
// capUpdateManagers merges a number of the oldest update entries into versioned buckets,
// such that the number of entries from updates does not exceed f.maxUpdateManagers.
func (f *capManagersManager) capUpdateManagers(managed Managed) (newManaged Managed, err error) {
// Gather all entries from updates
updaters := []string{}
for manager, fields := range managed.Fields() {
if fields.Applied() == false {
updaters = append(updaters, manager)
}
}
if len(updaters) <= f.maxUpdateManagers {
return managed, nil
}
// If we have more than the maximum, sort the update entries by time, oldest first.
sort.Slice(updaters, func(i, j int) bool {
iTime, jTime, nTime := managed.Times()[updaters[i]], managed.Times()[updaters[j]], &metav1.Time{Time: time.Time{}}
if iTime == nil {
iTime = nTime
}
if jTime == nil {
jTime = nTime
}
if !iTime.Equal(jTime) {
return iTime.Before(jTime)
}
return updaters[i] < updaters[j]
})
// Merge the oldest updaters with versioned bucket managers until the number of updaters is under the cap
versionToFirstManager := map[string]string{}
for i, length := 0, len(updaters); i < len(updaters) && length > f.maxUpdateManagers; i++ {
manager := updaters[i]
vs := managed.Fields()[manager]
time := managed.Times()[manager]
version := string(vs.APIVersion())
// Create a new manager identifier for the versioned bucket entry.
// The version for this manager comes from the version of the update being merged into the bucket.
bucket, err := internal.BuildManagerIdentifier(&metav1.ManagedFieldsEntry{
Manager: f.oldUpdatesManagerName,
Operation: metav1.ManagedFieldsOperationUpdate,
APIVersion: version,
})
if err != nil {
return managed, fmt.Errorf("failed to create bucket manager for version %v: %v", version, err)
}
// Merge the fieldets if this is not the first time the version was seen.
// Otherwise just record the manager name in versionToFirstManager
if first, ok := versionToFirstManager[version]; ok {
// If the bucket doesn't exists yet, create one.
if _, ok := managed.Fields()[bucket]; !ok {
s := managed.Fields()[first]
delete(managed.Fields(), first)
managed.Fields()[bucket] = s
}
managed.Fields()[bucket] = fieldpath.NewVersionedSet(vs.Set().Union(managed.Fields()[bucket].Set()), vs.APIVersion(), vs.Applied())
delete(managed.Fields(), manager)
length--
// Use the time from the update being merged into the bucket, since it is more recent.
managed.Times()[bucket] = time
} else {
versionToFirstManager[version] = manager
}
}
return managed, nil
}

View File

@ -0,0 +1,286 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager_test
import (
"bytes"
"encoding/json"
"fmt"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type fakeManager struct{}
var _ fieldmanager.Manager = &fakeManager{}
func (*fakeManager) Update(_, newObj runtime.Object, managed fieldmanager.Managed, _ string) (runtime.Object, fieldmanager.Managed, error) {
return newObj, managed, nil
}
func (*fakeManager) Apply(_ runtime.Object, _ []byte, _ fieldmanager.Managed, _ string, force bool) (runtime.Object, fieldmanager.Managed, error) {
panic("not implemented")
return nil, nil, nil
}
func TestCapManagersManagerMergesEntries(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(f.fieldManager, 3)
podWithLabels := func(labels ...string) runtime.Object {
labelMap := map[string]interface{}{}
for _, key := range labels {
labelMap[key] = "true"
}
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"labels": labelMap,
},
},
}
obj.SetKind("Pod")
obj.SetAPIVersion("v1")
return obj
}
if err := f.Update(podWithLabels("one"), "fieldmanager_test_update_1"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two"), "fieldmanager_test_update_2"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two", "three"), "fieldmanager_test_update_3"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two", "three", "four"), "fieldmanager_test_update_4"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if e, a := 3, len(f.ManagedFields()); e != a {
t.Fatalf("exected %v entries in managedFields, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "ancient-changes", f.ManagedFields()[0].Manager; e != a {
t.Fatalf("exected first manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "fieldmanager_test_update_3", f.ManagedFields()[1].Manager; e != a {
t.Fatalf("exected second manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "fieldmanager_test_update_4", f.ManagedFields()[2].Manager; e != a {
t.Fatalf("exected third manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "one"))
expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "two"))
expectManagesField(t, f, "fieldmanager_test_update_3", fieldpath.MakePathOrDie("metadata", "labels", "three"))
expectManagesField(t, f, "fieldmanager_test_update_4", fieldpath.MakePathOrDie("metadata", "labels", "four"))
}
func TestCapUpdateManagers(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(&fakeManager{}, 3)
set := func(fields ...string) *metav1.FieldsV1 {
s := fieldpath.NewSet()
for _, f := range fields {
s.Insert(fieldpath.MakePathOrDie(f))
}
b, err := s.ToJSON()
if err != nil {
panic(fmt.Sprintf("error building ManagedFieldsEntry for test: %v", err))
}
return &metav1.FieldsV1{Raw: b}
}
entry := func(name string, version string, order int, fields *metav1.FieldsV1) metav1.ManagedFieldsEntry {
return metav1.ManagedFieldsEntry{
Manager: name,
APIVersion: version,
Operation: "Update",
FieldsType: "FieldsV1",
FieldsV1: fields,
Time: &metav1.Time{Time: time.Time{}.Add(time.Hour * time.Duration(order))},
}
}
testCases := []struct {
name string
input []metav1.ManagedFieldsEntry
expected []metav1.ManagedFieldsEntry
}{
{
name: "one version, no ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager1", "v1", 1, set("a")),
entry("update-manager2", "v1", 2, set("b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 2, set("a", "b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
},
}, {
name: "one version, one ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 2, set("a", "b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 3, set("a", "b", "c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
}, {
name: "two versions, no ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager1", "v1", 1, set("a")),
entry("update-manager2", "v2", 2, set("b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
expected: []metav1.ManagedFieldsEntry{
entry("update-manager2", "v2", 2, set("b")),
entry("ancient-changes", "v1", 4, set("a", "c", "d")),
entry("update-manager5", "v1", 5, set("e")),
},
}, {
name: "three versions, one ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager2", "v2", 2, set("b")),
entry("ancient-changes", "v1", 4, set("a", "c", "d")),
entry("update-manager5", "v1", 5, set("e")),
entry("update-manager6", "v3", 6, set("f")),
entry("update-manager7", "v2", 7, set("g")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
},
}, {
name: "three versions, two ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v3", 8, set("h")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("ancient-changes", "v3", 8, set("f", "h")),
},
}, {
name: "four versions, two ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v4", 8, set("h")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v4", 8, set("h")),
},
},
}
for _, tc := range testCases {
f.Reset()
accessor, err := meta.Accessor(f.liveObj)
if err != nil {
t.Fatalf("%v: couldn't get accessor: %v", tc.name, err)
}
accessor.SetManagedFields(tc.input)
if err := f.Update(f.liveObj, "no-op-update"); err != nil {
t.Fatalf("%v: failed to do no-op update to object: %v", tc.name, err)
}
if e, a := tc.expected, f.ManagedFields(); !apiequality.Semantic.DeepEqual(e, a) {
t.Errorf("%v: unexpected value for managedFields:\nexpected: %v\n but got: %v", tc.name, mustMarshal(e), mustMarshal(a))
}
expectIdempotence(t, f)
}
}
// expectIdempotence does a no-op update and ensures that managedFields doesn't change by calling capUpdateManagers.
func expectIdempotence(t *testing.T, f TestFieldManager) {
before := []metav1.ManagedFieldsEntry{}
for _, m := range f.ManagedFields() {
before = append(before, *m.DeepCopy())
}
if err := f.Update(f.liveObj, "no-op-update"); err != nil {
t.Fatalf("failed to do no-op update to object: %v", err)
}
if after := f.ManagedFields(); !apiequality.Semantic.DeepEqual(before, after) {
t.Fatalf("exected idempotence, but managedFields changed:\nbefore: %v\n after: %v", mustMarshal(before), mustMarshal(after))
}
}
// expectManagesField ensures that manager m currently manages field path p.
func expectManagesField(t *testing.T, f TestFieldManager, m string, p fieldpath.Path) {
for _, e := range f.ManagedFields() {
if e.Manager == m {
var s fieldpath.Set
err := s.FromJSON(bytes.NewReader(e.FieldsV1.Raw))
if err != nil {
t.Fatalf("error parsing managedFields for %v: %v: %#v", m, err, f.ManagedFields())
}
if !s.Has(p) {
t.Fatalf("expected managedFields for %v to contain %v, but got:\n%v", m, p.String(), s.String())
}
return
}
}
t.Fatalf("exected to find manager name %v, but got: %#v", m, f.ManagedFields())
}
func mustMarshal(i interface{}) string {
b, err := json.MarshalIndent(i, "", " ")
if err != nil {
panic(fmt.Sprintf("error marshalling %v to json: %v", i, err))
}
return string(b)
}

View File

@ -18,302 +18,144 @@ package fieldmanager
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/klog"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/yaml"
)
// FieldManager updates the managed fields and merge applied
// configurations.
type FieldManager interface {
// DefaultMaxUpdateManagers defines the default maximum retained number of managedFields entries from updates
// if the number of update managers exceeds this, the oldest entries will be merged until the number is below the maximum.
// TODO(jennybuckley): Determine if this is really the best value. Ideally we wouldn't unnecessarily merge too many entries.
const DefaultMaxUpdateManagers int = 10
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed interface {
// Fields gets the fieldpath.ManagedFields.
Fields() fieldpath.ManagedFields
// Times gets the timestamps associated with each operation.
Times() map[string]*metav1.Time
}
// Manager updates the managed fields and merges applied configurations.
type Manager interface {
// Update is used when the object has already been merged (non-apply
// use-case), and simply updates the managed fields in the output
// object.
Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error)
Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error)
// Apply is used when server-side apply is called, as it merges the
// object and update the managed fields.
Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error)
// object and updates the managed fields.
Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error)
}
type fieldManager struct {
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
objectDefaulter runtime.ObjectDefaulter
groupVersion schema.GroupVersion
hubVersion schema.GroupVersion
updater merge.Updater
// FieldManager updates the managed fields and merge applied
// configurations.
type FieldManager struct {
fieldManager Manager
}
var _ FieldManager = &fieldManager{}
// NewFieldManager creates a new FieldManager that decodes, manages, then re-encodes managedFields
// on update and apply requests.
func NewFieldManager(f Manager) *FieldManager {
return &FieldManager{f}
}
// NewFieldManager creates a new FieldManager that merges apply requests
// NewDefaultFieldManager creates a new FieldManager that merges apply requests
// and update managed fields for other types of requests.
func NewFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (FieldManager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
func NewDefaultFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion) (*FieldManager, error) {
f, err := NewStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return &fieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
return newDefaultFieldManager(f, objectCreater, kind), nil
}
// NewCRDFieldManager creates a new FieldManager specifically for
// NewDefaultCRDFieldManager creates a new FieldManager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ FieldManager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
func NewDefaultCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion, preserveUnknownFields bool) (_ *FieldManager, err error) {
f, err := NewCRDStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub, preserveUnknownFields)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return &fieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewCRDVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
return newDefaultFieldManager(f, objectCreater, kind), nil
}
// Update implements FieldManager.
func (f *fieldManager) Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error) {
// newDefaultFieldManager is a helper function which wraps a Manager with certain default logic.
func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager {
f = NewStripMetaManager(f)
f = NewBuildManagerInfoManager(f, kind.GroupVersion())
f = NewCapManagersManager(f, DefaultMaxUpdateManagers)
f = NewSkipNonAppliedManager(f, objectCreater, kind)
return NewFieldManager(f)
}
// Update is used when the object has already been merged (non-apply
// use-case), and simply updates the managed fields in the output
// object.
func (f *FieldManager) Update(liveObj, newObj runtime.Object, manager string) (object runtime.Object, err error) {
// If the object doesn't have metadata, we should just return without trying to
// set the managedFields at all, so creates/updates/patches will work normally.
if _, err := meta.Accessor(newObj); err != nil {
if _, err = meta.Accessor(newObj); err != nil {
return newObj, nil
}
// First try to decode the managed fields provided in the update,
// This is necessary to allow directly updating managed fields.
managed, err := internal.DecodeObjectManagedFields(newObj)
// If the managed field is empty or we failed to decode it,
// let's try the live object. This is to prevent clients who
// don't understand managedFields from deleting it accidentally.
if err != nil || len(managed.Fields) == 0 {
var managed Managed
if managed, err = internal.DecodeObjectManagedFields(newObj); err != nil || len(managed.Fields()) == 0 {
// If the managed field is empty or we failed to decode it,
// let's try the live object. This is to prevent clients who
// don't understand managedFields from deleting it accidentally.
managed, err = internal.DecodeObjectManagedFields(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
internal.RemoveObjectManagedFields(newObjVersioned)
newObjTyped, err := f.typeConverter.ObjectToTyped(newObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed new object: %v", err)
return newObj, nil
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed live object: %v", err)
return newObj, nil
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// TODO(apelisse) use the first return value when unions are implemented
_, managed.Fields, err = f.updater.Update(liveObjTyped, newObjTyped, apiVersion, managed.Fields, manager)
if err != nil {
return nil, fmt.Errorf("failed to update ManagedFields: %v", err)
}
managed.Fields = f.stripFields(managed.Fields, manager)
internal.RemoveObjectManagedFields(liveObj)
internal.RemoveObjectManagedFields(newObj)
// If the current operation took any fields from anything, it means the object changed,
// so update the timestamp of the managedFieldsEntry and merge with any previous updates from the same manager
if vs, ok := managed.Fields[manager]; ok {
delete(managed.Fields, manager)
// Build a manager identifier which will only match previous updates from the same manager
manager, err = f.buildManagerInfo(manager, metav1.ManagedFieldsOperationUpdate)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
if previous, ok := managed.Fields[manager]; ok {
managed.Fields[manager] = fieldpath.NewVersionedSet(vs.Set().Union(previous.Set()), vs.APIVersion(), vs.Applied())
} else {
managed.Fields[manager] = vs
}
if object, managed, err = f.fieldManager.Update(liveObj, newObj, managed, manager); err != nil {
return nil, err
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
if err = internal.EncodeObjectManagedFields(object, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
return newObj, nil
return object, nil
}
// Apply implements FieldManager.
func (f *fieldManager) Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error) {
// Apply is used when server-side apply is called, as it merges the
// object and updates the managed fields.
func (f *FieldManager) Apply(liveObj runtime.Object, patch []byte, manager string, force bool) (object runtime.Object, err error) {
// If the object doesn't have metadata, apply isn't allowed.
_, err := meta.Accessor(liveObj)
if err != nil {
if _, err = meta.Accessor(liveObj); err != nil {
return nil, fmt.Errorf("couldn't get accessor: %v", err)
}
managed, err := internal.DecodeObjectManagedFields(liveObj)
if err != nil {
// Decode the managed fields in the live object, since it isn't allowed in the patch.
var managed Managed
if managed, err = internal.DecodeObjectManagedFields(liveObj); err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
// Check that the patch object has the same version as the live object
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(patch, &patchObj.Object); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
}
internal.RemoveObjectManagedFields(liveObj)
if patchObj.GetManagedFields() != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("metadata.managedFields must be nil"))
}
if patchObj.GetAPIVersion() != f.groupVersion.String() {
return nil,
errors.NewBadRequest(
fmt.Sprintf("Incorrect version specified in apply patch. "+
"Specified patch version: %s, expected: %s",
patchObj.GetAPIVersion(), f.groupVersion.String()))
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
patchObjTyped, err := f.typeConverter.ObjectToTyped(patchObj)
if err != nil {
return nil, fmt.Errorf("failed to create typed patch object: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to create typed live object: %v", err)
}
manager, err := f.buildManagerInfo(fieldManager, metav1.ManagedFieldsOperationApply)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields, manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, internal.NewConflictError(conflicts)
}
if object, managed, err = f.fieldManager.Apply(liveObj, patch, managed, manager, force); err != nil {
return nil, err
}
managed.Fields = f.stripFields(managedFields, manager)
// Update the time in the managedFieldsEntry for this operation
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
newObj, err := f.typeConverter.TypedToObject(newObjTyped)
if err != nil {
return nil, fmt.Errorf("failed to convert new typed object to object: %v", err)
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
if err = internal.EncodeObjectManagedFields(object, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
f.objectDefaulter.Default(newObjVersioned)
newObjUnversioned, err := f.toUnversioned(newObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to convert to unversioned: %v", err)
}
return newObjUnversioned, nil
}
func (f *fieldManager) toVersioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.groupVersion)
}
func (f *fieldManager) toUnversioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.hubVersion)
}
func (f *fieldManager) buildManagerInfo(prefix string, operation metav1.ManagedFieldsOperationType) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: prefix,
Operation: operation,
APIVersion: f.groupVersion.String(),
}
if managerInfo.Manager == "" {
managerInfo.Manager = "unknown"
}
return internal.BuildManagerIdentifier(&managerInfo)
}
// stripSet is the list of fields that should never be part of a mangedFields.
var stripSet = fieldpath.NewSet(
fieldpath.MakePathOrDie("apiVersion"),
fieldpath.MakePathOrDie("kind"),
fieldpath.MakePathOrDie("metadata"),
fieldpath.MakePathOrDie("metadata", "name"),
fieldpath.MakePathOrDie("metadata", "namespace"),
fieldpath.MakePathOrDie("metadata", "creationTimestamp"),
fieldpath.MakePathOrDie("metadata", "selfLink"),
fieldpath.MakePathOrDie("metadata", "uid"),
fieldpath.MakePathOrDie("metadata", "clusterName"),
fieldpath.MakePathOrDie("metadata", "generation"),
fieldpath.MakePathOrDie("metadata", "managedFields"),
fieldpath.MakePathOrDie("metadata", "resourceVersion"),
)
// stripFields removes a predefined set of paths found in typed from managed and returns the updated ManagedFields
func (f *fieldManager) stripFields(managed fieldpath.ManagedFields, manager string) fieldpath.ManagedFields {
vs, ok := managed[manager]
if ok {
if vs == nil {
panic(fmt.Sprintf("Found unexpected nil manager which should never happen: %s", manager))
}
newSet := vs.Set().Difference(stripSet)
if newSet.Empty() {
delete(managed, manager)
} else {
managed[manager] = fieldpath.NewVersionedSet(newSet, vs.APIVersion(), vs.Applied())
}
}
return managed
return object, nil
}

View File

@ -65,7 +65,7 @@ type fakeObjectDefaulter struct{}
func (d *fakeObjectDefaulter) Default(in runtime.Object) {}
type TestFieldManager struct {
fieldManager fieldmanager.FieldManager
fieldManager fieldmanager.Manager
emptyObj runtime.Object
liveObj runtime.Object
}
@ -80,7 +80,7 @@ func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
panic(err)
}
f, err := fieldmanager.NewFieldManager(
f, err := fieldmanager.NewStructuredMergeManager(
m,
&fakeObjectConvertor{},
&fakeObjectDefaulter{},
@ -93,6 +93,8 @@ func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
live := &unstructured.Unstructured{}
live.SetKind(gvk.Kind)
live.SetAPIVersion(gvk.GroupVersion().String())
f = fieldmanager.NewStripMetaManager(f)
f = fieldmanager.NewBuildManagerInfoManager(f, gvk.GroupVersion())
return TestFieldManager{
fieldManager: f,
emptyObj: live,
@ -105,7 +107,7 @@ func (f *TestFieldManager) Reset() {
}
func (f *TestFieldManager) Apply(obj []byte, manager string, force bool) error {
out, err := f.fieldManager.Apply(f.liveObj, obj, manager, force)
out, err := fieldmanager.NewFieldManager(f.fieldManager).Apply(f.liveObj, obj, manager, force)
if err == nil {
f.liveObj = out
}
@ -113,7 +115,7 @@ func (f *TestFieldManager) Apply(obj []byte, manager string, force bool) error {
}
func (f *TestFieldManager) Update(obj runtime.Object, manager string) error {
out, err := f.fieldManager.Update(f.liveObj, obj, manager)
out, err := fieldmanager.NewFieldManager(f.fieldManager).Update(f.liveObj, obj, manager)
if err == nil {
f.liveObj = out
}

View File

@ -28,10 +28,38 @@ import (
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed struct {
Fields fieldpath.ManagedFields
Times map[string]*metav1.Time
// ManagedInterface groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type ManagedInterface interface {
// Fields gets the fieldpath.ManagedFields.
Fields() fieldpath.ManagedFields
// Times gets the timestamps associated with each operation.
Times() map[string]*metav1.Time
}
type managedStruct struct {
fields fieldpath.ManagedFields
times map[string]*metav1.Time
}
var _ ManagedInterface = &managedStruct{}
// Fields implements ManagedInterface.
func (m *managedStruct) Fields() fieldpath.ManagedFields {
return m.fields
}
// Times implements ManagedInterface.
func (m *managedStruct) Times() map[string]*metav1.Time {
return m.times
}
// NewManaged creates a ManagedInterface from a fieldpath.ManagedFields and the timestamps associated with each operation.
func NewManaged(f fieldpath.ManagedFields, t map[string]*metav1.Time) ManagedInterface {
return &managedStruct{
fields: f,
times: t,
}
}
// RemoveObjectManagedFields removes the ManagedFields from the object
@ -46,9 +74,9 @@ func RemoveObjectManagedFields(obj runtime.Object) {
}
// DecodeObjectManagedFields extracts and converts the objects ManagedFields into a fieldpath.ManagedFields.
func DecodeObjectManagedFields(from runtime.Object) (Managed, error) {
func DecodeObjectManagedFields(from runtime.Object) (ManagedInterface, error) {
if from == nil {
return Managed{}, nil
return &managedStruct{}, nil
}
accessor, err := meta.Accessor(from)
if err != nil {
@ -57,13 +85,13 @@ func DecodeObjectManagedFields(from runtime.Object) (Managed, error) {
managed, err := decodeManagedFields(accessor.GetManagedFields())
if err != nil {
return Managed{}, fmt.Errorf("failed to convert managed fields from API: %v", err)
return nil, fmt.Errorf("failed to convert managed fields from API: %v", err)
}
return managed, err
return &managed, nil
}
// EncodeObjectManagedFields converts and stores the fieldpathManagedFields into the objects ManagedFields
func EncodeObjectManagedFields(obj runtime.Object, managed Managed) error {
func EncodeObjectManagedFields(obj runtime.Object, managed ManagedInterface) error {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
@ -80,19 +108,19 @@ func EncodeObjectManagedFields(obj runtime.Object, managed Managed) error {
// decodeManagedFields converts ManagedFields from the wire format (api format)
// to the format used by sigs.k8s.io/structured-merge-diff
func decodeManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (managed Managed, err error) {
managed.Fields = make(fieldpath.ManagedFields, len(encodedManagedFields))
managed.Times = make(map[string]*metav1.Time, len(encodedManagedFields))
func decodeManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (managed managedStruct, err error) {
managed.fields = make(fieldpath.ManagedFields, len(encodedManagedFields))
managed.times = make(map[string]*metav1.Time, len(encodedManagedFields))
for _, encodedVersionedSet := range encodedManagedFields {
manager, err := BuildManagerIdentifier(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding manager from %v: %v", encodedVersionedSet, err)
return managedStruct{}, fmt.Errorf("error decoding manager from %v: %v", encodedVersionedSet, err)
}
managed.Fields[manager], err = decodeVersionedSet(&encodedVersionedSet)
managed.fields[manager], err = decodeVersionedSet(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding versioned set from %v: %v", encodedVersionedSet, err)
return managedStruct{}, fmt.Errorf("error decoding versioned set from %v: %v", encodedVersionedSet, err)
}
managed.Times[manager] = encodedVersionedSet.Time
managed.times[manager] = encodedVersionedSet.Time
}
return managed, nil
}
@ -139,15 +167,18 @@ func decodeVersionedSet(encodedVersionedSet *metav1.ManagedFieldsEntry) (version
// encodeManagedFields converts ManagedFields from the format used by
// sigs.k8s.io/structured-merge-diff to the wire format (api format)
func encodeManagedFields(managed Managed) (encodedManagedFields []metav1.ManagedFieldsEntry, err error) {
func encodeManagedFields(managed ManagedInterface) (encodedManagedFields []metav1.ManagedFieldsEntry, err error) {
if len(managed.Fields()) == 0 {
return nil, nil
}
encodedManagedFields = []metav1.ManagedFieldsEntry{}
for manager := range managed.Fields {
versionedSet := managed.Fields[manager]
for manager := range managed.Fields() {
versionedSet := managed.Fields()[manager]
v, err := encodeManagerVersionedSet(manager, versionedSet)
if err != nil {
return nil, fmt.Errorf("error encoding versioned set for %v: %v", manager, err)
}
if t, ok := managed.Times[manager]; ok {
if t, ok := managed.Times()[manager]; ok {
v.Time = t
}
encodedManagedFields = append(encodedManagedFields, *v)
@ -174,7 +205,10 @@ func sortEncodedManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry)
return p.Time.Before(q.Time)
}
return p.Manager < q.Manager
if p.Manager != q.Manager {
return p.Manager < q.Manager
}
return p.APIVersion < q.APIVersion
})
return encodedManagedFields, nil

View File

@ -31,6 +31,8 @@ import (
// (api format) to the format used by sigs.k8s.io/structured-merge-diff and back
func TestRoundTripManagedFields(t *testing.T) {
tests := []string{
`null
`,
`- apiVersion: v1
fieldsType: FieldsV1
fieldsV1:
@ -146,7 +148,7 @@ func TestRoundTripManagedFields(t *testing.T) {
if err != nil {
t.Fatalf("did not expect decoding error but got: %v", err)
}
encoded, err := encodeManagedFields(decoded)
encoded, err := encodeManagedFields(&decoded)
if err != nil {
t.Fatalf("did not expect encoding error but got: %v", err)
}

View File

@ -19,61 +19,48 @@ package fieldmanager
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type skipNonAppliedManager struct {
fieldManager FieldManager
objectCreater runtime.ObjectCreater
gvk schema.GroupVersionKind
fieldManager Manager
objectCreater runtime.ObjectCreater
gvk schema.GroupVersionKind
beforeApplyManagerName string
}
var _ FieldManager = &skipNonAppliedManager{}
var _ Manager = &skipNonAppliedManager{}
// NewSkipNonAppliedManager creates a new wrapped FieldManager that only starts tracking managers after the first apply
func NewSkipNonAppliedManager(fieldManager FieldManager, objectCreater runtime.ObjectCreater, gvk schema.GroupVersionKind) FieldManager {
// NewSkipNonAppliedManager creates a new wrapped FieldManager that only starts tracking managers after the first apply.
func NewSkipNonAppliedManager(fieldManager Manager, objectCreater runtime.ObjectCreater, gvk schema.GroupVersionKind) Manager {
return &skipNonAppliedManager{
fieldManager: fieldManager,
objectCreater: objectCreater,
gvk: gvk,
fieldManager: fieldManager,
objectCreater: objectCreater,
gvk: gvk,
beforeApplyManagerName: "before-first-apply",
}
}
// Update implements FieldManager.
func (f *skipNonAppliedManager) Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error) {
liveObjAccessor, err := meta.Accessor(liveObj)
if err != nil {
return newObj, nil
// Update implements Manager.
func (f *skipNonAppliedManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
if len(managed.Fields()) == 0 {
return newObj, managed, nil
}
newObjAccessor, err := meta.Accessor(newObj)
if err != nil {
return newObj, nil
}
if len(liveObjAccessor.GetManagedFields()) == 0 && len(newObjAccessor.GetManagedFields()) == 0 {
return newObj, nil
}
return f.fieldManager.Update(liveObj, newObj, manager)
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// Apply implements FieldManager.
func (f *skipNonAppliedManager) Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error) {
liveObjAccessor, err := meta.Accessor(liveObj)
if err != nil {
return nil, fmt.Errorf("couldn't get accessor: %v", err)
}
if len(liveObjAccessor.GetManagedFields()) == 0 {
// Apply implements Manager.
func (f *skipNonAppliedManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error) {
if len(managed.Fields()) == 0 {
emptyObj, err := f.objectCreater.New(f.gvk)
if err != nil {
return nil, fmt.Errorf("failed to create empty object of type %v: %v", f.gvk, err)
return nil, nil, fmt.Errorf("failed to create empty object of type %v: %v", f.gvk, err)
}
liveObj, err = f.fieldManager.Update(emptyObj, liveObj, "before-first-apply")
liveObj, managed, err = f.fieldManager.Update(emptyObj, liveObj, managed, f.beforeApplyManagerName)
if err != nil {
return nil, fmt.Errorf("failed to create manager for existing fields: %v", err)
return nil, nil, fmt.Errorf("failed to create manager for existing fields: %v", err)
}
}
return f.fieldManager.Apply(liveObj, patch, fieldManager, force)
return f.fieldManager.Apply(liveObj, patch, managed, fieldManager, force)
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type stripMetaManager struct {
fieldManager Manager
// stripSet is the list of fields that should never be part of a mangedFields.
stripSet *fieldpath.Set
}
var _ Manager = &stripMetaManager{}
// NewStripMetaManager creates a new Manager that strips metadata and typemeta fields from the manager's fieldset.
func NewStripMetaManager(fieldManager Manager) Manager {
return &stripMetaManager{
fieldManager: fieldManager,
stripSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("apiVersion"),
fieldpath.MakePathOrDie("kind"),
fieldpath.MakePathOrDie("metadata"),
fieldpath.MakePathOrDie("metadata", "name"),
fieldpath.MakePathOrDie("metadata", "namespace"),
fieldpath.MakePathOrDie("metadata", "creationTimestamp"),
fieldpath.MakePathOrDie("metadata", "selfLink"),
fieldpath.MakePathOrDie("metadata", "uid"),
fieldpath.MakePathOrDie("metadata", "clusterName"),
fieldpath.MakePathOrDie("metadata", "generation"),
fieldpath.MakePathOrDie("metadata", "managedFields"),
fieldpath.MakePathOrDie("metadata", "resourceVersion"),
),
}
}
// Update implements Manager.
func (f *stripMetaManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
newObj, managed, err := f.fieldManager.Update(liveObj, newObj, managed, manager)
if err != nil {
return nil, nil, err
}
f.stripFields(managed.Fields(), manager)
return newObj, managed, nil
}
// Apply implements Manager.
func (f *stripMetaManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
newObj, managed, err := f.fieldManager.Apply(liveObj, patch, managed, manager, force)
if err != nil {
return nil, nil, err
}
f.stripFields(managed.Fields(), manager)
return newObj, managed, nil
}
// stripFields removes a predefined set of paths found in typed from managed
func (f *stripMetaManager) stripFields(managed fieldpath.ManagedFields, manager string) {
vs, ok := managed[manager]
if ok {
if vs == nil {
panic(fmt.Sprintf("Found unexpected nil manager which should never happen: %s", manager))
}
newSet := vs.Set().Difference(f.stripSet)
if newSet.Empty() {
delete(managed, manager)
} else {
managed[manager] = fieldpath.NewVersionedSet(newSet, vs.APIVersion(), vs.Applied())
}
}
}

View File

@ -0,0 +1,209 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/klog"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/yaml"
)
type structuredMergeManager struct {
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
objectDefaulter runtime.ObjectDefaulter
groupVersion schema.GroupVersion
hubVersion schema.GroupVersion
updater merge.Updater
}
var _ Manager = &structuredMergeManager{}
// NewStructuredMergeManager creates a new Manager that merges apply requests
// and update managed fields for other types of requests.
func NewStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (Manager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
if err != nil {
return nil, err
}
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// NewCRDStructuredMergeManager creates a new Manager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ Manager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
}
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewCRDVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// Update implements Manager.
func (f *structuredMergeManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
newObjTyped, err := f.typeConverter.ObjectToTyped(newObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed new object: %v", err)
return newObj, managed, nil
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed live object: %v", err)
return newObj, managed, nil
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// TODO(apelisse) use the first return value when unions are implemented
self := "current-operation"
_, managedFields, err := f.updater.Update(liveObjTyped, newObjTyped, apiVersion, managed.Fields(), self)
if err != nil {
return nil, nil, fmt.Errorf("failed to update ManagedFields: %v", err)
}
managed = internal.NewManaged(managedFields, managed.Times())
// If the current operation took any fields from anything, it means the object changed,
// so update the timestamp of the managedFieldsEntry and merge with any previous updates from the same manager
if vs, ok := managed.Fields()[self]; ok {
delete(managed.Fields(), self)
managed.Times()[manager] = &metav1.Time{Time: time.Now().UTC()}
if previous, ok := managed.Fields()[manager]; ok {
managed.Fields()[manager] = fieldpath.NewVersionedSet(vs.Set().Union(previous.Set()), vs.APIVersion(), vs.Applied())
} else {
managed.Fields()[manager] = vs
}
}
return newObj, managed, nil
}
// Apply implements Manager.
func (f *structuredMergeManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(patch, &patchObj.Object); err != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
}
// Check that the patch object has the same version as the live object
if patchObj.GetAPIVersion() != f.groupVersion.String() {
return nil, nil,
errors.NewBadRequest(
fmt.Sprintf("Incorrect version specified in apply patch. "+
"Specified patch version: %s, expected: %s",
patchObj.GetAPIVersion(), f.groupVersion.String()))
}
if patchObj.GetManagedFields() != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("metadata.managedFields must be nil"))
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
patchObjTyped, err := f.typeConverter.ObjectToTyped(patchObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to create typed patch object: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, nil, fmt.Errorf("failed to create typed live object: %v", err)
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields(), manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, nil, internal.NewConflictError(conflicts)
}
return nil, nil, err
}
managed = internal.NewManaged(managedFields, managed.Times())
// Update the time in the managedFieldsEntry for this operation
managed.Times()[manager] = &metav1.Time{Time: time.Now().UTC()}
newObj, err := f.typeConverter.TypedToObject(newObjTyped)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new typed object to object: %v", err)
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
f.objectDefaulter.Default(newObjVersioned)
newObjUnversioned, err := f.toUnversioned(newObjVersioned)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert to unversioned: %v", err)
}
return newObjUnversioned, managed, nil
}
func (f *structuredMergeManager) toVersioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.groupVersion)
}
func (f *structuredMergeManager) toUnversioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.hubVersion)
}

View File

@ -296,7 +296,7 @@ type patchMechanism interface {
type jsonPatcher struct {
*patcher
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@ -382,7 +382,7 @@ type smpPatcher struct {
// Schema
schemaReferenceObj runtime.Object
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@ -422,7 +422,7 @@ type applyPatcher struct {
options *metav1.PatchOptions
creater runtime.ObjectCreater
kind schema.GroupVersionKind
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *applyPatcher) applyPatchToCurrentObject(obj runtime.Object) (runtime.Object, error) {

View File

@ -67,7 +67,7 @@ type RequestScope struct {
EquivalentResourceMapper runtime.EquivalentResourceMapper
TableConvertor rest.TableConvertor
FieldManager fieldmanager.FieldManager
FieldManager *fieldmanager.FieldManager
Resource schema.GroupVersionResource
Kind schema.GroupVersionKind

View File

@ -550,18 +550,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
reqScope.MetaGroupVersion = *a.group.MetaGroupVersion
}
if a.group.OpenAPIModels != nil && utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
fm, err := fieldmanager.NewFieldManager(
reqScope.FieldManager, err = fieldmanager.NewDefaultFieldManager(
a.group.OpenAPIModels,
a.group.UnsafeConvertor,
a.group.Defaulter,
fqKindToRegister.GroupVersion(),
a.group.Creater,
fqKindToRegister,
reqScope.HubGroupVersion,
)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
fm = fieldmanager.NewSkipNonAppliedManager(fm, a.group.Creater, fqKindToRegister)
reqScope.FieldManager = fm
}
for _, action := range actions {
producedObject := storageMeta.ProducesObject(action.Verb)