diff --git a/client/config/constants_otel.go b/client/config/constants_otel.go
index 97758e76b..b5803f548 100644
--- a/client/config/constants_otel.go
+++ b/client/config/constants_otel.go
@@ -55,6 +55,7 @@ const (
 	SpanBackSource     = "client-back-source"
 	SpanFirstSchedule  = "schedule-#1"
 	SpanGetPieceTasks  = "get-piece-tasks"
+	SpanSyncPieceTasks = "sync-piece-tasks"
 	SpanDownloadPiece  = "download-piece-#%d"
 	SpanProxy          = "proxy"
 	SpanWritePiece     = "write-piece"
diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go
index a4ad59950..e2f19f92a 100644
--- a/client/daemon/peer/peertask_conductor.go
+++ b/client/daemon/peer/peertask_conductor.go
@@ -476,7 +476,7 @@ func (pt *peerTaskConductor) pullPieces() {
 
 func (pt *peerTaskConductor) pullPiecesWithP2P() {
 	var (
-		// keep same size with pt.failedPieceCh for avoiding dead lock
+		// keep same size with pt.failedPieceCh for avoiding deadlock
 		pieceBufferSize = uint32(config.DefaultPieceChanSize)
 		pieceRequestCh  = make(chan *DownloadPieceRequest, pieceBufferSize)
 	)
@@ -684,6 +684,8 @@ func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.Peer
 	num, ok := pt.getNextNotReadyPieceNum(lastNum)
 	if !ok {
 		pt.Infof("all pieces is ready, peer task completed, skip to synchronize")
+		p.MainPeer = nil
+		p.StealPeers = nil
 		return num
 	}
 	var peers = []*scheduler.PeerPacket_DestPeer{p.MainPeer}
diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go
index 3515da510..8b1e90ddd 100644
--- a/client/daemon/peer/peertask_piecetask_synchronizer.go
+++ b/client/daemon/peer/peertask_piecetask_synchronizer.go
@@ -18,14 +18,17 @@ package peer
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"sync"
 
 	"github.com/pkg/errors"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"d7y.io/dragonfly/v2/client/config"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/dfdaemon"
@@ -44,6 +47,7 @@ type pieceTaskSyncManager struct {
 
 type pieceTaskSynchronizer struct {
 	*logger.SugaredLoggerOnWith
+	span    trace.Span
 	client  dfdaemon.Daemon_SyncPieceTasksClient
 	dstPeer *scheduler.PeerPacket_DestPeer
 	error   atomic.Value
@@ -158,7 +162,9 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
 		return err
 	}
 
+	_, span := tracer.Start(s.ctx, config.SpanSyncPieceTasks)
 	synchronizer := &pieceTaskSynchronizer{
+		span:              span,
 		peerTaskConductor: s.peerTaskConductor,
 		pieceRequestCh:    s.pieceRequestCh,
 		client:            client,
@@ -263,7 +269,9 @@ func (s *pieceTaskSynchronizer) close() {
 	if err := s.client.CloseSend(); err != nil {
 		s.error.Store(&pieceTaskSynchronizerError{err})
 		s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId)
+		s.span.RecordError(err)
 	}
+	s.span.End()
 }
 
 func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePacket) {
@@ -299,6 +307,7 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePack
 		}
 		select {
 		case s.pieceRequestCh <- req:
+			s.span.AddEvent(fmt.Sprintf("send piece #%d request", piece.PieceNum))
 		case <-s.peerTaskConductor.successCh:
 			s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
 		case <-s.peerTaskConductor.failCh:
@@ -325,7 +334,8 @@ func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) {
 	} else {
 		s.Errorf("synchronizer receives with error: %s", err)
 		s.error.Store(&pieceTaskSynchronizerError{err})
-		s.reportError()
+		s.reportError(err)
+		s.Errorf("synchronizer receives with error: %s", err)
 	}
 }
 
@@ -336,19 +346,21 @@ func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) error {
 		return err
 	}
 	err := s.client.Send(request)
+	s.span.AddEvent(fmt.Sprintf("send piece #%d request", request.StartNum))
 	if err != nil {
 		s.error.Store(&pieceTaskSynchronizerError{err})
 		if s.canceled(err) {
 			s.Debugf("synchronizer sends canceled")
 		} else {
 			s.Errorf("synchronizer sends with error: %s", err)
-			s.reportError()
+			s.reportError(err)
 		}
 	}
 	return err
 }
 
-func (s *pieceTaskSynchronizer) reportError() {
+func (s *pieceTaskSynchronizer) reportError(err error) {
+	s.span.RecordError(err)
 	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, base.Code_ClientPieceRequestFail))
 	if sendError != nil {
 		s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go
index 474216f44..089426f07 100644
--- a/client/daemon/rpcserver/rpcserver.go
+++ b/client/daemon/rpcserver/rpcserver.go
@@ -19,6 +19,7 @@ package rpcserver
 import (
 	"context"
 	"fmt"
+	"io"
 	"math"
 	"net"
 	"os"
@@ -139,59 +140,85 @@ func (s *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque
 }
 
 // sendExistPieces will send as much as possible pieces
-func (s *server) sendExistPieces(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
-	return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, true)
+func (s *server) sendExistPieces(log *logger.SugaredLoggerOnWith, request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, err error) {
+	return sendExistPieces(sync.Context(), log, s.GetPieceTasks, request, sync, sentMap, true)
 }
 
 // sendFirstPieceTasks will send as much as possible pieces, even if no available pieces
-func (s *server) sendFirstPieceTasks(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
-	return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, false)
+func (s *server) sendFirstPieceTasks(log *logger.SugaredLoggerOnWith, request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, err error) {
+	return sendExistPieces(sync.Context(), log, s.GetPieceTasks, request, sync, sentMap, false)
 }
 
 func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) error {
 	request, err := sync.Recv()
 	if err != nil {
+		logger.Errorf("receive first sync piece tasks request error: %s", err.Error())
 		return err
 	}
+	log := logger.With("taskID", request.TaskId,
+		"localPeerID", request.DstPid, "remotePeerID", request.SrcPid)
+
 	skipPieceCount := request.StartNum
 	var sentMap = make(map[int32]struct{})
+
 	// TODO if not found, try to send to peer task conductor, then download it first
-	total, sent, err := s.sendFirstPieceTasks(request, sync, sentMap)
+	total, err := s.sendFirstPieceTasks(log, request, sync, sentMap)
 	if err != nil {
+		log.Errorf("send first piece tasks error: %s", err)
 		return err
 	}
 
-	// task is done, just return
-	if int(total) == sent {
-		return nil
+	recvReminding := func() error {
+		for {
+			request, err = sync.Recv()
+			if err == io.EOF {
+				return nil
+			}
+			if err != nil {
+				logger.Errorf("receive reminding piece tasks request error: %s", err)
+				return err
+			}
+			total, err = s.sendExistPieces(log, request, sync, sentMap)
+			if err != nil {
+				logger.Errorf("send reminding piece tasks error: %s", err)
+				return err
+			}
+		}
+	}
+
+	// task is done, just receive new piece tasks requests only
+	if int(total) == len(sentMap)+int(skipPieceCount) {
+		log.Infof("all piece tasks sent, receive new piece tasks requests only")
+		return recvReminding()
 	}
 
 	// subscribe peer task message for remaining pieces
 	result, ok := s.peerTaskManager.Subscribe(request)
 	if !ok {
-		// task not found, double check for done task
-		total, sent, err = s.sendExistPieces(request, sync, sentMap)
+		// running task not found, double check for done task
+		request.StartNum = searchNextPieceNum(sentMap, skipPieceCount)
+		total, err = s.sendExistPieces(log, request, sync, sentMap)
 		if err != nil {
+			log.Errorf("send exist piece tasks error: %s", err)
 			return err
 		}
-		if int(total) > sent {
+		if int(total) > len(sentMap)+int(skipPieceCount) {
 			return status.Errorf(codes.Unavailable, "peer task not finish, but no running task found")
 		}
-		return nil
+		return recvReminding()
 	}
 
 	var sub = &subscriber{
-		SubscribeResult: result,
-		sync:            sync,
-		request:         request,
-		skipPieceCount:  skipPieceCount,
-		totalPieces:     total,
-		sentMap:         sentMap,
-		done:            make(chan struct{}),
-		uploadAddr:      s.uploadAddr,
-		SugaredLoggerOnWith: logger.With("taskID", request.TaskId,
-			"localPeerID", request.DstPid, "remotePeerID", request.SrcPid),
+		SubscribeResult:     result,
+		sync:                sync,
+		request:             request,
+		skipPieceCount:      skipPieceCount,
+		totalPieces:         total,
+		sentMap:             sentMap,
+		done:                make(chan struct{}),
+		uploadAddr:          s.uploadAddr,
+		SugaredLoggerOnWith: log,
 	}
 
 	go sub.receiveRemainingPieceTaskRequests()
diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go
index 942c35a28..da6f812c9 100644
--- a/client/daemon/rpcserver/rpcserver_test.go
+++ b/client/daemon/rpcserver/rpcserver_test.go
@@ -48,7 +48,7 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
-func TestDownloadManager_ServeDownload(t *testing.T) {
+func Test_ServeDownload(t *testing.T) {
 	assert := testifyassert.New(t)
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -112,7 +112,7 @@ func TestDownloadManager_ServeDownload(t *testing.T) {
 	assert.True(lastResult.Done)
 }
 
-func TestDownloadManager_ServePeer(t *testing.T) {
+func Test_ServePeer(t *testing.T) {
 	assert := testifyassert.New(t)
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -216,7 +216,7 @@ func TestDownloadManager_ServePeer(t *testing.T) {
 	}
 }
 
-func TestDownloadManager_SyncPieceTasks(t *testing.T) {
+func Test_SyncPieceTasks(t *testing.T) {
 	assert := testifyassert.New(t)
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -231,12 +231,12 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
 	}
 	var tests = []struct {
 		name            string
-		existTaskID     string // test for non-exists task
-		existPieces     []pieceRange
+		existTaskID     string       // test for non-exists task
+		existPieces     []pieceRange // already exist pieces in storage
+		followingPieces []pieceRange // following pieces in running task subscribe channel
 		requestPieces   []int32
 		limit           uint32
 		totalPieces     uint32
-		followingPieces []pieceRange
 		success         bool
 		verify          func(t *testing.T, assert *testifyassert.Assertions)
 	}{
@@ -253,6 +253,20 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
 			verify: func(t *testing.T, assert *testifyassert.Assertions) {
 			},
 		},
+		{
+			name: "already exists in storage with extra get piece request",
+			existPieces: []pieceRange{
+				{
+					start: 0,
+					end:   10,
+				},
+			},
+			totalPieces:   11,
+			requestPieces: []int32{1, 3, 5, 7},
+			success:       true,
+			verify: func(t *testing.T, assert *testifyassert.Assertions) {
+			},
+		},
 		{
 			name: "already exists in storage - large",
 			existPieces: []pieceRange{
@@ -266,6 +280,20 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
 			verify: func(t *testing.T, assert *testifyassert.Assertions) {
 			},
 		},
+		{
+			name: "already exists in storage - large with extra get piece request",
+			existPieces: []pieceRange{
+				{
+					start: 0,
+					end:   1000,
+				},
+			},
+			totalPieces:   1001,
+			requestPieces: []int32{1, 3, 5, 7, 100, 500, 1000},
+			success:       true,
+			verify: func(t *testing.T, assert *testifyassert.Assertions) {
+			},
+		},
 		{
 			name: "partial exists in storage",
 			existPieces: []pieceRange{
@@ -452,7 +480,6 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
 			}
 
 			port, client := setupPeerServerAndClient(t, s, assert, s.ServePeer)
-			defer s.peerServer.GracefulStop()
 
 			syncClient, err := client.SyncPieceTasks(
 				context.Background(),
@@ -514,6 +541,7 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
 
 				if tc.success {
 					assert.Equal(int(maxNum+1), len(total))
 				}
+				s.peerServer.GracefulStop()
 			}
 		})
diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go
index 9a24510f5..fcd10231f 100644
--- a/client/daemon/rpcserver/subscriber.go
+++ b/client/daemon/rpcserver/subscriber.go
@@ -52,41 +52,56 @@ func (s *subscriber) getPieces(ctx context.Context, request *base.PieceTaskReque
 
 func sendExistPieces(
 	ctx context.Context,
+	log *logger.SugaredLoggerOnWith,
 	get func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error),
 	request *base.PieceTaskRequest,
 	sync dfdaemon.Daemon_SyncPieceTasksServer,
-	sendMap map[int32]struct{},
-	skipSendZeroPiece bool) (total int32, sent int, err error) {
+	sentMap map[int32]struct{},
+	skipSendZeroPiece bool) (total int32, err error) {
 	if request.Limit <= 0 {
 		request.Limit = 16
 	}
+	var pp *base.PiecePacket
 	for {
-		pp, err := get(ctx, request)
+		pp, err = get(ctx, request)
 		if err != nil {
-			return -1, -1, err
+			log.Errorf("get piece error: %s", err)
+			return -1, err
 		}
 		if len(pp.PieceInfos) == 0 && skipSendZeroPiece {
-			return pp.TotalPiece, sent, nil
+			return pp.TotalPiece, nil
 		}
 		if err = sync.Send(pp); err != nil {
-			return pp.TotalPiece, sent, err
+			log.Errorf("send pieces error: %s", err)
+			return pp.TotalPiece, err
 		}
 		for _, p := range pp.PieceInfos {
-			sendMap[p.PieceNum] = struct{}{}
+			log.Infof("send ready piece %d", p.PieceNum)
+			sentMap[p.PieceNum] = struct{}{}
 		}
-		sent += len(pp.PieceInfos)
 		if uint32(len(pp.PieceInfos)) < request.Limit {
-			return pp.TotalPiece, sent, nil
+			log.Infof("sent %d pieces, total: %d", len(pp.PieceInfos), pp.TotalPiece)
+			return pp.TotalPiece, nil
 		}
 		// the get piece func always return sorted pieces, use last piece num + 1 to get more pieces
 		request.StartNum = uint32(pp.PieceInfos[request.Limit-1].PieceNum + 1)
 	}
 }
 
+func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum uint32) {
+	for i := int32(cur); ; i++ {
+		if _, ok := sentMap[i]; !ok {
+			nextPieceNum = uint32(i)
+			break
+		}
+	}
+	return nextPieceNum
+}
+
 // sendExistPieces will send as much as possible pieces
-func (s *subscriber) sendExistPieces(startNum uint32) (total int32, sent int, err error) {
+func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) {
 	s.request.StartNum = startNum
-	return sendExistPieces(s.sync.Context(), s.getPieces, s.request, s.sync, s.sentMap, true)
+	return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
 }
 
 func (s *subscriber) receiveRemainingPieceTaskRequests() {
@@ -123,6 +138,7 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() {
 			return
 		}
 		for _, p := range pp.PieceInfos {
+			s.Infof("send ready piece %d", p.PieceNum)
 			s.sentMap[p.PieceNum] = struct{}{}
 		}
 		s.Unlock()
@@ -148,14 +164,14 @@ loop:
 			s.Infof("remote SyncPieceTasks done, exit sending, local task is running")
 			return nil
 		case info := <-s.PieceInfoChannel:
+			s.Infof("receive piece info, num: %d, %v", info.Num, info.Finished)
 			// not desired piece
-			s.Debugf("receive piece info, num: %d, %v", info.Num, info.Finished)
 			if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum {
 				continue
 			}
 
-			s.Lock()
-			total, _, err := s.sendExistPieces(uint32(info.Num))
+			s.Lock()
+			total, err := s.sendExistPieces(uint32(info.Num))
 			if err != nil {
 				err = s.saveError(err)
 				s.Unlock()
@@ -175,13 +191,14 @@ loop:
 			nextPieceNum = s.searchNextPieceNum(nextPieceNum)
 			s.Unlock()
 		case <-s.Success:
+			s.Infof("peer task is success, send remaining pieces")
 			s.Lock()
 			// all pieces already sent
 			if s.totalPieces > -1 && nextPieceNum == uint32(s.totalPieces) {
 				s.Unlock()
 				break loop
 			}
-			total, _, err := s.sendExistPieces(nextPieceNum)
+			total, err := s.sendExistPieces(nextPieceNum)
 			if err != nil {
 				err = s.saveError(err)
 				s.Unlock()
@@ -192,11 +209,14 @@ loop:
 			}
 			if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) {
 				s.Unlock()
-				return dferrors.Newf(base.Code_ClientError, "peer task success, but can not send all pieces")
+				msg := "peer task success, but can not send all pieces"
+				s.Errorf(msg)
+				return dferrors.Newf(base.Code_ClientError, msg)
 			}
 			s.Unlock()
 			break loop
 		case <-s.Fail:
+			s.Errorf("peer task failed")
 			return dferrors.Newf(base.Code_ClientError, "peer task failed")
 		}
 	}
@@ -221,11 +241,5 @@ func (s *subscriber) saveError(err error) error {
 }
 
 func (s *subscriber) searchNextPieceNum(cur uint32) (nextPieceNum uint32) {
-	for i := int32(cur); ; i++ {
-		if _, ok := s.sentMap[i]; !ok {
-			nextPieceNum = uint32(i)
-			break
-		}
-	}
-	return nextPieceNum
+	return searchNextPieceNum(s.sentMap, cur)
}
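For reference, below is a standalone sketch of the `searchNextPieceNum` helper that this change hoists out of `subscriber` into a package-level function. The helper body is copied from the diff; the `main` wrapper and the example values are illustrative only. It shows why `SyncPieceTasks` can seed `request.StartNum` with it before double-checking a finished task: the function returns the first piece number at or after `cur` that has not been recorded in `sentMap`.

```go
package main

import "fmt"

// searchNextPieceNum mirrors the helper extracted into subscriber.go by this
// change: starting from cur, it returns the first piece number that is not
// yet present in sentMap.
func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum uint32) {
	for i := int32(cur); ; i++ {
		if _, ok := sentMap[i]; !ok {
			nextPieceNum = uint32(i)
			break
		}
	}
	return nextPieceNum
}

func main() {
	// Pieces 0-2 and 5 have already been sent to the remote peer.
	sentMap := map[int32]struct{}{0: {}, 1: {}, 2: {}, 5: {}}

	// With no skipped pieces, the next piece to request is #3.
	fmt.Println(searchNextPieceNum(sentMap, 0)) // 3

	// If the remote peer asked to skip the first 6 pieces (StartNum == 6),
	// the search starts there and returns #6.
	fmt.Println(searchNextPieceNum(sentMap, 6)) // 6
}
```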