Merge pull request #116393 from liggitt/etcd-cancel-error

Recognize etcd/grpc cancel errors correctly

Kubernetes-commit: 7fe0fb7fbfd3d6a8e07d6cc732d963767b2b0c58
This commit is contained in:
Kubernetes Publisher 2023-03-08 15:42:49 -08:00
commit 2fa0308197
3 changed files with 45 additions and 13 deletions

8
go.mod
View File

@ -42,9 +42,9 @@ require (
google.golang.org/protobuf v1.28.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.6.0
k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1
k8s.io/api v0.0.0-20230308234233-a4afee70a903
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7
k8s.io/client-go v0.0.0-20230308234350-223d456ea214
k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c
k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73
k8s.io/klog/v2 v2.90.1
k8s.io/kms v0.0.0-20230304001132-5439f76ca4a7
@ -124,9 +124,9 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1
k8s.io/api => k8s.io/api v0.0.0-20230308234233-a4afee70a903
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7
k8s.io/client-go => k8s.io/client-go v0.0.0-20230308234350-223d456ea214
k8s.io/client-go => k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c
k8s.io/component-base => k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73
k8s.io/kms => k8s.io/kms v0.0.0-20230304001132-5439f76ca4a7
)

8
go.sum
View File

@ -874,12 +874,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1 h1:tVDuZIySDyJ0DMWMaE+70gkf+zKqEB+zdqPhayjP1tA=
k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1/go.mod h1:esKbT+6XB9TZUHyxlJVQ3zUM0abhQZ81Ic68eirO+xM=
k8s.io/api v0.0.0-20230308234233-a4afee70a903 h1:TmxUf1tDcGUHE8qZKLRWmn2nr2FypkwvqC2qviOUmQc=
k8s.io/api v0.0.0-20230308234233-a4afee70a903/go.mod h1:esKbT+6XB9TZUHyxlJVQ3zUM0abhQZ81Ic68eirO+xM=
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7 h1:YN43Lvs3Pj9iQmuWGojeBiFdz1mkrxe0EZn7Ba3TMpQ=
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7/go.mod h1:jlJwObMa4oKAEOMnAeEaqeiM+Fwd/CbAwNyQ7OaEwS0=
k8s.io/client-go v0.0.0-20230308234350-223d456ea214 h1:AI2D+LTtMmujHZcC4wTDKguy4j37Djz9FPCIOsXbWRc=
k8s.io/client-go v0.0.0-20230308234350-223d456ea214/go.mod h1:UclRvKUZ6gcvA8MCiJnYfr++APIeKcCuWCbm28kUwqI=
k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c h1:1IXuG9QQvPMR3GbYgBhOKre47MAIq+U41cWOGoAHpd8=
k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c/go.mod h1:hjEB5iFHr17qVb6wnh6w2LQvO5DfoP6rzLN8NAE8K6U=
k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73 h1:MEKvhkstqrRFmA9+qQlnkA/jPbZUH/VnMKiEfBeLbf8=
k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73/go.mod h1:MB0hQ6Wy3OOZ/dr+sy5FwxCJhDJ4hszX743ar8dd2zE=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=

View File

@ -25,6 +25,9 @@ import (
"strings"
"sync"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -35,6 +38,7 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2"
)
@ -152,6 +156,31 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
return wc
}
type etcdError interface {
Code() grpccodes.Code
Error() string
}
type grpcError interface {
GRPCStatus() *grpcstatus.Status
}
func isCancelError(err error) bool {
if err == nil {
return false
}
if err == context.Canceled {
return true
}
if etcdErr, ok := err.(etcdError); ok && etcdErr.Code() == grpccodes.Canceled {
return true
}
if grpcErr, ok := err.(grpcError); ok && grpcErr.GRPCStatus().Code() == grpccodes.Canceled {
return true
}
return false
}
func (wc *watchChan) run() {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
@ -162,7 +191,7 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
if err == context.Canceled {
if isCancelError(err) {
break
}
errResult := transformErrorToEvent(err)
@ -213,12 +242,15 @@ func (wc *watchChan) sync() error {
return nil
}
// logWatchChannelErr checks whether the error is about mvcc revision compaction which is regarded as warning
func logWatchChannelErr(err error) {
if !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
klog.Errorf("watch chan error: %v", err)
} else {
switch {
case strings.Contains(err.Error(), "mvcc: required revision has been compacted"):
// mvcc revision compaction which is regarded as warning, not error
klog.Warningf("watch chan error: %v", err)
case isCancelError(err):
// expected when watches close, no need to log
default:
klog.Errorf("watch chan error: %v", err)
}
}