diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 31640c4eb..337a6d62e 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -80,12 +80,7 @@ func New( // RegisterPeerTask registers peer and triggers seed peer download task. func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) { // Register task and trigger seed peer download task. - task, needBackToSource, err := s.registerTask(ctx, req) - if err != nil { - msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) - logger.Error(msg) - return nil, dferrors.New(commonv1.Code_SchedTaskStatusError, msg) - } + task, needBackToSource := s.registerTask(ctx, req) host := s.registerHost(ctx, req.PeerHost) peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag, req.UrlMeta.Application) peer.Log.Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad) @@ -537,7 +532,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er } // registerTask creates a new task or reuses a previous task. -func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool, error) { +func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool) { task, loaded := s.resource.TaskManager().Load(req.TaskId) if loaded { // Task is the pointer, if the task already exists, the next request will @@ -547,8 +542,8 @@ func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskReq if !task.FSM.Is(resource.TaskStateFailed) && !task.FSM.Is(resource.TaskStateLeave) && task.HasAvailablePeer() { - task.Log.Infof("task state is %s", task.FSM.Current()) - return task, false, nil + task.Log.Infof("task dose not need to back-to-source, because of task has available peer and state is %s", task.FSM.Current()) + return task, false } } else { // Create a task for the first time. @@ -561,29 +556,34 @@ func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskReq // the concurrent download of the peer may trigger multiple seed peer downloads. if !task.FSM.Is(resource.TaskStateRunning) { if err := task.FSM.Event(resource.TaskEventDownload); err != nil { - return nil, false, err + task.Log.Errorf("task needs to back-to-source, because of %s", err.Error()) + return task, true } } // Seed peer registers the task, then it needs to back-to-source. host, loaded := s.resource.HostManager().Load(req.PeerHost.Id) if loaded && host.Type != resource.HostTypeNormal { - return task, true, nil + task.Log.Infof("task needs to back-to-source, because of host can be loaded and type is %d", host.Type) + return task, true } // FIXME Need to add the condition that the seed peer grpc client is // available and can be triggered back-to-source. if s.config.SeedPeer.Enable { if task.IsSeedPeerFailed() { - return task, true, nil + task.Log.Info("task needs to back-to-source, because of seed peer is failed") + return task, true } go s.triggerSeedPeerTask(ctx, task) - return task, false, nil + task.Log.Info("task dose not need to back-to-source, because of seed peer has been triggered") + return task, false } // Task need to back-to-source. - return task, true, nil + task.Log.Info("task needs to back-to-source, because of seed peer disabled") + return task, true } // registerHost creates a new host or reuses a previous host. diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 1acf50acc..4bd8d2a1b 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -1749,9 +1749,8 @@ func TestService_registerTask(t *testing.T) { mt.Load(gomock.Any()).Return(mockTask, true).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1777,9 +1776,8 @@ func TestService_registerTask(t *testing.T) { mt.Load(gomock.Any()).Return(mockTask, true).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1805,9 +1803,8 @@ func TestService_registerTask(t *testing.T) { mt.Load(gomock.Any()).Return(mockTask, true).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1844,9 +1841,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTaskURL, task.URL) assert.EqualValues(mockTaskURLMeta, task.URLMeta) @@ -1882,9 +1878,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1919,9 +1914,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1956,9 +1950,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -1988,9 +1981,8 @@ func TestService_registerTask(t *testing.T) { mh.Load(gomock.Any()).Return(mockHost, true).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.True(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -2027,9 +2019,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTaskURL, task.URL) assert.EqualValues(mockTaskURLMeta, task.URLMeta) @@ -2065,9 +2056,8 @@ func TestService_registerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.False(needBackToSource) assert.EqualValues(mockTask, task) }, @@ -2098,9 +2088,8 @@ func TestService_registerTask(t *testing.T) { mh.Load(gomock.Any()).Return(nil, false).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.True(needBackToSource) assert.EqualValues(mockTaskURL, task.URL) assert.EqualValues(mockTaskURLMeta, task.URLMeta) @@ -2130,9 +2119,8 @@ func TestService_registerTask(t *testing.T) { mh.Load(gomock.Any()).Return(nil, false).Times(1), ) - task, needBackToSource, err := svc.registerTask(context.Background(), req) + task, needBackToSource := svc.registerTask(context.Background(), req) assert := assert.New(t) - assert.NoError(err) assert.True(needBackToSource) assert.EqualValues(mockTask, task) },