diff --git a/go.mod b/go.mod index 7081819f9..b3c2c9e12 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.48 + d7y.io/api/v2 v2.0.56 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 @@ -83,7 +83,7 @@ require ( golang.org/x/sys v0.14.0 golang.org/x/time v0.4.0 google.golang.org/api v0.138.0 - google.golang.org/grpc v1.60.0-dev + google.golang.org/grpc v1.61.0-dev google.golang.org/protobuf v1.31.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 @@ -98,10 +98,10 @@ require ( ) require ( - cloud.google.com/go v0.110.7 // indirect + cloud.google.com/go v0.110.8 // indirect cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/iam v1.1.1 // indirect + cloud.google.com/go/iam v1.1.2 // indirect cloud.google.com/go/pubsub v1.33.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect @@ -231,10 +231,10 @@ require ( golang.org/x/term v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gorm.io/driver/sqlserver v1.4.1 // indirect diff --git a/go.sum b/go.sum index 801882241..7807f5d01 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKP cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY= cloud.google.com/go v0.76.0/go.mod h1:r9EvIAvLrunusnetGdQ50M/gKui1x3zdGW/VELGkdpw= -cloud.google.com/go v0.110.7 h1:rJyC7nWRg2jWGZ4wSJ5nY65GTdYJkg0cd/uXb+ACI6o= -cloud.google.com/go v0.110.7/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= +cloud.google.com/go v0.110.8 h1:tyNdfIxjzaWctIiLYOTalaLKZ17SI44SKFW26QbOhME= +cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -34,10 +34,10 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= -cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= -cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= -cloud.google.com/go/kms v1.15.0 h1:xYl5WEaSekKYN5gGRyhjvZKM22GVBBCzegGNVPy+aIs= -cloud.google.com/go/kms v1.15.0/go.mod h1:c9J991h5DTl+kg7gi3MYomh12YEENGrf48ee/N/2CDM= +cloud.google.com/go/iam v1.1.2 h1:gacbrBdWcoVmGLozRuStX45YKvJtzIjJdAolzUs1sm4= +cloud.google.com/go/iam v1.1.2/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= +cloud.google.com/go/kms v1.15.2 h1:lh6qra6oC4AyWe5fUUUBe/S27k12OHAleOOOw6KakdE= +cloud.google.com/go/kms v1.15.2/go.mod h1:3hopT4+7ooWRCjc2DxgnpESFxhIraaI2IpAVUEhbT/w= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.48 h1:vhEjO2OsL+eWo+z60wFeopEnyMbt5ti5fkiWwQflJY4= -d7y.io/api/v2 v2.0.48/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= +d7y.io/api/v2 v2.0.56 h1:ftWo2yk9/CZNnwbAZ/7GlQymq9TUWqw6Ye5imGrtfIE= +d7y.io/api/v2 v2.0.56/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= @@ -1634,8 +1634,9 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1677,12 +1678,12 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210202153253-cf70463f6119/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210207032614-bba0dbe2a9ea/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= -google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= -google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= -google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 h1:SeZZZx0cP0fqUyA+oRzP9k7cSwJlvDFiROO72uwD6i0= +google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97/go.mod h1:t1VqOqqvce95G3hIDCT5FeO3YUc6Q4Oe24L/+rNMxRk= +google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 h1:W18sezcAYs+3tDZX4F80yctqa12jcP1PUS2gQu1zTPU= +google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97/go.mod h1:iargEX0SFPm3xcfMI0d1domjg0ZF4Aa0p2awqyxhvF0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1710,8 +1711,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= -google.golang.org/grpc v1.60.0-dev h1:K5TTpndC/q9FIgl69DKogILdN5bynmV5G5GDDxZdQvw= -google.golang.org/grpc v1.60.0-dev/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.61.0-dev h1:4hYEJpeWmHdy3fYSJPJLGpu6rJV4oBkuAQZB7MtNC3g= +google.golang.org/grpc v1.61.0-dev/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index 1037ca3bf..fbcf8e8f1 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -69,7 +69,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err // V2 is the interface for v2 version of the grpc client. type V2 interface { // SyncPieces syncs pieces from the other peers. - SyncPieces(context.Context, ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) + SyncPieces(context.Context, *dfdaemonv2.SyncPiecesRequest, ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) // DownloadTask downloads task back-to-source. DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error @@ -94,9 +94,10 @@ type v2 struct { } // Trigger client to download file. -func (v *v2) SyncPieces(ctx context.Context, opts ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) { +func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.SyncPiecesRequest, opts ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) { return v.DfdaemonClient.SyncPieces( ctx, + req, opts..., ) } diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 531eebea8..4d9390d13 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -558,8 +558,8 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can // Set range to parent. if candidateParent.Range != nil { parent.Range = &commonv2.Range{ - Start: candidateParent.Range.Start, - Length: candidateParent.Range.Length, + Start: uint64(candidateParent.Range.Start), + Length: uint64(candidateParent.Range.Length), } } @@ -572,7 +572,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can } piece := &commonv2.Piece{ - Number: candidateParentPiece.Number, + Number: uint32(candidateParentPiece.Number), ParentId: &candidateParentPiece.ParentID, Offset: candidateParentPiece.Offset, Length: candidateParentPiece.Length, @@ -598,12 +598,12 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can Application: &candidateParent.Task.Application, Filters: candidateParent.Task.Filters, Header: candidateParent.Task.Header, - PieceLength: candidateParent.Task.PieceLength, - ContentLength: candidateParent.Task.ContentLength.Load(), - PieceCount: candidateParent.Task.TotalPieceCount.Load(), + PieceLength: uint32(candidateParent.Task.PieceLength), + ContentLength: uint64(candidateParent.Task.ContentLength.Load()), + PieceCount: uint32(candidateParent.Task.TotalPieceCount.Load()), SizeScope: candidateParent.Task.SizeScope(), State: candidateParent.Task.FSM.Current(), - PeerCount: int32(candidateParent.Task.PeerCount()), + PeerCount: uint32(candidateParent.Task.PeerCount()), CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), } @@ -623,7 +623,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can } piece := &commonv2.Piece{ - Number: taskPiece.Number, + Number: uint32(taskPiece.Number), ParentId: &taskPiece.ParentID, Offset: taskPiece.Offset, Length: taskPiece.Length, @@ -709,7 +709,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can return &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{ NormalTaskResponse: &schedulerv2.NormalTaskResponse{ CandidateParents: parents, - ConcurrentPieceCount: int32(concurrentPieceCount), + ConcurrentPieceCount: uint32(concurrentPieceCount), }, } } diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 8e7aa1aca..d54ac1853 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -1368,13 +1368,13 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { { Id: candidateParents[0].ID, Range: &commonv2.Range{ - Start: candidateParents[0].Range.Start, - Length: candidateParents[0].Range.Length, + Start: uint64(candidateParents[0].Range.Start), + Length: uint64(candidateParents[0].Range.Length), }, Priority: candidateParents[0].Priority, Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -1395,13 +1395,13 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Application: &candidateParents[0].Task.Application, Filters: candidateParents[0].Task.Filters, Header: candidateParents[0].Task.Header, - PieceLength: candidateParents[0].Task.PieceLength, - ContentLength: candidateParents[0].Task.ContentLength.Load(), - PieceCount: candidateParents[0].Task.TotalPieceCount.Load(), + PieceLength: uint32(candidateParents[0].Task.PieceLength), + ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), + PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), SizeScope: candidateParents[0].Task.SizeScope(), Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -1412,7 +1412,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { }, }, State: candidateParents[0].Task.FSM.Current(), - PeerCount: int32(candidateParents[0].Task.PeerCount()), + PeerCount: uint32(candidateParents[0].Task.PeerCount()), CreatedAt: timestamppb.New(candidateParents[0].Task.CreatedAt.Load()), UpdatedAt: timestamppb.New(candidateParents[0].Task.UpdatedAt.Load()), }, @@ -1502,13 +1502,13 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { { Id: candidateParents[0].ID, Range: &commonv2.Range{ - Start: candidateParents[0].Range.Start, - Length: candidateParents[0].Range.Length, + Start: uint64(candidateParents[0].Range.Start), + Length: uint64(candidateParents[0].Range.Length), }, Priority: candidateParents[0].Priority, Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -1529,13 +1529,13 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Application: &candidateParents[0].Task.Application, Filters: candidateParents[0].Task.Filters, Header: candidateParents[0].Task.Header, - PieceLength: candidateParents[0].Task.PieceLength, - ContentLength: candidateParents[0].Task.ContentLength.Load(), - PieceCount: candidateParents[0].Task.TotalPieceCount.Load(), + PieceLength: uint32(candidateParents[0].Task.PieceLength), + ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), + PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), SizeScope: candidateParents[0].Task.SizeScope(), Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -1546,7 +1546,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { }, }, State: candidateParents[0].Task.FSM.Current(), - PeerCount: int32(candidateParents[0].Task.PeerCount()), + PeerCount: uint32(candidateParents[0].Task.PeerCount()), CreatedAt: timestamppb.New(candidateParents[0].Task.CreatedAt.Load()), UpdatedAt: timestamppb.New(candidateParents[0].Task.UpdatedAt.Load()), }, diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index fb58a67ae..7b7f96728 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -219,8 +219,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c // Set range to response. if peer.Range != nil { resp.Range = &commonv2.Range{ - Start: peer.Range.Start, - Length: peer.Range.Length, + Start: uint64(peer.Range.Start), + Length: uint64(peer.Range.Length), } } @@ -233,7 +233,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c } respPiece := &commonv2.Piece{ - Number: piece.Number, + Number: uint32(piece.Number), ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, @@ -259,12 +259,12 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c Application: &peer.Task.Application, Filters: peer.Task.Filters, Header: peer.Task.Header, - PieceLength: peer.Task.PieceLength, - ContentLength: peer.Task.ContentLength.Load(), - PieceCount: peer.Task.TotalPieceCount.Load(), + PieceLength: uint32(peer.Task.PieceLength), + ContentLength: uint64(peer.Task.ContentLength.Load()), + PieceCount: uint32(peer.Task.TotalPieceCount.Load()), SizeScope: peer.Task.SizeScope(), State: peer.Task.FSM.Current(), - PeerCount: int32(peer.Task.PeerCount()), + PeerCount: uint32(peer.Task.PeerCount()), CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), } @@ -284,7 +284,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c } respPiece := &commonv2.Piece{ - Number: piece.Number, + Number: uint32(piece.Number), ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, @@ -412,12 +412,12 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c Application: &task.Application, Filters: task.Filters, Header: task.Header, - PieceLength: task.PieceLength, - ContentLength: task.ContentLength.Load(), - PieceCount: task.TotalPieceCount.Load(), + PieceLength: uint32(task.PieceLength), + ContentLength: uint64(task.ContentLength.Load()), + PieceCount: uint32(task.TotalPieceCount.Load()), SizeScope: task.SizeScope(), State: task.FSM.Current(), - PeerCount: int32(task.PeerCount()), + PeerCount: uint32(task.PeerCount()), CreatedAt: timestamppb.New(task.CreatedAt.Load()), UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), } @@ -437,7 +437,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c } respPiece := &commonv2.Piece{ - Number: piece.Number, + Number: uint32(piece.Number), ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, @@ -1008,8 +1008,8 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, // Handle task with peer back-to-source finished request, peer can only represent // a successful task after downloading the complete task. if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) { - peer.Task.ContentLength.Store(req.GetContentLength()) - peer.Task.TotalPieceCount.Store(req.GetPieceCount()) + peer.Task.ContentLength.Store(int64(req.GetContentLength())) + peer.Task.TotalPieceCount.Store(int32(req.GetPieceCount())) if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil { return status.Error(codes.Internal, err.Error()) } @@ -1084,7 +1084,7 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error { // Construct piece. piece := &resource.Piece{ - Number: req.Piece.GetNumber(), + Number: int32(req.Piece.GetNumber()), ParentID: req.Piece.GetParentId(), Offset: req.Piece.GetOffset(), Length: req.Piece.GetLength(), @@ -1147,7 +1147,7 @@ func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID stri func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error { // Construct piece. piece := &resource.Piece{ - Number: req.Piece.GetNumber(), + Number: int32(req.Piece.GetNumber()), ParentID: req.Piece.GetParentId(), Offset: req.Piece.GetOffset(), Length: req.Piece.GetLength(), @@ -1263,7 +1263,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An // Store new task or update task. task, loaded := v.resource.TaskManager().Load(taskID) if !loaded { - options := []resource.TaskOption{resource.WithPieceLength(download.GetPieceLength())} + options := []resource.TaskOption{resource.WithPieceLength(int32(download.GetPieceLength()))} if download.Digest != nil { d, err := digest.Parse(download.GetDigest()) if err != nil { @@ -1288,7 +1288,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An if !loaded { options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)} if download.Range != nil { - options = append(options, resource.WithRange(http.Range{Start: download.Range.GetStart(), Length: download.Range.GetLength()})) + options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())})) } peer = resource.NewPeer(peerID, &v.config.Resource, task, host, options...) diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index e992dd233..beeb0eebb 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -120,13 +120,13 @@ func TestServiceV2_StatPeer(t *testing.T) { assert.EqualValues(resp, &commonv2.Peer{ Id: peer.ID, Range: &commonv2.Range{ - Start: peer.Range.Start, - Length: peer.Range.Length, + Start: uint64(peer.Range.Start), + Length: uint64(peer.Range.Length), }, Priority: peer.Priority, Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -147,13 +147,13 @@ func TestServiceV2_StatPeer(t *testing.T) { Application: &peer.Task.Application, Filters: peer.Task.Filters, Header: peer.Task.Header, - PieceLength: peer.Task.PieceLength, - ContentLength: peer.Task.ContentLength.Load(), - PieceCount: peer.Task.TotalPieceCount.Load(), + PieceLength: uint32(peer.Task.PieceLength), + ContentLength: uint64(peer.Task.ContentLength.Load()), + PieceCount: uint32(peer.Task.TotalPieceCount.Load()), SizeScope: peer.Task.SizeScope(), Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -164,7 +164,7 @@ func TestServiceV2_StatPeer(t *testing.T) { }, }, State: peer.Task.FSM.Current(), - PeerCount: int32(peer.Task.PeerCount()), + PeerCount: uint32(peer.Task.PeerCount()), CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), }, @@ -373,13 +373,13 @@ func TestServiceV2_StatTask(t *testing.T) { Application: &task.Application, Filters: task.Filters, Header: task.Header, - PieceLength: task.PieceLength, - ContentLength: task.ContentLength.Load(), - PieceCount: task.TotalPieceCount.Load(), + PieceLength: uint32(task.PieceLength), + ContentLength: uint64(task.ContentLength.Load()), + PieceCount: uint32(task.TotalPieceCount.Load()), SizeScope: task.SizeScope(), Pieces: []*commonv2.Piece{ { - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -390,7 +390,7 @@ func TestServiceV2_StatTask(t *testing.T) { }, }, State: task.FSM.Current(), - PeerCount: int32(task.PeerCount()), + PeerCount: uint32(task.PeerCount()), CreatedAt: timestamppb.New(task.CreatedAt.Load()), UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), }) @@ -2678,7 +2678,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { name: "invalid digest", req: &schedulerv2.DownloadPieceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2697,7 +2697,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { name: "peer can not be loaded", req: &schedulerv2.DownloadPieceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2721,7 +2721,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { name: "parent can not be loaded", req: &schedulerv2.DownloadPieceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2742,7 +2742,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { assert := assert.New(t) assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req)) - piece, loaded := peer.LoadPiece(req.Piece.Number) + piece, loaded := peer.LoadPiece(int32(req.Piece.Number)) assert.True(loaded) assert.Equal(piece.Number, mockPiece.Number) assert.Equal(piece.ParentID, mockPiece.ParentID) @@ -2763,7 +2763,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { name: "parent can be loaded", req: &schedulerv2.DownloadPieceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2784,7 +2784,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { assert := assert.New(t) assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req)) - piece, loaded := peer.LoadPiece(req.Piece.Number) + piece, loaded := peer.LoadPiece(int32(req.Piece.Number)) assert.True(loaded) assert.Equal(piece.Number, mockPiece.Number) assert.Equal(piece.ParentID, mockPiece.ParentID) @@ -2837,7 +2837,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) name: "invalid digest", req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2856,7 +2856,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) name: "peer can not be loaded", req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2880,7 +2880,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) name: "peer can be loaded", req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{ Piece: &commonv2.Piece{ - Number: mockPiece.Number, + Number: uint32(mockPiece.Number), ParentId: &mockPiece.ParentID, Offset: mockPiece.Offset, Length: mockPiece.Length, @@ -2899,7 +2899,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) assert := assert.New(t) assert.NoError(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req)) - piece, loaded := peer.LoadPiece(req.Piece.Number) + piece, loaded := peer.LoadPiece(int32(req.Piece.Number)) assert.True(loaded) assert.Equal(piece.Number, mockPiece.Number) assert.Equal(piece.ParentID, mockPiece.ParentID) @@ -2914,7 +2914,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) assert.NotEqual(peer.PieceUpdatedAt.Load(), 0) assert.NotEqual(peer.UpdatedAt.Load(), 0) - piece, loaded = peer.Task.LoadPiece(req.Piece.Number) + piece, loaded = peer.Task.LoadPiece(int32(req.Piece.Number)) assert.True(loaded) assert.Equal(piece.Number, mockPiece.Number) assert.Equal(piece.ParentID, mockPiece.ParentID) @@ -3088,7 +3088,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { { name: "peer can be loaded", req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{ - PieceNumber: mockPiece.Number, + PieceNumber: uint32(mockPiece.Number), }, run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { @@ -3278,8 +3278,8 @@ func TestServiceV2_handleResource(t *testing.T) { Digest: &dgst, Priority: commonv2.Priority_LEVEL1, Range: &commonv2.Range{ - Start: mockPeerRange.Start, - Length: mockPeerRange.Length, + Start: uint64(mockPeerRange.Start), + Length: uint64(mockPeerRange.Length), }, }, run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, @@ -3307,8 +3307,8 @@ func TestServiceV2_handleResource(t *testing.T) { assert.EqualValues(task.Header, download.Header) assert.Equal(peer.ID, mockPeer.ID) assert.Equal(peer.Priority, download.Priority) - assert.Equal(peer.Range.Start, download.Range.Start) - assert.Equal(peer.Range.Length, download.Range.Length) + assert.Equal(peer.Range.Start, int64(download.Range.Start)) + assert.Equal(peer.Range.Length, int64(download.Range.Length)) assert.NotNil(peer.AnnouncePeerStream) assert.EqualValues(peer.Host, mockHost) assert.EqualValues(peer.Task, mockTask)