feat: remove legacy peers support (#1939)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Jim Ma 2022-12-21 14:14:35 +08:00 committed by Gaius
parent c3882ccfc6
commit 7cb802aeb7
6 changed files with 190 additions and 722 deletions

View File

@@ -63,8 +63,6 @@ const (
failedReasonNotSet = "unknown" failedReasonNotSet = "unknown"
) )
var errPeerPacketChanged = errors.New("peer packet changed")
var _ Task = (*peerTaskConductor)(nil) var _ Task = (*peerTaskConductor)(nil)
// peerTaskConductor will fetch all pieces from other peers and send pieces info to broker // peerTaskConductor will fetch all pieces from other peers and send pieces info to broker
@@ -112,15 +110,7 @@ type peerTaskConductor struct {
     // peerPacketStream stands schedulerclient.PeerPacketStream from scheduler
     peerPacketStream schedulerv1.Scheduler_ReportPieceResultClient
-    // peerPacket is the latest available peers from peerPacketCh
-    // Deprecated: remove in future release
-    peerPacket atomic.Value // *schedulerv1.PeerPacket
-    legacyPeerCount *atomic.Int64
-    // peerPacketReady will receive a ready signal for peerPacket ready
-    peerPacketReady chan bool
-    // pieceTaskPoller pulls piece task from other peers
-    // Deprecated: pieceTaskPoller is deprecated, use pieceTaskSyncManager
-    pieceTaskPoller *pieceTaskPoller
+    legacyPeerCount *atomic.Int64
     // pieceTaskSyncManager syncs piece task from other peers
     pieceTaskSyncManager *pieceTaskSyncManager
@@ -134,9 +124,6 @@ type peerTaskConductor struct {
// span stands open telemetry trace span // span stands open telemetry trace span
span trace.Span span trace.Span
// failedPieceCh will hold all pieces which download failed,
// those pieces will be retried later
failedPieceCh chan int32
// failedReason will be set when peer task failed // failedReason will be set when peer task failed
failedReason string failedReason string
// failedReason will be set when peer task failed // failedReason will be set when peer task failed
@@ -230,7 +217,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
ctx: ctx, ctx: ctx,
ctxCancel: cancel, ctxCancel: cancel,
broker: newPieceBroker(), broker: newPieceBroker(),
peerPacketReady: make(chan bool, 1),
peerID: request.PeerId, peerID: request.PeerId,
taskID: taskID, taskID: taskID,
successCh: make(chan struct{}), successCh: make(chan struct{}),
@@ -240,7 +226,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
readyPieces: NewBitmap(), readyPieces: NewBitmap(),
runningPieces: NewBitmap(), runningPieces: NewBitmap(),
requestedPieces: NewBitmap(), requestedPieces: NewBitmap(),
failedPieceCh: make(chan int32, config.DefaultPieceChanSize),
failedReason: failedReasonNotSet, failedReason: failedReasonNotSet,
failedCode: commonv1.Code_UnknownError, failedCode: commonv1.Code_UnknownError,
contentLength: atomic.NewInt64(-1), contentLength: atomic.NewInt64(-1),
@@ -256,11 +241,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
rg: rg, rg: rg,
} }
ptc.pieceTaskPoller = &pieceTaskPoller{
getPiecesMaxRetry: ptm.GetPiecesMaxRetry,
peerTaskConductor: ptc,
}
ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx) ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx)
return ptc return ptc
@@ -505,23 +485,6 @@ func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason str
// only use when receive back source code from scheduler // only use when receive back source code from scheduler
func (pt *peerTaskConductor) markBackSource() { func (pt *peerTaskConductor) markBackSource() {
pt.needBackSource.Store(true) pt.needBackSource.Store(true)
// when close peerPacketReady, pullPiecesFromPeers will invoke backSource
close(pt.peerPacketReady)
// let legacy mode exit
pt.peerPacket.Store(&schedulerv1.PeerPacket{
TaskId: pt.taskID,
SrcPid: pt.peerID,
ParallelCount: 1,
MainPeer: nil,
CandidatePeers: []*schedulerv1.PeerPacket_DestPeer{
{
Ip: pt.PeerHost.Ip,
RpcPort: pt.PeerHost.RpcPort,
PeerId: pt.peerID,
},
},
Code: commonv1.Code_SchedNeedBackSource,
})
} }
// only use when legacy get piece from peers schedule timeout // only use when legacy get piece from peers schedule timeout
@@ -593,7 +556,6 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() {
pieceRequestQueue: pieceRequestQueue, pieceRequestQueue: pieceRequestQueue,
workers: map[string]*pieceTaskSynchronizer{}, workers: map[string]*pieceTaskSynchronizer{},
} }
go pt.pullPiecesFromPeers(pieceRequestQueue)
pt.receivePeerPacket(pieceRequestQueue) pt.receivePeerPacket(pieceRequestQueue)
} }
@@ -701,6 +663,7 @@ func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[Down
peerPacket *schedulerv1.PeerPacket peerPacket *schedulerv1.PeerPacket
err error err error
firstPacketReceived bool firstPacketReceived bool
firstPacketDone = make(chan bool)
) )
// only record first schedule result // only record first schedule result
// other schedule result will record as an event in peer task span // other schedule result will record as an event in peer task span
@@ -720,6 +683,8 @@ func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[Down
pt.Fail() pt.Fail()
} }
}() }()
go pt.waitFirstPeerPacket(firstPacketDone)
loop: loop:
for { for {
select { select {
@@ -739,7 +704,7 @@
         }
         if err != nil {
             // some errors, like commonv1.Code_SchedReregister, after reregister success,
-            // we can continue receive peer packet from the new scheduler
+            // we can continue to receive peer packet from the new scheduler
             cont := pt.confirmReceivePeerPacketError(err)
             if cont {
                 continue
@@ -753,7 +718,7 @@
         pt.Debugf("receive peerPacket %v", peerPacket)
         if peerPacket.Code != commonv1.Code_Success {
             if peerPacket.Code == commonv1.Code_SchedNeedBackSource {
-                pt.markBackSource()
+                pt.forceBackSource()
                 pt.Infof("receive back source code")
                 return
             }
@@ -789,38 +754,18 @@
             firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
             firstPeerSpan.End()
         }
-        // updateSynchronizer will update legacy peers to peerPacket.CandidatePeers only
-        lastNotReadyPiece = pt.updateSynchronizer(lastNotReadyPiece, peerPacket)
+        lastNotReadyPiece = pt.updateSynchronizers(lastNotReadyPiece, peerPacket)
         if !firstPacketReceived {
             // trigger legacy get piece once to avoid first schedule timeout
             firstPacketReceived = true
-        } else if len(peerPacket.CandidatePeers) == 0 {
-            pt.Debugf("no legacy peers, skip to send peerPacketReady")
-            pt.legacyPeerCount.Store(0)
-            continue
-        }
-        legacyPeerCount := int64(len(peerPacket.CandidatePeers))
-        pt.Debugf("connect to %d legacy peers", legacyPeerCount)
-        pt.legacyPeerCount.Store(legacyPeerCount)
-        // legacy mode: update peer packet, then send peerPacketReady
-        pt.peerPacket.Store(peerPacket)
-        select {
-        case pt.peerPacketReady <- true:
-        case <-pt.successCh:
-            pt.Infof("peer task success, stop wait peer packet from scheduler")
-            break loop
-        case <-pt.failCh:
-            pt.Errorf("peer task fail, stop wait peer packet from scheduler")
-            break loop
-        default:
-        }
+            close(firstPacketDone)
+        }
     }
 }

-// updateSynchronizer will convert peers to synchronizer, if failed, will update failed peers to schedulerv1.PeerPacket
-func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *schedulerv1.PeerPacket) int32 {
+// updateSynchronizers will convert peers to synchronizer, if failed, will update failed peers to schedulerv1.PeerPacket
+func (pt *peerTaskConductor) updateSynchronizers(lastNum int32, p *schedulerv1.PeerPacket) int32 {
     desiredPiece, ok := pt.getNextNotReadyPieceNum(lastNum)
     if !ok {
         pt.Infof("all pieces is ready, peer task completed, skip to synchronize")
@@ -831,10 +776,7 @@ func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *schedulerv1.Pe
     var peers = []*schedulerv1.PeerPacket_DestPeer{p.MainPeer}
     peers = append(peers, p.CandidatePeers...)
-    legacyPeers := pt.pieceTaskSyncManager.newMultiPieceTaskSynchronizer(peers, desiredPiece)
-    p.MainPeer = nil
-    p.CandidatePeers = legacyPeers
+    _ = pt.pieceTaskSyncManager.syncPeers(peers, desiredPiece)
     return desiredPiece
 }
@@ -854,7 +796,7 @@
     if ok {
         switch de.Code {
         case commonv1.Code_SchedNeedBackSource:
-            pt.markBackSource()
+            pt.forceBackSource()
             pt.Infof("receive back source code")
             return false
         case commonv1.Code_SchedReregister:
@@ -955,98 +897,6 @@ func (pt *peerTaskConductor) pullSinglePiece() {
} }
} }
// Deprecated
func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
if ok, backSource := pt.waitFirstPeerPacket(); !ok {
if backSource {
return
}
pt.Errorf("wait first peer packet error")
return
}
var (
num int32
ok bool
limit uint32
)
// ensure first peer packet is not nil
peerPacket, ok := pt.peerPacket.Load().(*schedulerv1.PeerPacket)
if !ok {
pt.Warn("pull pieces canceled")
return
}
if len(peerPacket.CandidatePeers) == 0 {
num, ok = pt.waitAvailablePeerPacket()
if !ok {
return
}
}
limit = config.DefaultPieceChanSize
loop:
for {
// 1, check whether catch exit signal or get a failed piece
// if nothing got, process normal pieces
select {
case <-pt.successCh:
pt.Infof("peer task success, stop get pieces from peer")
break loop
case <-pt.failCh:
pt.Error("peer task fail, stop get pieces from peer")
break loop
case failed := <-pt.failedPieceCh:
pt.Warnf("download piece %d failed, retry", failed)
num = failed
limit = 1
default:
}
retry:
// 2, try to get pieces
pt.Debugf("try to get pieces, number: %d, limit: %d", num, limit)
piecePacket, err := pt.pieceTaskPoller.preparePieceTasks(
&commonv1.PieceTaskRequest{
TaskId: pt.taskID,
SrcPid: pt.peerID,
StartNum: uint32(num),
Limit: limit,
})
if err != nil {
pt.Warnf("get piece task error: %s, wait available peers from scheduler", err.Error())
pt.span.RecordError(err)
if num, ok = pt.waitAvailablePeerPacket(); !ok {
break loop
}
continue loop
}
pt.updateMetadata(piecePacket)
// 3. dispatch piece request to all workers
pt.dispatchPieceRequest(pieceRequestQueue, piecePacket)
// 4. get next not request piece
if num, ok = pt.getNextPieceNum(num); ok {
// get next piece success
limit = config.DefaultPieceChanSize
continue
}
// 5. wait failed pieces
pt.Infof("all pieces requests sent, just wait failed pieces")
// get failed piece
if num, ok = pt.waitFailedPiece(); !ok {
// when ok == false, indicates than need break loop
break loop
}
// just need one piece
limit = 1
goto retry
}
}
func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
// update total piece // update total piece
var metadataChanged bool var metadataChanged bool
@@ -1094,139 +944,30 @@ func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQ
     }
 }

-func (pt *peerTaskConductor) waitFirstPeerPacket() (done bool, backSource bool) {
+func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) {
     // wait first available peer
     select {
     case <-pt.successCh:
         pt.Infof("peer task succeed, no need to wait first peer")
-        return true, false
+        return
     case <-pt.failCh:
         pt.Warnf("peer task failed, no need to wait first peer")
-        return true, false
-    case _, ok := <-pt.peerPacketReady:
-        if ok {
-            // preparePieceTasksByPeer func already send piece result with error
-            pt.Infof("new peer client ready, scheduler time cost: %dus, peer count: %d",
-                time.Since(pt.startTime).Microseconds(), len(pt.peerPacket.Load().(*schedulerv1.PeerPacket).CandidatePeers))
-            return true, false
-        }
-        // when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
-        pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource")
-        pt.span.AddEvent("back source due to scheduler says need back source")
-        pt.backSource()
-        return false, true
+        return
+    case <-done:
+        pt.Debugf("first peer packet received")
+        return
     case <-time.After(pt.SchedulerOption.ScheduleTimeout.Duration):
         if pt.SchedulerOption.DisableAutoBackSource {
             pt.cancel(commonv1.Code_ClientScheduleTimeout, reasonBackSourceDisabled)
             err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
             pt.span.RecordError(err)
             pt.Errorf(err.Error())
-            return false, false
+            return
         }
         pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
         pt.span.AddEvent("back source due to schedule timeout")
         pt.forceBackSource()
-        return false, true
+        return
     }
 }
// Deprecated
func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) {
// only <-pt.peerPacketReady continue loop, others break
select {
// when peer task without content length or total pieces count, match here
case <-pt.successCh:
pt.Infof("peer task success, stop wait available peer packet")
case <-pt.failCh:
pt.Infof("peer task fail, stop wait available peer packet")
case _, ok := <-pt.peerPacketReady:
if ok {
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, peer count: %d", len(pt.peerPacket.Load().(*schedulerv1.PeerPacket).CandidatePeers))
// research from piece 0
return 0, true
}
// when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source ")
// TODO optimize back source when already downloaded some pieces
pt.backSource()
}
return -1, false
}
// Deprecated
func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestQueue ring.Queue[DownloadPieceRequest], piecePacket *commonv1.PiecePacket) {
pieceCount := len(piecePacket.PieceInfos)
pt.Debugf("dispatch piece request, piece count: %d", pieceCount)
// fix cdn return zero piece info, but with total piece count and content length
if pieceCount == 0 {
finished := pt.isCompleted()
if finished {
pt.Done()
}
}
for _, piece := range piecePacket.PieceInfos {
pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d",
piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
// FIXME when set total piece but no total digest, fetch again
pt.requestedPiecesLock.Lock()
if !pt.requestedPieces.IsSet(piece.PieceNum) {
pt.requestedPieces.Set(piece.PieceNum)
}
pt.requestedPiecesLock.Unlock()
req := &DownloadPieceRequest{
storage: pt.GetStorage(),
piece: piece,
log: pt.Log(),
TaskID: pt.GetTaskID(),
PeerID: pt.GetPeerID(),
DstPid: piecePacket.DstPid,
DstAddr: piecePacket.DstAddr,
}
pieceRequestQueue.Enqueue(req)
msg := fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum)
pt.span.AddEvent(msg)
pt.Infof(msg)
select {
case <-pt.successCh:
pt.Infof("peer task success, stop dispatch piece request")
case <-pt.failCh:
pt.Warnf("peer task fail, stop dispatch piece request")
default:
}
}
}
func (pt *peerTaskConductor) waitFailedPiece() (int32, bool) {
if pt.isCompleted() {
return -1, false
}
wait:
// use no default branch select to wait failed piece or exit
select {
case <-pt.successCh:
pt.Infof("peer task success, stop to wait failed piece")
return -1, false
case <-pt.failCh:
pt.Debugf("peer task fail, stop to wait failed piece")
return -1, false
case failed := <-pt.failedPieceCh:
pt.Warnf("download piece/%d failed, retry", failed)
return failed, true
case _, ok := <-pt.peerPacketReady:
if ok {
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, but all pieces are already downloading, just wait failed pieces")
goto wait
}
// when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source")
pt.backSource()
return -1, false
} }
} }
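Editor's note: the deprecated polling path removed above boils down to a close-once notification. receivePeerPacket closes firstPacketDone when the first scheduler packet arrives, and a single goroutine waits on that channel, the success/fail channels, or the schedule timeout that triggers back-source. A minimal, self-contained sketch of that pattern follows; the conductor struct, channel types, and string results are illustrative, not the repository's actual types.

package main

import (
	"fmt"
	"time"
)

// conductor stands in for the peer task conductor's signalling channels.
type conductor struct {
	successCh       chan struct{}
	failCh          chan struct{}
	firstPacketDone chan struct{} // closed exactly once when the first peer packet arrives
}

// waitFirstPeerPacket mirrors the shape of the new wait logic: it returns as
// soon as the task finishes, the first packet arrives, or the schedule
// timeout fires (which would trigger back-source in the real daemon).
func (c *conductor) waitFirstPeerPacket(scheduleTimeout time.Duration) string {
	select {
	case <-c.successCh:
		return "task already succeeded"
	case <-c.failCh:
		return "task already failed"
	case <-c.firstPacketDone:
		return "first peer packet received"
	case <-time.After(scheduleTimeout):
		return "schedule timeout, back to source"
	}
}

func main() {
	c := &conductor{
		successCh:       make(chan struct{}),
		failCh:          make(chan struct{}),
		firstPacketDone: make(chan struct{}),
	}
	// Simulate receivePeerPacket delivering the first packet: closing the
	// channel releases every waiter, unlike sending a single value that only
	// one receiver could observe.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(c.firstPacketDone)
	}()
	fmt.Println(c.waitFirstPeerPacket(time.Second))
}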
@@ -1312,25 +1053,6 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
}) })
pt.Infof("send failed piece %d to remote, attempt: %d, success: %d", pt.Infof("send failed piece %d to remote, attempt: %d, success: %d",
request.piece.PieceNum, attempt, success) request.piece.PieceNum, attempt, success)
// when there is no legacy peers, skip send to failedPieceCh for legacy peers in background
if pt.legacyPeerCount.Load() == 0 {
pt.Infof("there is no legacy peers, skip send to failedPieceCh for legacy peers")
return
}
// Deprecated
// send to fail chan and retry
// try to send directly first, if failed channel is busy, create a new goroutine to do this
select {
case pt.failedPieceCh <- request.piece.PieceNum:
pt.Infof("success to send failed piece %d to failedPieceCh", request.piece.PieceNum)
default:
pt.Infof("start to send failed piece %d to failedPieceCh in background", request.piece.PieceNum)
go func() {
pt.failedPieceCh <- request.piece.PieceNum
pt.Infof("success to send failed piece %d to failedPieceCh in background", request.piece.PieceNum)
}()
}
return return
} }
// broadcast success piece // broadcast success piece
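Editor's note: the deleted block above used a common Go idiom for feeding a possibly busy channel: try a non-blocking send first, and only fall back to a goroutine when the buffer is full. A tiny self-contained sketch of that idiom, with illustrative channel and message names:

package main

import "fmt"

// sendOrSpawn tries a non-blocking send; if the channel is full it hands the
// value to a background goroutine so the caller never blocks.
func sendOrSpawn(ch chan int32, pieceNum int32) {
	select {
	case ch <- pieceNum:
		fmt.Printf("sent failed piece %d directly\n", pieceNum)
	default:
		go func() {
			ch <- pieceNum
			fmt.Printf("sent failed piece %d in background\n", pieceNum)
		}()
	}
}

func main() {
	failed := make(chan int32, 1)
	sendOrSpawn(failed, 7) // fits in the buffer
	sendOrSpawn(failed, 8) // buffer full, goes through the goroutine
	fmt.Println(<-failed, <-failed)
}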

View File

@@ -83,7 +83,6 @@ type componentsOption struct {
backSource bool backSource bool
scope commonv1.SizeScope scope commonv1.SizeScope
content []byte content []byte
getPieceTasks bool
reregister bool reregister bool
} }
@@ -130,42 +129,32 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
             PieceMd5Sign: totalDigests,
         }
     }
-    if opt.getPieceTasks {
-        daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
-            DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
-                return genPiecePacket(request), nil
-            })
-        daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error {
-            return status.Error(codes.Unimplemented, "TODO")
-        })
-    } else {
-        daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
-            DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
-                return nil, status.Error(codes.Unimplemented, "TODO")
-            })
-        daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
-            request, err := s.Recv()
-            if err != nil {
-                return err
-            }
-            if err = s.Send(genPiecePacket(request)); err != nil {
-                return err
-            }
-            for {
-                request, err = s.Recv()
-                if err == io.EOF {
-                    break
-                }
-                if err != nil {
-                    return err
-                }
-                if err = s.Send(genPiecePacket(request)); err != nil {
-                    return err
-                }
-            }
-            return nil
-        })
-    }
+    daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
+        DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
+            return nil, status.Error(codes.Unimplemented, "TODO")
+        })
+    daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
+        request, err := s.Recv()
+        if err != nil {
+            return err
+        }
+        if err = s.Send(genPiecePacket(request)); err != nil {
+            return err
+        }
+        for {
+            request, err = s.Recv()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                return err
+            }
+            if err = s.Send(genPiecePacket(request)); err != nil {
+                return err
+            }
+        }
+        return nil
+    })
     ln, _ := rpc.Listen(dfnet.NetAddr{
         Type: "tcp",
         Addr: fmt.Sprintf("0.0.0.0:%d", port),
@@ -354,7 +343,6 @@ type testSpec struct {
sizeScope commonv1.SizeScope sizeScope commonv1.SizeScope
peerID string peerID string
url string url string
legacyFeature bool
reregister bool reregister bool
// when urlGenerator is not nil, use urlGenerator instead url // when urlGenerator is not nil, use urlGenerator instead url
// it's useful for httptest server // it's useful for httptest server
@@ -621,80 +609,76 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             if _tc.runTaskTypes == nil {
                 types = taskTypes
             }
-            assert := testifyassert.New(t)
-            require := testifyrequire.New(t)
-            for _, legacy := range []bool{true, false} {
-                for _, typ := range types {
-                    // dup a new test case with the task type
-                    logger.Infof("-------------------- test %s - type %s, legacy feature: %v started --------------------",
-                        _tc.name, taskTypeNames[typ], legacy)
+            assert = testifyassert.New(t)
+            require = testifyrequire.New(t)
+            for _, typ := range types {
+                // dup a new test case with the task type
+                logger.Infof("-------------------- test %s - type %s, started --------------------",
+                    _tc.name, taskTypeNames[typ])
                 tc := _tc
                 tc.taskType = typ
-                tc.legacyFeature = legacy
                 func() {
                     ctrl := gomock.NewController(t)
                     defer ctrl.Finish()
                     mockContentLength := len(tc.taskData)
                     urlMeta := &commonv1.UrlMeta{
                         Tag: "d7y-test",
                     }
                     if tc.httpRange != nil {
                         urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=")
                     }
                     if tc.urlGenerator != nil {
                         tc.url = tc.urlGenerator(&tc)
                     }
                     taskID := idgen.TaskID(tc.url, urlMeta)
                     var (
                         downloader   PieceDownloader
                         sourceClient source.ResourceClient
                     )
                     if tc.mockPieceDownloader != nil {
                         downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
                     }
                     if tc.mockHTTPSourceClient != nil {
                         source.UnRegister("http")
                         defer func() {
                             // reset source client
                             source.UnRegister("http")
                             require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
                         }()
                         // replace source client
                         sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
                         require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
                     }
                     option := componentsOption{
                         taskID:             taskID,
                         contentLength:      int64(mockContentLength),
                         pieceSize:          uint32(tc.pieceSize),
                         pieceParallelCount: tc.pieceParallelCount,
                         pieceDownloader:    downloader,
                         sourceClient:       sourceClient,
                         content:            tc.taskData,
                         scope:              tc.sizeScope,
                         peerPacketDelay:    tc.peerPacketDelay,
                         backSource:         tc.backSource,
-                        getPieceTasks:      tc.legacyFeature,
                         reregister:         tc.reregister,
                     }
                     // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same
                     if tc.taskType == taskTypeConductor {
                         option.peerPacketDelay = []time.Duration{time.Second}
                     }
                     mm := setupMockManager(ctrl, &tc, option)
                     defer mm.CleanUp()
                     tc.run(assert, require, mm, urlMeta)
                 }()
                 logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ])
-                }
             }
         })
     }

View File

@@ -1,261 +0,0 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package peer
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dferrors"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/retry"
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)
type pieceTaskPoller struct {
peerTaskConductor *peerTaskConductor
// getPiecesMaxRetry stands max retry to get pieces from one peer packet
getPiecesMaxRetry int
}
func (poller *pieceTaskPoller) preparePieceTasks(request *commonv1.PieceTaskRequest) (pp *commonv1.PiecePacket, err error) {
ptc := poller.peerTaskConductor
defer ptc.recoverFromPanic()
var retryCount int
prepare:
retryCount++
poller.peerTaskConductor.Debugf("prepare piece tasks, retry count: %d", retryCount)
peerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket)
if poller.peerTaskConductor.needBackSource.Load() {
return nil, fmt.Errorf("need back source")
}
for _, peer := range peerPacket.CandidatePeers {
if poller.peerTaskConductor.needBackSource.Load() {
return nil, fmt.Errorf("need back source")
}
request.DstPid = peer.PeerId
pp, err = poller.preparePieceTasksByPeer(peerPacket, peer, request)
if err == nil {
return
}
if err == errPeerPacketChanged {
if poller.getPiecesMaxRetry > 0 && retryCount > poller.getPiecesMaxRetry {
err = fmt.Errorf("get pieces max retry count reached")
return
}
goto prepare
}
}
return
}
func (poller *pieceTaskPoller) preparePieceTasksByPeer(
curPeerPacket *schedulerv1.PeerPacket,
peer *schedulerv1.PeerPacket_DestPeer, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
ptc := poller.peerTaskConductor
if peer == nil {
return nil, fmt.Errorf("empty peer")
}
var span trace.Span
_, span = tracer.Start(ptc.ctx, config.SpanGetPieceTasks)
span.SetAttributes(config.AttributeTargetPeerID.String(peer.PeerId))
span.SetAttributes(config.AttributeGetPieceStartNum.Int(int(request.StartNum)))
span.SetAttributes(config.AttributeGetPieceLimit.Int(int(request.Limit)))
defer span.End()
var maxRetries = 60
// when cdn returns commonv1.Code_CDNTaskNotFound, report it to scheduler and wait cdn download it.
retry:
ptc.Debugf("try get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit)
p, err := poller.getPieceTasksByPeer(span, curPeerPacket, peer, request)
if err == nil {
ptc.Infof("get piece task from peer %s ok, pieces length: %d, totalPiece: %d, content length: %d, piece md5 sign: %s",
peer.PeerId, len(p.PieceInfos), p.TotalPiece, p.ContentLength, p.PieceMd5Sign)
span.SetAttributes(config.AttributeGetPieceCount.Int(len(p.PieceInfos)))
return p, nil
}
span.RecordError(err)
if err == errPeerPacketChanged {
return nil, err
}
ptc.Debugf("get piece task error: %s", err)
// grpc error
if se, ok := err.(interface{ GRPCStatus() *status.Status }); ok {
ptc.Debugf("get piece task with grpc error, code: %d", se.GRPCStatus().Code())
// context canceled, just exit
if se.GRPCStatus().Code() == codes.Canceled {
span.AddEvent("context canceled")
ptc.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, err)
return nil, err
}
}
code := commonv1.Code_ClientPieceRequestFail
// not grpc error
if de, ok := err.(*dferrors.DfError); ok && uint32(de.Code) > uint32(codes.Unauthenticated) {
ptc.Debugf("get piece task from peer %s with df error, code: %d", peer.PeerId, de.Code)
code = de.Code
}
ptc.Errorf("get piece task from peer %s error: %s, code: %d", peer.PeerId, err, code)
sendError := ptc.sendPieceResult(&schedulerv1.PieceResult{
TaskId: ptc.taskID,
SrcPid: ptc.peerID,
DstPid: peer.PeerId,
PieceInfo: &commonv1.PieceInfo{},
Success: false,
Code: code,
HostLoad: nil,
FinishedCount: -1,
})
// error code should be sent to scheduler and the scheduler can schedule a new peer
if sendError != nil {
ptc.cancel(commonv1.Code_SchedError, sendError.Error())
span.RecordError(sendError)
ptc.Errorf("send piece result error: %s, code to send: %d", sendError, code)
return nil, sendError
}
// currently, before cdn gc tasks, it did not notify scheduler, when cdn complains Code_CDNTaskNotFound, retry
if maxRetries > 0 && code == commonv1.Code_CDNTaskNotFound && curPeerPacket == ptc.peerPacket.Load().(*schedulerv1.PeerPacket) {
span.AddEvent("retry for CdnTaskNotFound")
time.Sleep(time.Second)
maxRetries--
goto retry
}
return nil, err
}
func (poller *pieceTaskPoller) getPieceTasksByPeer(
span trace.Span,
curPeerPacket *schedulerv1.PeerPacket,
peer *schedulerv1.PeerPacket_DestPeer,
request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
var (
peerPacketChanged bool
count int
ptc = poller.peerTaskConductor
)
p, _, err := retry.Run(ptc.ctx, 0.05, 0.2, 40, func() (any, bool, error) {
// GetPieceTasks must be fast, so short time out is okay
ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second)
defer cancel()
netAddr := &dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort),
}
client, err := dfdaemonclient.GetInsecureClient(context.Background(), netAddr.String())
if err != nil {
ptc.Errorf("get dfdaemon client error: %s", err)
span.RecordError(err)
return nil, true, err
}
piecePacket, getError := client.GetPieceTasks(ctx, request)
// when GetPieceTasks returns err, exit retry
if getError != nil {
ptc.Errorf("get piece tasks with error: %s", getError)
span.RecordError(getError)
// fast way 1 to exit retry
if de, ok := getError.(*dferrors.DfError); ok {
ptc.Debugf("get piece task with grpc error, code: %d", de.Code)
// bad request, like invalid piece num, just exit
if de.Code == commonv1.Code_BadRequest {
span.AddEvent("bad request")
ptc.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, getError)
return nil, true, getError
}
}
// fast way 2 to exit retry
lastPeerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket)
if curPeerPacket.CandidatePeers[0].PeerId != lastPeerPacket.CandidatePeers[0].PeerId {
ptc.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError,
curPeerPacket.CandidatePeers[0].PeerId, lastPeerPacket.CandidatePeers[0].PeerId)
peerPacketChanged = true
return nil, true, nil
}
return nil, true, getError
}
// got any pieces
if len(piecePacket.PieceInfos) > 0 {
return piecePacket, false, nil
}
// need update metadata
if piecePacket.ContentLength > ptc.GetContentLength() || piecePacket.TotalPiece > ptc.GetTotalPieces() {
return piecePacket, false, nil
}
// invalid request num
if piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum {
ptc.Warnf("invalid start num: %d, total piece: %d", request.StartNum, piecePacket.TotalPiece)
return piecePacket, false, nil
}
// by santong: when peer return empty, retry later
sendError := ptc.sendPieceResult(&schedulerv1.PieceResult{
TaskId: ptc.taskID,
SrcPid: ptc.peerID,
DstPid: peer.PeerId,
PieceInfo: &commonv1.PieceInfo{},
Success: false,
Code: commonv1.Code_ClientWaitPieceReady,
HostLoad: nil,
FinishedCount: ptc.readyPieces.Settled(),
})
if sendError != nil {
ptc.cancel(commonv1.Code_ClientPieceRequestFail, sendError.Error())
span.RecordError(sendError)
ptc.Errorf("send piece result with commonv1.Code_ClientWaitPieceReady error: %s", sendError)
return nil, true, sendError
}
// fast way to exit retry
lastPeerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket)
if curPeerPacket.CandidatePeers[0].PeerId != lastPeerPacket.CandidatePeers[0].PeerId {
ptc.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.CandidatePeers[0].PeerId, lastPeerPacket.CandidatePeers[0].PeerId)
peerPacketChanged = true
return nil, true, nil
}
count++
span.AddEvent("retry due to empty pieces",
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
ptc.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
})
if peerPacketChanged {
return nil, errPeerPacketChanged
}
if err == nil {
return p.(*commonv1.PiecePacket), nil
}
return nil, err
}

View File

@@ -75,29 +75,39 @@ type pieceTaskSynchronizerError struct {
 }

 // FIXME for compatibility, sync will be called after the dfdaemonclient.GetPieceTasks deprecated and the pieceTaskPoller removed
-func (s *pieceTaskSyncManager) sync(pp *schedulerv1.PeerPacket, desiredPiece int32) error {
+func (s *pieceTaskSyncManager) syncPeers(destPeers []*schedulerv1.PeerPacket_DestPeer, desiredPiece int32) error {
+    s.Lock()
+    defer func() {
+        if s.peerTaskConductor.WatchdogTimeout > 0 {
+            s.resetWatchdog(destPeers[0])
+        }
+        s.Unlock()
+    }()
     var (
         peers = map[string]bool{}
         errs  []error
     )
-    peers[pp.MainPeer.PeerId] = true
     // TODO if the worker failed, reconnect and retry
-    s.Lock()
-    defer s.Unlock()
-    if _, ok := s.workers[pp.MainPeer.PeerId]; !ok {
-        err := s.newPieceTaskSynchronizer(s.ctx, pp.MainPeer, desiredPiece)
-        if err != nil {
-            s.peerTaskConductor.Errorf("main peer SyncPieceTasks error: %s", err)
-            errs = append(errs, err)
-        }
-    }
-    for _, p := range pp.CandidatePeers {
-        peers[p.PeerId] = true
-        if _, ok := s.workers[p.PeerId]; !ok {
-            err := s.newPieceTaskSynchronizer(s.ctx, p, desiredPiece)
-            if err != nil {
-                s.peerTaskConductor.Errorf("candidate peer SyncPieceTasks error: %s", err)
-                errs = append(errs, err)
+    for _, peer := range destPeers {
+        peers[peer.PeerId] = true
+        if _, ok := s.workers[peer.PeerId]; !ok {
+            err := s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
+            if err == nil {
+                s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId)
+                continue
+            }
+            // other errors, report to scheduler
+            if errors.Is(err, context.DeadlineExceeded) {
+                // connect timeout error, report to scheduler to get more available peers
+                s.reportInvalidPeer(peer, commonv1.Code_ClientConnectionError)
+                s.peerTaskConductor.Infof("connect peer %s with error: %s", peer.PeerId, err)
+            } else {
+                // other errors, report to scheduler to get more available peers
+                s.reportInvalidPeer(peer, commonv1.Code_ClientPieceRequestFail)
+                s.peerTaskConductor.Errorf("connect peer %s error: %s", peer.PeerId, err)
             }
         }
     }
@@ -224,47 +234,6 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
return nil return nil
} }
func (s *pieceTaskSyncManager) newMultiPieceTaskSynchronizer(
destPeers []*schedulerv1.PeerPacket_DestPeer,
desiredPiece int32) (legacyPeers []*schedulerv1.PeerPacket_DestPeer) {
s.Lock()
defer func() {
if s.peerTaskConductor.WatchdogTimeout > 0 {
s.resetWatchdog(destPeers[0])
}
s.Unlock()
}()
for _, peer := range destPeers {
err := s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
if err == nil {
s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId)
continue
}
// when err is codes.Unimplemented, fallback to legacy get piece grpc
stat, ok := status.FromError(err)
if ok && stat.Code() == codes.Unimplemented {
// for legacy peers, when get pieces error, will report the error
s.peerTaskConductor.Warnf("connect peer %s error: %s, fallback to legacy get piece grpc", peer.PeerId, err)
legacyPeers = append(legacyPeers, peer)
continue
}
// other errors, report to scheduler
if errors.Is(err, context.DeadlineExceeded) {
// connect timeout error, report to scheduler to get more available peers
s.reportInvalidPeer(peer, commonv1.Code_ClientConnectionError)
s.peerTaskConductor.Infof("connect to peer %s with error: %s, peer is invalid, skip legacy grpc", peer.PeerId, err)
} else {
// other errors, report to scheduler to get more available peers
s.reportInvalidPeer(peer, commonv1.Code_ClientPieceRequestFail)
s.peerTaskConductor.Errorf("connect peer %s error: %s, not codes.Unimplemented", peer.PeerId, err)
}
}
s.cleanStaleWorker(destPeers)
return legacyPeers
}
func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) { func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) {
if s.watchdog != nil { if s.watchdog != nil {
close(s.watchdog.done) close(s.watchdog.done)
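Editor's note: a compact sketch of the connection bookkeeping that syncPeers now performs, as shown in the hunks above: remember which peers already have a worker, try to connect the new ones, and classify failures so a timeout is reported differently from other errors. The worker map, connect function, and report callback below are simplified stand-ins for the manager's real fields, not the repository's implementation.

package main

import (
	"context"
	"errors"
	"fmt"
)

type peer struct{ ID string }

type syncManager struct {
	workers map[string]bool             // peers that already have a running synchronizer
	connect func(p peer) error          // stand-in for newPieceTaskSynchronizer
	report  func(p peer, reason string) // stand-in for reportInvalidPeer
}

func (m *syncManager) syncPeers(peers []peer) {
	for _, p := range peers {
		if m.workers[p.ID] {
			continue // already connected, nothing to do
		}
		err := m.connect(p)
		if err == nil {
			m.workers[p.ID] = true
			continue
		}
		// Report failures so the scheduler can offer other peers.
		if errors.Is(err, context.DeadlineExceeded) {
			m.report(p, "connection timeout")
		} else {
			m.report(p, "piece request failure")
		}
	}
}

func main() {
	m := &syncManager{
		workers: map[string]bool{"peer-a": true},
		connect: func(p peer) error {
			if p.ID == "peer-slow" {
				return context.DeadlineExceeded
			}
			return nil
		},
		report: func(p peer, reason string) { fmt.Println("report", p.ID, reason) },
	}
	m.syncPeers([]peer{{"peer-a"}, {"peer-b"}, {"peer-slow"}})
	fmt.Println("workers:", m.workers)
}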

View File

@@ -35,9 +35,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic" "go.uber.org/atomic"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
@@ -99,8 +97,45 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
             TotalPiece:    pieceCount,
         }, nil
     })
-    daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error {
-        return status.Error(codes.Unimplemented, "TODO")
+    daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
+        request, err := s.Recv()
+        if err != nil {
+            return err
+        }
+        var tasks []*commonv1.PieceInfo
+        // only return first piece
+        if request.StartNum == 0 {
+            tasks = append(tasks,
+                &commonv1.PieceInfo{
+                    PieceNum:    int32(request.StartNum),
+                    RangeStart:  uint64(0),
+                    RangeSize:   opt.pieceSize,
+                    PieceMd5:    digest.MD5FromBytes(testBytes[0:opt.pieceSize]),
+                    PieceOffset: 0,
+                    PieceStyle:  0,
+                })
+        }
+        pp := &commonv1.PiecePacket{
+            PieceMd5Sign:  digest.SHA256FromStrings(piecesMd5...),
+            TaskId:        request.TaskId,
+            DstPid:        "peer-x",
+            PieceInfos:    tasks,
+            ContentLength: opt.contentLength,
+            TotalPiece:    pieceCount,
+        }
+        if err = s.Send(pp); err != nil {
+            return err
+        }
+        for {
+            _, err = s.Recv()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                return err
+            }
+        }
+        return nil
     })
ln, _ := rpc.Listen(dfnet.NetAddr{ ln, _ := rpc.Listen(dfnet.NetAddr{
Type: "tcp", Type: "tcp",
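Editor's note: the rewritten mocks in these test files all follow the same bidirectional-stream shape: read the first request, answer it, then keep answering follow-up requests until the client closes the stream with io.EOF. Below is a self-contained Go sketch of that serving loop against a tiny stream interface; pieceStream, the request/packet types, and the in-memory fakeStream are illustrative stand-ins, not the generated dfdaemonv1 API.

package main

import (
	"fmt"
	"io"
)

type pieceRequest struct{ StartNum uint32 }
type piecePacket struct{ StartNum uint32 }

// pieceStream is the minimal surface the serving loop needs; a real gRPC
// bidirectional stream exposes the same Recv/Send pair.
type pieceStream interface {
	Recv() (*pieceRequest, error)
	Send(*piecePacket) error
}

// serveSyncPieceTasks mirrors the mock: answer the first request, then echo
// a packet for every further request until the client closes the stream.
func serveSyncPieceTasks(s pieceStream) error {
	req, err := s.Recv()
	if err != nil {
		return err
	}
	if err = s.Send(&piecePacket{StartNum: req.StartNum}); err != nil {
		return err
	}
	for {
		req, err = s.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err = s.Send(&piecePacket{StartNum: req.StartNum}); err != nil {
			return err
		}
	}
}

// fakeStream feeds canned requests and records responses, standing in for a
// real gRPC stream in this sketch.
type fakeStream struct {
	requests []*pieceRequest
	sent     []*piecePacket
}

func (f *fakeStream) Recv() (*pieceRequest, error) {
	if len(f.requests) == 0 {
		return nil, io.EOF
	}
	r := f.requests[0]
	f.requests = f.requests[1:]
	return r, nil
}

func (f *fakeStream) Send(p *piecePacket) error {
	f.sent = append(f.sent, p)
	return nil
}

func main() {
	s := &fakeStream{requests: []*pieceRequest{{StartNum: 0}, {StartNum: 4}}}
	if err := serveSyncPieceTasks(s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("packets sent:", len(s.sent)) // 2
}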

View File

@@ -132,10 +132,29 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr
} }
     daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
         DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
-            return genPiecePacket(request), nil
+            return nil, status.Error(codes.Unimplemented, "TODO")
         })
-    daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error {
-        return status.Error(codes.Unimplemented, "TODO")
+    daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
+        request, err := s.Recv()
+        if err != nil {
+            return err
+        }
+        if err = s.Send(genPiecePacket(request)); err != nil {
+            return err
+        }
+        for {
+            request, err = s.Recv()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                return err
+            }
+            if err = s.Send(genPiecePacket(request)); err != nil {
+                return err
+            }
+        }
+        return nil
     })
ln, _ := rpc.Listen(dfnet.NetAddr{ ln, _ := rpc.Listen(dfnet.NetAddr{
Type: "tcp", Type: "tcp",