From 3c45cdd6acccf9bc94dfaed856254af5acb4e34e Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 23 Feb 2022 14:26:35 +0800 Subject: [PATCH] feat: add dynamic parallel count (#1088) Signed-off-by: Gaius --- manager/types/scheduler_cluster.go | 3 +- scheduler/scheduler.go | 10 +-- scheduler/scheduler/scheduler.go | 27 +++++-- scheduler/scheduler/scheduler_test.go | 112 ++++++++++++++++---------- 4 files changed, 96 insertions(+), 56 deletions(-) diff --git a/manager/types/scheduler_cluster.go b/manager/types/scheduler_cluster.go index 63069abdb..1bc93610a 100644 --- a/manager/types/scheduler_cluster.go +++ b/manager/types/scheduler_cluster.go @@ -55,7 +55,8 @@ type SchedulerClusterConfig struct { } type SchedulerClusterClientConfig struct { - LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"` + LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"` + ParallelCount uint32 `yaml:"parallelCount" mapstructure:"parallelCount" json:"parallel_count" binding:"omitempty,gte=1,lte=50"` } type SchedulerClusterScopes struct { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a7cfad5ae..491a777f1 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -92,11 +92,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } // Initialize dynconfig client - dynConfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg) + dynconfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg) if err != nil { return nil, err } - s.dynconfig = dynConfig + s.dynconfig = dynconfig // Initialize GC s.gc = gc.New(gc.WithLogger(logger.GCLogger)) @@ -122,16 +122,16 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } // Initialize resource - resource, err := resource.New(cfg, s.gc, dynConfig, dialOptions...) + resource, err := resource.New(cfg, s.gc, dynconfig, dialOptions...) if err != nil { return nil, err } // Initialize scheduler - scheduler := scheduler.New(cfg.Scheduler, d.PluginDir()) + scheduler := scheduler.New(cfg.Scheduler, dynconfig, d.PluginDir()) // Initialize scheduler service - service := service.New(cfg, resource, scheduler, dynConfig) + service := service.New(cfg, resource, scheduler, dynconfig) // Initialize grpc service svr := rpcserver.New(service, serverOptions...) diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 31abb1f35..9d553acce 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -31,6 +31,11 @@ import ( "d7y.io/dragonfly/v2/scheduler/scheduler/evaluator" ) +const ( + // Default number of pieces downloaded in parallel + defaultParallelCount = 4 +) + type Scheduler interface { // ScheduleParent schedule a parent and candidates to a peer ScheduleParent(context.Context, *resource.Peer, set.SafeSet) @@ -48,12 +53,16 @@ type scheduler struct { // Scheduler configuration config *config.SchedulerConfig + + // Scheduler dynamic configuration + dynconfig config.DynconfigInterface } -func New(cfg *config.SchedulerConfig, pluginDir string) Scheduler { +func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string) Scheduler { return &scheduler{ evaluator: evaluator.New(cfg.Algorithm, pluginDir), config: cfg, + dynconfig: dynconfig, } } @@ -172,7 +181,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer return []*resource.Peer{}, false } - if err := stream.Send(constructSuccessPeerPacket(peer, parents[0], parents[1:])); err != nil { + if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil { peer.Log.Error(err) return []*resource.Peer{}, false } @@ -254,7 +263,12 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) [] } // Construct peer successful packet -func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket { +func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket { + parallelCount := defaultParallelCount + if client, ok := dynconfig.GetSchedulerClusterClientConfig(); ok && client.ParallelCount > 0 { + parallelCount = int(client.ParallelCount) + } + var stealPeers []*rpcscheduler.PeerPacket_DestPeer for _, candidateParent := range candidateParents { stealPeers = append(stealPeers, &rpcscheduler.PeerPacket_DestPeer{ @@ -265,10 +279,9 @@ func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, cand } return &rpcscheduler.PeerPacket{ - TaskId: peer.Task.ID, - SrcPid: peer.ID, - // TODO(gaius-qi) Configure ParallelCount parameter in manager service - ParallelCount: 1, + TaskId: peer.Task.ID, + SrcPid: peer.ID, + ParallelCount: int32(parallelCount), MainPeer: &rpcscheduler.PeerPacket_DestPeer{ Ip: parent.Host.IP, RpcPort: parent.Host.Port, diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index 528b72791..d14a1bcef 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -26,12 +26,14 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" rpcschedulermocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks" "d7y.io/dragonfly/v2/scheduler/config" + configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduler/evaluator" ) @@ -109,7 +111,11 @@ func TestScheduler_New(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, New(mockSchedulerConfig, tc.pluginDir)) + ctl := gomock.NewController(t) + defer ctl.Finish() + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + + tc.expect(t, New(mockSchedulerConfig, dynconfig, tc.pluginDir)) }) } } @@ -117,12 +123,12 @@ func TestScheduler_New(t *testing.T) { func TestCallback_ScheduleParent(t *testing.T) { tests := []struct { name string - mock func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) + mock func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, peer *resource.Peer) }{ { name: "context was done", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) cancel() }, @@ -133,7 +139,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and peer stream load failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) @@ -147,7 +153,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) @@ -167,7 +173,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) @@ -188,7 +194,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) @@ -210,7 +216,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) @@ -232,7 +238,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "schedule exceeds RetryBackSourceLimit and peer stream load failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -244,7 +250,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -262,7 +268,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -281,7 +287,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -301,7 +307,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -321,7 +327,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "schedule exceeds RetryLimit and peer stream load failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -336,7 +342,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -354,7 +360,7 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) @@ -372,14 +378,19 @@ func TestCallback_ScheduleParent(t *testing.T) { }, { name: "schedule succeeded", - mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) task.StorePeer(cdnPeer) peer.FSM.SetState(resource.PeerStateRunning) cdnPeer.FSM.SetState(resource.PeerStateRunning) peer.StoreStream(stream) - mr.Send(gomock.Any()).Return(nil).Times(1) + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ + ParallelCount: 2, + }, true).Times(1), + mr.Send(gomock.Any()).Return(nil).Times(1), + ) }, expect: func(t *testing.T, peer *resource.Peer) { assert := assert.New(t) @@ -395,6 +406,7 @@ func TestCallback_ScheduleParent(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) ctx, cancel := context.WithCancel(context.Background()) mockHost := resource.NewHost(mockRawHost) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) @@ -403,8 +415,8 @@ func TestCallback_ScheduleParent(t *testing.T) { cdnPeer := resource.NewPeer(mockCDNPeerID, mockTask, mockCDNHost) blocklist := set.NewSafeSet() - tc.mock(cancel, peer, cdnPeer, blocklist, stream, stream.EXPECT()) - scheduler := New(mockSchedulerConfig, mockPluginDir) + tc.mock(cancel, peer, cdnPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT()) + scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir) scheduler.ScheduleParent(ctx, peer, blocklist) tc.expect(t, peer) }) @@ -414,12 +426,12 @@ func TestCallback_ScheduleParent(t *testing.T) { func TestScheduler_NotifyAndFindParent(t *testing.T) { tests := []struct { name string - mock func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) + mock func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, parents []*resource.Peer, ok bool) }{ { name: "peer state is PeerStatePending", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStatePending) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -429,7 +441,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateReceivedSmall", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateReceivedSmall) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -439,7 +451,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateReceivedNormal", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateReceivedNormal) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -449,7 +461,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateBackToSource", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateBackToSource) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -459,7 +471,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateSucceeded", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateSucceeded) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -469,7 +481,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateFailed", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateFailed) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -479,7 +491,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer state is PeerStateLeave", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateLeave) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -489,7 +501,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "task peers is empty", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { @@ -499,7 +511,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "task contains only one peer and peer is itself", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(peer) }, @@ -510,7 +522,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer is in blocklist", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) blocklist.Add(mockPeer.ID) @@ -522,7 +534,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer is bad node", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateFailed) peer.Task.StorePeer(mockPeer) @@ -534,7 +546,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "parent is peer's descendant", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) @@ -547,7 +559,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "parent is peer's ancestor", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) @@ -560,7 +572,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "parent free upload load is zero", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) @@ -573,7 +585,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer stream is empty", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) @@ -586,13 +598,18 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "peer stream send failed", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) mockPeer.Pieces.Set(0) peer.StoreStream(stream) - ms.Send(gomock.Eq(constructSuccessPeerPacket(peer, mockPeer, []*resource.Peer{}))).Return(errors.New("foo")).Times(1) + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ + ParallelCount: 2, + }, true).Times(1), + ms.Send(gomock.Any()).Return(errors.New("foo")).Times(1), + ) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { assert := assert.New(t) @@ -601,13 +618,18 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { }, { name: "schedule parent", - mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { + mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(mockPeer) mockPeer.Pieces.Set(0) peer.StoreStream(stream) - ms.Send(gomock.Eq(constructSuccessPeerPacket(peer, mockPeer, []*resource.Peer{}))).Return(nil).Times(1) + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ + ParallelCount: 2, + }, true).Times(1), + ms.Send(gomock.Any()).Return(nil).Times(1), + ) }, expect: func(t *testing.T, parents []*resource.Peer, ok bool) { assert := assert.New(t) @@ -622,14 +644,15 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) mockHost := resource.NewHost(mockRawHost) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost) blocklist := set.NewSafeSet() - tc.mock(peer, mockPeer, blocklist, stream, stream.EXPECT()) - scheduler := New(mockSchedulerConfig, mockPluginDir) + tc.mock(peer, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT()) + scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir) parents, ok := scheduler.NotifyAndFindParent(context.Background(), peer, blocklist) tc.expect(t, parents, ok) }) @@ -730,6 +753,9 @@ func TestScheduler_FindParent(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + dynconfig := configmocks.NewMockDynconfigInterface(ctl) mockHost := resource.NewHost(mockRawHost) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) @@ -737,7 +763,7 @@ func TestScheduler_FindParent(t *testing.T) { blocklist := set.NewSafeSet() tc.mock(peer, mockPeer, blocklist) - scheduler := New(mockSchedulerConfig, mockPluginDir) + scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir) parent, ok := scheduler.FindParent(context.Background(), peer, blocklist) tc.expect(t, parent, ok) })