diff --git a/src/backend/http.rs b/src/backend/http.rs
index 2005111a..266b16ac 100644
--- a/src/backend/http.rs
+++ b/src/backend/http.rs
@@ -30,7 +30,7 @@ pub struct Request {
     pub header: HeaderMap,
 
     // timeout is the timeout of the request.
-    pub timeout: Option<Duration>,
+    pub timeout: Duration,
 }
 
 // HeadResponse is the head response for HTTP backend.
@@ -70,13 +70,9 @@ impl HTTP {
     }
 
     // Head gets the header of the request.
-    pub async fn head(&self, req: Request) -> Result<HeadResponse> {
-        let mut request_builder = self.client.head(&req.url).headers(req.header);
-        if let Some(timeout) = req.timeout {
-            request_builder = request_builder.timeout(timeout);
-        } else {
-            request_builder = request_builder.timeout(super::REQUEST_TIMEOUT);
-        }
+    pub async fn head(&self, request: Request) -> Result<HeadResponse> {
+        let mut request_builder = self.client.head(&request.url).headers(request.header);
+        request_builder = request_builder.timeout(request.timeout);
 
         let response = request_builder.send().await?;
         let header = response.headers().clone();
@@ -89,13 +85,9 @@ impl HTTP {
     }
 
     // Get gets the content of the request.
-    pub async fn get(&self, req: Request) -> Result<GetResponse<impl AsyncRead>> {
-        let mut request_builder = self.client.get(&req.url).headers(req.header);
-        if let Some(timeout) = req.timeout {
-            request_builder = request_builder.timeout(timeout);
-        } else {
-            request_builder = request_builder.timeout(super::REQUEST_TIMEOUT);
-        }
+    pub async fn get(&self, request: Request) -> Result<GetResponse<impl AsyncRead>> {
+        let mut request_builder = self.client.get(&request.url).headers(request.header);
+        request_builder = request_builder.timeout(request.timeout);
 
         let response = request_builder.send().await?;
         let header = response.headers().clone();
diff --git a/src/backend/mod.rs b/src/backend/mod.rs
index 6df9cc1f..780033dd 100644
--- a/src/backend/mod.rs
+++ b/src/backend/mod.rs
@@ -15,7 +15,3 @@
  */
 
 pub mod http;
-use std::time::Duration;
-
-// REQUEST_TIMEOUT is the timeout for requests.
-const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
diff --git a/src/bin/dfdaemon/main.rs b/src/bin/dfdaemon/main.rs
index 572380eb..12de9b98 100644
--- a/src/bin/dfdaemon/main.rs
+++ b/src/bin/dfdaemon/main.rs
@@ -133,6 +133,7 @@ async fn main() -> Result<(), anyhow::Error> {
 
     // Initialize task manager.
     let task = Task::new(
+        config.clone(),
        id_generator.clone(),
        storage.clone(),
        scheduler_client.clone(),
diff --git a/src/bin/dfget/main.rs b/src/bin/dfget/main.rs
index be04e0b4..a26e5f01 100644
--- a/src/bin/dfget/main.rs
+++ b/src/bin/dfget/main.rs
@@ -53,7 +53,7 @@ struct Args {
     #[arg(
         short = 'e',
         long = "endpoint",
-        default_value_os_t = dfdaemon::default_dfdaemon_unix_socket_path(),
+        default_value_os_t = dfdaemon::default_download_unix_socket_path(),
         help = "Endpoint of dfdaemon's GRPC server"
     )]
     endpoint: PathBuf,
@@ -158,7 +158,6 @@ async fn main() -> Result<(), anyhow::Error> {
 
     // Create dfdaemon client.
     let client = DfdaemonClient::new_unix(args.endpoint).await.unwrap();
-
     let response = client
         .download_task(DownloadTaskRequest {
             download: Some(Download {
diff --git a/src/bin/dfstore/main.rs b/src/bin/dfstore/main.rs
index fcafe5fc..07383583 100644
--- a/src/bin/dfstore/main.rs
+++ b/src/bin/dfstore/main.rs
@@ -36,7 +36,7 @@ struct Args {
     #[arg(
         short = 'e',
         long = "endpoint",
-        default_value_os_t = dfdaemon::default_dfdaemon_unix_socket_path(),
+        default_value_os_t = dfdaemon::default_download_unix_socket_path(),
         help = "Endpoint of dfdaemon's GRPC server"
     )]
     endpoint: PathBuf,
diff --git a/src/config/dfdaemon.rs b/src/config/dfdaemon.rs
index e240fbf3..d9b4d408 100644
--- a/src/config/dfdaemon.rs
+++ b/src/config/dfdaemon.rs
@@ -26,8 +26,8 @@ use validator::Validate;
 // NAME is the name of dfdaemon.
 pub const NAME: &str = "dfdaemon";
 
-// DEFAULT_GRPC_SERVER_PORT is the default port of the grpc server.
-const DEFAULT_GRPC_SERVER_PORT: u16 = 65000;
+// DEFAULT_UPLOAD_GRPC_SERVER_PORT is the default port of the upload grpc server.
+const DEFAULT_UPLOAD_GRPC_SERVER_PORT: u16 = 65000;
 
 // DEFAULT_PROXY_SERVER_PORT is the default port of the proxy server.
 // const DEFAULT_PROXY_SERVER_PORT: u16 = 65001;
@@ -61,11 +61,16 @@ pub fn default_dfdaemon_cache_dir() -> PathBuf {
     super::default_cache_dir().join(NAME)
 }
 
-// default_dfdaemon_unix_socket_path is the default unix socket path for dfdaemon GRPC service.
-pub fn default_dfdaemon_unix_socket_path() -> PathBuf {
+// default_download_unix_socket_path is the default unix socket path for download GRPC service.
+pub fn default_download_unix_socket_path() -> PathBuf {
     super::default_root_dir().join("dfdaemon.sock")
 }
+// default_download_piece_timeout is the default timeout for downloading a piece from source.
+pub fn default_download_piece_timeout() -> Duration {
+    Duration::from_secs(120)
+}
+
 // default_dfdaemon_lock_path is the default file lock path for dfdaemon service.
 pub fn default_dfdaemon_lock_path() -> PathBuf {
     super::default_lock_dir().join("dfdaemon.lock")
 }
@@ -177,19 +182,31 @@ pub struct DwonloadServer {
 impl Default for DwonloadServer {
     fn default() -> Self {
         Self {
-            socket_path: default_dfdaemon_unix_socket_path(),
+            socket_path: default_download_unix_socket_path(),
         }
     }
 }
 
 // Server is the server configuration for dfdaemon.
-#[derive(Debug, Clone, Default, Validate, Deserialize)]
+#[derive(Debug, Clone, Validate, Deserialize)]
 #[serde(default, rename_all = "camelCase")]
 pub struct Download {
     // server is the download server configuration for dfdaemon.
     pub server: DwonloadServer,
+
+    // piece_timeout is the timeout for downloading a piece from source.
+    pub piece_timeout: Duration,
 }
+// Download implements default value for Download.
+impl Default for Download {
+    fn default() -> Self {
+        Self {
+            server: DwonloadServer::default(),
+            piece_timeout: default_download_piece_timeout(),
+        }
+    }
+}
 
 // UploadServer is the upload server configuration for dfdaemon.
 #[derive(Debug, Clone, Validate, Deserialize)]
 #[serde(default, rename_all = "camelCase")]
@@ -206,7 +223,7 @@ impl Default for UploadServer {
     fn default() -> Self {
         Self {
             ip: None,
-            port: DEFAULT_GRPC_SERVER_PORT,
+            port: DEFAULT_UPLOAD_GRPC_SERVER_PORT,
         }
     }
 }
diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs
index b511dd84..9a29d350 100644
--- a/src/grpc/dfdaemon.rs
+++ b/src/grpc/dfdaemon.rs
@@ -410,12 +410,7 @@ impl Dfdaemon for DfdaemonServerHandler {
         // Get the content length.
         let content_length = self
             .task
-            .get_content_length(
-                task_id.as_str(),
-                download.url.as_str(),
-                header.clone(),
-                None,
-            )
+            .get_content_length(task_id.as_str(), download.url.as_str(), header.clone())
             .await
             .map_err(|e| {
                 // Download task failed.
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 5c736cd4..e5302536 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -15,6 +15,7 @@
  */
 
 use crate::backend::http::{Request as HTTPRequest, HTTP};
+use crate::config::dfdaemon::Config;
 use crate::grpc::scheduler::SchedulerClient;
 use crate::storage::{metadata, Storage};
 use crate::utils::http::headermap_to_hashmap;
@@ -32,7 +33,6 @@ use mpsc::Sender;
 use reqwest::header::{self, HeaderMap};
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::fs::{self, OpenOptions};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
@@ -44,6 +44,9 @@ pub mod piece;
 
 // Task represents a task manager.
 pub struct Task {
+    // config is the configuration of the dfdaemon.
+    config: Arc<Config>,
+
     // id_generator is the id generator.
     pub id_generator: Arc<IDGenerator>,
 
@@ -64,12 +67,14 @@
 impl Task {
     // new returns a new Task.
     pub fn new(
+        config: Arc<Config>,
         id_generator: Arc<IDGenerator>,
         storage: Arc<Storage>,
         scheduler_client: Arc<SchedulerClient>,
         http_client: Arc<HTTP>,
     ) -> Self {
         let piece = piece::Piece::new(
+            config.clone(),
             storage.clone(),
             scheduler_client.clone(),
             http_client.clone(),
@@ -77,6 +82,7 @@ impl Task {
         let piece = Arc::new(piece);
 
         Self {
+            config,
             id_generator,
             storage: storage.clone(),
             scheduler_client: scheduler_client.clone(),
@@ -116,23 +122,6 @@ impl Task {
         download: Download,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
     ) -> ClientResult<()> {
-        // Convert the timeout.
-        let timeout: Option<Duration> = match download.timeout.clone() {
-            Some(timeout) => match Duration::try_from(timeout) {
-                Ok(timeout) => Some(timeout),
-                Err(err) => {
-                    error!("convert timeout error: {:?}", err);
-                    download_progress_tx
-                        .send(Err(Status::invalid_argument("invalid timeout")))
-                        .await
-                        .unwrap_or_else(|err| error!("send download progress error: {:?}", err));
-
-                    return Err(Error::InvalidParameter());
-                }
-            },
-            None => None,
-        };
-
         // Open the file.
         let mut f = match OpenOptions::new()
             .create(true)
@@ -291,7 +280,6 @@ impl Task {
                 download.url.clone(),
                 download.header.clone(),
                 content_length,
-                timeout,
                 download_progress_tx.clone(),
             )
             .await
@@ -672,7 +660,6 @@ impl Task {
                     interested_piece.offset,
                     interested_piece.length,
                     header.clone(),
-                    None,
                 )
                 .await
             {
@@ -811,7 +798,7 @@ impl Task {
             {
                 Ok(reader) => reader,
                 Err(err) => {
-                    error!(
+                    info!(
                         "download piece {} from local peer error: {:?}",
                         interested_piece.number, err
                     );
@@ -875,7 +862,6 @@ impl Task {
         url: String,
         header: HashMap<String, String>,
         content_length: u64,
-        timeout: Option<Duration>,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
     ) -> ClientResult<Vec<metadata::Piece>> {
         // Convert the header.
@@ -896,7 +882,6 @@ impl Task {
                     interested_piece.offset,
                     interested_piece.length,
                     header.clone(),
-                    timeout,
                 )
                 .await
                 .map_err(|err| {
@@ -967,7 +952,6 @@ impl Task {
         task_id: &str,
         url: &str,
         header: HeaderMap,
-        timeout: Option<Duration>,
     ) -> ClientResult<u64> {
         let task = self
             .storage
@@ -984,7 +968,7 @@ impl Task {
             .head(HTTPRequest {
                 url: url.to_string(),
                 header,
-                timeout,
+                timeout: self.config.download.piece_timeout,
             })
             .await?;
 
diff --git a/src/task/piece.rs b/src/task/piece.rs
index e75a2328..3d1b3503 100644
--- a/src/task/piece.rs
+++ b/src/task/piece.rs
@@ -15,6 +15,7 @@
  */
 
 use crate::backend::http::{Request, HTTP};
+use crate::config::dfdaemon::Config;
 use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient};
 use crate::storage::{metadata, Storage};
 use crate::utils::digest::{Algorithm, Digest as UtilsDigest};
@@ -29,7 +30,6 @@ use rand::prelude::*;
 use reqwest::header::{self, HeaderMap};
 use sha2::{Digest, Sha256};
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::{
     fs,
     io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom},
@@ -48,6 +48,9 @@ pub struct CollectPiece {
 
 // Piece represents a piece manager.
 pub struct Piece {
+    // config is the configuration of the dfdaemon.
+    config: Arc<Config>,
+
     // manager_client is the grpc client of the manager.
     storage: Arc<Storage>,
 
@@ -62,11 +65,13 @@
 impl Piece {
     // new returns a new Piece.
     pub fn new(
+        config: Arc<Config>,
         storage: Arc<Storage>,
         scheduler_client: Arc<SchedulerClient>,
         http_client: Arc<HTTP>,
     ) -> Self {
         Self {
+            config,
             storage,
             scheduler_client,
             http_client,
@@ -413,7 +418,6 @@ impl Piece {
         offset: u64,
         length: u64,
         header: HeaderMap,
-        timeout: Option<Duration>,
     ) -> Result<impl AsyncRead> {
         // Record the start of downloading piece.
         self.storage.download_piece_started(task_id, number)?;
@@ -433,7 +437,7 @@
             .get(Request {
                 url: url.to_string(),
                 header: header.to_owned(),
-                timeout,
+                timeout: self.config.download.piece_timeout,
             })
             .await
             .map_err(|err| {