diff --git a/build/images/dfdaemon/Dockerfile b/build/images/dfdaemon/Dockerfile index 73130432a..0a2bca964 100644 --- a/build/images/dfdaemon/Dockerfile +++ b/build/images/dfdaemon/Dockerfile @@ -38,4 +38,3 @@ COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe EXPOSE 65001 ENTRYPOINT ["/opt/dragonfly/bin/dfget", "daemon"] - diff --git a/client/daemon/rpcserver/mocks/rpcserver_mock.go b/client/daemon/rpcserver/mocks/rpcserver_mock.go index 83554ef31..dbe22eba2 100644 --- a/client/daemon/rpcserver/mocks/rpcserver_mock.go +++ b/client/daemon/rpcserver/mocks/rpcserver_mock.go @@ -10,6 +10,7 @@ import ( time "time" dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v1" + config "d7y.io/dragonfly/v2/client/config" gomock "github.com/golang/mock/gomock" ) @@ -62,6 +63,18 @@ func (mr *MockServerMockRecorder) Keep() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keep", reflect.TypeOf((*MockServer)(nil).Keep)) } +// OnNotify mocks base method. +func (m *MockServer) OnNotify(arg0 *config.DynconfigData) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnNotify", arg0) +} + +// OnNotify indicates an expected call of OnNotify. +func (mr *MockServerMockRecorder) OnNotify(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotify", reflect.TypeOf((*MockServer)(nil).OnNotify), arg0) +} + // ServeDownload mocks base method. func (m *MockServer) ServeDownload(listener net.Listener) error { m.ctrl.T.Helper() diff --git a/go.mod b/go.mod index 6d2adcc33..4b6b2dbe1 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api v1.6.8 + d7y.io/api v1.7.5 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index b9abf9cd8..9213606b6 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api v1.6.8 h1:/oNEZC8FC8P1vPHlzgtJbBQzh5lnf0mZ+9VBx/Nq+iU= -d7y.io/api v1.6.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw= +d7y.io/api v1.7.5 h1:JLtbTLAiNom+qT/sQHgzqKApw/tG5MQaTBcsH/Lb2wE= +d7y.io/api v1.7.5/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 4a3cb1f62..72a12f3a7 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -111,6 +111,13 @@ const ( // PeerOption is a functional option for peer. type PeerOption func(peer *Peer) +// WithAnnouncePeerStream set AnnouncePeerStream for peer. +func WithAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) PeerOption { + return func(p *Peer) { + p.StoreAnnouncePeerStream(stream) + } +} + // WithPriority set Priority for peer. func WithPriority(priority commonv2.Priority) PeerOption { return func(p *Peer) { @@ -354,7 +361,7 @@ func (p *Peer) DeleteReportPieceResultStream() { // LoadAnnouncePeerStream return the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServer, bool) { - rawStream := p.ReportPieceResultStream.Load() + rawStream := p.AnnouncePeerStream.Load() if rawStream == nil { return nil, false } @@ -365,13 +372,13 @@ func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServe // StoreAnnouncePeerStream set the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) StoreAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) { - p.ReportPieceResultStream.Store(stream) + p.AnnouncePeerStream.Store(stream) } // DeleteAnnouncePeerStream deletes the grpc stream of Scheduler_AnnouncePeerServer, // Used only in v2 version of the grpc. func (p *Peer) DeleteAnnouncePeerStream() { - p.ReportPieceResultStream = &atomic.Value{} + p.AnnouncePeerStream = &atomic.Value{} } // LoadPiece return piece for a key. @@ -515,8 +522,8 @@ func (p *Peer) DownloadFile() ([]byte, error) { return io.ReadAll(resp.Body) } -// GetPriority returns priority of peer. -func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) commonv2.Priority { +// CalculatePriority returns priority of peer. +func (p *Peer) CalculatePriority(dynconfig config.DynconfigInterface) commonv2.Priority { if p.Priority != commonv2.Priority_LEVEL0 { return p.Priority } diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 4707664f7..90bcc434e 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -49,6 +49,10 @@ var ( ) func TestPeer_NewPeer(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl) + tests := []struct { name string id string @@ -136,6 +140,32 @@ func TestPeer_NewPeer(t *testing.T) { assert.NotNil(peer.Log) }, }, + { + name: "new peer with AnnouncePeerStream", + id: mockPeerID, + options: []PeerOption{WithAnnouncePeerStream(stream)}, + expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) { + assert := assert.New(t) + assert.Equal(peer.ID, mockPeerID) + assert.Nil(peer.Range) + assert.Equal(peer.Priority, commonv2.Priority_LEVEL0) + assert.Empty(peer.Pieces) + assert.Empty(peer.FinishedPieces) + assert.Equal(len(peer.PieceCosts()), 0) + assert.Empty(peer.ReportPieceResultStream) + assert.NotEmpty(peer.AnnouncePeerStream) + assert.Equal(peer.FSM.Current(), PeerStatePending) + assert.EqualValues(peer.Task, mockTask) + assert.EqualValues(peer.Host, mockHost) + assert.Equal(peer.BlockParents.Len(), uint(0)) + assert.Equal(peer.NeedBackToSource.Load(), false) + assert.Equal(peer.IsBackToSource.Load(), false) + assert.NotEqual(peer.PieceUpdatedAt.Load(), 0) + assert.NotEqual(peer.CreatedAt.Load(), 0) + assert.NotEqual(peer.UpdatedAt.Load(), 0) + assert.NotNil(peer.Log) + }, + }, } for _, tc := range tests { @@ -886,7 +916,7 @@ func TestPeer_DownloadFile(t *testing.T) { } } -func TestPeer_GetPriority(t *testing.T) { +func TestPeer_CalculatePriority(t *testing.T) { tests := []struct { name string mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) @@ -1009,7 +1039,7 @@ func TestPeer_GetPriority(t *testing.T) { mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.mock(peer, dynconfig.EXPECT()) - tc.expect(t, peer.GetPriority(dynconfig)) + tc.expect(t, peer.CalculatePriority(dynconfig)) }) } } diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index 21bc0d81c..2471a36f0 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/trace" + cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/pkg/apis/common/v1" commonv2 "d7y.io/api/pkg/apis/common/v2" @@ -33,6 +35,7 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/rpc/common" + "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/metrics" ) @@ -45,7 +48,7 @@ const ( type SeedPeer interface { // DownloadTask downloads task back-to-source. // Used only in v2 version of the grpc. - DownloadTask(context.Context, *Task) error + DownloadTask(context.Context, *Task, types.HostType) error // TriggerTask triggers the seed peer to download task. // Used only in v1 version of the grpc. @@ -80,14 +83,17 @@ func newSeedPeer(client SeedPeerClient, peerManager PeerManager, hostManager Hos // TODO Implement DownloadTask // DownloadTask downloads task back-to-source. // Used only in v2 version of the grpc. -func (s *seedPeer) DownloadTask(ctx context.Context, task *Task) error { +func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error { + // ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))) + // defer cancel() + return nil } // TriggerTask triggers the seed peer to download task. // Used only in v1 version of the grpc. func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) (*Peer, *schedulerv1.PeerResult, error) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))) defer cancel() urlMeta := &commonv1.UrlMeta{ diff --git a/scheduler/resource/seed_peer_mock.go b/scheduler/resource/seed_peer_mock.go index 85ecb4d50..d6731d5ed 100644 --- a/scheduler/resource/seed_peer_mock.go +++ b/scheduler/resource/seed_peer_mock.go @@ -10,6 +10,7 @@ import ( v1 "d7y.io/api/pkg/apis/scheduler/v1" http "d7y.io/dragonfly/v2/pkg/net/http" + types "d7y.io/dragonfly/v2/pkg/types" gomock "github.com/golang/mock/gomock" ) @@ -51,17 +52,17 @@ func (mr *MockSeedPeerMockRecorder) Client() *gomock.Call { } // DownloadTask mocks base method. -func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *Task) error { +func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *Task, arg2 types.HostType) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1) + ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // DownloadTask indicates an expected call of DownloadTask. -func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1, arg2) } // Stop mocks base method. diff --git a/scheduler/scheduling/mocks/scheduling_mock.go b/scheduler/scheduling/mocks/scheduling_mock.go index ed48452af..591869705 100644 --- a/scheduler/scheduling/mocks/scheduling_mock.go +++ b/scheduler/scheduling/mocks/scheduling_mock.go @@ -51,6 +51,21 @@ func (mr *MockSchedulingMockRecorder) FindCandidateParents(arg0, arg1, arg2 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindCandidateParents", reflect.TypeOf((*MockScheduling)(nil).FindCandidateParents), arg0, arg1, arg2) } +// FindSuccessParent mocks base method. +func (m *MockScheduling) FindSuccessParent(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) (*resource.Peer, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindSuccessParent", arg0, arg1, arg2) + ret0, _ := ret[0].(*resource.Peer) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// FindSuccessParent indicates an expected call of FindSuccessParent. +func (mr *MockSchedulingMockRecorder) FindSuccessParent(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindSuccessParent", reflect.TypeOf((*MockScheduling)(nil).FindSuccessParent), arg0, arg1, arg2) +} + // ScheduleCandidateParents mocks base method. func (m *MockScheduling) ScheduleCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) error { m.ctrl.T.Helper() diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 83a66ae9b..d32768a11 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -52,6 +52,9 @@ type Scheduling interface { // FindCandidateParents finds candidate parents for the peer. FindCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) ([]*resource.Peer, bool) + + // FindSuccessParent finds success parent for the peer. + FindSuccessParent(context.Context, *resource.Peer, set.SafeSet[string]) (*resource.Peer, bool) } type scheduling struct { @@ -196,7 +199,7 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc // Send NormalTaskResponse to peer. peer.Log.Info("send NormalTaskResponse") if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ - Response: constructSuccessNormalTaskResponse(s.dynconfig, candidateParents), + Response: ConstructSuccessNormalTaskResponse(s.dynconfig, candidateParents), }); err != nil { peer.Log.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) @@ -358,7 +361,7 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // Send PeerPacket to peer. peer.Log.Info("send PeerPacket to peer") - if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil { + if err := stream.Send(ConstructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil { n++ peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) @@ -409,21 +412,52 @@ func (s *scheduling) FindCandidateParents(ctx context.Context, peer *resource.Pe }, ) - // Add edges between candidate parent and peer. var parentIDs []string for _, candidateParent := range candidateParents { parentIDs = append(parentIDs, candidateParent.ID) } - if len(candidateParents) <= 0 { - peer.Log.Info("can not add edges for vertex") - return []*resource.Peer{}, false - } - peer.Log.Infof("scheduling candidate parents is %#v", parentIDs) return candidateParents, true } +// FindSuccessParent finds success parent for the peer. +func (s *scheduling) FindSuccessParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) (*resource.Peer, bool) { + // Only PeerStateRunning peers need to be rescheduled, + // and other states including the PeerStateBackToSource indicate that + // they have been scheduled. + if !peer.FSM.Is(resource.PeerStateRunning) { + peer.Log.Infof("peer state is %s, can not schedule parent", peer.FSM.Current()) + return nil, false + } + + // Find the candidate parent that can be scheduled. + candidateParents := s.filterCandidateParents(peer, blocklist) + if len(candidateParents) == 0 { + peer.Log.Info("can not find candidate parents") + return nil, false + } + + var successParents []*resource.Peer + for _, candidateParent := range candidateParents { + if candidateParent.FSM.Is(resource.PeerStateSucceeded) { + successParents = append(successParents, candidateParent) + } + } + + // Sort candidate parents by evaluation score. + taskTotalPieceCount := peer.Task.TotalPieceCount.Load() + sort.Slice( + successParents, + func(i, j int) bool { + return s.evaluator.Evaluate(successParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(successParents[j], peer, taskTotalPieceCount) + }, + ) + + peer.Log.Infof("scheduling success parent is %s", successParents[0].ID) + return successParents[0], true +} + // filterCandidateParents filters the candidate parents that can be scheduled. func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet[string]) []*resource.Peer { filterParentLimit := config.DefaultSchedulerFilterParentLimit @@ -509,9 +543,177 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S return candidateParents } -// constructSuccessNormalTaskResponse constructs scheduling successful response of the normal task. +// ConstructSuccessSmallTaskResponse constructs scheduling successful response of the small task. // Used only in v2 version of the grpc. -func constructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { +func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedulerv2.AnnouncePeerResponse_SmallTaskResponse { + parent := &commonv2.Peer{ + Id: candidateParent.ID, + Priority: candidateParent.Priority, + Cost: durationpb.New(candidateParent.Cost.Load()), + State: candidateParent.FSM.Current(), + NeedBackToSource: candidateParent.NeedBackToSource.Load(), + CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()), + UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()), + } + + // Set range to parent. + if candidateParent.Range != nil { + parent.Range = &commonv2.Range{ + Start: candidateParent.Range.Start, + Length: candidateParent.Range.Length, + } + } + + // Set pieces to parent. + candidateParent.Pieces.Range(func(key, value any) bool { + candidateParentPiece, ok := value.(*resource.Piece) + if !ok { + candidateParent.Log.Errorf("invalid piece %s %#v", key, value) + return true + } + + piece := &commonv2.Piece{ + Number: candidateParentPiece.Number, + ParentId: candidateParentPiece.ParentID, + Offset: candidateParentPiece.Offset, + Length: candidateParentPiece.Length, + TrafficType: candidateParentPiece.TrafficType, + Cost: durationpb.New(candidateParentPiece.Cost), + CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt), + } + + if candidateParentPiece.Digest != nil { + piece.Digest = candidateParentPiece.Digest.String() + } + + parent.Pieces = append(parent.Pieces, piece) + return true + }) + + // Set task to parent. + parent.Task = &commonv2.Task{ + Id: candidateParent.Task.ID, + Type: candidateParent.Task.Type, + Url: candidateParent.Task.URL, + Tag: candidateParent.Task.Tag, + Application: candidateParent.Task.Application, + Filters: candidateParent.Task.Filters, + Header: candidateParent.Task.Header, + PieceLength: candidateParent.Task.PieceLength, + ContentLength: candidateParent.Task.ContentLength.Load(), + PieceCount: candidateParent.Task.TotalPieceCount.Load(), + SizeScope: candidateParent.Task.SizeScope(), + State: candidateParent.Task.FSM.Current(), + PeerCount: int32(candidateParent.Task.PeerCount()), + CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), + } + + // Set digest to parent task. + if candidateParent.Task.Digest != nil { + parent.Task.Digest = candidateParent.Task.Digest.String() + } + + // Set pieces to parent task. + candidateParent.Task.Pieces.Range(func(key, value any) bool { + taskPiece, ok := value.(*resource.Piece) + if !ok { + candidateParent.Task.Log.Errorf("invalid piece %s %#v", key, value) + return true + } + + piece := &commonv2.Piece{ + Number: taskPiece.Number, + ParentId: taskPiece.ParentID, + Offset: taskPiece.Offset, + Length: taskPiece.Length, + TrafficType: taskPiece.TrafficType, + Cost: durationpb.New(taskPiece.Cost), + CreatedAt: timestamppb.New(taskPiece.CreatedAt), + } + + if taskPiece.Digest != nil { + piece.Digest = taskPiece.Digest.String() + } + + parent.Task.Pieces = append(parent.Task.Pieces, piece) + return true + }) + + // Set host to parent. + parent.Host = &commonv2.Host{ + Id: candidateParent.Host.ID, + Type: uint32(candidateParent.Host.Type), + Hostname: candidateParent.Host.Hostname, + Ip: candidateParent.Host.IP, + Port: candidateParent.Host.Port, + DownloadPort: candidateParent.Host.DownloadPort, + Os: candidateParent.Host.OS, + Platform: candidateParent.Host.Platform, + PlatformFamily: candidateParent.Host.PlatformFamily, + PlatformVersion: candidateParent.Host.PlatformVersion, + KernelVersion: candidateParent.Host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: candidateParent.Host.CPU.LogicalCount, + PhysicalCount: candidateParent.Host.CPU.PhysicalCount, + Percent: candidateParent.Host.CPU.Percent, + ProcessPercent: candidateParent.Host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: candidateParent.Host.CPU.Times.User, + System: candidateParent.Host.CPU.Times.System, + Idle: candidateParent.Host.CPU.Times.Idle, + Nice: candidateParent.Host.CPU.Times.Nice, + Iowait: candidateParent.Host.CPU.Times.Iowait, + Irq: candidateParent.Host.CPU.Times.Irq, + Softirq: candidateParent.Host.CPU.Times.Softirq, + Steal: candidateParent.Host.CPU.Times.Steal, + Guest: candidateParent.Host.CPU.Times.Guest, + GuestNice: candidateParent.Host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: candidateParent.Host.Memory.Total, + Available: candidateParent.Host.Memory.Available, + Used: candidateParent.Host.Memory.Used, + UsedPercent: candidateParent.Host.Memory.UsedPercent, + ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent, + Free: candidateParent.Host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, + UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, + SecurityDomain: candidateParent.Host.Network.SecurityDomain, + Location: candidateParent.Host.Network.Location, + Idc: candidateParent.Host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: candidateParent.Host.Disk.Total, + Free: candidateParent.Host.Disk.Free, + Used: candidateParent.Host.Disk.Used, + UsedPercent: candidateParent.Host.Disk.UsedPercent, + InodesTotal: candidateParent.Host.Disk.InodesTotal, + InodesUsed: candidateParent.Host.Disk.InodesUsed, + InodesFree: candidateParent.Host.Disk.InodesFree, + InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: candidateParent.Host.Build.GitVersion, + GitCommit: candidateParent.Host.Build.GitCommit, + GoVersion: candidateParent.Host.Build.GoVersion, + Platform: candidateParent.Host.Build.Platform, + }, + } + + return &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{ + SmallTaskResponse: &schedulerv2.SmallTaskResponse{ + CandidateParent: parent, + }, + } +} + +// ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task. +// Used only in v2 version of the grpc. +func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { concurrentPieceCount := config.DefaultPeerConcurrentPieceCount if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 { concurrentPieceCount = int(config.ConcurrentPieceCount) @@ -688,9 +890,9 @@ func constructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can } } -// constructSuccessPeerPacket constructs peer successful packet. +// ConstructSuccessPeerPacket constructs peer successful packet. // Used only in v1 version of the grpc. -func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket { +func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket { concurrentPieceCount := config.DefaultPeerConcurrentPieceCount if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 { concurrentPieceCount = int(config.ConcurrentPieceCount) diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 85f79f18a..af53ef9b5 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -994,13 +994,420 @@ func TestScheduling_FindCandidateParents(t *testing.T) { blocklist := set.NewSafeSet[string]() tc.mock(peer, mockPeers, blocklist, dynconfig.EXPECT()) scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir) - parent, found := scheduling.FindCandidateParents(context.Background(), peer, blocklist) + parents, found := scheduling.FindCandidateParents(context.Background(), peer, blocklist) + tc.expect(t, peer, mockPeers, parents, found) + }) + } +} + +func TestScheduling_FindSuccessParent(t *testing.T) { + tests := []struct { + name string + mock func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) + expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) + }{ + { + name: "task peers is empty", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "task contains only one peer and peer is itself", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "peer is in blocklist", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + blocklist.Add(mockPeers[0].ID) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "peer is bad node", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateFailed) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "parent is peer's descendant", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + if err := peer.Task.AddPeerEdge(peer, mockPeers[0]); err != nil { + t.Fatal(err) + } + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "parent free upload load is zero", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + mockPeers[0].Host.ConcurrentUploadLimit.Store(0) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + { + name: "find back-to-source parent", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[1].FSM.SetState(resource.PeerStateSucceeded) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + peer.Task.BackToSourcePeers.Add(mockPeers[0].ID) + peer.Task.BackToSourcePeers.Add(mockPeers[1].ID) + mockPeers[0].IsBackToSource.Store(true) + mockPeers[1].IsBackToSource.Store(true) + mockPeers[0].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(1) + mockPeers[1].FinishedPieces.Set(2) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Equal(mockPeers[1].ID, parent.ID) + }, + }, + { + name: "find seed peer parent", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[1].FSM.SetState(resource.PeerStateSucceeded) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + peer.Task.StorePeer(mockPeers[2]) + mockPeers[0].Host.Type = pkgtypes.HostTypeSuperSeed + mockPeers[1].Host.Type = pkgtypes.HostTypeSuperSeed + mockPeers[0].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(1) + mockPeers[1].FinishedPieces.Set(2) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Equal(mockPeers[1].ID, parent.ID) + }, + }, + { + name: "find parent with ancestor", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[1].FSM.SetState(resource.PeerStateSucceeded) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + peer.Task.StorePeer(mockPeers[2]) + if err := peer.Task.AddPeerEdge(mockPeers[2], mockPeers[0]); err != nil { + t.Fatal(err) + } + + if err := peer.Task.AddPeerEdge(mockPeers[2], mockPeers[1]); err != nil { + t.Fatal(err) + } + + mockPeers[0].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(1) + mockPeers[1].FinishedPieces.Set(2) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Equal(mockPeers[1].ID, parent.ID) + }, + }, + { + name: "find parent with same host", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[1].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[0].IsBackToSource.Store(true) + mockPeers[1].Host = peer.Host + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Equal(mockPeers[0].ID, parent.ID) + }, + }, + { + name: "find parent and fetch filterParentLimit from manager dynconfig", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateSucceeded) + mockPeers[1].FSM.SetState(resource.PeerStateSucceeded) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + peer.Task.BackToSourcePeers.Add(mockPeers[0].ID) + peer.Task.BackToSourcePeers.Add(mockPeers[1].ID) + mockPeers[0].IsBackToSource.Store(true) + mockPeers[1].IsBackToSource.Store(true) + mockPeers[0].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(1) + mockPeers[1].FinishedPieces.Set(2) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{ + FilterParentLimit: 3, + }, nil).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID, peer.ID}, parent.ID) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockPeerID, mockTask, mockHost) + + var mockPeers []*resource.Peer + for i := 0; i < 11; i++ { + mockHost := resource.NewHost( + idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) + mockPeers = append(mockPeers, peer) + } + + blocklist := set.NewSafeSet[string]() + tc.mock(peer, mockPeers, blocklist, dynconfig.EXPECT()) + scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir) + parent, found := scheduling.FindSuccessParent(context.Background(), peer, blocklist) tc.expect(t, peer, mockPeers, parent, found) }) } } -func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) { +func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) + }{ + { + name: "construct success", + expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) { + assert := assert.New(t) + assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{ + SmallTaskResponse: &schedulerv2.SmallTaskResponse{ + CandidateParent: &commonv2.Peer{ + Id: candidateParent.ID, + Range: &commonv2.Range{ + Start: candidateParent.Range.Start, + Length: candidateParent.Range.Length, + }, + Priority: candidateParent.Priority, + Pieces: []*commonv2.Piece{ + { + Number: mockPiece.Number, + ParentId: mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Cost: durationpb.New(candidateParent.Cost.Load()), + State: candidateParent.FSM.Current(), + Task: &commonv2.Task{ + Id: candidateParent.Task.ID, + Type: candidateParent.Task.Type, + Url: candidateParent.Task.URL, + Digest: candidateParent.Task.Digest.String(), + Tag: candidateParent.Task.Tag, + Application: candidateParent.Task.Application, + Filters: candidateParent.Task.Filters, + Header: candidateParent.Task.Header, + PieceLength: candidateParent.Task.PieceLength, + ContentLength: candidateParent.Task.ContentLength.Load(), + PieceCount: candidateParent.Task.TotalPieceCount.Load(), + SizeScope: candidateParent.Task.SizeScope(), + Pieces: []*commonv2.Piece{ + { + Number: mockPiece.Number, + ParentId: mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + State: candidateParent.Task.FSM.Current(), + PeerCount: int32(candidateParent.Task.PeerCount()), + CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), + }, + Host: &commonv2.Host{ + Id: candidateParent.Host.ID, + Type: uint32(candidateParent.Host.Type), + Hostname: candidateParent.Host.Hostname, + Ip: candidateParent.Host.IP, + Port: candidateParent.Host.Port, + DownloadPort: candidateParent.Host.DownloadPort, + Os: candidateParent.Host.OS, + Platform: candidateParent.Host.Platform, + PlatformFamily: candidateParent.Host.PlatformFamily, + PlatformVersion: candidateParent.Host.PlatformVersion, + KernelVersion: candidateParent.Host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: candidateParent.Host.CPU.LogicalCount, + PhysicalCount: candidateParent.Host.CPU.PhysicalCount, + Percent: candidateParent.Host.CPU.Percent, + ProcessPercent: candidateParent.Host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: candidateParent.Host.CPU.Times.User, + System: candidateParent.Host.CPU.Times.System, + Idle: candidateParent.Host.CPU.Times.Idle, + Nice: candidateParent.Host.CPU.Times.Nice, + Iowait: candidateParent.Host.CPU.Times.Iowait, + Irq: candidateParent.Host.CPU.Times.Irq, + Softirq: candidateParent.Host.CPU.Times.Softirq, + Steal: candidateParent.Host.CPU.Times.Steal, + Guest: candidateParent.Host.CPU.Times.Guest, + GuestNice: candidateParent.Host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: candidateParent.Host.Memory.Total, + Available: candidateParent.Host.Memory.Available, + Used: candidateParent.Host.Memory.Used, + UsedPercent: candidateParent.Host.Memory.UsedPercent, + ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent, + Free: candidateParent.Host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, + UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, + SecurityDomain: candidateParent.Host.Network.SecurityDomain, + Location: candidateParent.Host.Network.Location, + Idc: candidateParent.Host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: candidateParent.Host.Disk.Total, + Free: candidateParent.Host.Disk.Free, + Used: candidateParent.Host.Disk.Used, + UsedPercent: candidateParent.Host.Disk.UsedPercent, + InodesTotal: candidateParent.Host.Disk.InodesTotal, + InodesUsed: candidateParent.Host.Disk.InodesUsed, + InodesFree: candidateParent.Host.Disk.InodesFree, + InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: candidateParent.Host.Build.GitVersion, + GitCommit: candidateParent.Host.Build.GitCommit, + GoVersion: candidateParent.Host.Build.GoVersion, + Platform: candidateParent.Host.Build.Platform, + }, + }, + NeedBackToSource: candidateParent.NeedBackToSource.Load(), + CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()), + UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()), + }, + }, + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + candidateParent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost, resource.WithRange(nethttp.Range{ + Start: 1, + Length: 10, + })) + candidateParent.StorePiece(&mockPiece) + candidateParent.Task.StorePiece(&mockPiece) + + tc.expect(t, ConstructSuccessSmallTaskResponse(candidateParent), candidateParent) + }) + } +} + +func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { tests := []struct { name string mock func(md *configmocks.MockDynconfigInterfaceMockRecorder) @@ -1293,12 +1700,12 @@ func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) { candidateParents[0].Task.StorePiece(&mockPiece) tc.mock(dynconfig.EXPECT()) - tc.expect(t, constructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents) + tc.expect(t, ConstructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents) }) } } -func TestScheduling_constructSuccessPeerPacket(t *testing.T) { +func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) { tests := []struct { name string mock func(md *configmocks.MockDynconfigInterfaceMockRecorder) @@ -1377,7 +1784,7 @@ func TestScheduling_constructSuccessPeerPacket(t *testing.T) { candidateParents := []*resource.Peer{resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost)} tc.mock(dynconfig.EXPECT()) - tc.expect(t, constructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents) + tc.expect(t, ConstructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents) }) } } diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index f2ff4dd57..34e38653f 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -25,7 +25,6 @@ import ( "strings" "time" - "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -673,7 +672,7 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, priority = req.UrlMeta.Priority } else { // Compatible with v1 version of priority enum. - priority = types.PriorityV2ToV1(peer.GetPriority(dynconfig)) + priority = types.PriorityV2ToV1(peer.CalculatePriority(dynconfig)) } peer.Log.Infof("peer priority is %d", priority) @@ -714,8 +713,6 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, // triggerSeedPeerTask starts to trigger seed peer task. func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *resource.Task) { - ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)) - task.Log.Info("trigger seed peer") peer, endOfPiece, err := v.resource.SeedPeer().TriggerTask(ctx, rg, task) if err != nil { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index febcb4d96..ce6a4a111 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -31,6 +31,9 @@ import ( schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/set" + "d7y.io/dragonfly/v2/pkg/digest" + "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" @@ -100,37 +103,46 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error switch announcePeerRequest := req.GetRequest().(type) { case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest: logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %#v", announcePeerRequest.RegisterPeerRequest.Download) - if err := v.handleRegisterPeerRequest(req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil { + if err := v.handleRegisterPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil { logger.Error(err) return err } + case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest: + logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %#v", announcePeerRequest.RegisterSeedPeerRequest.Download) + v.handleRegisterSeedPeerRequest(ctx, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest) - v.handleDownloadPeerStartedRequest(announcePeerRequest.DownloadPeerStartedRequest) + v.handleDownloadPeerStartedRequest(ctx, announcePeerRequest.DownloadPeerStartedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest) - v.handleDownloadPeerBackToSourceStartedRequest(announcePeerRequest.DownloadPeerBackToSourceStartedRequest) + v.handleDownloadPeerBackToSourceStartedRequest(ctx, announcePeerRequest.DownloadPeerBackToSourceStartedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadSeedPeerBackToSourceStartedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadSeedPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadSeedPeerBackToSourceStartedRequest) + v.handleDownloadSeedPeerBackToSourceStartedRequest(ctx, announcePeerRequest.DownloadSeedPeerBackToSourceStartedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest) - v.handleDownloadPeerFinishedRequest(announcePeerRequest.DownloadPeerFinishedRequest) + v.handleDownloadPeerFinishedRequest(ctx, announcePeerRequest.DownloadPeerFinishedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) - v.handleDownloadPeerBackToSourceFinishedRequest(announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) + v.handleDownloadPeerBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadSeedPeerBackToSourceFinishedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadSeedPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadSeedPeerBackToSourceFinishedRequest) + v.handleDownloadSeedPeerBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadSeedPeerBackToSourceFinishedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest) - v.handleDownloadPieceFinishedRequest(announcePeerRequest.DownloadPieceFinishedRequest) + v.handleDownloadPieceFinishedRequest(ctx, announcePeerRequest.DownloadPieceFinishedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) - v.handleDownloadPieceBackToSourceFinishedRequest(announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) + v.handleDownloadPieceBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest) - v.handleDownloadPieceFailedRequest(announcePeerRequest.DownloadPieceFailedRequest) + v.handleDownloadPieceFailedRequest(ctx, announcePeerRequest.DownloadPieceFailedRequest) case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest) - v.handleDownloadPieceBackToSourceFailedRequest(announcePeerRequest.DownloadPieceBackToSourceFailedRequest) + v.handleDownloadPieceBackToSourceFailedRequest(ctx, announcePeerRequest.DownloadPieceBackToSourceFailedRequest) case *schedulerv2.AnnouncePeerRequest_SyncPiecesFailedRequest: logger.Infof("receive AnnouncePeerRequest_SyncPiecesFailedRequest: %#v", announcePeerRequest.SyncPiecesFailedRequest) - v.handleSyncPiecesFailedRequest(announcePeerRequest.SyncPiecesFailedRequest) + v.handleSyncPiecesFailedRequest(ctx, announcePeerRequest.SyncPiecesFailedRequest) default: msg := fmt.Sprintf("receive unknow request: %#v", announcePeerRequest) logger.Error(msg) @@ -140,54 +152,63 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error } // TODO Implement function. -// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. -func (v *V2) handleRegisterPeerRequest(hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { - return nil +// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest. +func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) { } // TODO Implement function. // handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPeerStartedRequest(req *schedulerv2.DownloadPeerStartedRequest) { +func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerStartedRequest) { } // TODO Implement function. // handleDownloadPeerBackToSourceStartedRequest handles DownloadPeerBackToSourceStartedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPeerBackToSourceStartedRequest(req *schedulerv2.DownloadPeerBackToSourceStartedRequest) { +func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerBackToSourceStartedRequest) { +} + +// TODO Implement function. +// handleDownloadSeedPeerBackToSourceStartedRequest handles DownloadSeedPeerBackToSourceStartedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadSeedPeerBackToSourceStartedRequest(ctx context.Context, req *schedulerv2.DownloadSeedPeerBackToSourceStartedRequest) { } // TODO Implement function. // handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPeerFinishedRequest(req *schedulerv2.DownloadPeerFinishedRequest) { +func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPeerFinishedRequest) { } // TODO Implement function. // handleDownloadPeerBackToSourceFinishedRequest handles DownloadPeerBackToSourceFinishedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) { +func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) { +} + +// TODO Implement function. +// handleDownloadSeedPeerBackToSourceFinishedRequest handles DownloadSeedPeerBackToSourceFinishedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadSeedPeerBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadSeedPeerBackToSourceFinishedRequest) { } // TODO Implement function. // handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPieceFinishedRequest(req *schedulerv2.DownloadPieceFinishedRequest) { +func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPieceFinishedRequest) { } // TODO Implement function. // handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) { +func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) { } // TODO Implement function. // handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPieceFailedRequest(req *schedulerv2.DownloadPieceFailedRequest) { +func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, req *schedulerv2.DownloadPieceFailedRequest) { } // TODO Implement function. // handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest. -func (v *V2) handleDownloadPieceBackToSourceFailedRequest(req *schedulerv2.DownloadPieceBackToSourceFailedRequest) { +func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, req *schedulerv2.DownloadPieceBackToSourceFailedRequest) { } // TODO Implement function. // handleSyncPiecesFailedRequest handles SyncPiecesFailedRequest of AnnouncePeerRequest. -func (v *V2) handleSyncPiecesFailedRequest(req *schedulerv2.SyncPiecesFailedRequest) { +func (v *V2) handleSyncPiecesFailedRequest(ctx context.Context, req *schedulerv2.SyncPiecesFailedRequest) { } // StatPeer checks information of peer. @@ -644,3 +665,244 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e host.LeavePeers() return nil } + +// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. +func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { + // Handle resource included host, task, and peer. + _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req) + if err != nil { + return status.Error(codes.FailedPrecondition, err.Error()) + } + + // When there are no available peers for a task, the scheduler needs to trigger + // the first task download in the p2p cluster. + blocklist := set.NewSafeSet[string]() + blocklist.Add(peer.ID) + if !task.HasAvailablePeer(blocklist) { + if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil { + return err + } + } + + // Provide different scheduling strategies for different task type. + sizeScope := task.SizeScope() + switch sizeScope { + case commonv2.SizeScope_EMPTY: + // Return an EmptyTaskResponse directly. + peer.Log.Info("scheduling as SizeScope_EMPTY") + stream, loaded := peer.LoadAnnouncePeerStream() + if !loaded { + return status.Error(codes.NotFound, "AnnouncePeerStream not found") + } + + if err := peer.FSM.Event(ctx, resource.PeerEventRegisterEmpty); err != nil { + return status.Errorf(codes.Internal, err.Error()) + } + + if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ + Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{ + EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{}, + }, + }); err != nil { + peer.Log.Error(err) + return status.Error(codes.Internal, err.Error()) + } + + return nil + case commonv2.SizeScope_TINY: + // If the task.DirectPiece of the task can be reused, the data of + // the task will be included in the TinyTaskResponse. + // If the task.DirectPiece cannot be reused, + // it will be scheduled as a Normal Task. + peer.Log.Info("scheduling as SizeScope_TINY") + if !peer.Task.CanReuseDirectPiece() { + peer.Log.Warnf("can not reuse direct piece %d %d", len(task.DirectPiece), task.ContentLength.Load()) + break + } + + stream, loaded := peer.LoadAnnouncePeerStream() + if !loaded { + return status.Error(codes.NotFound, "AnnouncePeerStream not found") + } + + if err := peer.FSM.Event(ctx, resource.PeerEventRegisterTiny); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ + Response: &schedulerv2.AnnouncePeerResponse_TinyTaskResponse{ + TinyTaskResponse: &schedulerv2.TinyTaskResponse{ + Data: peer.Task.DirectPiece, + }, + }, + }); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + return nil + case commonv2.SizeScope_SMALL: + // If a parent with the state of PeerStateSucceeded can be found in the task, + // its information will be returned. If a parent with the state of + // PeerStateSucceeded cannot be found in the task, + // it will be scheduled as a Normal Task. + peer.Log.Info("scheduling as SizeScope_SMALL") + parent, found := v.scheduling.FindSuccessParent(ctx, peer, set.NewSafeSet[string]()) + if !found { + peer.Log.Warn("candidate parents not found") + break + } + + // Delete inedges of peer. + if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + // Add edges between success parent and peer. + if err := peer.Task.AddPeerEdge(parent, peer); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + stream, loaded := peer.LoadAnnouncePeerStream() + if !loaded { + return status.Error(codes.NotFound, "AnnouncePeerStream not found") + } + + if err := peer.FSM.Event(ctx, resource.PeerEventRegisterSmall); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ + Response: scheduling.ConstructSuccessSmallTaskResponse(parent), + }); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + return nil + case commonv2.SizeScope_NORMAL, commonv2.SizeScope_UNKNOW: + default: + return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope) + } + + // Scheduling as a normal task, it will control how peers download tasks + // based on RetryLimit and RetryBackToSourceLimit configurations. + peer.Log.Info("scheduling as SizeScope_NORMAL") + if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil { + return status.Error(codes.Internal, err.Error()) + } + + if err := v.scheduling.ScheduleCandidateParents(ctx, peer, set.NewSafeSet[string]()); err != nil { + return status.Error(codes.FailedPrecondition, err.Error()) + } + + return nil +} + +// handleResource handles resource included host, task, and peer. +func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) (*resource.Host, *resource.Task, *resource.Peer, error) { + // If the host does not exist and the host address cannot be found, + // it may cause an exception. + host, loaded := v.resource.HostManager().Load(hostID) + if !loaded { + return nil, nil, nil, fmt.Errorf("host %s not found", hostID) + } + + // Store new task or update task. + task, loaded := v.resource.TaskManager().Load(taskID) + if !loaded { + options := []resource.TaskOption{resource.WithPieceLength(req.Download.PieceLength)} + if req.Download.Digest != "" { + d, err := digest.Parse(req.Download.Digest) + if err != nil { + return nil, nil, nil, fmt.Errorf("invalid digest %s", req.Download.Digest) + } + + // If request has invalid digest, then new task with the nil digest. + options = append(options, resource.WithDigest(d)) + } + + task = resource.NewTask(taskID, req.Download.Url, req.Download.Tag, req.Download.Application, req.Download.Type, + req.Download.Filters, req.Download.Header, int32(v.config.Scheduler.BackToSourceCount), options...) + v.resource.TaskManager().Store(task) + } else { + task.URL = req.Download.Url + task.Filters = req.Download.Filters + task.Header = req.Download.Header + } + + // Store new peer or load peer. + peer, loaded := v.resource.PeerManager().Load(peerID) + if !loaded { + options := []resource.PeerOption{resource.WithPriority(req.Download.Priority), resource.WithAnnouncePeerStream(stream)} + if req.Download.Range != nil { + options = append(options, resource.WithRange(http.Range{Start: req.Download.Range.Start, Length: req.Download.Range.Length})) + } + + peer = resource.NewPeer(peerID, task, host, options...) + v.resource.PeerManager().Store(peer) + } + + return host, task, peer, nil +} + +// downloadTaskBySeedPeer downloads task by seed peer. +func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error { + // Trigger the first download task based on different priority levels, + // refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74. + priority := peer.CalculatePriority(v.dynconfig) + peer.Log.Infof("peer priority is %s", priority.String()) + switch priority { + case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0: + // Super peer is first triggered to back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeSuperSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL5: + // Strong peer is first triggered to back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeStrongSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL4: + // Weak peer is first triggered to back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeWeakSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL3: + // When the task is downloaded for the first time, + // the normal peer is first to download back-to-source. + peer.NeedBackToSource.Store(true) + case commonv2.Priority_LEVEL2: + // Peer is first to download back-to-source. + return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String()) + case commonv2.Priority_LEVEL1: + // Download task is forbidden. + return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()) + default: + return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority) + } + + return nil +} diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 3e5af185c..fe665f340 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -19,7 +19,9 @@ package service import ( "context" "errors" + "fmt" "reflect" + "sync" "testing" "github.com/golang/mock/gomock" @@ -31,13 +33,15 @@ import ( commonv2 "d7y.io/api/pkg/apis/common/v2" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" + schedulerv2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks" managertypes "d7y.io/dragonfly/v2/manager/types" + "d7y.io/dragonfly/v2/pkg/types" pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" "d7y.io/dragonfly/v2/scheduler/resource" - "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" + schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) @@ -59,7 +63,7 @@ func TestService_NewV2(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) resource := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -224,7 +228,7 @@ func TestServiceV2_StatPeer(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -295,7 +299,7 @@ func TestServiceV2_LeavePeer(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -381,7 +385,7 @@ func TestServiceV2_StatTask(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -820,7 +824,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -889,7 +893,7 @@ func TestServiceV2_LeaveHost(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - scheduling := mocks.NewMockScheduling(ctl) + scheduling := schedulingmocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) @@ -906,3 +910,976 @@ func TestServiceV2_LeaveHost(t *testing.T) { }) } } + +func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { + tests := []struct { + name string + req *schedulerv2.RegisterPeerRequest + run func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) + }{ + { + name: "host not found", + req: &schedulerv2.RegisterPeerRequest{}, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.FailedPrecondition, "host %s not found", peer.Host.ID)) + }, + }, + { + name: "can not found available peer and download task failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL1 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())) + }, + }, + { + name: "size scope is SizeScope_EMPTY and load AnnouncePeerStream failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.NotFound, "AnnouncePeerStream not found")) + assert.Equal(peer.FSM.Current(), resource.PeerStatePending) + }, + }, + { + name: "size scope is SizeScope_EMPTY and event PeerEventRegisterEmpty failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + peer.FSM.SetState(resource.PeerStateReceivedEmpty) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.Internal, "event RegisterEmpty inappropriate in current state ReceivedEmpty")) + }, + }, + { + name: "size scope is SizeScope_EMPTY and send EmptyTaskResponse failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ + Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{ + EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{}, + }, + })).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.Internal, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty) + }, + }, + { + name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + peer.FSM.SetState(resource.PeerStateReceivedNormal) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.FailedPrecondition, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_TINY and task can not reuse DirectPiece", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_SMALL and task can not found success parent", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.NotFound, "AnnouncePeerStream not found")) + assert.Equal(peer.FSM.Current(), resource.PeerStatePending) + }, + }, + { + name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + peer.FSM.SetState(resource.PeerStateReceivedSmall) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_SMALL", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ma.Send(gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_NORMAL", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(2) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_UNKNOW", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + peerManager := resource.NewMockPeerManager(ctl) + taskManager := resource.NewMockTaskManager(ctl) + stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockPeerID, mockTask, mockHost) + seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + + tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT()) + }) + } +} + +func TestServiceV2_handleResource(t *testing.T) { + tests := []struct { + name string + req *schedulerv2.RegisterPeerRequest + run func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) + }{ + { + name: "host can not be loaded", + req: &schedulerv2.RegisterPeerRequest{}, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.EqualError(err, fmt.Sprintf("host %s not found", mockHost.ID)) + }, + }, + { + name: "task can be loaded", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1), + ) + + assert := assert.New(t) + host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.NoError(err) + assert.EqualValues(host, mockHost) + assert.Equal(task.ID, mockTask.ID) + assert.Equal(task.URL, req.Download.Url) + assert.EqualValues(task.Filters, req.Download.Filters) + assert.EqualValues(task.Header, req.Download.Header) + }, + }, + { + name: "task can not be loaded", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Store(gomock.Any()).Return().Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1), + ) + + assert := assert.New(t) + host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.NoError(err) + assert.EqualValues(host, mockHost) + assert.Equal(task.ID, mockTask.ID) + assert.Equal(task.Digest.String(), req.Download.Digest) + assert.Equal(task.URL, req.Download.Url) + assert.EqualValues(task.Filters, req.Download.Filters) + assert.EqualValues(task.Header, req.Download.Header) + }, + }, + { + name: "invalid digest", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Digest: "foo", + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.EqualError(err, "invalid digest foo") + }, + }, + { + name: "peer can be loaded", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1), + ) + + assert := assert.New(t) + host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.NoError(err) + assert.EqualValues(host, mockHost) + assert.Equal(task.ID, mockTask.ID) + assert.Equal(task.Digest.String(), req.Download.Digest) + assert.Equal(task.URL, req.Download.Url) + assert.EqualValues(task.Filters, req.Download.Filters) + assert.EqualValues(task.Header, req.Download.Header) + assert.EqualValues(peer, mockPeer) + }, + }, + { + name: "peer can not be loaded", + req: &schedulerv2.RegisterPeerRequest{ + Download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), + Priority: commonv2.Priority_LEVEL1, + Range: &commonv2.Range{ + Start: mockPeerRange.Start, + Length: mockPeerRange.Length, + }, + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Store(gomock.Any()).Return().Times(1), + ) + + assert := assert.New(t) + host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + assert.NoError(err) + assert.EqualValues(host, mockHost) + assert.Equal(task.ID, mockTask.ID) + assert.Equal(task.Digest.String(), req.Download.Digest) + assert.Equal(task.URL, req.Download.Url) + assert.EqualValues(task.Filters, req.Download.Filters) + assert.EqualValues(task.Header, req.Download.Header) + assert.Equal(peer.ID, mockPeer.ID) + assert.Equal(peer.Priority, req.Download.Priority) + assert.Equal(peer.Range.Start, req.Download.Range.Start) + assert.Equal(peer.Range.Length, req.Download.Range.Length) + assert.NotNil(peer.AnnouncePeerStream) + assert.EqualValues(peer.Host, mockHost) + assert.EqualValues(peer.Task, mockTask) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + taskManager := resource.NewMockTaskManager(ctl) + peerManager := resource.NewMockPeerManager(ctl) + stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + + tc.run(t, svc, tc.req, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) + }) + } +} + +func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { + tests := []struct { + name string + config config.Config + run func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) + }{ + { + name: "priority is Priority_LEVEL6 and enable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeSuperSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL6, enable seed peer and download task failed", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeSuperSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(errors.New("foo")).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL6 and disable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: false, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.True(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL5 and enable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeStrongSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL5 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL5, enable seed peer and download task failed", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeStrongSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(errors.New("foo")).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL5 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL5 and disable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: false, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL5 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.True(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL4 and enable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeWeakSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL4 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL4, enable seed peer and download task failed", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + gomock.InOrder( + mr.SeedPeer().Return(seedPeerClient).Times(1), + ms.DownloadTask(gomock.All(), gomock.Any(), types.HostTypeWeakSeed).Do(func(context.Context, *resource.Task, types.HostType) { wg.Done() }).Return(errors.New("foo")).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL4 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.False(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL4 and disable seed peer", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: false, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL4 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.True(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL3", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL3 + + assert := assert.New(t) + assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), peer)) + assert.True(peer.NeedBackToSource.Load()) + }, + }, + { + name: "priority is Priority_LEVEL2", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL2 + + assert := assert.New(t) + assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), peer), status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())) + }, + }, + { + name: "priority is Priority_LEVEL1", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority_LEVEL1 + + assert := assert.New(t) + assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), peer), status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())) + }, + }, + { + name: "priority is Priority_LEVEL0", + config: config.Config{ + SeedPeer: config.SeedPeerConfig{ + Enable: true, + }, + }, + run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) { + peer.Priority = commonv2.Priority(100) + + assert := assert.New(t) + assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), peer), status.Errorf(codes.InvalidArgument, "invalid priority %#v", peer.Priority)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + seedPeerClient := resource.NewMockSeedPeer(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockPeerID, mockTask, mockHost) + svc := NewV2(&tc.config, res, scheduling, dynconfig, storage) + + tc.run(t, svc, peer, seedPeerClient, res.EXPECT(), seedPeerClient.EXPECT()) + }) + } +}