Make the decode function respect the timeout context
Kubernetes-commit: f78b367db6393a449b8f456e725cbe155d9b90e6
This commit is contained in:
parent
a454d5acfe
commit
be73f76247
|
@ -33,6 +33,7 @@ const (
|
|||
ErrCodeResourceVersionConflicts
|
||||
ErrCodeInvalidObj
|
||||
ErrCodeUnreachable
|
||||
ErrCodeTimeout
|
||||
)
|
||||
|
||||
var errCodeToMessage = map[int]string{
|
||||
|
@ -41,6 +42,7 @@ var errCodeToMessage = map[int]string{
|
|||
ErrCodeResourceVersionConflicts: "resource version conflicts",
|
||||
ErrCodeInvalidObj: "invalid object",
|
||||
ErrCodeUnreachable: "server unreachable",
|
||||
ErrCodeTimeout: "request timeout",
|
||||
}
|
||||
|
||||
func NewKeyNotFoundError(key string, rv int64) *StorageError {
|
||||
|
@ -75,6 +77,14 @@ func NewUnreachableError(key string, rv int64) *StorageError {
|
|||
}
|
||||
}
|
||||
|
||||
func NewTimeoutError(key, msg string) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeTimeout,
|
||||
Key: key,
|
||||
AdditionalErrorMsg: msg,
|
||||
}
|
||||
}
|
||||
|
||||
func NewInvalidObjError(key, msg string) *StorageError {
|
||||
return &StorageError{
|
||||
Code: ErrCodeInvalidObj,
|
||||
|
@ -115,6 +125,11 @@ func IsConflict(err error) bool {
|
|||
return isErrCode(err, ErrCodeResourceVersionConflicts)
|
||||
}
|
||||
|
||||
// IsRequestTimeout returns true if and only if err indicates that the request has timed out.
|
||||
func IsRequestTimeout(err error) bool {
|
||||
return isErrCode(err, ErrCodeTimeout)
|
||||
}
|
||||
|
||||
// IsInvalidObj returns true if and only if err is invalid error
|
||||
func IsInvalidObj(err error) bool {
|
||||
return isErrCode(err, ErrCodeInvalidObj)
|
||||
|
|
|
@ -28,7 +28,7 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
|
|||
switch {
|
||||
case storage.IsNotFound(err):
|
||||
return errors.NewNotFound(qualifiedResource, "")
|
||||
case storage.IsUnreachable(err):
|
||||
case storage.IsUnreachable(err), storage.IsRequestTimeout(err):
|
||||
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
|
||||
case storage.IsInternalError(err):
|
||||
return errors.NewInternalError(err)
|
||||
|
|
|
@ -739,6 +739,14 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
|||
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
|
||||
}
|
||||
|
||||
// Check if the request has already timed out before decode object
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// parent context is canceled or timed out, no point in continuing
|
||||
return storage.NewTimeoutError(string(kv.Key), "request did not complete within requested timeout")
|
||||
default:
|
||||
}
|
||||
|
||||
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil {
|
||||
recordDecodeError(s.groupResourceString, string(kv.Key))
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue