From 473c5d33f70afb6ab1f04eb64eebfead4768fa81 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 12 Dec 2022 15:58:36 +0800 Subject: [PATCH] feat: update priority api (#1912) Signed-off-by: Gaius --- go.mod | 2 +- go.sum | 4 +- manager/rpcserver/rpcserver.go | 5 +- scheduler/config/dynconfig_test.go | 25 +-- scheduler/resource/peer.go | 11 +- scheduler/resource/peer_test.go | 32 ++-- scheduler/service/service.go | 25 ++- scheduler/service/service_test.go | 245 +++++++++++++++++++++-------- 8 files changed, 231 insertions(+), 118 deletions(-) diff --git a/go.mod b/go.mod index 538fd98c3..a9636e85c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.19 require ( - d7y.io/api v1.3.6 + d7y.io/api v1.3.9 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index c8a69d8a0..297e60d63 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= -d7y.io/api v1.3.6 h1:tcS2PBZqqgpSXMnSxTZ5WVDuT5K8O0+Di1L1BwCBBpI= -d7y.io/api v1.3.6/go.mod h1:HERD+sbavL0vJXkd37RZxJvpu+nXZ6ipffm4EFUbF2w= +d7y.io/api v1.3.9 h1:u5HgSTLDz/hMtDN4Ha9JI6KKFfBOfBbSe2nd+Cz94Tc= +d7y.io/api v1.3.9/go.mod h1:HERD+sbavL0vJXkd37RZxJvpu+nXZ6ipffm4EFUbF2w= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index 7a08a9d4b..1065f4c6a 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -36,6 +36,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "gorm.io/gorm" + commonv1 "d7y.io/api/pkg/apis/common/v1" managerv1 "d7y.io/api/pkg/apis/manager/v1" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -1015,7 +1016,7 @@ func (s *Server) ListApplications(ctx context.Context, req *managerv1.ListApplic for _, url := range priority.URLs { pbURLPriorities = append(pbURLPriorities, &managerv1.URLPriority{ Regex: url.Regex, - Value: managerv1.Priority(url.Value), + Value: commonv1.Priority(url.Value), }) } @@ -1025,7 +1026,7 @@ func (s *Server) ListApplications(ctx context.Context, req *managerv1.ListApplic Url: application.URL, Bio: application.BIO, Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority(*priority.Value), + Value: commonv1.Priority(*priority.Value), Urls: pbURLPriorities, }, }) diff --git a/scheduler/config/dynconfig_test.go b/scheduler/config/dynconfig_test.go index 3efb54099..781cea717 100644 --- a/scheduler/config/dynconfig_test.go +++ b/scheduler/config/dynconfig_test.go @@ -26,6 +26,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + commonv1 "d7y.io/api/pkg/apis/common/v1" managerv1 "d7y.io/api/pkg/apis/manager/v1" "d7y.io/dragonfly/v2/pkg/rpc/manager/client/mocks" @@ -105,11 +106,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, @@ -161,11 +162,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, @@ -229,11 +230,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, @@ -287,11 +288,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, @@ -355,11 +356,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, @@ -447,11 +448,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index bd13abe31..13681a07a 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -30,6 +30,7 @@ import ( "github.com/looplab/fsm" "go.uber.org/atomic" + commonv1 "d7y.io/api/pkg/apis/common/v1" managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" @@ -424,18 +425,18 @@ func (p *Peer) DownloadTinyFile() ([]byte, error) { } // GetPriority returns priority of peer. -func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) managerv1.Priority { +func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) commonv1.Priority { pbApplications, err := dynconfig.GetApplications() if err != nil { p.Log.Warn(err) - return managerv1.Priority_LEVEL0 + return commonv1.Priority_LEVEL0 } // If manager has no applications, // then return Priority_LEVEL0. if len(pbApplications) == 0 { p.Log.Info("can not found applications") - return managerv1.Priority_LEVEL0 + return commonv1.Priority_LEVEL0 } // Find peer application. @@ -451,14 +452,14 @@ func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) managerv1.Priori // then return Priority_LEVEL0. if application == nil { p.Log.Info("can not found matching application") - return managerv1.Priority_LEVEL0 + return commonv1.Priority_LEVEL0 } // If application has no priority, // then return Priority_LEVEL0. if application.Priority == nil { p.Log.Info("can not found priority") - return managerv1.Priority_LEVEL0 + return commonv1.Priority_LEVEL0 } // Match url priority first. diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 08165664e..5b0aea5e0 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -463,16 +463,16 @@ func TestPeer_GetPriority(t *testing.T) { tests := []struct { name string mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) - expect func(t *testing.T, priority managerv1.Priority) + expect func(t *testing.T, priority commonv1.Priority) }{ { name: "get applications failed", mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { md.GetApplications().Return(nil, errors.New("bas")).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL0) + assert.Equal(priority, commonv1.Priority_LEVEL0) }, }, { @@ -480,9 +480,9 @@ func TestPeer_GetPriority(t *testing.T) { mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { md.GetApplications().Return([]*managerv1.Application{}, nil).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL0) + assert.Equal(priority, commonv1.Priority_LEVEL0) }, }, { @@ -494,9 +494,9 @@ func TestPeer_GetPriority(t *testing.T) { }, }, nil).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL0) + assert.Equal(priority, commonv1.Priority_LEVEL0) }, }, { @@ -509,9 +509,9 @@ func TestPeer_GetPriority(t *testing.T) { }, }, nil).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL0) + assert.Equal(priority, commonv1.Priority_LEVEL0) }, }, { @@ -522,14 +522,14 @@ func TestPeer_GetPriority(t *testing.T) { { Name: "baz", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, nil).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL1) + assert.Equal(priority, commonv1.Priority_LEVEL1) }, }, { @@ -541,20 +541,20 @@ func TestPeer_GetPriority(t *testing.T) { { Name: "bak", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, Urls: []*managerv1.URLPriority{ { Regex: "am", - Value: managerv1.Priority_LEVEL2, + Value: commonv1.Priority_LEVEL2, }, }, }, }, }, nil).Times(1) }, - expect: func(t *testing.T, priority managerv1.Priority) { + expect: func(t *testing.T, priority commonv1.Priority) { assert := assert.New(t) - assert.Equal(priority, managerv1.Priority_LEVEL2) + assert.Equal(priority, commonv1.Priority_LEVEL2) }, }, } diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 77886104c..3ded14027 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -29,7 +29,6 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" errordetailsv1 "d7y.io/api/pkg/apis/errordetails/v1" - managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/internal/dferrors" @@ -518,33 +517,33 @@ func (s *Service) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequ // the different priorities of the peer and // priority of the RegisterPeerTask parameter is // higher than parameter of the application. - var priority managerv1.Priority - if req.Priority != managerv1.Priority_LEVEL0 { - priority = req.Priority + var priority commonv1.Priority + if req.UrlMeta.Priority != commonv1.Priority_LEVEL0 { + priority = req.UrlMeta.Priority } else { priority = peer.GetPriority(dynconfig) } peer.Log.Infof("peer priority is %d", priority) switch priority { - case managerv1.Priority_LEVEL6, managerv1.Priority_LEVEL0: + case commonv1.Priority_LEVEL6, commonv1.Priority_LEVEL0: if s.config.SeedPeer.Enable && !task.IsSeedPeerFailed() { go s.triggerSeedPeerTask(ctx, task) return nil } fallthrough - case managerv1.Priority_LEVEL5: + case commonv1.Priority_LEVEL5: fallthrough - case managerv1.Priority_LEVEL4: + case commonv1.Priority_LEVEL4: fallthrough - case managerv1.Priority_LEVEL3: - peer.Log.Infof("peer back-to-source, because of hitting priority %d", managerv1.Priority_LEVEL3) + case commonv1.Priority_LEVEL3: + peer.Log.Infof("peer back-to-source, because of hitting priority %d", commonv1.Priority_LEVEL3) peer.NeedBackToSource.Store(true) return nil - case managerv1.Priority_LEVEL2: - return fmt.Errorf("priority is %d and no available peers", managerv1.Priority_LEVEL2) - case managerv1.Priority_LEVEL1: - return fmt.Errorf("priority is %d", managerv1.Priority_LEVEL1) + case commonv1.Priority_LEVEL2: + return fmt.Errorf("priority is %d and no available peers", commonv1.Priority_LEVEL2) + case commonv1.Priority_LEVEL1: + return fmt.Errorf("priority is %d", commonv1.Priority_LEVEL1) } peer.Log.Infof("peer back-to-source, because of peer has invalid priority %d", priority) diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index e5b2abf89..7f389dbb2 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -160,10 +160,11 @@ var ( } mockTaskURLMeta = &commonv1.UrlMeta{ - Digest: "digest", - Tag: "tag", - Range: "range", - Filter: "filter", + Digest: "digest", + Tag: "tag", + Range: "range", + Filter: "filter", + Priority: commonv1.Priority_LEVEL0, Header: map[string]string{ "content-length": "100", }, @@ -219,7 +220,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task state is TaskStateRunning and it has available peer", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -253,7 +256,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task state is TaskStatePending and priority is Priority_LEVEL1", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -277,7 +282,7 @@ func TestService_RegisterPeerTask(t *testing.T) { { Name: "baz", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, nil).Times(1), @@ -296,7 +301,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task state is TaskStateRunning and peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -334,7 +341,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "get task scope size failed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -370,7 +379,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_EMPTY", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -408,7 +419,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_TINY", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -448,7 +461,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_TINY and direct piece content is error", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -485,7 +500,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_TINY and direct piece content is error, peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -525,7 +542,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_SMALL and load piece error, parent state is PeerStateRunning", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -569,7 +588,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_SMALL and load piece error, peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -615,7 +636,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_SMALL and peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -661,7 +684,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_SMALL and vetex not found", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -703,7 +728,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_SMALL", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -746,7 +773,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_NORMAL and peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -786,7 +815,9 @@ func TestService_RegisterPeerTask(t *testing.T) { { name: "task scope size is SizeScope_NORMAL", req: &schedulerv1.PeerTaskRequest{ - UrlMeta: &commonv1.UrlMeta{}, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -1373,9 +1404,11 @@ func TestService_AnnounceTask(t *testing.T) { { name: "task state is TaskStateSucceeded and peer state is PeerStateSucceeded", req: &schedulerv1.AnnounceTaskRequest{ - TaskId: mockTaskID, - Url: mockURL, - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: mockURL, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: &schedulerv1.PeerHost{ Id: mockRawHost.Id, }, @@ -1409,9 +1442,11 @@ func TestService_AnnounceTask(t *testing.T) { { name: "task state is TaskStatePending and peer state is PeerStateSucceeded", req: &schedulerv1.AnnounceTaskRequest{ - TaskId: mockTaskID, - Url: mockURL, - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: mockURL, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, PiecePacket: &commonv1.PiecePacket{ PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1, DownloadCost: 1}}, @@ -1452,9 +1487,11 @@ func TestService_AnnounceTask(t *testing.T) { { name: "task state is TaskStateFailed and peer state is PeerStateSucceeded", req: &schedulerv1.AnnounceTaskRequest{ - TaskId: mockTaskID, - Url: mockURL, - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: mockURL, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, PiecePacket: &commonv1.PiecePacket{ PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1, DownloadCost: 1}}, @@ -1495,9 +1532,11 @@ func TestService_AnnounceTask(t *testing.T) { { name: "task state is TaskStatePending and peer state is PeerStatePending", req: &schedulerv1.AnnounceTaskRequest{ - TaskId: mockTaskID, - Url: mockURL, - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: mockURL, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, PiecePacket: &commonv1.PiecePacket{ PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1, DownloadCost: 1}}, @@ -1538,9 +1577,11 @@ func TestService_AnnounceTask(t *testing.T) { { name: "task state is TaskStatePending and peer state is PeerStateReceivedNormal", req: &schedulerv1.AnnounceTaskRequest{ - TaskId: mockTaskID, - Url: mockURL, - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: mockURL, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, PiecePacket: &commonv1.PiecePacket{ PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1, DownloadCost: 1}}, @@ -2029,7 +2070,11 @@ func TestService_triggerTask(t *testing.T) { mockSeedPeer.FSM.SetState(resource.PeerStateRunning) mockTask.StorePeer(mockSeedPeer) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) @@ -2045,7 +2090,11 @@ func TestService_triggerTask(t *testing.T) { mockSeedPeer.FSM.SetState(resource.PeerStateRunning) mockTask.StorePeer(mockSeedPeer) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded) @@ -2060,7 +2109,11 @@ func TestService_triggerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStateFailed) mockHost.Type = pkgtypes.HostTypeSuperSeed - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2076,7 +2129,11 @@ func TestService_triggerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStatePending) mockHost.Type = pkgtypes.HostTypeStrongSeed - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2092,7 +2149,11 @@ func TestService_triggerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStateRunning) mockHost.Type = pkgtypes.HostTypeWeakSeed - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2108,7 +2169,11 @@ func TestService_triggerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStateSucceeded) mockHost.Type = pkgtypes.HostTypeStrongSeed - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2124,7 +2189,11 @@ func TestService_triggerTask(t *testing.T) { mockTask.FSM.SetState(resource.TaskStateLeave) mockHost.Type = pkgtypes.HostTypeStrongSeed - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2152,7 +2221,11 @@ func TestService_triggerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), false) @@ -2174,7 +2247,11 @@ func TestService_triggerTask(t *testing.T) { md.GetApplications().Return(nil, errors.New("foo")).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2197,12 +2274,16 @@ func TestService_triggerTask(t *testing.T) { { Name: "bas", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL5, + Value: commonv1.Priority_LEVEL5, }, }, }, nil).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2225,12 +2306,16 @@ func TestService_triggerTask(t *testing.T) { { Name: "bas", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL4, + Value: commonv1.Priority_LEVEL4, }, }, }, nil).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2253,12 +2338,16 @@ func TestService_triggerTask(t *testing.T) { { Name: "bae", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL3, + Value: commonv1.Priority_LEVEL3, }, }, }, nil).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), true) @@ -2281,12 +2370,16 @@ func TestService_triggerTask(t *testing.T) { { Name: "bae", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL2, + Value: commonv1.Priority_LEVEL2, }, }, }, nil).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.Error(err) }, @@ -2307,12 +2400,16 @@ func TestService_triggerTask(t *testing.T) { { Name: "bat", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL1, + Value: commonv1.Priority_LEVEL1, }, }, }, nil).Times(1) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.Error(err) }, @@ -2338,7 +2435,7 @@ func TestService_triggerTask(t *testing.T) { { Name: "bat", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_LEVEL0, + Value: commonv1.Priority_LEVEL0, }, }, }, nil).Times(1), @@ -2346,7 +2443,11 @@ func TestService_triggerTask(t *testing.T) { mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{}, mockTask, mockHost, mockPeer, dynconfig) + err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, + }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) assert.Equal(mockPeer.NeedBackToSource.Load(), false) @@ -2374,7 +2475,9 @@ func TestService_triggerTask(t *testing.T) { ) err := svc.triggerTask(context.Background(), &schedulerv1.PeerTaskRequest{ - Priority: managerv1.Priority_LEVEL6, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL6, + }, }, mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) assert.NoError(err) @@ -2416,9 +2519,11 @@ func TestService_storeTask(t *testing.T) { { name: "task already exists", req: &schedulerv1.PeerTaskRequest{ - TaskId: mockTaskID, - Url: "https://example.com", - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: "https://example.com", + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, }, taskType: commonv1.TaskType_Normal, @@ -2439,9 +2544,11 @@ func TestService_storeTask(t *testing.T) { { name: "task does not exist", req: &schedulerv1.PeerTaskRequest{ - TaskId: mockTaskID, - Url: "https://example.com", - UrlMeta: &commonv1.UrlMeta{}, + TaskId: mockTaskID, + Url: "https://example.com", + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, PeerHost: mockPeerHost, }, taskType: commonv1.TaskType_DfCache, @@ -2581,8 +2688,10 @@ func TestService_storePeer(t *testing.T) { { name: "peer already exists", req: &schedulerv1.PeerTaskRequest{ - PeerId: mockPeerID, - UrlMeta: &commonv1.UrlMeta{}, + PeerId: mockPeerID, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, }, mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -2599,8 +2708,10 @@ func TestService_storePeer(t *testing.T) { { name: "peer does not exists", req: &schedulerv1.PeerTaskRequest{ - PeerId: mockPeerID, - UrlMeta: &commonv1.UrlMeta{}, + PeerId: mockPeerID, + UrlMeta: &commonv1.UrlMeta{ + Priority: commonv1.Priority_LEVEL0, + }, }, mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder(