feat: add piece_timeout for downloading from origin (#129)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-11-30 16:52:50 +08:00 committed by GitHub
parent c5d52b984d
commit 77d429075f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 51 additions and 63 deletions

View File

@ -30,7 +30,7 @@ pub struct Request {
pub header: HeaderMap, pub header: HeaderMap,
// timeout is the timeout of the request. // timeout is the timeout of the request.
pub timeout: Option<Duration>, pub timeout: Duration,
} }
// HeadResponse is the head response for HTTP backend. // HeadResponse is the head response for HTTP backend.
@ -70,13 +70,9 @@ impl HTTP {
} }
// Head gets the header of the request. // Head gets the header of the request.
pub async fn head(&self, req: Request) -> Result<HeadResponse> { pub async fn head(&self, request: Request) -> Result<HeadResponse> {
let mut request_builder = self.client.head(&req.url).headers(req.header); let mut request_builder = self.client.head(&request.url).headers(request.header);
if let Some(timeout) = req.timeout { request_builder = request_builder.timeout(request.timeout);
request_builder = request_builder.timeout(timeout);
} else {
request_builder = request_builder.timeout(super::REQUEST_TIMEOUT);
}
let response = request_builder.send().await?; let response = request_builder.send().await?;
let header = response.headers().clone(); let header = response.headers().clone();
@ -89,13 +85,9 @@ impl HTTP {
} }
// Get gets the content of the request. // Get gets the content of the request.
pub async fn get(&self, req: Request) -> Result<GetResponse<impl AsyncRead>> { pub async fn get(&self, request: Request) -> Result<GetResponse<impl AsyncRead>> {
let mut request_builder = self.client.get(&req.url).headers(req.header); let mut request_builder = self.client.get(&request.url).headers(request.header);
if let Some(timeout) = req.timeout { request_builder = request_builder.timeout(request.timeout);
request_builder = request_builder.timeout(timeout);
} else {
request_builder = request_builder.timeout(super::REQUEST_TIMEOUT);
}
let response = request_builder.send().await?; let response = request_builder.send().await?;
let header = response.headers().clone(); let header = response.headers().clone();

View File

@ -15,7 +15,3 @@
*/ */
pub mod http; pub mod http;
use std::time::Duration;
// REQUEST_TIMEOUT is the timeout for requests.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

View File

@ -133,6 +133,7 @@ async fn main() -> Result<(), anyhow::Error> {
// Initialize task manager. // Initialize task manager.
let task = Task::new( let task = Task::new(
config.clone(),
id_generator.clone(), id_generator.clone(),
storage.clone(), storage.clone(),
scheduler_client.clone(), scheduler_client.clone(),

View File

@ -53,7 +53,7 @@ struct Args {
#[arg( #[arg(
short = 'e', short = 'e',
long = "endpoint", long = "endpoint",
default_value_os_t = dfdaemon::default_dfdaemon_unix_socket_path(), default_value_os_t = dfdaemon::default_download_unix_socket_path(),
help = "Endpoint of dfdaemon's GRPC server" help = "Endpoint of dfdaemon's GRPC server"
)] )]
endpoint: PathBuf, endpoint: PathBuf,
@ -158,7 +158,6 @@ async fn main() -> Result<(), anyhow::Error> {
// Create dfdaemon client. // Create dfdaemon client.
let client = DfdaemonClient::new_unix(args.endpoint).await.unwrap(); let client = DfdaemonClient::new_unix(args.endpoint).await.unwrap();
let response = client let response = client
.download_task(DownloadTaskRequest { .download_task(DownloadTaskRequest {
download: Some(Download { download: Some(Download {

View File

@ -36,7 +36,7 @@ struct Args {
#[arg( #[arg(
short = 'e', short = 'e',
long = "endpoint", long = "endpoint",
default_value_os_t = dfdaemon::default_dfdaemon_unix_socket_path(), default_value_os_t = dfdaemon::default_download_unix_socket_path(),
help = "Endpoint of dfdaemon's GRPC server" help = "Endpoint of dfdaemon's GRPC server"
)] )]
endpoint: PathBuf, endpoint: PathBuf,

View File

@ -26,8 +26,8 @@ use validator::Validate;
// NAME is the name of dfdaemon. // NAME is the name of dfdaemon.
pub const NAME: &str = "dfdaemon"; pub const NAME: &str = "dfdaemon";
// DEFAULT_GRPC_SERVER_PORT is the default port of the grpc server. // DEFAULT_UPLOAD_GRPC_SERVER_PORT is the default port of the upload grpc server.
const DEFAULT_GRPC_SERVER_PORT: u16 = 65000; const DEFAULT_UPLOAD_GRPC_SERVER_PORT: u16 = 65000;
// DEFAULT_PROXY_SERVER_PORT is the default port of the proxy server. // DEFAULT_PROXY_SERVER_PORT is the default port of the proxy server.
// const DEFAULT_PROXY_SERVER_PORT: u16 = 65001; // const DEFAULT_PROXY_SERVER_PORT: u16 = 65001;
@ -61,11 +61,16 @@ pub fn default_dfdaemon_cache_dir() -> PathBuf {
super::default_cache_dir().join(NAME) super::default_cache_dir().join(NAME)
} }
// default_dfdaemon_unix_socket_path is the default unix socket path for dfdaemon GRPC service. // default_download_unix_socket_path is the default unix socket path for download GRPC service.
pub fn default_dfdaemon_unix_socket_path() -> PathBuf { pub fn default_download_unix_socket_path() -> PathBuf {
super::default_root_dir().join("dfdaemon.sock") super::default_root_dir().join("dfdaemon.sock")
} }
// default_download_piece_timeout is the default timeout for downloading a piece from source.
pub fn default_download_piece_timeout() -> Duration {
Duration::from_secs(120)
}
// default_dfdaemon_lock_path is the default file lock path for dfdaemon service. // default_dfdaemon_lock_path is the default file lock path for dfdaemon service.
pub fn default_dfdaemon_lock_path() -> PathBuf { pub fn default_dfdaemon_lock_path() -> PathBuf {
super::default_lock_dir().join("dfdaemon.lock") super::default_lock_dir().join("dfdaemon.lock")
@ -177,19 +182,31 @@ pub struct DwonloadServer {
impl Default for DwonloadServer { impl Default for DwonloadServer {
fn default() -> Self { fn default() -> Self {
Self { Self {
socket_path: default_dfdaemon_unix_socket_path(), socket_path: default_download_unix_socket_path(),
} }
} }
} }
// Server is the server configuration for dfdaemon. // Server is the server configuration for dfdaemon.
#[derive(Debug, Clone, Default, Validate, Deserialize)] #[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")] #[serde(default, rename_all = "camelCase")]
pub struct Download { pub struct Download {
// server is the download server configuration for dfdaemon. // server is the download server configuration for dfdaemon.
pub server: DwonloadServer, pub server: DwonloadServer,
// piece_timeout is the timeout for downloading a piece from source.
pub piece_timeout: Duration,
} }
// Server implements default value for Server.
impl Default for Download {
fn default() -> Self {
Self {
server: DwonloadServer::default(),
piece_timeout: default_download_piece_timeout(),
}
}
}
// UploadServer is the upload server configuration for dfdaemon. // UploadServer is the upload server configuration for dfdaemon.
#[derive(Debug, Clone, Validate, Deserialize)] #[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")] #[serde(default, rename_all = "camelCase")]
@ -206,7 +223,7 @@ impl Default for UploadServer {
fn default() -> Self { fn default() -> Self {
Self { Self {
ip: None, ip: None,
port: DEFAULT_GRPC_SERVER_PORT, port: DEFAULT_UPLOAD_GRPC_SERVER_PORT,
} }
} }
} }

View File

@ -410,12 +410,7 @@ impl Dfdaemon for DfdaemonServerHandler {
// Get the content length. // Get the content length.
let content_length = self let content_length = self
.task .task
.get_content_length( .get_content_length(task_id.as_str(), download.url.as_str(), header.clone())
task_id.as_str(),
download.url.as_str(),
header.clone(),
None,
)
.await .await
.map_err(|e| { .map_err(|e| {
// Download task failed. // Download task failed.

View File

@ -15,6 +15,7 @@
*/ */
use crate::backend::http::{Request as HTTPRequest, HTTP}; use crate::backend::http::{Request as HTTPRequest, HTTP};
use crate::config::dfdaemon::Config;
use crate::grpc::scheduler::SchedulerClient; use crate::grpc::scheduler::SchedulerClient;
use crate::storage::{metadata, Storage}; use crate::storage::{metadata, Storage};
use crate::utils::http::headermap_to_hashmap; use crate::utils::http::headermap_to_hashmap;
@ -32,7 +33,6 @@ use mpsc::Sender;
use reqwest::header::{self, HeaderMap}; use reqwest::header::{self, HeaderMap};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::fs::{self, OpenOptions}; use tokio::fs::{self, OpenOptions};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
@ -44,6 +44,9 @@ pub mod piece;
// Task represents a task manager. // Task represents a task manager.
pub struct Task { pub struct Task {
// config is the configuration of the dfdaemon.
config: Arc<Config>,
// id_generator is the id generator. // id_generator is the id generator.
pub id_generator: Arc<IDGenerator>, pub id_generator: Arc<IDGenerator>,
@ -64,12 +67,14 @@ pub struct Task {
impl Task { impl Task {
// new returns a new Task. // new returns a new Task.
pub fn new( pub fn new(
config: Arc<Config>,
id_generator: Arc<IDGenerator>, id_generator: Arc<IDGenerator>,
storage: Arc<Storage>, storage: Arc<Storage>,
scheduler_client: Arc<SchedulerClient>, scheduler_client: Arc<SchedulerClient>,
http_client: Arc<HTTP>, http_client: Arc<HTTP>,
) -> Self { ) -> Self {
let piece = piece::Piece::new( let piece = piece::Piece::new(
config.clone(),
storage.clone(), storage.clone(),
scheduler_client.clone(), scheduler_client.clone(),
http_client.clone(), http_client.clone(),
@ -77,6 +82,7 @@ impl Task {
let piece = Arc::new(piece); let piece = Arc::new(piece);
Self { Self {
config,
id_generator, id_generator,
storage: storage.clone(), storage: storage.clone(),
scheduler_client: scheduler_client.clone(), scheduler_client: scheduler_client.clone(),
@ -116,23 +122,6 @@ impl Task {
download: Download, download: Download,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>, download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) -> ClientResult<()> { ) -> ClientResult<()> {
// Convert the timeout.
let timeout: Option<Duration> = match download.timeout.clone() {
Some(timeout) => match Duration::try_from(timeout) {
Ok(timeout) => Some(timeout),
Err(err) => {
error!("convert timeout error: {:?}", err);
download_progress_tx
.send(Err(Status::invalid_argument("invalid timeout")))
.await
.unwrap_or_else(|err| error!("send download progress error: {:?}", err));
return Err(Error::InvalidParameter());
}
},
None => None,
};
// Open the file. // Open the file.
let mut f = match OpenOptions::new() let mut f = match OpenOptions::new()
.create(true) .create(true)
@ -291,7 +280,6 @@ impl Task {
download.url.clone(), download.url.clone(),
download.header.clone(), download.header.clone(),
content_length, content_length,
timeout,
download_progress_tx.clone(), download_progress_tx.clone(),
) )
.await .await
@ -672,7 +660,6 @@ impl Task {
interested_piece.offset, interested_piece.offset,
interested_piece.length, interested_piece.length,
header.clone(), header.clone(),
None,
) )
.await .await
{ {
@ -811,7 +798,7 @@ impl Task {
{ {
Ok(reader) => reader, Ok(reader) => reader,
Err(err) => { Err(err) => {
error!( info!(
"download piece {} from local peer error: {:?}", "download piece {} from local peer error: {:?}",
interested_piece.number, err interested_piece.number, err
); );
@ -875,7 +862,6 @@ impl Task {
url: String, url: String,
header: HashMap<String, String>, header: HashMap<String, String>,
content_length: u64, content_length: u64,
timeout: Option<Duration>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>, download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) -> ClientResult<Vec<metadata::Piece>> { ) -> ClientResult<Vec<metadata::Piece>> {
// Convert the header. // Convert the header.
@ -896,7 +882,6 @@ impl Task {
interested_piece.offset, interested_piece.offset,
interested_piece.length, interested_piece.length,
header.clone(), header.clone(),
timeout,
) )
.await .await
.map_err(|err| { .map_err(|err| {
@ -967,7 +952,6 @@ impl Task {
task_id: &str, task_id: &str,
url: &str, url: &str,
header: HeaderMap, header: HeaderMap,
timeout: Option<Duration>,
) -> ClientResult<u64> { ) -> ClientResult<u64> {
let task = self let task = self
.storage .storage
@ -984,7 +968,7 @@ impl Task {
.head(HTTPRequest { .head(HTTPRequest {
url: url.to_string(), url: url.to_string(),
header, header,
timeout, timeout: self.config.download.piece_timeout,
}) })
.await?; .await?;

View File

@ -15,6 +15,7 @@
*/ */
use crate::backend::http::{Request, HTTP}; use crate::backend::http::{Request, HTTP};
use crate::config::dfdaemon::Config;
use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient}; use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient};
use crate::storage::{metadata, Storage}; use crate::storage::{metadata, Storage};
use crate::utils::digest::{Algorithm, Digest as UtilsDigest}; use crate::utils::digest::{Algorithm, Digest as UtilsDigest};
@ -29,7 +30,6 @@ use rand::prelude::*;
use reqwest::header::{self, HeaderMap}; use reqwest::header::{self, HeaderMap};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::{ use tokio::{
fs, fs,
io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom}, io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom},
@ -48,6 +48,9 @@ pub struct CollectPiece {
// Piece represents a piece manager. // Piece represents a piece manager.
pub struct Piece { pub struct Piece {
// config is the configuration of the dfdaemon.
config: Arc<Config>,
// manager_client is the grpc client of the manager. // manager_client is the grpc client of the manager.
storage: Arc<Storage>, storage: Arc<Storage>,
@ -62,11 +65,13 @@ pub struct Piece {
impl Piece { impl Piece {
// new returns a new Piece. // new returns a new Piece.
pub fn new( pub fn new(
config: Arc<Config>,
storage: Arc<Storage>, storage: Arc<Storage>,
scheduler_client: Arc<SchedulerClient>, scheduler_client: Arc<SchedulerClient>,
http_client: Arc<HTTP>, http_client: Arc<HTTP>,
) -> Self { ) -> Self {
Self { Self {
config,
storage, storage,
scheduler_client, scheduler_client,
http_client, http_client,
@ -413,7 +418,6 @@ impl Piece {
offset: u64, offset: u64,
length: u64, length: u64,
header: HeaderMap, header: HeaderMap,
timeout: Option<Duration>,
) -> Result<impl AsyncRead> { ) -> Result<impl AsyncRead> {
// Record the start of downloading piece. // Record the start of downloading piece.
self.storage.download_piece_started(task_id, number)?; self.storage.download_piece_started(task_id, number)?;
@ -433,7 +437,7 @@ impl Piece {
.get(Request { .get(Request {
url: url.to_string(), url: url.to_string(),
header: header.to_owned(), header: header.to_owned(),
timeout, timeout: self.config.download.piece_timeout,
}) })
.await .await
.map_err(|err| { .map_err(|err| {