/*
 *     Copyright 2023 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use clap::Parser;
use dragonfly_api::common::v2::{Download, TaskType};
use dragonfly_api::dfdaemon::v2::{download_task_response, DownloadTaskRequest};
use dragonfly_api::errordetails::v2::Http;
use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient;
use dragonfly_client::grpc::health::HealthClient;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget};
use dragonfly_client_core::{
    error::{ErrorType, OrErr},
    Error, Result,
};
use dragonfly_client_util::http::header_vec_to_hashmap;
use fslock::LockFile;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use std::{cmp::min, fmt::Write};
use termion::{color, style};
use tokio::process::{Child, Command};
use tracing::{debug, error, info, Level};
use url::Url;

// DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL is the default interval of checking dfdaemon's health.
const DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL: Duration = Duration::from_millis(200);

// DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT is the default timeout of checking dfdaemon's health.
const DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug, Parser)]
#[command(
    name = dfget::NAME,
    author,
    version,
    about = "dfget is a download command line based on P2P technology",
    long_about = "A download command line based on P2P technology in Dragonfly that can download resources of different protocols."
)]
struct Args {
    #[arg(help = "Specify the URL to download")]
    url: Url,

    #[arg(
        short = 'O',
        long = "output",
        help = "Specify the output path of downloading file"
    )]
    output: PathBuf,

    #[arg(
        short = 'e',
        long = "endpoint",
        default_value_os_t = dfdaemon::default_download_unix_socket_path(),
        help = "Endpoint of dfdaemon's GRPC server"
    )]
    endpoint: PathBuf,

    #[arg(
        long = "timeout",
        value_parser = humantime::parse_duration,
        default_value = "2h",
        help = "Specify the timeout for downloading a file"
    )]
    timeout: Duration,

    #[arg(
        long = "piece-length",
        default_value_t = default_piece_length(),
        help = "Specify the byte length of the piece"
    )]
    piece_length: u64,

    #[arg(
        short = 'd',
        long = "digest",
        default_value = "",
        help = "Verify the integrity of the downloaded file using the specified digest, e.g. md5:86d3f3a95c324c9479bd8986968f4327"
    )]
    digest: String,

    #[arg(
        short = 'p',
        long = "priority",
        default_value_t = 6,
        help = "Specify the priority for scheduling task"
    )]
    priority: i32,

    #[arg(
        long = "application",
        default_value = "",
        help = "Caller application which is used for statistics and access control"
    )]
    application: String,

    #[arg(
        long = "tag",
        default_value = "",
        help = "Different tags for the same url will be divided into different tasks"
    )]
    tag: String,

    #[arg(
        short = 'H',
        long = "header",
        required = false,
        help = "Specify the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'"
    )]
    header: Option<Vec<String>>,

    #[arg(
        long = "filtered-query-param",
        required = false,
        help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filtered-query-param='signature' --filtered-query-param='timeout'"
    )]
    filtered_query_params: Option<Vec<String>>,

    #[arg(
        long = "disable-back-to-source",
        default_value_t = false,
        help = "Disable back-to-source download when dfget download failed"
    )]
    disable_back_to_source: bool,

    #[arg(
        short = 'l',
        long,
        default_value = "info",
        help = "Specify the logging level [trace, debug, info, warn, error]"
    )]
    log_level: Level,

    #[arg(
        long,
        default_value_os_t = dfget::default_dfget_log_dir(),
        help = "Specify the log directory"
    )]
    log_dir: PathBuf,

    #[arg(
        long,
        default_value_t = 24,
        help = "Specify the max number of log files"
    )]
    log_max_files: usize,

    #[arg(
        long = "verbose",
        default_value_t = false,
        help = "Specify whether to print log"
    )]
    verbose: bool,

    #[arg(
        short = 'c',
        long = "dfdaemon-config",
        default_value_os_t = dfdaemon::default_dfdaemon_config_path(),
        help = "Specify dfdaemon's config file to use"
    )]
    dfdaemon_config: PathBuf,

    #[arg(
        long = "dfdaemon-lock-path",
        default_value_os_t = dfdaemon::default_dfdaemon_lock_path(),
        help = "Specify the dfdaemon's lock file path"
    )]
    dfdaemon_lock_path: PathBuf,

    #[arg(
        long = "dfdaemon-log-level",
        default_value = "info",
        help = "Specify the dfdaemon's logging level [trace, debug, info, warn, error]"
    )]
    dfdaemon_log_level: Level,

    #[arg(
        long = "dfdaemon-log-dir",
        default_value_os_t = dfdaemon::default_dfdaemon_log_dir(),
        help = "Specify the dfdaemon's log directory"
    )]
    dfdaemon_log_dir: PathBuf,

    #[arg(
        long,
        default_value_t = 24,
        help = "Specify the dfdaemon's max number of log files"
    )]
    dfdaemon_log_max_files: usize,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Parse command line arguments.
    let args = Args::parse();

    // Initialize tracing.
    let _guards = init_tracing(
        dfget::NAME,
        &args.log_dir,
        args.log_level,
        args.log_max_files,
        None,
        args.verbose,
    );

    // Run dfget command.
    if let Err(err) = run(args).await {
        match err {
            Error::TonicStatus(status) => {
                let details = status.details();
                if let Ok(http_err) = serde_json::from_slice::<Http>(details) {
                    eprintln!(
                        "{}{}{}Downloading failed, bad status code:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        http_err.status_code
                    );

                    eprintln!(
                        "{}{}{}Header:{}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                    for (key, value) in http_err.header.iter() {
                        eprintln!(" [{}]: {}", key.as_str(), value.as_str());
                    }
                } else {
                    eprintln!("Error Code: {}", status.code());
                    eprintln!("Message: {}", status.message());
                    eprintln!(
                        "Details: {}",
                        std::str::from_utf8(status.details()).unwrap().to_string()
                    );
                }
            }
            Error::HTTP(err) => {
                eprintln!("Error Status Code: {}", err.status_code);
                eprintln!("Header:");
                for (key, value) in err.header.iter() {
                    eprintln!("{}: {}", key.as_str(), value.to_str().unwrap());
                }
            }
            err => {
                eprintln!("Message: {}", err);
            }
        }

        std::process::exit(1);
    }

    Ok(())
}

// run runs the dfget command.
async fn run(args: Args) -> Result<()> {
    // Get or create dfdaemon download client.
    let dfdaemon_download_client = get_or_create_dfdaemon_download_client(
        args.dfdaemon_config,
        args.endpoint.clone(),
        args.dfdaemon_log_dir,
        args.dfdaemon_log_level,
        args.dfdaemon_log_max_files,
        args.dfdaemon_lock_path,
    )
    .await
    .map_err(|err| {
        error!("initialize dfdaemon download client failed: {}", err);
        err
    })?;

    // Send the download task request to dfdaemon.
    let response = dfdaemon_download_client
        .download_task(DownloadTaskRequest {
            download: Some(Download {
                url: args.url.to_string(),
                digest: Some(args.digest),
                // NOTE: Dfget does not support range download.
                range: None,
                r#type: TaskType::Dfdaemon as i32,
                tag: Some(args.tag),
                application: Some(args.application),
                priority: args.priority,
                filtered_query_params: args.filtered_query_params.unwrap_or_default(),
                request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
                piece_length: args.piece_length,
                output_path: Some(args.output.into_os_string().into_string().unwrap()),
                timeout: Some(
                    prost_wkt_types::Duration::try_from(args.timeout)
                        .or_err(ErrorType::ParseError)?,
                ),
                need_back_to_source: false,
                disable_back_to_source: args.disable_back_to_source,
                certificate_chain: Vec::new(),
                prefetch: false,
            }),
        })
        .await
        .map_err(|err| {
            error!("download task failed: {}", err);
            err
        })?;

    // Initialize progress bar.
    let pb = ProgressBar::new(0);
    pb.set_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
        )
        .or_err(ErrorType::ParseError)?
        .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
            write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
        })
        .progress_chars("#>-"),
    );

    // Download file.
    let mut downloaded = 0;
    let mut out_stream = response.into_inner();
    while let Some(message) = out_stream.message().await.map_err(|err| {
        error!("get message failed: {}", err);
        err
    })? {
        match message.response {
            Some(download_task_response::Response::DownloadTaskStartedResponse(response)) => {
                pb.set_length(response.content_length);
            }
            Some(download_task_response::Response::DownloadPieceFinishedResponse(response)) => {
                let piece = response.piece.ok_or(Error::InvalidParameter)?;

                downloaded += piece.length;
                let position = min(downloaded + piece.length, pb.length().unwrap_or(0));
                pb.set_position(position);
            }
            None => {}
        }
    }

    pb.finish_with_message("downloaded");
    Ok(())
}

// get_or_create_dfdaemon_download_client gets a dfdaemon download client or creates a new one.
async fn get_or_create_dfdaemon_download_client(
    config_path: PathBuf,
    endpoint: PathBuf,
    log_dir: PathBuf,
    log_level: Level,
    log_max_files: usize,
    lock_path: PathBuf,
) -> Result<DfdaemonDownloadClient> {
    // Get dfdaemon download client and check its health.
    match get_dfdaemon_download_client(endpoint.clone()).await {
        Ok(dfdaemon_download_client) => return Ok(dfdaemon_download_client),
        Err(err) => debug!("get dfdaemon download client failed: {}", err),
    }

    // Create a lock file to prevent multiple dfdaemon processes from being created.
    let mut f = LockFile::open(lock_path.as_path())?;
    f.lock()?;

    // Check dfdaemon download client again.
    match get_dfdaemon_download_client(endpoint.clone()).await {
        Ok(dfdaemon_download_client) => return Ok(dfdaemon_download_client),
        Err(err) => debug!("get dfdaemon download client failed: {}", err),
    }

    // Spawn a dfdaemon process.
    let child = spawn_dfdaemon(config_path, log_dir, log_level, log_max_files)?;
    info!("spawn dfdaemon process: {:?}", child);

    // Initialize the timeout of checking dfdaemon's health.
    let check_health_timeout = tokio::time::sleep(DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT);
    tokio::pin!(check_health_timeout);

    // Wait for dfdaemon's health.
    let mut interval = tokio::time::interval(DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL);
    loop {
        tokio::select! {
            _ = interval.tick() => {
                match get_dfdaemon_download_client(endpoint.clone()).await {
                    Ok(dfdaemon_download_client) => {
                        f.unlock()?;
                        return Ok(dfdaemon_download_client);
                    }
                    Err(err) => debug!("get dfdaemon download client failed: {}", err),
                }
            }
            _ = &mut check_health_timeout => {
                return Err(Error::Unknown("get dfdaemon download client timeout".to_string()));
            }
        }
    }
}

// get_dfdaemon_download_client gets a dfdaemon download client and checks its health.
async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
    // Check dfdaemon's health.
    let health_client = HealthClient::new_unix(endpoint.clone()).await?;
    health_client.check_dfdaemon_download().await?;

    // Get dfdaemon download client.
    let dfdaemon_download_client = DfdaemonDownloadClient::new_unix(endpoint).await?;
    Ok(dfdaemon_download_client)
}

// spawn_dfdaemon spawns a dfdaemon process in the background.
fn spawn_dfdaemon(
    config_path: PathBuf,
    log_dir: PathBuf,
    log_level: Level,
    log_max_files: usize,
) -> Result<Child> {
    // Create dfdaemon command.
    let mut cmd = Command::new("dfdaemon");

    // Set command line arguments.
    cmd.arg("--config")
        .arg(config_path)
        .arg("--log-dir")
        .arg(log_dir)
        .arg("--log-level")
        .arg(log_level.to_string())
        .arg("--log-max-files")
        .arg(log_max_files.to_string());

    // Redirect stdin, stdout, stderr to /dev/null.
    cmd.stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    // Create a new session for dfdaemon by calling setsid.
    unsafe {
        cmd.pre_exec(|| {
            libc::setsid();
            Ok(())
        });
    }

    let child = cmd.spawn()?;
    Ok(child)
}