fix: when peer state is PeerStateSucceeded, return size scope is small (#1103)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
83cdf39a9c
commit
27fcc904c0
|
|
@ -107,8 +107,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
|
||||||
fallthrough
|
fallthrough
|
||||||
case base.SizeScope_SMALL:
|
case base.SizeScope_SMALL:
|
||||||
peer.Log.Info("task size scope is small")
|
peer.Log.Info("task size scope is small")
|
||||||
// If the file is registered as a small type,
|
// There is no need to build a tree, just find the parent and return
|
||||||
// there is no need to build a tree, just find the parent and return
|
|
||||||
parent, ok := s.scheduler.FindParent(ctx, peer, set.NewSafeSet())
|
parent, ok := s.scheduler.FindParent(ctx, peer, set.NewSafeSet())
|
||||||
if !ok {
|
if !ok {
|
||||||
peer.Log.Warn("task size scope is small and it can not select parent")
|
peer.Log.Warn("task size scope is small and it can not select parent")
|
||||||
|
|
@ -124,6 +123,22 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When task size scope is small, parent must be downloaded successfully
|
||||||
|
// before returning to the parent directly
|
||||||
|
if !parent.FSM.Is(resource.PeerStateSucceeded) {
|
||||||
|
peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
|
||||||
|
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
|
||||||
|
dferr := dferrors.New(base.Code_SchedError, err.Error())
|
||||||
|
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
|
||||||
|
return nil, dferr
|
||||||
|
}
|
||||||
|
|
||||||
|
return &rpcscheduler.RegisterResult{
|
||||||
|
TaskId: task.ID,
|
||||||
|
SizeScope: base.SizeScope_NORMAL,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
firstPiece, ok := task.LoadPiece(0)
|
firstPiece, ok := task.LoadPiece(0)
|
||||||
if !ok {
|
if !ok {
|
||||||
peer.Log.Warn("task size scope is small and it can not get first piece")
|
peer.Log.Warn("task size scope is small and it can not get first piece")
|
||||||
|
|
|
||||||
|
|
@ -346,6 +346,46 @@ func TestService_RegisterPeerTask(t *testing.T) {
|
||||||
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
|
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "task scope size is SizeScope_SMALL and load piece error, parent state is PeerStateRunning",
|
||||||
|
req: &rpcscheduler.PeerTaskRequest{
|
||||||
|
PeerHost: &rpcscheduler.PeerHost{
|
||||||
|
Uuid: mockRawHost.Uuid,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
mock: func(
|
||||||
|
req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockCDNPeer *resource.Peer,
|
||||||
|
scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
|
||||||
|
ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder,
|
||||||
|
) {
|
||||||
|
mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded)
|
||||||
|
mockPeer.Task.StorePeer(mockCDNPeer)
|
||||||
|
mockPeer.Task.ContentLength.Store(129)
|
||||||
|
mockPeer.Task.StorePiece(&base.PieceInfo{
|
||||||
|
PieceNum: 0,
|
||||||
|
})
|
||||||
|
mockPeer.Task.TotalPieceCount.Store(1)
|
||||||
|
mockPeer.FSM.SetState(resource.PeerStatePending)
|
||||||
|
mockCDNPeer.FSM.SetState(resource.PeerStateRunning)
|
||||||
|
|
||||||
|
gomock.InOrder(
|
||||||
|
mr.TaskManager().Return(taskManager).Times(1),
|
||||||
|
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
||||||
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
|
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
|
||||||
|
mr.PeerManager().Return(peerManager).Times(1),
|
||||||
|
mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
|
||||||
|
ms.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockCDNPeer, true).Times(1),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(result.TaskId, peer.Task.ID)
|
||||||
|
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
|
||||||
|
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "task scope size is SizeScope_SMALL and load piece error, peer state is PeerStateFailed",
|
name: "task scope size is SizeScope_SMALL and load piece error, peer state is PeerStateFailed",
|
||||||
req: &rpcscheduler.PeerTaskRequest{
|
req: &rpcscheduler.PeerTaskRequest{
|
||||||
|
|
@ -366,6 +406,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
|
||||||
})
|
})
|
||||||
mockPeer.Task.TotalPieceCount.Store(1)
|
mockPeer.Task.TotalPieceCount.Store(1)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
||||||
|
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.TaskManager().Return(taskManager).Times(1),
|
mr.TaskManager().Return(taskManager).Times(1),
|
||||||
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
||||||
|
|
@ -403,6 +445,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
|
||||||
})
|
})
|
||||||
mockPeer.Task.TotalPieceCount.Store(1)
|
mockPeer.Task.TotalPieceCount.Store(1)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
||||||
|
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.TaskManager().Return(taskManager).Times(1),
|
mr.TaskManager().Return(taskManager).Times(1),
|
||||||
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
||||||
|
|
@ -439,6 +483,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
|
||||||
PieceNum: 0,
|
PieceNum: 0,
|
||||||
})
|
})
|
||||||
mockPeer.Task.TotalPieceCount.Store(1)
|
mockPeer.Task.TotalPieceCount.Store(1)
|
||||||
|
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.TaskManager().Return(taskManager).Times(1),
|
mr.TaskManager().Return(taskManager).Times(1),
|
||||||
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue