feat: add handleRegisterSeedPeerRequest to service v2 in scheduler (#2148)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-03-08 14:10:33 +08:00
parent 047a55fb84
commit ff63681e88
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
2 changed files with 688 additions and 196 deletions

View File

@ -109,7 +109,10 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
}
case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %#v", announcePeerRequest.RegisterSeedPeerRequest.Download)
v.handleRegisterSeedPeerRequest(ctx, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest)
if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
v.handleDownloadPeerStartedRequest(ctx, announcePeerRequest.DownloadPeerStartedRequest)
@ -151,11 +154,6 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
}
}
// TODO Implement function.
// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) {
}
// TODO Implement function.
// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerStartedRequest) {
@ -669,9 +667,9 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req)
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download)
if err != nil {
return status.Error(codes.FailedPrecondition, err.Error())
return err
}
// When there are no available peers for a task, the scheduler needs to trigger
@ -684,8 +682,145 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
}
}
// Provide different scheduling strategies for different task type.
sizeScope := task.SizeScope()
// Scheduling parent for the peer..
return v.schedule(ctx, peer)
}
// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) error {
// Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download)
if err != nil {
return err
}
// When there are no available peers for a task, the scheduler needs to trigger
// the first task download in the p2p cluster.
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if !task.HasAvailablePeer(blocklist) {
// When the task has no available peer,
// the seed peer will download back-to-source directly.
peer.NeedBackToSource.Store(true)
}
// Scheduling parent for the peer..
return v.schedule(ctx, peer)
}
// handleResource handles resource included host, task, and peer.
func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) {
// If the host does not exist and the host address cannot be found,
// it may cause an exception.
host, loaded := v.resource.HostManager().Load(hostID)
if !loaded {
return nil, nil, nil, status.Errorf(codes.NotFound, "host %s not found", hostID)
}
// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
options := []resource.TaskOption{resource.WithPieceLength(download.PieceLength)}
if download.Digest != "" {
d, err := digest.Parse(download.Digest)
if err != nil {
return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error())
}
// If request has invalid digest, then new task with the nil digest.
options = append(options, resource.WithDigest(d))
}
task = resource.NewTask(taskID, download.Url, download.Tag, download.Application, download.Type,
download.Filters, download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
task.URL = download.Url
task.Filters = download.Filters
task.Header = download.Header
}
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.Priority), resource.WithAnnouncePeerStream(stream)}
if download.Range != nil {
options = append(options, resource.WithRange(http.Range{Start: download.Range.Start, Length: download.Range.Length}))
}
peer = resource.NewPeer(peerID, task, host, options...)
v.resource.PeerManager().Store(peer)
}
return host, task, peer, nil
}
// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
peer.Log.Infof("peer priority is %s", priority.String())
switch priority {
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeSuperSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeStrongSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeWeakSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL3:
// When the task has no available peer,
// the peer is first to download back-to-source.
peer.NeedBackToSource.Store(true)
case commonv2.Priority_LEVEL2:
// Peer is first to download back-to-source.
return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
case commonv2.Priority_LEVEL1:
// Download task is forbidden.
return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
default:
return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
}
return nil
}
// schedule provides different scheduling strategies for different task type.
func (v *V2) schedule(ctx context.Context, peer *resource.Peer) error {
sizeScope := peer.Task.SizeScope()
switch sizeScope {
case commonv2.SizeScope_EMPTY:
// Return an EmptyTaskResponse directly.
@ -716,7 +851,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// it will be scheduled as a Normal Task.
peer.Log.Info("scheduling as SizeScope_TINY")
if !peer.Task.CanReuseDirectPiece() {
peer.Log.Warnf("can not reuse direct piece %d %d", len(task.DirectPiece), task.ContentLength.Load())
peer.Log.Warnf("can not reuse direct piece %d %d", len(peer.Task.DirectPiece), peer.Task.ContentLength.Load())
break
}
@ -796,113 +931,3 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
return nil
}
// handleResource handles resource included host, task, and peer.
func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) (*resource.Host, *resource.Task, *resource.Peer, error) {
// If the host does not exist and the host address cannot be found,
// it may cause an exception.
host, loaded := v.resource.HostManager().Load(hostID)
if !loaded {
return nil, nil, nil, fmt.Errorf("host %s not found", hostID)
}
// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
options := []resource.TaskOption{resource.WithPieceLength(req.Download.PieceLength)}
if req.Download.Digest != "" {
d, err := digest.Parse(req.Download.Digest)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid digest %s", req.Download.Digest)
}
// If request has invalid digest, then new task with the nil digest.
options = append(options, resource.WithDigest(d))
}
task = resource.NewTask(taskID, req.Download.Url, req.Download.Tag, req.Download.Application, req.Download.Type,
req.Download.Filters, req.Download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
task.URL = req.Download.Url
task.Filters = req.Download.Filters
task.Header = req.Download.Header
}
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(req.Download.Priority), resource.WithAnnouncePeerStream(stream)}
if req.Download.Range != nil {
options = append(options, resource.WithRange(http.Range{Start: req.Download.Range.Start, Length: req.Download.Range.Length}))
}
peer = resource.NewPeer(peerID, task, host, options...)
v.resource.PeerManager().Store(peer)
}
return host, task, peer, nil
}
// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
peer.Log.Infof("peer priority is %s", priority.String())
switch priority {
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeSuperSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeStrongSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeWeakSeed)
break
}
fallthrough
case commonv2.Priority_LEVEL3:
// When the task is downloaded for the first time,
// the normal peer is first to download back-to-source.
peer.NeedBackToSource.Store(true)
case commonv2.Priority_LEVEL2:
// Peer is first to download back-to-source.
return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
case commonv2.Priority_LEVEL1:
// Download task is forbidden.
return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
default:
return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
}
return nil
}

