feat: add AnnouncePeers to task in resource (#2051)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-02-07 15:24:05 +08:00
parent 7dc3c826f2
commit e1dd1efca1
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
8 changed files with 211 additions and 96 deletions

View File

@ -153,9 +153,9 @@ type Peer struct {
// Cost is the cost of downloading.
Cost *atomic.Duration
// ReportPieceStream is the grpc stream of Scheduler_ReportPieceResultServer,
// ReportPieceResultStream is the grpc stream of Scheduler_ReportPieceResultServer,
// Used only in v1 version of the grpc.
ReportPieceStream *atomic.Value
ReportPieceResultStream *atomic.Value
// AnnouncePeerStream is the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
@ -203,24 +203,24 @@ type Peer struct {
// New Peer instance.
func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p := &Peer{
ID: id,
Tag: DefaultTag,
Application: DefaultApplication,
Pieces: set.NewSafeSet[*Piece](),
FinishedPieces: &bitset.BitSet{},
pieceCosts: []int64{},
Cost: atomic.NewDuration(0),
ReportPieceStream: &atomic.Value{},
AnnouncePeerStream: &atomic.Value{},
Task: task,
Host: host,
BlockParents: set.NewSafeSet[string](),
NeedBackToSource: atomic.NewBool(false),
IsBackToSource: atomic.NewBool(false),
PieceUpdatedAt: atomic.NewTime(time.Now()),
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
Log: logger.WithPeer(host.ID, task.ID, id),
ID: id,
Tag: DefaultTag,
Application: DefaultApplication,
Pieces: set.NewSafeSet[*Piece](),
FinishedPieces: &bitset.BitSet{},
pieceCosts: []int64{},
Cost: atomic.NewDuration(0),
ReportPieceResultStream: &atomic.Value{},
AnnouncePeerStream: &atomic.Value{},
Task: task,
Host: host,
BlockParents: set.NewSafeSet[string](),
NeedBackToSource: atomic.NewBool(false),
IsBackToSource: atomic.NewBool(false),
PieceUpdatedAt: atomic.NewTime(time.Now()),
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
Log: logger.WithPeer(host.ID, task.ID, id),
}
// Initialize state machine.
@ -334,10 +334,10 @@ func (p *Peer) PieceCosts() []int64 {
return p.pieceCosts
}
// LoadReportPieceStream return the grpc stream of Scheduler_ReportPieceResultServer,
// LoadReportPieceResultStream return the grpc stream of Scheduler_ReportPieceResultServer,
// Used only in v1 version of the grpc.
func (p *Peer) LoadReportPieceStream() (schedulerv1.Scheduler_ReportPieceResultServer, bool) {
rawStream := p.ReportPieceStream.Load()
func (p *Peer) LoadReportPieceResultStream() (schedulerv1.Scheduler_ReportPieceResultServer, bool) {
rawStream := p.ReportPieceResultStream.Load()
if rawStream == nil {
return nil, false
}
@ -345,22 +345,22 @@ func (p *Peer) LoadReportPieceStream() (schedulerv1.Scheduler_ReportPieceResultS
return rawStream.(schedulerv1.Scheduler_ReportPieceResultServer), true
}
// StoreReportPieceStream set the grpc stream of Scheduler_ReportPieceResultServer,
// StoreReportPieceResultStream set the grpc stream of Scheduler_ReportPieceResultServer,
// Used only in v1 version of the grpc.
func (p *Peer) StoreReportPieceStream(stream schedulerv1.Scheduler_ReportPieceResultServer) {
p.ReportPieceStream.Store(stream)
func (p *Peer) StoreReportPieceResultStream(stream schedulerv1.Scheduler_ReportPieceResultServer) {
p.ReportPieceResultStream.Store(stream)
}
// DeleteReportPieceStream deletes the grpc stream of Scheduler_ReportPieceResultServer,
// DeleteReportPieceResultStream deletes the grpc stream of Scheduler_ReportPieceResultServer,
// Used only in v1 version of the grpc.
func (p *Peer) DeleteReportPieceStream() {
p.ReportPieceStream = &atomic.Value{}
func (p *Peer) DeleteReportPieceResultStream() {
p.ReportPieceResultStream = &atomic.Value{}
}
// LoadAnnouncePeerStream return the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServer, bool) {
rawStream := p.ReportPieceStream.Load()
rawStream := p.ReportPieceResultStream.Load()
if rawStream == nil {
return nil, false
}
@ -371,13 +371,13 @@ func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServe
// StoreAnnouncePeerStream set the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) StoreAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) {
p.ReportPieceStream.Store(stream)
p.ReportPieceResultStream.Store(stream)
}
// DeleteAnnouncePeerStream deletes the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) DeleteAnnouncePeerStream() {
p.ReportPieceStream = &atomic.Value{}
p.ReportPieceResultStream = &atomic.Value{}
}
// Parents returns parents of peer.

View File

@ -34,7 +34,7 @@ import (
commonv2 "d7y.io/api/pkg/apis/common/v2"
managerv2 "d7y.io/api/pkg/apis/manager/v2"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/api/pkg/apis/scheduler/v1/mocks"
v1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
v2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks"
@ -64,7 +64,7 @@ func TestPeer_NewPeer(t *testing.T) {
assert.Equal(peer.Pieces.Len(), uint(0))
assert.Empty(peer.FinishedPieces)
assert.Equal(len(peer.PieceCosts()), 0)
assert.Empty(peer.ReportPieceStream)
assert.Empty(peer.ReportPieceResultStream)
assert.Empty(peer.AnnouncePeerStream)
assert.Equal(peer.FSM.Current(), PeerStatePending)
assert.EqualValues(peer.Task, mockTask)
@ -86,8 +86,7 @@ func TestPeer_NewPeer(t *testing.T) {
assert.Equal(peer.Pieces.Len(), uint(0))
assert.Empty(peer.FinishedPieces)
assert.Equal(len(peer.PieceCosts()), 0)
assert.Empty(peer.ReportPieceStream)
assert.Empty(peer.AnnouncePeerStream)
assert.Empty(peer.ReportPieceResultStream)
assert.Equal(peer.FSM.Current(), PeerStatePending)
assert.EqualValues(peer.Task, mockTask)
assert.EqualValues(peer.Host, mockHost)
@ -183,7 +182,7 @@ func TestPeer_PieceCosts(t *testing.T) {
}
}
func TestPeer_LoadReportPieceStream(t *testing.T) {
func TestPeer_LoadReportPieceResultStream(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
@ -192,8 +191,8 @@ func TestPeer_LoadReportPieceStream(t *testing.T) {
name: "load stream",
expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.StoreReportPieceStream(stream)
newStream, loaded := peer.LoadReportPieceStream()
peer.StoreReportPieceResultStream(stream)
newStream, loaded := peer.LoadReportPieceResultStream()
assert.Equal(loaded, true)
assert.EqualValues(newStream, stream)
},
@ -202,7 +201,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) {
name: "stream does not exist",
expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.Equal(loaded, false)
},
},
@ -212,7 +211,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -224,7 +223,7 @@ func TestPeer_LoadReportPieceStream(t *testing.T) {
}
}
func TestPeer_StoreReportPieceStream(t *testing.T) {
func TestPeer_StoreReportPieceResultStream(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
@ -233,8 +232,8 @@ func TestPeer_StoreReportPieceStream(t *testing.T) {
name: "store stream",
expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.StoreReportPieceStream(stream)
newStream, loaded := peer.LoadReportPieceStream()
peer.StoreReportPieceResultStream(stream)
newStream, loaded := peer.LoadReportPieceResultStream()
assert.Equal(loaded, true)
assert.EqualValues(newStream, stream)
},
@ -245,7 +244,7 @@ func TestPeer_StoreReportPieceStream(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -257,7 +256,7 @@ func TestPeer_StoreReportPieceStream(t *testing.T) {
}
}
func TestPeer_DeleteReportPieceStream(t *testing.T) {
func TestPeer_DeleteReportPieceResultStream(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
@ -266,9 +265,9 @@ func TestPeer_DeleteReportPieceStream(t *testing.T) {
name: "delete stream",
expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.StoreReportPieceStream(stream)
peer.DeleteReportPieceStream()
_, loaded := peer.LoadReportPieceStream()
peer.StoreReportPieceResultStream(stream)
peer.DeleteReportPieceResultStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.Equal(loaded, false)
},
},
@ -278,7 +277,7 @@ func TestPeer_DeleteReportPieceStream(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -430,7 +429,7 @@ func TestPeer_Parents(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -476,7 +475,7 @@ func TestPeer_Children(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,

View File

@ -28,6 +28,7 @@ import (
commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/set"
@ -439,8 +440,9 @@ func (t *Task) CanReuseDirectPiece() bool {
return len(t.DirectPiece) > 0 && int64(len(t.DirectPiece)) == t.ContentLength.Load()
}
// NotifyPeers notify all peers in the task with the state code.
func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) {
// ReportPieceResultToPeers reports all peers in the task with the state code.
// Used only in v1 version of the grpc.
func (t *Task) ReportPieceResultToPeers(peerPacket *schedulerv1.PeerPacket, event string) {
for _, vertex := range t.DAG.GetVertices() {
peer := vertex.Value
if peer == nil {
@ -448,7 +450,7 @@ func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) {
}
if peer.FSM.Is(PeerStateRunning) {
stream, loaded := peer.LoadReportPieceStream()
stream, loaded := peer.LoadReportPieceResultStream()
if !loaded {
continue
}
@ -457,7 +459,36 @@ func (t *Task) NotifyPeers(peerPacket *schedulerv1.PeerPacket, event string) {
t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error())
continue
}
t.Log.Infof("task notify peer %s code %s", peer.ID, peerPacket.Code)
t.Log.Infof("task reports peer %s code %s", peer.ID, peerPacket.Code)
if err := peer.FSM.Event(context.Background(), event); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
continue
}
}
}
}
// AnnouncePeers announces all peers in the task with the state code.
// Used only in v2 version of the grpc.
func (t *Task) AnnouncePeers(resp *schedulerv2.AnnouncePeerResponse, event string) {
for _, vertex := range t.DAG.GetVertices() {
peer := vertex.Value
if peer == nil {
continue
}
if peer.FSM.Is(PeerStateRunning) {
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
continue
}
if err := stream.Send(resp); err != nil {
t.Log.Errorf("send response to peer %s failed: %s", peer.ID, err.Error())
continue
}
t.Log.Infof("task announces peer %s response %#v", peer.ID, resp.Response)
if err := peer.FSM.Event(context.Background(), event); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())

View File

@ -26,7 +26,9 @@ import (
commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/api/pkg/apis/scheduler/v1/mocks"
v1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
v2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks"
"d7y.io/dragonfly/v2/pkg/container/set"
"d7y.io/dragonfly/v2/pkg/idgen"
@ -1584,16 +1586,16 @@ func TestTask_CanReuseDirectPiece(t *testing.T) {
}
}
func TestTask_NotifyPeers(t *testing.T) {
func TestTask_ReportPieceResultToPeers(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder)
run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder)
}{
{
name: "peer state is PeerStatePending",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
mockPeer.FSM.SetState(PeerStatePending)
task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStatePending))
@ -1601,9 +1603,9 @@ func TestTask_NotifyPeers(t *testing.T) {
},
{
name: "peer state is PeerStateRunning and stream is empty",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
@ -1611,12 +1613,12 @@ func TestTask_NotifyPeers(t *testing.T) {
},
{
name: "peer state is PeerStateRunning and stream sending failed",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreReportPieceStream(stream)
mockPeer.StoreReportPieceResultStream(stream)
ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(errors.New("foo")).Times(1)
task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
@ -1624,25 +1626,25 @@ func TestTask_NotifyPeers(t *testing.T) {
},
{
name: "peer state is PeerStateRunning and state changing failed",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreReportPieceStream(stream)
mockPeer.StoreReportPieceResultStream(stream)
ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(errors.New("foo")).Times(1)
task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
},
},
{
name: "peer state is PeerStateRunning and notify peer successfully",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
name: "peer state is PeerStateRunning and report peer successfully",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer, ms *v1mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreReportPieceStream(stream)
mockPeer.StoreReportPieceResultStream(stream)
ms.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError})).Return(nil).Times(1)
task.NotifyPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateFailed))
@ -1654,7 +1656,90 @@ func TestTask_NotifyPeers(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
task.StorePeer(mockPeer)
tc.run(t, task, mockPeer, stream, stream.EXPECT())
})
}
}
func TestTask_AnnouncePeers(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder)
}{
{
name: "peer state is PeerStatePending",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
mockPeer.FSM.SetState(PeerStatePending)
task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStatePending))
},
},
{
name: "peer state is PeerStateRunning and stream is empty",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
},
},
{
name: "peer state is PeerStateRunning and stream sending failed",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreAnnouncePeerStream(stream)
ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1)
task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
},
},
{
name: "peer state is PeerStateRunning and state changing failed",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreAnnouncePeerStream(stream)
ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1)
task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateRunning))
},
},
{
name: "peer state is PeerStateRunning and announce peer successfully",
run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
mockPeer.FSM.SetState(PeerStateRunning)
mockPeer.StoreAnnouncePeerStream(stream)
ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(nil).Times(1)
task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)
assert := assert.New(t)
assert.True(mockPeer.FSM.Is(PeerStateFailed))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,

