feat: peer will back-to-source when task switch state failed (#1754)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
61a6e545e5
commit
14f88993ae
|
|
@ -80,12 +80,7 @@ func New(
|
|||
// RegisterPeerTask registers peer and triggers seed peer download task.
|
||||
func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
|
||||
// Register task and trigger seed peer download task.
|
||||
task, needBackToSource, err := s.registerTask(ctx, req)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
|
||||
logger.Error(msg)
|
||||
return nil, dferrors.New(commonv1.Code_SchedTaskStatusError, msg)
|
||||
}
|
||||
task, needBackToSource := s.registerTask(ctx, req)
|
||||
host := s.registerHost(ctx, req.PeerHost)
|
||||
peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag, req.UrlMeta.Application)
|
||||
peer.Log.Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad)
|
||||
|
|
@ -537,7 +532,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er
|
|||
}
|
||||
|
||||
// registerTask creates a new task or reuses a previous task.
|
||||
func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool, error) {
|
||||
func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool) {
|
||||
task, loaded := s.resource.TaskManager().Load(req.TaskId)
|
||||
if loaded {
|
||||
// Task is the pointer, if the task already exists, the next request will
|
||||
|
|
@ -547,8 +542,8 @@ func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskReq
|
|||
|
||||
if !task.FSM.Is(resource.TaskStateFailed) &&
|
||||
!task.FSM.Is(resource.TaskStateLeave) && task.HasAvailablePeer() {
|
||||
task.Log.Infof("task state is %s", task.FSM.Current())
|
||||
return task, false, nil
|
||||
task.Log.Infof("task dose not need to back-to-source, because of task has available peer and state is %s", task.FSM.Current())
|
||||
return task, false
|
||||
}
|
||||
} else {
|
||||
// Create a task for the first time.
|
||||
|
|
@ -561,29 +556,34 @@ func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskReq
|
|||
// the concurrent download of the peer may trigger multiple seed peer downloads.
|
||||
if !task.FSM.Is(resource.TaskStateRunning) {
|
||||
if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
|
||||
return nil, false, err
|
||||
task.Log.Errorf("task needs to back-to-source, because of %s", err.Error())
|
||||
return task, true
|
||||
}
|
||||
}
|
||||
|
||||
// Seed peer registers the task, then it needs to back-to-source.
|
||||
host, loaded := s.resource.HostManager().Load(req.PeerHost.Id)
|
||||
if loaded && host.Type != resource.HostTypeNormal {
|
||||
return task, true, nil
|
||||
task.Log.Infof("task needs to back-to-source, because of host can be loaded and type is %d", host.Type)
|
||||
return task, true
|
||||
}
|
||||
|
||||
// FIXME Need to add the condition that the seed peer grpc client is
|
||||
// available and can be triggered back-to-source.
|
||||
if s.config.SeedPeer.Enable {
|
||||
if task.IsSeedPeerFailed() {
|
||||
return task, true, nil
|
||||
task.Log.Info("task needs to back-to-source, because of seed peer is failed")
|
||||
return task, true
|
||||
}
|
||||
|
||||
go s.triggerSeedPeerTask(ctx, task)
|
||||
return task, false, nil
|
||||
task.Log.Info("task dose not need to back-to-source, because of seed peer has been triggered")
|
||||
return task, false
|
||||
}
|
||||
|
||||
// Task need to back-to-source.
|
||||
return task, true, nil
|
||||
task.Log.Info("task needs to back-to-source, because of seed peer disabled")
|
||||
return task, true
|
||||
}
|
||||
|
||||
// registerHost creates a new host or reuses a previous host.
|
||||
|
|
|
|||
|
|
@ -1749,9 +1749,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1777,9 +1776,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1805,9 +1803,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1844,9 +1841,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTaskURL, task.URL)
|
||||
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
|
||||
|
|
@ -1882,9 +1878,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1919,9 +1914,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1956,9 +1950,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -1988,9 +1981,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.True(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -2027,9 +2019,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTaskURL, task.URL)
|
||||
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
|
||||
|
|
@ -2065,9 +2056,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.False(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
@ -2098,9 +2088,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.True(needBackToSource)
|
||||
assert.EqualValues(mockTaskURL, task.URL)
|
||||
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
|
||||
|
|
@ -2130,9 +2119,8 @@ func TestService_registerTask(t *testing.T) {
|
|||
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
||||
)
|
||||
|
||||
task, needBackToSource, err := svc.registerTask(context.Background(), req)
|
||||
task, needBackToSource := svc.registerTask(context.Background(), req)
|
||||
assert := assert.New(t)
|
||||
assert.NoError(err)
|
||||
assert.True(needBackToSource)
|
||||
assert.EqualValues(mockTask, task)
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue