feat: change TriggerDownloadTask to DownloadTask with stream (#302)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-04-16 20:00:32 +08:00 committed by GitHub
parent 20049c9ad8
commit 9f7fd897b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 1480 additions and 1432 deletions

2
Cargo.lock generated
View File

@ -160,7 +160,7 @@ dependencies = [
[[package]]
name = "dragonfly-api"
version = "2.0.109"
version = "2.0.110"
dependencies = [
"prost 0.11.9",
"prost-types 0.12.4",

View File

@ -1,6 +1,6 @@
[package]
name = "dragonfly-api"
version = "2.0.109"
version = "2.0.110"
authors = ["Gaius <gaius.qi@gmail.com>"]
edition = "2021"
license = "Apache-2.0"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -24,56 +24,6 @@ import "validate/validate.proto";
option go_package = "d7y.io/api/v2/pkg/apis/dfdaemon/v2;dfdaemon";
// TriggerDownloadTaskRequest represents request of TriggerDownloadTask.
message TriggerDownloadTaskRequest {
// Download information.
common.v2.Download download = 1 [(validate.rules).message.required = true];
}
// SyncPiecesRequest represents request of SyncPieces.
message SyncPiecesRequest {
// Task id.
string task_id = 1 [(validate.rules).string.min_len = 1];
// Interested piece numbers.
repeated uint32 interested_piece_numbers = 2 [(validate.rules).repeated = {min_items: 1}];
}
// SyncPiecesResponse represents response of SyncPieces.
message SyncPiecesResponse {
// Exist piece number.
uint32 number = 1;
// Piece offset.
uint64 offset = 2;
// Piece length.
uint64 length = 3;
}
// DownloadPieceRequest represents request of DownloadPiece.
message DownloadPieceRequest{
// Task id.
string task_id = 1 [(validate.rules).string.min_len = 1];
// Piece number.
uint32 piece_number = 2;
}
// DownloadPieceResponse represents response of DownloadPieces.
message DownloadPieceResponse {
// Piece information.
common.v2.Piece piece = 1 [(validate.rules).message.required = true];
}
// DfdaemonUpload represents dfdaemon upload service.
service DfdaemonUpload {
// TriggerDownloadTask triggers download task.
rpc TriggerDownloadTask(TriggerDownloadTaskRequest) returns(google.protobuf.Empty);
// SyncPieces syncs piece metadatas from remote peer.
rpc SyncPieces(SyncPiecesRequest) returns(stream SyncPiecesResponse);
// DownloadPiece downloads piece from the remote peer.
rpc DownloadPiece(DownloadPieceRequest)returns(DownloadPieceResponse);
}
// DownloadTaskRequest represents request of DownloadTask.
message DownloadTaskRequest {
// Download information.
@ -120,6 +70,50 @@ message DownloadTaskResponse {
}
}
// SyncPiecesRequest represents request of SyncPieces.
message SyncPiecesRequest {
// Task id.
string task_id = 1 [(validate.rules).string.min_len = 1];
// Interested piece numbers.
repeated uint32 interested_piece_numbers = 2 [(validate.rules).repeated = {min_items: 1}];
}
// SyncPiecesResponse represents response of SyncPieces.
message SyncPiecesResponse {
// Exist piece number.
uint32 number = 1;
// Piece offset.
uint64 offset = 2;
// Piece length.
uint64 length = 3;
}
// DownloadPieceRequest represents request of DownloadPiece.
message DownloadPieceRequest{
// Task id.
string task_id = 1 [(validate.rules).string.min_len = 1];
// Piece number.
uint32 piece_number = 2;
}
// DownloadPieceResponse represents response of DownloadPieces.
message DownloadPieceResponse {
// Piece information.
common.v2.Piece piece = 1 [(validate.rules).message.required = true];
}
// DfdaemonUpload represents dfdaemon upload service.
service DfdaemonUpload {
// DownloadTask downloads task from p2p network.
rpc DownloadTask(DownloadTaskRequest) returns(stream DownloadTaskResponse);
// SyncPieces syncs piece metadatas from remote peer.
rpc SyncPieces(SyncPiecesRequest) returns(stream SyncPiecesResponse);
// DownloadPiece downloads piece from the remote peer.
rpc DownloadPiece(DownloadPieceRequest)returns(DownloadPieceResponse);
}
// UploadTaskRequest represents request of UploadTask.
message UploadTaskRequest {
// Task metadata.
@ -140,7 +134,7 @@ message DeleteTaskRequest {
// DfdaemonDownload represents dfdaemon download service.
service DfdaemonDownload {
// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from p2p network.
rpc DownloadTask(DownloadTaskRequest) returns(stream DownloadTaskResponse);
// UploadTask uploads task to p2p network.

View File

@ -24,8 +24,8 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DfdaemonUploadClient interface {
// TriggerDownloadTask triggers download task.
TriggerDownloadTask(ctx context.Context, in *TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// DownloadTask downloads task from p2p network.
DownloadTask(ctx context.Context, in *DownloadTaskRequest, opts ...grpc.CallOption) (DfdaemonUpload_DownloadTaskClient, error)
// SyncPieces syncs piece metadatas from remote peer.
SyncPieces(ctx context.Context, in *SyncPiecesRequest, opts ...grpc.CallOption) (DfdaemonUpload_SyncPiecesClient, error)
// DownloadPiece downloads piece from the remote peer.
@ -40,17 +40,40 @@ func NewDfdaemonUploadClient(cc grpc.ClientConnInterface) DfdaemonUploadClient {
return &dfdaemonUploadClient{cc}
}
func (c *dfdaemonUploadClient) TriggerDownloadTask(ctx context.Context, in *TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dfdaemon.v2.DfdaemonUpload/TriggerDownloadTask", in, out, opts...)
func (c *dfdaemonUploadClient) DownloadTask(ctx context.Context, in *DownloadTaskRequest, opts ...grpc.CallOption) (DfdaemonUpload_DownloadTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &DfdaemonUpload_ServiceDesc.Streams[0], "/dfdaemon.v2.DfdaemonUpload/DownloadTask", opts...)
if err != nil {
return nil, err
}
return out, nil
x := &dfdaemonUploadDownloadTaskClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type DfdaemonUpload_DownloadTaskClient interface {
Recv() (*DownloadTaskResponse, error)
grpc.ClientStream
}
type dfdaemonUploadDownloadTaskClient struct {
grpc.ClientStream
}
func (x *dfdaemonUploadDownloadTaskClient) Recv() (*DownloadTaskResponse, error) {
m := new(DownloadTaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *dfdaemonUploadClient) SyncPieces(ctx context.Context, in *SyncPiecesRequest, opts ...grpc.CallOption) (DfdaemonUpload_SyncPiecesClient, error) {
stream, err := c.cc.NewStream(ctx, &DfdaemonUpload_ServiceDesc.Streams[0], "/dfdaemon.v2.DfdaemonUpload/SyncPieces", opts...)
stream, err := c.cc.NewStream(ctx, &DfdaemonUpload_ServiceDesc.Streams[1], "/dfdaemon.v2.DfdaemonUpload/SyncPieces", opts...)
if err != nil {
return nil, err
}
@ -94,8 +117,8 @@ func (c *dfdaemonUploadClient) DownloadPiece(ctx context.Context, in *DownloadPi
// All implementations should embed UnimplementedDfdaemonUploadServer
// for forward compatibility
type DfdaemonUploadServer interface {
// TriggerDownloadTask triggers download task.
TriggerDownloadTask(context.Context, *TriggerDownloadTaskRequest) (*emptypb.Empty, error)
// DownloadTask downloads task from p2p network.
DownloadTask(*DownloadTaskRequest, DfdaemonUpload_DownloadTaskServer) error
// SyncPieces syncs piece metadatas from remote peer.
SyncPieces(*SyncPiecesRequest, DfdaemonUpload_SyncPiecesServer) error
// DownloadPiece downloads piece from the remote peer.
@ -106,8 +129,8 @@ type DfdaemonUploadServer interface {
type UnimplementedDfdaemonUploadServer struct {
}
func (UnimplementedDfdaemonUploadServer) TriggerDownloadTask(context.Context, *TriggerDownloadTaskRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TriggerDownloadTask not implemented")
func (UnimplementedDfdaemonUploadServer) DownloadTask(*DownloadTaskRequest, DfdaemonUpload_DownloadTaskServer) error {
return status.Errorf(codes.Unimplemented, "method DownloadTask not implemented")
}
func (UnimplementedDfdaemonUploadServer) SyncPieces(*SyncPiecesRequest, DfdaemonUpload_SyncPiecesServer) error {
return status.Errorf(codes.Unimplemented, "method SyncPieces not implemented")
@ -127,22 +150,25 @@ func RegisterDfdaemonUploadServer(s grpc.ServiceRegistrar, srv DfdaemonUploadSer
s.RegisterService(&DfdaemonUpload_ServiceDesc, srv)
}
func _DfdaemonUpload_TriggerDownloadTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TriggerDownloadTaskRequest)
if err := dec(in); err != nil {
return nil, err
func _DfdaemonUpload_DownloadTask_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(DownloadTaskRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
if interceptor == nil {
return srv.(DfdaemonUploadServer).TriggerDownloadTask(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dfdaemon.v2.DfdaemonUpload/TriggerDownloadTask",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DfdaemonUploadServer).TriggerDownloadTask(ctx, req.(*TriggerDownloadTaskRequest))
}
return interceptor(ctx, in, info, handler)
return srv.(DfdaemonUploadServer).DownloadTask(m, &dfdaemonUploadDownloadTaskServer{stream})
}
type DfdaemonUpload_DownloadTaskServer interface {
Send(*DownloadTaskResponse) error
grpc.ServerStream
}
type dfdaemonUploadDownloadTaskServer struct {
grpc.ServerStream
}
func (x *dfdaemonUploadDownloadTaskServer) Send(m *DownloadTaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func _DfdaemonUpload_SyncPieces_Handler(srv interface{}, stream grpc.ServerStream) error {
@ -191,16 +217,17 @@ var DfdaemonUpload_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dfdaemon.v2.DfdaemonUpload",
HandlerType: (*DfdaemonUploadServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "TriggerDownloadTask",
Handler: _DfdaemonUpload_TriggerDownloadTask_Handler,
},
{
MethodName: "DownloadPiece",
Handler: _DfdaemonUpload_DownloadPiece_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "DownloadTask",
Handler: _DfdaemonUpload_DownloadTask_Handler,
ServerStreams: true,
},
{
StreamName: "SyncPieces",
Handler: _DfdaemonUpload_SyncPieces_Handler,
@ -214,7 +241,7 @@ var DfdaemonUpload_ServiceDesc = grpc.ServiceDesc{
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DfdaemonDownloadClient interface {
// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from p2p network.
DownloadTask(ctx context.Context, in *DownloadTaskRequest, opts ...grpc.CallOption) (DfdaemonDownload_DownloadTaskClient, error)
// UploadTask uploads task to p2p network.
UploadTask(ctx context.Context, in *UploadTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
@ -306,7 +333,7 @@ func (c *dfdaemonDownloadClient) LeaveHost(ctx context.Context, in *emptypb.Empt
// All implementations should embed UnimplementedDfdaemonDownloadServer
// for forward compatibility
type DfdaemonDownloadServer interface {
// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from p2p network.
DownloadTask(*DownloadTaskRequest, DfdaemonDownload_DownloadTaskServer) error
// UploadTask uploads task to p2p network.
UploadTask(context.Context, *UploadTaskRequest) (*emptypb.Empty, error)

View File

@ -63,6 +63,26 @@ func (mr *MockDfdaemonUploadClientMockRecorder) DownloadPiece(ctx, in any, opts
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockDfdaemonUploadClient)(nil).DownloadPiece), varargs...)
}
// DownloadTask mocks base method.
func (m *MockDfdaemonUploadClient) DownloadTask(ctx context.Context, in *dfdaemon.DownloadTaskRequest, opts ...grpc.CallOption) (dfdaemon.DfdaemonUpload_DownloadTaskClient, error) {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DownloadTask", varargs...)
ret0, _ := ret[0].(dfdaemon.DfdaemonUpload_DownloadTaskClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DownloadTask indicates an expected call of DownloadTask.
func (mr *MockDfdaemonUploadClientMockRecorder) DownloadTask(ctx, in any, opts ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonUploadClient)(nil).DownloadTask), varargs...)
}
// SyncPieces mocks base method.
func (m *MockDfdaemonUploadClient) SyncPieces(ctx context.Context, in *dfdaemon.SyncPiecesRequest, opts ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) {
m.ctrl.T.Helper()
@ -83,24 +103,127 @@ func (mr *MockDfdaemonUploadClientMockRecorder) SyncPieces(ctx, in any, opts ...
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockDfdaemonUploadClient)(nil).SyncPieces), varargs...)
}
// TriggerDownloadTask mocks base method.
func (m *MockDfdaemonUploadClient) TriggerDownloadTask(ctx context.Context, in *dfdaemon.TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
// MockDfdaemonUpload_DownloadTaskClient is a mock of DfdaemonUpload_DownloadTaskClient interface.
type MockDfdaemonUpload_DownloadTaskClient struct {
ctrl *gomock.Controller
recorder *MockDfdaemonUpload_DownloadTaskClientMockRecorder
}
// MockDfdaemonUpload_DownloadTaskClientMockRecorder is the mock recorder for MockDfdaemonUpload_DownloadTaskClient.
type MockDfdaemonUpload_DownloadTaskClientMockRecorder struct {
mock *MockDfdaemonUpload_DownloadTaskClient
}
// NewMockDfdaemonUpload_DownloadTaskClient creates a new mock instance.
func NewMockDfdaemonUpload_DownloadTaskClient(ctrl *gomock.Controller) *MockDfdaemonUpload_DownloadTaskClient {
mock := &MockDfdaemonUpload_DownloadTaskClient{ctrl: ctrl}
mock.recorder = &MockDfdaemonUpload_DownloadTaskClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDfdaemonUpload_DownloadTaskClient) EXPECT() *MockDfdaemonUpload_DownloadTaskClientMockRecorder {
return m.recorder
}
// CloseSend mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskClient) CloseSend() error {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "TriggerDownloadTask", varargs...)
ret0, _ := ret[0].(*emptypb.Empty)
ret := m.ctrl.Call(m, "CloseSend")
ret0, _ := ret[0].(error)
return ret0
}
// CloseSend indicates an expected call of CloseSend.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) CloseSend() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).CloseSend))
}
// Context mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskClient) Context() context.Context {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Context")
ret0, _ := ret[0].(context.Context)
return ret0
}
// Context indicates an expected call of Context.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) Context() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).Context))
}
// Header mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskClient) Header() (metadata.MD, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Header")
ret0, _ := ret[0].(metadata.MD)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TriggerDownloadTask indicates an expected call of TriggerDownloadTask.
func (mr *MockDfdaemonUploadClientMockRecorder) TriggerDownloadTask(ctx, in any, opts ...any) *gomock.Call {
// Header indicates an expected call of Header.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) Header() *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockDfdaemonUploadClient)(nil).TriggerDownloadTask), varargs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).Header))
}
// Recv mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskClient) Recv() (*dfdaemon.DownloadTaskResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Recv")
ret0, _ := ret[0].(*dfdaemon.DownloadTaskResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Recv indicates an expected call of Recv.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) Recv() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).Recv))
}
// RecvMsg mocks base method.
func (m_2 *MockDfdaemonUpload_DownloadTaskClient) RecvMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "RecvMsg", m)
ret0, _ := ret[0].(error)
return ret0
}
// RecvMsg indicates an expected call of RecvMsg.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) RecvMsg(m any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).RecvMsg), m)
}
// SendMsg mocks base method.
func (m_2 *MockDfdaemonUpload_DownloadTaskClient) SendMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "SendMsg", m)
ret0, _ := ret[0].(error)
return ret0
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) SendMsg(m any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).SendMsg), m)
}
// Trailer mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskClient) Trailer() metadata.MD {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Trailer")
ret0, _ := ret[0].(metadata.MD)
return ret0
}
// Trailer indicates an expected call of Trailer.
func (mr *MockDfdaemonUpload_DownloadTaskClientMockRecorder) Trailer() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskClient)(nil).Trailer))
}
// MockDfdaemonUpload_SyncPiecesClient is a mock of DfdaemonUpload_SyncPiecesClient interface.
@ -264,6 +387,20 @@ func (mr *MockDfdaemonUploadServerMockRecorder) DownloadPiece(arg0, arg1 any) *g
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockDfdaemonUploadServer)(nil).DownloadPiece), arg0, arg1)
}
// DownloadTask mocks base method.
func (m *MockDfdaemonUploadServer) DownloadTask(arg0 *dfdaemon.DownloadTaskRequest, arg1 dfdaemon.DfdaemonUpload_DownloadTaskServer) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DownloadTask indicates an expected call of DownloadTask.
func (mr *MockDfdaemonUploadServerMockRecorder) DownloadTask(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonUploadServer)(nil).DownloadTask), arg0, arg1)
}
// SyncPieces mocks base method.
func (m *MockDfdaemonUploadServer) SyncPieces(arg0 *dfdaemon.SyncPiecesRequest, arg1 dfdaemon.DfdaemonUpload_SyncPiecesServer) error {
m.ctrl.T.Helper()
@ -278,21 +415,6 @@ func (mr *MockDfdaemonUploadServerMockRecorder) SyncPieces(arg0, arg1 any) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockDfdaemonUploadServer)(nil).SyncPieces), arg0, arg1)
}
// TriggerDownloadTask mocks base method.
func (m *MockDfdaemonUploadServer) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest) (*emptypb.Empty, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TriggerDownloadTask", arg0, arg1)
ret0, _ := ret[0].(*emptypb.Empty)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TriggerDownloadTask indicates an expected call of TriggerDownloadTask.
func (mr *MockDfdaemonUploadServerMockRecorder) TriggerDownloadTask(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockDfdaemonUploadServer)(nil).TriggerDownloadTask), arg0, arg1)
}
// MockUnsafeDfdaemonUploadServer is a mock of UnsafeDfdaemonUploadServer interface.
type MockUnsafeDfdaemonUploadServer struct {
ctrl *gomock.Controller
@ -328,6 +450,125 @@ func (mr *MockUnsafeDfdaemonUploadServerMockRecorder) mustEmbedUnimplementedDfda
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedDfdaemonUploadServer", reflect.TypeOf((*MockUnsafeDfdaemonUploadServer)(nil).mustEmbedUnimplementedDfdaemonUploadServer))
}
// MockDfdaemonUpload_DownloadTaskServer is a mock of DfdaemonUpload_DownloadTaskServer interface.
type MockDfdaemonUpload_DownloadTaskServer struct {
ctrl *gomock.Controller
recorder *MockDfdaemonUpload_DownloadTaskServerMockRecorder
}
// MockDfdaemonUpload_DownloadTaskServerMockRecorder is the mock recorder for MockDfdaemonUpload_DownloadTaskServer.
type MockDfdaemonUpload_DownloadTaskServerMockRecorder struct {
mock *MockDfdaemonUpload_DownloadTaskServer
}
// NewMockDfdaemonUpload_DownloadTaskServer creates a new mock instance.
func NewMockDfdaemonUpload_DownloadTaskServer(ctrl *gomock.Controller) *MockDfdaemonUpload_DownloadTaskServer {
mock := &MockDfdaemonUpload_DownloadTaskServer{ctrl: ctrl}
mock.recorder = &MockDfdaemonUpload_DownloadTaskServerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDfdaemonUpload_DownloadTaskServer) EXPECT() *MockDfdaemonUpload_DownloadTaskServerMockRecorder {
return m.recorder
}
// Context mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskServer) Context() context.Context {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Context")
ret0, _ := ret[0].(context.Context)
return ret0
}
// Context indicates an expected call of Context.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) Context() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).Context))
}
// RecvMsg mocks base method.
func (m_2 *MockDfdaemonUpload_DownloadTaskServer) RecvMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "RecvMsg", m)
ret0, _ := ret[0].(error)
return ret0
}
// RecvMsg indicates an expected call of RecvMsg.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) RecvMsg(m any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).RecvMsg), m)
}
// Send mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskServer) Send(arg0 *dfdaemon.DownloadTaskResponse) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Send indicates an expected call of Send.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) Send(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).Send), arg0)
}
// SendHeader mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskServer) SendHeader(arg0 metadata.MD) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendHeader", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// SendHeader indicates an expected call of SendHeader.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) SendHeader(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).SendHeader), arg0)
}
// SendMsg mocks base method.
func (m_2 *MockDfdaemonUpload_DownloadTaskServer) SendMsg(m any) error {
m_2.ctrl.T.Helper()
ret := m_2.ctrl.Call(m_2, "SendMsg", m)
ret0, _ := ret[0].(error)
return ret0
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) SendMsg(m any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).SendMsg), m)
}
// SetHeader mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskServer) SetHeader(arg0 metadata.MD) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetHeader", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// SetHeader indicates an expected call of SetHeader.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) SetHeader(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).SetHeader), arg0)
}
// SetTrailer mocks base method.
func (m *MockDfdaemonUpload_DownloadTaskServer) SetTrailer(arg0 metadata.MD) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetTrailer", arg0)
}
// SetTrailer indicates an expected call of SetTrailer.
func (mr *MockDfdaemonUpload_DownloadTaskServerMockRecorder) SetTrailer(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockDfdaemonUpload_DownloadTaskServer)(nil).SetTrailer), arg0)
}
// MockDfdaemonUpload_SyncPiecesServer is a mock of DfdaemonUpload_SyncPiecesServer interface.
type MockDfdaemonUpload_SyncPiecesServer struct {
ctrl *gomock.Controller

View File

@ -21,56 +21,6 @@ package dfdaemon.v2;
import "common.proto";
import "google/protobuf/empty.proto";
// TriggerDownloadTaskRequest represents request of TriggerDownloadTask.
message TriggerDownloadTaskRequest {
// Download information.
common.v2.Download download = 1;
}
// SyncPiecesRequest represents request of SyncPieces.
message SyncPiecesRequest {
// Task id.
string task_id = 1;
// Interested piece numbers.
repeated uint32 interested_piece_numbers = 2;
}
// SyncPiecesResponse represents response of SyncPieces.
message SyncPiecesResponse {
// Exist piece number.
uint32 number = 1;
// Piece offset.
uint64 offset = 2;
// Piece length.
uint64 length = 3;
}
// DownloadPieceRequest represents request of DownloadPiece.
message DownloadPieceRequest{
// Task id.
string task_id = 1;
// Piece number.
uint32 piece_number = 2;
}
// DownloadPieceResponse represents response of DownloadPieces.
message DownloadPieceResponse {
// Piece information.
common.v2.Piece piece = 1;
}
// DfdaemonUpload represents upload service of dfdaemon.
service DfdaemonUpload{
// TriggerDownloadTask triggers download task.
rpc TriggerDownloadTask(TriggerDownloadTaskRequest) returns(google.protobuf.Empty);
// SyncPieces syncs piece metadatas from remote peer.
rpc SyncPieces(SyncPiecesRequest) returns(stream SyncPiecesResponse);
// DownloadPiece downloads piece from the remote peer.
rpc DownloadPiece(DownloadPieceRequest)returns(DownloadPieceResponse);
}
// DownloadTaskRequest represents request of DownloadTask.
message DownloadTaskRequest {
// Download information.
@ -115,6 +65,50 @@ message DownloadTaskResponse {
}
}
// SyncPiecesRequest represents request of SyncPieces.
message SyncPiecesRequest {
// Task id.
string task_id = 1;
// Interested piece numbers.
repeated uint32 interested_piece_numbers = 2;
}
// SyncPiecesResponse represents response of SyncPieces.
message SyncPiecesResponse {
// Exist piece number.
uint32 number = 1;
// Piece offset.
uint64 offset = 2;
// Piece length.
uint64 length = 3;
}
// DownloadPieceRequest represents request of DownloadPiece.
message DownloadPieceRequest{
// Task id.
string task_id = 1;
// Piece number.
uint32 piece_number = 2;
}
// DownloadPieceResponse represents response of DownloadPieces.
message DownloadPieceResponse {
// Piece information.
common.v2.Piece piece = 1;
}
// DfdaemonUpload represents upload service of dfdaemon.
service DfdaemonUpload{
// DownloadTask downloads task from p2p network.
rpc DownloadTask(DownloadTaskRequest) returns(stream DownloadTaskResponse);
// SyncPieces syncs piece metadatas from remote peer.
rpc SyncPieces(SyncPiecesRequest) returns(stream SyncPiecesResponse);
// DownloadPiece downloads piece from the remote peer.
rpc DownloadPiece(DownloadPieceRequest)returns(DownloadPieceResponse);
}
// UploadTaskRequest represents request of UploadTask.
message UploadTaskRequest {
// Task metadata.
@ -135,7 +129,7 @@ message DeleteTaskRequest {
// DfdaemonDownload represents download service of dfdaemon.
service DfdaemonDownload{
// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from p2p network.
rpc DownloadTask(DownloadTaskRequest) returns(stream DownloadTaskResponse);
// UploadTask uploads task to p2p network.

Binary file not shown.

View File

@ -1,60 +1,3 @@
/// TriggerDownloadTaskRequest represents request of TriggerDownloadTask.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TriggerDownloadTaskRequest {
/// Download information.
#[prost(message, optional, tag = "1")]
pub download: ::core::option::Option<super::super::common::v2::Download>,
}
/// SyncPiecesRequest represents request of SyncPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncPiecesRequest {
/// Task id.
#[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String,
/// Interested piece numbers.
#[prost(uint32, repeated, tag = "2")]
pub interested_piece_numbers: ::prost::alloc::vec::Vec<u32>,
}
/// SyncPiecesResponse represents response of SyncPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncPiecesResponse {
/// Exist piece number.
#[prost(uint32, tag = "1")]
pub number: u32,
/// Piece offset.
#[prost(uint64, tag = "2")]
pub offset: u64,
/// Piece length.
#[prost(uint64, tag = "3")]
pub length: u64,
}
/// DownloadPieceRequest represents request of DownloadPiece.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DownloadPieceRequest {
/// Task id.
#[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String,
/// Piece number.
#[prost(uint32, tag = "2")]
pub piece_number: u32,
}
/// DownloadPieceResponse represents response of DownloadPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DownloadPieceResponse {
/// Piece information.
#[prost(message, optional, tag = "1")]
pub piece: ::core::option::Option<super::super::common::v2::Piece>,
}
/// DownloadTaskRequest represents request of DownloadTask.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
@ -125,6 +68,54 @@ pub mod download_task_response {
DownloadPieceFinishedResponse(super::DownloadPieceFinishedResponse),
}
}
/// SyncPiecesRequest represents request of SyncPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncPiecesRequest {
/// Task id.
#[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String,
/// Interested piece numbers.
#[prost(uint32, repeated, tag = "2")]
pub interested_piece_numbers: ::prost::alloc::vec::Vec<u32>,
}
/// SyncPiecesResponse represents response of SyncPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncPiecesResponse {
/// Exist piece number.
#[prost(uint32, tag = "1")]
pub number: u32,
/// Piece offset.
#[prost(uint64, tag = "2")]
pub offset: u64,
/// Piece length.
#[prost(uint64, tag = "3")]
pub length: u64,
}
/// DownloadPieceRequest represents request of DownloadPiece.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DownloadPieceRequest {
/// Task id.
#[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String,
/// Piece number.
#[prost(uint32, tag = "2")]
pub piece_number: u32,
}
/// DownloadPieceResponse represents response of DownloadPieces.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DownloadPieceResponse {
/// Piece information.
#[prost(message, optional, tag = "1")]
pub piece: ::core::option::Option<super::super::common::v2::Piece>,
}
/// UploadTaskRequest represents request of UploadTask.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
@ -238,11 +229,14 @@ pub mod dfdaemon_upload_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// TriggerDownloadTask triggers download task.
pub async fn trigger_download_task(
/// DownloadTask downloads task from p2p network.
pub async fn download_task(
&mut self,
request: impl tonic::IntoRequest<super::TriggerDownloadTaskRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
request: impl tonic::IntoRequest<super::DownloadTaskRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::DownloadTaskResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
@ -254,14 +248,12 @@ pub mod dfdaemon_upload_client {
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/dfdaemon.v2.DfdaemonUpload/TriggerDownloadTask",
"/dfdaemon.v2.DfdaemonUpload/DownloadTask",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("dfdaemon.v2.DfdaemonUpload", "TriggerDownloadTask"),
);
self.inner.unary(req, path, codec).await
.insert(GrpcMethod::new("dfdaemon.v2.DfdaemonUpload", "DownloadTask"));
self.inner.server_streaming(req, path, codec).await
}
/// SyncPieces syncs piece metadatas from remote peer.
pub async fn sync_pieces(
@ -403,7 +395,7 @@ pub mod dfdaemon_download_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// DownloadTask downloads task back-to-source.
/// DownloadTask downloads task from p2p network.
pub async fn download_task(
&mut self,
request: impl tonic::IntoRequest<super::DownloadTaskRequest>,
@ -533,11 +525,20 @@ pub mod dfdaemon_upload_server {
/// Generated trait containing gRPC methods that should be implemented for use with DfdaemonUploadServer.
#[async_trait]
pub trait DfdaemonUpload: Send + Sync + 'static {
/// TriggerDownloadTask triggers download task.
async fn trigger_download_task(
/// Server streaming response type for the DownloadTask method.
type DownloadTaskStream: futures_core::Stream<
Item = std::result::Result<super::DownloadTaskResponse, tonic::Status>,
>
+ Send
+ 'static;
/// DownloadTask downloads task from p2p network.
async fn download_task(
&self,
request: tonic::Request<super::TriggerDownloadTaskRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status>;
request: tonic::Request<super::DownloadTaskRequest>,
) -> std::result::Result<
tonic::Response<Self::DownloadTaskStream>,
tonic::Status,
>;
/// Server streaming response type for the SyncPieces method.
type SyncPiecesStream: futures_core::Stream<
Item = std::result::Result<super::SyncPiecesResponse, tonic::Status>,
@ -638,25 +639,26 @@ pub mod dfdaemon_upload_server {
fn call(&mut self, req: http::Request<B>) -> Self::Future {
let inner = self.inner.clone();
match req.uri().path() {
"/dfdaemon.v2.DfdaemonUpload/TriggerDownloadTask" => {
"/dfdaemon.v2.DfdaemonUpload/DownloadTask" => {
#[allow(non_camel_case_types)]
struct TriggerDownloadTaskSvc<T: DfdaemonUpload>(pub Arc<T>);
struct DownloadTaskSvc<T: DfdaemonUpload>(pub Arc<T>);
impl<
T: DfdaemonUpload,
> tonic::server::UnaryService<super::TriggerDownloadTaskRequest>
for TriggerDownloadTaskSvc<T> {
type Response = ();
> tonic::server::ServerStreamingService<super::DownloadTaskRequest>
for DownloadTaskSvc<T> {
type Response = super::DownloadTaskResponse;
type ResponseStream = T::DownloadTaskStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::TriggerDownloadTaskRequest>,
request: tonic::Request<super::DownloadTaskRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).trigger_download_task(request).await
(*inner).download_task(request).await
};
Box::pin(fut)
}
@ -668,7 +670,7 @@ pub mod dfdaemon_upload_server {
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = TriggerDownloadTaskSvc(inner);
let method = DownloadTaskSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
@ -679,7 +681,7 @@ pub mod dfdaemon_upload_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
@ -829,7 +831,7 @@ pub mod dfdaemon_download_server {
>
+ Send
+ 'static;
/// DownloadTask downloads task back-to-source.
/// DownloadTask downloads task from p2p network.
async fn download_task(
&self,
request: tonic::Request<super::DownloadTaskRequest>,