feat: scheduler add block peers set (#1202)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-03-28 14:32:50 +08:00
parent 23cb77d5b5
commit e33736121f
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
3 changed files with 19 additions and 32 deletions

View File

@ -31,6 +31,7 @@ import (
"go.uber.org/atomic"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/set"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)
@ -142,6 +143,9 @@ type Peer struct {
// ChildCount is child count
ChildCount *atomic.Int32
// BlockPeers is bad peer ids
BlockPeers set.SafeSet
// CreateAt is peer create time
CreateAt *atomic.Time
@ -168,6 +172,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
Parent: &atomic.Value{},
Children: &sync.Map{},
ChildCount: atomic.NewInt32(0),
BlockPeers: set.NewSafeSet(),
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},

View File

@ -368,11 +368,8 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
}
// Reschedule a new parent to children of peer to exclude the current leave peer
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
s.scheduler.ScheduleParent(ctx, child, blocklist)
s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
return true
})
@ -496,13 +493,8 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
return
}
// Its not a case of back-to-source or small task downloading,
// to help peer to schedule the parent node
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
peer.Log.Infof("schedule parent because of peer receive begin of piece")
s.scheduler.ScheduleParent(ctx, peer, blocklist)
s.scheduler.ScheduleParent(ctx, peer, set.NewSafeSet())
default:
peer.Log.Warnf("peer state is %s when receive the begin of piece", peer.FSM.Current())
}
@ -535,7 +527,8 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
parent, ok := s.resource.PeerManager().Load(piece.DstPid)
if !ok {
peer.Log.Errorf("schedule parent because of peer can not found parent %s", piece.DstPid)
s.scheduler.ScheduleParent(ctx, peer, set.NewSafeSet())
peer.BlockPeers.Add(piece.DstPid)
s.scheduler.ScheduleParent(ctx, peer, peer.BlockPeers)
return
}
@ -574,11 +567,9 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
return
}
blocklist := set.NewSafeSet()
blocklist.Add(parent.ID)
peer.Log.Infof("schedule parent because of peer receive failed piece")
s.scheduler.ScheduleParent(ctx, peer, blocklist)
peer.BlockPeers.Add(parent.ID)
s.scheduler.ScheduleParent(ctx, peer, peer.BlockPeers)
}
// handlePeerSuccess handles successful peer
@ -609,9 +600,6 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
}
// Reschedule a new parent to children of peer to exclude the current failed peer
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
peer.Children.Range(func(_, value interface{}) bool {
child, ok := value.(*resource.Peer)
if !ok {
@ -619,7 +607,7 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
}
child.Log.Infof("schedule parent because of parent peer %s is failed", peer.ID)
s.scheduler.ScheduleParent(ctx, child, blocklist)
s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
return true
})
}

View File

@ -1159,12 +1159,10 @@ func TestService_LeaveTask(t *testing.T) {
peer.FSM.SetState(resource.PeerStateSucceeded)
child.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(peer, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Delete(gomock.Eq(peer.ID)).Return().Times(1),
)
@ -1210,12 +1208,10 @@ func TestService_LeaveTask(t *testing.T) {
peer.FSM.SetState(resource.PeerStateFailed)
child.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(peer, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Delete(gomock.Eq(peer.ID)).Return().Times(1),
)
@ -1716,9 +1712,7 @@ func TestService_handleBeginOfPiece(t *testing.T) {
name: "peer state is PeerStateReceivedNormal",
mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedNormal)
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
scheduler.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1)
scheduler.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet())).Return().Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
@ -1942,10 +1936,12 @@ func TestService_handlePieceFail(t *testing.T) {
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(mockCDNPeerID)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(nil, false).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet())).Return().Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFail(context.Background(), peer, piece)
@ -2361,9 +2357,7 @@ func TestService_handlePeerFail(t *testing.T) {
peer.FSM.SetState(resource.PeerStateRunning)
child.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(blocklist)).Return().Times(1)
ms.ScheduleParent(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet())).Return().Times(1)
},
expect: func(t *testing.T, peer *resource.Peer, child *resource.Peer) {
assert := assert.New(t)