fix: infinitely get pieces when piece num is invalid (#926)
Signed-off-by: Jim Ma <majinjing3@gmail.com>
parent cb4202319e
commit 7ba341fc5c
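In summary, the diff below adds a `getPiecesMaxRetry` option to `DownloadOption` (default 100 in `peerHostConfig`), threads it through `NewPeerTaskManager`, `newFilePeerTask` and `newStreamPeerTask` into the peer task, and uses it to bound the `prepare:`/`goto prepare` loop in `preparePieceTasks`. On the fetch side, `getPieceTasks` now exits retry early when the destination peer answers with `base.Code_BadRequest` (for example an invalid piece number) and returns immediately when the packet carries pieces, updated metadata, or an out-of-range `StartNum`; the storage side always fills task metadata into the returned `base.PiecePacket` and only warns on a bad `StartNum` instead of returning `dferrors.ErrInvalidArgument`. The snippet below is a minimal, self-contained sketch of the bounded-retry pattern only, not project code; `fetchOnce` and `maxRetry` are placeholder names standing in for the real calls shown in the diff:

package main

import (
    "errors"
    "fmt"
)

// errPeerPacketChanged mirrors the sentinel used in the diff: the scheduler
// handed out a new peer packet, so fetching pieces should be retried.
var errPeerPacketChanged = errors.New("peer packet changed")

// getPieces retries fetchOnce while the peer packet keeps changing, but gives
// up once maxRetry is exceeded. maxRetry <= 0 keeps the old unbounded behavior,
// matching the `pt.getPiecesMaxRetry > 0` guard in the diff.
func getPieces(fetchOnce func() error, maxRetry int) error {
    var retryCount int
    for {
        retryCount++
        err := fetchOnce()
        if err == nil {
            return nil
        }
        if errors.Is(err, errPeerPacketChanged) {
            if maxRetry > 0 && retryCount > maxRetry {
                return fmt.Errorf("get pieces max retry count reached")
            }
            continue
        }
        return err
    }
}

func main() {
    // A fetch that always reports a changed peer packet would previously loop
    // forever; with a budget of 3 it now fails fast.
    err := getPieces(func() error { return errPeerPacketChanged }, 3)
    fmt.Println(err) // get pieces max retry count reached
}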
@@ -173,6 +173,7 @@ type DownloadOption struct {
     PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
     CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
     TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
+    GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
 }
 
 type TransportOption struct {
@@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{
     Download: DownloadOption{
         CalculateDigest: true,
         PieceDownloadTimeout: 30 * time.Second,
+        GetPiecesMaxRetry: 100,
         TotalRateLimit: clientutil.RateLimit{
             Limit: rate.Limit(DefaultTotalDownloadLimit),
         },
@@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{
     Download: DownloadOption{
         CalculateDigest: true,
         PieceDownloadTimeout: 30 * time.Second,
+        GetPiecesMaxRetry: 100,
         TotalRateLimit: clientutil.RateLimit{
             Limit: rate.Limit(DefaultTotalDownloadLimit),
         },
@@ -168,7 +168,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
         return nil, err
     }
     peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
-        opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest)
+        opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
     if err != nil {
         return nil, err
     }
@@ -103,6 +103,8 @@ type peerTask struct {
     peerPacketReady chan bool
     // pieceParallelCount stands the piece parallel count from peerPacket
     pieceParallelCount *atomic.Int32
+    // getPiecesMaxRetry stands max retry to get pieces from one peer packet
+    getPiecesMaxRetry int
 
     // done channel will be close when peer task is finished
     done chan struct{}
@@ -502,8 +504,8 @@ loop:
 
 func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize uint32) (chan *DownloadPieceRequest, bool) {
     pt.contentLength.Store(piecePacket.ContentLength)
-    if pt.contentLength.Load() > 0 {
-        pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(pt.contentLength.Load()))
+    if piecePacket.ContentLength > 0 {
+        pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength))
     }
     if err := pt.callback.Init(pt); err != nil {
         pt.span.RecordError(err)
@@ -579,7 +581,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
             // preparePieceTasksByPeer func already send piece result with error
             pt.Infof("new peer client ready, main peer: %s", pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
             // research from piece 0
-            return pt.getNextPieceNum(0), true
+            return 0, true
         }
         // when scheduler says base.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
         pt.Infof("start download from source due to base.Code_SchedNeedBackSource")
@@ -605,6 +607,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
 }
 
 func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
+    pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos))
     for _, piece := range piecePacket.PieceInfos {
         pt.Infof("get piece %d from %s/%s, md5: %s, start: %d, size: %d",
             piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
@@ -721,7 +724,9 @@ func (pt *peerTask) isCompleted() bool {
 
 func (pt *peerTask) preparePieceTasks(request *base.PieceTaskRequest) (p *base.PiecePacket, err error) {
     defer pt.recoverFromPanic()
+    var retryCount int
 prepare:
+    retryCount++
     peerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
     pt.pieceParallelCount.Store(peerPacket.ParallelCount)
     request.DstPid = peerPacket.MainPeer.PeerId
@@ -730,6 +735,10 @@ prepare:
         return
     }
     if err == errPeerPacketChanged {
+        if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry {
+            err = fmt.Errorf("get pieces max retry count reached")
+            return
+        }
         goto prepare
     }
     for _, peer := range peerPacket.StealPeers {
@@ -739,6 +748,10 @@ prepare:
             return
         }
         if err == errPeerPacketChanged {
+            if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry {
+                err = fmt.Errorf("get pieces max retry count reached")
+                return
+            }
             goto prepare
         }
     }
@@ -816,24 +829,49 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
         count int
     )
     p, _, err := retry.Run(pt.ctx, func() (interface{}, bool, error) {
-        pp, getErr := dfclient.GetPieceTasks(pt.ctx, peer, request)
+        piecePacket, getError := dfclient.GetPieceTasks(pt.ctx, peer, request)
         // when GetPieceTasks returns err, exit retry
-        if getErr != nil {
-            span.RecordError(getErr)
-            // fast way to exit retry
+        if getError != nil {
+            pt.Errorf("get piece tasks with error: %s", getError)
+            span.RecordError(getError)
+
+            // fast way 1 to exit retry
+            if de, ok := getError.(*dferrors.DfError); ok {
+                pt.Debugf("get piece task with grpc error, code: %d", de.Code)
+                // bad request, like invalid piece num, just exit
+                if de.Code == base.Code_BadRequest {
+                    span.AddEvent("bad request")
+                    pt.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, getError)
+                    return nil, true, getError
+                }
+            }
+
+            // fast way 2 to exit retry
             lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
             if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId {
-                pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
+                pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError,
                     curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId)
                 peerPacketChanged = true
                 return nil, true, nil
             }
-            return nil, true, getErr
+            return nil, true, getError
         }
+        // got any pieces
+        if len(piecePacket.PieceInfos) > 0 {
+            return piecePacket, false, nil
+        }
+        // need update metadata
+        if piecePacket.ContentLength > pt.contentLength.Load() || piecePacket.TotalPiece > pt.totalPiece {
+            return piecePacket, false, nil
+        }
+        // invalid request num
+        if piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum {
+            pt.Warnf("invalid start num: %d, total piece: %d", request.StartNum, piecePacket.TotalPiece)
+            return piecePacket, false, nil
+        }
+
         // by santong: when peer return empty, retry later
-        if len(pp.PieceInfos) == 0 {
-            count++
-            er := pt.peerPacketStream.Send(&scheduler.PieceResult{
+        sendError := pt.peerPacketStream.Send(&scheduler.PieceResult{
             TaskId: pt.taskID,
             SrcPid: pt.peerID,
             DstPid: peer.PeerId,
@@ -843,9 +881,9 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
             HostLoad: nil,
             FinishedCount: pt.readyPieces.Settled(),
         })
-        if er != nil {
-            span.RecordError(er)
-            pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", er)
+        if sendError != nil {
+            span.RecordError(sendError)
+            pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", sendError)
         }
         // fast way to exit retry
         lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
@@ -855,12 +893,11 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
            peerPacketChanged = true
            return nil, true, nil
        }
+        count++
        span.AddEvent("retry due to empty pieces",
            trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
        pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
        return nil, false, dferrors.ErrEmptyValue
-        }
-        return pp, false, nil
     }, 0.05, 0.2, 40, nil)
     if peerPacketChanged {
         return nil, errPeerPacketChanged
@@ -877,13 +914,14 @@ func (pt *peerTask) getNextPieceNum(cur int32) int32 {
         return -1
     }
     i := cur
+    // try to find next not requested piece
     for ; pt.requestedPieces.IsSet(i); i++ {
     }
     if pt.totalPiece > 0 && i >= pt.totalPiece {
         // double check, re-search not success or not requested pieces
         for i = int32(0); pt.requestedPieces.IsSet(i); i++ {
         }
-        if pt.totalPiece > 0 && i >= int32(pt.totalPiece) {
+        if pt.totalPiece > 0 && i >= pt.totalPiece {
             return -1
         }
     }
@@ -86,7 +86,8 @@ func newFilePeerTask(ctx context.Context,
     request *FilePeerTaskRequest,
     schedulerClient schedulerclient.SchedulerClient,
     schedulerOption config.SchedulerOption,
-    perPeerRateLimit rate.Limit) (context.Context, *filePeerTask, *TinyData, error) {
+    perPeerRateLimit rate.Limit,
+    getPiecesMaxRetry int) (context.Context, *filePeerTask, *TinyData, error) {
     ctx, span := tracer.Start(ctx, config.SpanFilePeerTask, trace.WithSpanKind(trace.SpanKindClient))
     span.SetAttributes(config.AttributePeerHost.String(host.Uuid))
     span.SetAttributes(semconv.NetHostIPKey.String(host.Ip))
@@ -196,6 +197,7 @@ func newFilePeerTask(ctx context.Context,
            contentLength: atomic.NewInt64(-1),
            pieceParallelCount: atomic.NewInt32(0),
            totalPiece: -1,
+            getPiecesMaxRetry: getPiecesMaxRetry,
            schedulerOption: schedulerOption,
            schedulerClient: schedulerClient,
            limiter: limiter,
@@ -136,7 +136,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
        req,
        ptm.schedulerClient,
        ptm.schedulerOption,
-        0)
+        0, 10)
    assert.Nil(err, "new file peer task")
    pt.needBackSource = true
 
@@ -261,7 +261,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
        req,
        ptm.schedulerClient,
        ptm.schedulerOption,
-        0)
+        0, 10)
    assert.Nil(err, "new file peer task")
    pt.needBackSource = true
 
@@ -117,6 +117,8 @@ type peerTaskManager struct {
    enableMultiplex bool
 
    calculateDigest bool
+
+    getPiecesMaxRetry int
 }
 
 func NewPeerTaskManager(
@@ -127,7 +129,8 @@ func NewPeerTaskManager(
    schedulerOption config.SchedulerOption,
    perPeerRateLimit rate.Limit,
    multiplex bool,
-    calculateDigest bool) (TaskManager, error) {
+    calculateDigest bool,
+    getPiecesMaxRetry int) (TaskManager, error) {
 
    ptm := &peerTaskManager{
        host: host,
@@ -139,6 +142,7 @@ func NewPeerTaskManager(
        perPeerRateLimit: perPeerRateLimit,
        enableMultiplex: multiplex,
        calculateDigest: calculateDigest,
+        getPiecesMaxRetry: getPiecesMaxRetry,
    }
    return ptm, nil
 }
@@ -159,7 +163,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
        limit = rate.Limit(req.Limit)
    }
    ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
-        req, ptm.schedulerClient, ptm.schedulerOption, limit)
+        req, ptm.schedulerClient, ptm.schedulerOption, limit, ptm.getPiecesMaxRetry)
    if err != nil {
        return nil, nil, err
    }
@@ -217,8 +221,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
    }
 
    start := time.Now()
-    ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm.host, ptm.pieceManager,
-        req, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
+    ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm, req)
    if err != nil {
        return nil, nil, err
    }
@@ -34,7 +34,6 @@ import (
    "d7y.io/dragonfly/v2/pkg/idgen"
    "d7y.io/dragonfly/v2/pkg/rpc/base"
    "d7y.io/dragonfly/v2/pkg/rpc/scheduler"
-    schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
 )
 
 // StreamPeerTask represents a peer task with stream io for reading directly without once more disk io
@@ -55,36 +54,32 @@ type streamPeerTask struct {
 var _ StreamPeerTask = (*streamPeerTask)(nil)
 
 func newStreamPeerTask(ctx context.Context,
-    host *scheduler.PeerHost,
-    pieceManager PieceManager,
-    request *scheduler.PeerTaskRequest,
-    schedulerClient schedulerclient.SchedulerClient,
-    schedulerOption config.SchedulerOption,
-    perPeerRateLimit rate.Limit) (context.Context, *streamPeerTask, *TinyData, error) {
+    ptm *peerTaskManager,
+    request *scheduler.PeerTaskRequest) (context.Context, *streamPeerTask, *TinyData, error) {
    ctx, span := tracer.Start(ctx, config.SpanStreamPeerTask, trace.WithSpanKind(trace.SpanKindClient))
-    span.SetAttributes(config.AttributePeerHost.String(host.Uuid))
-    span.SetAttributes(semconv.NetHostIPKey.String(host.Ip))
+    span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
+    span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
    span.SetAttributes(config.AttributePeerID.String(request.PeerId))
    span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
 
    logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, tag: %s",
        request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag)
    // trace register
-    regCtx, cancel := context.WithTimeout(ctx, schedulerOption.ScheduleTimeout.Duration)
+    regCtx, cancel := context.WithTimeout(ctx, ptm.schedulerOption.ScheduleTimeout.Duration)
    defer cancel()
    regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask)
    logger.Infof("step 1: peer %s start to register", request.PeerId)
-    result, err := schedulerClient.RegisterPeerTask(regCtx, request)
+    result, err := ptm.schedulerClient.RegisterPeerTask(regCtx, request)
    regSpan.RecordError(err)
    regSpan.End()
 
    var needBackSource bool
    if err != nil {
        if err == context.DeadlineExceeded {
-            logger.Errorf("scheduler did not response in %s", schedulerOption.ScheduleTimeout.Duration)
+            logger.Errorf("scheduler did not response in %s", ptm.schedulerOption.ScheduleTimeout.Duration)
        }
        logger.Errorf("step 1: peer %s register failed: %s", request.PeerId, err)
-        if schedulerOption.DisableAutoBackSource {
+        if ptm.schedulerOption.DisableAutoBackSource {
            logger.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, request.PeerId)
            span.RecordError(err)
            span.End()
@@ -92,7 +87,7 @@ func newStreamPeerTask(ctx context.Context,
        }
        needBackSource = true
        // can not detect source or scheduler error, create a new dummy scheduler client
-        schedulerClient = &dummySchedulerClient{}
+        ptm.schedulerClient = &dummySchedulerClient{}
        result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
        logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
    }
@@ -137,7 +132,7 @@ func newStreamPeerTask(ctx context.Context,
        }
    }
 
-    peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
+    peerPacketStream, err := ptm.schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
    logger.Infof("step 2: start report peer %s piece result", request.PeerId)
    if err != nil {
        defer span.End()
@@ -145,19 +140,19 @@ func newStreamPeerTask(ctx context.Context,
        return ctx, nil, nil, err
    }
    var limiter *rate.Limiter
-    if perPeerRateLimit > 0 {
-        limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit))
+    if ptm.perPeerRateLimit > 0 {
+        limiter = rate.NewLimiter(ptm.perPeerRateLimit, int(ptm.perPeerRateLimit))
    }
    pt := &streamPeerTask{
        successPieceCh: make(chan int32),
        streamDone: make(chan struct{}),
        peerTask: peerTask{
            ctx: ctx,
-            host: host,
+            host: ptm.host,
            needBackSource: needBackSource,
            request: request,
            peerPacketStream: peerPacketStream,
-            pieceManager: pieceManager,
+            pieceManager: ptm.pieceManager,
            peerPacketReady: make(chan bool, 1),
            peerID: request.PeerId,
            taskID: result.TaskId,
@@ -172,8 +167,9 @@ func newStreamPeerTask(ctx context.Context,
            contentLength: atomic.NewInt64(-1),
            pieceParallelCount: atomic.NewInt32(0),
            totalPiece: -1,
-            schedulerOption: schedulerOption,
-            schedulerClient: schedulerClient,
+            getPiecesMaxRetry: ptm.getPiecesMaxRetry,
+            schedulerOption: ptm.schedulerOption,
+            schedulerClient: ptm.schedulerClient,
            limiter: limiter,
            completedLength: atomic.NewInt64(0),
            usedTraffic: atomic.NewUint64(0),
@@ -256,8 +256,7 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) {
        PeerHost: &scheduler.PeerHost{},
    }
    ctx := context.Background()
-    _, pt, _, err := newStreamPeerTask(ctx, ptm.host, pm, req,
-        ptm.schedulerClient, ptm.schedulerOption, 0)
+    _, pt, _, err := newStreamPeerTask(ctx, ptm, req)
    assert.Nil(err, "new stream peer task")
    pt.SetCallback(&streamPeerTaskCallback{
        ptm: ptm,
@@ -32,7 +32,6 @@ import (
    "d7y.io/dragonfly/v2/client/clientutil"
    "d7y.io/dragonfly/v2/client/config"
    "d7y.io/dragonfly/v2/client/daemon/test"
-    "d7y.io/dragonfly/v2/internal/util"
    "d7y.io/dragonfly/v2/pkg/rpc/base"
    "d7y.io/dragonfly/v2/pkg/rpc/scheduler"
    "d7y.io/dragonfly/v2/pkg/source"
@@ -103,7 +102,9 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) {
        pieceManager: &pieceManager{
            storageManager: storageManager,
            pieceDownloader: downloader,
-            computePieceSize: util.ComputePieceSize,
+            computePieceSize: func(contentLength int64) uint32 {
+                return uint32(pieceSize)
+            },
        },
        storageManager: storageManager,
        schedulerClient: schedulerClient,
@@ -120,19 +121,7 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) {
        PeerHost: &scheduler.PeerHost{},
    }
    ctx := context.Background()
-    _, pt, _, err := newStreamPeerTask(ctx,
-        ptm.host,
-        &pieceManager{
-            storageManager: storageManager,
-            pieceDownloader: downloader,
-            computePieceSize: func(contentLength int64) uint32 {
-                return uint32(pieceSize)
-            },
-        },
-        req,
-        ptm.schedulerClient,
-        ptm.schedulerOption,
-        0)
+    _, pt, _, err := newStreamPeerTask(ctx, ptm, req)
    assert.Nil(err, "new stream peer task")
    pt.SetCallback(&streamPeerTaskCallback{
        ptm: ptm,
@@ -213,7 +202,9 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) {
        pieceManager: &pieceManager{
            storageManager: storageManager,
            pieceDownloader: downloader,
-            computePieceSize: util.ComputePieceSize,
+            computePieceSize: func(contentLength int64) uint32 {
+                return uint32(pieceSize)
+            },
        },
        storageManager: storageManager,
        schedulerClient: schedulerClient,
@@ -230,19 +221,7 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) {
        PeerHost: &scheduler.PeerHost{},
    }
    ctx := context.Background()
-    _, pt, _, err := newStreamPeerTask(ctx,
-        ptm.host,
-        &pieceManager{
-            storageManager: storageManager,
-            pieceDownloader: downloader,
-            computePieceSize: func(contentLength int64) uint32 {
-                return uint32(pieceSize)
-            },
-        },
-        req,
-        schedulerClient,
-        ptm.schedulerOption,
-        0)
+    _, pt, _, err := newStreamPeerTask(ctx, ptm, req)
    assert.Nil(err, "new stream peer task")
    pt.SetCallback(&streamPeerTaskCallback{
        ptm: ptm,
@@ -88,6 +88,9 @@ func (m *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque
    p, err := m.storageManager.GetPieces(ctx, request)
    if err != nil {
        code := base.Code_UnknownError
+        if err == dferrors.ErrInvalidArgument {
+            code = base.Code_BadRequest
+        }
        if err != storage.ErrTaskNotFound {
            logger.Errorf("get piece tasks error: %s, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d",
                err, request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit)
@@ -28,7 +28,6 @@ import (
    "go.uber.org/atomic"
 
    "d7y.io/dragonfly/v2/client/clientutil"
-    "d7y.io/dragonfly/v2/internal/dferrors"
    logger "d7y.io/dragonfly/v2/internal/dflog"
    "d7y.io/dragonfly/v2/pkg/rpc/base"
    "d7y.io/dragonfly/v2/pkg/util/digestutils"
@@ -321,17 +320,22 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskReque
        return nil, ErrInvalidDigest
    }
 
-    var pieces []*base.PieceInfo
    t.RLock()
    defer t.RUnlock()
    t.touch()
-    if t.TotalPieces > 0 && int32(req.StartNum) >= t.TotalPieces {
-        t.Errorf("invalid start num: %d", req.StartNum)
-        return nil, dferrors.ErrInvalidArgument
+    piecePacket := &base.PiecePacket{
+        TaskId: req.TaskId,
+        DstPid: t.PeerID,
+        TotalPiece: t.TotalPieces,
+        ContentLength: t.ContentLength,
+        PieceMd5Sign: t.PieceMd5Sign,
    }
+    if t.TotalPieces > -1 && int32(req.StartNum) >= t.TotalPieces {
+        t.Warnf("invalid start num: %d", req.StartNum)
+    }
    for i := int32(0); i < int32(req.Limit); i++ {
        if piece, ok := t.Pieces[int32(req.StartNum)+i]; ok {
-            pieces = append(pieces, &base.PieceInfo{
+            piecePacket.PieceInfos = append(piecePacket.PieceInfos, &base.PieceInfo{
                PieceNum: piece.Num,
                RangeStart: uint64(piece.Range.Start),
                RangeSize: uint32(piece.Range.Length),
@@ -341,14 +345,7 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskReque
            })
        }
    }
-    return &base.PiecePacket{
-        TaskId: req.TaskId,
-        DstPid: t.PeerID,
-        PieceInfos: pieces,
-        TotalPiece: t.TotalPieces,
-        ContentLength: t.ContentLength,
-        PieceMd5Sign: t.PieceMd5Sign,
-    }, nil
+    return piecePacket, nil
 }
 
 func (t *localTaskStore) CanReclaim() bool {
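Taken together, the storage side now always returns task metadata (`TotalPiece`, `ContentLength`) even when no pieces match the request, which is what lets the requesting peer notice an out-of-range `StartNum` via the `piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum` check above instead of retrying forever. Below is a standalone sketch of that shape with simplified stand-in types; `pieceInfo`, `piecePacket`, and `getPieces` here are illustrative only, not the repository's `base.PieceInfo`/`base.PiecePacket` types:

package main

import "fmt"

// pieceInfo and piecePacket are simplified stand-ins for the protobuf types
// used in the diff.
type pieceInfo struct{ Num int32 }

type piecePacket struct {
    TotalPiece    int32
    ContentLength int64
    PieceInfos    []pieceInfo
}

// getPieces mirrors the refactored localTaskStore.GetPieces: metadata is always
// filled in, and an out-of-range start num only yields a warning plus an empty
// piece list instead of an error.
func getPieces(pieces map[int32]pieceInfo, totalPiece int32, contentLength int64, startNum, limit int32) *piecePacket {
    pp := &piecePacket{TotalPiece: totalPiece, ContentLength: contentLength}
    if totalPiece > -1 && startNum >= totalPiece {
        fmt.Printf("invalid start num: %d\n", startNum)
    }
    for i := int32(0); i < limit; i++ {
        if piece, ok := pieces[startNum+i]; ok {
            pp.PieceInfos = append(pp.PieceInfos, piece)
        }
    }
    return pp
}

func main() {
    pieces := map[int32]pieceInfo{0: {Num: 0}, 1: {Num: 1}}
    // Asking for piece 5 of a 2-piece task: the caller still learns TotalPiece=2
    // and can stop asking, instead of looping on an empty answer.
    pp := getPieces(pieces, 2, 1024, 5, 4)
    fmt.Println(pp.TotalPiece, len(pp.PieceInfos)) // 2 0
}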