diff --git a/Cargo.lock b/Cargo.lock index d1b08d9..b437533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.121" +version = "2.0.122" dependencies = [ "prost 0.11.9", "prost-types 0.12.6", diff --git a/Cargo.toml b/Cargo.toml index e3f6266..f316952 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dragonfly-api" -version = "2.0.121" +version = "2.0.122" authors = ["Gaius "] edition = "2021" license = "Apache-2.0" diff --git a/pkg/apis/dfdaemon/v2/dfdaemon.proto b/pkg/apis/dfdaemon/v2/dfdaemon.proto index 7fdd9b6..7a98d1a 100644 --- a/pkg/apis/dfdaemon/v2/dfdaemon.proto +++ b/pkg/apis/dfdaemon/v2/dfdaemon.proto @@ -175,8 +175,10 @@ message UploadCacheTaskRequest { optional string application = 4; // Task piece length. uint64 piece_length = 5 [(validate.rules).uint64.gte = 1]; - // Download timeout. - optional google.protobuf.Duration timeout = 6; + // TTL of the cache task. + google.protobuf.Duration ttl = 6 [(validate.rules).duration = {gte.seconds: 60, lte.seconds: 604800}]; + // Upload timeout. + optional google.protobuf.Duration timeout = 7; } // StatCacheTaskRequest represents request of StatCacheTask. diff --git a/pkg/apis/scheduler/v2/scheduler.proto b/pkg/apis/scheduler/v2/scheduler.proto index 76ec6b2..4cd4434 100644 --- a/pkg/apis/scheduler/v2/scheduler.proto +++ b/pkg/apis/scheduler/v2/scheduler.proto @@ -373,6 +373,28 @@ message DeleteCachePeerRequest { string peer_id = 3 [(validate.rules).string.min_len = 1]; } +// UploadCacheTaskRequest represents request of UploadCacheTask. +message UploadCacheTaskRequest { + // Host id. + string host_id = 1 [(validate.rules).string.min_len = 1]; + // Task id. + string task_id = 2 [(validate.rules).string.min_len = 1]; + // Peer id. + string peer_id = 3 [(validate.rules).string.min_len = 1]; + // Replica count of the persistent cache task. + uint64 persistent_replica_count = 4 [(validate.rules).uint64.gte = 1]; + // Tag is used to distinguish different cache tasks. + optional string tag = 5; + // Application of task. + optional string application = 6; + // Task piece length. + uint64 piece_length = 7 [(validate.rules).uint64.gte = 1]; + // TTL of the cache task. + google.protobuf.Duration ttl = 8 [(validate.rules).duration = {gte.seconds: 60, lte.seconds: 604800}]; + // Upload timeout. + optional google.protobuf.Duration timeout = 9; +} + // StatCacheTaskRequest represents request of StatCacheTask. message StatCacheTaskRequest { // Task id. @@ -422,6 +444,9 @@ service Scheduler { // DeleteCachePeer releases cache peer in scheduler. rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty); + // UploadCacheTask uploads cache task to scheduler. + rpc UploadCacheTask(UploadCacheTaskRequest)returns(common.v2.CacheTask); + // Checks information of cache task. rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask); diff --git a/proto/common.proto b/proto/common.proto index 7ba868a..d37f515 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -214,10 +214,12 @@ message CacheTask { uint32 piece_count = 9; // Task state. string state = 10; + // TTL of the cache task. + google.protobuf.Duration ttl = 11; // Task create time. - google.protobuf.Timestamp created_at = 11; + google.protobuf.Timestamp created_at = 12; // Task update time. - google.protobuf.Timestamp updated_at = 12; + google.protobuf.Timestamp updated_at = 13; } // Host metadata. diff --git a/proto/dfdaemon.proto b/proto/dfdaemon.proto index 80eacbd..ad5d8dc 100644 --- a/proto/dfdaemon.proto +++ b/proto/dfdaemon.proto @@ -162,8 +162,10 @@ message UploadCacheTaskRequest { optional string application = 4; // Task piece length. uint64 piece_length = 5; - // Download timeout. - optional google.protobuf.Duration timeout = 6; + // TTL of the cache task. + google.protobuf.Duration ttl = 6; + // Upload timeout. + optional google.protobuf.Duration timeout = 7; } // StatCacheTaskRequest represents request of StatCacheTask. diff --git a/proto/scheduler.proto b/proto/scheduler.proto index 9a5776f..fc47262 100644 --- a/proto/scheduler.proto +++ b/proto/scheduler.proto @@ -357,6 +357,28 @@ message DeleteCachePeerRequest { string peer_id = 3; } +// UploadCacheTaskRequest represents request of UploadCacheTask. +message UploadCacheTaskRequest { + // Host id. + string host_id = 1; + // Task id. + string task_id = 2; + // Peer id. + string peer_id = 3; + // Replica count of the persistent cache task. + uint64 persistent_replica_count = 4; + // Tag is used to distinguish different cache tasks. + optional string tag = 5; + // Application of task. + optional string application = 6; + // Task piece length. + uint64 piece_length = 7; + // TTL of the cache task. + google.protobuf.Duration ttl = 8; + // Upload timeout. + optional google.protobuf.Duration timeout = 9; +} + // StatCacheTaskRequest represents request of StatCacheTask. message StatCacheTaskRequest { // Task id. @@ -406,6 +428,9 @@ service Scheduler{ // DeleteCachePeer releases cache peer in scheduler. rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty); + // UploadCacheTask uploads cache task to scheduler. + rpc UploadCacheTask(UploadCacheTaskRequest)returns(common.v2.CacheTask); + // Checks information of cache task. rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask); diff --git a/src/common.v2.rs b/src/common.v2.rs index ebceca4..255516e 100644 --- a/src/common.v2.rs +++ b/src/common.v2.rs @@ -171,11 +171,14 @@ pub struct CacheTask { /// Task state. #[prost(string, tag = "10")] pub state: ::prost::alloc::string::String, - /// Task create time. + /// TTL of the cache task. #[prost(message, optional, tag = "11")] + pub ttl: ::core::option::Option<::prost_wkt_types::Duration>, + /// Task create time. + #[prost(message, optional, tag = "12")] pub created_at: ::core::option::Option<::prost_wkt_types::Timestamp>, /// Task update time. - #[prost(message, optional, tag = "12")] + #[prost(message, optional, tag = "13")] pub updated_at: ::core::option::Option<::prost_wkt_types::Timestamp>, } /// Host metadata. diff --git a/src/descriptor.bin b/src/descriptor.bin index 11fbcda..1e7b6e1 100644 Binary files a/src/descriptor.bin and b/src/descriptor.bin differ diff --git a/src/dfdaemon.v2.rs b/src/dfdaemon.v2.rs index 95f4a40..34afc12 100644 --- a/src/dfdaemon.v2.rs +++ b/src/dfdaemon.v2.rs @@ -221,8 +221,11 @@ pub struct UploadCacheTaskRequest { /// Task piece length. #[prost(uint64, tag = "5")] pub piece_length: u64, - /// Download timeout. + /// TTL of the cache task. #[prost(message, optional, tag = "6")] + pub ttl: ::core::option::Option<::prost_wkt_types::Duration>, + /// Upload timeout. + #[prost(message, optional, tag = "7")] pub timeout: ::core::option::Option<::prost_wkt_types::Duration>, } /// StatCacheTaskRequest represents request of StatCacheTask. diff --git a/src/scheduler.v2.rs b/src/scheduler.v2.rs index 69c15f9..a2c5d8d 100644 --- a/src/scheduler.v2.rs +++ b/src/scheduler.v2.rs @@ -564,6 +564,39 @@ pub struct DeleteCachePeerRequest { #[prost(string, tag = "3")] pub peer_id: ::prost::alloc::string::String, } +/// UploadCacheTaskRequest represents request of UploadCacheTask. +#[derive(serde::Serialize, serde::Deserialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UploadCacheTaskRequest { + /// Host id. + #[prost(string, tag = "1")] + pub host_id: ::prost::alloc::string::String, + /// Task id. + #[prost(string, tag = "2")] + pub task_id: ::prost::alloc::string::String, + /// Peer id. + #[prost(string, tag = "3")] + pub peer_id: ::prost::alloc::string::String, + /// Replica count of the persistent cache task. + #[prost(uint64, tag = "4")] + pub persistent_replica_count: u64, + /// Tag is used to distinguish different cache tasks. + #[prost(string, optional, tag = "5")] + pub tag: ::core::option::Option<::prost::alloc::string::String>, + /// Application of task. + #[prost(string, optional, tag = "6")] + pub application: ::core::option::Option<::prost::alloc::string::String>, + /// Task piece length. + #[prost(uint64, tag = "7")] + pub piece_length: u64, + /// TTL of the cache task. + #[prost(message, optional, tag = "8")] + pub ttl: ::core::option::Option<::prost_wkt_types::Duration>, + /// Upload timeout. + #[prost(message, optional, tag = "9")] + pub timeout: ::core::option::Option<::prost_wkt_types::Duration>, +} /// StatCacheTaskRequest represents request of StatCacheTask. #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -946,6 +979,32 @@ pub mod scheduler_client { .insert(GrpcMethod::new("scheduler.v2.Scheduler", "DeleteCachePeer")); self.inner.unary(req, path, codec).await } + /// UploadCacheTask uploads cache task to scheduler. + pub async fn upload_cache_task( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/scheduler.v2.Scheduler/UploadCacheTask", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.v2.Scheduler", "UploadCacheTask")); + self.inner.unary(req, path, codec).await + } /// Checks information of cache task. pub async fn stat_cache_task( &mut self, @@ -1095,6 +1154,14 @@ pub mod scheduler_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// UploadCacheTask uploads cache task to scheduler. + async fn upload_cache_task( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Checks information of cache task. async fn stat_cache_task( &self, @@ -1692,6 +1759,52 @@ pub mod scheduler_server { }; Box::pin(fut) } + "/scheduler.v2.Scheduler/UploadCacheTask" => { + #[allow(non_camel_case_types)] + struct UploadCacheTaskSvc(pub Arc); + impl< + T: Scheduler, + > tonic::server::UnaryService + for UploadCacheTaskSvc { + type Response = super::super::super::common::v2::CacheTask; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).upload_cache_task(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = UploadCacheTaskSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/scheduler.v2.Scheduler/StatCacheTask" => { #[allow(non_camel_case_types)] struct StatCacheTaskSvc(pub Arc);