View File

@ -80,7 +80,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
peer.Log.Infof("peer needs to back-to-source: %t", needBackToSource)
if (n >= s.config.RetryBackToSourceLimit || needBackToSource) &&
peer.Task.CanBackToSource() {
stream, loaded := peer.LoadReportPieceStream()
stream, loaded := peer.LoadReportPieceResultStream()
if !loaded {
peer.Log.Error("load stream failed")
return
@ -112,7 +112,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
// Handle peer schedule failed.
if n >= s.config.RetryLimit {
stream, loaded := peer.LoadReportPieceStream()
stream, loaded := peer.LoadReportPieceResultStream()
if !loaded {
peer.Log.Error("load stream failed")
return
@ -193,7 +193,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
}
// Send scheduling success message.
stream, loaded := peer.LoadReportPieceStream()
stream, loaded := peer.LoadReportPieceResultStream()
if !loaded {
peer.Log.Error("load peer stream failed")
return []*resource.Peer{}, false

View File

@ -233,7 +233,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
task.StorePeer(peer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1)
},
@ -250,7 +250,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
task.StorePeer(peer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
@ -269,7 +269,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
mr.Send(gomock.Eq(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
@ -315,7 +315,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourceLimit.Store(-1)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
@ -335,7 +335,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourceLimit.Store(-1)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
@ -356,7 +356,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
task.StorePeer(seedPeer)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateRunning)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1),
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
@ -605,7 +605,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeer)
mockPeer.FinishedPieces.Set(0)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1),
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
@ -636,7 +636,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
mockPeer.IsBackToSource.Store(true)
candidatePeer.IsBackToSource.Store(true)
mockPeer.FinishedPieces.Set(0)
peer.StoreReportPieceStream(stream)
peer.StoreReportPieceResultStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1),
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{

View File

@ -208,8 +208,8 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer
}
// Peer setting stream.
peer.StoreReportPieceStream(stream)
defer peer.DeleteReportPieceStream()
peer.StoreReportPieceResultStream(stream)
defer peer.DeleteReportPieceResultStream()
}
if piece.PieceInfo != nil {
@ -1020,7 +1020,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece
// Returns an scheduling error if the peer
// state is not PeerStateRunning.
stream, loaded := peer.LoadReportPieceStream()
stream, loaded := peer.LoadReportPieceResultStream()
if !loaded {
peer.Log.Error("load stream failed")
return
@ -1131,7 +1131,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS
// and return the source metadata to peer.
if backToSourceErr != nil {
if !backToSourceErr.Temporary {
task.NotifyPeers(&schedulerv1.PeerPacket{
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{
Code: commonv1.Code_BackToSourceAborted,
Errordetails: &schedulerv1.PeerPacket_SourceError{
SourceError: backToSourceErr,
@ -1162,7 +1162,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS
task.URLMeta.Tag, task.URLMeta.Application, proto, "0").Inc()
}
if !d.Temporary {
task.NotifyPeers(&schedulerv1.PeerPacket{
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{
Code: commonv1.Code_BackToSourceAborted,
Errordetails: &schedulerv1.PeerPacket_SourceError{
SourceError: d,
@ -1176,7 +1176,7 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS
} else if task.PeerFailedCount.Load() > resource.FailedPeerCountLimit {
// If the number of failed peers in the task is greater than FailedPeerCountLimit,
// then scheduler notifies running peers of failure.
task.NotifyPeers(&schedulerv1.PeerPacket{
task.ReportPieceResultToPeers(&schedulerv1.PeerPacket{
Code: commonv1.Code_SchedTaskStatusError,
}, resource.PeerEventDownloadFailed)
task.PeerFailedCount.Store(0)

View File

@ -1011,7 +1011,7 @@ func TestService_ReportPieceResult(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.False(loaded)
},
},
@ -1039,7 +1039,7 @@ func TestService_ReportPieceResult(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.False(loaded)
},
},
@ -1068,7 +1068,7 @@ func TestService_ReportPieceResult(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.False(loaded)
},
},
@ -1094,7 +1094,7 @@ func TestService_ReportPieceResult(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.False(loaded)
},
},
@ -1121,7 +1121,7 @@ func TestService_ReportPieceResult(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
_, loaded := peer.LoadReportPieceStream()
_, loaded := peer.LoadReportPieceResultStream()
assert.False(loaded)
},
},