feat: add upload cache task for scheduler (#328)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-06-18 14:29:45 +08:00 committed by GitHub
parent e1e59d1696
commit 240563b417
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 186 additions and 11 deletions

2
Cargo.lock generated
View File

@ -160,7 +160,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-api" name = "dragonfly-api"
version = "2.0.121" version = "2.0.122"
dependencies = [ dependencies = [
"prost 0.11.9", "prost 0.11.9",
"prost-types 0.12.6", "prost-types 0.12.6",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "dragonfly-api" name = "dragonfly-api"
version = "2.0.121" version = "2.0.122"
authors = ["Gaius <gaius.qi@gmail.com>"] authors = ["Gaius <gaius.qi@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"

View File

@ -175,8 +175,10 @@ message UploadCacheTaskRequest {
optional string application = 4; optional string application = 4;
// Task piece length. // Task piece length.
uint64 piece_length = 5 [(validate.rules).uint64.gte = 1]; uint64 piece_length = 5 [(validate.rules).uint64.gte = 1];
// Download timeout. // TTL of the cache task.
optional google.protobuf.Duration timeout = 6; google.protobuf.Duration ttl = 6 [(validate.rules).duration = {gte.seconds: 60, lte.seconds: 604800}];
// Upload timeout.
optional google.protobuf.Duration timeout = 7;
} }
// StatCacheTaskRequest represents request of StatCacheTask. // StatCacheTaskRequest represents request of StatCacheTask.

View File

@ -373,6 +373,28 @@ message DeleteCachePeerRequest {
string peer_id = 3 [(validate.rules).string.min_len = 1]; string peer_id = 3 [(validate.rules).string.min_len = 1];
} }
// UploadCacheTaskRequest represents request of UploadCacheTask.
message UploadCacheTaskRequest {
// Host id.
string host_id = 1 [(validate.rules).string.min_len = 1];
// Task id.
string task_id = 2 [(validate.rules).string.min_len = 1];
// Peer id.
string peer_id = 3 [(validate.rules).string.min_len = 1];
// Replica count of the persistent cache task.
uint64 persistent_replica_count = 4 [(validate.rules).uint64.gte = 1];
// Tag is used to distinguish different cache tasks.
optional string tag = 5;
// Application of task.
optional string application = 6;
// Task piece length.
uint64 piece_length = 7 [(validate.rules).uint64.gte = 1];
// TTL of the cache task.
google.protobuf.Duration ttl = 8 [(validate.rules).duration = {gte.seconds: 60, lte.seconds: 604800}];
// Upload timeout.
optional google.protobuf.Duration timeout = 9;
}
// StatCacheTaskRequest represents request of StatCacheTask. // StatCacheTaskRequest represents request of StatCacheTask.
message StatCacheTaskRequest { message StatCacheTaskRequest {
// Task id. // Task id.
@ -422,6 +444,9 @@ service Scheduler {
// DeleteCachePeer releases cache peer in scheduler. // DeleteCachePeer releases cache peer in scheduler.
rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty); rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty);
// UploadCacheTask uploads cache task to scheduler.
rpc UploadCacheTask(UploadCacheTaskRequest)returns(common.v2.CacheTask);
// Checks information of cache task. // Checks information of cache task.
rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask); rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask);

View File

@ -214,10 +214,12 @@ message CacheTask {
uint32 piece_count = 9; uint32 piece_count = 9;
// Task state. // Task state.
string state = 10; string state = 10;
// TTL of the cache task.
google.protobuf.Duration ttl = 11;
// Task create time. // Task create time.
google.protobuf.Timestamp created_at = 11; google.protobuf.Timestamp created_at = 12;
// Task update time. // Task update time.
google.protobuf.Timestamp updated_at = 12; google.protobuf.Timestamp updated_at = 13;
} }
// Host metadata. // Host metadata.

View File

@ -162,8 +162,10 @@ message UploadCacheTaskRequest {
optional string application = 4; optional string application = 4;
// Task piece length. // Task piece length.
uint64 piece_length = 5; uint64 piece_length = 5;
// Download timeout. // TTL of the cache task.
optional google.protobuf.Duration timeout = 6; google.protobuf.Duration ttl = 6;
// Upload timeout.
optional google.protobuf.Duration timeout = 7;
} }
// StatCacheTaskRequest represents request of StatCacheTask. // StatCacheTaskRequest represents request of StatCacheTask.

View File

@ -357,6 +357,28 @@ message DeleteCachePeerRequest {
string peer_id = 3; string peer_id = 3;
} }
// UploadCacheTaskRequest represents request of UploadCacheTask.
message UploadCacheTaskRequest {
// Host id.
string host_id = 1;
// Task id.
string task_id = 2;
// Peer id.
string peer_id = 3;
// Replica count of the persistent cache task.
uint64 persistent_replica_count = 4;
// Tag is used to distinguish different cache tasks.
optional string tag = 5;
// Application of task.
optional string application = 6;
// Task piece length.
uint64 piece_length = 7;
// TTL of the cache task.
google.protobuf.Duration ttl = 8;
// Upload timeout.
optional google.protobuf.Duration timeout = 9;
}
// StatCacheTaskRequest represents request of StatCacheTask. // StatCacheTaskRequest represents request of StatCacheTask.
message StatCacheTaskRequest { message StatCacheTaskRequest {
// Task id. // Task id.
@ -406,6 +428,9 @@ service Scheduler{
// DeleteCachePeer releases cache peer in scheduler. // DeleteCachePeer releases cache peer in scheduler.
rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty); rpc DeleteCachePeer(DeleteCachePeerRequest)returns(google.protobuf.Empty);
// UploadCacheTask uploads cache task to scheduler.
rpc UploadCacheTask(UploadCacheTaskRequest)returns(common.v2.CacheTask);
// Checks information of cache task. // Checks information of cache task.
rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask); rpc StatCacheTask(StatCacheTaskRequest)returns(common.v2.CacheTask);

View File

@ -171,11 +171,14 @@ pub struct CacheTask {
/// Task state. /// Task state.
#[prost(string, tag = "10")] #[prost(string, tag = "10")]
pub state: ::prost::alloc::string::String, pub state: ::prost::alloc::string::String,
/// Task create time. /// TTL of the cache task.
#[prost(message, optional, tag = "11")] #[prost(message, optional, tag = "11")]
pub ttl: ::core::option::Option<::prost_wkt_types::Duration>,
/// Task create time.
#[prost(message, optional, tag = "12")]
pub created_at: ::core::option::Option<::prost_wkt_types::Timestamp>, pub created_at: ::core::option::Option<::prost_wkt_types::Timestamp>,
/// Task update time. /// Task update time.
#[prost(message, optional, tag = "12")] #[prost(message, optional, tag = "13")]
pub updated_at: ::core::option::Option<::prost_wkt_types::Timestamp>, pub updated_at: ::core::option::Option<::prost_wkt_types::Timestamp>,
} }
/// Host metadata. /// Host metadata.

Binary file not shown.

View File

@ -221,8 +221,11 @@ pub struct UploadCacheTaskRequest {
/// Task piece length. /// Task piece length.
#[prost(uint64, tag = "5")] #[prost(uint64, tag = "5")]
pub piece_length: u64, pub piece_length: u64,
/// Download timeout. /// TTL of the cache task.
#[prost(message, optional, tag = "6")] #[prost(message, optional, tag = "6")]
pub ttl: ::core::option::Option<::prost_wkt_types::Duration>,
/// Upload timeout.
#[prost(message, optional, tag = "7")]
pub timeout: ::core::option::Option<::prost_wkt_types::Duration>, pub timeout: ::core::option::Option<::prost_wkt_types::Duration>,
} }
/// StatCacheTaskRequest represents request of StatCacheTask. /// StatCacheTaskRequest represents request of StatCacheTask.

View File

