diff --git a/Cargo.toml b/Cargo.toml index a7a4d01..99361b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ readme = "README.md" tonic = "0.9.0" prost = "0.11" prost-types = "0.11" -tokio = { version = "1.8.2", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.27.0", features = ["rt-multi-thread", "macros"] } [build-dependencies] -tonic-build = "0.8.3" +tonic-build = "0.9.0" diff --git a/src/dfdaemon.rs b/src/dfdaemon.rs index bb23478..a2484ab 100644 --- a/src/dfdaemon.rs +++ b/src/dfdaemon.rs @@ -106,7 +106,7 @@ pub mod dfdaemon_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -162,11 +162,27 @@ pub mod dfdaemon_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// SyncPieces syncs pieces from the other peers. pub async fn sync_pieces( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> Result< + ) -> std::result::Result< tonic::Response>, tonic::Status, > { @@ -183,13 +199,16 @@ pub mod dfdaemon_client { let path = http::uri::PathAndQuery::from_static( "/dfdaemon.Dfdaemon/SyncPieces", ); - self.inner.streaming(request.into_streaming_request(), path, codec).await + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dfdaemon.Dfdaemon", "SyncPieces")); + self.inner.streaming(req, path, codec).await } /// DownloadTask downloads task back-to-source. pub async fn download_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -203,13 +222,16 @@ pub mod dfdaemon_client { let path = http::uri::PathAndQuery::from_static( "/dfdaemon.Dfdaemon/DownloadTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dfdaemon.Dfdaemon", "DownloadTask")); + self.inner.unary(req, path, codec).await } /// UploadTask uploads task to p2p network. pub async fn upload_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -223,13 +245,19 @@ pub mod dfdaemon_client { let path = http::uri::PathAndQuery::from_static( "/dfdaemon.Dfdaemon/UploadTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dfdaemon.Dfdaemon", "UploadTask")); + self.inner.unary(req, path, codec).await } /// StatTask stats task information. pub async fn stat_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -243,13 +271,16 @@ pub mod dfdaemon_client { let path = http::uri::PathAndQuery::from_static( "/dfdaemon.Dfdaemon/StatTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dfdaemon.Dfdaemon", "StatTask")); + self.inner.unary(req, path, codec).await } /// DeleteTask deletes task from p2p network. pub async fn delete_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -263,7 +294,10 @@ pub mod dfdaemon_client { let path = http::uri::PathAndQuery::from_static( "/dfdaemon.Dfdaemon/DeleteTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dfdaemon.Dfdaemon", "DeleteTask")); + self.inner.unary(req, path, codec).await } } } @@ -276,7 +310,7 @@ pub mod dfdaemon_server { pub trait Dfdaemon: Send + Sync + 'static { /// Server streaming response type for the SyncPieces method. type SyncPiecesStream: futures_core::Stream< - Item = Result, + Item = std::result::Result, > + Send + 'static; @@ -284,27 +318,30 @@ pub mod dfdaemon_server { async fn sync_pieces( &self, request: tonic::Request>, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// DownloadTask downloads task back-to-source. async fn download_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// UploadTask uploads task to p2p network. async fn upload_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// StatTask stats task information. async fn stat_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// DeleteTask deletes task from p2p network. async fn delete_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } /// Dfdaemon RPC Service. #[derive(Debug)] @@ -312,6 +349,8 @@ pub mod dfdaemon_server { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl DfdaemonServer { @@ -324,6 +363,8 @@ pub mod dfdaemon_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -347,6 +388,22 @@ pub mod dfdaemon_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for DfdaemonServer where @@ -360,7 +417,7 @@ pub mod dfdaemon_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -385,13 +442,15 @@ pub mod dfdaemon_server { tonic::Streaming, >, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).sync_pieces(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -401,6 +460,10 @@ pub mod dfdaemon_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.streaming(method, req).await; Ok(res) @@ -423,7 +486,7 @@ pub mod dfdaemon_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).download_task(request).await }; @@ -432,6 +495,8 @@ pub mod dfdaemon_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -441,6 +506,10 @@ pub mod dfdaemon_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -463,13 +532,15 @@ pub mod dfdaemon_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).upload_task(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -479,6 +550,10 @@ pub mod dfdaemon_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -499,13 +574,15 @@ pub mod dfdaemon_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).stat_task(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -515,6 +592,10 @@ pub mod dfdaemon_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -537,13 +618,15 @@ pub mod dfdaemon_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).delete_task(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -553,6 +636,10 @@ pub mod dfdaemon_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -581,12 +668,14 @@ pub mod dfdaemon_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { diff --git a/src/manager.rs b/src/manager.rs index f2fd222..9274923 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -472,7 +472,7 @@ pub mod manager_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -528,11 +528,27 @@ pub mod manager_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// Get SeedPeer and SeedPeer cluster configuration. pub async fn get_seed_peer( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -546,13 +562,16 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/GetSeedPeer", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "GetSeedPeer")); + self.inner.unary(req, path, codec).await } /// Update SeedPeer configuration. pub async fn update_seed_peer( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -566,13 +585,16 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/UpdateSeedPeer", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "UpdateSeedPeer")); + self.inner.unary(req, path, codec).await } /// Get Scheduler and Scheduler cluster configuration. pub async fn get_scheduler( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -586,13 +608,16 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/GetScheduler", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "GetScheduler")); + self.inner.unary(req, path, codec).await } /// Update scheduler configuration. pub async fn update_scheduler( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -606,13 +631,19 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/UpdateScheduler", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "UpdateScheduler")); + self.inner.unary(req, path, codec).await } /// List acitve schedulers configuration. pub async fn list_schedulers( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -626,13 +657,16 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/ListSchedulers", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "ListSchedulers")); + self.inner.unary(req, path, codec).await } /// Get ObjectStorage configuration. pub async fn get_object_storage( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -646,13 +680,19 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/GetObjectStorage", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "GetObjectStorage")); + self.inner.unary(req, path, codec).await } /// List buckets configuration. pub async fn list_buckets( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -666,13 +706,19 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/ListBuckets", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "ListBuckets")); + self.inner.unary(req, path, codec).await } /// List applications configuration. pub async fn list_applications( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -686,13 +732,16 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/ListApplications", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("manager.Manager", "ListApplications")); + self.inner.unary(req, path, codec).await } /// KeepAlive with manager. pub async fn keep_alive( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -706,9 +755,9 @@ pub mod manager_client { let path = http::uri::PathAndQuery::from_static( "/manager.Manager/KeepAlive", ); - self.inner - .client_streaming(request.into_streaming_request(), path, codec) - .await + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("manager.Manager", "KeepAlive")); + self.inner.client_streaming(req, path, codec).await } } } @@ -723,47 +772,56 @@ pub mod manager_server { async fn get_seed_peer( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// Update SeedPeer configuration. async fn update_seed_peer( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// Get Scheduler and Scheduler cluster configuration. async fn get_scheduler( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// Update scheduler configuration. async fn update_scheduler( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// List acitve schedulers configuration. async fn list_schedulers( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Get ObjectStorage configuration. async fn get_object_storage( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// List buckets configuration. async fn list_buckets( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// List applications configuration. async fn list_applications( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// KeepAlive with manager. async fn keep_alive( &self, request: tonic::Request>, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } /// Manager RPC Service. #[derive(Debug)] @@ -771,6 +829,8 @@ pub mod manager_server { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl ManagerServer { @@ -783,6 +843,8 @@ pub mod manager_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -806,6 +868,22 @@ pub mod manager_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for ManagerServer where @@ -819,7 +897,7 @@ pub mod manager_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -841,7 +919,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_seed_peer(request).await }; @@ -850,6 +928,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -859,6 +939,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -881,7 +965,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).update_seed_peer(request).await }; @@ -890,6 +974,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -899,6 +985,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -921,7 +1011,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_scheduler(request).await }; @@ -930,6 +1020,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -939,6 +1031,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -961,7 +1057,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).update_scheduler(request).await }; @@ -970,6 +1066,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -979,6 +1077,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1001,7 +1103,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).list_schedulers(request).await }; @@ -1010,6 +1112,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1019,6 +1123,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1041,7 +1149,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_object_storage(request).await }; @@ -1050,6 +1158,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1059,6 +1169,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1081,7 +1195,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).list_buckets(request).await }; @@ -1090,6 +1204,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1099,6 +1215,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1121,7 +1241,7 @@ pub mod manager_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).list_applications(request).await }; @@ -1130,6 +1250,8 @@ pub mod manager_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1139,6 +1261,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1163,13 +1289,15 @@ pub mod manager_server { tonic::Streaming, >, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).keep_alive(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1179,6 +1307,10 @@ pub mod manager_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.client_streaming(method, req).await; Ok(res) @@ -1207,12 +1339,14 @@ pub mod manager_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { diff --git a/src/scheduler.rs b/src/scheduler.rs index 6893110..9c7ad60 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -451,7 +451,7 @@ pub mod scheduler_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -507,13 +507,29 @@ pub mod scheduler_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// AnnouncePeer announces peer to scheduler. pub async fn announce_peer( &mut self, request: impl tonic::IntoStreamingRequest< Message = super::AnnouncePeerRequest, >, - ) -> Result< + ) -> std::result::Result< tonic::Response>, tonic::Status, > { @@ -530,13 +546,19 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/AnnouncePeer", ); - self.inner.streaming(request.into_streaming_request(), path, codec).await + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "AnnouncePeer")); + self.inner.streaming(req, path, codec).await } /// Checks information of peer. pub async fn stat_peer( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -550,13 +572,16 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/StatPeer", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "StatPeer")); + self.inner.unary(req, path, codec).await } /// LeavePeer releases peer in scheduler. pub async fn leave_peer( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -570,14 +595,20 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/LeavePeer", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "LeavePeer")); + self.inner.unary(req, path, codec).await } /// TODO exchange peer api definition. /// ExchangePeer exchanges peer information. pub async fn exchange_peer( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -591,13 +622,19 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/ExchangePeer", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "ExchangePeer")); + self.inner.unary(req, path, codec).await } /// Checks information of task. pub async fn stat_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -611,13 +648,16 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/StatTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "StatTask")); + self.inner.unary(req, path, codec).await } /// AnnounceHost announces host to scheduler. pub async fn announce_host( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -631,13 +671,16 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/AnnounceHost", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "AnnounceHost")); + self.inner.unary(req, path, codec).await } /// LeaveHost releases host in scheduler. pub async fn leave_host( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -651,13 +694,16 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/LeaveHost", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "LeaveHost")); + self.inner.unary(req, path, codec).await } /// SyncProbes sync probes of the host. pub async fn sync_probes( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> Result< + ) -> std::result::Result< tonic::Response>, tonic::Status, > { @@ -674,7 +720,10 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/SyncProbes", ); - self.inner.streaming(request.into_streaming_request(), path, codec).await + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "SyncProbes")); + self.inner.streaming(req, path, codec).await } /// SyncNetworkTopology sync network topology of the hosts. pub async fn sync_network_topology( @@ -682,7 +731,7 @@ pub mod scheduler_client { request: impl tonic::IntoStreamingRequest< Message = super::SyncNetworkTopologyRequest, >, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -696,9 +745,10 @@ pub mod scheduler_client { let path = http::uri::PathAndQuery::from_static( "/scheduler.Scheduler/SyncNetworkTopology", ); - self.inner - .client_streaming(request.into_streaming_request(), path, codec) - .await + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert(GrpcMethod::new("scheduler.Scheduler", "SyncNetworkTopology")); + self.inner.client_streaming(req, path, codec).await } } } @@ -711,7 +761,7 @@ pub mod scheduler_server { pub trait Scheduler: Send + Sync + 'static { /// Server streaming response type for the AnnouncePeer method. type AnnouncePeerStream: futures_core::Stream< - Item = Result, + Item = std::result::Result, > + Send + 'static; @@ -719,41 +769,53 @@ pub mod scheduler_server { async fn announce_peer( &self, request: tonic::Request>, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Checks information of peer. async fn stat_peer( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// LeavePeer releases peer in scheduler. async fn leave_peer( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// TODO exchange peer api definition. /// ExchangePeer exchanges peer information. async fn exchange_peer( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Checks information of task. async fn stat_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// AnnounceHost announces host to scheduler. async fn announce_host( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// LeaveHost releases host in scheduler. async fn leave_host( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the SyncProbes method. type SyncProbesStream: futures_core::Stream< - Item = Result, + Item = std::result::Result, > + Send + 'static; @@ -761,12 +823,12 @@ pub mod scheduler_server { async fn sync_probes( &self, request: tonic::Request>, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// SyncNetworkTopology sync network topology of the hosts. async fn sync_network_topology( &self, request: tonic::Request>, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } /// Scheduler RPC Service. #[derive(Debug)] @@ -774,6 +836,8 @@ pub mod scheduler_server { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl SchedulerServer { @@ -786,6 +850,8 @@ pub mod scheduler_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -809,6 +875,22 @@ pub mod scheduler_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for SchedulerServer where @@ -822,7 +904,7 @@ pub mod scheduler_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -847,7 +929,7 @@ pub mod scheduler_server { tonic::Streaming, >, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).announce_peer(request).await }; @@ -856,6 +938,8 @@ pub mod scheduler_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -865,6 +949,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.streaming(method, req).await; Ok(res) @@ -887,13 +975,15 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).stat_peer(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -903,6 +993,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -925,13 +1019,15 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).leave_peer(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -941,6 +1037,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -963,7 +1063,7 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).exchange_peer(request).await }; @@ -972,6 +1072,8 @@ pub mod scheduler_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -981,6 +1083,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1003,13 +1109,15 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).stat_task(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1019,6 +1127,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1041,7 +1153,7 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).announce_host(request).await }; @@ -1050,6 +1162,8 @@ pub mod scheduler_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1059,6 +1173,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1081,13 +1199,15 @@ pub mod scheduler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).leave_host(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1097,6 +1217,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1122,13 +1246,15 @@ pub mod scheduler_server { tonic::Streaming, >, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).sync_probes(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1138,6 +1264,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.streaming(method, req).await; Ok(res) @@ -1163,7 +1293,7 @@ pub mod scheduler_server { tonic::Streaming, >, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).sync_network_topology(request).await }; @@ -1172,6 +1302,8 @@ pub mod scheduler_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1181,6 +1313,10 @@ pub mod scheduler_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.client_streaming(method, req).await; Ok(res) @@ -1209,12 +1345,14 @@ pub mod scheduler_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { diff --git a/src/security.rs b/src/security.rs index 4a15340..f999fc8 100644 --- a/src/security.rs +++ b/src/security.rs @@ -36,7 +36,7 @@ pub mod certificate_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -92,11 +92,30 @@ pub mod certificate_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// Using provided CSR, returns a signed certificate. pub async fn issue_certificate( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -110,7 +129,10 @@ pub mod certificate_client { let path = http::uri::PathAndQuery::from_static( "/security.Certificate/IssueCertificate", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("security.Certificate", "IssueCertificate")); + self.inner.unary(req, path, codec).await } } } @@ -125,7 +147,10 @@ pub mod certificate_server { async fn issue_certificate( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// Service for managing certificates issued by the CA. #[derive(Debug)] @@ -133,6 +158,8 @@ pub mod certificate_server { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl CertificateServer { @@ -145,6 +172,8 @@ pub mod certificate_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -168,6 +197,22 @@ pub mod certificate_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for CertificateServer where @@ -181,7 +226,7 @@ pub mod certificate_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -203,7 +248,7 @@ pub mod certificate_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).issue_certificate(request).await }; @@ -212,6 +257,8 @@ pub mod certificate_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -221,6 +268,10 @@ pub mod certificate_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -249,12 +300,14 @@ pub mod certificate_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner {