feat: remove concurrent_piece_count in scheduler (#2942)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2023-12-12 16:39:16 +08:00 committed by GitHub
parent 04f65de041
commit 67c36f62b3
15 changed files with 33 additions and 255 deletions

@@ -4839,11 +4839,6 @@ const docTemplate = `{
         "d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": {
             "type": "object",
             "properties": {
-                "concurrent_piece_count": {
-                    "type": "integer",
-                    "maximum": 50,
-                    "minimum": 1
-                },
                 "load_limit": {
                     "type": "integer",
                     "maximum": 2000,

@@ -4833,11 +4833,6 @@
         "d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": {
             "type": "object",
             "properties": {
-                "concurrent_piece_count": {
-                    "type": "integer",
-                    "maximum": 50,
-                    "minimum": 1
-                },
                 "load_limit": {
                     "type": "integer",
                     "maximum": 2000,

@@ -811,10 +811,6 @@ definitions:
     type: object
   d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig:
     properties:
-      concurrent_piece_count:
-        maximum: 50
-        minimum: 1
-        type: integer
       load_limit:
         maximum: 2000
         minimum: 1

@@ -747,13 +747,12 @@ loop:
         pt.Warnf("scheduler client send a peerPacket with empty peers")
         continue
       }
-      pt.Infof("receive new peer packet, main peer: %s, parallel count: %d",
-        peerPacket.MainPeer.PeerId, peerPacket.ParallelCount)
+      pt.Infof("receive new peer packet, main peer: %s", peerPacket.MainPeer.PeerId)
       pt.span.AddEvent("receive new peer packet",
         trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)))
       if !firstPacketReceived {
-        pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestQueue)
+        pt.initDownloadPieceWorkers(pieceRequestQueue)
         firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
         firstPeerSpan.End()
       }

@@ -952,11 +951,9 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
   }
 }
 
-func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue PieceDispatcher) {
-  if count < 1 {
-    count = 4
-  }
-  for i := int32(0); i < count; i++ {
+func (pt *peerTaskConductor) initDownloadPieceWorkers(pieceRequestQueue PieceDispatcher) {
+  count := 4
+  for i := int32(0); i < int32(count); i++ {
     go pt.downloadPieceWorker(i, pieceRequestQueue)
   }
 }
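
Note: since the scheduler no longer sends ParallelCount, the daemon always starts a fixed-size pool of piece-download workers. Below is a minimal, self-contained sketch of that behavior under the assumption of a hard-coded default of 4; the channel and worker names are illustrative and not the repo's actual types.

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Fixed worker count, mirroring the hard-coded default in initDownloadPieceWorkers.
    const workerCount = 4

    pieceRequests := make(chan int)
    var wg sync.WaitGroup

    // Start the fixed-size worker pool; each worker drains piece requests until the queue closes.
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for piece := range pieceRequests {
                fmt.Printf("worker %d downloads piece %d\n", id, piece)
            }
        }(i)
    }

    // Feed a few piece requests and shut down.
    for piece := 0; piece < 8; piece++ {
        pieceRequests <- piece
    }
    close(pieceRequests)
    wg.Wait()
}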

@@ -200,10 +200,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
         return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
       }
       return &schedulerv1.PeerPacket{
-        Code:          commonv1.Code_Success,
-        TaskId:        opt.taskID,
-        SrcPid:        "127.0.0.1",
-        ParallelCount: opt.pieceParallelCount,
+        Code:   commonv1.Code_Success,
+        TaskId: opt.taskID,
+        SrcPid: "127.0.0.1",
         MainPeer: &schedulerv1.PeerPacket_DestPeer{
           Ip:      "127.0.0.1",
           RpcPort: port,

@@ -187,10 +187,9 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
       }
       schedPeerPacket = true
       return &schedulerv1.PeerPacket{
-        Code:          commonv1.Code_Success,
-        TaskId:        opt.taskID,
-        SrcPid:        "127.0.0.1",
-        ParallelCount: opt.pieceParallelCount,
+        Code:   commonv1.Code_Success,
+        TaskId: opt.taskID,
+        SrcPid: "127.0.0.1",
         MainPeer: &schedulerv1.PeerPacket_DestPeer{
           Ip:      "127.0.0.1",
           RpcPort: port,

@@ -189,10 +189,9 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr
         return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
       }
       return &schedulerv1.PeerPacket{
-        Code:          commonv1.Code_Success,
-        TaskId:        task.taskID,
-        SrcPid:        "127.0.0.1",
-        ParallelCount: 4,
+        Code:   commonv1.Code_Success,
+        TaskId: task.taskID,
+        SrcPid: "127.0.0.1",
         MainPeer: &schedulerv1.PeerPacket_DestPeer{
           Ip:      "127.0.0.1",
           RpcPort: port,

go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.21
 
 require (
-  d7y.io/api/v2 v2.0.60
+  d7y.io/api/v2 v2.0.62
   github.com/MysteriousPotato/go-lockable v1.0.0
   github.com/RichardKnop/machinery v1.10.6
   github.com/Showmax/go-fqdn v1.0.0

go.sum
@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api/v2 v2.0.60 h1:er07NeKpjnBOB8JzkddjtGWNRdRkhavO1Qn+0meajVw=
-d7y.io/api/v2 v2.0.60/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
+d7y.io/api/v2 v2.0.62 h1:q4/r24DxWT+4zsGGMe8HqbjC3cw+B/s2+gwI2oKC7Og=
+d7y.io/api/v2 v2.0.62/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

@@ -1 +1 @@
-Subproject commit cbfd04915aad1aef734102ed24a576310ea9714e
+Subproject commit 99681b829aa574d71763982d38e2e20eb8899575

@@ -116,8 +116,7 @@ func seed(db *gorm.DB) error {
         "filter_parent_limit": schedulerconfig.DefaultSchedulerFilterParentLimit,
       },
       ClientConfig: map[string]any{
-        "load_limit":             schedulerconfig.DefaultPeerConcurrentUploadLimit,
-        "concurrent_piece_count": schedulerconfig.DefaultPeerConcurrentPieceCount,
+        "load_limit": schedulerconfig.DefaultPeerConcurrentUploadLimit,
       },
       Scopes:    map[string]any{},
       IsDefault: true,

@@ -57,8 +57,7 @@ type SchedulerClusterConfig struct {
 }
 
 type SchedulerClusterClientConfig struct {
-  LoadLimit            uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"`
-  ConcurrentPieceCount uint32 `yaml:"concurrentPieceCount" mapstructure:"concurrentPieceCount" json:"concurrent_piece_count" binding:"omitempty,gte=1,lte=50"`
+  LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"`
 }
 
 type SchedulerClusterScopes struct {
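
After this removal, SchedulerClusterClientConfig carries only load_limit. A hedged illustration of the resulting JSON shape follows; it uses a local copy of the struct rather than the real manager types package.

package main

import (
    "encoding/json"
    "fmt"
)

// schedulerClusterClientConfig mirrors the single field that remains after this commit.
type schedulerClusterClientConfig struct {
    LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"`
}

func main() {
    b, _ := json.Marshal(schedulerClusterClientConfig{LoadLimit: 50})
    fmt.Println(string(b)) // {"load_limit":50}
}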

@@ -30,9 +30,6 @@ const (
   // DefaultPeerConcurrentUploadLimit is default number for peer concurrent upload limit.
   DefaultPeerConcurrentUploadLimit = 50
 
-  // DefaultPeerConcurrentPieceCount is default number for pieces to concurrent downloading.
-  DefaultPeerConcurrentPieceCount = 4
-
   // DefaultSchedulerCandidateParentLimit is default limit the number of candidate parent.
   DefaultSchedulerCandidateParentLimit = 4

@@ -191,7 +191,7 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
     // Send NormalTaskResponse to peer.
     peer.Log.Info("send NormalTaskResponse")
     if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
-      Response: ConstructSuccessNormalTaskResponse(s.dynconfig, candidateParents),
+      Response: ConstructSuccessNormalTaskResponse(candidateParents),
     }); err != nil {
       peer.Log.Error(err)
       return status.Error(codes.FailedPrecondition, err.Error())

@@ -359,7 +359,7 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
     // Send PeerPacket to peer.
     peer.Log.Info("send PeerPacket to peer")
-    if err := stream.Send(ConstructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
+    if err := stream.Send(ConstructSuccessPeerPacket(peer, candidateParents[0], candidateParents[1:])); err != nil {
       n++
       peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())

@@ -537,12 +537,7 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
 // ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
 // Used only in v2 version of the grpc.
-func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
-  concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
-  if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
-    concurrentPieceCount = int(config.ConcurrentPieceCount)
-  }
-
+func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
   var parents []*commonv2.Peer
   for _, candidateParent := range candidateParents {
     parent := &commonv2.Peer{

@@ -708,20 +703,14 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
   return &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{
     NormalTaskResponse: &schedulerv2.NormalTaskResponse{
-      CandidateParents:     parents,
-      ConcurrentPieceCount: uint32(concurrentPieceCount),
+      CandidateParents: parents,
     },
   }
 }
 
 // ConstructSuccessPeerPacket constructs peer successful packet.
 // Used only in v1 version of the grpc.
-func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
-  concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
-  if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
-    concurrentPieceCount = int(config.ConcurrentPieceCount)
-  }
-
+func ConstructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
   var parents []*schedulerv1.PeerPacket_DestPeer
   for _, candidateParent := range candidateParents {
     parents = append(parents, &schedulerv1.PeerPacket_DestPeer{

@@ -732,9 +721,8 @@ func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resou
   }
 
   return &schedulerv1.PeerPacket{
-    TaskId:        peer.Task.ID,
-    SrcPid:        peer.ID,
-    ParallelCount: int32(concurrentPieceCount),
+    TaskId: peer.Task.ID,
+    SrcPid: peer.ID,
     MainPeer: &schedulerv1.PeerPacket_DestPeer{
       Ip:      parent.Host.IP,
       RpcPort: parent.Host.Port,
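
For readers of the old code: the removed branches resolved the piece concurrency from dynconfig and fell back to DefaultPeerConcurrentPieceCount (4). The sketch below paraphrases that now-deleted fallback with a stand-in config type instead of the scheduler's real packages; it is a reading aid, not a drop-in replacement.

package main

import (
    "errors"
    "fmt"
)

// clientConfig stands in for types.SchedulerClusterClientConfig.
type clientConfig struct {
    ConcurrentPieceCount uint32
}

// resolveConcurrentPieceCount paraphrases the deleted fallback: prefer the dynconfig
// value when the lookup succeeds and the value is positive, otherwise use the default.
func resolveConcurrentPieceCount(cfg clientConfig, err error) int {
    const defaultPeerConcurrentPieceCount = 4 // was schedulerconfig.DefaultPeerConcurrentPieceCount
    if err == nil && cfg.ConcurrentPieceCount > 0 {
        return int(cfg.ConcurrentPieceCount)
    }
    return defaultPeerConcurrentPieceCount
}

func main() {
    fmt.Println(resolveConcurrentPieceCount(clientConfig{ConcurrentPieceCount: 2}, nil))          // 2
    fmt.Println(resolveConcurrentPieceCount(clientConfig{}, errors.New("dynconfig unavailable"))) // 4
}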

@@ -437,9 +437,6 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) {
         peer.StoreAnnouncePeerStream(stream)
         gomock.InOrder(
           md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
-          md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
-            ConcurrentPieceCount: 2,
-          }, nil).Times(1),
           ma.Send(gomock.Any()).Return(nil).Times(1),
         )
       },
@@ -726,9 +723,6 @@ func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) {
         peer.StoreReportPieceResultStream(stream)
         gomock.InOrder(
           md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
-          md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
-            ConcurrentPieceCount: 2,
-          }, nil).Times(1),
           mr.Send(gomock.Any()).Return(nil).Times(1),
         )
       },
@@ -1348,16 +1342,10 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
 func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
   tests := []struct {
     name   string
-    mock   func(md *configmocks.MockDynconfigInterfaceMockRecorder)
     expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer)
   }{
     {
-      name: "get concurrentPieceCount from dynconfig",
-      mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) {
-        md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
-          ConcurrentPieceCount: 1,
-        }, nil).Times(1)
-      },
+      name: "construct success normal task response",
       expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) {
         dgst := candidateParents[0].Task.Digest.String()
@@ -1482,141 +1470,6 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
                 UpdatedAt: timestamppb.New(candidateParents[0].UpdatedAt.Load()),
               },
             },
-            ConcurrentPieceCount: 1,
-          },
-        })
-      },
-    },
-    {
-      name: "use default concurrentPieceCount",
-      mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) {
-        md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1)
-      },
-      expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) {
-        dgst := candidateParents[0].Task.Digest.String()
-        assert := assert.New(t)
-        assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{
-          NormalTaskResponse: &schedulerv2.NormalTaskResponse{
-            CandidateParents: []*commonv2.Peer{
-              {
-                Id: candidateParents[0].ID,
-                Range: &commonv2.Range{
-                  Start: uint64(candidateParents[0].Range.Start),
-                  Length: uint64(candidateParents[0].Range.Length),
-                },
-                Priority: candidateParents[0].Priority,
-                Pieces: []*commonv2.Piece{
-                  {
-                    Number: uint32(mockPiece.Number),
-                    ParentId: &mockPiece.ParentID,
-                    Offset: mockPiece.Offset,
-                    Length: mockPiece.Length,
-                    Digest: mockPiece.Digest.String(),
-                    TrafficType: &mockPiece.TrafficType,
-                    Cost: durationpb.New(mockPiece.Cost),
-                    CreatedAt: timestamppb.New(mockPiece.CreatedAt),
-                  },
-                },
-                Cost: durationpb.New(candidateParents[0].Cost.Load()),
-                State: candidateParents[0].FSM.Current(),
-                Task: &commonv2.Task{
-                  Id: candidateParents[0].Task.ID,
-                  Type: candidateParents[0].Task.Type,
-                  Url: candidateParents[0].Task.URL,
-                  Digest: &dgst,
-                  Tag: &candidateParents[0].Task.Tag,
-                  Application: &candidateParents[0].Task.Application,
-                  Filters: candidateParents[0].Task.Filters,
-                  Header: candidateParents[0].Task.Header,
-                  PieceLength: uint32(candidateParents[0].Task.PieceLength),
-                  ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()),
-                  PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()),
-                  SizeScope: candidateParents[0].Task.SizeScope(),
-                  Pieces: []*commonv2.Piece{
-                    {
-                      Number: uint32(mockPiece.Number),
-                      ParentId: &mockPiece.ParentID,
-                      Offset: mockPiece.Offset,
-                      Length: mockPiece.Length,
-                      Digest: mockPiece.Digest.String(),
-                      TrafficType: &mockPiece.TrafficType,
-                      Cost: durationpb.New(mockPiece.Cost),
-                      CreatedAt: timestamppb.New(mockPiece.CreatedAt),
-                    },
-                  },
-                  State: candidateParents[0].Task.FSM.Current(),
-                  PeerCount: uint32(candidateParents[0].Task.PeerCount()),
-                  CreatedAt: timestamppb.New(candidateParents[0].Task.CreatedAt.Load()),
-                  UpdatedAt: timestamppb.New(candidateParents[0].Task.UpdatedAt.Load()),
-                },
-                Host: &commonv2.Host{
-                  Id: candidateParents[0].Host.ID,
-                  Type: uint32(candidateParents[0].Host.Type),
-                  Hostname: candidateParents[0].Host.Hostname,
-                  Ip: candidateParents[0].Host.IP,
-                  Port: candidateParents[0].Host.Port,
-                  DownloadPort: candidateParents[0].Host.DownloadPort,
-                  Os: candidateParents[0].Host.OS,
-                  Platform: candidateParents[0].Host.Platform,
-                  PlatformFamily: candidateParents[0].Host.PlatformFamily,
-                  PlatformVersion: candidateParents[0].Host.PlatformVersion,
-                  KernelVersion: candidateParents[0].Host.KernelVersion,
-                  Cpu: &commonv2.CPU{
-                    LogicalCount: candidateParents[0].Host.CPU.LogicalCount,
-                    PhysicalCount: candidateParents[0].Host.CPU.PhysicalCount,
-                    Percent: candidateParents[0].Host.CPU.Percent,
-                    ProcessPercent: candidateParents[0].Host.CPU.ProcessPercent,
-                    Times: &commonv2.CPUTimes{
-                      User: candidateParents[0].Host.CPU.Times.User,
-                      System: candidateParents[0].Host.CPU.Times.System,
-                      Idle: candidateParents[0].Host.CPU.Times.Idle,
-                      Nice: candidateParents[0].Host.CPU.Times.Nice,
-                      Iowait: candidateParents[0].Host.CPU.Times.Iowait,
-                      Irq: candidateParents[0].Host.CPU.Times.Irq,
-                      Softirq: candidateParents[0].Host.CPU.Times.Softirq,
-                      Steal: candidateParents[0].Host.CPU.Times.Steal,
-                      Guest: candidateParents[0].Host.CPU.Times.Guest,
-                      GuestNice: candidateParents[0].Host.CPU.Times.GuestNice,
-                    },
-                  },
-                  Memory: &commonv2.Memory{
-                    Total: candidateParents[0].Host.Memory.Total,
-                    Available: candidateParents[0].Host.Memory.Available,
-                    Used: candidateParents[0].Host.Memory.Used,
-                    UsedPercent: candidateParents[0].Host.Memory.UsedPercent,
-                    ProcessUsedPercent: candidateParents[0].Host.Memory.ProcessUsedPercent,
-                    Free: candidateParents[0].Host.Memory.Free,
-                  },
-                  Network: &commonv2.Network{
-                    TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount,
-                    UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount,
-                    Location: &candidateParents[0].Host.Network.Location,
-                    Idc: &candidateParents[0].Host.Network.IDC,
-                  },
-                  Disk: &commonv2.Disk{
-                    Total: candidateParents[0].Host.Disk.Total,
-                    Free: candidateParents[0].Host.Disk.Free,
-                    Used: candidateParents[0].Host.Disk.Used,
-                    UsedPercent: candidateParents[0].Host.Disk.UsedPercent,
-                    InodesTotal: candidateParents[0].Host.Disk.InodesTotal,
-                    InodesUsed: candidateParents[0].Host.Disk.InodesUsed,
-                    InodesFree: candidateParents[0].Host.Disk.InodesFree,
-                    InodesUsedPercent: candidateParents[0].Host.Disk.InodesUsedPercent,
-                  },
-                  Build: &commonv2.Build{
-                    GitVersion: candidateParents[0].Host.Build.GitVersion,
-                    GitCommit: &candidateParents[0].Host.Build.GitCommit,
-                    GoVersion: &candidateParents[0].Host.Build.GoVersion,
-                    Platform: &candidateParents[0].Host.Build.Platform,
-                  },
-                },
-                NeedBackToSource: candidateParents[0].NeedBackToSource.Load(),
-                CreatedAt: timestamppb.New(candidateParents[0].CreatedAt.Load()),
-                UpdatedAt: timestamppb.New(candidateParents[0].UpdatedAt.Load()),
-              },
-            },
-            ConcurrentPieceCount: 4,
           },
         })
       },
@@ -1627,7 +1480,6 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
     t.Run(tc.name, func(t *testing.T) {
       ctl := gomock.NewController(t)
       defer ctl.Finish()
-      dynconfig := configmocks.NewMockDynconfigInterface(ctl)
       mockHost := resource.NewHost(
         mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
         mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)

@@ -1639,8 +1491,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
       candidateParents[0].StorePiece(&mockPiece)
       candidateParents[0].Task.StorePiece(&mockPiece)
 
-      tc.mock(dynconfig.EXPECT())
-      tc.expect(t, ConstructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents)
+      tc.expect(t, ConstructSuccessNormalTaskResponse(candidateParents), candidateParents)
     })
   }
 }
@@ -1648,49 +1499,15 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
 func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) {
   tests := []struct {
     name   string
-    mock   func(md *configmocks.MockDynconfigInterfaceMockRecorder)
     expect func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer)
   }{
     {
-      name: "get concurrentPieceCount from dynconfig",
-      mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) {
-        md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{
-          ConcurrentPieceCount: 1,
-        }, nil).Times(1)
-      },
+      name: "construct success peer packet",
       expect: func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer) {
         assert := assert.New(t)
         assert.EqualValues(packet, &schedulerv1.PeerPacket{
-          TaskId:        mockTaskID,
-          SrcPid:        mockPeerID,
-          ParallelCount: 1,
-          MainPeer: &schedulerv1.PeerPacket_DestPeer{
-            Ip:      parent.Host.IP,
-            RpcPort: parent.Host.Port,
-            PeerId:  parent.ID,
-          },
-          CandidatePeers: []*schedulerv1.PeerPacket_DestPeer{
-            {
-              Ip:      candidateParents[0].Host.IP,
-              RpcPort: candidateParents[0].Host.Port,
-              PeerId:  candidateParents[0].ID,
-            },
-          },
-          Code: commonv1.Code_Success,
-        })
-      },
-    },
-    {
-      name: "use default concurrentPieceCount",
-      mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) {
-        md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1)
-      },
-      expect: func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer) {
-        assert := assert.New(t)
-        assert.EqualValues(packet, &schedulerv1.PeerPacket{
-          TaskId:        mockTaskID,
-          SrcPid:        mockPeerID,
-          ParallelCount: 4,
+          TaskId: mockTaskID,
+          SrcPid: mockPeerID,
           MainPeer: &schedulerv1.PeerPacket_DestPeer{
             Ip:      parent.Host.IP,
             RpcPort: parent.Host.Port,
@@ -1713,7 +1530,6 @@ func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) {
     t.Run(tc.name, func(t *testing.T) {
       ctl := gomock.NewController(t)
       defer ctl.Finish()
-      dynconfig := configmocks.NewMockDynconfigInterface(ctl)
       mockHost := resource.NewHost(
         mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
         mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@@ -1723,8 +1539,7 @@ func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) {
       parent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost)
       candidateParents := []*resource.Peer{resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost)}
 
-      tc.mock(dynconfig.EXPECT())
-      tc.expect(t, ConstructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents)
+      tc.expect(t, ConstructSuccessPeerPacket(peer, parent, candidateParents), parent, candidateParents)
     })
   }
 }