feat: registerTask returns to the task in time (#1250)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-04-14 12:19:11 +08:00
parent b2c9577e7b
commit f26a63b96e
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
2 changed files with 32 additions and 5 deletions

View File

@ -398,8 +398,8 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) { func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, s.config.Scheduler.BackSourceCount, req.UrlMeta) task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, s.config.Scheduler.BackSourceCount, req.UrlMeta)
task, loaded := s.resource.TaskManager().LoadOrStore(task) task, loaded := s.resource.TaskManager().LoadOrStore(task)
if loaded && task.HasAvailablePeer() && (task.FSM.Is(resource.TaskStateSucceeded) || task.FSM.Is(resource.TaskStateRunning)) { if loaded && !task.FSM.Is(resource.TaskStateFailed) {
task.Log.Infof("task state is %s and it has available peer", task.FSM.Current()) task.Log.Infof("task state is %s", task.FSM.Current())
return task, nil return task, nil
} }

View File

@ -1332,6 +1332,33 @@ func TestService_registerTask(t *testing.T) {
req *rpcscheduler.PeerTaskRequest req *rpcscheduler.PeerTaskRequest
run func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) run func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder)
}{ }{
{
name: "task already exists and state is TaskStatePending",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
},
run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
mockTask.FSM.SetState(resource.TaskStateRunning)
mockTask.StorePeer(mockPeer)
mockPeer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
)
task, err := svc.registerTask(context.Background(), req)
assert := assert.New(t)
assert.NoError(err)
assert.EqualValues(mockTask, task)
},
},
{ {
name: "task already exists and state is TaskStateRunning", name: "task already exists and state is TaskStateRunning",
config: &config.Config{ config: &config.Config{
@ -1406,7 +1433,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending) mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder( gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1), mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, nil).Times(1), mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, nil).Times(1),
) )
@ -1468,7 +1495,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending) mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder( gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1), mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1), mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1),
) )
@ -1527,7 +1554,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending) mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder( gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1), mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
) )
task, err := svc.registerTask(context.Background(), req) task, err := svc.registerTask(context.Background(), req)