diff --git a/go.mod b/go.mod index 56f98ccbe..5688a4f16 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.19 require ( - d7y.io/api v1.3.2 + d7y.io/api v1.3.3 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index ffe58155b..94ca1505b 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= -d7y.io/api v1.3.2 h1:ClaW+I4VwtTN3yaHcrevTkcA7PbR4WNv3gJBPnMZi1Y= -d7y.io/api v1.3.2/go.mod h1:HERD+sbavL0vJXkd37RZxJvpu+nXZ6ipffm4EFUbF2w= +d7y.io/api v1.3.3 h1:KxOfhOLd4/cbgt2rJWJbSVtehdOG+lhwRP1O492PZd4= +d7y.io/api v1.3.3/go.mod h1:HERD+sbavL0vJXkd37RZxJvpu+nXZ6ipffm4EFUbF2w= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index f6a9ec7a9..0ddaa38be 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -147,6 +147,12 @@ func WithTaskID(taskID string) *SugaredLoggerOnWith { } } +func WithHostID(hostID string) *SugaredLoggerOnWith { + return &SugaredLoggerOnWith{ + withArgs: []any{"hostID", hostID}, + } +} + func WithKeepAlive(hostname, ip, sourceType string, clusterID uint64) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ withArgs: []any{"hostname", hostname, "ip", ip, "sourceType", sourceType, "clusterID", clusterID}, diff --git a/scheduler/config/dynconfig_test.go b/scheduler/config/dynconfig_test.go index 884ec2422..96219bc67 100644 --- a/scheduler/config/dynconfig_test.go +++ b/scheduler/config/dynconfig_test.go @@ -105,11 +105,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, @@ -161,11 +161,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, @@ -229,11 +229,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, @@ -287,11 +287,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: 
managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, @@ -355,11 +355,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, @@ -447,11 +447,11 @@ func TestDynconfig_GetManagerSourceType(t *testing.T) { Url: "example.com", Bio: "bar", Priority: &managerv1.ApplicationPriority{ - Value: managerv1.Priority_Level0, + Value: managerv1.Priority_LEVEL0, Urls: []*managerv1.URLPriority{ { Regex: "blobs*", - Value: managerv1.Priority_Level1, + Value: managerv1.Priority_LEVEL0, }, }, }, diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 2d3955a5c..bcec513c8 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -22,6 +22,7 @@ import ( "io" "net/http" "net/url" + "regexp" "time" "github.com/bits-and-blooms/bitset" @@ -29,10 +30,12 @@ import ( "github.com/looplab/fsm" "go.uber.org/atomic" + managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/container/set" + "d7y.io/dragonfly/v2/scheduler/config" ) const ( @@ -415,3 +418,57 @@ func (p *Peer) DownloadTinyFile() ([]byte, error) { return io.ReadAll(resp.Body) } + +// GetPriority returns priority of peer. +func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) managerv1.Priority { + pbApplications, err := dynconfig.GetApplications() + if err != nil { + p.Log.Warn(err) + return managerv1.Priority_LEVEL5 + } + + // If manager has no applications, + // then return Priority_LEVEL5. + if len(pbApplications) == 0 { + p.Log.Info("can not find applications") + return managerv1.Priority_LEVEL5 + } + + // Find peer application. + var application *managerv1.Application + for _, pbApplication := range pbApplications { + if p.Application == pbApplication.Name { + application = pbApplication + break + } + } + + // If no application matches peer application, + // then return Priority_LEVEL5. + if application == nil { + p.Log.Info("can not find matching application") + return managerv1.Priority_LEVEL5 + } + + // If application has no priority, + // then return Priority_LEVEL5. + if application.Priority == nil { + p.Log.Info("can not find priority") + return managerv1.Priority_LEVEL5 + } + + // Match url priority first.
+ for _, url := range application.Priority.Urls { + matched, err := regexp.MatchString(url.Regex, p.Task.URL) + if err != nil { + p.Log.Warn(err) + continue + } + + if matched { + return url.Value + } + } + + return application.Priority.Value +} diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index e8c42b68c..97b6538ef 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -17,6 +17,7 @@ package resource import ( + "errors" "fmt" "net" "net/http" @@ -30,11 +31,13 @@ import ( "github.com/stretchr/testify/assert" commonv1 "d7y.io/api/pkg/apis/common/v1" + managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/api/pkg/apis/scheduler/v1/mocks" "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/pkg/idgen" + configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" ) var ( @@ -455,3 +458,118 @@ func TestPeer_DownloadTinyFile(t *testing.T) { }) } } + +func TestPeer_GetPriority(t *testing.T) { + tests := []struct { + name string + mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) + expect func(t *testing.T, priority managerv1.Priority) + }{ + { + name: "get applications failed", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + md.GetApplications().Return(nil, errors.New("bas")).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL5) + }, + }, + { + name: "can not found applications", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + md.GetApplications().Return([]*managerv1.Application{}, nil).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL5) + }, + }, + { + name: "can not found matching application", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "baw", + }, + }, nil).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL5) + }, + }, + { + name: "can not found priority", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.Application = "bae" + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bae", + }, + }, nil).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL5) + }, + }, + { + name: "match the priority of application", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.Application = "baz" + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "baz", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL0, + }, + }, + }, nil).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL0) + }, + }, + { + name: "match the priority of url", + mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.Application = "bak" + peer.Task.URL = "example.com" + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bak", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL0, + Urls: []*managerv1.URLPriority{ + { + Regex: "am", + Value: managerv1.Priority_LEVEL1, + 
}, + }, + }, + }, + }, nil).Times(1) + }, + expect: func(t *testing.T, priority managerv1.Priority) { + assert := assert.New(t) + assert.Equal(priority, managerv1.Priority_LEVEL1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + + mockHost := NewHost(mockRawHost) + mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) + peer := NewPeer(mockPeerID, mockTask, mockHost) + tc.mock(peer, dynconfig.EXPECT()) + tc.expect(t, peer.GetPriority(dynconfig)) + }) + } +} diff --git a/scheduler/resource/resource_test.go b/scheduler/resource/resource_test.go index 0c33c6ca4..c44a02027 100644 --- a/scheduler/resource/resource_test.go +++ b/scheduler/resource/resource_test.go @@ -36,16 +36,16 @@ func TestResource_New(t *testing.T) { tests := []struct { name string config *config.Config - mock func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) + mock func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, resource Resource, err error) }{ { name: "new resource", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(3), - dynconfig.Get().Return(&config.DynconfigData{ + mg.Add(gomock.Any()).Return(nil).Times(3), + md.Get().Return(&config.DynconfigData{ Scheduler: &managerv1.Scheduler{ SeedPeers: []*managerv1.SeedPeer{ { @@ -54,9 +54,9 @@ func TestResource_New(t *testing.T) { }, }, }, nil).Times(1), - dynconfig.Register(gomock.Any()).Return().Times(1), - dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), - dynconfig.Register(gomock.Any()).Return().Times(1), + md.Register(gomock.Any()).Return().Times(1), + md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + md.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -68,10 +68,8 @@ func TestResource_New(t *testing.T) { { name: "new resource failed because of host manager error", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { - gomock.InOrder( - gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), - ) + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1) }, expect: func(t *testing.T, resource Resource, err error) { assert := assert.New(t) @@ -81,10 +79,10 @@ func TestResource_New(t *testing.T) { { name: "new resource failed because of task manager error", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(1), - gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), + mg.Add(gomock.Any()).Return(nil).Times(1), + mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -95,10 +93,10 @@ func TestResource_New(t *testing.T) { { name: "new resource failed because of peer 
manager error", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(2), - gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), + mg.Add(gomock.Any()).Return(nil).Times(2), + mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -109,10 +107,10 @@ func TestResource_New(t *testing.T) { { name: "new resource faild because of dynconfig get error", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(3), - dynconfig.Get().Return(nil, errors.New("foo")).Times(1), + mg.Add(gomock.Any()).Return(nil).Times(3), + md.Get().Return(nil, errors.New("foo")).Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -123,17 +121,17 @@ func TestResource_New(t *testing.T) { { name: "new resource faild because of seed peer list is empty", config: config.New(), - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(3), - dynconfig.Get().Return(&config.DynconfigData{ + mg.Add(gomock.Any()).Return(nil).Times(3), + md.Get().Return(&config.DynconfigData{ Scheduler: &managerv1.Scheduler{ SeedPeers: []*managerv1.SeedPeer{}, }, }, nil).Times(1), - dynconfig.Register(gomock.Any()).Return().Times(1), - dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), - dynconfig.Register(gomock.Any()).Return().Times(1), + md.Register(gomock.Any()).Return().Times(1), + md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + md.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -156,10 +154,8 @@ func TestResource_New(t *testing.T) { Enable: false, }, }, - mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { - gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(3), - ) + mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mg.Add(gomock.Any()).Return(nil).Times(3) }, expect: func(t *testing.T, resource Resource, err error) { assert := assert.New(t) diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index 5cce332e7..458472672 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -183,11 +183,8 @@ func (s *seedPeer) initSeedPeer(task *Task, ps *cdnsystemv1.PieceSeed) (*Peer, e return nil, fmt.Errorf("can not find host id: %s", ps.HostId) } - // New seed peer. + // New and store seed peer. peer = NewPeer(ps.PeerId, task, host, WithTag(SeedTag), WithApplication(SeedApplication)) - peer.Log.Info("new seed peer successfully") - - // Store seed peer. 
s.peerManager.Store(peer) peer.Log.Info("seed peer has been stored") diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index b8b1342f2..66ebef2d0 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -325,7 +325,7 @@ func (t *Task) PeerOutDegree(key string) (int, error) { } // HasAvailablePeer returns whether there is an available peer. -func (t *Task) HasAvailablePeer() bool { +func (t *Task) HasAvailablePeer(blocklist set.SafeSet[string]) bool { var hasAvailablePeer bool for _, vertex := range t.DAG.GetVertices() { peer := vertex.Value @@ -333,6 +333,10 @@ func (t *Task) HasAvailablePeer() bool { continue } + if blocklist.Contains(peer.ID) { + continue + } + if peer.FSM.Is(PeerStatePending) || peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateSucceeded) || @@ -375,8 +379,8 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) { // IsSeedPeerFailed returns whether the seed peer in the task failed. func (t *Task) IsSeedPeerFailed() bool { - seedPeer, ok := t.LoadSeedPeer() - return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreatedAt.Load()) < SeedPeerFailedTimeout + seedPeer, loaded := t.LoadSeedPeer() + return loaded && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreatedAt.Load()) < SeedPeerFailedTimeout } // LoadPiece return piece for a key. diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index 5fa09d69d..f6e0e9bcd 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -28,6 +28,7 @@ import ( schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/api/pkg/apis/scheduler/v1/mocks" + "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" ) @@ -934,6 +935,22 @@ func TestTask_HasAvailablePeer(t *testing.T) { backToSourceLimit int32 expect func(t *testing.T, task *Task, mockPeer *Peer) }{ + { + name: "blocklist includes peer", + id: mockTaskID, + urlMeta: mockTaskURLMeta, + url: mockTaskURL, + backToSourceLimit: mockTaskBackToSourceLimit, + expect: func(t *testing.T, task *Task, mockPeer *Peer) { + assert := assert.New(t) + mockPeer.FSM.SetState(PeerStatePending) + task.StorePeer(mockPeer) + + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockPeer.ID) + assert.Equal(task.HasAvailablePeer(blocklist), false) + }, + }, { name: "peer state is PeerStatePending", id: mockTaskID, @@ -946,7 +963,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockPeer.ID = idgen.PeerID("0.0.0.0") mockPeer.FSM.SetState(PeerStatePending) task.StorePeer(mockPeer) - assert.Equal(task.HasAvailablePeer(), true) + assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true) }, }, { @@ -961,7 +978,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockPeer.ID = idgen.PeerID("0.0.0.0") mockPeer.FSM.SetState(PeerStateSucceeded) task.StorePeer(mockPeer) - assert.Equal(task.HasAvailablePeer(), true) + assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true) }, }, { @@ -976,7 +993,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockPeer.ID = idgen.PeerID("0.0.0.0") mockPeer.FSM.SetState(PeerStateRunning) task.StorePeer(mockPeer) - assert.Equal(task.HasAvailablePeer(), true) + assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true) }, }, { @@ -991,7 +1008,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockPeer.ID = idgen.PeerID("0.0.0.0") mockPeer.FSM.SetState(PeerStateBackToSource) task.StorePeer(mockPeer) - assert.Equal(task.HasAvailablePeer(), true) + 
assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true) }, }, { @@ -1002,7 +1019,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { backToSourceLimit: mockTaskBackToSourceLimit, expect: func(t *testing.T, task *Task, mockPeer *Peer) { assert := assert.New(t) - assert.Equal(task.HasAvailablePeer(), false) + assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), false) }, }, } diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 93d22fd92..c37f4f7c5 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -77,6 +77,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // If the scheduling exceeds the RetryBackToSourceLimit or peer needs back-to-source, // peer will download the task back-to-source. needBackToSource := peer.NeedBackToSource.Load() + peer.Log.Infof("peer needs to back-to-source: %t", needBackToSource) if (n >= s.config.RetryBackToSourceLimit || needBackToSource) && peer.Task.CanBackToSource() { stream, ok := peer.LoadStream() @@ -84,13 +85,11 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo peer.Log.Error("load stream failed") return } - - peer.Log.Infof("peer downloads back-to-source, scheduling %d times, peer need back-to-source %t", - n, needBackToSource) + peer.Log.Infof("schedule peer back-to-source in %d times", n) // Notify peer back-to-source. if err := stream.Send(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource}); err != nil { - peer.Log.Errorf("send packet failed: %s", err.Error()) + peer.Log.Error(err) return } @@ -121,23 +120,23 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // Notify peer schedule failed. if err := stream.Send(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}); err != nil { - peer.Log.Errorf("send packet failed: %s", err.Error()) + peer.Log.Error(err) return } - peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, commonv1.Code_SchedTaskStatusError) + peer.Log.Errorf("peer scheduling exceeds the limit %d times", s.config.RetryLimit) return } if _, ok := s.NotifyAndFindParent(ctx, peer, blocklist); !ok { n++ - peer.Log.Infof("schedule parent %d times failed", n) + peer.Log.Infof("schedule parent failed in %d times ", n) // Sleep to avoid hot looping. time.Sleep(s.config.RetryInterval) continue } - peer.Log.Infof("schedule parent %d times successfully", n+1) + peer.Log.Infof("schedule parent successfully in %d times", n+1) return } } @@ -205,8 +204,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer return []*resource.Peer{}, false } - peer.Log.Infof("schedule parent successful, replace parent to %s and candidate parents is %v", - parentIDs[0], parentIDs[1:]) + peer.Log.Infof("schedule candidate parents is %#v", parentIDs) return candidateParents, true } @@ -228,7 +226,7 @@ func (s *scheduler) FindParent(ctx context.Context, peer *resource.Peer, blockli }, ) - peer.Log.Infof("find parent %s successful", candidateParents[0].ID) + peer.Log.Infof("schedule candidate parent is %s", candidateParents[0].ID) return candidateParents[0], true } @@ -258,13 +256,13 @@ func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.Sa // Candidate parent is in blocklist. 
if blocklist.Contains(candidateParent.ID) { - peer.Log.Debugf("candidate parent %s is not selected because it is in blocklist", candidateParent.ID) + peer.Log.Debugf("parent %s is not selected because it is in blocklist", candidateParent.ID) continue } // Candidate parent can add edge with peer. if !peer.Task.CanAddPeerEdge(candidateParent.ID, peer.ID) { - peer.Log.Debugf("can not add edge with candidate parent %s", candidateParent.ID) + peer.Log.Debugf("can not add edge with parent %s", candidateParent.ID) continue } @@ -272,20 +270,20 @@ func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.Sa // because dfdaemon cannot handle the situation // where two tasks are downloading and downloading each other. if peer.Host.ID == candidateParent.Host.ID { - peer.Log.Debugf("candidate parent %s host %s is the same as peer host", candidateParent.ID, candidateParent.Host.ID) + peer.Log.Debugf("parent %s host %s is the same as peer host", candidateParent.ID, candidateParent.Host.ID) continue } // Candidate parent is bad node. if s.evaluator.IsBadNode(candidateParent) { - peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID) + peer.Log.Debugf("parent %s is not selected because it is bad node", candidateParent.ID) continue } // Candidate parent can not find in dag. inDegree, err := peer.Task.PeerInDegree(candidateParent.ID) if err != nil { - peer.Log.Debugf("can not find candidate parent %s vertex in dag", candidateParent.ID) + peer.Log.Debugf("can not find parent %s vertex in dag", candidateParent.ID) continue } @@ -297,14 +295,14 @@ func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.Sa isBackToSource := candidateParent.IsBackToSource.Load() if candidateParent.Host.Type == types.HostTypeNormal && inDegree == 0 && !isBackToSource && !candidateParent.FSM.Is(resource.PeerStateSucceeded) { - peer.Log.Debugf("candidate parent %s is not selected, because its download state is %d %d %t %s", + peer.Log.Debugf("parent %s is not selected, because its download state is %d %d %t %s", candidateParent.ID, inDegree, int(candidateParent.Host.Type), isBackToSource, candidateParent.FSM.Current()) continue } // Candidate parent's free upload is empty. 
if candidateParent.Host.FreeUploadCount() <= 0 { - peer.Log.Debugf("candidate parent %s is not selected because its free upload is empty, upload limit is %d, upload count is %d", + peer.Log.Debugf("parent %s is not selected because its free upload is empty, upload limit is %d, upload count is %d", candidateParent.ID, candidateParent.Host.ConcurrentUploadLimit.Load(), candidateParent.Host.ConcurrentUploadCount.Load()) continue } @@ -313,7 +311,7 @@ func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.Sa candidateParentIDs = append(candidateParentIDs, candidateParent.ID) } - peer.Log.Infof("candidate parents include %#v", candidateParentIDs) + peer.Log.Infof("filter candidate parents is %#v", candidateParentIDs) return candidateParents } diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 9c7eab830..8afa5cd89 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -29,6 +29,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" errordetailsv1 "d7y.io/api/pkg/apis/errordetails/v1" + managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/internal/dferrors" @@ -83,22 +84,28 @@ func New( func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) { logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PeerId).Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad) - // Register task and trigger seed peer download task. - task, needBackToSource := s.registerTask(ctx, req) - host := s.registerHost(ctx, req.PeerHost) - peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag, req.UrlMeta.Application) - // When the peer registers for the first time and - // does not have a seed peer, it will back-to-source. - peer.NeedBackToSource.Store(needBackToSource) + // Store resource. + task := s.storeTask(ctx, req, commonv1.TaskType_Normal) + host := s.storeHost(ctx, req.PeerHost) + peer := s.storePeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag, req.UrlMeta.Application) + // Trigger the first download of the task. + if err := s.triggerTask(ctx, task, host, peer, s.dynconfig); err != nil { + peer.Log.Error(err) + s.handleRegisterFailure(ctx, peer) + return nil, dferrors.New(commonv1.Code_SchedForbidden, err.Error()) + } + + // If the task does not succeed, it is scheduled as a normal task. if !task.FSM.Is(resource.TaskStateSucceeded) { - peer.Log.Infof("task can not be reused directly, because of task state is %s", + peer.Log.Infof("register as normal task, because of task state is %s", task.FSM.Current()) result, err := s.registerNormalTask(ctx, peer) if err != nil { peer.Log.Error(err) + s.handleRegisterFailure(ctx, peer) return nil, dferrors.New(commonv1.Code_SchedError, err.Error()) } @@ -110,23 +117,20 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTas if err != nil { peer.Log.Warnf("scope size is invalid: %s", err.Error()) } + peer.Log.Infof("task size scope is %s", sizeScope) // The task state is TaskStateSucceeded and SizeScope is not invalid. 
- peer.Log.Info("task can be reused directly") switch sizeScope { case commonv1.SizeScope_EMPTY: - peer.Log.Info("task size scope is EMPTY") result, err := s.registerEmptyTask(ctx, peer) if err != nil { peer.Log.Error(err) + s.handleRegisterFailure(ctx, peer) return nil, dferrors.New(commonv1.Code_SchedError, err.Error()) } - peer.Log.Info("return empty content") return result, nil case commonv1.SizeScope_TINY: - peer.Log.Info("task size scope is TINY") - // Validate data of direct piece. if !peer.Task.CanReuseDirectPiece() { peer.Log.Warnf("register as normal task, because of length of direct piece is %d, content length is %d", @@ -140,28 +144,25 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTas break } - peer.Log.Info("return direct piece") return result, nil case commonv1.SizeScope_SMALL: - peer.Log.Info("task size scope is SMALL") result, err := s.registerSmallTask(ctx, peer) if err != nil { peer.Log.Warnf("register as normal task, because of %s", err.Error()) break } - peer.Log.Info("return the single piece") return result, nil } - peer.Log.Infof("task size scope is %s", sizeScope) result, err := s.registerNormalTask(ctx, peer) if err != nil { peer.Log.Error(err) + s.handleRegisterFailure(ctx, peer) return nil, dferrors.New(commonv1.Code_SchedError, err.Error()) } - peer.Log.Info("return the normal task") + peer.Log.Info("register as normal task, because of invalid size scope") return result, nil } @@ -187,7 +188,8 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu if err == io.EOF { return nil } - logger.Errorf("receive piece %#v error: %s", piece, err.Error()) + + logger.Errorf("receive piece failed: %s", err.Error()) return err } @@ -227,7 +229,7 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu // Handle piece download successfully. if piece.Success { - peer.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo) + peer.Log.Infof("receive success piece: %#v %#v", piece, piece.PieceInfo) s.handlePieceSuccess(ctx, peer, piece) // Collect peer host traffic metrics. @@ -236,7 +238,7 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu if parent, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded { metrics.PeerHostTraffic.WithLabelValues(peer.Tag, peer.Application, metrics.PeerHostTrafficUploadType, parent.Host.ID, parent.Host.IP).Add(float64(piece.PieceInfo.RangeSize)) } else if !resource.IsPieceBackToSource(piece) { - peer.Log.Warnf("dst peer %s not found for piece %#v %#v", piece.DstPid, piece, piece.PieceInfo) + peer.Log.Warnf("dst peer %s not found", piece.DstPid) } } @@ -252,13 +254,13 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu // Handle piece download code. if piece.Code != commonv1.Code_Success { if piece.Code == commonv1.Code_ClientWaitPieceReady { - peer.Log.Debugf("receive piece code %d and wait for dfdaemon piece ready", piece.Code) + peer.Log.Debug("receive wait piece") continue } // Handle piece download failed. peer.Log.Errorf("receive failed piece: %#v", piece) - s.handlePieceFail(ctx, peer, piece) + s.handlePieceFailure(ctx, peer, piece) continue } @@ -268,9 +270,11 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu // ReportPeerResult handles peer result reported by dfdaemon. 
func (s *Service) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) error { + logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("report peer result request: %#v", req) + peer, loaded := s.resource.PeerManager().Load(req.PeerId) if !loaded { - msg := fmt.Sprintf("report peer result and peer %s is not exists", req.PeerId) + msg := fmt.Sprintf("peer %s not found", req.PeerId) logger.Error(msg) return dferrors.New(commonv1.Code_SchedPeerNotFound, msg) } @@ -278,23 +282,23 @@ func (s *Service) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerRes parents := peer.Parents() if !req.Success { - peer.Log.Errorf("report peer failed result: %s %#v", req.Code, req) + peer.Log.Error("report failed peer") if peer.FSM.Is(resource.PeerStateBackToSource) { metrics.DownloadFailureCount.WithLabelValues(peer.Tag, peer.Application, metrics.DownloadFailureBackToSourceType).Inc() go s.createRecord(peer, parents, req) - s.handleTaskFail(ctx, peer.Task, req.GetSourceError(), nil) - s.handlePeerFail(ctx, peer) + s.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil) + s.handlePeerFailure(ctx, peer) return nil } metrics.DownloadFailureCount.WithLabelValues(peer.Tag, peer.Application, metrics.DownloadFailureP2PType).Inc() go s.createRecord(peer, parents, req) - s.handlePeerFail(ctx, peer) + s.handlePeerFailure(ctx, peer) return nil } metrics.PeerTaskDownloadDuration.WithLabelValues(peer.Tag, peer.Application).Observe(float64(req.Cost)) - peer.Log.Infof("report peer result: %#v", req) + peer.Log.Info("report success peer") if peer.FSM.Is(resource.PeerStateBackToSource) { go s.createRecord(peer, parents, req) s.handleTaskSuccess(ctx, peer.Task, req) @@ -309,14 +313,16 @@ func (s *Service) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerRes // AnnounceTask informs scheduler a peer has completed task. func (s *Service) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) error { + logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PiecePacket.DstPid).Infof("announce task request: %#v %#v %#v %#v", + req, req.UrlMeta, req.PeerHost, req.PiecePacket, + ) + taskID := req.TaskId peerID := req.PiecePacket.DstPid - task := resource.NewTask(taskID, req.Url, req.TaskType, req.UrlMeta) task, _ = s.resource.TaskManager().LoadOrStore(task) - host := s.registerHost(ctx, req.PeerHost) - peer := s.registerPeer(ctx, peerID, task, host, req.UrlMeta.Tag, req.UrlMeta.Application) - peer.Log.Infof("announce peer task request: %#v %#v %#v %#v", req, req.UrlMeta, req.PeerHost, req.PiecePacket) + host := s.storeHost(ctx, req.PeerHost) + peer := s.storePeer(ctx, peerID, task, host, req.UrlMeta.Tag, req.UrlMeta.Application) // If the task state is not TaskStateSucceeded, // advance the task state to TaskStateSucceeded. @@ -379,6 +385,8 @@ func (s *Service) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTas // StatTask checks the current state of the task. 
func (s *Service) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) { + logger.WithTaskID(req.TaskId).Infof("stat task request: %#v", req) + task, loaded := s.resource.TaskManager().Load(req.TaskId) if !loaded { msg := fmt.Sprintf("task %s not found", req.TaskId) @@ -386,7 +394,6 @@ func (s *Service) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg) } - task.Log.Debug("task has been found") return &schedulerv1.Task{ Id: task.ID, Type: task.Type, @@ -394,12 +401,14 @@ func (s *Service) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest TotalPieceCount: task.TotalPieceCount.Load(), State: task.FSM.Current(), PeerCount: int32(task.PeerCount()), - HasAvailablePeer: task.HasAvailablePeer(), + HasAvailablePeer: task.HasAvailablePeer(set.NewSafeSet[string]()), }, nil } // LeaveTask releases peer in scheduler. func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) error { + logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("leave task request: %#v", req) + peer, loaded := s.resource.PeerManager().Load(req.PeerId) if !loaded { msg := fmt.Sprintf("peer %s not found", req.PeerId) @@ -408,10 +417,8 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er } metrics.LeaveTaskCount.WithLabelValues(peer.Tag, peer.Application).Inc() - peer.Log.Infof("client releases peer, causing the peer to leave: %#v", req) if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { metrics.LeaveTaskFailureCount.WithLabelValues(peer.Tag, peer.Application).Inc() - msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) peer.Log.Error(msg) return dferrors.New(commonv1.Code_SchedTaskStatusError, msg) @@ -466,7 +473,8 @@ func (s *Service) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHos // LeaveHost releases host in scheduler. func (s *Service) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) error { - logger.Infof("leave host: %#v", req) + logger.WithHostID(req.Id).Infof("leave host request: %#v", req) + host, loaded := s.resource.HostManager().Load(req.Id) if !loaded { msg := fmt.Sprintf("host %s not found", req.Id) @@ -478,59 +486,102 @@ func (s *Service) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostReque return nil } -// registerTask creates a new task or reuses a previous task. -func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool) { - task, loaded := s.resource.TaskManager().Load(req.TaskId) - if loaded { - // Task is the pointer, if the task already exists, the next request will - // update the task's Url and UrlMeta in task manager. - task.URL = req.Url - task.URLMeta = req.UrlMeta - - if (task.FSM.Is(resource.TaskStatePending) || task.FSM.Is(resource.TaskStateRunning) || task.FSM.Is(resource.TaskStateSucceeded)) && task.HasAvailablePeer() { - task.Log.Infof("task dose not need to back-to-source, because of task has available peer and state is %s", task.FSM.Current()) - return task, false - } - } else { - // Create a task for the first time. - task = resource.NewTask(req.TaskId, req.Url, commonv1.TaskType_Normal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackToSourceCount))) - s.resource.TaskManager().Store(task) +// triggerTask triggers the first download of the task. 
+func (s *Service) triggerTask(ctx context.Context, task *resource.Task, host *resource.Host, peer *resource.Peer, dynconfig config.DynconfigInterface) error { + // If task has available peer, peer does not need to be triggered. + blocklist := set.NewSafeSet[string]() + blocklist.Add(peer.ID) + if (task.FSM.Is(resource.TaskStateRunning) || + task.FSM.Is(resource.TaskStateSucceeded)) && + task.HasAvailablePeer(blocklist) { + peer.Log.Info("peer does not need to trigger") + return nil } // If the task triggers the TaskEventDownload failed and it has no available peer, // let the peer do the scheduling. - if err := task.FSM.Event(resource.TaskEventDownload); err != nil { - task.Log.Warnf("task dose not need to back-to-source, because of %s", err.Error()) - return task, false - } - - // Seed peer registers the task, then it needs to back-to-source. - host, loaded := s.resource.HostManager().Load(req.PeerHost.Id) - if loaded && host.Type != types.HostTypeNormal { - task.Log.Infof("task needs to back-to-source, because of host can be loaded and type is %d", host.Type) - return task, true - } - - // FIXME Need to add the condition that the seed peer grpc client is - // available and can be triggered back-to-source. - if s.config.SeedPeer.Enable { - if task.IsSeedPeerFailed() { - task.Log.Info("task needs to back-to-source, because of seed peer is failed") - return task, true + if !task.FSM.Is(resource.TaskStateRunning) { + if err := task.FSM.Event(resource.TaskEventDownload); err != nil { + peer.Log.Errorf("task fsm event failed: %s", err.Error()) + return err } - - go s.triggerSeedPeerTask(ctx, task) - task.Log.Info("task dose not need to back-to-source, because of seed peer has been triggered") - return task, false } - // Task need to back-to-source. - task.Log.Info("task needs to back-to-source, because of seed peer disabled") - return task, true + // If host type is not HostTypeNormal, then it needs to back-to-source. + if host.Type != types.HostTypeNormal { + peer.Log.Infof("peer back-to-source, because of host type is %d", host.Type) + peer.NeedBackToSource.Store(true) + return nil + } + + // The first download is triggered according to + // the different priorities of the peer. + priority := peer.GetPriority(dynconfig) + peer.Log.Infof("peer priority is %d", priority) + switch priority { + case managerv1.Priority_LEVEL5: + if s.config.SeedPeer.Enable && !task.IsSeedPeerFailed() { + go s.triggerSeedPeerTask(ctx, task) + return nil + } + fallthrough + case managerv1.Priority_LEVEL4: + fallthrough + case managerv1.Priority_LEVEL3: + fallthrough + case managerv1.Priority_LEVEL2: + peer.Log.Infof("peer back-to-source, because of hitting priority %d", managerv1.Priority_LEVEL2) + peer.NeedBackToSource.Store(true) + return nil + case managerv1.Priority_LEVEL1: + return fmt.Errorf("priority is %d and no available peers", managerv1.Priority_LEVEL1) + case managerv1.Priority_LEVEL0: + return fmt.Errorf("priority is %d", managerv1.Priority_LEVEL0) + } + + peer.Log.Infof("peer back-to-source, because of peer has invalid priority %d", priority) + peer.NeedBackToSource.Store(true) + return nil } -// registerHost creates a new host or reuses a previous host. -func (s *Service) registerHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host { +// triggerSeedPeerTask starts to trigger seed peer task. 
+func (s *Service) triggerSeedPeerTask(ctx context.Context, task *resource.Task) { + task.Log.Info("trigger seed peer") + peer, endOfPiece, err := s.resource.SeedPeer().TriggerTask( + trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)), task) + if err != nil { + task.Log.Errorf("trigger seed peer failed: %s", err.Error()) + s.handleTaskFailure(ctx, task, nil, err) + return + } + + // Update the task status first to help peer scheduling evaluation and scoring. + peer.Log.Info("trigger seed peer successfully") + s.handleTaskSuccess(ctx, task, endOfPiece) + s.handlePeerSuccess(ctx, peer) +} + +// storeTask stores a new task or reuses a previous task. +func (s *Service) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, taskType commonv1.TaskType) *resource.Task { + task, loaded := s.resource.TaskManager().Load(req.TaskId) + if !loaded { + // Create a task for the first time. + task = resource.NewTask(req.TaskId, req.Url, taskType, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackToSourceCount))) + s.resource.TaskManager().Store(task) + task.Log.Info("create new task") + return task + } + + // Task is the pointer, if the task already exists, the next request will + // update the task's Url and UrlMeta in task manager. + task.URL = req.Url + task.URLMeta = req.UrlMeta + task.Log.Info("task already exists") + return task +} + +// storeHost stores a new host or reuses a previous host. +func (s *Service) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host { host, loaded := s.resource.HostManager().Load(peerHost.Id) if !loaded { // Get scheduler cluster client config by manager. @@ -562,8 +613,8 @@ func (s *Service) registerHost(ctx context.Context, peerHost *schedulerv1.PeerHo return host } -// registerPeer creates a new peer or reuses a previous peer. -func (s *Service) registerPeer(ctx context.Context, peerID string, task *resource.Task, host *resource.Host, tag, application string) *resource.Peer { +// storePeer stores a new peer or reuses a previous peer. +func (s *Service) storePeer(ctx context.Context, peerID string, task *resource.Task, host *resource.Host, tag, application string) *resource.Peer { var options []resource.PeerOption if tag != "" { options = append(options, resource.WithTag(tag)) @@ -578,27 +629,10 @@ func (s *Service) registerPeer(ctx context.Context, peerID string, task *resourc return peer } - peer.Log.Infof("peer already exists, state %s", peer.FSM.Current()) + peer.Log.Info("peer already exists") return peer } -// triggerSeedPeerTask starts to trigger seed peer task. -func (s *Service) triggerSeedPeerTask(ctx context.Context, task *resource.Task) { - task.Log.Infof("trigger seed peer download task and task status is %s", task.FSM.Current()) - peer, endOfPiece, err := s.resource.SeedPeer().TriggerTask( - trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)), task) - if err != nil { - task.Log.Errorf("trigger seed peer download task failed: %s", err.Error()) - s.handleTaskFail(ctx, task, nil, err) - return - } - - // Update the task status first to help peer scheduling evaluation and scoring. - peer.Log.Info("trigger seed peer download task successfully") - s.handleTaskSuccess(ctx, task, endOfPiece) - s.handlePeerSuccess(ctx, peer) -} - // registerEmptyTask registers the empty task. 
func (s *Service) registerEmptyTask(ctx context.Context, peer *resource.Peer) (*schedulerv1.RegisterResult, error) { if err := peer.FSM.Event(resource.PeerEventRegisterEmpty); err != nil { @@ -635,18 +669,18 @@ func (s *Service) registerTinyTask(ctx context.Context, peer *resource.Peer) (*s func (s *Service) registerSmallTask(ctx context.Context, peer *resource.Peer) (*schedulerv1.RegisterResult, error) { parent, ok := s.scheduler.FindParent(ctx, peer, set.NewSafeSet[string]()) if !ok { - return nil, errors.New("can not found parent") + return nil, errors.New("parent not found") } // When task size scope is small, parent must be downloaded successfully // before returning to the parent directly. if !parent.FSM.Is(resource.PeerStateSucceeded) { - return nil, fmt.Errorf("parent state %s is not PeerStateSucceede", parent.FSM.Current()) + return nil, fmt.Errorf("parent state is %s", parent.FSM.Current()) } firstPiece, loaded := peer.Task.LoadPiece(0) if !loaded { - return nil, fmt.Errorf("can not found first piece") + return nil, fmt.Errorf("first piece not found") } // Delete inedges of peer. @@ -697,17 +731,28 @@ func (s *Service) registerNormalTask(ctx context.Context, peer *resource.Peer) ( }, nil } +// handleRegisterFailure handles failure of register. +func (s *Service) handleRegisterFailure(ctx context.Context, peer *resource.Peer) { + if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { + peer.Log.Error(err) + } + + s.resource.PeerManager().Delete(peer.ID) + return +} + // handleBeginOfPiece handles begin of piece. func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { - switch peer.FSM.Current() { + state := peer.FSM.Current() + peer.Log.Infof("peer state is %s", state) + + switch state { case resource.PeerStateBackToSource: // Back to the source download process, peer directly returns. - peer.Log.Info("peer downloads back-to-source when receive the begin of piece") return case resource.PeerStateReceivedTiny: // When the task is tiny, // the peer has already returned to piece data when registering. - peer.Log.Info("file type is tiny, peer has already returned to piece data when registering") if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return @@ -715,7 +760,6 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { case resource.PeerStateReceivedSmall: // When the task is small, // the peer has already returned to the parent when registering. - peer.Log.Info("file type is small, peer has already returned to the parent when registering") if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return @@ -726,10 +770,8 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { return } - peer.Log.Infof("schedule parent because of peer receive begin of piece") s.scheduler.ScheduleParent(ctx, peer, set.NewSafeSet[string]()) default: - peer.Log.Warnf("peer state is %s when receive the begin of piece", peer.FSM.Current()) } } @@ -765,8 +807,8 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p } } -// handlePieceFail handles failed piece. -func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piece *schedulerv1.PieceResult) { +// handlePieceFailure handles failed piece. 
+func (s *Service) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece *schedulerv1.PieceResult) { // Failed to download piece back-to-source. if peer.FSM.Is(resource.PeerStateBackToSource) { return @@ -775,7 +817,7 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec // If parent can not found, reschedule parent. parent, loaded := s.resource.PeerManager().Load(piece.DstPid) if !loaded { - peer.Log.Errorf("reschedule parent because of peer can not found parent %s", piece.DstPid) + peer.Log.Errorf("parent %s not found", piece.DstPid) peer.BlockParents.Add(piece.DstPid) s.scheduler.ScheduleParent(ctx, peer, peer.BlockParents) return @@ -786,7 +828,10 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec // It’s not a case of back-to-source downloading failed, // to help peer to reschedule the parent node. - switch piece.Code { + code := piece.Code + peer.Log.Infof("piece error code is %s", code) + + switch code { case commonv1.Code_PeerTaskNotFound: if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) @@ -813,7 +858,7 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec // Peer state is PeerStateRunning will be rescheduled. if !peer.FSM.Is(resource.PeerStateRunning) { - peer.Log.Infof("peer can not be rescheduled because peer state is %s", peer.FSM.Current()) + peer.Log.Infof("peer state is %s and can not be rescheduled", peer.FSM.Current()) // Returns an scheduling error if the peer // state is not PeerStateRunning. @@ -824,14 +869,14 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec } if err := stream.Send(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedError}); err != nil { - peer.Log.Errorf("send packet failed: %s", err.Error()) + peer.Log.Error(err) return } return } - peer.Log.Infof("reschedule parent because of peer receive failed piece") + peer.Log.Infof("reschedule parent because of failed piece") peer.BlockParents.Add(parent.ID) s.scheduler.ScheduleParent(ctx, peer, peer.BlockParents) } @@ -845,7 +890,7 @@ func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) { sizeScope, err := peer.Task.SizeScope() if err != nil { - peer.Log.Errorf("get task size scope failed: %s", err.Error()) + peer.Log.Error(err) return } @@ -868,8 +913,8 @@ func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) { } } -// handlePeerFail handles failed peer. -func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) { +// handlePeerFailure handles failed peer. +func (s *Service) handlePeerFailure(ctx context.Context, peer *resource.Peer) { if err := peer.FSM.Event(resource.PeerEventDownloadFailed); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return @@ -885,7 +930,6 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) { // handleLegacySeedPeer handles seed server's task has left, // but did not notify the scheduler to leave the task. 
func (s *Service) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) { - peer.Log.Info("peer is legacy seed peer, causing the peer to leave") if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return @@ -920,7 +964,7 @@ func (s *Service) handleTaskSuccess(ctx context.Context, task *resource.Task, re // Conditions for the task to switch to the TaskStateSucceeded are: // 1. Seed peer downloads the resource failed. // 2. Dfdaemon back-to-source to download failed. -func (s *Service) handleTaskFail(ctx context.Context, task *resource.Task, backToSourceErr *errordetailsv1.SourceError, seedPeerErr error) { +func (s *Service) handleTaskFailure(ctx context.Context, task *resource.Task, backToSourceErr *errordetailsv1.SourceError, seedPeerErr error) { // If peer back-to-source fails due to an unrecoverable error, // notify other peers of the failure, // and return the source metadata to peer. @@ -946,18 +990,17 @@ func (s *Service) handleTaskFail(ctx context.Context, task *resource.Task, backT if u, err := url.Parse(task.URL); err == nil { proto = u.Scheme } + + task.Log.Infof("source error: %#v", d) // TODO currently, metrics.PeerTaskSourceErrorCounter is only updated for seed peer source error, need update for normal peer if d.Metadata != nil { - task.Log.Infof("source error: %d/%s", d.Metadata.StatusCode, d.Metadata.Status) metrics.PeerTaskSourceErrorCounter.WithLabelValues( task.URLMeta.Tag, task.URLMeta.Application, proto, fmt.Sprintf("%d", d.Metadata.StatusCode)).Inc() } else { - task.Log.Warn("source error, but no metadata found") metrics.PeerTaskSourceErrorCounter.WithLabelValues( task.URLMeta.Tag, task.URLMeta.Application, proto, "0").Inc() } if !d.Temporary { - task.Log.Infof("source error is not temporary, notify other peers task aborted") task.NotifyPeers(&schedulerv1.PeerPacket{ Code: commonv1.Code_BackToSourceAborted, Errordetails: &schedulerv1.PeerPacket_SourceError{ diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 97446dfde..30f6e506b 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -37,6 +37,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" errordetailsv1 "d7y.io/api/pkg/apis/errordetails/v1" + managerv1 "d7y.io/api/pkg/apis/manager/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" @@ -210,7 +211,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) expect func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) }{ @@ -225,7 +227,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res 
resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateRunning) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -247,6 +250,49 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.Equal(peer.NeedBackToSource.Load(), false) }, }, + { + name: "task state is TaskStatePending and priority is Priority_LEVEL0", + req: &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{}, + PeerHost: &schedulerv1.PeerHost{ + Id: mockRawHost.Id, + }, + }, + mock: func( + req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, + scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, + ) { + mockPeer.Task.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "baz" + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "baz", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL0, + }, + }, + }, nil).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { + assert := assert.New(t) + dferr, ok := err.(*dferrors.DfError) + assert.True(ok) + assert.Equal(dferr.Code, commonv1.Code_SchedForbidden) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) + }, + }, { name: "task state is TaskStateRunning and peer state is PeerStateFailed", req: &schedulerv1.PeerTaskRequest{ @@ -258,7 +304,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { 
mockPeer.Task.FSM.SetState(resource.TaskStateRunning) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -271,6 +318,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), mr.PeerManager().Return(peerManager).Times(1), mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { @@ -279,6 +328,7 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(ok) assert.Equal(dferr.Code, commonv1.Code_SchedError) assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) }, }, { @@ -292,7 +342,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -327,7 +378,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -364,7 +416,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -403,7 +456,8 @@ func 
TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -439,7 +493,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -454,6 +509,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), mr.PeerManager().Return(peerManager).Times(1), mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { @@ -462,6 +519,7 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(ok) assert.Equal(dferr.Code, commonv1.Code_SchedError) assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) }, }, { @@ -475,7 +533,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockPeer.Task.StorePeer(mockPeer) @@ -518,7 +577,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler 
scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockPeer.Task.StorePeer(mockPeer) @@ -539,6 +599,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mr.PeerManager().Return(peerManager).Times(1), mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), ms.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSeedPeer, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { @@ -547,6 +609,7 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(ok) assert.Equal(dferr.Code, commonv1.Code_SchedError) assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) }, }, { @@ -560,7 +623,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockPeer.Task.StorePeer(mockPeer) @@ -581,6 +645,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mr.PeerManager().Return(peerManager).Times(1), mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), ms.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSeedPeer, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { @@ -589,6 +655,7 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(ok) assert.Equal(dferr.Code, commonv1.Code_SchedError) assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) }, }, { @@ -602,7 +669,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp 
*resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockPeer.Task.StorePeer(mockSeedPeer) @@ -643,7 +711,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockPeer.Task.StorePeer(mockPeer) @@ -685,7 +754,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -700,6 +770,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), mr.PeerManager().Return(peerManager).Times(1), mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { @@ -708,6 +780,7 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(ok) assert.Equal(dferr.Code, commonv1.Code_SchedError) assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.FSM.Current(), resource.PeerStateLeave) }, }, { @@ -721,7 +794,8 @@ func TestService_RegisterPeerTask(t *testing.T) { mock: func( req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, - ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt 
*resource.MockTaskManagerMockRecorder, + mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder, ) { mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) mockSeedPeer.FSM.SetState(resource.PeerStateRunning) @@ -769,7 +843,8 @@ func TestService_RegisterPeerTask(t *testing.T) { tc.mock( tc.req, mockPeer, mockSeedPeer, scheduler, res, hostManager, taskManager, peerManager, - scheduler.EXPECT(), res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), + scheduler.EXPECT(), res.EXPECT(), hostManager.EXPECT(), + taskManager.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT(), ) result, err := svc.RegisterPeerTask(context.Background(), tc.req) @@ -1966,400 +2041,315 @@ func TestService_LeaveHost(t *testing.T) { } } -func TestService_registerTask(t *testing.T) { +func TestService_triggerTask(t *testing.T) { tests := []struct { name string config *config.Config - req *schedulerv1.PeerTaskRequest - run func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) + run func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { - name: "task already exists and state is TaskStatePending", + name: "task state is TaskStateRunning and it has available peers", config: &config.Config{ Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockTask.FSM.SetState(resource.TaskStateRunning) - mockTask.StorePeer(mockPeer) - mockPeer.FSM.SetState(resource.PeerStateRunning) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - ) + mockSeedPeer.FSM.SetState(resource.PeerStateRunning) + mockTask.StorePeer(mockSeedPeer) - task, needBackToSource := svc.registerTask(context.Background(), req) + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) + assert.NoError(err) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) }, }, { - name: "task already exists and state is TaskStateRunning", + name: "task state is TaskStateSucceeded and 
it has available peers", config: &config.Config{ Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - mockTask.FSM.SetState(resource.TaskStateRunning) - mockTask.StorePeer(mockPeer) - mockPeer.FSM.SetState(resource.PeerStateRunning) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - ) - - task, needBackToSource := svc.registerTask(context.Background(), req) - assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) - }, - }, - { - name: "task already exists and state is TaskStateSucceeded", - config: &config.Config{ - Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, - }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockTask.FSM.SetState(resource.TaskStateSucceeded) - mockTask.StorePeer(mockPeer) - mockPeer.FSM.SetState(resource.PeerStateRunning) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - ) + mockSeedPeer.FSM.SetState(resource.PeerStateRunning) + mockTask.StorePeer(mockSeedPeer) - task, needBackToSource := svc.registerTask(context.Background(), req) + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) - }, - }, - { - name: "task state is TaskStatePending", - config: &config.Config{ - Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, - }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - - mockTask.FSM.SetState(resource.TaskStatePending) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), 
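
A note from the editor: the two triggerTask cases above only assert that a running or succeeded task with peers already attached is left untouched, which points at an early guard along these lines. HasAvailablePeer is assumed to be an existing resource.Task helper; treat this as a sketch, not the patch's implementation.

// sketch_trigger_guard.go (illustrative only)
package service

import "d7y.io/dragonfly/v2/scheduler/resource"

// needsTrigger reports whether triggerTask has any work left to do; a running
// or succeeded task that still has available peers can be scheduled as-is.
func needsTrigger(task *resource.Task) bool {
	if (task.FSM.Is(resource.TaskStateRunning) || task.FSM.Is(resource.TaskStateSucceeded)) &&
		task.HasAvailablePeer() {
		return false
	}

	return true
}
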
- mt.Load(gomock.Any()).Return(mockTask, false).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Store(gomock.Any()).Return().Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false).Times(1), - mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), - ) - - task, needBackToSource := svc.registerTask(context.Background(), req) - assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTaskURL, task.URL) - assert.EqualValues(mockTaskURLMeta, task.URLMeta) - }, - }, - { - name: "task state is TaskStateFailed", - config: &config.Config{ - Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, - }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - - mockTask.FSM.SetState(resource.TaskStateFailed) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false), - mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), - ) - - task, needBackToSource := svc.registerTask(context.Background(), req) - assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) - }, - }, - { - name: "task state is TaskStateRunning and it has not available peer", - config: &config.Config{ - Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, - }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - mockTask.FSM.SetState(resource.TaskStateRunning) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - ) - - task, needBackToSource := svc.registerTask(context.Background(), req) - assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) - }, - }, - { - name: "task state is TaskStateLeave", - config: &config.Config{ - Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, - }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: 
&schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - - mockTask.FSM.SetState(resource.TaskStateLeave) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false), - mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), - ) - - task, needBackToSource := svc.registerTask(context.Background(), req) - assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) + assert.NoError(err) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded) }, }, { name: "task state is TaskStateFailed and host type is HostTypeSuperSeed", config: &config.Config{ Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - mockHost := resource.NewHost(mockRawSeedHost) + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockTask.FSM.SetState(resource.TaskStateFailed) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(mockHost, true).Times(1), - ) + mockHost.Type = pkgtypes.HostTypeSuperSeed - task, needBackToSource := svc.registerTask(context.Background(), req) + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.True(needBackToSource) - assert.EqualValues(mockTask, task) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) }, }, { - name: "task state is TaskStatePending, but trigger seedPeer failed", + name: "task state is TaskStatePending and host type is HostTypeStrongSeed", config: &config.Config{ Scheduler: mockSchedulerConfig, - SeedPeer: config.SeedPeerConfig{ - Enable: true, - }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: 
func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockTask.FSM.SetState(resource.TaskStatePending) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, false).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Store(gomock.Any()).Return().Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false).Times(1), - mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1), - ) + mockHost.Type = pkgtypes.HostTypeStrongSeed - task, needBackToSource := svc.registerTask(context.Background(), req) + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTaskURL, task.URL) - assert.EqualValues(mockTaskURLMeta, task.URLMeta) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) }, }, { - name: "task state is TaskStateFailed, but trigger seedPeer failed", + name: "task state is TaskStateRunning and host type is HostTypeWeakSeed", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStateRunning) + mockHost.Type = pkgtypes.HostTypeWeakSeed + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "task state is TaskStateSucceeded and host type is HostTypeStrongSeed", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStateSucceeded) + mockHost.Type = pkgtypes.HostTypeStrongSeed + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.NoError(err) + 
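
Editor's aside: every seed-type host case above ends with NeedBackToSource set and the task moved into TaskStateRunning, which is consistent with a branch roughly like the sketch below. HostTypeNormal and the TaskEventDownload event name come from the wider codebase rather than this hunk, so treat them as assumptions.

// sketch_seed_host.go (illustrative only)
package service

import (
	pkgtypes "d7y.io/dragonfly/v2/pkg/types"
	"d7y.io/dragonfly/v2/scheduler/resource"
)

// triggerBySeedHost marks a peer registering from a seed-type host as
// back-to-source and starts the task, mirroring the HostType* assertions.
func triggerBySeedHost(task *resource.Task, host *resource.Host, peer *resource.Peer) error {
	if host.Type == pkgtypes.HostTypeNormal {
		return nil // normal hosts fall through to the priority handling
	}

	peer.NeedBackToSource.Store(true)
	if task.FSM.Can(resource.TaskEventDownload) {
		return task.FSM.Event(resource.TaskEventDownload)
	}

	return nil
}
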
assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "task state is TaskStateLeave and host type is HostTypeStrongSeed", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStateLeave) + mockHost.Type = pkgtypes.HostTypeStrongSeed + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "priority is Priority_LEVEL5 and seed peer is enabled", config: &config.Config{ Scheduler: mockSchedulerConfig, SeedPeer: config.SeedPeerConfig{ Enable: true, }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + var wg sync.WaitGroup wg.Add(2) defer wg.Wait() mockTask.FSM.SetState(resource.TaskStateFailed) gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false).Times(1), + md.GetApplications().Return(nil, errors.New("foo")).Times(1), mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, errors.New("foo")).Times(1), + mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &schedulerv1.PeerResult{}, nil).Times(1), ) - task, needBackToSource := svc.registerTask(context.Background(), req) + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.False(needBackToSource) - assert.EqualValues(mockTask, task) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), false) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) }, }, { - name: "task state is TaskStatePending and disable seed peer", + name: "priority is Priority_LEVEL5 and seed peer downloads failed", config: &config.Config{ Scheduler: mockSchedulerConfig, SeedPeer: config.SeedPeerConfig{ - Enable: false, + Enable: 
true, }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockTask.FSM.SetState(resource.TaskStatePending) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, false).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Store(gomock.Any()).Return().Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false).Times(1), - ) + mockSeedPeer.FSM.SetState(resource.PeerStateFailed) + mockTask.StorePeer(mockSeedPeer) - task, needBackToSource := svc.registerTask(context.Background(), req) + mockTask.FSM.SetState(resource.TaskStateFailed) + md.GetApplications().Return(nil, errors.New("foo")).Times(1) + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.True(needBackToSource) - assert.EqualValues(mockTaskURL, task.URL) - assert.EqualValues(mockTaskURLMeta, task.URLMeta) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) }, }, { - name: "task state is TaskStateFailed and disable seed peer", + name: "priority is Priority_LEVEL4", config: &config.Config{ Scheduler: mockSchedulerConfig, SeedPeer: config.SeedPeerConfig{ - Enable: false, + Enable: true, }, }, - req: &schedulerv1.PeerTaskRequest{ - Url: mockTaskURL, - UrlMeta: mockTaskURLMeta, - PeerHost: &schedulerv1.PeerHost{ - Id: mockRawSeedHost.Id, - }, - }, - run: func(t *testing.T, svc *Service, req *schedulerv1.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, hostManager resource.HostManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mh *resource.MockHostManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { - mockTask.FSM.SetState(resource.TaskStateFailed) - gomock.InOrder( - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Any()).Return(mockTask, true).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Any()).Return(nil, false).Times(1), - ) + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "bas" - task, needBackToSource := svc.registerTask(context.Background(), req) + mockTask.FSM.SetState(resource.TaskStateFailed) + 
md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bas", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL4, + }, + }, + }, nil).Times(1) + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) assert := assert.New(t) - assert.True(needBackToSource) - assert.EqualValues(mockTask, task) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "priority is Priority_LEVEL3", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "bas" + + mockTask.FSM.SetState(resource.TaskStateFailed) + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bas", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL3, + }, + }, + }, nil).Times(1) + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "priority is Priority_LEVEL2", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "bae" + + mockTask.FSM.SetState(resource.TaskStateFailed) + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bae", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL2, + }, + }, + }, nil).Times(1) + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.NoError(err) + assert.Equal(mockPeer.NeedBackToSource.Load(), true) + assert.Equal(mockTask.FSM.Current(), resource.TaskStateRunning) + }, + }, + { + name: "priority is Priority_LEVEL1", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "bae" + + mockTask.FSM.SetState(resource.TaskStateFailed) + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bae", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL1, + }, + }, + }, nil).Times(1) + + err := 
svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.Error(err) + }, + }, + { + name: "priority is Priority_LEVEL0", + config: &config.Config{ + Scheduler: mockSchedulerConfig, + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *Service, mockTask *resource.Task, mockHost *resource.Host, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, dynconfig config.DynconfigInterface, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + mockTask.FSM.SetState(resource.TaskStatePending) + mockPeer.Application = "bat" + + mockTask.FSM.SetState(resource.TaskStateFailed) + md.GetApplications().Return([]*managerv1.Application{ + { + Name: "bat", + Priority: &managerv1.ApplicationPriority{ + Value: managerv1.Priority_LEVEL0, + }, + }, + }, nil).Times(1) + + err := svc.triggerTask(context.Background(), mockTask, mockHost, mockPeer, dynconfig) + assert := assert.New(t) + assert.Error(err) }, }, } @@ -2374,18 +2364,95 @@ func TestService_registerTask(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) svc := New(tc.config, res, scheduler, dynconfig, storage) - taskManager := resource.NewMockTaskManager(ctl) - hostManager := resource.NewMockHostManager(ctl) mockHost := resource.NewHost(mockRawHost) + mockSeedHost := resource.NewHost(mockRawSeedHost) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) + mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) seedPeer := resource.NewMockSeedPeer(ctl) - tc.run(t, svc, tc.req, mockTask, mockPeer, taskManager, hostManager, seedPeer, res.EXPECT(), taskManager.EXPECT(), hostManager.EXPECT(), seedPeer.EXPECT()) + tc.run(t, svc, mockTask, mockHost, mockPeer, mockSeedPeer, dynconfig, seedPeer, res.EXPECT(), seedPeer.EXPECT(), dynconfig.EXPECT()) }) } } -func TestService_registerHost(t *testing.T) { +func TestService_storeTask(t *testing.T) { + tests := []struct { + name string + req *schedulerv1.PeerTaskRequest + taskType commonv1.TaskType + mock func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) + expect func(t *testing.T, task *resource.Task, req *schedulerv1.PeerTaskRequest) + }{ + { + name: "task already exists", + req: &schedulerv1.PeerTaskRequest{ + TaskId: mockTaskID, + Url: "https://example.com", + UrlMeta: &commonv1.UrlMeta{}, + PeerHost: mockPeerHost, + }, + taskType: commonv1.TaskType_Normal, + mock: func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) { + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTaskID)).Return(mockTask, true).Times(1), + ) + }, + expect: func(t *testing.T, task *resource.Task, req *schedulerv1.PeerTaskRequest) { + assert := assert.New(t) + assert.Equal(task.ID, mockTaskID) + assert.Equal(task.URL, req.Url) + assert.Equal(task.Type, commonv1.TaskType_Normal) + assert.EqualValues(task.URLMeta, req.UrlMeta) + }, + }, + { + name: "task does not exist", + req: &schedulerv1.PeerTaskRequest{ + TaskId: mockTaskID, + Url: "https://example.com", + UrlMeta: &commonv1.UrlMeta{}, + PeerHost: mockPeerHost, + }, + taskType: 
commonv1.TaskType_DfCache, + mock: func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) { + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTaskID)).Return(nil, false).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Store(gomock.Any()).Return().Times(1), + ) + }, + expect: func(t *testing.T, task *resource.Task, req *schedulerv1.PeerTaskRequest) { + assert := assert.New(t) + assert.Equal(task.ID, mockTaskID) + assert.Equal(task.URL, req.Url) + assert.Equal(task.Type, commonv1.TaskType_DfCache) + assert.EqualValues(task.URLMeta, req.UrlMeta) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduler := mocks.NewMockScheduler(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) + taskManager := resource.NewMockTaskManager(ctl) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) + + tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) + task := svc.storeTask(context.Background(), tc.req, tc.taskType) + tc.expect(t, task, tc.req) + }) + } +} + +func TestService_storeHost(t *testing.T) { tests := []struct { name string req *schedulerv1.PeerTaskRequest @@ -2468,12 +2535,78 @@ func TestService_registerHost(t *testing.T) { mockHost := resource.NewHost(mockRawHost) tc.mock(mockHost, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) - host := svc.registerHost(context.Background(), tc.req.PeerHost) + host := svc.storeHost(context.Background(), tc.req.PeerHost) tc.expect(t, host) }) } } +func TestService_storePeer(t *testing.T) { + tests := []struct { + name string + req *schedulerv1.PeerTaskRequest + mock func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + expect func(t *testing.T, peer *resource.Peer) + }{ + { + name: "peer already exists", + req: &schedulerv1.PeerTaskRequest{ + PeerId: mockPeerID, + UrlMeta: &commonv1.UrlMeta{}, + }, + mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer) { + assert := assert.New(t) + assert.Equal(peer.ID, mockPeerID) + assert.Equal(peer.Tag, resource.DefaultTag) + }, + }, + { + name: "peer does not exists", + req: &schedulerv1.PeerTaskRequest{ + PeerId: mockPeerID, + UrlMeta: &commonv1.UrlMeta{}, + }, + mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.LoadOrStore(gomock.Any()).Return(mockPeer, false).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer) { + assert := assert.New(t) + assert.Equal(peer.ID, mockPeerID) + assert.Equal(peer.Tag, resource.DefaultTag) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + 
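
Editor's aside: TestService_storeTask above pins down load-or-create behaviour for the renamed helper: an existing task is returned with its URL and metadata refreshed from the request, and a missing one is built with the requested task type and stored. The sketch below is consistent with those mock expectations; the refresh of URL and URLMeta is inferred from the assertions, not copied from service.go.

// sketch_store_task.go (illustrative only)
package service

import (
	commonv1 "d7y.io/api/pkg/apis/common/v1"
	schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"

	"d7y.io/dragonfly/v2/scheduler/resource"
)

// storeTaskSketch loads the task by ID or creates and stores a new one,
// matching the TaskManager().Load / Store expectations in the tests above.
func storeTaskSketch(res resource.Resource, req *schedulerv1.PeerTaskRequest, typ commonv1.TaskType) *resource.Task {
	task, loaded := res.TaskManager().Load(req.TaskId)
	if !loaded {
		task = resource.NewTask(req.TaskId, req.Url, typ, req.UrlMeta)
		res.TaskManager().Store(task)
		return task
	}

	// The "task already exists" case asserts URL and URLMeta match the
	// request, so the cached task is refreshed here.
	task.URL = req.Url
	task.URLMeta = req.UrlMeta
	return task
}
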
scheduler := mocks.NewMockScheduler(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) + peerManager := resource.NewMockPeerManager(ctl) + mockHost := resource.NewHost(mockRawHost) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) + mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) + + tc.mock(mockPeer, peerManager, res.EXPECT(), peerManager.EXPECT()) + peer := svc.storePeer(context.Background(), tc.req.PeerId, mockTask, mockHost, tc.req.UrlMeta.Tag, tc.req.UrlMeta.Application) + tc.expect(t, peer) + }) + } +} + func TestService_triggerSeedPeerTask(t *testing.T) { tests := []struct { name string @@ -2617,72 +2750,6 @@ func TestService_handleBeginOfPiece(t *testing.T) { } } -func TestService_registerPeer(t *testing.T) { - tests := []struct { - name string - req *schedulerv1.PeerTaskRequest - mock func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) - expect func(t *testing.T, peer *resource.Peer) - }{ - { - name: "peer already exists", - req: &schedulerv1.PeerTaskRequest{ - PeerId: mockPeerID, - UrlMeta: &commonv1.UrlMeta{}, - }, - mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(peer.ID, mockPeerID) - assert.Equal(peer.Tag, resource.DefaultTag) - }, - }, - { - name: "peer does not exists", - req: &schedulerv1.PeerTaskRequest{ - PeerId: mockPeerID, - UrlMeta: &commonv1.UrlMeta{}, - }, - mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.LoadOrStore(gomock.Any()).Return(mockPeer, false).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(peer.ID, mockPeerID) - assert.Equal(peer.Tag, resource.DefaultTag) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - scheduler := mocks.NewMockScheduler(ctl) - res := resource.NewMockResource(ctl) - dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) - peerManager := resource.NewMockPeerManager(ctl) - mockHost := resource.NewHost(mockRawHost) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) - mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) - - tc.mock(mockPeer, peerManager, res.EXPECT(), peerManager.EXPECT()) - peer := svc.registerPeer(context.Background(), tc.req.PeerId, mockTask, mockHost, tc.req.UrlMeta.Tag, tc.req.UrlMeta.Application) - tc.expect(t, peer) - }) - } -} - func TestService_handlePieceSuccess(t *testing.T) { mockHost := 
resource.NewHost(mockRawHost) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) @@ -2783,7 +2850,7 @@ func TestService_handlePieceFail(t *testing.T) { run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *schedulerv1.PieceResult, peerManager resource.PeerManager, seedPeer resource.SeedPeer, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) { peer.FSM.SetState(resource.PeerStateBackToSource) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) assert.Equal(parent.Host.UploadFailedCount.Load(), int64(0)) @@ -2810,7 +2877,7 @@ func TestService_handlePieceFail(t *testing.T) { ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.Equal(parent.Host.UploadFailedCount.Load(), int64(0)) @@ -2838,7 +2905,7 @@ func TestService_handlePieceFail(t *testing.T) { ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(parent.FSM.Is(resource.PeerStateFailed)) @@ -2867,7 +2934,7 @@ func TestService_handlePieceFail(t *testing.T) { ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.Equal(parent.Host.UploadFailedCount.Load(), int64(1)) @@ -2895,7 +2962,7 @@ func TestService_handlePieceFail(t *testing.T) { ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(parent.FSM.Is(resource.PeerStateRunning)) @@ -2924,7 +2991,7 @@ func TestService_handlePieceFail(t *testing.T) { ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) - svc.handlePieceFail(context.Background(), peer, piece) + svc.handlePieceFailure(context.Background(), peer, piece) assert := assert.New(t) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(parent.FSM.Is(resource.PeerStateRunning)) @@ -3145,7 +3212,7 @@ func TestService_handlePeerFail(t *testing.T) { child := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(peer, child, scheduler.EXPECT()) - svc.handlePeerFail(context.Background(), peer) + svc.handlePeerFailure(context.Background(), peer) tc.expect(t, peer, child) }) } @@ -3365,7 +3432,7 @@ func TestService_handleTaskFail(t *testing.T) { task := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) tc.mock(task) - svc.handleTaskFail(context.Background(), task, 
tc.backToSourceErr, tc.seedPeerErr) + svc.handleTaskFailure(context.Background(), task, tc.backToSourceErr, tc.seedPeerErr) tc.expect(t, task) }) }
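
Editor's closing note (illustration, not part of the patch): taken together, the triggerTask priority cases assert that Priority_LEVEL0 and Priority_LEVEL1 reject the request with an error, LEVEL2 through LEVEL4 push the requesting peer back to source, and LEVEL5 prefers the seed peer when it is enabled, falling back to back-to-source when the stored seed peer has already failed; a dynconfig error falls back to the LEVEL5 behaviour. A hedged reconstruction of that dispatch follows: the error code and message, the seed-peer failure check, and the trigger callback are assumptions, not the canonical service.go implementation.

// sketch_priority_dispatch.go (illustrative only)
package service

import (
	commonv1 "d7y.io/api/pkg/apis/common/v1"
	managerv1 "d7y.io/api/pkg/apis/manager/v1"

	"d7y.io/dragonfly/v2/internal/dferrors"
	"d7y.io/dragonfly/v2/scheduler/config"
	"d7y.io/dragonfly/v2/scheduler/resource"
)

// dispatchByPriority mirrors the assertions in the Priority_LEVEL* cases.
// seedPeerEnabled stands for config.SeedPeer.Enable, and triggerSeedPeer for
// the existing seed-peer trigger path covered by TestService_triggerSeedPeerTask.
func dispatchByPriority(dynconfig config.DynconfigInterface, peer *resource.Peer,
	seedPeerEnabled, seedPeerFailed bool, triggerSeedPeer func()) error {
	switch peer.GetPriority(dynconfig) {
	case managerv1.Priority_LEVEL0, managerv1.Priority_LEVEL1:
		// Both levels make triggerTask fail; LEVEL0 additionally surfaces
		// Code_SchedForbidden during registration (see the RegisterPeerTask
		// case earlier in this diff). The exact code used for LEVEL1 is assumed.
		return dferrors.Newf(commonv1.Code_SchedForbidden, "scheduling is forbidden by application priority")
	case managerv1.Priority_LEVEL2, managerv1.Priority_LEVEL3, managerv1.Priority_LEVEL4:
		peer.NeedBackToSource.Store(true)
	case managerv1.Priority_LEVEL5:
		if seedPeerEnabled && !seedPeerFailed {
			go triggerSeedPeer()
			return nil
		}
		peer.NeedBackToSource.Store(true)
	}

	return nil
}
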