From be73f762474052a08c6a44c3e6283fdf08c14281 Mon Sep 17 00:00:00 2001 From: HirazawaUi <695097494plus@gmail.com> Date: Wed, 25 Oct 2023 23:12:16 +0800 Subject: [PATCH] Make the decode function respect the timeout context Kubernetes-commit: f78b367db6393a449b8f456e725cbe155d9b90e6 --- pkg/storage/errors.go | 15 +++++++++++++++ pkg/storage/errors/storage.go | 2 +- pkg/storage/etcd3/store.go | 8 ++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go index 5f29097c5..e7e095748 100644 --- a/pkg/storage/errors.go +++ b/pkg/storage/errors.go @@ -33,6 +33,7 @@ const ( ErrCodeResourceVersionConflicts ErrCodeInvalidObj ErrCodeUnreachable + ErrCodeTimeout ) var errCodeToMessage = map[int]string{ @@ -41,6 +42,7 @@ var errCodeToMessage = map[int]string{ ErrCodeResourceVersionConflicts: "resource version conflicts", ErrCodeInvalidObj: "invalid object", ErrCodeUnreachable: "server unreachable", + ErrCodeTimeout: "request timeout", } func NewKeyNotFoundError(key string, rv int64) *StorageError { @@ -75,6 +77,14 @@ func NewUnreachableError(key string, rv int64) *StorageError { } } +func NewTimeoutError(key, msg string) *StorageError { + return &StorageError{ + Code: ErrCodeTimeout, + Key: key, + AdditionalErrorMsg: msg, + } +} + func NewInvalidObjError(key, msg string) *StorageError { return &StorageError{ Code: ErrCodeInvalidObj, @@ -115,6 +125,11 @@ func IsConflict(err error) bool { return isErrCode(err, ErrCodeResourceVersionConflicts) } +// IsRequestTimeout returns true if and only if err indicates that the request has timed out. +func IsRequestTimeout(err error) bool { + return isErrCode(err, ErrCodeTimeout) +} + // IsInvalidObj returns true if and only if err is invalid error func IsInvalidObj(err error) bool { return isErrCode(err, ErrCodeInvalidObj) diff --git a/pkg/storage/errors/storage.go b/pkg/storage/errors/storage.go index 89f345398..60a6d5cd8 100644 --- a/pkg/storage/errors/storage.go +++ b/pkg/storage/errors/storage.go @@ -28,7 +28,7 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error switch { case storage.IsNotFound(err): return errors.NewNotFound(qualifiedResource, "") - case storage.IsUnreachable(err): + case storage.IsUnreachable(err), storage.IsRequestTimeout(err): return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level case storage.IsInternalError(err): return errors.NewInternalError(err) diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 9c52ce17e..e96efccce 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -739,6 +739,14 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err) } + // Check if the request has already timed out before decode object + select { + case <-ctx.Done(): + // parent context is canceled or timed out, no point in continuing + return storage.NewTimeoutError(string(kv.Key), "request did not complete within requested timeout") + default: + } + if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil { recordDecodeError(s.groupResourceString, string(kv.Key)) return err