diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 88956c753..368bd8432 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -148,14 +148,14 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } logger.Infof("initialize scheduler addresses: %#v", addrs) - var opts []grpc.DialOption + var schedulerClientOptions []grpc.DialOption if opt.Options.Telemetry.Jaeger != "" { - opts = append(opts, + schedulerClientOptions = append(schedulerClientOptions, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()), ) } - sched, err := schedulerclient.GetClientByAddr(addrs, opts...) + sched, err := schedulerclient.GetClientByAddr(addrs, schedulerClientOptions...) if err != nil { return nil, fmt.Errorf("failed to get schedulers: %w", err) } @@ -212,6 +212,19 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials)) } + // enable grpc tracing + if opt.Options.Telemetry.Jaeger != "" { + downloadServerOption = append( + downloadServerOption, + grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), + ) + peerServerOption = append( + peerServerOption, + grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), + ) + } rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern, downloadServerOption, peerServerOption) if err != nil { return nil, err diff --git a/client/daemon/peer/piece_downloader.go b/client/daemon/peer/piece_downloader.go index a50ef004a..82934f04c 100644 --- a/client/daemon/peer/piece_downloader.go +++ b/client/daemon/peer/piece_downloader.go @@ -27,6 +27,8 @@ import ( "net/url" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -211,5 +213,8 @@ func buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) // TODO use string.Builder req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.piece.RangeStart, d.piece.RangeStart+uint64(d.piece.RangeSize)-1)) + + // inject trace id into request header + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header)) return req }