fix: parent peertask conductor race condition (#3154)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2024-03-29 15:44:08 +08:00 committed by GitHub
parent afc54df6b3
commit d18625b3e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 24 additions and 5 deletions

View File

@ -85,6 +85,10 @@ type peerTaskConductor struct {
needBackSource *atomic.Bool
seed bool
// sub peer tasks must wait until the parent's storage registration has completed, whether it succeeded or failed
storageRegistered chan struct{}
storageRegisterSuccess bool
peerTaskManager *peerTaskManager
storage storage.TaskStorageDriver
@ -238,6 +242,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
seed: seed,
parent: parent,
rg: rg,
storageRegistered: make(chan struct{}),
}
ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx)
@ -1272,7 +1277,7 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
span.End()
}
func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) {
func (pt *peerTaskConductor) registerStorage(desiredLocation string) (err error) {
// prepare storage
if pt.parent == nil {
pt.storage, err = pt.StorageManager.RegisterTask(pt.ctx,
@ -1300,10 +1305,13 @@ func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) {
Range: pt.rg,
})
}
defer close(pt.storageRegistered)
if err != nil {
pt.Log().Errorf("register task to storage manager failed: %s", err)
}
return err
}
pt.storageRegisterSuccess = true
return nil
}
func (pt *peerTaskConductor) UpdateStorage() error {

View File

@ -203,6 +203,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
rg *nethttp.Range,
desiredLocation string,
seed bool) (*peerTaskConductor, bool, error) {
retry:
if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
return ptc, false, nil
@ -226,7 +227,17 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
metrics.PeerTaskCount.Add(1)
logger.Debugf("peer task created: %s/%s", ptc.taskID, ptc.peerID)
err := ptc.initStorage(desiredLocation)
// Wait for the parent's RegisterTask call to finish; storageRegistered is
// closed once the parent's registration attempt has returned.
if parent != nil {
	<-parent.storageRegistered
	if !parent.storageRegisterSuccess {
		// Log before clearing parent: reading parent.taskID / parent.peerID
		// after "parent = nil" would dereference a nil pointer and panic.
		logger.Warnf("parent peer task %s/%s register failed, fallback to non-sub peer task", parent.taskID, parent.peerID)
		// Drop the failed parent so the retry proceeds as a non-sub peer task.
		parent = nil
		goto retry
	}
}
err := ptc.registerStorage(desiredLocation)
if err != nil {
ptc.Errorf("init storage error: %s", err)
ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
@ -250,7 +261,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor(
metrics.PeerTaskCount.Add(1)
logger.Debugf("standalone peer task created: %s/%s", ptc.taskID, ptc.peerID)
err := ptc.initStorage(desiredLocation)
err := ptc.registerStorage(desiredLocation)
if err != nil {
ptc.Errorf("init storage error: %s", err)
ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
@ -290,7 +301,7 @@ func (ptm *peerTaskManager) prefetchParentTask(request *schedulerv1.PeerTaskRequ
limit = ptm.PerPeerRateLimit
}
logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
logger.Infof("prefetch peer task %s/%s, sub peer task %s/%s", taskID, req.PeerId, request.TaskId, request.PeerId)
prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false)
if err != nil {
logger.Errorf("prefetch peer task %s error: %s", taskID, err)