feat: change piece size to length (#2079)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-02-14 21:04:01 +08:00
parent d30c758411
commit d8f36a44be
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
12 changed files with 58 additions and 58 deletions

@ -1 +1 @@
Subproject commit 41e6b25362c03402ca356ef67b067be6ee140f5b
Subproject commit be86afe69937a54f9f94c75be986a74ce4dbc2c0

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.19
require (
d7y.io/api v1.5.1
d7y.io/api v1.5.2
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0

4
go.sum
View File

@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.5.1 h1:OMSQrd/dhcbA5FszKz+aHahl0P+kvuoesIZtOTS+OGE=
d7y.io/api v1.5.1/go.mod h1:7G3t9YO5esDzQVUgdUrS+6yCDAMWS5c9ux8yX5L9Ync=
d7y.io/api v1.5.2 h1:U6K4V5YuSwjtqXNnmnpV3OKS4dkB2ho4nEtBVAtTIqY=
d7y.io/api v1.5.2/go.mod h1:7G3t9YO5esDzQVUgdUrS+6yCDAMWS5c9ux8yX5L9Ync=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -110,7 +110,7 @@ const (
// PeerOption is a functional option for peer.
type PeerOption func(peer *Peer)
// WithPieceSize set Priority for peer.
// WithPriority set Priority for peer.
func WithPriority(priority commonv2.Priority) PeerOption {
return func(p *Peer) {
p.Priority = priority

View File

@ -35,8 +35,8 @@ type Piece struct {
ParentID string
// Piece offset.
Offset uint64
// Piece size.
Size uint64
// Piece length.
Length uint64
// Digest of the piece data, for example md5:xxx or sha256:yyy.
Digest string
// Traffic type.

View File

@ -159,7 +159,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
peer.Pieces.Add(&Piece{
Number: uint32(piece.PieceInfo.PieceNum),
Offset: piece.PieceInfo.PieceOffset,
Size: uint64(piece.PieceInfo.RangeSize),
Length: uint64(piece.PieceInfo.RangeSize),
Digest: digest.New("md5", piece.PieceInfo.PieceMd5).String(),
TrafficType: commonv2.TrafficType_BACK_TO_SOURCE,
Cost: cost,

View File

@ -87,10 +87,10 @@ const (
// TaskOption is a functional option for task.
type TaskOption func(task *Task)
// WithPieceSize set PieceSize for task.
func WithPieceSize(pieceSize int32) TaskOption {
// WithPieceLength set PieceLength for task.
func WithPieceLength(pieceLength int32) TaskOption {
return func(t *Task) {
t.PieceSize = pieceSize
t.PieceLength = pieceLength
}
}
@ -120,8 +120,8 @@ type Task struct {
// Task request headers.
Header map[string]string
// Task piece size.
PieceSize int32
// Task piece length.
PieceLength int32
// DirectPiece is tiny piece data.
DirectPiece []byte

View File

@ -53,7 +53,7 @@ var (
mockTaskApplication = "foo"
mockTaskFilters = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceSize int32 = 2048
mockTaskPieceLength int32 = 2048
)
func TestTask_NewTask(t *testing.T) {
@ -75,7 +75,7 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.Filters, mockTaskFilters)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceSize, int32(0))
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
@ -91,7 +91,7 @@ func TestTask_NewTask(t *testing.T) {
},
{
name: "new task with piece size",
options: []TaskOption{WithPieceSize(mockTaskPieceSize)},
options: []TaskOption{WithPieceLength(mockTaskPieceLength)},
expect: func(t *testing.T, task *Task) {
assert := assert.New(t)
assert.Equal(task.ID, mockTaskID)
@ -102,7 +102,7 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.Filters, mockTaskFilters)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceSize, mockTaskPieceSize)
assert.Equal(task.PieceLength, mockTaskPieceLength)
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))

View File

@ -130,7 +130,7 @@ var (
mockTaskApplication = "foo"
mockTaskFilters = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceSize int32 = 2048
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "hostname", 8003)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "hostname_seed", 8003)
mockHostSecurityDomain = "security_domain"
@ -172,12 +172,12 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
{
name: "security domain is not the same",
parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
@ -194,12 +194,12 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
{
name: "security domain is same",
parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
@ -217,12 +217,12 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
{
name: "parent security domain is empty",
parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
@ -240,12 +240,12 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
{
name: "child security domain is empty",
parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize)),
resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
@ -275,7 +275,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
tests := []struct {
name string
@ -436,7 +436,7 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) {
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, host)
tc.mock(host)
tc.expect(t, calculateParentHostUploadSuccessScore(mockPeer))
@ -475,7 +475,7 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) {
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, host)
tc.mock(host, mockPeer)
tc.expect(t, calculateFreeUploadScore(host))
@ -526,7 +526,7 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(peer)
tc.expect(t, calculateHostTypeScore(peer))
@ -737,7 +737,7 @@ func TestEvaluatorBase_IsBadNode(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
tests := []struct {
name string

View File

@ -153,7 +153,7 @@ var (
mockTaskApplication = "foo"
mockTaskFilters = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceSize int32 = 2048
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "hostname", 8003)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "hostname_seed", 8003)
mockHostSecurityDomain = "security_domain"
@ -385,7 +385,7 @@ func TestScheduler_ScheduleParent(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
@ -664,7 +664,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockPeer := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, resource.NewHost(
idgen.HostIDV1(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname,
@ -935,7 +935,7 @@ func TestScheduler_FindParent(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
var mockPeers []*resource.Peer
@ -1028,7 +1028,7 @@ func TestScheduler_constructSuccessPeerPacket(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
parent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost)

View File

@ -348,7 +348,7 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ
Number: uint32(pieceInfo.PieceNum),
ParentID: req.PiecePacket.DstPid,
Offset: pieceInfo.PieceOffset,
Size: uint64(pieceInfo.RangeSize),
Length: uint64(pieceInfo.RangeSize),
Digest: digest.New("md5", pieceInfo.PieceMd5).String(),
TrafficType: commonv2.TrafficType_LOCAL_PEER,
Cost: 0,
@ -957,7 +957,7 @@ func (v *V1) handlePieceSuccess(ctx context.Context, peer *resource.Peer, piece
Number: uint32(piece.PieceInfo.PieceNum),
ParentID: piece.DstPid,
Offset: piece.PieceInfo.PieceOffset,
Size: uint64(piece.PieceInfo.RangeSize),
Length: uint64(piece.PieceInfo.RangeSize),
Digest: digest.New("md5", piece.PieceInfo.PieceMd5).String(),
TrafficType: trafficType,
Cost: cost,

View File

@ -178,7 +178,7 @@ var (
mockTaskApplication = "foo"
mockTaskFilters = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceSize int32 = 2048
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "hostname", 8003)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "hostname_seed", 8003)
mockHostSecurityDomain = "security_domain"
@ -884,7 +884,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
@ -1148,7 +1148,7 @@ func TestService_ReportPieceResult(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT())
tc.expect(t, mockPeer, svc.ReportPieceResult(stream))
@ -1346,7 +1346,7 @@ func TestService_ReportPeerResult(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT())
})
@ -1406,7 +1406,7 @@ func TestService_StatTask(t *testing.T) {
storage := storagemocks.NewMockStorage(ctl)
taskManager := resource.NewMockTaskManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT())
task, err := svc.StatTask(context.Background(), &schedulerv1.StatTaskRequest{TaskId: mockTaskID})
@ -1659,7 +1659,7 @@ func TestService_AnnounceTask(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
@ -1856,7 +1856,7 @@ func TestService_LeaveTask(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
@ -2071,7 +2071,7 @@ func TestService_LeaveHost(t *testing.T) {
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
@ -2541,7 +2541,7 @@ func TestService_triggerTask(t *testing.T) {
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost)
seedPeer := resource.NewMockSeedPeer(ctl)
@ -2613,7 +2613,7 @@ func TestService_storeTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.Filters, mockTaskFilters)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceSize, int32(0))
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
@ -2734,7 +2734,7 @@ func TestService_storePeer(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
gomock.InOrder(
@ -2754,7 +2754,7 @@ func TestService_storePeer(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1),
@ -2858,7 +2858,7 @@ func TestService_triggerSeedPeerTask(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
@ -2939,7 +2939,7 @@ func TestService_handleBeginOfPiece(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
@ -2954,7 +2954,7 @@ func TestService_handlePieceSuccess(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
now := time.Now()
tests := []struct {
@ -3214,7 +3214,7 @@ func TestService_handlePieceFail(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
seedPeer := resource.NewMockSeedPeer(ctl)
@ -3339,7 +3339,7 @@ func TestService_handlePeerSuccess(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
@ -3420,7 +3420,7 @@ func TestService_handlePeerFail(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
child := resource.NewPeer(mockPeerID, mockTask, mockHost)
@ -3503,7 +3503,7 @@ func TestService_handleTaskSuccess(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)
svc.handleTaskSuccess(context.Background(), task, tc.result)
@ -3642,7 +3642,7 @@ func TestService_handleTaskFail(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduler, dynconfig, storage)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceSize(mockTaskPieceSize))
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskDigest, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)
svc.handleTaskFailure(context.Background(), task, tc.backToSourceErr, tc.seedPeerErr)