feat: add ListTaskEntries for downloading directory

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-07-08 11:41:12 +08:00
parent b3635340f7
commit e61d16e446
No known key found for this signature in database
GPG Key ID: 647A0EE86907F1AF
10 changed files with 1641 additions and 448 deletions

2
Cargo.lock generated
View File

@ -190,7 +190,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-api" name = "dragonfly-api"
version = "2.1.40" version = "2.1.41"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"prost-types 0.14.1", "prost-types 0.14.1",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "dragonfly-api" name = "dragonfly-api"
version = "2.1.40" version = "2.1.41"
authors = ["Gaius <gaius.qi@gmail.com>"] authors = ["Gaius <gaius.qi@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"

File diff suppressed because it is too large Load Diff

View File

@ -1372,6 +1372,491 @@ var _ interface {
ErrorName() string ErrorName() string
} = StatTaskRequestValidationError{} } = StatTaskRequestValidationError{}
// Validate checks the field values on ListTaskEntriesRequest with the rules
// defined in the proto definition for this message. If any rules are
// violated, the first error encountered is returned, or nil if there are no violations.
func (m *ListTaskEntriesRequest) Validate() error {
return m.validate(false)
}
// ValidateAll checks the field values on ListTaskEntriesRequest with the rules
// defined in the proto definition for this message. If any rules are
// violated, the result is a list of violation errors wrapped in
// ListTaskEntriesRequestMultiError, or nil if none found.
func (m *ListTaskEntriesRequest) ValidateAll() error {
return m.validate(true)
}
func (m *ListTaskEntriesRequest) validate(all bool) error {
if m == nil {
return nil
}
var errors []error
if utf8.RuneCountInString(m.GetTaskId()) < 1 {
err := ListTaskEntriesRequestValidationError{
field: "TaskId",
reason: "value length must be at least 1 runes",
}
if !all {
return err
}
errors = append(errors, err)
}
// no validation rules for Url
// no validation rules for RequestHeader
if m.Timeout != nil {
if all {
switch v := interface{}(m.GetTimeout()).(type) {
case interface{ ValidateAll() error }:
if err := v.ValidateAll(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "Timeout",
reason: "embedded message failed validation",
cause: err,
})
}
case interface{ Validate() error }:
if err := v.Validate(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "Timeout",
reason: "embedded message failed validation",
cause: err,
})
}
}
} else if v, ok := interface{}(m.GetTimeout()).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return ListTaskEntriesRequestValidationError{
field: "Timeout",
reason: "embedded message failed validation",
cause: err,
}
}
}
}
if m.ObjectStorage != nil {
if all {
switch v := interface{}(m.GetObjectStorage()).(type) {
case interface{ ValidateAll() error }:
if err := v.ValidateAll(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "ObjectStorage",
reason: "embedded message failed validation",
cause: err,
})
}
case interface{ Validate() error }:
if err := v.Validate(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "ObjectStorage",
reason: "embedded message failed validation",
cause: err,
})
}
}
} else if v, ok := interface{}(m.GetObjectStorage()).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return ListTaskEntriesRequestValidationError{
field: "ObjectStorage",
reason: "embedded message failed validation",
cause: err,
}
}
}
}
if m.Hdfs != nil {
if all {
switch v := interface{}(m.GetHdfs()).(type) {
case interface{ ValidateAll() error }:
if err := v.ValidateAll(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "Hdfs",
reason: "embedded message failed validation",
cause: err,
})
}
case interface{ Validate() error }:
if err := v.Validate(); err != nil {
errors = append(errors, ListTaskEntriesRequestValidationError{
field: "Hdfs",
reason: "embedded message failed validation",
cause: err,
})
}
}
} else if v, ok := interface{}(m.GetHdfs()).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return ListTaskEntriesRequestValidationError{
field: "Hdfs",
reason: "embedded message failed validation",
cause: err,
}
}
}
}
if len(errors) > 0 {
return ListTaskEntriesRequestMultiError(errors)
}
return nil
}
// ListTaskEntriesRequestMultiError is an error wrapping multiple validation
// errors returned by ListTaskEntriesRequest.ValidateAll() if the designated
// constraints aren't met.
type ListTaskEntriesRequestMultiError []error
// Error returns a concatenation of all the error messages it wraps.
func (m ListTaskEntriesRequestMultiError) Error() string {
var msgs []string
for _, err := range m {
msgs = append(msgs, err.Error())
}
return strings.Join(msgs, "; ")
}
// AllErrors returns a list of validation violation errors.
func (m ListTaskEntriesRequestMultiError) AllErrors() []error { return m }
// ListTaskEntriesRequestValidationError is the validation error returned by
// ListTaskEntriesRequest.Validate if the designated constraints aren't met.
type ListTaskEntriesRequestValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e ListTaskEntriesRequestValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e ListTaskEntriesRequestValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e ListTaskEntriesRequestValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e ListTaskEntriesRequestValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e ListTaskEntriesRequestValidationError) ErrorName() string {
return "ListTaskEntriesRequestValidationError"
}
// Error satisfies the builtin error interface
func (e ListTaskEntriesRequestValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sListTaskEntriesRequest.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = ListTaskEntriesRequestValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = ListTaskEntriesRequestValidationError{}
// Validate checks the field values on ListTaskEntriesResponse with the rules
// defined in the proto definition for this message. If any rules are
// violated, the first error encountered is returned, or nil if there are no violations.
func (m *ListTaskEntriesResponse) Validate() error {
return m.validate(false)
}
// ValidateAll checks the field values on ListTaskEntriesResponse with the
// rules defined in the proto definition for this message. If any rules are
// violated, the result is a list of violation errors wrapped in
// ListTaskEntriesResponseMultiError, or nil if none found.
func (m *ListTaskEntriesResponse) ValidateAll() error {
return m.validate(true)
}
func (m *ListTaskEntriesResponse) validate(all bool) error {
if m == nil {
return nil
}
var errors []error
// no validation rules for Success
// no validation rules for ContentLength
// no validation rules for ResponseHeader
for idx, item := range m.GetEntries() {
_, _ = idx, item
if all {
switch v := interface{}(item).(type) {
case interface{ ValidateAll() error }:
if err := v.ValidateAll(); err != nil {
errors = append(errors, ListTaskEntriesResponseValidationError{
field: fmt.Sprintf("Entries[%v]", idx),
reason: "embedded message failed validation",
cause: err,
})
}
case interface{ Validate() error }:
if err := v.Validate(); err != nil {
errors = append(errors, ListTaskEntriesResponseValidationError{
field: fmt.Sprintf("Entries[%v]", idx),
reason: "embedded message failed validation",
cause: err,
})
}
}
} else if v, ok := interface{}(item).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return ListTaskEntriesResponseValidationError{
field: fmt.Sprintf("Entries[%v]", idx),
reason: "embedded message failed validation",
cause: err,
}
}
}
}
if m.StatusCode != nil {
if m.GetStatusCode() != 0 {
if val := m.GetStatusCode(); val < 100 || val >= 599 {
err := ListTaskEntriesResponseValidationError{
field: "StatusCode",
reason: "value must be inside range [100, 599)",
}
if !all {
return err
}
errors = append(errors, err)
}
}
}
if m.ErrorMessage != nil {
// no validation rules for ErrorMessage
}
if len(errors) > 0 {
return ListTaskEntriesResponseMultiError(errors)
}
return nil
}
// ListTaskEntriesResponseMultiError is an error wrapping multiple validation
// errors returned by ListTaskEntriesResponse.ValidateAll() if the designated
// constraints aren't met.
type ListTaskEntriesResponseMultiError []error
// Error returns a concatenation of all the error messages it wraps.
func (m ListTaskEntriesResponseMultiError) Error() string {
var msgs []string
for _, err := range m {
msgs = append(msgs, err.Error())
}
return strings.Join(msgs, "; ")
}
// AllErrors returns a list of validation violation errors.
func (m ListTaskEntriesResponseMultiError) AllErrors() []error { return m }
// ListTaskEntriesResponseValidationError is the validation error returned by
// ListTaskEntriesResponse.Validate if the designated constraints aren't met.
type ListTaskEntriesResponseValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e ListTaskEntriesResponseValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e ListTaskEntriesResponseValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e ListTaskEntriesResponseValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e ListTaskEntriesResponseValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e ListTaskEntriesResponseValidationError) ErrorName() string {
return "ListTaskEntriesResponseValidationError"
}
// Error satisfies the builtin error interface
func (e ListTaskEntriesResponseValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sListTaskEntriesResponse.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = ListTaskEntriesResponseValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = ListTaskEntriesResponseValidationError{}
// Validate checks the field values on Entry with the rules defined in the
// proto definition for this message. If any rules are violated, the first
// error encountered is returned, or nil if there are no violations.
func (m *Entry) Validate() error {
return m.validate(false)
}
// ValidateAll checks the field values on Entry with the rules defined in the
// proto definition for this message. If any rules are violated, the result is
// a list of violation errors wrapped in EntryMultiError, or nil if none found.
func (m *Entry) ValidateAll() error {
return m.validate(true)
}
func (m *Entry) validate(all bool) error {
if m == nil {
return nil
}
var errors []error
// no validation rules for Url
// no validation rules for ContentLength
// no validation rules for IsDir
if len(errors) > 0 {
return EntryMultiError(errors)
}
return nil
}
// EntryMultiError is an error wrapping multiple validation errors returned by
// Entry.ValidateAll() if the designated constraints aren't met.
type EntryMultiError []error
// Error returns a concatenation of all the error messages it wraps.
func (m EntryMultiError) Error() string {
var msgs []string
for _, err := range m {
msgs = append(msgs, err.Error())
}
return strings.Join(msgs, "; ")
}
// AllErrors returns a list of validation violation errors.
func (m EntryMultiError) AllErrors() []error { return m }
// EntryValidationError is the validation error returned by Entry.Validate if
// the designated constraints aren't met.
type EntryValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e EntryValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e EntryValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e EntryValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e EntryValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e EntryValidationError) ErrorName() string { return "EntryValidationError" }
// Error satisfies the builtin error interface
func (e EntryValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sEntry.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = EntryValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = EntryValidationError{}
// Validate checks the field values on DeleteTaskRequest with the rules defined // Validate checks the field values on DeleteTaskRequest with the rules defined
// in the proto definition for this message. If any rules are violated, the // in the proto definition for this message. If any rules are violated, the
// first error encountered is returned, or nil if there are no violations. // first error encountered is returned, or nil if there are no violations.

View File

@ -115,6 +115,50 @@ message StatTaskRequest {
string task_id = 1 [(validate.rules).string.min_len = 1]; string task_id = 1 [(validate.rules).string.min_len = 1];
} }
// ListTaskEntriesRequest represents request of ListTaskEntries.
message ListTaskEntriesRequest {
// Task id.
string task_id = 1 [(validate.rules).string.min_len = 1];
// URL to be listed the entries.
string url = 2;
// HTTP header to be sent with the request.
map<string, string> request_header = 3;
// List timeout.
optional google.protobuf.Duration timeout = 4;
// certificate_chain is the client certs with DER format for the backend client to list the entries.
repeated bytes certificate_chain = 5;
// Object storage protocol information.
optional common.v2.ObjectStorage object_storage = 6;
// HDFS protocol information.
optional common.v2.HDFS hdfs = 7;
}
// ListTaskEntriesResponse represents response of ListTaskEntries.
message ListTaskEntriesResponse {
// Success is the success of the response.
bool success = 1;
// Content length is the content length of the response
uint64 content_length = 2;
// HTTP header to be sent with the request.
map<string, string> response_header = 3;
// Backend HTTP status code.
optional int32 status_code = 4 [(validate.rules).int32 = {gte: 100, lt: 599, ignore_empty: true}];
/// Entries is the information of the entries in the directory.
repeated Entry entries = 5;
/// Error message is the error message of the response.
optional string error_message = 6;
}
// Entry represents an entry in a directory.
message Entry {
// URL of the entry.
string url = 1;
// Size of the entry.
uint64 content_length = 2;
// Is directory or not.
bool is_dir = 3;
}
// DeleteTaskRequest represents request of DeleteTask. // DeleteTaskRequest represents request of DeleteTask.
message DeleteTaskRequest { message DeleteTaskRequest {
// Task id. // Task id.
@ -340,6 +384,9 @@ service DfdaemonDownload {
// StatTask stats task information. // StatTask stats task information.
rpc StatTask(StatTaskRequest) returns(common.v2.Task); rpc StatTask(StatTaskRequest) returns(common.v2.Task);
// ListTaskEntries lists task entries for downloading directory.
rpc ListTaskEntries(ListTaskEntriesRequest) returns(ListTaskEntriesResponse);
// DeleteTask deletes task from p2p network. // DeleteTask deletes task from p2p network.
rpc DeleteTask(DeleteTaskRequest) returns(google.protobuf.Empty); rpc DeleteTask(DeleteTaskRequest) returns(google.protobuf.Empty);

View File

@ -706,6 +706,8 @@ type DfdaemonDownloadClient interface {
DownloadTask(ctx context.Context, in *DownloadTaskRequest, opts ...grpc.CallOption) (DfdaemonDownload_DownloadTaskClient, error) DownloadTask(ctx context.Context, in *DownloadTaskRequest, opts ...grpc.CallOption) (DfdaemonDownload_DownloadTaskClient, error)
// StatTask stats task information. // StatTask stats task information.
StatTask(ctx context.Context, in *StatTaskRequest, opts ...grpc.CallOption) (*v2.Task, error) StatTask(ctx context.Context, in *StatTaskRequest, opts ...grpc.CallOption) (*v2.Task, error)
// ListTaskEntries lists task entries for downloading directory.
ListTaskEntries(ctx context.Context, in *ListTaskEntriesRequest, opts ...grpc.CallOption) (*ListTaskEntriesResponse, error)
// DeleteTask deletes task from p2p network. // DeleteTask deletes task from p2p network.
DeleteTask(ctx context.Context, in *DeleteTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) DeleteTask(ctx context.Context, in *DeleteTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// DeleteHost releases host in scheduler. // DeleteHost releases host in scheduler.
@ -767,6 +769,15 @@ func (c *dfdaemonDownloadClient) StatTask(ctx context.Context, in *StatTaskReque
return out, nil return out, nil
} }
func (c *dfdaemonDownloadClient) ListTaskEntries(ctx context.Context, in *ListTaskEntriesRequest, opts ...grpc.CallOption) (*ListTaskEntriesResponse, error) {
out := new(ListTaskEntriesResponse)
err := c.cc.Invoke(ctx, "/dfdaemon.v2.DfdaemonDownload/ListTaskEntries", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dfdaemonDownloadClient) DeleteTask(ctx context.Context, in *DeleteTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { func (c *dfdaemonDownloadClient) DeleteTask(ctx context.Context, in *DeleteTaskRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty) out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dfdaemon.v2.DfdaemonDownload/DeleteTask", in, out, opts...) err := c.cc.Invoke(ctx, "/dfdaemon.v2.DfdaemonDownload/DeleteTask", in, out, opts...)
@ -843,6 +854,8 @@ type DfdaemonDownloadServer interface {
DownloadTask(*DownloadTaskRequest, DfdaemonDownload_DownloadTaskServer) error DownloadTask(*DownloadTaskRequest, DfdaemonDownload_DownloadTaskServer) error
// StatTask stats task information. // StatTask stats task information.
StatTask(context.Context, *StatTaskRequest) (*v2.Task, error) StatTask(context.Context, *StatTaskRequest) (*v2.Task, error)
// ListTaskEntries lists task entries for downloading directory.
ListTaskEntries(context.Context, *ListTaskEntriesRequest) (*ListTaskEntriesResponse, error)
// DeleteTask deletes task from p2p network. // DeleteTask deletes task from p2p network.
DeleteTask(context.Context, *DeleteTaskRequest) (*emptypb.Empty, error) DeleteTask(context.Context, *DeleteTaskRequest) (*emptypb.Empty, error)
// DeleteHost releases host in scheduler. // DeleteHost releases host in scheduler.
@ -865,6 +878,9 @@ func (UnimplementedDfdaemonDownloadServer) DownloadTask(*DownloadTaskRequest, Df
func (UnimplementedDfdaemonDownloadServer) StatTask(context.Context, *StatTaskRequest) (*v2.Task, error) { func (UnimplementedDfdaemonDownloadServer) StatTask(context.Context, *StatTaskRequest) (*v2.Task, error) {
return nil, status.Errorf(codes.Unimplemented, "method StatTask not implemented") return nil, status.Errorf(codes.Unimplemented, "method StatTask not implemented")
} }
func (UnimplementedDfdaemonDownloadServer) ListTaskEntries(context.Context, *ListTaskEntriesRequest) (*ListTaskEntriesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListTaskEntries not implemented")
}
func (UnimplementedDfdaemonDownloadServer) DeleteTask(context.Context, *DeleteTaskRequest) (*emptypb.Empty, error) { func (UnimplementedDfdaemonDownloadServer) DeleteTask(context.Context, *DeleteTaskRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteTask not implemented") return nil, status.Errorf(codes.Unimplemented, "method DeleteTask not implemented")
} }
@ -931,6 +947,24 @@ func _DfdaemonDownload_StatTask_Handler(srv interface{}, ctx context.Context, de
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _DfdaemonDownload_ListTaskEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListTaskEntriesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DfdaemonDownloadServer).ListTaskEntries(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dfdaemon.v2.DfdaemonDownload/ListTaskEntries",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DfdaemonDownloadServer).ListTaskEntries(ctx, req.(*ListTaskEntriesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _DfdaemonDownload_DeleteTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _DfdaemonDownload_DeleteTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteTaskRequest) in := new(DeleteTaskRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -1035,6 +1069,10 @@ var DfdaemonDownload_ServiceDesc = grpc.ServiceDesc{
MethodName: "StatTask", MethodName: "StatTask",
Handler: _DfdaemonDownload_StatTask_Handler, Handler: _DfdaemonDownload_StatTask_Handler,
}, },
{
MethodName: "ListTaskEntries",
Handler: _DfdaemonDownload_ListTaskEntries_Handler,
},
{ {
MethodName: "DeleteTask", MethodName: "DeleteTask",
Handler: _DfdaemonDownload_DeleteTask_Handler, Handler: _DfdaemonDownload_DeleteTask_Handler,

View File

@ -1879,6 +1879,26 @@ func (mr *MockDfdaemonDownloadClientMockRecorder) DownloadTask(ctx, in any, opts
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonDownloadClient)(nil).DownloadTask), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonDownloadClient)(nil).DownloadTask), varargs...)
} }
// ListTaskEntries mocks base method.
func (m *MockDfdaemonDownloadClient) ListTaskEntries(ctx context.Context, in *dfdaemon.ListTaskEntriesRequest, opts ...grpc.CallOption) (*dfdaemon.ListTaskEntriesResponse, error) {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ListTaskEntries", varargs...)
ret0, _ := ret[0].(*dfdaemon.ListTaskEntriesResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListTaskEntries indicates an expected call of ListTaskEntries.
func (mr *MockDfdaemonDownloadClientMockRecorder) ListTaskEntries(ctx, in any, opts ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTaskEntries", reflect.TypeOf((*MockDfdaemonDownloadClient)(nil).ListTaskEntries), varargs...)
}
// StatPersistentCacheTask mocks base method. // StatPersistentCacheTask mocks base method.
func (m *MockDfdaemonDownloadClient) StatPersistentCacheTask(ctx context.Context, in *dfdaemon.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*common.PersistentCacheTask, error) { func (m *MockDfdaemonDownloadClient) StatPersistentCacheTask(ctx context.Context, in *dfdaemon.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*common.PersistentCacheTask, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -2269,6 +2289,21 @@ func (mr *MockDfdaemonDownloadServerMockRecorder) DownloadTask(arg0, arg1 any) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonDownloadServer)(nil).DownloadTask), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockDfdaemonDownloadServer)(nil).DownloadTask), arg0, arg1)
} }
// ListTaskEntries mocks base method.
func (m *MockDfdaemonDownloadServer) ListTaskEntries(arg0 context.Context, arg1 *dfdaemon.ListTaskEntriesRequest) (*dfdaemon.ListTaskEntriesResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListTaskEntries", arg0, arg1)
ret0, _ := ret[0].(*dfdaemon.ListTaskEntriesResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListTaskEntries indicates an expected call of ListTaskEntries.
func (mr *MockDfdaemonDownloadServerMockRecorder) ListTaskEntries(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTaskEntries", reflect.TypeOf((*MockDfdaemonDownloadServer)(nil).ListTaskEntries), arg0, arg1)
}
// StatPersistentCacheTask mocks base method. // StatPersistentCacheTask mocks base method.
func (m *MockDfdaemonDownloadServer) StatPersistentCacheTask(arg0 context.Context, arg1 *dfdaemon.StatPersistentCacheTaskRequest) (*common.PersistentCacheTask, error) { func (m *MockDfdaemonDownloadServer) StatPersistentCacheTask(arg0 context.Context, arg1 *dfdaemon.StatPersistentCacheTaskRequest) (*common.PersistentCacheTask, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -110,6 +110,50 @@ message StatTaskRequest {
string task_id = 1; string task_id = 1;
} }
// ListTaskEntriesRequest represents request of ListTaskEntries.
message ListTaskEntriesRequest {
// Task id.
string task_id = 1;
// URL to be listed the entries.
string url = 2;
// HTTP header to be sent with the request.
map<string, string> request_header = 3;
// List timeout.
optional google.protobuf.Duration timeout = 4;
// certificate_chain is the client certs with DER format for the backend client to list the entries.
repeated bytes certificate_chain = 5;
// Object storage protocol information.
optional common.v2.ObjectStorage object_storage = 6;
// HDFS protocol information.
optional common.v2.HDFS hdfs = 7;
}
// ListTaskEntriesResponse represents response of ListTaskEntries.
message ListTaskEntriesResponse {
// Success is the success of the response.
bool success = 1;
// Content length is the content length of the response
uint64 content_length = 2;
// HTTP header to be sent with the request.
map<string, string> response_header = 3;
// Backend HTTP status code.
optional int32 status_code = 4;
/// Entries is the information of the entries in the directory.
repeated Entry entries = 5;
/// Error message is the error message of the response.
optional string error_message = 6;
}
// Entry represents an entry in a directory.
message Entry {
// URL of the entry.
string url = 1;
// Size of the entry.
uint64 content_length = 2;
// Is directory or not.
bool is_dir = 3;
}
// DeleteTaskRequest represents request of DeleteTask. // DeleteTaskRequest represents request of DeleteTask.
message DeleteTaskRequest { message DeleteTaskRequest {
// Task id. // Task id.
@ -333,6 +377,9 @@ service DfdaemonDownload {
// StatTask stats task information. // StatTask stats task information.
rpc StatTask(StatTaskRequest) returns(common.v2.Task); rpc StatTask(StatTaskRequest) returns(common.v2.Task);
// ListTaskEntries lists task entries for downloading directory.
rpc ListTaskEntries(ListTaskEntriesRequest) returns(ListTaskEntriesResponse);
// DeleteTask deletes task from p2p network. // DeleteTask deletes task from p2p network.
rpc DeleteTask(DeleteTaskRequest) returns(google.protobuf.Empty); rpc DeleteTask(DeleteTaskRequest) returns(google.protobuf.Empty);

Binary file not shown.

View File

@ -135,6 +135,78 @@ pub struct StatTaskRequest {
#[prost(string, tag = "1")] #[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String, pub task_id: ::prost::alloc::string::String,
} }
/// ListTaskEntriesRequest represents request of ListTaskEntries.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListTaskEntriesRequest {
/// Task id.
#[prost(string, tag = "1")]
pub task_id: ::prost::alloc::string::String,
/// URL to be listed the entries.
#[prost(string, tag = "2")]
pub url: ::prost::alloc::string::String,
/// HTTP header to be sent with the request.
#[prost(map = "string, string", tag = "3")]
pub request_header: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
/// List timeout.
#[prost(message, optional, tag = "4")]
pub timeout: ::core::option::Option<::prost_wkt_types::Duration>,
/// certificate_chain is the client certs with DER format for the backend client to list the entries.
#[prost(bytes = "vec", repeated, tag = "5")]
pub certificate_chain: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Object storage protocol information.
#[prost(message, optional, tag = "6")]
pub object_storage: ::core::option::Option<super::super::common::v2::ObjectStorage>,
/// HDFS protocol information.
#[prost(message, optional, tag = "7")]
pub hdfs: ::core::option::Option<super::super::common::v2::Hdfs>,
}
/// ListTaskEntriesResponse represents response of ListTaskEntries.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListTaskEntriesResponse {
/// Success is the success of the response.
#[prost(bool, tag = "1")]
pub success: bool,
/// Content length is the content length of the response
#[prost(uint64, tag = "2")]
pub content_length: u64,
/// HTTP header to be sent with the request.
#[prost(map = "string, string", tag = "3")]
pub response_header: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
/// Backend HTTP status code.
#[prost(int32, optional, tag = "4")]
pub status_code: ::core::option::Option<i32>,
/// / Entries is the information of the entries in the directory.
#[prost(message, repeated, tag = "5")]
pub entries: ::prost::alloc::vec::Vec<Entry>,
/// / Error message is the error message of the response.
#[prost(string, optional, tag = "6")]
pub error_message: ::core::option::Option<::prost::alloc::string::String>,
}
/// Entry represents an entry in a directory.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Entry {
/// URL of the entry.
#[prost(string, tag = "1")]
pub url: ::prost::alloc::string::String,
/// Size of the entry.
#[prost(uint64, tag = "2")]
pub content_length: u64,
/// Is directory or not.
#[prost(bool, tag = "3")]
pub is_dir: bool,
}
/// DeleteTaskRequest represents request of DeleteTask. /// DeleteTaskRequest represents request of DeleteTask.
#[derive(serde::Serialize, serde::Deserialize)] #[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::derive_partial_eq_without_eq)]
@ -997,6 +1069,34 @@ pub mod dfdaemon_download_client {
.insert(GrpcMethod::new("dfdaemon.v2.DfdaemonDownload", "StatTask")); .insert(GrpcMethod::new("dfdaemon.v2.DfdaemonDownload", "StatTask"));
self.inner.unary(req, path, codec).await self.inner.unary(req, path, codec).await
} }
/// ListTaskEntries lists task entries for downloading directory.
pub async fn list_task_entries(
&mut self,
request: impl tonic::IntoRequest<super::ListTaskEntriesRequest>,
) -> std::result::Result<
tonic::Response<super::ListTaskEntriesResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/dfdaemon.v2.DfdaemonDownload/ListTaskEntries",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("dfdaemon.v2.DfdaemonDownload", "ListTaskEntries"),
);
self.inner.unary(req, path, codec).await
}
/// DeleteTask deletes task from p2p network. /// DeleteTask deletes task from p2p network.
pub async fn delete_task( pub async fn delete_task(
&mut self, &mut self,
@ -2060,6 +2160,14 @@ pub mod dfdaemon_download_server {
tonic::Response<super::super::super::common::v2::Task>, tonic::Response<super::super::super::common::v2::Task>,
tonic::Status, tonic::Status,
>; >;
/// ListTaskEntries lists task entries for downloading directory.
async fn list_task_entries(
&self,
request: tonic::Request<super::ListTaskEntriesRequest>,
) -> std::result::Result<
tonic::Response<super::ListTaskEntriesResponse>,
tonic::Status,
>;
/// DeleteTask deletes task from p2p network. /// DeleteTask deletes task from p2p network.
async fn delete_task( async fn delete_task(
&self, &self,
@ -2273,6 +2381,52 @@ pub mod dfdaemon_download_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/dfdaemon.v2.DfdaemonDownload/ListTaskEntries" => {
#[allow(non_camel_case_types)]
struct ListTaskEntriesSvc<T: DfdaemonDownload>(pub Arc<T>);
impl<
T: DfdaemonDownload,
> tonic::server::UnaryService<super::ListTaskEntriesRequest>
for ListTaskEntriesSvc<T> {
type Response = super::ListTaskEntriesResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ListTaskEntriesRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as DfdaemonDownload>::list_task_entries(&inner, request)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ListTaskEntriesSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/dfdaemon.v2.DfdaemonDownload/DeleteTask" => { "/dfdaemon.v2.DfdaemonDownload/DeleteTask" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct DeleteTaskSvc<T: DfdaemonDownload>(pub Arc<T>); struct DeleteTaskSvc<T: DfdaemonDownload>(pub Arc<T>);