diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 0ea0748c7..a7b8ec025 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -398,8 +398,8 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) { task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, s.config.Scheduler.BackSourceCount, req.UrlMeta) task, loaded := s.resource.TaskManager().LoadOrStore(task) - if loaded && task.HasAvailablePeer() && (task.FSM.Is(resource.TaskStateSucceeded) || task.FSM.Is(resource.TaskStateRunning)) { - task.Log.Infof("task state is %s and it has available peer", task.FSM.Current()) + if loaded && !task.FSM.Is(resource.TaskStateFailed) { + task.Log.Infof("task state is %s", task.FSM.Current()) return task, nil } diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 6d05cff18..1575a24ff 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -1332,6 +1332,33 @@ func TestService_registerTask(t *testing.T) { req *rpcscheduler.PeerTaskRequest run func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) }{ + { + name: "task already exists and state is TaskStatePending", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + CDN: &config.CDNConfig{ + Enable: true, + }, + }, + req: &rpcscheduler.PeerTaskRequest{ + Url: mockTaskURL, + UrlMeta: mockTaskURLMeta, + }, + run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) { + mockTask.FSM.SetState(resource.TaskStateRunning) + mockTask.StorePeer(mockPeer) + mockPeer.FSM.SetState(resource.PeerStateRunning) + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), + ) + + task, err := svc.registerTask(context.Background(), req) + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(mockTask, task) + }, + }, { name: "task already exists and state is TaskStateRunning", config: &config.Config{ @@ -1406,7 +1433,7 @@ func TestService_registerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStatePending) gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), - mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1), mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, nil).Times(1), ) @@ -1468,7 +1495,7 @@ func TestService_registerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStatePending) gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), - mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1), mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1), ) @@ -1527,7 +1554,7 @@ func TestService_registerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStatePending) gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), - mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1), ) task, err := svc.registerTask(context.Background(), req)