diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 5f96dc383..b16b18d59 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -63,8 +63,6 @@ const ( failedReasonNotSet = "unknown" ) -var errPeerPacketChanged = errors.New("peer packet changed") - var _ Task = (*peerTaskConductor)(nil) // peerTaskConductor will fetch all pieces from other peers and send pieces info to broker @@ -112,15 +110,7 @@ type peerTaskConductor struct { // peerPacketStream stands schedulerclient.PeerPacketStream from scheduler peerPacketStream schedulerv1.Scheduler_ReportPieceResultClient - // peerPacket is the latest available peers from peerPacketCh - // Deprecated: remove in future release - peerPacket atomic.Value // *schedulerv1.PeerPacket - legacyPeerCount *atomic.Int64 - // peerPacketReady will receive a ready signal for peerPacket ready - peerPacketReady chan bool - // pieceTaskPoller pulls piece task from other peers - // Deprecated: pieceTaskPoller is deprecated, use pieceTaskSyncManager - pieceTaskPoller *pieceTaskPoller + legacyPeerCount *atomic.Int64 // pieceTaskSyncManager syncs piece task from other peers pieceTaskSyncManager *pieceTaskSyncManager @@ -134,9 +124,6 @@ type peerTaskConductor struct { // span stands open telemetry trace span span trace.Span - // failedPieceCh will hold all pieces which download failed, - // those pieces will be retried later - failedPieceCh chan int32 // failedReason will be set when peer task failed failedReason string // failedReason will be set when peer task failed @@ -230,7 +217,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor( ctx: ctx, ctxCancel: cancel, broker: newPieceBroker(), - peerPacketReady: make(chan bool, 1), peerID: request.PeerId, taskID: taskID, successCh: make(chan struct{}), @@ -240,7 +226,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor( readyPieces: NewBitmap(), runningPieces: NewBitmap(), requestedPieces: NewBitmap(), - failedPieceCh: make(chan int32, config.DefaultPieceChanSize), failedReason: failedReasonNotSet, failedCode: commonv1.Code_UnknownError, contentLength: atomic.NewInt64(-1), @@ -256,11 +241,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor( rg: rg, } - ptc.pieceTaskPoller = &pieceTaskPoller{ - getPiecesMaxRetry: ptm.GetPiecesMaxRetry, - peerTaskConductor: ptc, - } - ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx) return ptc @@ -505,23 +485,6 @@ func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason str // only use when receive back source code from scheduler func (pt *peerTaskConductor) markBackSource() { pt.needBackSource.Store(true) - // when close peerPacketReady, pullPiecesFromPeers will invoke backSource - close(pt.peerPacketReady) - // let legacy mode exit - pt.peerPacket.Store(&schedulerv1.PeerPacket{ - TaskId: pt.taskID, - SrcPid: pt.peerID, - ParallelCount: 1, - MainPeer: nil, - CandidatePeers: []*schedulerv1.PeerPacket_DestPeer{ - { - Ip: pt.PeerHost.Ip, - RpcPort: pt.PeerHost.RpcPort, - PeerId: pt.peerID, - }, - }, - Code: commonv1.Code_SchedNeedBackSource, - }) } // only use when legacy get piece from peers schedule timeout @@ -593,7 +556,6 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() { pieceRequestQueue: pieceRequestQueue, workers: map[string]*pieceTaskSynchronizer{}, } - go pt.pullPiecesFromPeers(pieceRequestQueue) pt.receivePeerPacket(pieceRequestQueue) } @@ -701,6 +663,7 @@ func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue 
ring.Queue[Down peerPacket *schedulerv1.PeerPacket err error firstPacketReceived bool + firstPacketDone = make(chan bool) ) // only record first schedule result // other schedule result will record as an event in peer task span @@ -720,6 +683,8 @@ func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[Down pt.Fail() } }() + + go pt.waitFirstPeerPacket(firstPacketDone) loop: for { select { @@ -739,7 +704,7 @@ loop: } if err != nil { // some errors, like commonv1.Code_SchedReregister, after reregister success, - // we can continue receive peer packet from the new scheduler + // we can continue to receive peer packet from the new scheduler cont := pt.confirmReceivePeerPacketError(err) if cont { continue @@ -753,7 +718,7 @@ loop: pt.Debugf("receive peerPacket %v", peerPacket) if peerPacket.Code != commonv1.Code_Success { if peerPacket.Code == commonv1.Code_SchedNeedBackSource { - pt.markBackSource() + pt.forceBackSource() pt.Infof("receive back source code") return } @@ -789,38 +754,18 @@ loop: firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)) firstPeerSpan.End() } - // updateSynchronizer will update legacy peers to peerPacket.CandidatePeers only - lastNotReadyPiece = pt.updateSynchronizer(lastNotReadyPiece, peerPacket) + + lastNotReadyPiece = pt.updateSynchronizers(lastNotReadyPiece, peerPacket) if !firstPacketReceived { // trigger legacy get piece once to avoid first schedule timeout firstPacketReceived = true - } else if len(peerPacket.CandidatePeers) == 0 { - pt.Debugf("no legacy peers, skip to send peerPacketReady") - pt.legacyPeerCount.Store(0) - continue - } - - legacyPeerCount := int64(len(peerPacket.CandidatePeers)) - pt.Debugf("connect to %d legacy peers", legacyPeerCount) - pt.legacyPeerCount.Store(legacyPeerCount) - - // legacy mode: update peer packet, then send peerPacketReady - pt.peerPacket.Store(peerPacket) - select { - case pt.peerPacketReady <- true: - case <-pt.successCh: - pt.Infof("peer task success, stop wait peer packet from scheduler") - break loop - case <-pt.failCh: - pt.Errorf("peer task fail, stop wait peer packet from scheduler") - break loop - default: + close(firstPacketDone) } } } -// updateSynchronizer will convert peers to synchronizer, if failed, will update failed peers to schedulerv1.PeerPacket -func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *schedulerv1.PeerPacket) int32 { +// updateSynchronizers will convert peers to synchronizer, if failed, will update failed peers to schedulerv1.PeerPacket +func (pt *peerTaskConductor) updateSynchronizers(lastNum int32, p *schedulerv1.PeerPacket) int32 { desiredPiece, ok := pt.getNextNotReadyPieceNum(lastNum) if !ok { pt.Infof("all pieces is ready, peer task completed, skip to synchronize") @@ -831,10 +776,7 @@ func (pt *peerTaskConductor) updateSynchronizer(lastNum int32, p *schedulerv1.Pe var peers = []*schedulerv1.PeerPacket_DestPeer{p.MainPeer} peers = append(peers, p.CandidatePeers...) 
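// Illustrative sketch, not part of the patch: the firstPacketDone channel introduced above
// replaces the removed peerPacketReady channel. receivePeerPacket closes it exactly once,
// after the first schedule result arrives, and waitFirstPeerPacket (further below) selects
// on it against the schedule timeout. Closing the channel is used instead of sending a value
// because a close broadcasts to any number of waiters and can never block. The helper and
// names below (waitFirst, scheduleTimeout) are made up for this example only.
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFirst returns nil once done is closed, or an error when the schedule timeout fires
// first, mirroring the decision waitFirstPeerPacket makes between "first peer packet
// received" and "back to source / fail".
func waitFirst(done <-chan bool, scheduleTimeout time.Duration) error {
	select {
	case <-done:
		return nil
	case <-time.After(scheduleTimeout):
		return errors.New("schedule timeout")
	}
}

func main() {
	done := make(chan bool)
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate the scheduler answering
		close(done)                        // broadcast: first peer packet is ready
	}()
	fmt.Println(waitFirst(done, time.Second)) // prints <nil>
}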
- legacyPeers := pt.pieceTaskSyncManager.newMultiPieceTaskSynchronizer(peers, desiredPiece) - - p.MainPeer = nil - p.CandidatePeers = legacyPeers + _ = pt.pieceTaskSyncManager.syncPeers(peers, desiredPiece) return desiredPiece } @@ -854,7 +796,7 @@ func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool if ok { switch de.Code { case commonv1.Code_SchedNeedBackSource: - pt.markBackSource() + pt.forceBackSource() pt.Infof("receive back source code") return false case commonv1.Code_SchedReregister: @@ -955,98 +897,6 @@ func (pt *peerTaskConductor) pullSinglePiece() { } } -// Deprecated -func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestQueue ring.Queue[DownloadPieceRequest]) { - if ok, backSource := pt.waitFirstPeerPacket(); !ok { - if backSource { - return - } - pt.Errorf("wait first peer packet error") - return - } - var ( - num int32 - ok bool - limit uint32 - ) - - // ensure first peer packet is not nil - peerPacket, ok := pt.peerPacket.Load().(*schedulerv1.PeerPacket) - if !ok { - pt.Warn("pull pieces canceled") - return - } - if len(peerPacket.CandidatePeers) == 0 { - num, ok = pt.waitAvailablePeerPacket() - if !ok { - return - } - } - - limit = config.DefaultPieceChanSize -loop: - for { - // 1, check whether catch exit signal or get a failed piece - // if nothing got, process normal pieces - select { - case <-pt.successCh: - pt.Infof("peer task success, stop get pieces from peer") - break loop - case <-pt.failCh: - pt.Error("peer task fail, stop get pieces from peer") - break loop - case failed := <-pt.failedPieceCh: - pt.Warnf("download piece %d failed, retry", failed) - num = failed - limit = 1 - default: - } - - retry: - // 2, try to get pieces - pt.Debugf("try to get pieces, number: %d, limit: %d", num, limit) - piecePacket, err := pt.pieceTaskPoller.preparePieceTasks( - &commonv1.PieceTaskRequest{ - TaskId: pt.taskID, - SrcPid: pt.peerID, - StartNum: uint32(num), - Limit: limit, - }) - - if err != nil { - pt.Warnf("get piece task error: %s, wait available peers from scheduler", err.Error()) - pt.span.RecordError(err) - if num, ok = pt.waitAvailablePeerPacket(); !ok { - break loop - } - continue loop - } - - pt.updateMetadata(piecePacket) - - // 3. dispatch piece request to all workers - pt.dispatchPieceRequest(pieceRequestQueue, piecePacket) - - // 4. get next not request piece - if num, ok = pt.getNextPieceNum(num); ok { - // get next piece success - limit = config.DefaultPieceChanSize - continue - } - - // 5. 
wait failed pieces - pt.Infof("all pieces requests sent, just wait failed pieces") - // get failed piece - if num, ok = pt.waitFailedPiece(); !ok { - // when ok == false, indicates than need break loop - break loop - } - // just need one piece - limit = 1 - goto retry - } -} - func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { // update total piece var metadataChanged bool @@ -1094,139 +944,30 @@ func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQ } } -func (pt *peerTaskConductor) waitFirstPeerPacket() (done bool, backSource bool) { +func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) { // wait first available peer select { case <-pt.successCh: pt.Infof("peer task succeed, no need to wait first peer") - return true, false + return case <-pt.failCh: pt.Warnf("peer task failed, no need to wait first peer") - return true, false - case _, ok := <-pt.peerPacketReady: - if ok { - // preparePieceTasksByPeer func already send piece result with error - pt.Infof("new peer client ready, scheduler time cost: %dus, peer count: %d", - time.Since(pt.startTime).Microseconds(), len(pt.peerPacket.Load().(*schedulerv1.PeerPacket).CandidatePeers)) - return true, false - } - // when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady - pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource") - pt.span.AddEvent("back source due to scheduler says need back source") - pt.backSource() - return false, true + return + case <-done: + pt.Debugf("first peer packet received") + return case <-time.After(pt.SchedulerOption.ScheduleTimeout.Duration): if pt.SchedulerOption.DisableAutoBackSource { pt.cancel(commonv1.Code_ClientScheduleTimeout, reasonBackSourceDisabled) err := fmt.Errorf("%s, auto back source disabled", pt.failedReason) pt.span.RecordError(err) pt.Errorf(err.Error()) - return false, false + return } pt.Warnf("start download from source due to %s", reasonScheduleTimeout) pt.span.AddEvent("back source due to schedule timeout") pt.forceBackSource() - return false, true - } -} - -// Deprecated -func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) { - // only <-pt.peerPacketReady continue loop, others break - select { - // when peer task without content length or total pieces count, match here - case <-pt.successCh: - pt.Infof("peer task success, stop wait available peer packet") - case <-pt.failCh: - pt.Infof("peer task fail, stop wait available peer packet") - case _, ok := <-pt.peerPacketReady: - if ok { - // preparePieceTasksByPeer func already send piece result with error - pt.Infof("new peer client ready, peer count: %d", len(pt.peerPacket.Load().(*schedulerv1.PeerPacket).CandidatePeers)) - // research from piece 0 - return 0, true - } - // when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady - pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource") - pt.span.AddEvent("back source due to scheduler says need back source ") - // TODO optimize back source when already downloaded some pieces - pt.backSource() - } - return -1, false -} - -// Deprecated -func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestQueue ring.Queue[DownloadPieceRequest], piecePacket *commonv1.PiecePacket) { - pieceCount := len(piecePacket.PieceInfos) - pt.Debugf("dispatch piece request, piece count: %d", pieceCount) - // fix cdn return zero piece info, but with total piece count and content length - if 
pieceCount == 0 { - finished := pt.isCompleted() - if finished { - pt.Done() - } - } - for _, piece := range piecePacket.PieceInfos { - pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d", - piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize) - // FIXME when set total piece but no total digest, fetch again - pt.requestedPiecesLock.Lock() - if !pt.requestedPieces.IsSet(piece.PieceNum) { - pt.requestedPieces.Set(piece.PieceNum) - } - pt.requestedPiecesLock.Unlock() - req := &DownloadPieceRequest{ - storage: pt.GetStorage(), - piece: piece, - log: pt.Log(), - TaskID: pt.GetTaskID(), - PeerID: pt.GetPeerID(), - DstPid: piecePacket.DstPid, - DstAddr: piecePacket.DstAddr, - } - - pieceRequestQueue.Enqueue(req) - msg := fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum) - pt.span.AddEvent(msg) - pt.Infof(msg) - - select { - case <-pt.successCh: - pt.Infof("peer task success, stop dispatch piece request") - case <-pt.failCh: - pt.Warnf("peer task fail, stop dispatch piece request") - default: - } - } -} - -func (pt *peerTaskConductor) waitFailedPiece() (int32, bool) { - if pt.isCompleted() { - return -1, false - } -wait: - // use no default branch select to wait failed piece or exit - select { - case <-pt.successCh: - pt.Infof("peer task success, stop to wait failed piece") - return -1, false - case <-pt.failCh: - pt.Debugf("peer task fail, stop to wait failed piece") - return -1, false - case failed := <-pt.failedPieceCh: - pt.Warnf("download piece/%d failed, retry", failed) - return failed, true - case _, ok := <-pt.peerPacketReady: - if ok { - // preparePieceTasksByPeer func already send piece result with error - pt.Infof("new peer client ready, but all pieces are already downloading, just wait failed pieces") - goto wait - } - // when scheduler says commonv1.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady - pt.Infof("start download from source due to commonv1.Code_SchedNeedBackSource") - pt.span.AddEvent("back source due to scheduler says need back source") - pt.backSource() - return -1, false + return } } @@ -1312,25 +1053,6 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec }) pt.Infof("send failed piece %d to remote, attempt: %d, success: %d", request.piece.PieceNum, attempt, success) - - // when there is no legacy peers, skip send to failedPieceCh for legacy peers in background - if pt.legacyPeerCount.Load() == 0 { - pt.Infof("there is no legacy peers, skip send to failedPieceCh for legacy peers") - return - } - // Deprecated - // send to fail chan and retry - // try to send directly first, if failed channel is busy, create a new goroutine to do this - select { - case pt.failedPieceCh <- request.piece.PieceNum: - pt.Infof("success to send failed piece %d to failedPieceCh", request.piece.PieceNum) - default: - pt.Infof("start to send failed piece %d to failedPieceCh in background", request.piece.PieceNum) - go func() { - pt.failedPieceCh <- request.piece.PieceNum - pt.Infof("success to send failed piece %d to failedPieceCh in background", request.piece.PieceNum) - }() - } return } // broadcast success piece diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 1eac62955..7bcd135b7 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -83,7 +83,6 @@ type componentsOption struct { backSource bool scope commonv1.SizeScope content 
[]byte - getPieceTasks bool reregister bool } @@ -130,42 +129,32 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio PieceMd5Sign: totalDigests, } } - if opt.getPieceTasks { - daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - return genPiecePacket(request), nil - }) - daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error { - return status.Error(codes.Unimplemented, "TODO") + daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { + return nil, status.Error(codes.Unimplemented, "TODO") }) - } else { - daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - return nil, status.Error(codes.Unimplemented, "TODO") - }) - daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error { - request, err := s.Recv() + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error { + request, err := s.Recv() + if err != nil { + return err + } + if err = s.Send(genPiecePacket(request)); err != nil { + return err + } + for { + request, err = s.Recv() + if err == io.EOF { + break + } if err != nil { return err } if err = s.Send(genPiecePacket(request)); err != nil { return err } - for { - request, err = s.Recv() - if err == io.EOF { - break - } - if err != nil { - return err - } - if err = s.Send(genPiecePacket(request)); err != nil { - return err - } - } - return nil - }) - } + } + return nil + }) ln, _ := rpc.Listen(dfnet.NetAddr{ Type: "tcp", Addr: fmt.Sprintf("0.0.0.0:%d", port), @@ -354,7 +343,6 @@ type testSpec struct { sizeScope commonv1.SizeScope peerID string url string - legacyFeature bool reregister bool // when urlGenerator is not nil, use urlGenerator instead url // it's useful for httptest server @@ -621,80 +609,76 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { if _tc.runTaskTypes == nil { types = taskTypes } - assert := testifyassert.New(t) - require := testifyrequire.New(t) - for _, legacy := range []bool{true, false} { - for _, typ := range types { - // dup a new test case with the task type - logger.Infof("-------------------- test %s - type %s, legacy feature: %v started --------------------", - _tc.name, taskTypeNames[typ], legacy) - tc := _tc - tc.taskType = typ - tc.legacyFeature = legacy - func() { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockContentLength := len(tc.taskData) + assert = testifyassert.New(t) + require = testifyrequire.New(t) + for _, typ := range types { + // dup a new test case with the task type + logger.Infof("-------------------- test %s - type %s, started --------------------", + _tc.name, taskTypeNames[typ]) + tc := _tc + tc.taskType = typ + func() { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockContentLength := len(tc.taskData) - urlMeta := &commonv1.UrlMeta{ - Tag: "d7y-test", - } + urlMeta := &commonv1.UrlMeta{ + Tag: "d7y-test", + } - if tc.httpRange != nil { - urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=") - } + if tc.httpRange != nil { + urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=") + } - 
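// Illustrative sketch, not part of the patch: the SyncPieceTasks mock above and the one in
// traffic_shaper_test.go further below repeat the same Recv/Send echo loop. If that
// duplication grows, it could be collapsed into a small helper along these lines, shown here
// as a standalone snippet; echoPieceTasks, gen, and the package name are hypothetical and
// introduced only for this example.
package peer_sketch

import (
	"io"

	commonv1 "d7y.io/api/pkg/apis/common/v1"
	dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
)

// echoPieceTasks answers every received PieceTaskRequest with a packet produced by gen
// until the client closes its side of the stream.
func echoPieceTasks(s dfdaemonv1.Daemon_SyncPieceTasksServer,
	gen func(*commonv1.PieceTaskRequest) *commonv1.PiecePacket) error {
	for {
		request, err := s.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := s.Send(gen(request)); err != nil {
			return err
		}
	}
}

// A mock expectation could then reuse it, e.g.:
//   daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(
//       func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
//           return echoPieceTasks(s, genPiecePacket)
//       })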
if tc.urlGenerator != nil { - tc.url = tc.urlGenerator(&tc) - } - taskID := idgen.TaskID(tc.url, urlMeta) + if tc.urlGenerator != nil { + tc.url = tc.urlGenerator(&tc) + } + taskID := idgen.TaskID(tc.url, urlMeta) - var ( - downloader PieceDownloader - sourceClient source.ResourceClient - ) + var ( + downloader PieceDownloader + sourceClient source.ResourceClient + ) - if tc.mockPieceDownloader != nil { - downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize) - } + if tc.mockPieceDownloader != nil { + downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize) + } - if tc.mockHTTPSourceClient != nil { + if tc.mockHTTPSourceClient != nil { + source.UnRegister("http") + defer func() { + // reset source client source.UnRegister("http") - defer func() { - // reset source client - source.UnRegister("http") - require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter)) - }() - // replace source client - sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url) - require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter)) - } + require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter)) + }() + // replace source client + sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url) + require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter)) + } - option := componentsOption{ - taskID: taskID, - contentLength: int64(mockContentLength), - pieceSize: uint32(tc.pieceSize), - pieceParallelCount: tc.pieceParallelCount, - pieceDownloader: downloader, - sourceClient: sourceClient, - content: tc.taskData, - scope: tc.sizeScope, - peerPacketDelay: tc.peerPacketDelay, - backSource: tc.backSource, - getPieceTasks: tc.legacyFeature, - reregister: tc.reregister, - } - // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same - if tc.taskType == taskTypeConductor { - option.peerPacketDelay = []time.Duration{time.Second} - } - mm := setupMockManager(ctrl, &tc, option) - defer mm.CleanUp() + option := componentsOption{ + taskID: taskID, + contentLength: int64(mockContentLength), + pieceSize: uint32(tc.pieceSize), + pieceParallelCount: tc.pieceParallelCount, + pieceDownloader: downloader, + sourceClient: sourceClient, + content: tc.taskData, + scope: tc.sizeScope, + peerPacketDelay: tc.peerPacketDelay, + backSource: tc.backSource, + reregister: tc.reregister, + } + // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same + if tc.taskType == taskTypeConductor { + option.peerPacketDelay = []time.Duration{time.Second} + } + mm := setupMockManager(ctrl, &tc, option) + defer mm.CleanUp() - tc.run(assert, require, mm, urlMeta) - }() - logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ]) - } + tc.run(assert, require, mm, urlMeta) + }() + logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ]) } }) } diff --git a/client/daemon/peer/peertask_piecetask_poller.go b/client/daemon/peer/peertask_piecetask_poller.go deleted file mode 100644 index 8b3936a17..000000000 --- a/client/daemon/peer/peertask_piecetask_poller.go +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright 2022 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package peer - -import ( - "context" - "fmt" - "time" - - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - commonv1 "d7y.io/api/pkg/apis/common/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - - "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/internal/dferrors" - "d7y.io/dragonfly/v2/pkg/dfnet" - "d7y.io/dragonfly/v2/pkg/retry" - dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" -) - -type pieceTaskPoller struct { - peerTaskConductor *peerTaskConductor - // getPiecesMaxRetry stands max retry to get pieces from one peer packet - getPiecesMaxRetry int -} - -func (poller *pieceTaskPoller) preparePieceTasks(request *commonv1.PieceTaskRequest) (pp *commonv1.PiecePacket, err error) { - ptc := poller.peerTaskConductor - defer ptc.recoverFromPanic() - var retryCount int -prepare: - retryCount++ - poller.peerTaskConductor.Debugf("prepare piece tasks, retry count: %d", retryCount) - peerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket) - - if poller.peerTaskConductor.needBackSource.Load() { - return nil, fmt.Errorf("need back source") - } - - for _, peer := range peerPacket.CandidatePeers { - if poller.peerTaskConductor.needBackSource.Load() { - return nil, fmt.Errorf("need back source") - } - - request.DstPid = peer.PeerId - pp, err = poller.preparePieceTasksByPeer(peerPacket, peer, request) - if err == nil { - return - } - if err == errPeerPacketChanged { - if poller.getPiecesMaxRetry > 0 && retryCount > poller.getPiecesMaxRetry { - err = fmt.Errorf("get pieces max retry count reached") - return - } - goto prepare - } - } - return -} - -func (poller *pieceTaskPoller) preparePieceTasksByPeer( - curPeerPacket *schedulerv1.PeerPacket, - peer *schedulerv1.PeerPacket_DestPeer, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - ptc := poller.peerTaskConductor - if peer == nil { - return nil, fmt.Errorf("empty peer") - } - - var span trace.Span - _, span = tracer.Start(ptc.ctx, config.SpanGetPieceTasks) - span.SetAttributes(config.AttributeTargetPeerID.String(peer.PeerId)) - span.SetAttributes(config.AttributeGetPieceStartNum.Int(int(request.StartNum))) - span.SetAttributes(config.AttributeGetPieceLimit.Int(int(request.Limit))) - defer span.End() - - var maxRetries = 60 - // when cdn returns commonv1.Code_CDNTaskNotFound, report it to scheduler and wait cdn download it. 
-retry: - ptc.Debugf("try get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit) - p, err := poller.getPieceTasksByPeer(span, curPeerPacket, peer, request) - if err == nil { - ptc.Infof("get piece task from peer %s ok, pieces length: %d, totalPiece: %d, content length: %d, piece md5 sign: %s", - peer.PeerId, len(p.PieceInfos), p.TotalPiece, p.ContentLength, p.PieceMd5Sign) - span.SetAttributes(config.AttributeGetPieceCount.Int(len(p.PieceInfos))) - return p, nil - } - span.RecordError(err) - if err == errPeerPacketChanged { - return nil, err - } - ptc.Debugf("get piece task error: %s", err) - - // grpc error - if se, ok := err.(interface{ GRPCStatus() *status.Status }); ok { - ptc.Debugf("get piece task with grpc error, code: %d", se.GRPCStatus().Code()) - // context canceled, just exit - if se.GRPCStatus().Code() == codes.Canceled { - span.AddEvent("context canceled") - ptc.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, err) - return nil, err - } - } - code := commonv1.Code_ClientPieceRequestFail - // not grpc error - if de, ok := err.(*dferrors.DfError); ok && uint32(de.Code) > uint32(codes.Unauthenticated) { - ptc.Debugf("get piece task from peer %s with df error, code: %d", peer.PeerId, de.Code) - code = de.Code - } - ptc.Errorf("get piece task from peer %s error: %s, code: %d", peer.PeerId, err, code) - sendError := ptc.sendPieceResult(&schedulerv1.PieceResult{ - TaskId: ptc.taskID, - SrcPid: ptc.peerID, - DstPid: peer.PeerId, - PieceInfo: &commonv1.PieceInfo{}, - Success: false, - Code: code, - HostLoad: nil, - FinishedCount: -1, - }) - // error code should be sent to scheduler and the scheduler can schedule a new peer - if sendError != nil { - ptc.cancel(commonv1.Code_SchedError, sendError.Error()) - span.RecordError(sendError) - ptc.Errorf("send piece result error: %s, code to send: %d", sendError, code) - return nil, sendError - } - - // currently, before cdn gc tasks, it did not notify scheduler, when cdn complains Code_CDNTaskNotFound, retry - if maxRetries > 0 && code == commonv1.Code_CDNTaskNotFound && curPeerPacket == ptc.peerPacket.Load().(*schedulerv1.PeerPacket) { - span.AddEvent("retry for CdnTaskNotFound") - time.Sleep(time.Second) - maxRetries-- - goto retry - } - return nil, err -} - -func (poller *pieceTaskPoller) getPieceTasksByPeer( - span trace.Span, - curPeerPacket *schedulerv1.PeerPacket, - peer *schedulerv1.PeerPacket_DestPeer, - request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - var ( - peerPacketChanged bool - count int - ptc = poller.peerTaskConductor - ) - p, _, err := retry.Run(ptc.ctx, 0.05, 0.2, 40, func() (any, bool, error) { - // GetPieceTasks must be fast, so short time out is okay - ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second) - defer cancel() - - netAddr := &dfnet.NetAddr{ - Type: dfnet.TCP, - Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort), - } - client, err := dfdaemonclient.GetInsecureClient(context.Background(), netAddr.String()) - if err != nil { - ptc.Errorf("get dfdaemon client error: %s", err) - span.RecordError(err) - return nil, true, err - } - - piecePacket, getError := client.GetPieceTasks(ctx, request) - // when GetPieceTasks returns err, exit retry - if getError != nil { - ptc.Errorf("get piece tasks with error: %s", getError) - span.RecordError(getError) - - // fast way 1 to exit retry - if de, ok := getError.(*dferrors.DfError); ok { - ptc.Debugf("get piece task with grpc error, code: %d", de.Code) - // bad request, like invalid piece 
num, just exit - if de.Code == commonv1.Code_BadRequest { - span.AddEvent("bad request") - ptc.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, getError) - return nil, true, getError - } - } - - // fast way 2 to exit retry - lastPeerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket) - if curPeerPacket.CandidatePeers[0].PeerId != lastPeerPacket.CandidatePeers[0].PeerId { - ptc.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError, - curPeerPacket.CandidatePeers[0].PeerId, lastPeerPacket.CandidatePeers[0].PeerId) - peerPacketChanged = true - return nil, true, nil - } - return nil, true, getError - } - // got any pieces - if len(piecePacket.PieceInfos) > 0 { - return piecePacket, false, nil - } - // need update metadata - if piecePacket.ContentLength > ptc.GetContentLength() || piecePacket.TotalPiece > ptc.GetTotalPieces() { - return piecePacket, false, nil - } - // invalid request num - if piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum { - ptc.Warnf("invalid start num: %d, total piece: %d", request.StartNum, piecePacket.TotalPiece) - return piecePacket, false, nil - } - - // by santong: when peer return empty, retry later - sendError := ptc.sendPieceResult(&schedulerv1.PieceResult{ - TaskId: ptc.taskID, - SrcPid: ptc.peerID, - DstPid: peer.PeerId, - PieceInfo: &commonv1.PieceInfo{}, - Success: false, - Code: commonv1.Code_ClientWaitPieceReady, - HostLoad: nil, - FinishedCount: ptc.readyPieces.Settled(), - }) - if sendError != nil { - ptc.cancel(commonv1.Code_ClientPieceRequestFail, sendError.Error()) - span.RecordError(sendError) - ptc.Errorf("send piece result with commonv1.Code_ClientWaitPieceReady error: %s", sendError) - return nil, true, sendError - } - // fast way to exit retry - lastPeerPacket := ptc.peerPacket.Load().(*schedulerv1.PeerPacket) - if curPeerPacket.CandidatePeers[0].PeerId != lastPeerPacket.CandidatePeers[0].PeerId { - ptc.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", - curPeerPacket.CandidatePeers[0].PeerId, lastPeerPacket.CandidatePeers[0].PeerId) - peerPacketChanged = true - return nil, true, nil - } - count++ - span.AddEvent("retry due to empty pieces", - trace.WithAttributes(config.AttributeGetPieceRetry.Int(count))) - ptc.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId) - return nil, false, dferrors.ErrEmptyValue - }) - if peerPacketChanged { - return nil, errPeerPacketChanged - } - - if err == nil { - return p.(*commonv1.PiecePacket), nil - } - return nil, err -} diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 87a77176b..0d4d7a103 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -75,29 +75,39 @@ type pieceTaskSynchronizerError struct { } // FIXME for compatibility, sync will be called after the dfdaemonclient.GetPieceTasks deprecated and the pieceTaskPoller removed -func (s *pieceTaskSyncManager) sync(pp *schedulerv1.PeerPacket, desiredPiece int32) error { +func (s *pieceTaskSyncManager) syncPeers(destPeers []*schedulerv1.PeerPacket_DestPeer, desiredPiece int32) error { + s.Lock() + defer func() { + if s.peerTaskConductor.WatchdogTimeout > 0 { + s.resetWatchdog(destPeers[0]) + } + s.Unlock() + }() + var ( peers = map[string]bool{} errs []error ) - 
peers[pp.MainPeer.PeerId] = true + // TODO if the worker failed, reconnect and retry - s.Lock() - defer s.Unlock() - if _, ok := s.workers[pp.MainPeer.PeerId]; !ok { - err := s.newPieceTaskSynchronizer(s.ctx, pp.MainPeer, desiredPiece) - if err != nil { - s.peerTaskConductor.Errorf("main peer SyncPieceTasks error: %s", err) - errs = append(errs, err) - } - } - for _, p := range pp.CandidatePeers { - peers[p.PeerId] = true - if _, ok := s.workers[p.PeerId]; !ok { - err := s.newPieceTaskSynchronizer(s.ctx, p, desiredPiece) - if err != nil { - s.peerTaskConductor.Errorf("candidate peer SyncPieceTasks error: %s", err) - errs = append(errs, err) + for _, peer := range destPeers { + peers[peer.PeerId] = true + if _, ok := s.workers[peer.PeerId]; !ok { + err := s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece) + if err == nil { + s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId) + continue + } + + // other errors, report to scheduler + if errors.Is(err, context.DeadlineExceeded) { + // connect timeout error, report to scheduler to get more available peers + s.reportInvalidPeer(peer, commonv1.Code_ClientConnectionError) + s.peerTaskConductor.Infof("connect peer %s with error: %s", peer.PeerId, err) + } else { + // other errors, report to scheduler to get more available peers + s.reportInvalidPeer(peer, commonv1.Code_ClientPieceRequestFail) + s.peerTaskConductor.Errorf("connect peer %s error: %s", peer.PeerId, err) } } } @@ -224,47 +234,6 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( return nil } -func (s *pieceTaskSyncManager) newMultiPieceTaskSynchronizer( - destPeers []*schedulerv1.PeerPacket_DestPeer, - desiredPiece int32) (legacyPeers []*schedulerv1.PeerPacket_DestPeer) { - s.Lock() - defer func() { - if s.peerTaskConductor.WatchdogTimeout > 0 { - s.resetWatchdog(destPeers[0]) - } - s.Unlock() - }() - - for _, peer := range destPeers { - err := s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece) - if err == nil { - s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId) - continue - } - // when err is codes.Unimplemented, fallback to legacy get piece grpc - stat, ok := status.FromError(err) - if ok && stat.Code() == codes.Unimplemented { - // for legacy peers, when get pieces error, will report the error - s.peerTaskConductor.Warnf("connect peer %s error: %s, fallback to legacy get piece grpc", peer.PeerId, err) - legacyPeers = append(legacyPeers, peer) - continue - } - - // other errors, report to scheduler - if errors.Is(err, context.DeadlineExceeded) { - // connect timeout error, report to scheduler to get more available peers - s.reportInvalidPeer(peer, commonv1.Code_ClientConnectionError) - s.peerTaskConductor.Infof("connect to peer %s with error: %s, peer is invalid, skip legacy grpc", peer.PeerId, err) - } else { - // other errors, report to scheduler to get more available peers - s.reportInvalidPeer(peer, commonv1.Code_ClientPieceRequestFail) - s.peerTaskConductor.Errorf("connect peer %s error: %s, not codes.Unimplemented", peer.PeerId, err) - } - } - s.cleanStaleWorker(destPeers) - return legacyPeers -} - func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) { if s.watchdog != nil { close(s.watchdog.done) diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index cdcc9475d..16abef2b4 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go 
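// Illustrative sketch, not part of the patch: with the legacy codes.Unimplemented fallback
// gone, syncPeers above reports every peer it fails to connect to back to the scheduler
// instead of demoting it to the old GetPieceTasks path. The mapping from connect error to
// reported code reduces to the branch below; classifyConnectError and the package name are
// hypothetical names used only for this example.
package peer_sketch

import (
	"context"
	"errors"

	commonv1 "d7y.io/api/pkg/apis/common/v1"
)

// classifyConnectError mirrors the reporting branch in syncPeers: a connect timeout is
// reported as Code_ClientConnectionError so the scheduler hands out other peers, and every
// other failure as Code_ClientPieceRequestFail.
func classifyConnectError(err error) commonv1.Code {
	if errors.Is(err, context.DeadlineExceeded) {
		return commonv1.Code_ClientConnectionError
	}
	return commonv1.Code_ClientPieceRequestFail
}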
@@ -35,9 +35,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" @@ -99,8 +97,45 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, TotalPiece: pieceCount, }, nil }) - daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error { - return status.Error(codes.Unimplemented, "TODO") + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error { + request, err := s.Recv() + if err != nil { + return err + } + var tasks []*commonv1.PieceInfo + // only return first piece + if request.StartNum == 0 { + tasks = append(tasks, + &commonv1.PieceInfo{ + PieceNum: int32(request.StartNum), + RangeStart: uint64(0), + RangeSize: opt.pieceSize, + PieceMd5: digest.MD5FromBytes(testBytes[0:opt.pieceSize]), + PieceOffset: 0, + PieceStyle: 0, + }) + } + pp := &commonv1.PiecePacket{ + PieceMd5Sign: digest.SHA256FromStrings(piecesMd5...), + TaskId: request.TaskId, + DstPid: "peer-x", + PieceInfos: tasks, + ContentLength: opt.contentLength, + TotalPiece: pieceCount, + } + if err = s.Send(pp); err != nil { + return err + } + for { + _, err = s.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + } + return nil }) ln, _ := rpc.Listen(dfnet.NetAddr{ Type: "tcp", diff --git a/client/daemon/peer/traffic_shaper_test.go b/client/daemon/peer/traffic_shaper_test.go index e418329dc..7b8b193bb 100644 --- a/client/daemon/peer/traffic_shaper_test.go +++ b/client/daemon/peer/traffic_shaper_test.go @@ -132,10 +132,29 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr } daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes(). DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - return genPiecePacket(request), nil + return nil, status.Error(codes.Unimplemented, "TODO") }) - daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(arg0 dfdaemonv1.Daemon_SyncPieceTasksServer) error { - return status.Error(codes.Unimplemented, "TODO") + daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error { + request, err := s.Recv() + if err != nil { + return err + } + if err = s.Send(genPiecePacket(request)); err != nil { + return err + } + for { + request, err = s.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + if err = s.Send(genPiecePacket(request)); err != nil { + return err + } + } + return nil }) ln, _ := rpc.Listen(dfnet.NetAddr{ Type: "tcp",
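// Illustrative sketch, not part of the patch: every mock above implements the server side of
// the bidirectional SyncPieceTasks stream, which is what the conductor's pieceTaskSynchronizer
// now relies on exclusively. Assuming the standard grpc-go generated client interface for the
// Daemon service (SyncPieceTasks returning a stream with Send/Recv/CloseSend), a consumer
// drives it roughly as below; fetchPieces, its parameters, and the package name are
// hypothetical names used only for this example.
package peer_sketch

import (
	"context"
	"io"

	commonv1 "d7y.io/api/pkg/apis/common/v1"
	dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
)

// fetchPieces asks a remote daemon for piece metadata starting at startNum and collects
// every packet the daemon pushes until it closes the stream.
func fetchPieces(ctx context.Context, client dfdaemonv1.DaemonClient,
	taskID, peerID string, startNum uint32) ([]*commonv1.PieceInfo, error) {
	stream, err := client.SyncPieceTasks(ctx)
	if err != nil {
		return nil, err
	}
	if err := stream.Send(&commonv1.PieceTaskRequest{
		TaskId:   taskID,
		SrcPid:   peerID,
		StartNum: startNum,
		Limit:    16,
	}); err != nil {
		return nil, err
	}
	if err := stream.CloseSend(); err != nil {
		return nil, err
	}
	var pieces []*commonv1.PieceInfo
	for {
		pp, err := stream.Recv()
		if err == io.EOF {
			return pieces, nil
		}
		if err != nil {
			return nil, err
		}
		pieces = append(pieces, pp.PieceInfos...)
	}
}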