From 04a35509257f9c40e27ac16fd21d91e31665605b Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 8 Dec 2021 10:33:56 +0800 Subject: [PATCH] fix: scheduler success event (#891) Signed-off-by: Gaius --- scheduler/core/events.go | 41 ++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/scheduler/core/events.go b/scheduler/core/events.go index e94f39172..b3c5ea0c0 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -175,27 +175,28 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { var candidates []*supervisor.Peer parentPeer, ok := s.peerManager.Get(e.pr.DstPid) - if ok { - if parentPeer.IsLeave() { - e.peer.Log().Warnf("peerDownloadPieceSuccessEvent: need reschedule parent for peer because it's parent is already left") - e.peer.ReplaceParent(nil) - var hasParent bool - parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer, sets.NewString(parentPeer.ID)) - if !hasParent { - e.peer.Log().Warnf("peerDownloadPieceSuccessEvent: no parent node is currently available, " + - "reschedule it later") - s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: e.peer, blankParents: sets.NewString(parentPeer.ID)}, time.Second) - return - } - } - - if oldParent, ok := e.peer.GetParent(); e.pr.DstPid != e.peer.ID && (!ok || oldParent.ID != e.pr.DstPid) { - logger.WithTaskAndPeerID(e.peer.Task.ID, e.peer.ID).Debugf("parent peerID is not same as DestPid, replace it's parent node with %s", - e.pr.DstPid) - e.peer.ReplaceParent(parentPeer) - } - } else { + if !ok { e.peer.Log().Warnf("parent peer %s not found", e.pr.DstPid) + return + } + + if parentPeer.IsLeave() { + e.peer.Log().Warnf("peerDownloadPieceSuccessEvent: need reschedule parent for peer because it's parent is already left") + e.peer.ReplaceParent(nil) + var hasParent bool + parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer, sets.NewString(parentPeer.ID)) + if !hasParent { + e.peer.Log().Warnf("peerDownloadPieceSuccessEvent: no parent node is currently available, " + + "reschedule it later") + s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: e.peer, blankParents: sets.NewString(parentPeer.ID)}, time.Second) + return + } + } + + if oldParent, ok := e.peer.GetParent(); e.pr.DstPid != e.peer.ID && (!ok || oldParent.ID != e.pr.DstPid) { + logger.WithTaskAndPeerID(e.peer.Task.ID, e.peer.ID).Debugf("parent peerID is not same as DestPid, replace it's parent node with %s", + e.pr.DstPid) + e.peer.ReplaceParent(parentPeer) } parentPeer.Touch()