fix: otelgrpc.UnaryClientInterceptor memory leak (#2772)

fix: calling otelgrpc.UnaryClientInterceptor repeatedly creates multiple Int64Histogram resulting in a memory leak

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-10-08 16:20:46 +08:00 committed by GitHub
parent 237eb9e505
commit 3b55ae4300
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 70 additions and 39 deletions

View File

@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
@ -60,7 +59,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor, rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -70,7 +69,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor, rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),
@ -99,7 +98,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor, rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -110,7 +109,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor, rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig), rpc.RefresherStreamClientInterceptor(dynconfig),

View File

@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
@ -50,7 +49,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor, rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -60,7 +59,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor, rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -25,13 +25,13 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2" commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
) )
// GetV2 returns v2 version of the dfdaemon client. // GetV2 returns v2 version of the dfdaemon client.
@ -41,7 +41,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -50,7 +50,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -26,11 +26,11 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
) )
const ( const (
@ -45,12 +45,12 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -26,12 +26,12 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1" inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
) )
const ( const (
@ -53,7 +53,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -62,7 +62,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -18,8 +18,10 @@ package rpc
import ( import (
"context" "context"
"sync"
"github.com/juju/ratelimit" "github.com/juju/ratelimit"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -27,6 +29,39 @@ import (
"d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/internal/dferrors"
) )
var (
// otelUnaryInterceptor is the unary interceptor for tracing.
otelUnaryInterceptor grpc.UnaryClientInterceptor
// otelStreamInterceptor is the stream interceptor for tracing.
otelStreamInterceptor grpc.StreamClientInterceptor
// interceptorsInitialized is used to ensure that otel interceptors are initialized only once.
interceptorsInitialized = sync.Once{}
)
// OTEL interceptors must be created once to avoid memory leak,
// refer to https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226 and
// https://github.com/argoproj/argo-cd/pull/15174.
func ensureOTELInterceptorInitialized() {
interceptorsInitialized.Do(func() {
otelUnaryInterceptor = otelgrpc.UnaryClientInterceptor()
otelStreamInterceptor = otelgrpc.StreamClientInterceptor()
})
}
// OTELUnaryClientInterceptor returns a new unary client interceptor that traces gRPC requests.
func OTELUnaryClientInterceptor() grpc.UnaryClientInterceptor {
ensureOTELInterceptorInitialized()
return otelUnaryInterceptor
}
// OTELStreamClientInterceptor returns a new stream client interceptor that traces gRPC requests.
func OTELStreamClientInterceptor() grpc.StreamClientInterceptor {
ensureOTELInterceptorInitialized()
return otelStreamInterceptor
}
// Refresher is the interface for refreshing dynconfig. // Refresher is the interface for refreshing dynconfig.
type Refresher interface { type Refresher interface {
Refresh() error Refresh() error

View File

@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -37,6 +36,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
) )
@ -47,7 +47,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -56,7 +56,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -37,6 +36,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
) )
@ -47,7 +47,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -56,7 +56,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
@ -55,7 +54,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor, rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -66,7 +65,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor, rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig), rpc.RefresherStreamClientInterceptor(dynconfig),
@ -95,7 +94,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor, rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -105,7 +104,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor, rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
@ -53,7 +52,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -63,7 +62,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
rpc.RefresherUnaryClientInterceptor(dynconfig), rpc.RefresherUnaryClientInterceptor(dynconfig),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig), rpc.RefresherStreamClientInterceptor(dynconfig),
@ -91,7 +90,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -100,7 +99,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -27,13 +27,13 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
securityv1 "d7y.io/api/v2/pkg/apis/security/v1" securityv1 "d7y.io/api/v2/pkg/apis/security/v1"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
) )
@ -56,7 +56,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -65,7 +65,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),

View File

@ -26,12 +26,12 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
trainerv1 "d7y.io/api/v2/pkg/apis/trainer/v1" trainerv1 "d7y.io/api/v2/pkg/apis/trainer/v1"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
) )
const ( const (
@ -50,7 +50,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target, target,
append([]grpc.DialOption{ append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(), rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor( grpc_retry.UnaryClientInterceptor(
@ -59,7 +59,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
), ),
)), )),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(), rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor, grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)), )),