diff --git a/client/config/peerhost.go b/client/config/peerhost.go index 46bbec739..cbb3f5b8a 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -173,6 +173,7 @@ type DownloadOption struct { PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"` CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"` TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"` + GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"` } type TransportOption struct { diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index f57305412..193fdaa58 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{ Download: DownloadOption{ CalculateDigest: true, PieceDownloadTimeout: 30 * time.Second, + GetPiecesMaxRetry: 100, TotalRateLimit: clientutil.RateLimit{ Limit: rate.Limit(DefaultTotalDownloadLimit), }, diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index b5795a5b4..02a9d2b82 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{ Download: DownloadOption{ CalculateDigest: true, PieceDownloadTimeout: 30 * time.Second, + GetPiecesMaxRetry: 100, TotalRateLimit: clientutil.RateLimit{ Limit: rate.Limit(DefaultTotalDownloadLimit), }, diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 1a3d6a121..a84e7e25e 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -168,7 +168,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { return nil, err } peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler, - opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest) + opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry) if err != nil { return nil, err } diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index 5b5702599..433d6a5ac 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -103,6 +103,8 @@ type peerTask struct { peerPacketReady chan bool // pieceParallelCount stands the piece parallel count from peerPacket pieceParallelCount *atomic.Int32 + // getPiecesMaxRetry stands max retry to get pieces from one peer packet + getPiecesMaxRetry int // done channel will be close when peer task is finished done chan struct{} @@ -502,8 +504,8 @@ loop: func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize uint32) (chan *DownloadPieceRequest, bool) { pt.contentLength.Store(piecePacket.ContentLength) - if pt.contentLength.Load() > 0 { - pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(pt.contentLength.Load())) + if piecePacket.ContentLength > 0 { + pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength)) } if err := pt.callback.Init(pt); err != nil { pt.span.RecordError(err) @@ -579,7 +581,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { // preparePieceTasksByPeer func already send piece result with error pt.Infof("new peer client ready, main peer: %s", pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer) // research from piece 0 - return pt.getNextPieceNum(0), true + return 0, true } // when scheduler says base.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady pt.Infof("start download from source due to base.Code_SchedNeedBackSource") @@ -605,6 +607,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { } func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) { + pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos)) for _, piece := range piecePacket.PieceInfos { pt.Infof("get piece %d from %s/%s, md5: %s, start: %d, size: %d", piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize) @@ -721,7 +724,9 @@ func (pt *peerTask) isCompleted() bool { func (pt *peerTask) preparePieceTasks(request *base.PieceTaskRequest) (p *base.PiecePacket, err error) { defer pt.recoverFromPanic() + var retryCount int prepare: + retryCount++ peerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) pt.pieceParallelCount.Store(peerPacket.ParallelCount) request.DstPid = peerPacket.MainPeer.PeerId @@ -730,6 +735,10 @@ prepare: return } if err == errPeerPacketChanged { + if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry { + err = fmt.Errorf("get pieces max retry count reached") + return + } goto prepare } for _, peer := range peerPacket.StealPeers { @@ -739,6 +748,10 @@ prepare: return } if err == errPeerPacketChanged { + if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry { + err = fmt.Errorf("get pieces max retry count reached") + return + } goto prepare } } @@ -816,51 +829,75 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer count int ) p, _, err := retry.Run(pt.ctx, func() (interface{}, bool, error) { - pp, getErr := dfclient.GetPieceTasks(pt.ctx, peer, request) + piecePacket, getError := dfclient.GetPieceTasks(pt.ctx, peer, request) // when GetPieceTasks returns err, exit retry - if getErr != nil { - span.RecordError(getErr) - // fast way to exit retry + if getError != nil { + pt.Errorf("get piece tasks with error: %s", getError) + span.RecordError(getError) + + // fast way 1 to exit retry + if de, ok := getError.(*dferrors.DfError); ok { + pt.Debugf("get piece task with grpc error, code: %d", de.Code) + // bad request, like invalid piece num, just exit + if de.Code == base.Code_BadRequest { + span.AddEvent("bad request") + pt.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, getError) + return nil, true, getError + } + } + + // fast way 2 to exit retry lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId { - pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, + pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError, curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId) peerPacketChanged = true return nil, true, nil } - return nil, true, getErr + return nil, true, getError } + // got any pieces + if len(piecePacket.PieceInfos) > 0 { + return piecePacket, false, nil + } + // need update metadata + if piecePacket.ContentLength > pt.contentLength.Load() || piecePacket.TotalPiece > pt.totalPiece { + return piecePacket, false, nil + } + // invalid request num + if piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum { + pt.Warnf("invalid start num: %d, total piece: %d", request.StartNum, piecePacket.TotalPiece) + return piecePacket, false, nil + } + // by santong: when peer return empty, retry later - if len(pp.PieceInfos) == 0 { - count++ - er := pt.peerPacketStream.Send(&scheduler.PieceResult{ - TaskId: pt.taskID, - SrcPid: pt.peerID, - DstPid: peer.PeerId, - PieceInfo: &base.PieceInfo{}, - Success: false, - Code: base.Code_ClientWaitPieceReady, - HostLoad: nil, - FinishedCount: pt.readyPieces.Settled(), - }) - if er != nil { - span.RecordError(er) - pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", er) - } - // fast way to exit retry - lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) - if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId { - pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", - curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId) - peerPacketChanged = true - return nil, true, nil - } - span.AddEvent("retry due to empty pieces", - trace.WithAttributes(config.AttributeGetPieceRetry.Int(count))) - pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId) - return nil, false, dferrors.ErrEmptyValue + sendError := pt.peerPacketStream.Send(&scheduler.PieceResult{ + TaskId: pt.taskID, + SrcPid: pt.peerID, + DstPid: peer.PeerId, + PieceInfo: &base.PieceInfo{}, + Success: false, + Code: base.Code_ClientWaitPieceReady, + HostLoad: nil, + FinishedCount: pt.readyPieces.Settled(), + }) + if sendError != nil { + span.RecordError(sendError) + pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", sendError) } - return pp, false, nil + // fast way to exit retry + lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) + if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId { + pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", + curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId) + peerPacketChanged = true + return nil, true, nil + } + count++ + span.AddEvent("retry due to empty pieces", + trace.WithAttributes(config.AttributeGetPieceRetry.Int(count))) + pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId) + return nil, false, dferrors.ErrEmptyValue }, 0.05, 0.2, 40, nil) if peerPacketChanged { return nil, errPeerPacketChanged @@ -877,13 +914,14 @@ func (pt *peerTask) getNextPieceNum(cur int32) int32 { return -1 } i := cur + // try to find next not requested piece for ; pt.requestedPieces.IsSet(i); i++ { } if pt.totalPiece > 0 && i >= pt.totalPiece { // double check, re-search not success or not requested pieces for i = int32(0); pt.requestedPieces.IsSet(i); i++ { } - if pt.totalPiece > 0 && i >= int32(pt.totalPiece) { + if pt.totalPiece > 0 && i >= pt.totalPiece { return -1 } } diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 137813df8..55b22bbfa 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -86,7 +86,8 @@ func newFilePeerTask(ctx context.Context, request *FilePeerTaskRequest, schedulerClient schedulerclient.SchedulerClient, schedulerOption config.SchedulerOption, - perPeerRateLimit rate.Limit) (context.Context, *filePeerTask, *TinyData, error) { + perPeerRateLimit rate.Limit, + getPiecesMaxRetry int) (context.Context, *filePeerTask, *TinyData, error) { ctx, span := tracer.Start(ctx, config.SpanFilePeerTask, trace.WithSpanKind(trace.SpanKindClient)) span.SetAttributes(config.AttributePeerHost.String(host.Uuid)) span.SetAttributes(semconv.NetHostIPKey.String(host.Ip)) @@ -196,6 +197,7 @@ func newFilePeerTask(ctx context.Context, contentLength: atomic.NewInt64(-1), pieceParallelCount: atomic.NewInt32(0), totalPiece: -1, + getPiecesMaxRetry: getPiecesMaxRetry, schedulerOption: schedulerOption, schedulerClient: schedulerClient, limiter: limiter, diff --git a/client/daemon/peer/peertask_file_test.go b/client/daemon/peer/peertask_file_test.go index 32b2260df..430becd87 100644 --- a/client/daemon/peer/peertask_file_test.go +++ b/client/daemon/peer/peertask_file_test.go @@ -136,7 +136,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) { req, ptm.schedulerClient, ptm.schedulerOption, - 0) + 0, 10) assert.Nil(err, "new file peer task") pt.needBackSource = true @@ -261,7 +261,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) { req, ptm.schedulerClient, ptm.schedulerOption, - 0) + 0, 10) assert.Nil(err, "new file peer task") pt.needBackSource = true diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 486453a54..964ec5a93 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -117,6 +117,8 @@ type peerTaskManager struct { enableMultiplex bool calculateDigest bool + + getPiecesMaxRetry int } func NewPeerTaskManager( @@ -127,18 +129,20 @@ func NewPeerTaskManager( schedulerOption config.SchedulerOption, perPeerRateLimit rate.Limit, multiplex bool, - calculateDigest bool) (TaskManager, error) { + calculateDigest bool, + getPiecesMaxRetry int) (TaskManager, error) { ptm := &peerTaskManager{ - host: host, - runningPeerTasks: sync.Map{}, - pieceManager: pieceManager, - storageManager: storageManager, - schedulerClient: schedulerClient, - schedulerOption: schedulerOption, - perPeerRateLimit: perPeerRateLimit, - enableMultiplex: multiplex, - calculateDigest: calculateDigest, + host: host, + runningPeerTasks: sync.Map{}, + pieceManager: pieceManager, + storageManager: storageManager, + schedulerClient: schedulerClient, + schedulerOption: schedulerOption, + perPeerRateLimit: perPeerRateLimit, + enableMultiplex: multiplex, + calculateDigest: calculateDigest, + getPiecesMaxRetry: getPiecesMaxRetry, } return ptm, nil } @@ -159,7 +163,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer limit = rate.Limit(req.Limit) } ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager, - req, ptm.schedulerClient, ptm.schedulerOption, limit) + req, ptm.schedulerClient, ptm.schedulerOption, limit, ptm.getPiecesMaxRetry) if err != nil { return nil, nil, err } @@ -217,8 +221,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu } start := time.Now() - ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm.host, ptm.pieceManager, - req, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit) + ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm, req) if err != nil { return nil, nil, err } diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 9467933de..7d8e0f592 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -34,7 +34,6 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" - schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" ) // StreamPeerTask represents a peer task with stream io for reading directly without once more disk io @@ -55,36 +54,32 @@ type streamPeerTask struct { var _ StreamPeerTask = (*streamPeerTask)(nil) func newStreamPeerTask(ctx context.Context, - host *scheduler.PeerHost, - pieceManager PieceManager, - request *scheduler.PeerTaskRequest, - schedulerClient schedulerclient.SchedulerClient, - schedulerOption config.SchedulerOption, - perPeerRateLimit rate.Limit) (context.Context, *streamPeerTask, *TinyData, error) { + ptm *peerTaskManager, + request *scheduler.PeerTaskRequest) (context.Context, *streamPeerTask, *TinyData, error) { ctx, span := tracer.Start(ctx, config.SpanStreamPeerTask, trace.WithSpanKind(trace.SpanKindClient)) - span.SetAttributes(config.AttributePeerHost.String(host.Uuid)) - span.SetAttributes(semconv.NetHostIPKey.String(host.Ip)) + span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid)) + span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip)) span.SetAttributes(config.AttributePeerID.String(request.PeerId)) span.SetAttributes(semconv.HTTPURLKey.String(request.Url)) logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, tag: %s", request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag) // trace register - regCtx, cancel := context.WithTimeout(ctx, schedulerOption.ScheduleTimeout.Duration) + regCtx, cancel := context.WithTimeout(ctx, ptm.schedulerOption.ScheduleTimeout.Duration) defer cancel() regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask) logger.Infof("step 1: peer %s start to register", request.PeerId) - result, err := schedulerClient.RegisterPeerTask(regCtx, request) + result, err := ptm.schedulerClient.RegisterPeerTask(regCtx, request) regSpan.RecordError(err) regSpan.End() var needBackSource bool if err != nil { if err == context.DeadlineExceeded { - logger.Errorf("scheduler did not response in %s", schedulerOption.ScheduleTimeout.Duration) + logger.Errorf("scheduler did not response in %s", ptm.schedulerOption.ScheduleTimeout.Duration) } logger.Errorf("step 1: peer %s register failed: %s", request.PeerId, err) - if schedulerOption.DisableAutoBackSource { + if ptm.schedulerOption.DisableAutoBackSource { logger.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, request.PeerId) span.RecordError(err) span.End() @@ -92,7 +87,7 @@ func newStreamPeerTask(ctx context.Context, } needBackSource = true // can not detect source or scheduler error, create a new dummy scheduler client - schedulerClient = &dummySchedulerClient{} + ptm.schedulerClient = &dummySchedulerClient{} result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)} logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId) } @@ -137,7 +132,7 @@ func newStreamPeerTask(ctx context.Context, } } - peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) + peerPacketStream, err := ptm.schedulerClient.ReportPieceResult(ctx, result.TaskId, request) logger.Infof("step 2: start report peer %s piece result", request.PeerId) if err != nil { defer span.End() @@ -145,19 +140,19 @@ func newStreamPeerTask(ctx context.Context, return ctx, nil, nil, err } var limiter *rate.Limiter - if perPeerRateLimit > 0 { - limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit)) + if ptm.perPeerRateLimit > 0 { + limiter = rate.NewLimiter(ptm.perPeerRateLimit, int(ptm.perPeerRateLimit)) } pt := &streamPeerTask{ successPieceCh: make(chan int32), streamDone: make(chan struct{}), peerTask: peerTask{ ctx: ctx, - host: host, + host: ptm.host, needBackSource: needBackSource, request: request, peerPacketStream: peerPacketStream, - pieceManager: pieceManager, + pieceManager: ptm.pieceManager, peerPacketReady: make(chan bool, 1), peerID: request.PeerId, taskID: result.TaskId, @@ -172,8 +167,9 @@ func newStreamPeerTask(ctx context.Context, contentLength: atomic.NewInt64(-1), pieceParallelCount: atomic.NewInt32(0), totalPiece: -1, - schedulerOption: schedulerOption, - schedulerClient: schedulerClient, + getPiecesMaxRetry: ptm.getPiecesMaxRetry, + schedulerOption: ptm.schedulerOption, + schedulerClient: ptm.schedulerClient, limiter: limiter, completedLength: atomic.NewInt64(0), usedTraffic: atomic.NewUint64(0), diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index 5059c337d..ab6801fe8 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -256,8 +256,7 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) { PeerHost: &scheduler.PeerHost{}, } ctx := context.Background() - _, pt, _, err := newStreamPeerTask(ctx, ptm.host, pm, req, - ptm.schedulerClient, ptm.schedulerOption, 0) + _, pt, _, err := newStreamPeerTask(ctx, ptm, req) assert.Nil(err, "new stream peer task") pt.SetCallback(&streamPeerTaskCallback{ ptm: ptm, diff --git a/client/daemon/peer/peertask_stream_test.go b/client/daemon/peer/peertask_stream_test.go index b755687b8..4205b0f1f 100644 --- a/client/daemon/peer/peertask_stream_test.go +++ b/client/daemon/peer/peertask_stream_test.go @@ -32,7 +32,6 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/test" - "d7y.io/dragonfly/v2/internal/util" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/source" @@ -101,9 +100,11 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) { }, runningPeerTasks: sync.Map{}, pieceManager: &pieceManager{ - storageManager: storageManager, - pieceDownloader: downloader, - computePieceSize: util.ComputePieceSize, + storageManager: storageManager, + pieceDownloader: downloader, + computePieceSize: func(contentLength int64) uint32 { + return uint32(pieceSize) + }, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -120,19 +121,7 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) { PeerHost: &scheduler.PeerHost{}, } ctx := context.Background() - _, pt, _, err := newStreamPeerTask(ctx, - ptm.host, - &pieceManager{ - storageManager: storageManager, - pieceDownloader: downloader, - computePieceSize: func(contentLength int64) uint32 { - return uint32(pieceSize) - }, - }, - req, - ptm.schedulerClient, - ptm.schedulerOption, - 0) + _, pt, _, err := newStreamPeerTask(ctx, ptm, req) assert.Nil(err, "new stream peer task") pt.SetCallback(&streamPeerTaskCallback{ ptm: ptm, @@ -211,9 +200,11 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) { }, runningPeerTasks: sync.Map{}, pieceManager: &pieceManager{ - storageManager: storageManager, - pieceDownloader: downloader, - computePieceSize: util.ComputePieceSize, + storageManager: storageManager, + pieceDownloader: downloader, + computePieceSize: func(contentLength int64) uint32 { + return uint32(pieceSize) + }, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -230,19 +221,7 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) { PeerHost: &scheduler.PeerHost{}, } ctx := context.Background() - _, pt, _, err := newStreamPeerTask(ctx, - ptm.host, - &pieceManager{ - storageManager: storageManager, - pieceDownloader: downloader, - computePieceSize: func(contentLength int64) uint32 { - return uint32(pieceSize) - }, - }, - req, - schedulerClient, - ptm.schedulerOption, - 0) + _, pt, _, err := newStreamPeerTask(ctx, ptm, req) assert.Nil(err, "new stream peer task") pt.SetCallback(&streamPeerTaskCallback{ ptm: ptm, diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 5a4a2a04e..0c6c5c328 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -88,6 +88,9 @@ func (m *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque p, err := m.storageManager.GetPieces(ctx, request) if err != nil { code := base.Code_UnknownError + if err == dferrors.ErrInvalidArgument { + code = base.Code_BadRequest + } if err != storage.ErrTaskNotFound { logger.Errorf("get piece tasks error: %s, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d", err, request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index 5fc966704..c9c3e3434 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -28,7 +28,6 @@ import ( "go.uber.org/atomic" "d7y.io/dragonfly/v2/client/clientutil" - "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/util/digestutils" @@ -321,17 +320,22 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskReque return nil, ErrInvalidDigest } - var pieces []*base.PieceInfo t.RLock() defer t.RUnlock() t.touch() - if t.TotalPieces > 0 && int32(req.StartNum) >= t.TotalPieces { - t.Errorf("invalid start num: %d", req.StartNum) - return nil, dferrors.ErrInvalidArgument + piecePacket := &base.PiecePacket{ + TaskId: req.TaskId, + DstPid: t.PeerID, + TotalPiece: t.TotalPieces, + ContentLength: t.ContentLength, + PieceMd5Sign: t.PieceMd5Sign, + } + if t.TotalPieces > -1 && int32(req.StartNum) >= t.TotalPieces { + t.Warnf("invalid start num: %d", req.StartNum) } for i := int32(0); i < int32(req.Limit); i++ { if piece, ok := t.Pieces[int32(req.StartNum)+i]; ok { - pieces = append(pieces, &base.PieceInfo{ + piecePacket.PieceInfos = append(piecePacket.PieceInfos, &base.PieceInfo{ PieceNum: piece.Num, RangeStart: uint64(piece.Range.Start), RangeSize: uint32(piece.Range.Length), @@ -341,14 +345,7 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskReque }) } } - return &base.PiecePacket{ - TaskId: req.TaskId, - DstPid: t.PeerID, - PieceInfos: pieces, - TotalPiece: t.TotalPieces, - ContentLength: t.ContentLength, - PieceMd5Sign: t.PieceMd5Sign, - }, nil + return piecePacket, nil } func (t *localTaskStore) CanReclaim() bool {