fix: filter parent condition (#1277)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
3682b26647
commit
6fac39b3ca
|
|
@ -150,8 +150,19 @@ type Peer struct {
|
|||
BlockPeers set.SafeSet
|
||||
|
||||
// NeedBackToSource needs downloaded from source
|
||||
//
|
||||
// When peer is registering, at the same time,
|
||||
// scheduler needs to create the new corresponding task and the cdn is disabled,
|
||||
// NeedBackToSource is set to true
|
||||
NeedBackToSource *atomic.Bool
|
||||
|
||||
// IsBackToSource is downloaded from source
|
||||
//
|
||||
// When peer is scheduling and NeedBackToSource is true,
|
||||
// scheduler needs to return Code_SchedNeedBackSource and
|
||||
// IsBackToSource is set to true
|
||||
IsBackToSource *atomic.Bool
|
||||
|
||||
// CreateAt is peer create time
|
||||
CreateAt *atomic.Time
|
||||
|
||||
|
|
@ -181,6 +192,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
|
|||
StealPeers: set.NewSafeSet(),
|
||||
BlockPeers: set.NewSafeSet(),
|
||||
NeedBackToSource: atomic.NewBool(false),
|
||||
IsBackToSource: atomic.NewBool(false),
|
||||
CreateAt: atomic.NewTime(time.Now()),
|
||||
UpdateAt: atomic.NewTime(time.Now()),
|
||||
mu: &sync.RWMutex{},
|
||||
|
|
@ -226,6 +238,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
|
|||
p.Log.Infof("peer state is %s", e.FSM.Current())
|
||||
},
|
||||
PeerEventDownloadFromBackToSource: func(e *fsm.Event) {
|
||||
p.IsBackToSource.Store(true)
|
||||
p.Task.BackToSourcePeers.Add(p)
|
||||
p.DeleteParent()
|
||||
p.Host.DeletePeer(p.ID)
|
||||
|
|
|
|||
|
|
@ -159,19 +159,19 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
|
|||
return []*resource.Peer{}, false
|
||||
}
|
||||
|
||||
// Find the parent that can be scheduled
|
||||
parents := s.filterParents(peer, blocklist)
|
||||
if len(parents) == 0 {
|
||||
peer.Log.Info("can not find parents")
|
||||
// Find the candidate parent that can be scheduled
|
||||
candidateParents := s.filterCandidateParents(peer, blocklist)
|
||||
if len(candidateParents) == 0 {
|
||||
peer.Log.Info("can not find candidate parents")
|
||||
return []*resource.Peer{}, false
|
||||
}
|
||||
|
||||
// Sort parents by evaluation score
|
||||
// Sort candidate parents by evaluation score
|
||||
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
|
||||
sort.Slice(
|
||||
parents,
|
||||
candidateParents,
|
||||
func(i, j int) bool {
|
||||
return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount)
|
||||
return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount)
|
||||
},
|
||||
)
|
||||
|
||||
|
|
@ -182,56 +182,56 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
|
|||
return []*resource.Peer{}, false
|
||||
}
|
||||
|
||||
if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil {
|
||||
if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
|
||||
peer.Log.Error(err)
|
||||
return []*resource.Peer{}, false
|
||||
}
|
||||
|
||||
// Add steal peers to current peer
|
||||
peer.StealPeers.Clear()
|
||||
for _, parent := range parents[1:] {
|
||||
peer.StealPeers.Add(parent.ID)
|
||||
for _, candidateParent := range candidateParents[1:] {
|
||||
peer.StealPeers.Add(candidateParent.ID)
|
||||
}
|
||||
|
||||
// Replace peer's parent with scheduled parent
|
||||
peer.ReplaceParent(parents[0])
|
||||
peer.ReplaceParent(candidateParents[0])
|
||||
peer.Log.Infof("schedule parent successful, replace parent to %s and steal peers is %v",
|
||||
parents[0].ID, peer.StealPeers.Values())
|
||||
candidateParents[0].ID, peer.StealPeers.Values())
|
||||
peer.Log.Debugf("peer ancestors is %v", peer.Ancestors())
|
||||
return parents, true
|
||||
return candidateParents, true
|
||||
}
|
||||
|
||||
// FindParent finds parent that best matches the evaluation
|
||||
func (s *scheduler) FindParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet) (*resource.Peer, bool) {
|
||||
// Filter the parent that can be scheduled
|
||||
parents := s.filterParents(peer, blocklist)
|
||||
if len(parents) == 0 {
|
||||
peer.Log.Info("can not find parents")
|
||||
// Filter the candidate parent that can be scheduled
|
||||
candidateParents := s.filterCandidateParents(peer, blocklist)
|
||||
if len(candidateParents) == 0 {
|
||||
peer.Log.Info("can not find candidate parents")
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Sort parents by evaluation score
|
||||
// Sort candidate parents by evaluation score
|
||||
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
|
||||
sort.Slice(
|
||||
parents,
|
||||
candidateParents,
|
||||
func(i, j int) bool {
|
||||
return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount)
|
||||
return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount)
|
||||
},
|
||||
)
|
||||
|
||||
peer.Log.Infof("find parent %s successful", parents[0].ID)
|
||||
return parents[0], true
|
||||
peer.Log.Infof("find parent %s successful", candidateParents[0].ID)
|
||||
return candidateParents[0], true
|
||||
}
|
||||
|
||||
// Filter the parent that can be scheduled
|
||||
func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer {
|
||||
// Filter the candidate parent that can be scheduled
|
||||
func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer {
|
||||
filterParentLimit := config.DefaultSchedulerFilterParentLimit
|
||||
if config, ok := s.dynconfig.GetSchedulerClusterConfig(); ok && filterParentLimit > 0 {
|
||||
filterParentLimit = int(config.FilterParentLimit)
|
||||
}
|
||||
|
||||
var parents []*resource.Peer
|
||||
var parentIDs []string
|
||||
var candidateParents []*resource.Peer
|
||||
var candidateParentIDs []string
|
||||
var n int
|
||||
peer.Task.Peers.Range(func(_, value interface{}) bool {
|
||||
if n > filterParentLimit {
|
||||
|
|
@ -239,70 +239,82 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
|
|||
}
|
||||
n++
|
||||
|
||||
parent, ok := value.(*resource.Peer)
|
||||
candidateParent, ok := value.(*resource.Peer)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
if blocklist.Contains(parent.ID) {
|
||||
peer.Log.Debugf("parent %s is not selected because it is in blocklist", parent.ID)
|
||||
// Candidate parent is in blocklist
|
||||
if blocklist.Contains(candidateParent.ID) {
|
||||
peer.Log.Debugf("candidate parent %s is not selected because it is in blocklist", candidateParent.ID)
|
||||
return true
|
||||
}
|
||||
|
||||
if parent.ID == peer.ID {
|
||||
peer.Log.Debug("parent is not selected because it is same")
|
||||
// Candidate parent is itself
|
||||
if candidateParent.ID == peer.ID {
|
||||
peer.Log.Debug("candidate parent is not selected because it is same")
|
||||
return true
|
||||
}
|
||||
|
||||
if s.evaluator.IsBadNode(parent) {
|
||||
peer.Log.Debugf("parent %s is not selected because it is bad node", parent.ID)
|
||||
// Candidate parent is bad node
|
||||
if s.evaluator.IsBadNode(candidateParent) {
|
||||
peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID)
|
||||
return true
|
||||
}
|
||||
|
||||
_, ok = parent.LoadParent()
|
||||
isBackToSource := peer.Task.BackToSourcePeers.Contains(parent)
|
||||
if !ok && !parent.Host.IsCDN && !isBackToSource {
|
||||
peer.Log.Debugf("parent %s is not selected, because its download state is %t %t %t",
|
||||
parent.ID, ok, parent.Host.IsCDN, isBackToSource)
|
||||
// Conditions for candidate parent to be a parent:
|
||||
// 1. candidate parent has parent
|
||||
// 2. candidate parent is CDN
|
||||
// 3. candidate parent has been back-to-source
|
||||
_, ok = candidateParent.LoadParent()
|
||||
isBackToSource := candidateParent.IsBackToSource.Load()
|
||||
if !ok && !candidateParent.Host.IsCDN && !isBackToSource {
|
||||
peer.Log.Debugf("candidate parent %s is not selected, because its download state is %t %t %t",
|
||||
candidateParent.ID, ok, candidateParent.Host.IsCDN, isBackToSource)
|
||||
return true
|
||||
}
|
||||
|
||||
// Candidate parent's depth exceeds available depth
|
||||
peerChildCount := peer.ChildCount.Load()
|
||||
parentDepth := parent.Depth()
|
||||
parentDepth := candidateParent.Depth()
|
||||
if peerChildCount > 0 && parentDepth > defaultAvailableDepth {
|
||||
peer.Log.Debugf("peer has %d children and parent %s depth is %d", peerChildCount, parent.ID, parentDepth)
|
||||
peer.Log.Debugf("candidate peer has %d children and parent %s depth is %d", peerChildCount, candidateParent.ID, parentDepth)
|
||||
return true
|
||||
}
|
||||
|
||||
// Peer's depth exceeds limit depth
|
||||
peerDepth := peer.Depth()
|
||||
if parentDepth+peerDepth > defaultDepthLimit {
|
||||
peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, parent %s is %d", defaultDepthLimit, peerDepth, parent.ID, parentDepth)
|
||||
peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, candidate parent %s is %d", defaultDepthLimit, peerDepth, candidateParent.ID, parentDepth)
|
||||
return true
|
||||
}
|
||||
|
||||
if parent.IsDescendant(peer) {
|
||||
peer.Log.Debugf("parent %s is not selected because it is descendant", parent.ID)
|
||||
// Candidate parent is an descendant of peer
|
||||
if candidateParent.IsDescendant(peer) {
|
||||
peer.Log.Debugf("candidate parent %s is not selected because it is descendant", candidateParent.ID)
|
||||
return true
|
||||
}
|
||||
|
||||
if parent.IsAncestor(peer) {
|
||||
peer.Log.Debugf("parent %s is not selected because it is ancestor", parent.ID)
|
||||
// Candidate parent is an ancestor of peer
|
||||
if candidateParent.IsAncestor(peer) {
|
||||
peer.Log.Debugf("candidate parent %s is not selected because it is ancestor", candidateParent.ID)
|
||||
return true
|
||||
}
|
||||
|
||||
if parent.Host.FreeUploadLoad() <= 0 {
|
||||
peer.Log.Debugf("parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d",
|
||||
parent.ID, parent.Host.UploadLoadLimit.Load(), parent.Host.UploadPeerCount.Load())
|
||||
// Candidate parent's free upload is empty
|
||||
if candidateParent.Host.FreeUploadLoad() <= 0 {
|
||||
peer.Log.Debugf("candidate parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d",
|
||||
candidateParent.ID, candidateParent.Host.UploadLoadLimit.Load(), candidateParent.Host.UploadPeerCount.Load())
|
||||
return true
|
||||
}
|
||||
|
||||
parents = append(parents, parent)
|
||||
parentIDs = append(parentIDs, parent.ID)
|
||||
candidateParents = append(candidateParents, candidateParent)
|
||||
candidateParentIDs = append(candidateParentIDs, candidateParent.ID)
|
||||
return true
|
||||
})
|
||||
|
||||
peer.Log.Infof("candidate parents include %#v", parentIDs)
|
||||
return parents
|
||||
peer.Log.Infof("candidate parents include %#v", candidateParentIDs)
|
||||
return candidateParents
|
||||
}
|
||||
|
||||
// Construct peer successful packet
|
||||
|
|
|
|||
|
|
@ -671,6 +671,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
|
|||
peer.FSM.SetState(resource.PeerStateRunning)
|
||||
mockPeer.FSM.SetState(resource.PeerStateRunning)
|
||||
peer.Task.BackToSourcePeers.Add(mockPeer)
|
||||
mockPeer.IsBackToSource.Store(true)
|
||||
peer.Task.StorePeer(mockPeer)
|
||||
mockPeer.Pieces.Set(0)
|
||||
peer.StoreStream(stream)
|
||||
|
|
@ -700,6 +701,8 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
|
|||
peer.Task.StorePeer(stealPeer)
|
||||
peer.Task.BackToSourcePeers.Add(mockPeer)
|
||||
peer.Task.BackToSourcePeers.Add(stealPeer)
|
||||
mockPeer.IsBackToSource.Store(true)
|
||||
stealPeer.IsBackToSource.Store(true)
|
||||
mockPeer.Pieces.Set(0)
|
||||
peer.StoreStream(stream)
|
||||
gomock.InOrder(
|
||||
|
|
@ -838,6 +841,8 @@ func TestScheduler_FindParent(t *testing.T) {
|
|||
peer.Task.StorePeer(mockPeers[1])
|
||||
peer.Task.BackToSourcePeers.Add(mockPeers[0])
|
||||
peer.Task.BackToSourcePeers.Add(mockPeers[1])
|
||||
mockPeers[0].IsBackToSource.Store(true)
|
||||
mockPeers[1].IsBackToSource.Store(true)
|
||||
mockPeers[0].Pieces.Set(0)
|
||||
mockPeers[1].Pieces.Set(0)
|
||||
mockPeers[1].Pieces.Set(1)
|
||||
|
|
@ -907,6 +912,8 @@ func TestScheduler_FindParent(t *testing.T) {
|
|||
peer.Task.StorePeer(mockPeers[1])
|
||||
peer.Task.BackToSourcePeers.Add(mockPeers[0])
|
||||
peer.Task.BackToSourcePeers.Add(mockPeers[1])
|
||||
mockPeers[0].IsBackToSource.Store(true)
|
||||
mockPeers[1].IsBackToSource.Store(true)
|
||||
mockPeers[0].Pieces.Set(0)
|
||||
mockPeers[1].Pieces.Set(0)
|
||||
mockPeers[1].Pieces.Set(1)
|
||||
|
|
|
|||
Loading…
Reference in New Issue