@ -564,6 +564,39 @@ pub struct DeleteCachePeerRequest {
#[prost(string, tag = "3")] #[prost(string, tag = "3")]
pub peer_id: ::prost::alloc::string::String, pub peer_id: ::prost::alloc::string::String,
} }
/// UploadCacheTaskRequest represents request of UploadCacheTask.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UploadCacheTaskRequest {
/// Host id.
#[prost(string, tag = "1")]
pub host_id: ::prost::alloc::string::String,
/// Task id.
#[prost(string, tag = "2")]
pub task_id: ::prost::alloc::string::String,
/// Peer id.
#[prost(string, tag = "3")]
pub peer_id: ::prost::alloc::string::String,
/// Replica count of the persistent cache task.
#[prost(uint64, tag = "4")]
pub persistent_replica_count: u64,
/// Tag is used to distinguish different cache tasks.
#[prost(string, optional, tag = "5")]
pub tag: ::core::option::Option<::prost::alloc::string::String>,
/// Application of task.
#[prost(string, optional, tag = "6")]
pub application: ::core::option::Option<::prost::alloc::string::String>,
/// Task piece length.
#[prost(uint64, tag = "7")]
pub piece_length: u64,
/// TTL of the cache task.
#[prost(message, optional, tag = "8")]
pub ttl: ::core::option::Option<::prost_wkt_types::Duration>,
/// Upload timeout.
#[prost(message, optional, tag = "9")]
pub timeout: ::core::option::Option<::prost_wkt_types::Duration>,
}
/// StatCacheTaskRequest represents request of StatCacheTask. /// StatCacheTaskRequest represents request of StatCacheTask.
#[derive(serde::Serialize, serde::Deserialize)] #[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::derive_partial_eq_without_eq)]
@ -946,6 +979,32 @@ pub mod scheduler_client {
.insert(GrpcMethod::new("scheduler.v2.Scheduler", "DeleteCachePeer")); .insert(GrpcMethod::new("scheduler.v2.Scheduler", "DeleteCachePeer"));
self.inner.unary(req, path, codec).await self.inner.unary(req, path, codec).await
} }
/// UploadCacheTask uploads cache task to scheduler.
pub async fn upload_cache_task(
&mut self,
request: impl tonic::IntoRequest<super::UploadCacheTaskRequest>,
) -> std::result::Result<
tonic::Response<super::super::super::common::v2::CacheTask>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/scheduler.v2.Scheduler/UploadCacheTask",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("scheduler.v2.Scheduler", "UploadCacheTask"));
self.inner.unary(req, path, codec).await
}
/// Checks information of cache task. /// Checks information of cache task.
pub async fn stat_cache_task( pub async fn stat_cache_task(
&mut self, &mut self,
@ -1095,6 +1154,14 @@ pub mod scheduler_server {
&self, &self,
request: tonic::Request<super::DeleteCachePeerRequest>, request: tonic::Request<super::DeleteCachePeerRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status>; ) -> std::result::Result<tonic::Response<()>, tonic::Status>;
/// UploadCacheTask uploads cache task to scheduler.
async fn upload_cache_task(
&self,
request: tonic::Request<super::UploadCacheTaskRequest>,
) -> std::result::Result<
tonic::Response<super::super::super::common::v2::CacheTask>,
tonic::Status,
>;
/// Checks information of cache task. /// Checks information of cache task.
async fn stat_cache_task( async fn stat_cache_task(
&self, &self,
@ -1692,6 +1759,52 @@ pub mod scheduler_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/scheduler.v2.Scheduler/UploadCacheTask" => {
#[allow(non_camel_case_types)]
struct UploadCacheTaskSvc<T: Scheduler>(pub Arc<T>);
impl<
T: Scheduler,
> tonic::server::UnaryService<super::UploadCacheTaskRequest>
for UploadCacheTaskSvc<T> {
type Response = super::super::super::common::v2::CacheTask;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::UploadCacheTaskRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).upload_cache_task(request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = UploadCacheTaskSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/scheduler.v2.Scheduler/StatCacheTask" => { "/scheduler.v2.Scheduler/StatCacheTask" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StatCacheTaskSvc<T: Scheduler>(pub Arc<T>); struct StatCacheTaskSvc<T: Scheduler>(pub Arc<T>);