diff --git a/go.mod b/go.mod index 18d589fe0..bff0607fb 100644 --- a/go.mod +++ b/go.mod @@ -42,9 +42,9 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/square/go-jose.v2 v2.6.0 - k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1 + k8s.io/api v0.0.0-20230308234233-a4afee70a903 k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7 - k8s.io/client-go v0.0.0-20230308234350-223d456ea214 + k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73 k8s.io/klog/v2 v2.90.1 k8s.io/kms v0.0.0-20230304001132-5439f76ca4a7 @@ -124,9 +124,9 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1 + k8s.io/api => k8s.io/api v0.0.0-20230308234233-a4afee70a903 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7 - k8s.io/client-go => k8s.io/client-go v0.0.0-20230308234350-223d456ea214 + k8s.io/client-go => k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c k8s.io/component-base => k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73 k8s.io/kms => k8s.io/kms v0.0.0-20230304001132-5439f76ca4a7 ) diff --git a/go.sum b/go.sum index 5fc6774e9..39a2821b6 100644 --- a/go.sum +++ b/go.sum @@ -874,12 +874,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1 h1:tVDuZIySDyJ0DMWMaE+70gkf+zKqEB+zdqPhayjP1tA= -k8s.io/api v0.0.0-20230307152047-2e0b62a70fa1/go.mod h1:esKbT+6XB9TZUHyxlJVQ3zUM0abhQZ81Ic68eirO+xM= +k8s.io/api v0.0.0-20230308234233-a4afee70a903 h1:TmxUf1tDcGUHE8qZKLRWmn2nr2FypkwvqC2qviOUmQc= +k8s.io/api v0.0.0-20230308234233-a4afee70a903/go.mod h1:esKbT+6XB9TZUHyxlJVQ3zUM0abhQZ81Ic68eirO+xM= k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7 h1:YN43Lvs3Pj9iQmuWGojeBiFdz1mkrxe0EZn7Ba3TMpQ= k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7/go.mod h1:jlJwObMa4oKAEOMnAeEaqeiM+Fwd/CbAwNyQ7OaEwS0= -k8s.io/client-go v0.0.0-20230308234350-223d456ea214 h1:AI2D+LTtMmujHZcC4wTDKguy4j37Djz9FPCIOsXbWRc= -k8s.io/client-go v0.0.0-20230308234350-223d456ea214/go.mod h1:UclRvKUZ6gcvA8MCiJnYfr++APIeKcCuWCbm28kUwqI= +k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c h1:1IXuG9QQvPMR3GbYgBhOKre47MAIq+U41cWOGoAHpd8= +k8s.io/client-go v0.0.0-20230309033544-64e2c7ff167c/go.mod h1:hjEB5iFHr17qVb6wnh6w2LQvO5DfoP6rzLN8NAE8K6U= k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73 h1:MEKvhkstqrRFmA9+qQlnkA/jPbZUH/VnMKiEfBeLbf8= k8s.io/component-base v0.0.0-20230308075123-cfc68dcaff73/go.mod h1:MB0hQ6Wy3OOZ/dr+sy5FwxCJhDJ4hszX743ar8dd2zE= k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index af4992c93..49d9005fc 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -25,6 +25,9 @@ import ( "strings" "sync" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -35,6 +38,7 @@ import ( utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/klog/v2" ) @@ -152,6 +156,31 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re return wc } +type etcdError interface { + Code() grpccodes.Code + Error() string +} + +type grpcError interface { + GRPCStatus() *grpcstatus.Status +} + +func isCancelError(err error) bool { + if err == nil { + return false + } + if err == context.Canceled { + return true + } + if etcdErr, ok := err.(etcdError); ok && etcdErr.Code() == grpccodes.Canceled { + return true + } + if grpcErr, ok := err.(grpcError); ok && grpcErr.GRPCStatus().Code() == grpccodes.Canceled { + return true + } + return false +} + func (wc *watchChan) run() { watchClosedCh := make(chan struct{}) go wc.startWatching(watchClosedCh) @@ -162,7 +191,7 @@ func (wc *watchChan) run() { select { case err := <-wc.errChan: - if err == context.Canceled { + if isCancelError(err) { break } errResult := transformErrorToEvent(err) @@ -213,12 +242,15 @@ func (wc *watchChan) sync() error { return nil } -// logWatchChannelErr checks whether the error is about mvcc revision compaction which is regarded as warning func logWatchChannelErr(err error) { - if !strings.Contains(err.Error(), "mvcc: required revision has been compacted") { - klog.Errorf("watch chan error: %v", err) - } else { + switch { + case strings.Contains(err.Error(), "mvcc: required revision has been compacted"): + // mvcc revision compaction which is regarded as warning, not error klog.Warningf("watch chan error: %v", err) + case isCancelError(err): + // expected when watches close, no need to log + default: + klog.Errorf("watch chan error: %v", err) } }