feat: scheduler handles seed peer failed (#1325)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-05-24 21:18:17 +08:00
parent 3bd1f03c43
commit 3fa7c7be75
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
6 changed files with 35 additions and 151 deletions

View File

@ -20,6 +20,7 @@ package resource
import (
"context"
"time"
"github.com/pkg/errors"
@ -34,6 +35,11 @@ const (
SeedBizTag = "d7y/seed"
)
const (
// Default value of seed peer failed timeout.
SeedPeerFailedTimeout = 30 * time.Minute
)
type SeedPeer interface {
// TriggerTask triggers the seed peer to download the task.
TriggerTask(context.Context, *Task) (*Peer, *rpcscheduler.PeerResult, error)

View File

@ -278,7 +278,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) {
// IsSeedPeerFailed returns whether the seed peer in the task failed.
func (t *Task) IsSeedPeerFailed() bool {
seedPeer, ok := t.LoadSeedPeer()
return ok && seedPeer.FSM.Is(PeerStateFailed)
return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreateAt.Load()) < SeedPeerFailedTimeout
}
// LoadPiece return piece for a key.

View File

@ -459,6 +459,18 @@ func TestTask_IsSeedPeerFailed(t *testing.T) {
task.StorePeer(mockSeedPeer)
mockSeedPeer.FSM.SetState(PeerStateSucceeded)
assert.False(task.IsSeedPeerFailed())
},
},
{
name: "seed peer failed timeout",
expect: func(t *testing.T, task *Task, mockPeer *Peer, mockSeedPeer *Peer) {
assert := assert.New(t)
task.StorePeer(mockPeer)
task.StorePeer(mockSeedPeer)
mockSeedPeer.CreateAt.Store(time.Now().Add(-SeedPeerFailedTimeout))
mockSeedPeer.FSM.SetState(PeerStateFailed)
assert.False(task.IsSeedPeerFailed())
},
},

View File

@ -80,11 +80,10 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
default:
}
// If the scheduling exceeds the RetryBackSourceLimit or the latest seed peer state is PeerStateFailed,
// If the scheduling exceeds the RetryBackSourceLimit or peer needs back-to-source,
// peer will download the task back-to-source.
isSeedPeerFailed := peer.Task.IsSeedPeerFailed()
needBackToSource := peer.NeedBackToSource.Load()
if (n >= s.config.RetryBackSourceLimit || isSeedPeerFailed || needBackToSource) &&
if (n >= s.config.RetryBackSourceLimit || needBackToSource) &&
peer.Task.CanBackToSource() {
stream, ok := peer.LoadStream()
if !ok {
@ -92,8 +91,8 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
return
}
peer.Log.Infof("peer downloads back-to-source, scheduling %d times, seed peer is failed %t, peer need back-to-source %t",
n, isSeedPeerFailed, needBackToSource)
peer.Log.Infof("peer downloads back-to-source, scheduling %d times, peer need back-to-source %t",
n, needBackToSource)
// Notify peer back-to-source.
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil {

View File

@ -142,13 +142,12 @@ func TestScheduler_ScheduleParent(t *testing.T) {
},
},
{
name: "seed peer state is PeerStateFailed and peer stream load failed",
name: "peer needs back-to-source and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
@ -156,14 +155,12 @@ func TestScheduler_ScheduleParent(t *testing.T) {
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
name: "peer needs back-to-source and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1)
@ -171,33 +168,12 @@ func TestScheduler_ScheduleParent(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.True(ok)
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "peer need back-to-source and send Code_SchedNeedBackSource code success",
name: "peer needs back-to-source and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
@ -216,37 +192,13 @@ func TestScheduler_ScheduleParent(t *testing.T) {
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
name: "peer needs back-to-source and task state is TaskStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
@ -272,95 +224,6 @@ func TestScheduler_ScheduleParent(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.True(ok)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "schedule exceeds RetryLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {

View File

@ -535,6 +535,10 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe
// Start trigger seed peer task.
if s.config.SeedPeer.Enable {
if task.IsSeedPeerFailed() {
return task, true, nil
}
go s.triggerSeedPeerTask(ctx, task)
return task, false, nil
}