feat: remove TinyTaskResponse and SmallTaskResponse message (#2881)

Signed-off-by: Gaius <gaius.qi@gmail.com>

This commit is contained in:
parent d24c20b719
commit 86ce09f53a

go.mod (2 changes)
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.21
 
 require (
-	d7y.io/api/v2 v2.0.43
+	d7y.io/api/v2 v2.0.45
 	github.com/MysteriousPotato/go-lockable v1.0.0
 	github.com/RichardKnop/machinery v1.10.6
 	github.com/Showmax/go-fqdn v1.0.0
go.sum (4 changes)

@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api/v2 v2.0.43 h1:4IdL+j1CAJp4QIs71YeItKZV/lzqya98ksxoGVnaQUQ=
-d7y.io/api/v2 v2.0.43/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
+d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo=
+d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

@@ -544,174 +544,6 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
 	return candidateParents
 }
-
-// ConstructSuccessSmallTaskResponse constructs scheduling successful response of the small task.
-// Used only in v2 version of the grpc.
-func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedulerv2.AnnouncePeerResponse_SmallTaskResponse {
-	parent := &commonv2.Peer{
-		Id: candidateParent.ID,
-		Priority: candidateParent.Priority,
-		Cost: durationpb.New(candidateParent.Cost.Load()),
-		State: candidateParent.FSM.Current(),
-		NeedBackToSource: candidateParent.NeedBackToSource.Load(),
-		CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
-		UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
-	}
-
-	// Set range to parent.
-	if candidateParent.Range != nil {
-		parent.Range = &commonv2.Range{
-			Start: candidateParent.Range.Start,
-			Length: candidateParent.Range.Length,
-		}
-	}
-
-	// Set pieces to parent.
-	candidateParent.Pieces.Range(func(key, value any) bool {
-		candidateParentPiece, ok := value.(*resource.Piece)
-		if !ok {
-			candidateParent.Log.Errorf("invalid piece %s %#v", key, value)
-			return true
-		}
-
-		piece := &commonv2.Piece{
-			Number: candidateParentPiece.Number,
-			ParentId: &candidateParentPiece.ParentID,
-			Offset: candidateParentPiece.Offset,
-			Length: candidateParentPiece.Length,
-			TrafficType: &candidateParentPiece.TrafficType,
-			Cost: durationpb.New(candidateParentPiece.Cost),
-			CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt),
-		}
-
-		if candidateParentPiece.Digest != nil {
-			piece.Digest = candidateParentPiece.Digest.String()
-		}
-
-		parent.Pieces = append(parent.Pieces, piece)
-		return true
-	})
-
-	// Set task to parent.
-	parent.Task = &commonv2.Task{
-		Id: candidateParent.Task.ID,
-		Type: candidateParent.Task.Type,
-		Url: candidateParent.Task.URL,
-		Tag: &candidateParent.Task.Tag,
-		Application: &candidateParent.Task.Application,
-		Filters: candidateParent.Task.Filters,
-		Header: candidateParent.Task.Header,
-		PieceLength: candidateParent.Task.PieceLength,
-		ContentLength: candidateParent.Task.ContentLength.Load(),
-		PieceCount: candidateParent.Task.TotalPieceCount.Load(),
-		SizeScope: candidateParent.Task.SizeScope(),
-		State: candidateParent.Task.FSM.Current(),
-		PeerCount: int32(candidateParent.Task.PeerCount()),
-		CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
-		UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
-	}
-
-	// Set digest to parent task.
-	if candidateParent.Task.Digest != nil {
-		dgst := candidateParent.Task.Digest.String()
-		parent.Task.Digest = &dgst
-	}
-
-	// Set pieces to parent task.
-	candidateParent.Task.Pieces.Range(func(key, value any) bool {
-		taskPiece, ok := value.(*resource.Piece)
-		if !ok {
-			candidateParent.Task.Log.Errorf("invalid piece %s %#v", key, value)
-			return true
-		}
-
-		piece := &commonv2.Piece{
-			Number: taskPiece.Number,
-			ParentId: &taskPiece.ParentID,
-			Offset: taskPiece.Offset,
-			Length: taskPiece.Length,
-			TrafficType: &taskPiece.TrafficType,
-			Cost: durationpb.New(taskPiece.Cost),
-			CreatedAt: timestamppb.New(taskPiece.CreatedAt),
-		}
-
-		if taskPiece.Digest != nil {
-			piece.Digest = taskPiece.Digest.String()
-		}
-
-		parent.Task.Pieces = append(parent.Task.Pieces, piece)
-		return true
-	})
-
-	// Set host to parent.
-	parent.Host = &commonv2.Host{
-		Id: candidateParent.Host.ID,
-		Type: uint32(candidateParent.Host.Type),
-		Hostname: candidateParent.Host.Hostname,
-		Ip: candidateParent.Host.IP,
-		Port: candidateParent.Host.Port,
-		DownloadPort: candidateParent.Host.DownloadPort,
-		Os: candidateParent.Host.OS,
-		Platform: candidateParent.Host.Platform,
-		PlatformFamily: candidateParent.Host.PlatformFamily,
-		PlatformVersion: candidateParent.Host.PlatformVersion,
-		KernelVersion: candidateParent.Host.KernelVersion,
-		Cpu: &commonv2.CPU{
-			LogicalCount: candidateParent.Host.CPU.LogicalCount,
-			PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
-			Percent: candidateParent.Host.CPU.Percent,
-			ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
-			Times: &commonv2.CPUTimes{
-				User: candidateParent.Host.CPU.Times.User,
-				System: candidateParent.Host.CPU.Times.System,
-				Idle: candidateParent.Host.CPU.Times.Idle,
-				Nice: candidateParent.Host.CPU.Times.Nice,
-				Iowait: candidateParent.Host.CPU.Times.Iowait,
-				Irq: candidateParent.Host.CPU.Times.Irq,
-				Softirq: candidateParent.Host.CPU.Times.Softirq,
-				Steal: candidateParent.Host.CPU.Times.Steal,
-				Guest: candidateParent.Host.CPU.Times.Guest,
-				GuestNice: candidateParent.Host.CPU.Times.GuestNice,
-			},
-		},
-		Memory: &commonv2.Memory{
-			Total: candidateParent.Host.Memory.Total,
-			Available: candidateParent.Host.Memory.Available,
-			Used: candidateParent.Host.Memory.Used,
-			UsedPercent: candidateParent.Host.Memory.UsedPercent,
-			ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
-			Free: candidateParent.Host.Memory.Free,
-		},
-		Network: &commonv2.Network{
-			TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
-			UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
-			Location: &candidateParent.Host.Network.Location,
-			Idc: &candidateParent.Host.Network.IDC,
-		},
-		Disk: &commonv2.Disk{
-			Total: candidateParent.Host.Disk.Total,
-			Free: candidateParent.Host.Disk.Free,
-			Used: candidateParent.Host.Disk.Used,
-			UsedPercent: candidateParent.Host.Disk.UsedPercent,
-			InodesTotal: candidateParent.Host.Disk.InodesTotal,
-			InodesUsed: candidateParent.Host.Disk.InodesUsed,
-			InodesFree: candidateParent.Host.Disk.InodesFree,
-			InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
-		},
-		Build: &commonv2.Build{
-			GitVersion: candidateParent.Host.Build.GitVersion,
-			GitCommit: &candidateParent.Host.Build.GitCommit,
-			GoVersion: &candidateParent.Host.Build.GoVersion,
-			Platform: &candidateParent.Host.Build.Platform,
-		},
-	}
-
-	return &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
-		SmallTaskResponse: &schedulerv2.SmallTaskResponse{
-			CandidateParent: parent,
-		},
-	}
-}
-
 // ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
 // Used only in v2 version of the grpc.
 func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
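
Note: with ConstructSuccessSmallTaskResponse gone, the normal-task constructor kept by this hunk is the only success response the scheduling package still builds for AnnouncePeer. The sketch below shows how a caller wraps it; it is an illustration, not code from this commit. The import paths and the helper name sendCandidateParents are assumptions based on the packages already referenced in this diff.

```go
package example

import (
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"

	"d7y.io/dragonfly/v2/scheduler/config"
	"d7y.io/dragonfly/v2/scheduler/resource"
	"d7y.io/dragonfly/v2/scheduler/scheduling"
)

// sendCandidateParents is a hypothetical helper: after this commit every
// scheduling result is delivered through the remaining NormalTaskResponse
// variant of the AnnouncePeerResponse oneof.
func sendCandidateParents(stream schedulerv2.Scheduler_AnnouncePeerServer, dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) error {
	return stream.Send(&schedulerv2.AnnouncePeerResponse{
		// ConstructSuccessNormalTaskResponse is the constructor kept by this
		// commit (see the unchanged context lines above).
		Response: scheduling.ConstructSuccessNormalTaskResponse(dynconfig, candidateParents),
	})
}
```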

@@ -1291,159 +1291,6 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
 	}
 }
-
-func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
-	tests := []struct {
-		name string
-		expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer)
-	}{
-		{
-			name: "construct success",
-			expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) {
-				dgst := candidateParent.Task.Digest.String()
-
-				assert := assert.New(t)
-				assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
-					SmallTaskResponse: &schedulerv2.SmallTaskResponse{
-						CandidateParent: &commonv2.Peer{
-							Id: candidateParent.ID,
-							Range: &commonv2.Range{
-								Start: candidateParent.Range.Start,
-								Length: candidateParent.Range.Length,
-							},
-							Priority: candidateParent.Priority,
-							Pieces: []*commonv2.Piece{
-								{
-									Number: mockPiece.Number,
-									ParentId: &mockPiece.ParentID,
-									Offset: mockPiece.Offset,
-									Length: mockPiece.Length,
-									Digest: mockPiece.Digest.String(),
-									TrafficType: &mockPiece.TrafficType,
-									Cost: durationpb.New(mockPiece.Cost),
-									CreatedAt: timestamppb.New(mockPiece.CreatedAt),
-								},
-							},
-							Cost: durationpb.New(candidateParent.Cost.Load()),
-							State: candidateParent.FSM.Current(),
-							Task: &commonv2.Task{
-								Id: candidateParent.Task.ID,
-								Type: candidateParent.Task.Type,
-								Url: candidateParent.Task.URL,
-								Digest: &dgst,
-								Tag: &candidateParent.Task.Tag,
-								Application: &candidateParent.Task.Application,
-								Filters: candidateParent.Task.Filters,
-								Header: candidateParent.Task.Header,
-								PieceLength: candidateParent.Task.PieceLength,
-								ContentLength: candidateParent.Task.ContentLength.Load(),
-								PieceCount: candidateParent.Task.TotalPieceCount.Load(),
-								SizeScope: candidateParent.Task.SizeScope(),
-								Pieces: []*commonv2.Piece{
-									{
-										Number: mockPiece.Number,
-										ParentId: &mockPiece.ParentID,
-										Offset: mockPiece.Offset,
-										Length: mockPiece.Length,
-										Digest: mockPiece.Digest.String(),
-										TrafficType: &mockPiece.TrafficType,
-										Cost: durationpb.New(mockPiece.Cost),
-										CreatedAt: timestamppb.New(mockPiece.CreatedAt),
-									},
-								},
-								State: candidateParent.Task.FSM.Current(),
-								PeerCount: int32(candidateParent.Task.PeerCount()),
-								CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
-								UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
-							},
-							Host: &commonv2.Host{
-								Id: candidateParent.Host.ID,
-								Type: uint32(candidateParent.Host.Type),
-								Hostname: candidateParent.Host.Hostname,
-								Ip: candidateParent.Host.IP,
-								Port: candidateParent.Host.Port,
-								DownloadPort: candidateParent.Host.DownloadPort,
-								Os: candidateParent.Host.OS,
-								Platform: candidateParent.Host.Platform,
-								PlatformFamily: candidateParent.Host.PlatformFamily,
-								PlatformVersion: candidateParent.Host.PlatformVersion,
-								KernelVersion: candidateParent.Host.KernelVersion,
-								Cpu: &commonv2.CPU{
-									LogicalCount: candidateParent.Host.CPU.LogicalCount,
-									PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
-									Percent: candidateParent.Host.CPU.Percent,
-									ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
-									Times: &commonv2.CPUTimes{
-										User: candidateParent.Host.CPU.Times.User,
-										System: candidateParent.Host.CPU.Times.System,
-										Idle: candidateParent.Host.CPU.Times.Idle,
-										Nice: candidateParent.Host.CPU.Times.Nice,
-										Iowait: candidateParent.Host.CPU.Times.Iowait,
-										Irq: candidateParent.Host.CPU.Times.Irq,
-										Softirq: candidateParent.Host.CPU.Times.Softirq,
-										Steal: candidateParent.Host.CPU.Times.Steal,
-										Guest: candidateParent.Host.CPU.Times.Guest,
-										GuestNice: candidateParent.Host.CPU.Times.GuestNice,
-									},
-								},
-								Memory: &commonv2.Memory{
-									Total: candidateParent.Host.Memory.Total,
-									Available: candidateParent.Host.Memory.Available,
-									Used: candidateParent.Host.Memory.Used,
-									UsedPercent: candidateParent.Host.Memory.UsedPercent,
-									ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
-									Free: candidateParent.Host.Memory.Free,
-								},
-								Network: &commonv2.Network{
-									TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
-									UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
-									Location: &candidateParent.Host.Network.Location,
-									Idc: &candidateParent.Host.Network.IDC,
-								},
-								Disk: &commonv2.Disk{
-									Total: candidateParent.Host.Disk.Total,
-									Free: candidateParent.Host.Disk.Free,
-									Used: candidateParent.Host.Disk.Used,
-									UsedPercent: candidateParent.Host.Disk.UsedPercent,
-									InodesTotal: candidateParent.Host.Disk.InodesTotal,
-									InodesUsed: candidateParent.Host.Disk.InodesUsed,
-									InodesFree: candidateParent.Host.Disk.InodesFree,
-									InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
-								},
-								Build: &commonv2.Build{
-									GitVersion: candidateParent.Host.Build.GitVersion,
-									GitCommit: &candidateParent.Host.Build.GitCommit,
-									GoVersion: &candidateParent.Host.Build.GoVersion,
-									Platform: &candidateParent.Host.Build.Platform,
-								},
-							},
-							NeedBackToSource: candidateParent.NeedBackToSource.Load(),
-							CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
-							UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
-						},
-					},
-				})
-			},
-		},
-	}
-
-	for _, tc := range tests {
-		t.Run(tc.name, func(t *testing.T) {
-			mockHost := resource.NewHost(
-				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
-				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
-			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
-			candidateParent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost, resource.WithRange(nethttp.Range{
-				Start: 1,
-				Length: 10,
-			}))
-			candidateParent.StorePiece(&mockPiece)
-			candidateParent.Task.StorePiece(&mockPiece)
-
-			tc.expect(t, ConstructSuccessSmallTaskResponse(candidateParent), candidateParent)
-		})
-	}
-}
 
 func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
 	tests := []struct {
 		name string

@@ -993,23 +993,6 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context,
 		if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
 			return status.Error(codes.Internal, err.Error())
 		}
-
-		// If the task size scope is tiny, scheduler needs to download the tiny file from peer and
-		// store the data in task DirectPiece.
-		if peer.Task.SizeScope() == commonv2.SizeScope_TINY {
-			data, err := peer.DownloadTinyFile()
-			if err != nil {
-				peer.Log.Errorf("download failed: %s", err.Error())
-				return nil
-			}
-
-			if len(data) != int(peer.Task.ContentLength.Load()) {
-				peer.Log.Errorf("data length %d is not equal content length %d", len(data), peer.Task.ContentLength.Load())
-				return nil
-			}
-
-			peer.Task.DirectPiece = data
-		}
 	}
 
 	// Collect DownloadPeerCount and DownloadPeerDuration metrics.
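
For reference, once the DirectPiece branch above is deleted the success path of this handler only marks the task succeeded before moving on to metrics. The fragment below is a hedged reconstruction from the unchanged context lines of this hunk, not a copy of the committed file.

```go
// Hedged fragment of the remaining flow in
// handleDownloadPeerBackToSourceFinishedRequest after this commit: the
// scheduler no longer downloads tiny-file content from the peer or stores it
// in peer.Task.DirectPiece.
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
	return status.Error(codes.Internal, err.Error())
}

// Collect DownloadPeerCount and DownloadPeerDuration metrics (unchanged code follows).
```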

@@ -1388,76 +1371,7 @@ func (v *V2) schedule(ctx context.Context, peer *resource.Peer) error {
 		}
 
 		return nil
-	case commonv2.SizeScope_TINY:
-		// If the task.DirectPiece of the task can be reused, the data of
-		// the task will be included in the TinyTaskResponse.
-		// If the task.DirectPiece cannot be reused,
-		// it will be scheduled as a Normal Task.
-		peer.Log.Info("scheduling as SizeScope_TINY")
-		if !peer.Task.CanReuseDirectPiece() {
-			peer.Log.Warnf("can not reuse direct piece %d %d", len(peer.Task.DirectPiece), peer.Task.ContentLength.Load())
-			break
-		}
-
-		stream, loaded := peer.LoadAnnouncePeerStream()
-		if !loaded {
-			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
-		}
-
-		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterTiny); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
-			Response: &schedulerv2.AnnouncePeerResponse_TinyTaskResponse{
-				TinyTaskResponse: &schedulerv2.TinyTaskResponse{
-					Content: peer.Task.DirectPiece,
-				},
-			},
-		}); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		return nil
-	case commonv2.SizeScope_SMALL:
-		// If a parent with the state of PeerStateSucceeded can be found in the task,
-		// its information will be returned. If a parent with the state of
-		// PeerStateSucceeded cannot be found in the task,
-		// it will be scheduled as a Normal Task.
-		peer.Log.Info("scheduling as SizeScope_SMALL")
-		parent, found := v.scheduling.FindSuccessParent(ctx, peer, set.NewSafeSet[string]())
-		if !found {
-			peer.Log.Warn("candidate parents not found")
-			break
-		}
-
-		// Delete inedges of peer.
-		if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		// Add edges between success parent and peer.
-		if err := peer.Task.AddPeerEdge(parent, peer); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		stream, loaded := peer.LoadAnnouncePeerStream()
-		if !loaded {
-			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
-		}
-
-		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterSmall); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
-			Response: scheduling.ConstructSuccessSmallTaskResponse(parent),
-		}); err != nil {
-			return status.Error(codes.Internal, err.Error())
-		}
-
-		return nil
-	case commonv2.SizeScope_NORMAL, commonv2.SizeScope_UNKNOW:
+	case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW:
 	default:
 		return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope)
 	}
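
This hunk is the heart of the change: the separate TINY and SMALL branches disappear and every recognised size scope falls through to the candidate-parent scheduling path. The sketch below distils the resulting switch; it is an illustration based on this diff and the removed test expectations (ScheduleCandidateParents returning a FailedPrecondition error), not the literal committed code, and the EMPTY branch and the exact switch expression are assumptions.

```go
// Hedged sketch of the post-change switch in (*V2).schedule.
switch sizeScope := peer.Task.SizeScope(); sizeScope {
case commonv2.SizeScope_EMPTY:
	// Unchanged empty-task handling elided.
	return nil
case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW:
	// Tiny and small tasks are no longer answered with TinyTaskResponse or
	// SmallTaskResponse; they are scheduled like normal tasks.
	if err := v.scheduling.ScheduleCandidateParents(ctx, peer, set.NewSafeSet[string]()); err != nil {
		return status.Error(codes.FailedPrecondition, err.Error())
	}

	return nil
default:
	return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope)
}
```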

@@ -1593,255 +1593,6 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
 				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty)
 			},
 		},
-		{
-			name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.FSM.SetState(resource.PeerStateReceivedNormal)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.FailedPrecondition, "foo"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_TINY and task can not reuse DirectPiece",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and task can not found success parent",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.NotFound, "AnnouncePeerStream not found"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-				peer.FSM.SetState(resource.PeerStateReceivedSmall)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-					ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "foo"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL",
-			req: &schedulerv2.RegisterPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-					ma.Send(gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
 		{
 			name: "size scope is SizeScope_NORMAL",
 			req: &schedulerv2.RegisterPeerRequest{

@@ -2105,255 +1856,6 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) {
 				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty)
 			},
 		},
-		{
-			name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and event PeerEventRegisterNormal failed",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.FSM.SetState(resource.PeerStateReceivedNormal)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "event RegisterNormal inappropriate in current state ReceivedNormal"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_TINY, task can not reuse DirectPiece and scheduling failed",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.FailedPrecondition, "foo"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_TINY and task can not reuse DirectPiece",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and task can not found success parent",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and load AnnouncePeerStream failed",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.NotFound, "AnnouncePeerStream not found"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and event PeerEventRegisterSmall failed",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-				peer.FSM.SetState(resource.PeerStateReceivedSmall)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "event RegisterSmall inappropriate in current state ReceivedSmall"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL and send SmallTaskResponse failed",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-					ma.Send(gomock.Any()).Return(errors.New("foo")).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
-					status.Error(codes.Internal, "foo"))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
-		{
-			name: "size scope is SizeScope_SMALL",
-			req: &schedulerv2.RegisterSeedPeerRequest{
-				Download: &commonv2.Download{
-					Digest: &dgst,
-				},
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterSeedPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
-				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
-				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.HostManager().Return(hostManager).Times(1),
-					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
-					mr.TaskManager().Return(taskManager).Times(1),
-					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(seedPeer, true).Times(1),
-					ma.Send(gomock.Any()).Return(nil).Times(1),
-				)
-
-				peer.Task.ContentLength.Store(129)
-				peer.Task.TotalPieceCount.Store(1)
-				peer.Task.StorePeer(peer)
-				peer.Task.StorePeer(seedPeer)
-				peer.Priority = commonv2.Priority_LEVEL6
-				peer.StoreAnnouncePeerStream(stream)
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleRegisterSeedPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
-				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedSmall)
-			},
-		},
 		{
 			name: "size scope is SizeScope_NORMAL",
 			req: &schedulerv2.RegisterSeedPeerRequest{

@@ -2859,86 +2361,6 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
 			},
 		},
-		{
-			name: "task size scope is SizeScope_TINY and download tiny file failed",
-			req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
-				ContentLength: 127,
-				PieceCount: 1,
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
-				gomock.InOrder(
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-				)
-
-				peer.FSM.SetState(resource.PeerStateRunning)
-				peer.Task.FSM.SetState(resource.TaskStateRunning)
-				peer.Host.DownloadPort = 0
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
-				assert.NotEqual(peer.Cost.Load(), 0)
-				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
-				assert.Equal(peer.Task.ContentLength.Load(), int64(127))
-				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
-				assert.Equal(len(peer.Task.DirectPiece), 0)
-				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
-			},
-		},
-		{
-			name: "task size scope is SizeScope_TINY and validate tiny file of downloading failed",
-			req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
-				ContentLength: 126,
-				PieceCount: 1,
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
-				gomock.InOrder(
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-				)
-
-				peer.FSM.SetState(resource.PeerStateRunning)
-				peer.Task.FSM.SetState(resource.TaskStateRunning)
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
-				assert.NotEqual(peer.Cost.Load(), 0)
-				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
-				assert.Equal(peer.Task.ContentLength.Load(), int64(126))
-				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
-				assert.Equal(len(peer.Task.DirectPiece), 0)
-				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
-			},
-		},
-		{
-			name: "task size scope is SizeScope_TINY and validate tiny file of downloading failed",
-			req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
-				ContentLength: 1,
-				PieceCount: 1,
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
-				gomock.InOrder(
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
-				)
-
-				peer.FSM.SetState(resource.PeerStateRunning)
-				peer.Task.FSM.SetState(resource.TaskStateRunning)
-
-				assert := assert.New(t)
-				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
-				assert.NotEqual(peer.Cost.Load(), 0)
-				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
-				assert.Equal(peer.Task.ContentLength.Load(), int64(1))
-				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
-				assert.Equal(len(peer.Task.DirectPiece), 1)
-				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
-			},
-		},
 	}
 
 	for _, tc := range tests {