View File

@ -19,7 +19,6 @@ package service
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"testing"
@ -932,7 +931,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Errorf(codes.FailedPrecondition, "host %s not found", peer.Host.ID))
status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID))
},
},
{
@ -1388,18 +1387,496 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
}
}
func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) {
tests := []struct {
name string
req *schedulerv2.RegisterSeedPeerRequest
run func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
}{
{
name: "host not found",
req: &schedulerv2.RegisterSeedPeerRequest{},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(nil, false).Times(1),
)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID))
},
},
{
name: "can not found available peer",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL1
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.NeedBackToSource.Load(), true)
},
},
{
name: "size scope is SizeScope_EMPTY and load AnnouncePeerStream failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
)
peer.Task.ContentLength.Store(0)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.NotFound, "AnnouncePeerStream not found"))
assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
},
},
{
name: "size scope is SizeScope_EMPTY and event PeerEventRegisterEmpty failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
)
peer.Task.ContentLength.Store(0)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
peer.FSM.SetState(resource.PeerStateReceivedEmpty)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Errorf(codes.Internal, "event RegisterEmpty inappropriate in current state ReceivedEmpty"))
},
},
{
name: "size scope is SizeScope_EMPTY and send EmptyTaskResponse failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{
EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},
},
})).Return(errors.New("foo")).Times(1),
)
peer.Task.ContentLength.Store(0)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Errorf(codes.Internal, "foo"))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty)
},
},
{
name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
)
peer.Task.ContentLength.Store(1)
peer.Priority = commonv2.Priority_LEVEL6
peer.FSM.SetState(resource.PeerStateReceivedNormal)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal"))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
{
name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
)
peer.Task.ContentLength.Store(1)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.FailedPrecondition, "foo"))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
{
name: "size scope is SizeScope_TINY and task can not reuse DirectPiece",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)
peer.Task.ContentLength.Store(1)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
{
name: "size scope is SizeScope_SMALL and task can not found success parent",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(1)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
{
name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(1)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.NotFound, "AnnouncePeerStream not found"))
assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
},
},
{
name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(1)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
peer.FSM.SetState(resource.PeerStateReceivedSmall)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall"))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
},
},
{
name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(1)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
assert := assert.New(t)
assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
status.Error(codes.Internal, "foo"))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
},
},
{
name: "size scope is SizeScope_SMALL",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
ma.Send(gomock.Any()).Return(nil).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(1)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
},
},
{
name: "size scope is SizeScope_NORMAL",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)
peer.Task.ContentLength.Store(129)
peer.Task.TotalPieceCount.Store(2)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
peer.Priority = commonv2.Priority_LEVEL6
peer.StoreAnnouncePeerStream(stream)
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
{
name: "size scope is SizeScope_UNKNOW",
req: &schedulerv2.RegisterSeedPeerRequest{
Download: &commonv2.Download{
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL6
assert := assert.New(t)
assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := schedulingmocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
})
}
}
func TestServiceV2_handleResource(t *testing.T) {
tests := []struct {
name string
req *schedulerv2.RegisterPeerRequest
run func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
download *commonv2.Download
run func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder)
}{
{
name: "host can not be loaded",
req: &schedulerv2.RegisterPeerRequest{},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
download: &commonv2.Download{},
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1408,20 +1885,18 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
assert.EqualError(err, fmt.Sprintf("host %s not found", mockHost.ID))
_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID))
},
},
{
name: "task can be loaded",
req: &schedulerv2.RegisterPeerRequest{
Download: &commonv2.Download{
download: &commonv2.Download{
Url: "foo",
Filters: []string{"bar"},
Header: map[string]string{"baz": "bas"},
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1434,26 +1909,24 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.NoError(err)
assert.EqualValues(host, mockHost)
assert.Equal(task.ID, mockTask.ID)
assert.Equal(task.URL, req.Download.Url)
assert.EqualValues(task.Filters, req.Download.Filters)
assert.EqualValues(task.Header, req.Download.Header)
assert.Equal(task.URL, download.Url)
assert.EqualValues(task.Filters, download.Filters)
assert.EqualValues(task.Header, download.Header)
},
},
{
name: "task can not be loaded",
req: &schedulerv2.RegisterPeerRequest{
Download: &commonv2.Download{
download: &commonv2.Download{
Url: "foo",
Filters: []string{"bar"},
Header: map[string]string{"baz": "bas"},
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1468,24 +1941,22 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.NoError(err)
assert.EqualValues(host, mockHost)
assert.Equal(task.ID, mockTask.ID)
assert.Equal(task.Digest.String(), req.Download.Digest)
assert.Equal(task.URL, req.Download.Url)
assert.EqualValues(task.Filters, req.Download.Filters)
assert.EqualValues(task.Header, req.Download.Header)
assert.Equal(task.Digest.String(), download.Digest)
assert.Equal(task.URL, download.Url)
assert.EqualValues(task.Filters, download.Filters)
assert.EqualValues(task.Header, download.Header)
},
},
{
name: "invalid digest",
req: &schedulerv2.RegisterPeerRequest{
Download: &commonv2.Download{
download: &commonv2.Download{
Digest: "foo",
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1496,21 +1967,19 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
assert.EqualError(err, "invalid digest foo")
_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.ErrorIs(err, status.Error(codes.InvalidArgument, "invalid digest"))
},
},
{
name: "peer can be loaded",
req: &schedulerv2.RegisterPeerRequest{
Download: &commonv2.Download{
download: &commonv2.Download{
Url: "foo",
Filters: []string{"bar"},
Header: map[string]string{"baz": "bas"},
Digest: mockTaskDigest.String(),
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1523,21 +1992,20 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.NoError(err)
assert.EqualValues(host, mockHost)
assert.Equal(task.ID, mockTask.ID)
assert.Equal(task.Digest.String(), req.Download.Digest)
assert.Equal(task.URL, req.Download.Url)
assert.EqualValues(task.Filters, req.Download.Filters)
assert.EqualValues(task.Header, req.Download.Header)
assert.Equal(task.Digest.String(), download.Digest)
assert.Equal(task.URL, download.Url)
assert.EqualValues(task.Filters, download.Filters)
assert.EqualValues(task.Header, download.Header)
assert.EqualValues(peer, mockPeer)
},
},
{
name: "peer can not be loaded",
req: &schedulerv2.RegisterPeerRequest{
Download: &commonv2.Download{
download: &commonv2.Download{
Url: "foo",
Filters: []string{"bar"},
Header: map[string]string{"baz": "bas"},
@ -1548,8 +2016,7 @@ func TestServiceV2_handleResource(t *testing.T) {
Length: mockPeerRange.Length,
},
},
},
run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
@ -1564,18 +2031,18 @@ func TestServiceV2_handleResource(t *testing.T) {
)
assert := assert.New(t)
host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, req)
host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
assert.NoError(err)
assert.EqualValues(host, mockHost)
assert.Equal(task.ID, mockTask.ID)
assert.Equal(task.Digest.String(), req.Download.Digest)
assert.Equal(task.URL, req.Download.Url)
assert.EqualValues(task.Filters, req.Download.Filters)
assert.EqualValues(task.Header, req.Download.Header)
assert.Equal(task.Digest.String(), download.Digest)
assert.Equal(task.URL, download.Url)
assert.EqualValues(task.Filters, download.Filters)
assert.EqualValues(task.Header, download.Header)
assert.Equal(peer.ID, mockPeer.ID)
assert.Equal(peer.Priority, req.Download.Priority)
assert.Equal(peer.Range.Start, req.Download.Range.Start)
assert.Equal(peer.Range.Length, req.Download.Range.Length)
assert.Equal(peer.Priority, download.Priority)
assert.Equal(peer.Range.Start, download.Range.Start)
assert.Equal(peer.Range.Length, download.Range.Length)
assert.NotNil(peer.AnnouncePeerStream)
assert.EqualValues(peer.Host, mockHost)
assert.EqualValues(peer.Task, mockTask)
@ -1603,7 +2070,7 @@ func TestServiceV2_handleResource(t *testing.T) {
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
tc.run(t, svc, tc.req, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
})
}
}