From 86ce09f53afc93de6ab329f40557e33eb1ac77c7 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 14 Nov 2023 15:07:24 +0800 Subject: [PATCH] feat: remove TinyTaskResponse and SmallTaskResponse message (#2881) Signed-off-by: Gaius --- go.mod | 2 +- go.sum | 4 +- scheduler/scheduling/scheduling.go | 168 ------- scheduler/scheduling/scheduling_test.go | 153 ------- scheduler/service/service_v2.go | 88 +--- scheduler/service/service_v2_test.go | 578 ------------------------ 6 files changed, 4 insertions(+), 989 deletions(-) diff --git a/go.mod b/go.mod index 297604ea2..6c30c1793 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.43 + d7y.io/api/v2 v2.0.45 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index cbc645a6e..565ed6fc1 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.43 h1:4IdL+j1CAJp4QIs71YeItKZV/lzqya98ksxoGVnaQUQ= -d7y.io/api/v2 v2.0.43/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= +d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo= +d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 6eb207adf..d717aeeb5 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -544,174 +544,6 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S return candidateParents } -// ConstructSuccessSmallTaskResponse constructs scheduling successful response of the small task. -// Used only in v2 version of the grpc. -func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedulerv2.AnnouncePeerResponse_SmallTaskResponse { - parent := &commonv2.Peer{ - Id: candidateParent.ID, - Priority: candidateParent.Priority, - Cost: durationpb.New(candidateParent.Cost.Load()), - State: candidateParent.FSM.Current(), - NeedBackToSource: candidateParent.NeedBackToSource.Load(), - CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()), - } - - // Set range to parent. - if candidateParent.Range != nil { - parent.Range = &commonv2.Range{ - Start: candidateParent.Range.Start, - Length: candidateParent.Range.Length, - } - } - - // Set pieces to parent. - candidateParent.Pieces.Range(func(key, value any) bool { - candidateParentPiece, ok := value.(*resource.Piece) - if !ok { - candidateParent.Log.Errorf("invalid piece %s %#v", key, value) - return true - } - - piece := &commonv2.Piece{ - Number: candidateParentPiece.Number, - ParentId: &candidateParentPiece.ParentID, - Offset: candidateParentPiece.Offset, - Length: candidateParentPiece.Length, - TrafficType: &candidateParentPiece.TrafficType, - Cost: durationpb.New(candidateParentPiece.Cost), - CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt), - } - - if candidateParentPiece.Digest != nil { - piece.Digest = candidateParentPiece.Digest.String() - } - - parent.Pieces = append(parent.Pieces, piece) - return true - }) - - // Set task to parent. - parent.Task = &commonv2.Task{ - Id: candidateParent.Task.ID, - Type: candidateParent.Task.Type, - Url: candidateParent.Task.URL, - Tag: &candidateParent.Task.Tag, - Application: &candidateParent.Task.Application, - Filters: candidateParent.Task.Filters, - Header: candidateParent.Task.Header, - PieceLength: candidateParent.Task.PieceLength, - ContentLength: candidateParent.Task.ContentLength.Load(), - PieceCount: candidateParent.Task.TotalPieceCount.Load(), - SizeScope: candidateParent.Task.SizeScope(), - State: candidateParent.Task.FSM.Current(), - PeerCount: int32(candidateParent.Task.PeerCount()), - CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), - } - - // Set digest to parent task. - if candidateParent.Task.Digest != nil { - dgst := candidateParent.Task.Digest.String() - parent.Task.Digest = &dgst - } - - // Set pieces to parent task. - candidateParent.Task.Pieces.Range(func(key, value any) bool { - taskPiece, ok := value.(*resource.Piece) - if !ok { - candidateParent.Task.Log.Errorf("invalid piece %s %#v", key, value) - return true - } - - piece := &commonv2.Piece{ - Number: taskPiece.Number, - ParentId: &taskPiece.ParentID, - Offset: taskPiece.Offset, - Length: taskPiece.Length, - TrafficType: &taskPiece.TrafficType, - Cost: durationpb.New(taskPiece.Cost), - CreatedAt: timestamppb.New(taskPiece.CreatedAt), - } - - if taskPiece.Digest != nil { - piece.Digest = taskPiece.Digest.String() - } - - parent.Task.Pieces = append(parent.Task.Pieces, piece) - return true - }) - - // Set host to parent. - parent.Host = &commonv2.Host{ - Id: candidateParent.Host.ID, - Type: uint32(candidateParent.Host.Type), - Hostname: candidateParent.Host.Hostname, - Ip: candidateParent.Host.IP, - Port: candidateParent.Host.Port, - DownloadPort: candidateParent.Host.DownloadPort, - Os: candidateParent.Host.OS, - Platform: candidateParent.Host.Platform, - PlatformFamily: candidateParent.Host.PlatformFamily, - PlatformVersion: candidateParent.Host.PlatformVersion, - KernelVersion: candidateParent.Host.KernelVersion, - Cpu: &commonv2.CPU{ - LogicalCount: candidateParent.Host.CPU.LogicalCount, - PhysicalCount: candidateParent.Host.CPU.PhysicalCount, - Percent: candidateParent.Host.CPU.Percent, - ProcessPercent: candidateParent.Host.CPU.ProcessPercent, - Times: &commonv2.CPUTimes{ - User: candidateParent.Host.CPU.Times.User, - System: candidateParent.Host.CPU.Times.System, - Idle: candidateParent.Host.CPU.Times.Idle, - Nice: candidateParent.Host.CPU.Times.Nice, - Iowait: candidateParent.Host.CPU.Times.Iowait, - Irq: candidateParent.Host.CPU.Times.Irq, - Softirq: candidateParent.Host.CPU.Times.Softirq, - Steal: candidateParent.Host.CPU.Times.Steal, - Guest: candidateParent.Host.CPU.Times.Guest, - GuestNice: candidateParent.Host.CPU.Times.GuestNice, - }, - }, - Memory: &commonv2.Memory{ - Total: candidateParent.Host.Memory.Total, - Available: candidateParent.Host.Memory.Available, - Used: candidateParent.Host.Memory.Used, - UsedPercent: candidateParent.Host.Memory.UsedPercent, - ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent, - Free: candidateParent.Host.Memory.Free, - }, - Network: &commonv2.Network{ - TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, - UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, - Location: &candidateParent.Host.Network.Location, - Idc: &candidateParent.Host.Network.IDC, - }, - Disk: &commonv2.Disk{ - Total: candidateParent.Host.Disk.Total, - Free: candidateParent.Host.Disk.Free, - Used: candidateParent.Host.Disk.Used, - UsedPercent: candidateParent.Host.Disk.UsedPercent, - InodesTotal: candidateParent.Host.Disk.InodesTotal, - InodesUsed: candidateParent.Host.Disk.InodesUsed, - InodesFree: candidateParent.Host.Disk.InodesFree, - InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent, - }, - Build: &commonv2.Build{ - GitVersion: candidateParent.Host.Build.GitVersion, - GitCommit: &candidateParent.Host.Build.GitCommit, - GoVersion: &candidateParent.Host.Build.GoVersion, - Platform: &candidateParent.Host.Build.Platform, - }, - } - - return &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{ - SmallTaskResponse: &schedulerv2.SmallTaskResponse{ - CandidateParent: parent, - }, - } -} - // ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task. // Used only in v2 version of the grpc. func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 6541de84d..4a3808890 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -1291,159 +1291,6 @@ func TestScheduling_FindSuccessParent(t *testing.T) { } } -func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) - }{ - { - name: "construct success", - expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) { - dgst := candidateParent.Task.Digest.String() - - assert := assert.New(t) - assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{ - SmallTaskResponse: &schedulerv2.SmallTaskResponse{ - CandidateParent: &commonv2.Peer{ - Id: candidateParent.ID, - Range: &commonv2.Range{ - Start: candidateParent.Range.Start, - Length: candidateParent.Range.Length, - }, - Priority: candidateParent.Priority, - Pieces: []*commonv2.Piece{ - { - Number: mockPiece.Number, - ParentId: &mockPiece.ParentID, - Offset: mockPiece.Offset, - Length: mockPiece.Length, - Digest: mockPiece.Digest.String(), - TrafficType: &mockPiece.TrafficType, - Cost: durationpb.New(mockPiece.Cost), - CreatedAt: timestamppb.New(mockPiece.CreatedAt), - }, - }, - Cost: durationpb.New(candidateParent.Cost.Load()), - State: candidateParent.FSM.Current(), - Task: &commonv2.Task{ - Id: candidateParent.Task.ID, - Type: candidateParent.Task.Type, - Url: candidateParent.Task.URL, - Digest: &dgst, - Tag: &candidateParent.Task.Tag, - Application: &candidateParent.Task.Application, - Filters: candidateParent.Task.Filters, - Header: candidateParent.Task.Header, - PieceLength: candidateParent.Task.PieceLength, - ContentLength: candidateParent.Task.ContentLength.Load(), - PieceCount: candidateParent.Task.TotalPieceCount.Load(), - SizeScope: candidateParent.Task.SizeScope(), - Pieces: []*commonv2.Piece{ - { - Number: mockPiece.Number, - ParentId: &mockPiece.ParentID, - Offset: mockPiece.Offset, - Length: mockPiece.Length, - Digest: mockPiece.Digest.String(), - TrafficType: &mockPiece.TrafficType, - Cost: durationpb.New(mockPiece.Cost), - CreatedAt: timestamppb.New(mockPiece.CreatedAt), - }, - }, - State: candidateParent.Task.FSM.Current(), - PeerCount: int32(candidateParent.Task.PeerCount()), - CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), - }, - Host: &commonv2.Host{ - Id: candidateParent.Host.ID, - Type: uint32(candidateParent.Host.Type), - Hostname: candidateParent.Host.Hostname, - Ip: candidateParent.Host.IP, - Port: candidateParent.Host.Port, - DownloadPort: candidateParent.Host.DownloadPort, - Os: candidateParent.Host.OS, - Platform: candidateParent.Host.Platform, - PlatformFamily: candidateParent.Host.PlatformFamily, - PlatformVersion: candidateParent.Host.PlatformVersion, - KernelVersion: candidateParent.Host.KernelVersion, - Cpu: &commonv2.CPU{ - LogicalCount: candidateParent.Host.CPU.LogicalCount, - PhysicalCount: candidateParent.Host.CPU.PhysicalCount, - Percent: candidateParent.Host.CPU.Percent, - ProcessPercent: candidateParent.Host.CPU.ProcessPercent, - Times: &commonv2.CPUTimes{ - User: candidateParent.Host.CPU.Times.User, - System: candidateParent.Host.CPU.Times.System, - Idle: candidateParent.Host.CPU.Times.Idle, - Nice: candidateParent.Host.CPU.Times.Nice, - Iowait: candidateParent.Host.CPU.Times.Iowait, - Irq: candidateParent.Host.CPU.Times.Irq, - Softirq: candidateParent.Host.CPU.Times.Softirq, - Steal: candidateParent.Host.CPU.Times.Steal, - Guest: candidateParent.Host.CPU.Times.Guest, - GuestNice: candidateParent.Host.CPU.Times.GuestNice, - }, - }, - Memory: &commonv2.Memory{ - Total: candidateParent.Host.Memory.Total, - Available: candidateParent.Host.Memory.Available, - Used: candidateParent.Host.Memory.Used, - UsedPercent: candidateParent.Host.Memory.UsedPercent, - ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent, - Free: candidateParent.Host.Memory.Free, - }, - Network: &commonv2.Network{ - TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, - UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, - Location: &candidateParent.Host.Network.Location, - Idc: &candidateParent.Host.Network.IDC, - }, - Disk: &commonv2.Disk{ - Total: candidateParent.Host.Disk.Total, - Free: candidateParent.Host.Disk.Free, - Used: candidateParent.Host.Disk.Used, - UsedPercent: candidateParent.Host.Disk.UsedPercent, - InodesTotal: candidateParent.Host.Disk.InodesTotal, - InodesUsed: candidateParent.Host.Disk.InodesUsed, - InodesFree: candidateParent.Host.Disk.InodesFree, - InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent, - }, - Build: &commonv2.Build{ - GitVersion: candidateParent.Host.Build.GitVersion, - GitCommit: &candidateParent.Host.Build.GitCommit, - GoVersion: &candidateParent.Host.Build.GoVersion, - Platform: &candidateParent.Host.Build.Platform, - }, - }, - NeedBackToSource: candidateParent.NeedBackToSource.Load(), - CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()), - }, - }, - }) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - mockHost := resource.NewHost( - mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, - mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) - candidateParent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost, resource.WithRange(nethttp.Range{ - Start: 1, - Length: 10, - })) - candidateParent.StorePiece(&mockPiece) - candidateParent.Task.StorePiece(&mockPiece) - - tc.expect(t, ConstructSuccessSmallTaskResponse(candidateParent), candidateParent) - }) - } -} - func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { tests := []struct { name string diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 87a16354f..dd89a711b 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -993,23 +993,6 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil { return status.Error(codes.Internal, err.Error()) } - - // If the task size scope is tiny, scheduler needs to download the tiny file from peer and - // store the data in task DirectPiece. - if peer.Task.SizeScope() == commonv2.SizeScope_TINY { - data, err := peer.DownloadTinyFile() - if err != nil { - peer.Log.Errorf("download failed: %s", err.Error()) - return nil - } - - if len(data) != int(peer.Task.ContentLength.Load()) { - peer.Log.Errorf("data length %d is not equal content length %d", len(data), peer.Task.ContentLength.Load()) - return nil - } - - peer.Task.DirectPiece = data - } } // Collect DownloadPeerCount and DownloadPeerDuration metrics. @@ -1388,76 +1371,7 @@ func (v *V2) schedule(ctx context.Context, peer *resource.Peer) error { } return nil - case commonv2.SizeScope_TINY: - // If the task.DirectPiece of the task can be reused, the data of - // the task will be included in the TinyTaskResponse. - // If the task.DirectPiece cannot be reused, - // it will be scheduled as a Normal Task. - peer.Log.Info("scheduling as SizeScope_TINY") - if !peer.Task.CanReuseDirectPiece() { - peer.Log.Warnf("can not reuse direct piece %d %d", len(peer.Task.DirectPiece), peer.Task.ContentLength.Load()) - break - } - - stream, loaded := peer.LoadAnnouncePeerStream() - if !loaded { - return status.Error(codes.NotFound, "AnnouncePeerStream not found") - } - - if err := peer.FSM.Event(ctx, resource.PeerEventRegisterTiny); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ - Response: &schedulerv2.AnnouncePeerResponse_TinyTaskResponse{ - TinyTaskResponse: &schedulerv2.TinyTaskResponse{ - Content: peer.Task.DirectPiece, - }, - }, - }); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - return nil - case commonv2.SizeScope_SMALL: - // If a parent with the state of PeerStateSucceeded can be found in the task, - // its information will be returned. If a parent with the state of - // PeerStateSucceeded cannot be found in the task, - // it will be scheduled as a Normal Task. - peer.Log.Info("scheduling as SizeScope_SMALL") - parent, found := v.scheduling.FindSuccessParent(ctx, peer, set.NewSafeSet[string]()) - if !found { - peer.Log.Warn("candidate parents not found") - break - } - - // Delete inedges of peer. - if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - // Add edges between success parent and peer. - if err := peer.Task.AddPeerEdge(parent, peer); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - stream, loaded := peer.LoadAnnouncePeerStream() - if !loaded { - return status.Error(codes.NotFound, "AnnouncePeerStream not found") - } - - if err := peer.FSM.Event(ctx, resource.PeerEventRegisterSmall); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ - Response: scheduling.ConstructSuccessSmallTaskResponse(parent), - }); err != nil { - return status.Error(codes.Internal, err.Error()) - } - - return nil - case commonv2.SizeScope_NORMAL, commonv2.SizeScope_UNKNOW: + case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW: default: return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope) } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index b1aa00551..25597f6bb 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -1593,255 +1593,6 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty) }, }, - { - name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - peer.FSM.SetState(resource.PeerStateReceivedNormal) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.FailedPrecondition, "foo")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_TINY and task can not reuse DirectPiece", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_SMALL and task can not found success parent", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.NotFound, "AnnouncePeerStream not found")) - assert.Equal(peer.FSM.Current(), resource.PeerStatePending) - }, - }, - { - name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - peer.FSM.SetState(resource.PeerStateReceivedSmall) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, - { - name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "foo")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, - { - name: "size scope is SizeScope_SMALL", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ma.Send(gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - - assert := assert.New(t) - assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, { name: "size scope is SizeScope_NORMAL", req: &schedulerv2.RegisterPeerRequest{ @@ -2105,255 +1856,6 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) { assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty) }, }, - { - name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - peer.FSM.SetState(resource.PeerStateReceivedNormal) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.FailedPrecondition, "foo")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_TINY and task can not reuse DirectPiece", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_SMALL and task can not found success parent", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) - }, - }, - { - name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.NotFound, "AnnouncePeerStream not found")) - assert.Equal(peer.FSM.Current(), resource.PeerStatePending) - }, - }, - { - name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - peer.FSM.SetState(resource.PeerStateReceivedSmall) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, - { - name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - - assert := assert.New(t) - assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Error(codes.Internal, "foo")) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, - { - name: "size scope is SizeScope_SMALL", - req: &schedulerv2.RegisterSeedPeerRequest{ - Download: &commonv2.Download{ - Digest: &dgst, - }, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, - peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, - mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), - mr.TaskManager().Return(taskManager).Times(1), - mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), - ma.Send(gomock.Any()).Return(nil).Times(1), - ) - - peer.Task.ContentLength.Store(129) - peer.Task.TotalPieceCount.Store(1) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - peer.Priority = commonv2.Priority_LEVEL6 - peer.StoreAnnouncePeerStream(stream) - - assert := assert.New(t) - assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) - assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) - }, - }, { name: "size scope is SizeScope_NORMAL", req: &schedulerv2.RegisterSeedPeerRequest{ @@ -2859,86 +2361,6 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded) }, }, - { - name: "task size scope is SizeScope_TINY and download tiny file failed", - req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{ - ContentLength: 127, - PieceCount: 1, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ) - - peer.FSM.SetState(resource.PeerStateRunning) - peer.Task.FSM.SetState(resource.TaskStateRunning) - peer.Host.DownloadPort = 0 - - assert := assert.New(t) - assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req)) - assert.NotEqual(peer.Cost.Load(), 0) - assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded) - assert.Equal(peer.Task.ContentLength.Load(), int64(127)) - assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1)) - assert.Equal(len(peer.Task.DirectPiece), 0) - assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded) - }, - }, - { - name: "task size scope is SizeScope_TINY and validate tiny file of downloading failed", - req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{ - ContentLength: 126, - PieceCount: 1, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ) - - peer.FSM.SetState(resource.PeerStateRunning) - peer.Task.FSM.SetState(resource.TaskStateRunning) - - assert := assert.New(t) - assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req)) - assert.NotEqual(peer.Cost.Load(), 0) - assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded) - assert.Equal(peer.Task.ContentLength.Load(), int64(126)) - assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1)) - assert.Equal(len(peer.Task.DirectPiece), 0) - assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded) - }, - }, - { - name: "task size scope is SizeScope_TINY and validate tiny file of downloading failed", - req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{ - ContentLength: 1, - PieceCount: 1, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ) - - peer.FSM.SetState(resource.PeerStateRunning) - peer.Task.FSM.SetState(resource.TaskStateRunning) - - assert := assert.New(t) - assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req)) - assert.NotEqual(peer.Cost.Load(), 0) - assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded) - assert.Equal(peer.Task.ContentLength.Load(), int64(1)) - assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1)) - assert.Equal(len(peer.Task.DirectPiece), 1) - assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded) - }, - }, } for _, tc := range tests {