feat: add seed trace (#1549)
Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
parent
439ad71230
commit
baa64bb0a3
|
|
@ -148,14 +148,14 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
|
||||||
}
|
}
|
||||||
logger.Infof("initialize scheduler addresses: %#v", addrs)
|
logger.Infof("initialize scheduler addresses: %#v", addrs)
|
||||||
|
|
||||||
var opts []grpc.DialOption
|
var schedulerClientOptions []grpc.DialOption
|
||||||
if opt.Options.Telemetry.Jaeger != "" {
|
if opt.Options.Telemetry.Jaeger != "" {
|
||||||
opts = append(opts,
|
schedulerClientOptions = append(schedulerClientOptions,
|
||||||
grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
|
grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
|
||||||
grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
|
grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
sched, err := schedulerclient.GetClientByAddr(addrs, opts...)
|
sched, err := schedulerclient.GetClientByAddr(addrs, schedulerClientOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get schedulers: %w", err)
|
return nil, fmt.Errorf("failed to get schedulers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -212,6 +212,19 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
|
||||||
}
|
}
|
||||||
peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials))
|
peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials))
|
||||||
}
|
}
|
||||||
|
// enable grpc tracing
|
||||||
|
if opt.Options.Telemetry.Jaeger != "" {
|
||||||
|
downloadServerOption = append(
|
||||||
|
downloadServerOption,
|
||||||
|
grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
|
||||||
|
grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()),
|
||||||
|
)
|
||||||
|
peerServerOption = append(
|
||||||
|
peerServerOption,
|
||||||
|
grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
|
||||||
|
grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()),
|
||||||
|
)
|
||||||
|
}
|
||||||
rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern, downloadServerOption, peerServerOption)
|
rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern, downloadServerOption, peerServerOption)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,8 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
commonv1 "d7y.io/api/pkg/apis/common/v1"
|
commonv1 "d7y.io/api/pkg/apis/common/v1"
|
||||||
|
|
@ -211,5 +213,8 @@ func buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest)
|
||||||
// TODO use string.Builder
|
// TODO use string.Builder
|
||||||
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",
|
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",
|
||||||
d.piece.RangeStart, d.piece.RangeStart+uint64(d.piece.RangeSize)-1))
|
d.piece.RangeStart, d.piece.RangeStart+uint64(d.piece.RangeSize)-1))
|
||||||
|
|
||||||
|
// inject trace id into request header
|
||||||
|
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue