From 211663604036a8ab4b966eb758869b39d6d3965b Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 17 Aug 2022 22:08:47 +0800 Subject: [PATCH] feat: add grpc client error interceptor (#1575) Signed-off-by: Gaius Signed-off-by: Gaius --- pkg/rpc/cdnsystem/client/client.go | 4 +++ pkg/rpc/client.go | 4 +-- pkg/rpc/client_util.go | 48 --------------------------- pkg/rpc/interceptor.go | 53 +++++++++++++++++++++++------- pkg/rpc/scheduler/client/client.go | 5 +++ 5 files changed, 53 insertions(+), 61 deletions(-) diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 4ab7d07e1..de37ece7a 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -60,6 +60,7 @@ func GetClientByAddr(netAddr dfnet.NetAddr, options ...grpc.DialOption) (Client, append([]grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + rpc.ConvertErrorUnaryClientInterceptor, otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), @@ -70,6 +71,7 @@ func GetClientByAddr(netAddr dfnet.NetAddr, options ...grpc.DialOption) (Client, ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + rpc.ConvertErrorStreamClientInterceptor, otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), @@ -97,6 +99,7 @@ func GetClient(dynconfig config.DynconfigInterface, options ...grpc.DialOption) grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + rpc.ConvertErrorUnaryClientInterceptor, otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), @@ -108,6 +111,7 @@ func GetClient(dynconfig config.DynconfigInterface, options ...grpc.DialOption) rpc.RefresherUnaryClientInterceptor(dynconfig), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + rpc.ConvertErrorStreamClientInterceptor, otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 3d77f58c8..cd19ca66b 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -97,8 +97,8 @@ var defaultClientOpts = []grpc.DialOption{ Time: 1 * time.Minute, Timeout: 10 * time.Second, }), - grpc.WithStreamInterceptor(streamClientInterceptor), - grpc.WithUnaryInterceptor(unaryClientInterceptor), + grpc.WithStreamInterceptor(ConvertErrorStreamClientInterceptor), + grpc.WithUnaryInterceptor(ConvertErrorUnaryClientInterceptor), } type ConnOption interface { diff --git a/pkg/rpc/client_util.go b/pkg/rpc/client_util.go index 61b70d199..8fcdca127 100644 --- a/pkg/rpc/client_util.go +++ b/pkg/rpc/client_util.go @@ -28,8 +28,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - commonv1 "d7y.io/api/pkg/apis/common/v1" - "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/math" @@ -161,52 +159,6 @@ func (w *wrappedClientStream) SendMsg(m any) error { return err } -func streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - s, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - err = convertClientError(err) - logger.GrpcLogger.Errorf("create client stream error: %v for method: %s target: %s connState: %s", err, method, cc.Target(), cc.GetState().String()) - return nil, err - } - - return &wrappedClientStream{ - ClientStream: s, - method: method, - cc: cc, - }, nil -} - -func unaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - messageSent.Event(ctx, 1, req) - err := invoker(ctx, method, req, reply, cc, opts...) - - messageReceived.Event(ctx, 1, reply) - if err != nil { - err = convertClientError(err) - logger.GrpcLogger.Errorf("do unary client error: %v for method: %s target: %s connState: %s", err, method, cc.Target(), cc.GetState().String()) - } - - return err -} - -func convertClientError(err error) error { - if err == nil { - return nil - } - s := status.Convert(err) - for _, d := range s.Details() { - switch internal := d.(type) { - case *commonv1.GrpcDfError: - return &dferrors.DfError{ - Code: internal.Code, - Message: internal.Message, - } - } - } - // grpc framework error - return err -} - type RetryMeta struct { StreamTimes int // times of replacing stream on the current client MaxAttempts int // limit times for execute diff --git a/pkg/rpc/interceptor.go b/pkg/rpc/interceptor.go index 7129d96e7..6ecf01eed 100644 --- a/pkg/rpc/interceptor.go +++ b/pkg/rpc/interceptor.go @@ -89,28 +89,25 @@ func (r *RateLimiterInterceptor) Limit() bool { // ConvertErrorUnaryServerInterceptor returns a new unary server interceptor that convert error when trigger custom error. func ConvertErrorUnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - m, err := handler(ctx, req) + h, err := handler(ctx, req) if err != nil { - err = convertError(err) - logger.GrpcLogger.Errorf("do unary server error: %v for method: %s", err, info.FullMethod) + return h, convertServerError(err) } - return m, err + return h, nil } // ConvertErrorStreamServerInterceptor returns a new stream server interceptor that convert error when trigger custom error. func ConvertErrorStreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - err := handler(srv, ss) - if err != nil { - err = convertError(err) - logger.GrpcLogger.Errorf("do stream server error: %v for method: %s", err, info.FullMethod) + if err := handler(srv, ss); err != nil { + return convertServerError(err) } - return err + return nil } -// convertError converts custom error. -func convertError(err error) error { +// convertServerError converts custom error of server. +func convertServerError(err error) error { if status.Code(err) == codes.InvalidArgument { err = dferrors.New(commonv1.Code_BadRequest, err.Error()) } @@ -123,3 +120,37 @@ func convertError(err error) error { } return err } + +// ConvertErrorUnaryClientInterceptor returns a new unary client interceptor that convert error when trigger custom error. +func ConvertErrorUnaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if err := invoker(ctx, method, req, reply, cc, opts...); err != nil { + return convertClientError(err) + } + + return nil +} + +// ConvertErrorStreamClientInterceptor returns a new stream client interceptor that convert error when trigger custom error. +func ConvertErrorStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + s, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, convertClientError(err) + } + + return s, nil +} + +// convertClientError converts custom error of client. +func convertClientError(err error) error { + for _, d := range status.Convert(err).Details() { + switch internal := d.(type) { + case *commonv1.GrpcDfError: + return &dferrors.DfError{ + Code: internal.Code, + Message: internal.Message, + } + } + } + + return err +} diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 9e08aadd9..88f97401a 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -38,6 +38,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" "d7y.io/dragonfly/v2/pkg/resolver" + "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/common" ) @@ -65,6 +66,7 @@ func GetClient(dynconfig config.Dynconfig, options ...grpc.DialOption) (Client, grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + rpc.ConvertErrorUnaryClientInterceptor, otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), @@ -73,11 +75,14 @@ func GetClient(dynconfig config.Dynconfig, options ...grpc.DialOption) (Client, grpc_retry.WithMax(maxRetries), grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), ), + rpc.RefresherUnaryClientInterceptor(dynconfig), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + rpc.ConvertErrorStreamClientInterceptor, otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + rpc.RefresherStreamClientInterceptor(dynconfig), )), }, options...)..., )