From e33736121fd7de9f6e2b636de03cbc0293f00b43 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Mon, 28 Mar 2022 14:32:50 +0800
Subject: [PATCH] feat: scheduler add block peers set (#1202)

Signed-off-by: Gaius
---
 scheduler/resource/peer.go        |  5 +++++
 scheduler/service/service.go      | 26 +++++++-------------------
 scheduler/service/service_test.go | 20 +++++++-------------
 3 files changed, 19 insertions(+), 32 deletions(-)

diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go
index 0ac917fb9..3d749e9ff 100644
--- a/scheduler/resource/peer.go
+++ b/scheduler/resource/peer.go
@@ -31,6 +31,7 @@ import (
 	"go.uber.org/atomic"
 
 	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/container/set"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 )
 
@@ -142,6 +143,9 @@ type Peer struct {
 	// ChildCount is child count
 	ChildCount *atomic.Int32
 
+	// BlockPeers is bad peer ids
+	BlockPeers set.SafeSet
+
 	// CreateAt is peer create time
 	CreateAt *atomic.Time
 
@@ -168,6 +172,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 		Parent:     &atomic.Value{},
 		Children:   &sync.Map{},
 		ChildCount: atomic.NewInt32(0),
+		BlockPeers: set.NewSafeSet(),
 		CreateAt:   atomic.NewTime(time.Now()),
 		UpdateAt:   atomic.NewTime(time.Now()),
 		mu:         &sync.RWMutex{},
diff --git a/scheduler/service/service.go b/scheduler/service/service.go
index 2343618bf..0fae7985d 100644
--- a/scheduler/service/service.go
+++ b/scheduler/service/service.go
@@ -368,11 +368,8 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
 		}
 
 		// Reschedule a new parent to children of peer to exclude the current leave peer
-		blocklist := set.NewSafeSet()
-		blocklist.Add(peer.ID)
-
 		child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
-		s.scheduler.ScheduleParent(ctx, child, blocklist)
+		s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
 		return true
 	})
 
@@ -496,13 +493,8 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
 			return
 		}
 
-		// It’s not a case of back-to-source or small task downloading,
-		// to help peer to schedule the parent node
-		blocklist := set.NewSafeSet()
-		blocklist.Add(peer.ID)
-
 		peer.Log.Infof("schedule parent because of peer receive begin of piece")
-		s.scheduler.ScheduleParent(ctx, peer, blocklist)
+		s.scheduler.ScheduleParent(ctx, peer, set.NewSafeSet())
 	default:
 		peer.Log.Warnf("peer state is %s when receive the begin of piece", peer.FSM.Current())
 	}
@@ -535,7 +527,8 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
 	parent, ok := s.resource.PeerManager().Load(piece.DstPid)
 	if !ok {
 		peer.Log.Errorf("schedule parent because of peer can not found parent %s", piece.DstPid)
-		s.scheduler.ScheduleParent(ctx, peer, set.NewSafeSet())
+		peer.BlockPeers.Add(piece.DstPid)
+		s.scheduler.ScheduleParent(ctx, peer, peer.BlockPeers)
 		return
 	}
 
@@ -574,11 +567,9 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
 		return
 	}
 
-	blocklist := set.NewSafeSet()
-	blocklist.Add(parent.ID)
-
 	peer.Log.Infof("schedule parent because of peer receive failed piece")
-	s.scheduler.ScheduleParent(ctx, peer, blocklist)
+	peer.BlockPeers.Add(parent.ID)
+	s.scheduler.ScheduleParent(ctx, peer, peer.BlockPeers)
 }
 
 // handlePeerSuccess handles successful peer
@@ -609,9 +600,6 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
 	}
 
 	// Reschedule a new parent to children of peer to exclude the current failed peer
-	blocklist := set.NewSafeSet()
-	blocklist.Add(peer.ID)
-
 	peer.Children.Range(func(_, value interface{}) bool {
 		child, ok := value.(*resource.Peer)
 		if !ok {
@@ -619,7 +607,7 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
 		}
 
 		child.Log.Infof("schedule parent because of parent peer %s is failed", peer.ID)
-		s.scheduler.ScheduleParent(ctx, child, blocklist)
+		s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
 		return true
 	})
 }
diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go
index 34aecf11a..cce1f60ab 100644
--- a/scheduler/service/service_test.go
+++ b/scheduler/service/service_test.go
@@ -1159,12 +1159,10 @@ func TestService_LeaveTask(t *testing.T) {
 				peer.FSM.SetState(resource.PeerStateSucceeded)
 				child.FSM.SetState(resource.PeerStateRunning)
 
-				blocklist := set.NewSafeSet()
-				blocklist.Add(peer.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Any()).Return(peer, true).Times(1),
-					ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1),
+					ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Delete(gomock.Eq(peer.ID)).Return().Times(1),
 				)
@@ -1210,12 +1208,10 @@ func TestService_LeaveTask(t *testing.T) {
 				peer.FSM.SetState(resource.PeerStateFailed)
 				child.FSM.SetState(resource.PeerStateRunning)
 
-				blocklist := set.NewSafeSet()
-				blocklist.Add(peer.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Any()).Return(peer, true).Times(1),
-					ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1),
+					ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Delete(gomock.Eq(peer.ID)).Return().Times(1),
 				)
@@ -1716,9 +1712,7 @@ func TestService_handleBeginOfPiece(t *testing.T) {
 			name: "peer state is PeerStateReceivedNormal",
 			mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateReceivedNormal)
-				blocklist := set.NewSafeSet()
-				blocklist.Add(peer.ID)
-				scheduler.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1)
+				scheduler.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet())).Return().Times(1)
 			},
 			expect: func(t *testing.T, peer *resource.Peer) {
 				assert := assert.New(t)
@@ -1942,10 +1936,12 @@ func TestService_handlePieceFail(t *testing.T) {
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
+				blocklist := set.NewSafeSet()
+				blocklist.Add(mockCDNPeerID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(nil, false).Times(1),
-					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet())).Return().Times(1),
+					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)
 
 				svc.handlePieceFail(context.Background(), peer, piece)
@@ -2361,9 +2357,7 @@ func TestService_handlePeerFail(t *testing.T) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				child.FSM.SetState(resource.PeerStateRunning)
 
-				blocklist := set.NewSafeSet()
-				blocklist.Add(peer.ID)
-				ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1)
+				ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, child *resource.Peer) {
 				assert := assert.New(t)
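
Note (not part of the patch): the change replaces the per-call blocklist with a persistent per-peer BlockPeers set. Failed or departing parents are added to a peer's BlockPeers, and the accumulated set is passed to ScheduleParent, so previously bad parents stay excluded across reschedules; handleBeginOfPiece now passes a fresh empty set rather than a blocklist containing the peer's own ID. The sketch below is a minimal, self-contained illustration of that pattern only; safeSet, peer, and scheduleParent are simplified, hypothetical stand-ins for set.SafeSet, resource.Peer, and scheduler.ScheduleParent, not the real d7y.io/dragonfly/v2 API.

// Illustrative sketch of the block-peers pattern, under the assumptions above.
package main

import (
	"fmt"
	"sync"
)

// safeSet is a hypothetical, concurrency-safe string set standing in for
// pkg/container/set.SafeSet.
type safeSet struct {
	mu sync.RWMutex
	m  map[string]struct{}
}

func newSafeSet() *safeSet { return &safeSet{m: map[string]struct{}{}} }

func (s *safeSet) Add(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[id] = struct{}{}
}

func (s *safeSet) Contains(id string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.m[id]
	return ok
}

// peer carries its own block set, mirroring the BlockPeers field the patch
// adds to resource.Peer.
type peer struct {
	ID         string
	BlockPeers *safeSet
}

// scheduleParent picks the first candidate that is not in the child's block
// set, standing in for scheduler.ScheduleParent.
func scheduleParent(child *peer, candidates []*peer) *peer {
	for _, c := range candidates {
		if !child.BlockPeers.Contains(c.ID) {
			return c
		}
	}
	return nil
}

func main() {
	child := &peer{ID: "child", BlockPeers: newSafeSet()}
	candidates := []*peer{{ID: "parent-a"}, {ID: "parent-b"}}

	// A piece download from parent-a fails: record it in the child's block
	// set, as handlePieceFail now does, then reschedule with the accumulated set.
	child.BlockPeers.Add("parent-a")
	if p := scheduleParent(child, candidates); p != nil {
		fmt.Println("rescheduled to", p.ID) // rescheduled to parent-b
	}
}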