From d18625b3e2801ececb7d2dbf6bf726ed494fcc6e Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 29 Mar 2024 15:44:08 +0800 Subject: [PATCH] fix: parent peertask conductor race condition (#3154) Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 12 ++++++++++-- client/daemon/peer/peertask_manager.go | 17 ++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index f795d63a0..b3ddbca76 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -85,6 +85,10 @@ type peerTaskConductor struct { needBackSource *atomic.Bool seed bool + // sub peer tasks must wait until the parent's storage registration has finished, whether it succeeded or failed + storageRegistered chan struct{} + storageRegisterSuccess bool + peerTaskManager *peerTaskManager storage storage.TaskStorageDriver @@ -238,6 +242,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor( seed: seed, parent: parent, rg: rg, + storageRegistered: make(chan struct{}), } ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx) @@ -1272,7 +1277,7 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res span.End() } -func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) { +func (pt *peerTaskConductor) registerStorage(desiredLocation string) (err error) { // prepare storage if pt.parent == nil { pt.storage, err = pt.StorageManager.RegisterTask(pt.ctx, @@ -1300,10 +1305,13 @@ func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) { Range: pt.rg, }) } + defer close(pt.storageRegistered) if err != nil { pt.Log().Errorf("register task to storage manager failed: %s", err) + return err } - return err + pt.storageRegisterSuccess = true + return nil } func (pt *peerTaskConductor) UpdateStorage() error { diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 
e2686c5b5..e365e9850 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -203,6 +203,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor( rg *nethttp.Range, desiredLocation string, seed bool) (*peerTaskConductor, bool, error) { +retry: if ptc, ok := ptm.findPeerTaskConductor(taskID); ok { logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID) return ptc, false, nil @@ -226,7 +227,17 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor( metrics.PeerTaskCount.Add(1) logger.Debugf("peer task created: %s/%s", ptc.taskID, ptc.peerID) - err := ptc.initStorage(desiredLocation) + // wait until the parent has finished registering its storage, successfully or not + if parent != nil { + <-parent.storageRegistered + if !parent.storageRegisterSuccess { + logger.Warnf("parent peer task %s/%s register failed, fallback to non-sub peer task", parent.taskID, parent.peerID) + parent = nil // clear only after logging: a nil parent must not be dereferenced + goto retry + } + } + + err := ptc.registerStorage(desiredLocation) if err != nil { ptc.Errorf("init storage error: %s", err) ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error()) @@ -250,7 +261,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor( metrics.PeerTaskCount.Add(1) logger.Debugf("standalone peer task created: %s/%s", ptc.taskID, ptc.peerID) - err := ptc.initStorage(desiredLocation) + err := ptc.registerStorage(desiredLocation) if err != nil { ptc.Errorf("init storage error: %s", err) ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error()) @@ -290,7 +301,7 @@ func (ptm *peerTaskManager) prefetchParentTask(request *schedulerv1.PeerTaskRequ limit = ptm.PerPeerRateLimit } - logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId) + logger.Infof("prefetch peer task %s/%s, sub peer task %s/%s", taskID, req.PeerId, request.TaskId, request.PeerId) prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false) if err != nil { logger.Errorf("prefetch peer task %s error: %s", taskID, 
err)