feat: add dynamic parallel count (#1088)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-02-23 14:26:35 +08:00
parent f56de57c01
commit 3c45cdd6ac
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
4 changed files with 96 additions and 56 deletions

View File

@ -55,7 +55,8 @@ type SchedulerClusterConfig struct {
} }
type SchedulerClusterClientConfig struct { type SchedulerClusterClientConfig struct {
LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"` LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"`
ParallelCount uint32 `yaml:"parallelCount" mapstructure:"parallelCount" json:"parallel_count" binding:"omitempty,gte=1,lte=50"`
} }
type SchedulerClusterScopes struct { type SchedulerClusterScopes struct {

View File

@ -92,11 +92,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
} }
// Initialize dynconfig client // Initialize dynconfig client
dynConfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg) dynconfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.dynconfig = dynConfig s.dynconfig = dynconfig
// Initialize GC // Initialize GC
s.gc = gc.New(gc.WithLogger(logger.GCLogger)) s.gc = gc.New(gc.WithLogger(logger.GCLogger))
@ -122,16 +122,16 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
} }
// Initialize resource // Initialize resource
resource, err := resource.New(cfg, s.gc, dynConfig, dialOptions...) resource, err := resource.New(cfg, s.gc, dynconfig, dialOptions...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Initialize scheduler // Initialize scheduler
scheduler := scheduler.New(cfg.Scheduler, d.PluginDir()) scheduler := scheduler.New(cfg.Scheduler, dynconfig, d.PluginDir())
// Initialize scheduler service // Initialize scheduler service
service := service.New(cfg, resource, scheduler, dynConfig) service := service.New(cfg, resource, scheduler, dynconfig)
// Initialize grpc service // Initialize grpc service
svr := rpcserver.New(service, serverOptions...) svr := rpcserver.New(service, serverOptions...)

View File

@ -31,6 +31,11 @@ import (
"d7y.io/dragonfly/v2/scheduler/scheduler/evaluator" "d7y.io/dragonfly/v2/scheduler/scheduler/evaluator"
) )
const (
// Default number of pieces downloaded in parallel
defaultParallelCount = 4
)
type Scheduler interface { type Scheduler interface {
// ScheduleParent schedule a parent and candidates to a peer // ScheduleParent schedule a parent and candidates to a peer
ScheduleParent(context.Context, *resource.Peer, set.SafeSet) ScheduleParent(context.Context, *resource.Peer, set.SafeSet)
@ -48,12 +53,16 @@ type scheduler struct {
// Scheduler configuration // Scheduler configuration
config *config.SchedulerConfig config *config.SchedulerConfig
// Scheduler dynamic configuration
dynconfig config.DynconfigInterface
} }
func New(cfg *config.SchedulerConfig, pluginDir string) Scheduler { func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string) Scheduler {
return &scheduler{ return &scheduler{
evaluator: evaluator.New(cfg.Algorithm, pluginDir), evaluator: evaluator.New(cfg.Algorithm, pluginDir),
config: cfg, config: cfg,
dynconfig: dynconfig,
} }
} }
@ -172,7 +181,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
return []*resource.Peer{}, false return []*resource.Peer{}, false
} }
if err := stream.Send(constructSuccessPeerPacket(peer, parents[0], parents[1:])); err != nil { if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil {
peer.Log.Error(err) peer.Log.Error(err)
return []*resource.Peer{}, false return []*resource.Peer{}, false
} }
@ -254,7 +263,12 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
} }
// Construct peer successful packet // Construct peer successful packet
func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket { func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket {
parallelCount := defaultParallelCount
if client, ok := dynconfig.GetSchedulerClusterClientConfig(); ok && client.ParallelCount > 0 {
parallelCount = int(client.ParallelCount)
}
var stealPeers []*rpcscheduler.PeerPacket_DestPeer var stealPeers []*rpcscheduler.PeerPacket_DestPeer
for _, candidateParent := range candidateParents { for _, candidateParent := range candidateParents {
stealPeers = append(stealPeers, &rpcscheduler.PeerPacket_DestPeer{ stealPeers = append(stealPeers, &rpcscheduler.PeerPacket_DestPeer{
@ -265,10 +279,9 @@ func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, cand
} }
return &rpcscheduler.PeerPacket{ return &rpcscheduler.PeerPacket{
TaskId: peer.Task.ID, TaskId: peer.Task.ID,
SrcPid: peer.ID, SrcPid: peer.ID,
// TODO(gaius-qi) Configure ParallelCount parameter in manager service ParallelCount: int32(parallelCount),
ParallelCount: 1,
MainPeer: &rpcscheduler.PeerPacket_DestPeer{ MainPeer: &rpcscheduler.PeerPacket_DestPeer{
Ip: parent.Host.IP, Ip: parent.Host.IP,
RpcPort: parent.Host.Port, RpcPort: parent.Host.Port,

View File

@ -26,12 +26,14 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/container/set"
"d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/base"
rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler"
rpcschedulermocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks" rpcschedulermocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
"d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduler/evaluator" "d7y.io/dragonfly/v2/scheduler/scheduler/evaluator"
) )
@ -109,7 +111,11 @@ func TestScheduler_New(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
tc.expect(t, New(mockSchedulerConfig, tc.pluginDir)) ctl := gomock.NewController(t)
defer ctl.Finish()
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
tc.expect(t, New(mockSchedulerConfig, dynconfig, tc.pluginDir))
}) })
} }
} }
@ -117,12 +123,12 @@ func TestScheduler_New(t *testing.T) {
func TestCallback_ScheduleParent(t *testing.T) { func TestCallback_ScheduleParent(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
mock func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) mock func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, peer *resource.Peer) expect func(t *testing.T, peer *resource.Peer)
}{ }{
{ {
name: "context was done", name: "context was done",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
cancel() cancel()
}, },
@ -133,7 +139,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and peer stream load failed", name: "cdn peer state is PeerStateFailed and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
@ -147,7 +153,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
@ -167,7 +173,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
@ -188,7 +194,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
@ -210,7 +216,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
@ -232,7 +238,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "schedule exceeds RetryBackSourceLimit and peer stream load failed", name: "schedule exceeds RetryBackSourceLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -244,7 +250,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -262,7 +268,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", name: "cdn peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -281,7 +287,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -301,7 +307,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed", name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -321,7 +327,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "schedule exceeds RetryLimit and peer stream load failed", name: "schedule exceeds RetryLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -336,7 +342,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code failed", name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -354,7 +360,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code success", name: "schedule exceeds RetryLimit and send Code_SchedTaskStatusError code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
@ -372,14 +378,19 @@ func TestCallback_ScheduleParent(t *testing.T) {
}, },
{ {
name: "schedule succeeded", name: "schedule succeeded",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task task := peer.Task
task.StorePeer(peer) task.StorePeer(peer)
task.StorePeer(cdnPeer) task.StorePeer(cdnPeer)
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
cdnPeer.FSM.SetState(resource.PeerStateRunning) cdnPeer.FSM.SetState(resource.PeerStateRunning)
peer.StoreStream(stream) peer.StoreStream(stream)
mr.Send(gomock.Any()).Return(nil).Times(1) gomock.InOrder(
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
ParallelCount: 2,
}, true).Times(1),
mr.Send(gomock.Any()).Return(nil).Times(1),
)
}, },
expect: func(t *testing.T, peer *resource.Peer) { expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t) assert := assert.New(t)
@ -395,6 +406,7 @@ func TestCallback_ScheduleParent(t *testing.T) {
ctl := gomock.NewController(t) ctl := gomock.NewController(t)
defer ctl.Finish() defer ctl.Finish()
stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl) stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
mockHost := resource.NewHost(mockRawHost) mockHost := resource.NewHost(mockRawHost)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
@ -403,8 +415,8 @@ func TestCallback_ScheduleParent(t *testing.T) {
cdnPeer := resource.NewPeer(mockCDNPeerID, mockTask, mockCDNHost) cdnPeer := resource.NewPeer(mockCDNPeerID, mockTask, mockCDNHost)
blocklist := set.NewSafeSet() blocklist := set.NewSafeSet()
tc.mock(cancel, peer, cdnPeer, blocklist, stream, stream.EXPECT()) tc.mock(cancel, peer, cdnPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT())
scheduler := New(mockSchedulerConfig, mockPluginDir) scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir)
scheduler.ScheduleParent(ctx, peer, blocklist) scheduler.ScheduleParent(ctx, peer, blocklist)
tc.expect(t, peer) tc.expect(t, peer)
}) })
@ -414,12 +426,12 @@ func TestCallback_ScheduleParent(t *testing.T) {
func TestScheduler_NotifyAndFindParent(t *testing.T) { func TestScheduler_NotifyAndFindParent(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
mock func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) mock func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, parents []*resource.Peer, ok bool) expect func(t *testing.T, parents []*resource.Peer, ok bool)
}{ }{
{ {
name: "peer state is PeerStatePending", name: "peer state is PeerStatePending",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStatePending) peer.FSM.SetState(resource.PeerStatePending)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -429,7 +441,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateReceivedSmall", name: "peer state is PeerStateReceivedSmall",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedSmall) peer.FSM.SetState(resource.PeerStateReceivedSmall)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -439,7 +451,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateReceivedNormal", name: "peer state is PeerStateReceivedNormal",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedNormal) peer.FSM.SetState(resource.PeerStateReceivedNormal)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -449,7 +461,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateBackToSource", name: "peer state is PeerStateBackToSource",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateBackToSource) peer.FSM.SetState(resource.PeerStateBackToSource)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -459,7 +471,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateSucceeded", name: "peer state is PeerStateSucceeded",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateSucceeded) peer.FSM.SetState(resource.PeerStateSucceeded)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -469,7 +481,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateFailed", name: "peer state is PeerStateFailed",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateFailed) peer.FSM.SetState(resource.PeerStateFailed)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -479,7 +491,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer state is PeerStateLeave", name: "peer state is PeerStateLeave",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateLeave) peer.FSM.SetState(resource.PeerStateLeave)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -489,7 +501,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "task peers is empty", name: "task peers is empty",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
@ -499,7 +511,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "task contains only one peer and peer is itself", name: "task contains only one peer and peer is itself",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer) peer.Task.StorePeer(peer)
}, },
@ -510,7 +522,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer is in blocklist", name: "peer is in blocklist",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
blocklist.Add(mockPeer.ID) blocklist.Add(mockPeer.ID)
@ -522,7 +534,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer is bad node", name: "peer is bad node",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
peer.FSM.SetState(resource.PeerStateFailed) peer.FSM.SetState(resource.PeerStateFailed)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -534,7 +546,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent is peer's descendant", name: "parent is peer's descendant",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -547,7 +559,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent is peer's ancestor", name: "parent is peer's ancestor",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -560,7 +572,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "parent free upload load is zero", name: "parent free upload load is zero",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -573,7 +585,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer stream is empty", name: "peer stream is empty",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
@ -586,13 +598,18 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "peer stream send failed", name: "peer stream send failed",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
mockPeer.Pieces.Set(0) mockPeer.Pieces.Set(0)
peer.StoreStream(stream) peer.StoreStream(stream)
ms.Send(gomock.Eq(constructSuccessPeerPacket(peer, mockPeer, []*resource.Peer{}))).Return(errors.New("foo")).Times(1) gomock.InOrder(
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
ParallelCount: 2,
}, true).Times(1),
ms.Send(gomock.Any()).Return(errors.New("foo")).Times(1),
)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
assert := assert.New(t) assert := assert.New(t)
@ -601,13 +618,18 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
}, },
{ {
name: "schedule parent", name: "schedule parent",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder) { mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning) peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer) peer.Task.StorePeer(mockPeer)
mockPeer.Pieces.Set(0) mockPeer.Pieces.Set(0)
peer.StoreStream(stream) peer.StoreStream(stream)
ms.Send(gomock.Eq(constructSuccessPeerPacket(peer, mockPeer, []*resource.Peer{}))).Return(nil).Times(1) gomock.InOrder(
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
ParallelCount: 2,
}, true).Times(1),
ms.Send(gomock.Any()).Return(nil).Times(1),
)
}, },
expect: func(t *testing.T, parents []*resource.Peer, ok bool) { expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
assert := assert.New(t) assert := assert.New(t)
@ -622,14 +644,15 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
ctl := gomock.NewController(t) ctl := gomock.NewController(t)
defer ctl.Finish() defer ctl.Finish()
stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl) stream := rpcschedulermocks.NewMockScheduler_ReportPieceResultServer(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := resource.NewHost(mockRawHost) mockHost := resource.NewHost(mockRawHost)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
peer := resource.NewPeer(mockPeerID, mockTask, mockHost) peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost) mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, mockHost)
blocklist := set.NewSafeSet() blocklist := set.NewSafeSet()
tc.mock(peer, mockPeer, blocklist, stream, stream.EXPECT()) tc.mock(peer, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT())
scheduler := New(mockSchedulerConfig, mockPluginDir) scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir)
parents, ok := scheduler.NotifyAndFindParent(context.Background(), peer, blocklist) parents, ok := scheduler.NotifyAndFindParent(context.Background(), peer, blocklist)
tc.expect(t, parents, ok) tc.expect(t, parents, ok)
}) })
@ -730,6 +753,9 @@ func TestScheduler_FindParent(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := resource.NewHost(mockRawHost) mockHost := resource.NewHost(mockRawHost)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
peer := resource.NewPeer(mockPeerID, mockTask, mockHost) peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
@ -737,7 +763,7 @@ func TestScheduler_FindParent(t *testing.T) {
blocklist := set.NewSafeSet() blocklist := set.NewSafeSet()
tc.mock(peer, mockPeer, blocklist) tc.mock(peer, mockPeer, blocklist)
scheduler := New(mockSchedulerConfig, mockPluginDir) scheduler := New(mockSchedulerConfig, dynconfig, mockPluginDir)
parent, ok := scheduler.FindParent(context.Background(), peer, blocklist) parent, ok := scheduler.FindParent(context.Background(), peer, blocklist)
tc.expect(t, parent, ok) tc.expect(t, parent, ok)
}) })