Don't save managedFields if object is too large
Kubernetes-commit: ccd9e4e2de32b8708f3a7be159f7a4316449c433
This commit is contained in:
parent
5f79f6fd39
commit
a87d964ed1
|
@ -27,6 +27,7 @@ import (
|
|||
"unicode/utf8"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
||||
|
@ -139,6 +140,16 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
|
|||
}
|
||||
|
||||
trace.Step("About to store object in database")
|
||||
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
|
||||
requestFunc := func() (runtime.Object, error) {
|
||||
return r.Create(
|
||||
ctx,
|
||||
name,
|
||||
obj,
|
||||
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
|
||||
options,
|
||||
)
|
||||
}
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
if scope.FieldManager != nil {
|
||||
liveObj, err := scope.Creater.New(scope.Kind)
|
||||
|
@ -150,20 +161,21 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
|
|||
return nil, fmt.Errorf("failed to update object (Create for %v) managed fields: %v", scope.Kind, err)
|
||||
}
|
||||
}
|
||||
|
||||
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
|
||||
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
|
||||
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return r.Create(
|
||||
ctx,
|
||||
name,
|
||||
obj,
|
||||
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
|
||||
options,
|
||||
)
|
||||
result, err := requestFunc()
|
||||
// If the object wasn't committed to storage because it's serialized size was too large,
|
||||
// it is safe to remove managedFields (which can be large) and try again.
|
||||
if isTooLargeError(err) {
|
||||
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
|
||||
accessor.SetManagedFields(nil)
|
||||
result, err = requestFunc()
|
||||
}
|
||||
}
|
||||
return result, err
|
||||
})
|
||||
if err != nil {
|
||||
scope.err(err, w, req)
|
||||
|
|
|
@ -581,12 +581,28 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
|
|||
|
||||
wasCreated := false
|
||||
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
|
||||
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
|
||||
requestFunc := func() (runtime.Object, error) {
|
||||
// Pass in UpdateOptions to override UpdateStrategy.AllowUpdateOnCreate
|
||||
options := patchToUpdateOptions(p.options)
|
||||
updateObject, created, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation, p.forceAllowCreate, options)
|
||||
wasCreated = created
|
||||
return updateObject, updateErr
|
||||
}
|
||||
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
|
||||
result, err := requestFunc()
|
||||
// If the object wasn't committed to storage because it's serialized size was too large,
|
||||
// it is safe to remove managedFields (which can be large) and try again.
|
||||
if isTooLargeError(err) && p.patchType != types.ApplyPatchType {
|
||||
if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil {
|
||||
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission, func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
accessor.SetManagedFields(nil)
|
||||
return obj, nil
|
||||
})
|
||||
result, err = requestFunc()
|
||||
}
|
||||
}
|
||||
return result, err
|
||||
})
|
||||
return result, wasCreated, err
|
||||
}
|
||||
|
|
|
@ -25,8 +25,12 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
goruntime "runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
grpccodes "google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -416,3 +420,28 @@ func parseTimeout(str string) time.Duration {
|
|||
func isDryRun(url *url.URL) bool {
|
||||
return len(url.Query()["dryRun"]) != 0
|
||||
}
|
||||
|
||||
type etcdError interface {
|
||||
Code() grpccodes.Code
|
||||
Error() string
|
||||
}
|
||||
|
||||
type grpcError interface {
|
||||
GRPCStatus() *grpcstatus.Status
|
||||
}
|
||||
|
||||
func isTooLargeError(err error) bool {
|
||||
if err != nil {
|
||||
if etcdErr, ok := err.(etcdError); ok {
|
||||
if etcdErr.Code() == grpccodes.InvalidArgument && etcdErr.Error() == "etcdserver: request is too large" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
if grpcErr, ok := err.(grpcError); ok {
|
||||
if grpcErr.GRPCStatus().Code() == grpccodes.ResourceExhausted && strings.Contains(grpcErr.GRPCStatus().Message(), "trying to send message larger than max") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
||||
|
@ -124,15 +125,22 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
|
|||
|
||||
userInfo, _ := request.UserFrom(ctx)
|
||||
transformers := []rest.TransformFunc{}
|
||||
|
||||
// allows skipping managedFields update if the resulting object is too big
|
||||
shouldUpdateManagedFields := true
|
||||
if scope.FieldManager != nil {
|
||||
transformers = append(transformers, func(_ context.Context, newObj, liveObj runtime.Object) (runtime.Object, error) {
|
||||
obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err)
|
||||
if shouldUpdateManagedFields {
|
||||
obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
return obj, nil
|
||||
return newObj, nil
|
||||
})
|
||||
}
|
||||
|
||||
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
|
||||
transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
|
||||
isNotZeroObject, err := hasUID(oldObj)
|
||||
|
@ -149,7 +157,6 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
|
|||
}
|
||||
return newObj, nil
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
createAuthorizerAttributes := authorizer.AttributesRecord{
|
||||
|
@ -167,7 +174,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
|
|||
|
||||
trace.Step("About to store object in database")
|
||||
wasCreated := false
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
requestFunc := func() (runtime.Object, error) {
|
||||
obj, created, err := r.Update(
|
||||
ctx,
|
||||
name,
|
||||
|
@ -184,6 +191,19 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
|
|||
)
|
||||
wasCreated = created
|
||||
return obj, err
|
||||
}
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
result, err := requestFunc()
|
||||
// If the object wasn't committed to storage because it's serialized size was too large,
|
||||
// it is safe to remove managedFields (which can be large) and try again.
|
||||
if isTooLargeError(err) && scope.FieldManager != nil {
|
||||
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
|
||||
accessor.SetManagedFields(nil)
|
||||
shouldUpdateManagedFields = false
|
||||
result, err = requestFunc()
|
||||
}
|
||||
}
|
||||
return result, err
|
||||
})
|
||||
if err != nil {
|
||||
scope.err(err, w, req)
|
||||
|
|
Loading…
Reference in New Issue