chore: add sync pieces trace and update sync pieces logic for done task (#1263)

* chore: add sync pieces trace

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: when the task is done, sync piece tasks will still receive remote requests

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* fix: go context lint

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* Update client/daemon/rpcserver/rpcserver.go

Co-authored-by: cndoit18 <cndoit18@outlook.com>
Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: record close send error

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: clean PeerPacket when peer task completed

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: optimize sync piece completed check

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: fix go lint

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: update sync piece test

Signed-off-by: Jim Ma <majinjing3@gmail.com>

Co-authored-by: cndoit18 <cndoit18@outlook.com>
Jim Ma 2022-04-21 14:16:01 +08:00 committed by Gaius
parent 5662d08b4e
commit 41f766168b
6 changed files with 140 additions and 56 deletions
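
To make the behavioral change concrete: a remote peer drives SyncPieceTasks as a bidirectional stream, and after this change the serving daemon keeps answering follow-up requests on that stream even when its local task is already done, instead of stopping once the known pieces have been sent. Below is a rough client-side sketch of that interaction; the stream and message types (Daemon_SyncPieceTasksClient, PieceTaskRequest, PiecePacket) appear in the diff, while the package and helper function are made up for illustration.

package example // illustrative only, not part of this commit

import (
	"io"

	"d7y.io/dragonfly/v2/pkg/rpc/base"
	"d7y.io/dragonfly/v2/pkg/rpc/dfdaemon"
)

// fetchAllPieces keeps one SyncPieceTasks stream open until every piece of
// the task has been reported, re-requesting the first missing piece number
// after each packet. Even if the remote task is already done, the server is
// expected to keep the stream alive and answer these follow-up requests.
func fetchAllPieces(stream dfdaemon.Daemon_SyncPieceTasksClient, taskID, srcPid, dstPid string) (map[int32]struct{}, error) {
	received := make(map[int32]struct{})
	req := &base.PieceTaskRequest{
		TaskId:   taskID,
		SrcPid:   srcPid,
		DstPid:   dstPid,
		StartNum: 0,
		Limit:    16,
	}
	if err := stream.Send(req); err != nil {
		return nil, err
	}
	for {
		pp, err := stream.Recv()
		if err == io.EOF {
			return received, nil
		}
		if err != nil {
			return nil, err
		}
		for _, p := range pp.PieceInfos {
			received[p.PieceNum] = struct{}{}
		}
		if pp.TotalPiece > -1 && len(received) == int(pp.TotalPiece) {
			// Everything received; stop sending and let the server finish.
			_ = stream.CloseSend()
			return received, nil
		}
		// Ask again, starting from the first piece we have not seen yet.
		next := int32(0)
		for {
			if _, ok := received[next]; !ok {
				break
			}
			next++
		}
		req.StartNum = uint32(next)
		if err := stream.Send(req); err != nil {
			return nil, err
		}
	}
}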


@ -55,6 +55,7 @@ const (
SpanBackSource = "client-back-source"
SpanFirstSchedule = "schedule-#1"
SpanGetPieceTasks = "get-piece-tasks"
SpanSyncPieceTasks = "sync-piece-tasks"
SpanDownloadPiece = "download-piece-#%d"
SpanProxy = "proxy"
SpanWritePiece = "write-piece"


@ -476,7 +476,7 @@ func (pt *peerTaskConductor) pullPieces() {
func (pt *peerTaskConductor) pullPiecesWithP2P() {
var (
// keep same size with pt.failedPieceCh for avoiding dead lock
// keep same size with pt.failedPieceCh for avoiding deadlock
pieceBufferSize = uint32(config.DefaultPieceChanSize)
pieceRequestCh = make(chan *DownloadPieceRequest, pieceBufferSize)
)
@ -684,6 +684,8 @@ func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *scheduler.Peer
num, ok := pt.getNextNotReadyPieceNum(lastNum)
if !ok {
pt.Infof("all pieces is ready, peer task completed, skip to synchronize")
p.MainPeer = nil
p.StealPeers = nil
return num
}
var peers = []*scheduler.PeerPacket_DestPeer{p.MainPeer}


@ -18,14 +18,17 @@ package peer
import (
"context"
"fmt"
"io"
"sync"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/dfdaemon"
@ -44,6 +47,7 @@ type pieceTaskSyncManager struct {
type pieceTaskSynchronizer struct {
*logger.SugaredLoggerOnWith
span trace.Span
client dfdaemon.Daemon_SyncPieceTasksClient
dstPeer *scheduler.PeerPacket_DestPeer
error atomic.Value
@ -158,7 +162,9 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
return err
}
_, span := tracer.Start(s.ctx, config.SpanSyncPieceTasks)
synchronizer := &pieceTaskSynchronizer{
span: span,
peerTaskConductor: s.peerTaskConductor,
pieceRequestCh: s.pieceRequestCh,
client: client,
@ -263,7 +269,9 @@ func (s *pieceTaskSynchronizer) close() {
if err := s.client.CloseSend(); err != nil {
s.error.Store(&pieceTaskSynchronizerError{err})
s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId)
s.span.RecordError(err)
}
s.span.End()
}
func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePacket) {
@ -299,6 +307,7 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *base.PiecePack
}
select {
case s.pieceRequestCh <- req:
s.span.AddEvent(fmt.Sprintf("send piece #%d request", piece.PieceNum))
case <-s.peerTaskConductor.successCh:
s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
case <-s.peerTaskConductor.failCh:
@ -325,7 +334,8 @@ func (s *pieceTaskSynchronizer) receive(piecePacket *base.PiecePacket) {
} else {
s.Errorf("synchronizer receives with error: %s", err)
s.error.Store(&pieceTaskSynchronizerError{err})
s.reportError()
s.reportError(err)
s.Errorf("synchronizer receives with error: %s", err)
}
}
@ -336,19 +346,21 @@ func (s *pieceTaskSynchronizer) acquire(request *base.PieceTaskRequest) error {
return err
}
err := s.client.Send(request)
s.span.AddEvent(fmt.Sprintf("send piece #%d request", request.StartNum))
if err != nil {
s.error.Store(&pieceTaskSynchronizerError{err})
if s.canceled(err) {
s.Debugf("synchronizer sends canceled")
} else {
s.Errorf("synchronizer sends with error: %s", err)
s.reportError()
s.reportError(err)
}
}
return err
}
func (s *pieceTaskSynchronizer) reportError() {
func (s *pieceTaskSynchronizer) reportError(err error) {
s.span.RecordError(err)
sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, base.Code_ClientPieceRequestFail))
if sendError != nil {
s.Errorf("sync piece info failed and send piece result with error: %s", sendError)


@ -19,6 +19,7 @@ package rpcserver
import (
"context"
"fmt"
"io"
"math"
"net"
"os"
@ -139,46 +140,73 @@ func (s *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque
}
// sendExistPieces will send as much as possible pieces
func (s *server) sendExistPieces(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, true)
func (s *server) sendExistPieces(log *logger.SugaredLoggerOnWith, request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, err error) {
return sendExistPieces(sync.Context(), log, s.GetPieceTasks, request, sync, sentMap, true)
}
// sendFirstPieceTasks will send as much as possible pieces, even if no available pieces
func (s *server) sendFirstPieceTasks(request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, sent int, err error) {
return sendExistPieces(sync.Context(), s.GetPieceTasks, request, sync, sentMap, false)
func (s *server) sendFirstPieceTasks(log *logger.SugaredLoggerOnWith, request *base.PieceTaskRequest, sync dfdaemongrpc.Daemon_SyncPieceTasksServer, sentMap map[int32]struct{}) (total int32, err error) {
return sendExistPieces(sync.Context(), log, s.GetPieceTasks, request, sync, sentMap, false)
}
func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) error {
request, err := sync.Recv()
if err != nil {
logger.Errorf("receive first sync piece tasks request error: %s", err.Error())
return err
}
log := logger.With("taskID", request.TaskId,
"localPeerID", request.DstPid, "remotePeerID", request.SrcPid)
skipPieceCount := request.StartNum
var sentMap = make(map[int32]struct{})
// TODO if not found, try to send to peer task conductor, then download it first
total, sent, err := s.sendFirstPieceTasks(request, sync, sentMap)
total, err := s.sendFirstPieceTasks(log, request, sync, sentMap)
if err != nil {
log.Errorf("send first piece tasks error: %s", err)
return err
}
// task is done, just return
if int(total) == sent {
recvReminding := func() error {
for {
request, err = sync.Recv()
if err == io.EOF {
return nil
}
if err != nil {
logger.Errorf("receive reminding piece tasks request error: %s", err)
return err
}
total, err = s.sendExistPieces(log, request, sync, sentMap)
if err != nil {
logger.Errorf("send reminding piece tasks error: %s", err)
return err
}
}
}
// task is done, just receive new piece tasks requests only
if int(total) == len(sentMap)+int(skipPieceCount) {
log.Infof("all piece tasks sent, receive new piece tasks requests only")
return recvReminding()
}
// subscribe peer task message for remaining pieces
result, ok := s.peerTaskManager.Subscribe(request)
if !ok {
// task not found, double check for done task
total, sent, err = s.sendExistPieces(request, sync, sentMap)
// running task not found, double check for done task
request.StartNum = searchNextPieceNum(sentMap, skipPieceCount)
total, err = s.sendExistPieces(log, request, sync, sentMap)
if err != nil {
log.Errorf("send exist piece tasks error: %s", err)
return err
}
if int(total) > sent {
if int(total) > len(sentMap)+int(skipPieceCount) {
return status.Errorf(codes.Unavailable, "peer task not finish, but no running task found")
}
return nil
return recvReminding()
}
var sub = &subscriber{
@ -190,8 +218,7 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) e
sentMap: sentMap,
done: make(chan struct{}),
uploadAddr: s.uploadAddr,
SugaredLoggerOnWith: logger.With("taskID", request.TaskId,
"localPeerID", request.DstPid, "remotePeerID", request.SrcPid),
SugaredLoggerOnWith: log,
}
go sub.receiveRemainingPieceTaskRequests()
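
The done-task shortcut above is a counting argument: the remote peer already holds the first skipPieceCount pieces (its initial StartNum), the server has sent len(sentMap) further distinct pieces on this stream, and the task has total pieces overall; when the first two sum to the third there is nothing left to push, so the server only keeps receiving new requests (recvReminding). A tiny hypothetical helper spelling out that check:

// allPiecesAccountedFor reports whether every piece is either already held by
// the remote peer (skipPieceCount) or was sent on this stream (sentMap).
// A negative total means the total piece count is not known yet.
func allPiecesAccountedFor(total int32, sentMap map[int32]struct{}, skipPieceCount uint32) bool {
	if total < 0 {
		return false
	}
	return int(total) == len(sentMap)+int(skipPieceCount)
}

For example, with total = 10, a remote peer that started at piece 3 (skipPieceCount = 3), and sentMap holding pieces 3 through 9, the check is 7 + 3 == 10 and the stream switches to receive-only mode; the earlier `int(total) == sent` comparison did not account for the pieces the client already had.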


@ -48,7 +48,7 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func TestDownloadManager_ServeDownload(t *testing.T) {
func Test_ServeDownload(t *testing.T) {
assert := testifyassert.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@ -112,7 +112,7 @@ func TestDownloadManager_ServeDownload(t *testing.T) {
assert.True(lastResult.Done)
}
func TestDownloadManager_ServePeer(t *testing.T) {
func Test_ServePeer(t *testing.T) {
assert := testifyassert.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@ -216,7 +216,7 @@ func TestDownloadManager_ServePeer(t *testing.T) {
}
}
func TestDownloadManager_SyncPieceTasks(t *testing.T) {
func Test_SyncPieceTasks(t *testing.T) {
assert := testifyassert.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@ -232,11 +232,11 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
var tests = []struct {
name string
existTaskID string // test for non-exists task
existPieces []pieceRange
existPieces []pieceRange // already exist pieces in storage
followingPieces []pieceRange // following pieces in running task subscribe channel
requestPieces []int32
limit uint32
totalPieces uint32
followingPieces []pieceRange
success bool
verify func(t *testing.T, assert *testifyassert.Assertions)
}{
@ -253,6 +253,20 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
verify: func(t *testing.T, assert *testifyassert.Assertions) {
},
},
{
name: "already exists in storage with extra get piece request",
existPieces: []pieceRange{
{
start: 0,
end: 10,
},
},
totalPieces: 11,
requestPieces: []int32{1, 3, 5, 7},
success: true,
verify: func(t *testing.T, assert *testifyassert.Assertions) {
},
},
{
name: "already exists in storage - large",
existPieces: []pieceRange{
@ -266,6 +280,20 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
verify: func(t *testing.T, assert *testifyassert.Assertions) {
},
},
{
name: "already exists in storage - large with extra get piece request",
existPieces: []pieceRange{
{
start: 0,
end: 1000,
},
},
totalPieces: 1001,
requestPieces: []int32{1, 3, 5, 7, 100, 500, 1000},
success: true,
verify: func(t *testing.T, assert *testifyassert.Assertions) {
},
},
{
name: "partial exists in storage",
existPieces: []pieceRange{
@ -452,7 +480,6 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
}
port, client := setupPeerServerAndClient(t, s, assert, s.ServePeer)
defer s.peerServer.GracefulStop()
syncClient, err := client.SyncPieceTasks(
context.Background(),
@ -514,6 +541,7 @@ func TestDownloadManager_SyncPieceTasks(t *testing.T) {
if tc.success {
assert.Equal(int(maxNum+1), len(total))
}
s.peerServer.GracefulStop()
}
})


@ -52,41 +52,56 @@ func (s *subscriber) getPieces(ctx context.Context, request *base.PieceTaskReque
func sendExistPieces(
ctx context.Context,
log *logger.SugaredLoggerOnWith,
get func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error),
request *base.PieceTaskRequest,
sync dfdaemon.Daemon_SyncPieceTasksServer,
sendMap map[int32]struct{},
skipSendZeroPiece bool) (total int32, sent int, err error) {
sentMap map[int32]struct{},
skipSendZeroPiece bool) (total int32, err error) {
if request.Limit <= 0 {
request.Limit = 16
}
var pp *base.PiecePacket
for {
pp, err := get(ctx, request)
pp, err = get(ctx, request)
if err != nil {
return -1, -1, err
log.Errorf("get piece error: %s", err)
return -1, err
}
if len(pp.PieceInfos) == 0 && skipSendZeroPiece {
return pp.TotalPiece, sent, nil
return pp.TotalPiece, nil
}
if err = sync.Send(pp); err != nil {
return pp.TotalPiece, sent, err
log.Errorf("send pieces error: %s", err)
return pp.TotalPiece, err
}
for _, p := range pp.PieceInfos {
sendMap[p.PieceNum] = struct{}{}
log.Infof("send ready piece %d", p.PieceNum)
sentMap[p.PieceNum] = struct{}{}
}
sent += len(pp.PieceInfos)
if uint32(len(pp.PieceInfos)) < request.Limit {
return pp.TotalPiece, sent, nil
log.Infof("sent %d pieces, total: %d", len(pp.PieceInfos), pp.TotalPiece)
return pp.TotalPiece, nil
}
// the get piece func always return sorted pieces, use last piece num + 1 to get more pieces
request.StartNum = uint32(pp.PieceInfos[request.Limit-1].PieceNum + 1)
}
}
func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum uint32) {
for i := int32(cur); ; i++ {
if _, ok := sentMap[i]; !ok {
nextPieceNum = uint32(i)
break
}
}
return nextPieceNum
}
// sendExistPieces will send as much as possible pieces
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, sent int, err error) {
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) {
s.request.StartNum = startNum
return sendExistPieces(s.sync.Context(), s.getPieces, s.request, s.sync, s.sentMap, true)
return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
}
func (s *subscriber) receiveRemainingPieceTaskRequests() {
@ -123,6 +138,7 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() {
return
}
for _, p := range pp.PieceInfos {
s.Infof("send ready piece %d", p.PieceNum)
s.sentMap[p.PieceNum] = struct{}{}
}
s.Unlock()
@ -148,14 +164,14 @@ loop:
s.Infof("remote SyncPieceTasks done, exit sending, local task is running")
return nil
case info := <-s.PieceInfoChannel:
s.Infof("receive piece info, num: %d, %v", info.Num, info.Finished)
// not desired piece
s.Debugf("receive piece info, num: %d, %v", info.Num, info.Finished)
if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum {
continue
}
s.Lock()
total, _, err := s.sendExistPieces(uint32(info.Num))
s.Lock()
total, err := s.sendExistPieces(uint32(info.Num))
if err != nil {
err = s.saveError(err)
s.Unlock()
@ -175,13 +191,14 @@ loop:
nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Unlock()
case <-s.Success:
s.Infof("peer task is success, send remaining pieces")
s.Lock()
// all pieces already sent
if s.totalPieces > -1 && nextPieceNum == uint32(s.totalPieces) {
s.Unlock()
break loop
}
total, _, err := s.sendExistPieces(nextPieceNum)
total, err := s.sendExistPieces(nextPieceNum)
if err != nil {
err = s.saveError(err)
s.Unlock()
@ -192,11 +209,14 @@ loop:
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) {
s.Unlock()
return dferrors.Newf(base.Code_ClientError, "peer task success, but can not send all pieces")
msg := "peer task success, but can not send all pieces"
s.Errorf(msg)
return dferrors.Newf(base.Code_ClientError, msg)
}
s.Unlock()
break loop
case <-s.Fail:
s.Errorf("peer task failed")
return dferrors.Newf(base.Code_ClientError, "peer task failed")
}
}
@ -221,11 +241,5 @@ func (s *subscriber) saveError(err error) error {
}
func (s *subscriber) searchNextPieceNum(cur uint32) (nextPieceNum uint32) {
for i := int32(cur); ; i++ {
if _, ok := s.sentMap[i]; !ok {
nextPieceNum = uint32(i)
break
}
}
return nextPieceNum
return searchNextPieceNum(s.sentMap, cur)
}
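
searchNextPieceNum, now shared by the subscriber and the done-task path in SyncPieceTasks, walks forward from cur and returns the first piece number missing from sentMap. A short test-style sketch of that behavior, assuming it lives in the same package as the helper (with the usual testing import):

func TestSearchNextPieceNum_SkipsSentPieces(t *testing.T) {
	sent := map[int32]struct{}{
		0: {}, 1: {}, 2: {}, // contiguous prefix already sent
		4: {}, // piece 4 sent out of order
	}
	// First gap after 0 is piece 3.
	if got := searchNextPieceNum(sent, 0); got != 3 {
		t.Fatalf("expected 3, got %d", got)
	}
	// Starting at 4 (already sent), the next missing piece is 5.
	if got := searchNextPieceNum(sent, 4); got != 5 {
		t.Fatalf("expected 5, got %d", got)
	}
}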