feat: limit tree depth (#1099)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
fc2b1f8187
commit
83cdf39a9c
|
|
@ -308,20 +308,35 @@ func (p *Peer) ReplaceParent(parent *Peer) {
|
|||
p.StoreParent(parent)
|
||||
}
|
||||
|
||||
// TreeTotalNodeCount represents tree's total node count
|
||||
func (p *Peer) TreeTotalNodeCount() int {
|
||||
count := 1
|
||||
p.Children.Range(func(_, value interface{}) bool {
|
||||
node, ok := value.(*Peer)
|
||||
if !ok {
|
||||
return true
|
||||
// Depth represents depth of tree
|
||||
func (p *Peer) Depth() int {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
node := p
|
||||
var depth int
|
||||
for node != nil {
|
||||
depth++
|
||||
|
||||
if node.Host.IsCDN {
|
||||
break
|
||||
}
|
||||
|
||||
count += node.TreeTotalNodeCount()
|
||||
return true
|
||||
})
|
||||
parent, ok := node.LoadParent()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
return count
|
||||
// Prevent traversal tree from infinite loop
|
||||
if p == parent {
|
||||
p.Log.Info("tree structure produces an infinite loop")
|
||||
break
|
||||
}
|
||||
|
||||
node = parent
|
||||
}
|
||||
|
||||
return depth
|
||||
}
|
||||
|
||||
// IsDescendant determines whether it is ancestor of peer
|
||||
|
|
|
|||
|
|
@ -467,30 +467,43 @@ func TestPeer_ReplaceParent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPeer_TreeTotalNodeCount(t *testing.T) {
|
||||
func TestPeer_Depth(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
childID string
|
||||
expect func(t *testing.T, peer *Peer, mockChildPeer *Peer)
|
||||
name string
|
||||
expect func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer)
|
||||
}{
|
||||
{
|
||||
name: "get tree total node count",
|
||||
childID: idgen.PeerID("127.0.0.1"),
|
||||
expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) {
|
||||
name: "there is only one node in the tree",
|
||||
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
|
||||
assert := assert.New(t)
|
||||
peer.StoreChild(mockChildPeer)
|
||||
assert.Equal(peer.TreeTotalNodeCount(), 2)
|
||||
mockChildPeer.ID = idgen.PeerID("0.0.0.0")
|
||||
peer.StoreChild(mockChildPeer)
|
||||
assert.Equal(peer.TreeTotalNodeCount(), 3)
|
||||
assert.Equal(peer.Depth(), 1)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tree is empty",
|
||||
childID: idgen.PeerID("127.0.0.1"),
|
||||
expect: func(t *testing.T, peer *Peer, mockChildPeer *Peer) {
|
||||
name: "more than one node in the tree",
|
||||
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
|
||||
peer.StoreParent(parent)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.Equal(peer.TreeTotalNodeCount(), 1)
|
||||
assert.Equal(peer.Depth(), 2)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "node parent is cdn",
|
||||
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
|
||||
peer.StoreParent(cdnParent)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.Equal(peer.Depth(), 2)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "node parent is itself",
|
||||
expect: func(t *testing.T, peer *Peer, parent *Peer, cdnParent *Peer) {
|
||||
peer.StoreParent(peer)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.Equal(peer.Depth(), 1)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -498,11 +511,13 @@ func TestPeer_TreeTotalNodeCount(t *testing.T) {
|
|||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
mockHost := NewHost(mockRawHost)
|
||||
mockCDNHost := NewHost(mockRawCDNHost, WithIsCDN(true))
|
||||
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
|
||||
mockChildPeer := NewPeer(tc.childID, mockTask, mockHost)
|
||||
peer := NewPeer(mockPeerID, mockTask, mockHost)
|
||||
parent := NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost)
|
||||
cdnParent := NewPeer(mockCDNPeerID, mockTask, mockCDNHost)
|
||||
|
||||
tc.expect(t, peer, mockChildPeer)
|
||||
tc.expect(t, peer, parent, cdnParent)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,9 @@ const (
|
|||
|
||||
// Default number of available parents after filtering
|
||||
defaultFilterParentCount = 5
|
||||
|
||||
// Default tree depth limit
|
||||
defaultDepthLimit = 10
|
||||
)
|
||||
|
||||
type Scheduler interface {
|
||||
|
|
@ -250,6 +253,11 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
|
|||
return true
|
||||
}
|
||||
|
||||
if parent.Depth() > defaultDepthLimit {
|
||||
peer.Log.Infof("exceeds the %d depth limit of the tree", defaultDepthLimit)
|
||||
return true
|
||||
}
|
||||
|
||||
if parent.IsDescendant(peer) {
|
||||
peer.Log.Infof("parent %s is not selected because it is descendant", parent.ID)
|
||||
return true
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package scheduler
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -829,6 +830,23 @@ func TestScheduler_FindParent(t *testing.T) {
|
|||
assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID}, parent.ID)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exceeds the depth limit of the tree",
|
||||
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) {
|
||||
peer.FSM.SetState(resource.PeerStateRunning)
|
||||
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
|
||||
for i := 0; i < 10; i++ {
|
||||
mockPeers[i].StoreParent(mockPeers[i+1])
|
||||
}
|
||||
peer.Task.StorePeer(mockPeers[0])
|
||||
|
||||
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
|
||||
},
|
||||
expect: func(t *testing.T, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
|
||||
assert := assert.New(t)
|
||||
assert.False(ok)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
|
|
@ -839,12 +857,14 @@ func TestScheduler_FindParent(t *testing.T) {
|
|||
mockHost := resource.NewHost(mockRawHost)
|
||||
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
|
||||
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
|
||||
mockPeers := []*resource.Peer{
|
||||
resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost),
|
||||
resource.NewPeer(idgen.PeerID("127.0.0.2"), mockTask, mockHost),
|
||||
}
|
||||
blocklist := set.NewSafeSet()
|
||||
|
||||
var mockPeers []*resource.Peer
|
||||
for i := 0; i < 11; i++ {
|
||||
peer := resource.NewPeer(idgen.PeerID(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
|
||||
mockPeers = append(mockPeers, peer)
|
||||
}
|
||||
|
||||
blocklist := set.NewSafeSet()
|
||||
tc.mock(peer, mockPeers, blocklist, dynconfig.EXPECT())
|
||||
scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir)
|
||||
parent, ok := scheduler.FindParent(context.Background(), peer, blocklist)
|
||||
|
|
|
|||
Loading…
Reference in New Issue