From e1dd1efca1ee12d76535820cfd32f67d93e6be03 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 7 Feb 2023 15:24:05 +0800 Subject: [PATCH] feat: add AnnouncePeers to task in resource (#2051) Signed-off-by: Gaius --- scheduler/resource/peer.go | 64 +++++++------- scheduler/resource/peer_test.go | 39 ++++----- scheduler/resource/task.go | 39 ++++++++- scheduler/resource/task_test.go | 121 ++++++++++++++++++++++---- scheduler/scheduler/scheduler.go | 6 +- scheduler/scheduler/scheduler_test.go | 16 ++-- scheduler/service/service_v1.go | 12 +-- scheduler/service/service_v1_test.go | 10 +-- 8 files changed, 211 insertions(+), 96 deletions(-) diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 69d2cd22f..a2506c73e 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -153,9 +153,9 @@ type Peer struct { // Cost is the cost of downloading. Cost *atomic.Duration - // ReportPieceStream is the grpc stream of Scheduler_ReportPieceResultServer, + // ReportPieceResultStream is the grpc stream of Scheduler_ReportPieceResultServer, // Used only in v1 version of the grpc. - ReportPieceStream *atomic.Value + ReportPieceResultStream *atomic.Value // AnnouncePeerStream is the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. @@ -203,24 +203,24 @@ type Peer struct { // New Peer instance. func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p := &Peer{ - ID: id, - Tag: DefaultTag, - Application: DefaultApplication, - Pieces: set.NewSafeSet[*Piece](), - FinishedPieces: &bitset.BitSet{}, - pieceCosts: []int64{}, - Cost: atomic.NewDuration(0), - ReportPieceStream: &atomic.Value{}, - AnnouncePeerStream: &atomic.Value{}, - Task: task, - Host: host, - BlockParents: set.NewSafeSet[string](), - NeedBackToSource: atomic.NewBool(false), - IsBackToSource: atomic.NewBool(false), - PieceUpdatedAt: atomic.NewTime(time.Now()), - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - Log: logger.WithPeer(host.ID, task.ID, id), + ID: id, + Tag: DefaultTag, + Application: DefaultApplication, + Pieces: set.NewSafeSet[*Piece](), + FinishedPieces: &bitset.BitSet{}, + pieceCosts: []int64{}, + Cost: atomic.NewDuration(0), + ReportPieceResultStream: &atomic.Value{}, + AnnouncePeerStream: &atomic.Value{}, + Task: task, + Host: host, + BlockParents: set.NewSafeSet[string](), + NeedBackToSource: atomic.NewBool(false), + IsBackToSource: atomic.NewBool(false), + PieceUpdatedAt: atomic.NewTime(time.Now()), + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), + Log: logger.WithPeer(host.ID, task.ID, id), } // Initialize state machine. @@ -334,10 +334,10 @@ func (p *Peer) PieceCosts() []int64 { return p.pieceCosts } -// LoadReportPieceStream return the grpc stream of Scheduler_ReportPieceResultServer, +// LoadReportPieceResultStream return the grpc stream of Scheduler_ReportPieceResultServer, // Used only in v1 version of the grpc. -func (p *Peer) LoadReportPieceStream() (schedulerv1.Scheduler_ReportPieceResultServer, bool) { - rawStream := p.ReportPieceStream.Load() +func (p *Peer) LoadReportPieceResultStream() (schedulerv1.Scheduler_ReportPieceResultServer, bool) { + rawStream := p.ReportPieceResultStream.Load() if rawStream == nil { return nil, false } @@ -345,22 +345,22 @@ func (p *Peer) LoadReportPieceStream() (schedulerv1.Scheduler_ReportPieceResultS return rawStream.(schedulerv1.Scheduler_ReportPieceResultServer), true } -// StoreReportPieceStream set the grpc stream of Scheduler_ReportPieceResultServer, +// StoreReportPieceResultStream set the grpc stream of Scheduler_ReportPieceResultServer, // Used only in v1 version of the grpc. -func (p *Peer) StoreReportPieceStream(stream schedulerv1.Scheduler_ReportPieceResultServer) { - p.ReportPieceStream.Store(stream) +func (p *Peer) StoreReportPieceResultStream(stream schedulerv1.Scheduler_ReportPieceResultServer) { + p.ReportPieceResultStream.Store(stream) } -// DeleteReportPieceStream deletes the grpc stream of Scheduler_ReportPieceResultServer, +// DeleteReportPieceResultStream deletes the grpc stream of Scheduler_ReportPieceResultServer, // Used only in v1 version of the grpc. -func (p *Peer) DeleteReportPieceStream() { - p.ReportPieceStream = &atomic.Value{} +func (p *Peer) DeleteReportPieceResultStream() { + p.ReportPieceResultStream = &atomic.Value{} } // LoadAnnouncePeerStream return the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServer, bool) { - rawStream := p.ReportPieceStream.Load() + rawStream := p.ReportPieceResultStream.Load() if rawStream == nil { return nil, false } @@ -371,13 +371,13 @@ func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServe // StoreAnnouncePeerStream set the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) StoreAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) { - p.ReportPieceStream.Store(stream) + p.ReportPieceResultStream.Store(stream) } // DeleteAnnouncePeerStream deletes the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) DeleteAnnouncePeerStream() { - p.ReportPieceStream = &atomic.Value{} + p.ReportPieceResultStream = &atomic.Value{} } // Parents returns parents of peer. diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 83d638c59..1f4c362db 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -34,7 +34,7 @@ import ( commonv2 "d7y.io/api/pkg/apis/common/v2" managerv2 "d7y.io/api/pkg/apis/manager/v2" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - "d7y.io/api/pkg/apis/scheduler/v1/mocks" + v1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" v2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks" @@ -64,7 +64,7 @@ func TestPeer_NewPeer(t *testing.T) { assert.Equal(peer.Pieces.Len(), uint(0)) assert.Empty(peer.FinishedPieces) assert.Equal(len(peer.PieceCosts()), 0) - assert.Empty(peer.ReportPieceStream) + assert.Empty(peer.ReportPieceResultStream) assert.Empty(peer.AnnouncePeerStream) assert.Equal(peer.FSM.Current(), PeerStatePending) assert.EqualValues(peer.Task, mockTask) @@ -86,8 +86,7 @@ func TestPeer_NewPeer(t *testing.T) { assert.Equal(peer.Pieces.Len(), uint(0)) assert.Empty(peer.FinishedPieces) assert.Equal(len(peer.PieceCosts()), 0) - assert.Empty(peer.ReportPieceStream) - assert.Empty(peer.AnnouncePeerStream) + assert.Empty(peer.ReportPieceResultStream) assert.Equal(peer.FSM.Current(), PeerStatePending) assert.EqualValues(peer.Task, mockTask) assert.EqualValues(peer.Host, mockHost) @@ -183,7 +182,7 @@ func TestPeer_PieceCosts(t *testing.T) { } } -func TestPeer_LoadReportPieceStream(t *testing.T) { +func TestPeer_LoadReportPieceResultStream(t *testing.T) { tests := []struct { name string expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) @@ -192,8 +191,8 @@ func TestPeer_LoadReportPieceStream(t *testing.T) { name: "load stream", expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { assert := assert.New(t) - peer.StoreReportPieceStream(stream) - newStream, loaded := peer.LoadReportPieceStream() + peer.StoreReportPieceResultStream(stream) + newStream, loaded := peer.LoadReportPieceResultStream() assert.Equal(loaded, true) assert.EqualValues(newStream, stream) }, @@ -202,7 +201,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) { name: "stream does not exist", expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { assert := assert.New(t) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.Equal(loaded, false) }, }, @@ -212,7 +211,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -224,7 +223,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) { } } -func TestPeer_StoreReportPieceStream(t *testing.T) { +func TestPeer_StoreReportPieceResultStream(t *testing.T) { tests := []struct { name string expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) @@ -233,8 +232,8 @@ func TestPeer_StoreReportPieceStream(t *testing.T) { name: "store stream", expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { assert := assert.New(t) - peer.StoreReportPieceStream(stream) - newStream, loaded := peer.LoadReportPieceStream() + peer.StoreReportPieceResultStream(stream) + newStream, loaded := peer.LoadReportPieceResultStream() assert.Equal(loaded, true) assert.EqualValues(newStream, stream) }, @@ -245,7 +244,7 @@ func TestPeer_StoreReportPieceStream(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -257,7 +256,7 @@ func TestPeer_StoreReportPieceStream(t *testing.T) { } } -func TestPeer_DeleteReportPieceStream(t *testing.T) { +func TestPeer_DeleteReportPieceResultStream(t *testing.T) { tests := []struct { name string expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) @@ -266,9 +265,9 @@ func TestPeer_DeleteReportPieceStream(t *testing.T) { name: "delete stream", expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { assert := assert.New(t) - peer.StoreReportPieceStream(stream) - peer.DeleteReportPieceStream() - _, loaded := peer.LoadReportPieceStream() + peer.StoreReportPieceResultStream(stream) + peer.DeleteReportPieceResultStream() + _, loaded := peer.LoadReportPieceResultStream() assert.Equal(loaded, false) }, }, @@ -278,7 +277,7 @@ func TestPeer_DeleteReportPieceStream(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -430,7 +429,7 @@ func TestPeer_Parents(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -476,7 +475,7 @@ func TestPeer_Children(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 6b7c9309a..09f749987 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -28,6 +28,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" + schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/container/set" @@ -439,8 +440,9 @@ func (t *Task) CanReuseDirectPiece() bool { return len(t.DirectPiece) > 0 && int64(len(t.DirectPiece)) == t.ContentLength.Load() } -// NotifyPeers notify all peers in the task with the state code. -func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) { +// ReportPieceResultToPeers reports all peers in the task with the state code. +// Used only in v1 version of the grpc. +func (t *Task) ReportPieceResultToPeers(peerPacket *schedulerv1.PeerPacket, event string) { for _, vertex := range t.DAG.GetVertices() { peer := vertex.Value if peer == nil { @@ -448,7 +450,7 @@ func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) { } if peer.FSM.Is(PeerStateRunning) { - stream, loaded := peer.LoadReportPieceStream() + stream, loaded := peer.LoadReportPieceResultStream() if !loaded { continue } @@ -457,7 +459,36 @@ func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) { t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error()) continue } - t.Log.Infof("task notify peer %s code %s", peer.ID, peerPacket.Code) + t.Log.Infof("task reports peer %s code %s", peer.ID, peerPacket.Code) + + if err := peer.FSM.Event(context.Background(), event); err != nil { + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + continue + } + } + } +} + +// AnnouncePeers announces all peers in the task with the state code. +// Used only in v2 version of the grpc. +func (t *Task) AnnouncePeers(resp *schedulerv2.AnnouncePeerResponse, event string) { + for _, vertex := range t.DAG.GetVertices() { + peer := vertex.Value + if peer == nil { + continue + } + + if peer.FSM.Is(PeerStateRunning) { + stream, loaded := peer.LoadAnnouncePeerStream() + if !loaded { + continue + } + + if err := stream.Send(resp); err != nil { + t.Log.Errorf("send response to peer %s failed: %s", peer.ID, err.Error()) + continue + } + t.Log.Infof("task announces peer %s response %#v", peer.ID, resp.Response) if err := peer.FSM.Event(context.Background(), event); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index ee90fb67a..8a52c65f1 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -26,7 +26,9 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - "d7y.io/api/pkg/apis/scheduler/v1/mocks" + v1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" + schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" + v2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks" "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/idgen" @@ -1584,16 +1586,16 @@ func TestTask_CanReuseDirectPiece(t *testing.T) { } } -func TestTask_NotifyPeers(t *testing.T) { +func TestTask_ReportPieceResultToPeers(t *testing.T) { tests := []struct { name string - run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) + run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) }{ { name: "peer state is PeerStatePending", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) { + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) { mockPeer.FSM.SetState(PeerStatePending) - task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) assert := assert.New(t) assert.True(mockPeer.FSM.Is(PeerStatePending)) @@ -1601,9 +1603,9 @@ func TestTask_NotifyPeers(t *testing.T) { }, { name: "peer state is PeerStateRunning and stream is empty", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) { + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) { mockPeer.FSM.SetState(PeerStateRunning) - task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) assert := assert.New(t) assert.True(mockPeer.FSM.Is(PeerStateRunning)) @@ -1611,12 +1613,12 @@ func TestTask_NotifyPeers(t *testing.T) { }, { name: "peer state is PeerStateRunning and stream sending failed", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) { + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) { mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreReportPieceStream(stream) + mockPeer.StoreReportPieceResultStream(stream) ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(errors.New("foo")).Times(1) - task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) assert := assert.New(t) assert.True(mockPeer.FSM.Is(PeerStateRunning)) @@ -1624,25 +1626,25 @@ func TestTask_NotifyPeers(t *testing.T) { }, { name: "peer state is PeerStateRunning and state changing failed", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) { + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) { mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreReportPieceStream(stream) + mockPeer.StoreReportPieceResultStream(stream) ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(errors.New("foo")).Times(1) - task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) assert := assert.New(t) assert.True(mockPeer.FSM.Is(PeerStateRunning)) }, }, { - name: "peer state is PeerStateRunning and notify peer successfully", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) { + name: "peer state is PeerStateRunning and report peer successfully", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) { mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreReportPieceStream(stream) + mockPeer.StoreReportPieceResultStream(stream) ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(nil).Times(1) - task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed) assert := assert.New(t) assert.True(mockPeer.FSM.Is(PeerStateFailed)) @@ -1654,7 +1656,90 @@ func TestTask_NotifyPeers(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) + stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl) + + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) + mockPeer := NewPeer(mockPeerID, task, mockHost) + task.StorePeer(mockPeer) + tc.run(t, task, mockPeer, stream, stream.EXPECT()) + }) + } +} + +func TestTask_AnnouncePeers(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) + }{ + { + name: "peer state is PeerStatePending", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { + mockPeer.FSM.SetState(PeerStatePending) + task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) + + assert := assert.New(t) + assert.True(mockPeer.FSM.Is(PeerStatePending)) + }, + }, + { + name: "peer state is PeerStateRunning and stream is empty", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { + mockPeer.FSM.SetState(PeerStateRunning) + task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) + + assert := assert.New(t) + assert.True(mockPeer.FSM.Is(PeerStateRunning)) + }, + }, + { + name: "peer state is PeerStateRunning and stream sending failed", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { + mockPeer.FSM.SetState(PeerStateRunning) + mockPeer.StoreAnnouncePeerStream(stream) + ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1) + + task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) + + assert := assert.New(t) + assert.True(mockPeer.FSM.Is(PeerStateRunning)) + }, + }, + { + name: "peer state is PeerStateRunning and state changing failed", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { + mockPeer.FSM.SetState(PeerStateRunning) + mockPeer.StoreAnnouncePeerStream(stream) + ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1) + + task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) + + assert := assert.New(t) + assert.True(mockPeer.FSM.Is(PeerStateRunning)) + }, + }, + { + name: "peer state is PeerStateRunning and announce peer successfully", + run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { + mockPeer.FSM.SetState(PeerStateRunning) + mockPeer.StoreAnnouncePeerStream(stream) + ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(nil).Times(1) + + task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) + + assert := assert.New(t) + assert.True(mockPeer.FSM.Is(PeerStateFailed)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl) mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 81255cfca..fd6d36a2a 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -80,7 +80,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo peer.Log.Infof("peer needs to back-to-source: %t", needBackToSource) if (n >= s.config.RetryBackToSourceLimit || needBackToSource) && peer.Task.CanBackToSource() { - stream, loaded := peer.LoadReportPieceStream() + stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load stream failed") return @@ -112,7 +112,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // Handle peer schedule failed. if n >= s.config.RetryLimit { - stream, loaded := peer.LoadReportPieceStream() + stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load stream failed") return @@ -193,7 +193,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer } // Send scheduling success message. - stream, loaded := peer.LoadReportPieceStream() + stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load peer stream failed") return []*resource.Peer{}, false diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index 9c064de50..11d5a6ce5 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -233,7 +233,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { task.StorePeer(peer) peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1) }, @@ -250,7 +250,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { task.StorePeer(peer) peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(nil).Times(1) }, @@ -269,7 +269,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) task.FSM.SetState(resource.TaskStateFailed) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(nil).Times(1) }, @@ -315,7 +315,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) peer.Task.BackToSourceLimit.Store(-1) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), @@ -335,7 +335,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { task.StorePeer(peer) peer.FSM.SetState(resource.PeerStateRunning) peer.Task.BackToSourceLimit.Store(-1) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), @@ -356,7 +356,7 @@ func TestScheduler_ScheduleParent(t *testing.T) { task.StorePeer(seedPeer) peer.FSM.SetState(resource.PeerStateRunning) seedPeer.FSM.SetState(resource.PeerStateRunning) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1), md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ @@ -605,7 +605,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { peer.Task.StorePeer(peer) peer.Task.StorePeer(mockPeer) mockPeer.FinishedPieces.Set(0) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1), md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ @@ -636,7 +636,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { mockPeer.IsBackToSource.Store(true) candidatePeer.IsBackToSource.Store(true) mockPeer.FinishedPieces.Set(0) - peer.StoreReportPieceStream(stream) + peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1), md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 35212a791..5a23952b3 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -208,8 +208,8 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer } // Peer setting stream. - peer.StoreReportPieceStream(stream) - defer peer.DeleteReportPieceStream() + peer.StoreReportPieceResultStream(stream) + defer peer.DeleteReportPieceResultStream() } if piece.PieceInfo != nil { @@ -1020,7 +1020,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece // Returns an scheduling error if the peer // state is not PeerStateRunning. - stream, loaded := peer.LoadReportPieceStream() + stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load stream failed") return @@ -1131,7 +1131,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS // and return the source metadata to peer. if backToSourceErr != nil { if !backToSourceErr.Temporary { - task.NotifyPeers(&schedulerv1.PeerPacket{ + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{ Code: commonv1.Code_BackToSourceAborted, Errordetails: &schedulerv1.PeerPacket_SourceError{ SourceError: backToSourceErr, @@ -1162,7 +1162,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS task.URLMeta.Tag, task.URLMeta.Application, proto, "0").Inc() } if !d.Temporary { - task.NotifyPeers(&schedulerv1.PeerPacket{ + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{ Code: commonv1.Code_BackToSourceAborted, Errordetails: &schedulerv1.PeerPacket_SourceError{ SourceError: d, @@ -1176,7 +1176,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS } else if task.PeerFailedCount.Load() > resource.FailedPeerCountLimit { // If the number of failed peers in the task is greater than FailedPeerCountLimit, // then scheduler notifies running peers of failure. - task.NotifyPeers(&schedulerv1.PeerPacket{ + task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{ Code: commonv1.Code_SchedTaskStatusError, }, resource.PeerEventDownloadFailed) task.PeerFailedCount.Store(0) diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index b7cc5085c..f165e76ac 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -1011,7 +1011,7 @@ func TestService_ReportPieceResult(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) assert.NoError(err) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.False(loaded) }, }, @@ -1039,7 +1039,7 @@ func TestService_ReportPieceResult(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) assert.NoError(err) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.False(loaded) }, }, @@ -1068,7 +1068,7 @@ func TestService_ReportPieceResult(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) assert.NoError(err) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.False(loaded) }, }, @@ -1094,7 +1094,7 @@ func TestService_ReportPieceResult(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) assert.NoError(err) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.False(loaded) }, }, @@ -1121,7 +1121,7 @@ func TestService_ReportPieceResult(t *testing.T) { expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) assert.NoError(err) - _, loaded := peer.LoadReportPieceStream() + _, loaded := peer.LoadReportPieceResultStream() assert.False(loaded) }, },