diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index ce6a4a111..9548788b9 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -109,7 +109,10 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error } case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest: logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %#v", announcePeerRequest.RegisterSeedPeerRequest.Download) - v.handleRegisterSeedPeerRequest(ctx, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest) + if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest); err != nil { + logger.Error(err) + return err + } case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest) v.handleDownloadPeerStartedRequest(ctx, announcePeerRequest.DownloadPeerStartedRequest) @@ -151,11 +154,6 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error } } -// TODO Implement function. -// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest. -func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) { -} - // TODO Implement function. // handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest. func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerStartedRequest) { @@ -669,9 +667,9 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { // Handle resource included host, task, and peer. - _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req) + _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download) if err != nil { - return status.Error(codes.FailedPrecondition, err.Error()) + return err } // When there are no available peers for a task, the scheduler needs to trigger @@ -684,8 +682,145 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S } } - // Provide different scheduling strategies for different task type. - sizeScope := task.SizeScope() + // Scheduling parent for the peer.. + return v.schedule(ctx, peer) +} + +// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest. +func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) error { + // Handle resource included host, task, and peer. + _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download) + if err != nil { + return err + } + + // When there are no available peers for a task, the scheduler needs to trigger + // the first task download in the p2p cluster. + blocklist := set.NewSafeSet[string]() + blocklist.Add(peer.ID) + if !task.HasAvailablePeer(blocklist) { + // When the task has no available peer, + // the seed peer will download back-to-source directly. + peer.NeedBackToSource.Store(true) + } + + // Scheduling parent for the peer.. + return v.schedule(ctx, peer) +} + +// handleResource handles resource included host, task, and peer. +func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) { + // If the host does not exist and the host address cannot be found, + // it may cause an exception. + host, loaded := v.resource.HostManager().Load(hostID) + if !loaded { + return nil, nil, nil, status.Errorf(codes.NotFound, "host %s not found", hostID) + } + + // Store new task or update task. + task, loaded := v.resource.TaskManager().Load(taskID) + if !loaded { + options := []resource.TaskOption{resource.WithPieceLength(download.PieceLength)} + if download.Digest != "" { + d, err := digest.Parse(download.Digest) + if err != nil { + return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error()) + } + + // If request has invalid digest, then new task with the nil digest. + options = append(options, resource.WithDigest(d)) + } + + task = resource.NewTask(taskID, download.Url, download.Tag, download.Application, download.Type, + download.Filters, download.Header, int32(v.config.Scheduler.BackToSourceCount), options...) + v.resource.TaskManager().Store(task) + } else { + task.URL = download.Url + task.Filters = download.Filters + task.Header = download.Header + } + + // Store new peer or load peer. + peer, loaded := v.resource.PeerManager().Load(peerID) + if !loaded { + options := []resource.PeerOption{resource.WithPriority(download.Priority), resource.WithAnnouncePeerStream(stream)} + if download.Range != nil { + options = append(options, resource.WithRange(http.Range{Start: download.Range.Start, Length: download.Range.Length})) + } + + peer = resource.NewPeer(peerID, task, host, options...) + v.resource.PeerManager().Store(peer) + } + + return host, task, peer, nil +} + +// downloadTaskBySeedPeer downloads task by seed peer. +func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error { + // Trigger the first download task based on different priority levels, + // refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74. + priority := peer.CalculatePriority(v.dynconfig) + peer.Log.Infof("peer priority is %s", priority.String()) + switch priority { + case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0: + // Super peer is first triggered to download back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeSuperSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL5: + // Strong peer is first triggered to download back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeStrongSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL4: + // Weak peer is first triggered to download back-to-source. + if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { + go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { + if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { + peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + return + } + }(ctx, peer, types.HostTypeWeakSeed) + break + } + + fallthrough + case commonv2.Priority_LEVEL3: + // When the task has no available peer, + // the peer is first to download back-to-source. + peer.NeedBackToSource.Store(true) + case commonv2.Priority_LEVEL2: + // Peer is first to download back-to-source. + return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String()) + case commonv2.Priority_LEVEL1: + // Download task is forbidden. + return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()) + default: + return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority) + } + + return nil +} + +// schedule provides different scheduling strategies for different task type. +func (v *V2) schedule(ctx context.Context, peer *resource.Peer) error { + sizeScope := peer.Task.SizeScope() switch sizeScope { case commonv2.SizeScope_EMPTY: // Return an EmptyTaskResponse directly. @@ -716,7 +851,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S // it will be scheduled as a Normal Task. peer.Log.Info("scheduling as SizeScope_TINY") if !peer.Task.CanReuseDirectPiece() { - peer.Log.Warnf("can not reuse direct piece %d %d", len(task.DirectPiece), task.ContentLength.Load()) + peer.Log.Warnf("can not reuse direct piece %d %d", len(peer.Task.DirectPiece), peer.Task.ContentLength.Load()) break } @@ -796,113 +931,3 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S return nil } - -// handleResource handles resource included host, task, and peer. -func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) (*resource.Host, *resource.Task, *resource.Peer, error) { - // If the host does not exist and the host address cannot be found, - // it may cause an exception. - host, loaded := v.resource.HostManager().Load(hostID) - if !loaded { - return nil, nil, nil, fmt.Errorf("host %s not found", hostID) - } - - // Store new task or update task. - task, loaded := v.resource.TaskManager().Load(taskID) - if !loaded { - options := []resource.TaskOption{resource.WithPieceLength(req.Download.PieceLength)} - if req.Download.Digest != "" { - d, err := digest.Parse(req.Download.Digest) - if err != nil { - return nil, nil, nil, fmt.Errorf("invalid digest %s", req.Download.Digest) - } - - // If request has invalid digest, then new task with the nil digest. - options = append(options, resource.WithDigest(d)) - } - - task = resource.NewTask(taskID, req.Download.Url, req.Download.Tag, req.Download.Application, req.Download.Type, - req.Download.Filters, req.Download.Header, int32(v.config.Scheduler.BackToSourceCount), options...) - v.resource.TaskManager().Store(task) - } else { - task.URL = req.Download.Url - task.Filters = req.Download.Filters - task.Header = req.Download.Header - } - - // Store new peer or load peer. - peer, loaded := v.resource.PeerManager().Load(peerID) - if !loaded { - options := []resource.PeerOption{resource.WithPriority(req.Download.Priority), resource.WithAnnouncePeerStream(stream)} - if req.Download.Range != nil { - options = append(options, resource.WithRange(http.Range{Start: req.Download.Range.Start, Length: req.Download.Range.Length})) - } - - peer = resource.NewPeer(peerID, task, host, options...) - v.resource.PeerManager().Store(peer) - } - - return host, task, peer, nil -} - -// downloadTaskBySeedPeer downloads task by seed peer. -func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error { - // Trigger the first download task based on different priority levels, - // refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74. - priority := peer.CalculatePriority(v.dynconfig) - peer.Log.Infof("peer priority is %s", priority.String()) - switch priority { - case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0: - // Super peer is first triggered to back-to-source. - if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) - return - } - }(ctx, peer, types.HostTypeSuperSeed) - break - } - - fallthrough - case commonv2.Priority_LEVEL5: - // Strong peer is first triggered to back-to-source. - if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) - return - } - }(ctx, peer, types.HostTypeStrongSeed) - break - } - - fallthrough - case commonv2.Priority_LEVEL4: - // Weak peer is first triggered to back-to-source. - if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) - return - } - }(ctx, peer, types.HostTypeWeakSeed) - break - } - - fallthrough - case commonv2.Priority_LEVEL3: - // When the task is downloaded for the first time, - // the normal peer is first to download back-to-source. - peer.NeedBackToSource.Store(true) - case commonv2.Priority_LEVEL2: - // Peer is first to download back-to-source. - return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String()) - case commonv2.Priority_LEVEL1: - // Download task is forbidden. - return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()) - default: - return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority) - } - - return nil -} diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index fe665f340..a6425acc2 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -19,7 +19,6 @@ package service import ( "context" "errors" - "fmt" "reflect" "sync" "testing" @@ -932,7 +931,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { assert := assert.New(t) assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req), - status.Errorf(codes.FailedPrecondition, "host %s not found", peer.Host.ID)) + status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID)) }, }, { @@ -1388,18 +1387,496 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { } } -func TestServiceV2_handleResource(t *testing.T) { +func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) { tests := []struct { name string - req *schedulerv2.RegisterPeerRequest - run func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + req *schedulerv2.RegisterSeedPeerRequest + run func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) + }{ + { + name: "host not found", + req: &schedulerv2.RegisterSeedPeerRequest{}, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID)) + }, + }, + { + name: "can not found available peer", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL1 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.NeedBackToSource.Load(), true) + }, + }, + { + name: "size scope is SizeScope_EMPTY and load AnnouncePeerStream failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.NotFound, "AnnouncePeerStream not found")) + assert.Equal(peer.FSM.Current(), resource.PeerStatePending) + }, + }, + { + name: "size scope is SizeScope_EMPTY and event PeerEventRegisterEmpty failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + peer.FSM.SetState(resource.PeerStateReceivedEmpty) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.Internal, "event RegisterEmpty inappropriate in current state ReceivedEmpty")) + }, + }, + { + name: "size scope is SizeScope_EMPTY and send EmptyTaskResponse failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ + Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{ + EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{}, + }, + })).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(0) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Errorf(codes.Internal, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty) + }, + }, + { + name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + peer.FSM.SetState(resource.PeerStateReceivedNormal) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.FailedPrecondition, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_TINY and task can not reuse DirectPiece", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_SMALL and task can not found success parent", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.NotFound, "AnnouncePeerStream not found")) + assert.Equal(peer.FSM.Current(), resource.PeerStatePending) + }, + }, + { + name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + peer.FSM.SetState(resource.PeerStateReceivedSmall) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req), + status.Error(codes.Internal, "foo")) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_SMALL", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1), + ma.Send(gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(1) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall) + }, + }, + { + name: "size scope is SizeScope_NORMAL", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Task.ContentLength.Store(129) + peer.Task.TotalPieceCount.Store(2) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(seedPeer) + peer.Priority = commonv2.Priority_LEVEL6 + peer.StoreAnnouncePeerStream(stream) + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + { + name: "size scope is SizeScope_UNKNOW", + req: &schedulerv2.RegisterSeedPeerRequest{ + Download: &commonv2.Download{ + Digest: mockTaskDigest.String(), + }, + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, + peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + peer.Priority = commonv2.Priority_LEVEL6 + + assert := assert.New(t) + assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req)) + assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + peerManager := resource.NewMockPeerManager(ctl) + taskManager := resource.NewMockTaskManager(ctl) + stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockPeerID, mockTask, mockHost) + seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + + tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT()) + }) + } +} + +func TestServiceV2_handleResource(t *testing.T) { + tests := []struct { + name string + download *commonv2.Download + run func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) }{ { - name: "host can not be loaded", - req: &schedulerv2.RegisterPeerRequest{}, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + name: "host can not be loaded", + download: &commonv2.Download{}, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1408,20 +1885,18 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) - assert.EqualError(err, fmt.Sprintf("host %s not found", mockHost.ID)) + _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) + assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID)) }, }, { name: "task can be loaded", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - Header: map[string]string{"baz": "bas"}, - }, + download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1434,26 +1909,24 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) assert.NoError(err) assert.EqualValues(host, mockHost) assert.Equal(task.ID, mockTask.ID) - assert.Equal(task.URL, req.Download.Url) - assert.EqualValues(task.Filters, req.Download.Filters) - assert.EqualValues(task.Header, req.Download.Header) + assert.Equal(task.URL, download.Url) + assert.EqualValues(task.Filters, download.Filters) + assert.EqualValues(task.Header, download.Header) }, }, { name: "task can not be loaded", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - Header: map[string]string{"baz": "bas"}, - Digest: mockTaskDigest.String(), - }, + download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1468,24 +1941,22 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) assert.NoError(err) assert.EqualValues(host, mockHost) assert.Equal(task.ID, mockTask.ID) - assert.Equal(task.Digest.String(), req.Download.Digest) - assert.Equal(task.URL, req.Download.Url) - assert.EqualValues(task.Filters, req.Download.Filters) - assert.EqualValues(task.Header, req.Download.Header) + assert.Equal(task.Digest.String(), download.Digest) + assert.Equal(task.URL, download.Url) + assert.EqualValues(task.Filters, download.Filters) + assert.EqualValues(task.Header, download.Header) }, }, { name: "invalid digest", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Digest: "foo", - }, + download: &commonv2.Download{ + Digest: "foo", }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1496,21 +1967,19 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) - assert.EqualError(err, "invalid digest foo") + _, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) + assert.ErrorIs(err, status.Error(codes.InvalidArgument, "invalid digest")) }, }, { name: "peer can be loaded", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - Header: map[string]string{"baz": "bas"}, - Digest: mockTaskDigest.String(), - }, + download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1523,33 +1992,31 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) assert.NoError(err) assert.EqualValues(host, mockHost) assert.Equal(task.ID, mockTask.ID) - assert.Equal(task.Digest.String(), req.Download.Digest) - assert.Equal(task.URL, req.Download.Url) - assert.EqualValues(task.Filters, req.Download.Filters) - assert.EqualValues(task.Header, req.Download.Header) + assert.Equal(task.Digest.String(), download.Digest) + assert.Equal(task.URL, download.Url) + assert.EqualValues(task.Filters, download.Filters) + assert.EqualValues(task.Header, download.Header) assert.EqualValues(peer, mockPeer) }, }, { name: "peer can not be loaded", - req: &schedulerv2.RegisterPeerRequest{ - Download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - Header: map[string]string{"baz": "bas"}, - Digest: mockTaskDigest.String(), - Priority: commonv2.Priority_LEVEL1, - Range: &commonv2.Range{ - Start: mockPeerRange.Start, - Length: mockPeerRange.Length, - }, + download: &commonv2.Download{ + Url: "foo", + Filters: []string{"bar"}, + Header: map[string]string{"baz": "bas"}, + Digest: mockTaskDigest.String(), + Priority: commonv2.Priority_LEVEL1, + Range: &commonv2.Range{ + Start: mockPeerRange.Start, + Length: mockPeerRange.Length, }, }, - run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( @@ -1564,18 +2031,18 @@ func TestServiceV2_handleResource(t *testing.T) { ) assert := assert.New(t) - host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req) + host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download) assert.NoError(err) assert.EqualValues(host, mockHost) assert.Equal(task.ID, mockTask.ID) - assert.Equal(task.Digest.String(), req.Download.Digest) - assert.Equal(task.URL, req.Download.Url) - assert.EqualValues(task.Filters, req.Download.Filters) - assert.EqualValues(task.Header, req.Download.Header) + assert.Equal(task.Digest.String(), download.Digest) + assert.Equal(task.URL, download.Url) + assert.EqualValues(task.Filters, download.Filters) + assert.EqualValues(task.Header, download.Header) assert.Equal(peer.ID, mockPeer.ID) - assert.Equal(peer.Priority, req.Download.Priority) - assert.Equal(peer.Range.Start, req.Download.Range.Start) - assert.Equal(peer.Range.Length, req.Download.Range.Length) + assert.Equal(peer.Priority, download.Priority) + assert.Equal(peer.Range.Start, download.Range.Start) + assert.Equal(peer.Range.Length, download.Range.Length) assert.NotNil(peer.AnnouncePeerStream) assert.EqualValues(peer.Host, mockHost) assert.EqualValues(peer.Task, mockTask) @@ -1603,7 +2070,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) - tc.run(t, svc, tc.req, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) + tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) }) } }