From 3fa7c7be756b3f42f1445e7677296d58bc1c75c9 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 24 May 2022 21:18:17 +0800 Subject: [PATCH] feat: scheduler handles seed peer failed (#1325) Signed-off-by: Gaius --- scheduler/resource/seed_peer.go | 6 + scheduler/resource/task.go | 2 +- scheduler/resource/task_test.go | 12 ++ scheduler/scheduler/scheduler.go | 9 +- scheduler/scheduler/scheduler_test.go | 153 ++------------------------ scheduler/service/service.go | 4 + 6 files changed, 35 insertions(+), 151 deletions(-) diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index fcaf54511..e6875701c 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -20,6 +20,7 @@ package resource import ( "context" + "time" "github.com/pkg/errors" @@ -34,6 +35,11 @@ const ( SeedBizTag = "d7y/seed" ) +const ( + // Default value of seed peer failed timeout. + SeedPeerFailedTimeout = 30 * time.Minute +) + type SeedPeer interface { // TriggerTask triggers the seed peer to download the task. TriggerTask(context.Context, *Task) (*Peer, *rpcscheduler.PeerResult, error) diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 7ec2fbc0d..37362a60c 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -278,7 +278,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) { // IsSeedPeerFailed returns whether the seed peer in the task failed. func (t *Task) IsSeedPeerFailed() bool { seedPeer, ok := t.LoadSeedPeer() - return ok && seedPeer.FSM.Is(PeerStateFailed) + return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreateAt.Load()) < SeedPeerFailedTimeout } // LoadPiece return piece for a key. diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index dbfc16905..f507933ac 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -459,6 +459,18 @@ func TestTask_IsSeedPeerFailed(t *testing.T) { task.StorePeer(mockSeedPeer) mockSeedPeer.FSM.SetState(PeerStateSucceeded) + assert.False(task.IsSeedPeerFailed()) + }, + }, + { + name: "seed peer failed timeout", + expect: func(t *testing.T, task *Task, mockPeer *Peer, mockSeedPeer *Peer) { + assert := assert.New(t) + task.StorePeer(mockPeer) + task.StorePeer(mockSeedPeer) + mockSeedPeer.CreateAt.Store(time.Now().Add(-SeedPeerFailedTimeout)) + mockSeedPeer.FSM.SetState(PeerStateFailed) + assert.False(task.IsSeedPeerFailed()) }, }, diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 86abb33f9..79926c9b8 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -80,11 +80,10 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo default: } - // If the scheduling exceeds the RetryBackSourceLimit or the latest seed peer state is PeerStateFailed, + // If the scheduling exceeds the RetryBackSourceLimit or peer needs back-to-source, // peer will download the task back-to-source. - isSeedPeerFailed := peer.Task.IsSeedPeerFailed() needBackToSource := peer.NeedBackToSource.Load() - if (n >= s.config.RetryBackSourceLimit || isSeedPeerFailed || needBackToSource) && + if (n >= s.config.RetryBackSourceLimit || needBackToSource) && peer.Task.CanBackToSource() { stream, ok := peer.LoadStream() if !ok { @@ -92,8 +91,8 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo return } - peer.Log.Infof("peer downloads back-to-source, scheduling %d times, seed peer is failed %t, peer need back-to-source %t", - n, isSeedPeerFailed, needBackToSource) + peer.Log.Infof("peer downloads back-to-source, scheduling %d times, peer need back-to-source %t", + n, needBackToSource) // Notify peer back-to-source. if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil { diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index d95757786..8e7a89bbb 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -142,13 +142,12 @@ func TestScheduler_ScheduleParent(t *testing.T) { }, }, { - name: "seed peer state is PeerStateFailed and peer stream load failed", + name: "peer needs back-to-source and peer stream load failed", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) - task.StorePeer(seedPeer) + peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) - seedPeer.FSM.SetState(resource.PeerStateFailed) }, expect: func(t *testing.T, peer *resource.Peer) { assert := assert.New(t) @@ -156,14 +155,12 @@ func TestScheduler_ScheduleParent(t *testing.T) { }, }, { - name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", + name: "peer needs back-to-source and send Code_SchedNeedBackSource code failed", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) - task.StorePeer(seedPeer) + peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) - seedPeer.FSM.SetState(resource.PeerStateFailed) - peer.StoreParent(seedPeer) peer.StoreStream(stream) mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1) @@ -171,33 +168,12 @@ func TestScheduler_ScheduleParent(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer) { assert := assert.New(t) _, ok := peer.LoadParent() - assert.True(ok) + assert.False(ok) assert.True(peer.FSM.Is(resource.PeerStateRunning)) }, }, { - name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - task.StorePeer(seedPeer) - seedPeer.FSM.SetState(resource.PeerStateFailed) - peer.FSM.SetState(resource.PeerStateRunning) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.False(ok) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) - }, - }, - { - name: "peer need back-to-source and send Code_SchedNeedBackSource code success", + name: "peer needs back-to-source and send Code_SchedNeedBackSource code success", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) @@ -216,37 +192,13 @@ func TestScheduler_ScheduleParent(t *testing.T) { }, }, { - name: "seed peer state is PeerStateFailed and task state is PeerStateFailed", + name: "peer needs back-to-source and task state is TaskStateFailed", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) - task.StorePeer(seedPeer) - seedPeer.FSM.SetState(resource.PeerStateFailed) + peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) task.FSM.SetState(resource.TaskStateFailed) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.False(ok) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStateRunning)) - }, - }, - { - name: "seed peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - task.StorePeer(seedPeer) - peer.FSM.SetState(resource.PeerStateRunning) - seedPeer.FSM.SetState(resource.PeerStateFailed) - task.FSM.SetState(resource.TaskStateFailed) - peer.StoreParent(seedPeer) peer.StoreStream(stream) mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1) @@ -272,95 +224,6 @@ func TestScheduler_ScheduleParent(t *testing.T) { assert.True(peer.FSM.Is(resource.PeerStateRunning)) }, }, - { - name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1), - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.True(ok) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - }, - }, - { - name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1), - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.False(ok) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) - }, - }, - { - name: "seed peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - task.FSM.SetState(resource.TaskStateFailed) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1), - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.False(ok) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStateRunning)) - }, - }, - { - name: "seed peer state is PeerStateFailed and task state is PeerStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - task.FSM.SetState(resource.TaskStateFailed) - peer.StoreParent(seedPeer) - peer.StoreStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1), - mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - _, ok := peer.LoadParent() - assert.False(ok) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStateRunning)) - }, - }, { name: "schedule exceeds RetryLimit and peer stream load failed", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 541a05698..075edbcc2 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -535,6 +535,10 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe // Start trigger seed peer task. if s.config.SeedPeer.Enable { + if task.IsSeedPeerFailed() { + return task, true, nil + } + go s.triggerSeedPeerTask(ctx, task) return task, false, nil }