feat: can not return peer with the same host (#1526)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-08-04 13:56:58 +08:00
parent 4984cdfbbc
commit f03fed4bc3
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
2 changed files with 80 additions and 21 deletions

View File

@ -267,6 +267,14 @@ func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.Sa
continue continue
} }
// Candidate parent host is not allowed to be the same as the peer host,
// because dfdaemon cannot handle the situation
// where two tasks are downloading and downloading each other.
if peer.Host.ID == candidateParent.Host.ID {
peer.Log.Debugf("candidate parent %s host %s is the same as peer host", candidateParent.ID, candidateParent.Host.ID)
continue
}
// Candidate parent is bad node. // Candidate parent is bad node.
if s.evaluator.IsBadNode(candidateParent) { if s.evaluator.IsBadNode(candidateParent) {
peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID) peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID)

View File

@ -25,6 +25,7 @@ import (
"time" "time"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
@ -327,12 +328,12 @@ func TestScheduler_ScheduleParent(t *testing.T) {
func TestScheduler_NotifyAndFindParent(t *testing.T) { func TestScheduler_NotifyAndFindParent(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
mock func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) mock func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) expect func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool)
}{ }{
{ {
name: "peer state is PeerStatePending", name: "peer state is PeerStatePending",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStatePending) peer.FSM.SetState(resource.PeerStatePending)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -342,7 +343,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateReceivedSmall", name: "peer state is PeerStateReceivedSmall",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedSmall) peer.FSM.SetState(resource.PeerStateReceivedSmall)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -352,7 +353,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateReceivedNormal", name: "peer state is PeerStateReceivedNormal",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedNormal) peer.FSM.SetState(resource.PeerStateReceivedNormal)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -362,7 +363,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateBackToSource", name: "peer state is PeerStateBackToSource",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateBackToSource) peer.FSM.SetState(resource.PeerStateBackToSource)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -372,7 +373,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateSucceeded", name: "peer state is PeerStateSucceeded",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateSucceeded) peer.FSM.SetState(resource.PeerStateSucceeded)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -382,7 +383,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateFailed", name: "peer state is PeerStateFailed",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateFailed) peer.FSM.SetState(resource.PeerStateFailed)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -392,7 +393,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateLeave", name: "peer state is PeerStateLeave",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateLeave) peer.FSM.SetState(resource.PeerStateLeave)
}, },
expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, peer *resource.Peer, parents []*resource.Peer, ok bool) {
@ -402,7 +403,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "task peers is empty", name: "task peers is empty",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1) md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
@ -414,7 +415,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "task contains only one peer and peer is itself", name: "task contains only one peer and peer is itself",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
@ -427,7 +428,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer is in blocklist", name: "peer is in blocklist",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -442,7 +443,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer is bad node", name: "peer is bad node",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.FSM.SetState(resource.PeerStateFailed) peer.FSM.SetState(resource.PeerStateFailed)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -454,7 +455,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent is peer's descendant", name: "parent is peer's descendant",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
@ -472,7 +473,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent is peer's ancestor", name: "parent is peer's ancestor",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
@ -490,7 +491,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent free upload load is zero", name: "parent free upload load is zero",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
@ -506,7 +507,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer stream is empty", name: "peer stream is empty",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
@ -522,7 +523,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer stream send failed", name: "peer stream send failed",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourcePeers.Add(mockPeer.ID) peer.Task.BackToSourcePeers.Add(mockPeer.ID)
@ -546,10 +547,20 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "schedule parent", name: "schedule parent",
mock: func(peer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost) candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.PeerHost{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
RpcPort: 8003,
DownPort: 8001,
HostName: "hostname",
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
NetTopology: "net_topology",
}))
candidatePeer.FSM.SetState(resource.PeerStateRunning) candidatePeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -585,10 +596,20 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
mockHost := resource.NewHost(mockRawHost) mockHost := resource.NewHost(mockRawHost)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost) peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost) mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.PeerHost{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
RpcPort: 8003,
DownPort: 8001,
HostName: "hostname",
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
NetTopology: "net_topology",
}))
blocklist := set.NewSafeSet[string]() blocklist := set.NewSafeSet[string]()
tc.mock(peer, mockHost, mockTask, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT()) tc.mock(peer, mockTask, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT())
scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir) scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir)
parents, ok := scheduler.NotifyAndFindParent(context.Background(), peer, blocklist) parents, ok := scheduler.NotifyAndFindParent(context.Background(), peer, blocklist)
tc.expect(t, peer, parents, ok) tc.expect(t, peer, parents, ok)
@ -795,6 +816,25 @@ func TestScheduler_FindParent(t *testing.T) {
assert.Equal(mockPeers[1].ID, parent.ID) assert.Equal(mockPeers[1].ID, parent.ID)
}, },
}, },
{
name: "find parent with same host",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
mockPeers[0].IsBackToSource.Store(true)
mockPeers[1].Host = peer.Host
peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
assert := assert.New(t)
assert.True(ok)
assert.Equal(mockPeers[0].ID, parent.ID)
},
},
{ {
name: "find parent and fetch filterParentLimit from manager dynconfig", name: "find parent and fetch filterParentLimit from manager dynconfig",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
@ -836,6 +876,17 @@ func TestScheduler_FindParent(t *testing.T) {
var mockPeers []*resource.Peer var mockPeers []*resource.Peer
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
mockHost := resource.NewHost(&schedulerv1.PeerHost{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
RpcPort: 8003,
DownPort: 8001,
HostName: "hostname",
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
NetTopology: "net_topology",
})
peer := resource.NewPeer(idgen.PeerID(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) peer := resource.NewPeer(idgen.PeerID(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer) mockPeers = append(mockPeers, peer)
} }