// client/main.rs

/*
 * Copyright 2023 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use clap::Parser;
use dragonfly_api::common::v2::{Download, TaskType};
use dragonfly_api::dfdaemon::v2::{download_task_response, DownloadTaskRequest};
use dragonfly_api::errordetails::v2::Http;
use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient;
use dragonfly_client::grpc::health::HealthClient;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget};
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use dragonfly_client_util::http::header_vec_to_hashmap;
use fslock::LockFile;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use std::{cmp::min, fmt::Write};
use termion::{color, style};
use tokio::process::{Child, Command};
use tracing::{debug, error, info, Level};
use url::Url;

// DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL is the default interval of checking dfdaemon's health.
const DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL: Duration = Duration::from_millis(200);

// DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT is the default timeout of checking dfdaemon's health.
const DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT: Duration = Duration::from_secs(10);
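
// With these defaults, dfget polls a freshly spawned dfdaemon roughly every 200ms for up
// to 10 seconds (on the order of 50 attempts) before giving up; see
// get_or_create_dfdaemon_download_client below.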

#[derive(Debug, Parser)]
#[command(
    name = dfget::NAME,
    author,
    version,
    about = "dfget is a download command line based on P2P technology",
    long_about = "A download command line based on P2P technology in Dragonfly that can download resources of different protocols."
)]
struct Args {
    #[arg(help = "Specify the URL to download")]
    url: Url,
    #[arg(
        short = 'O',
        long = "output",
        help = "Specify the output path of downloading file"
    )]
    output: PathBuf,
    #[arg(
        short = 'e',
        long = "endpoint",
        default_value_os_t = dfdaemon::default_download_unix_socket_path(),
        help = "Endpoint of dfdaemon's GRPC server"
    )]
    endpoint: PathBuf,
    #[arg(
        long = "timeout",
        value_parser = humantime::parse_duration,
        default_value = "2h",
        help = "Specify the timeout for downloading a file"
    )]
    timeout: Duration,
    #[arg(
        long = "piece-length",
        default_value_t = default_piece_length(),
        help = "Specify the byte length of the piece"
    )]
    piece_length: u64,
    #[arg(
        short = 'd',
        long = "digest",
        default_value = "",
        help = "Verify the integrity of the downloaded file using the specified digest, e.g. md5:86d3f3a95c324c9479bd8986968f4327"
    )]
    digest: String,
    #[arg(
        short = 'p',
        long = "priority",
        default_value_t = 6,
        help = "Specify the priority for scheduling task"
    )]
    priority: i32,
    #[arg(
        long = "application",
        default_value = "",
        help = "Caller application which is used for statistics and access control"
    )]
    application: String,
    #[arg(
        long = "tag",
        default_value = "",
        help = "Different tags for the same url will be divided into different tasks"
    )]
    tag: String,
    #[arg(
        short = 'H',
        long = "header",
        required = false,
        help = "Specify the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'"
    )]
    header: Option<Vec<String>>,
    #[arg(
        long = "filtered-query-param",
        required = false,
        help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filtered-query-param='signature' --filtered-query-param='timeout'"
    )]
    filtered_query_params: Option<Vec<String>>,
    #[arg(
        long = "disable-back-to-source",
        default_value_t = false,
        help = "Disable back-to-source download when dfget download failed"
    )]
    disable_back_to_source: bool,
    #[arg(
        short = 'l',
        long,
        default_value = "info",
        help = "Specify the logging level [trace, debug, info, warn, error]"
    )]
    log_level: Level,
    #[arg(
        long,
        default_value_os_t = dfget::default_dfget_log_dir(),
        help = "Specify the log directory"
    )]
    log_dir: PathBuf,
    #[arg(
        long,
        default_value_t = 24,
        help = "Specify the max number of log files"
    )]
    log_max_files: usize,
    #[arg(
        long = "verbose",
        default_value_t = false,
        help = "Specify whether to print log"
    )]
    verbose: bool,
    #[arg(
        short = 'c',
        long = "dfdaemon-config",
        default_value_os_t = dfdaemon::default_dfdaemon_config_path(),
        help = "Specify dfdaemon's config file to use"
    )]
    dfdaemon_config: PathBuf,
    #[arg(
        long = "dfdaemon-lock-path",
        default_value_os_t = dfdaemon::default_dfdaemon_lock_path(),
        help = "Specify the dfdaemon's lock file path"
    )]
    dfdaemon_lock_path: PathBuf,
    #[arg(
        long = "dfdaemon-log-level",
        default_value = "info",
        help = "Specify the dfdaemon's logging level [trace, debug, info, warn, error]"
    )]
    dfdaemon_log_level: Level,
    #[arg(
        long = "dfdaemon-log-dir",
        default_value_os_t = dfdaemon::default_dfdaemon_log_dir(),
        help = "Specify the dfdaemon's log directory"
    )]
    dfdaemon_log_dir: PathBuf,
    #[arg(
        long,
        default_value_t = 24,
        help = "Specify the dfdaemon's max number of log files"
    )]
    dfdaemon_log_max_files: usize,
}
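
// Illustrative invocation showing how the arguments above map onto a typical dfget call
// (the URL and paths below are hypothetical examples, not defaults from this file):
//
//   dfget https://example.com/blobs/data.tar.gz \
//     --output /tmp/data.tar.gz \
//     --header='Accept: application/octet-stream' \
//     --filtered-query-param='signature' \
//     --timeout 30m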

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Parse command line arguments.
    let args = Args::parse();

    // Initialize tracing.
    let _guards = init_tracing(
        dfget::NAME,
        &args.log_dir,
        args.log_level,
        args.log_max_files,
        None,
        args.verbose,
    );

    // Run dfget command.
    if let Err(err) = run(args).await {
        match err {
            Error::TonicStatus(status) => {
                let details = status.details();
                if let Ok(http_err) = serde_json::from_slice::<Http>(details) {
                    eprintln!(
                        "{}{}{}Downloading failed, bad status code:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        http_err.status_code
                    );

                    eprintln!(
                        "{}{}{}Header:{}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                    for (key, value) in http_err.header.iter() {
                        eprintln!(" [{}]: {}", key.as_str(), value.as_str());
                    }
                } else {
                    eprintln!("Error Code: {}", status.code());
                    eprintln!("Message: {}", status.message());
                    eprintln!(
                        "Details: {}",
                        std::str::from_utf8(status.details()).unwrap().to_string()
                    );
                }
            }
            Error::HTTP(err) => {
                eprintln!("Error Status Code: {}", err.status_code);
                eprintln!("Header:");
                for (key, value) in err.header.iter() {
                    eprintln!("{}: {}", key.as_str(), value.to_str().unwrap());
                }
            }
            err => {
                eprintln!("Message: {}", err);
            }
        }

        std::process::exit(1);
    }

    Ok(())
}

// run runs the dfget command.
async fn run(args: Args) -> Result<()> {
    // Get or create dfdaemon download client.
    let dfdaemon_download_client = get_or_create_dfdaemon_download_client(
        args.dfdaemon_config,
        args.endpoint.clone(),
        args.dfdaemon_log_dir,
        args.dfdaemon_log_level,
        args.dfdaemon_log_max_files,
        args.dfdaemon_lock_path,
    )
    .await
    .map_err(|err| {
        error!("initialize dfdaemon download client failed: {}", err);
        err
    })?;
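
    // Note: the --header values collected by clap are plain "Key: Value" strings;
    // header_vec_to_hashmap is expected to split each entry on the first ':' and build the
    // request_header map used below. Illustrative example (not taken from this file):
    // ["Content-Type: application/json"] would become {"Content-Type": "application/json"}.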

    // Download the task via the dfdaemon download client.
    let response = dfdaemon_download_client
        .download_task(DownloadTaskRequest {
            download: Some(Download {
                url: args.url.to_string(),
                digest: Some(args.digest),
                // NOTE: Dfget does not support range download.
                range: None,
                r#type: TaskType::Dfdaemon as i32,
                tag: Some(args.tag),
                application: Some(args.application),
                priority: args.priority,
                filtered_query_params: args.filtered_query_params.unwrap_or_default(),
                request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
                piece_length: args.piece_length,
                output_path: Some(args.output.into_os_string().into_string().unwrap()),
                timeout: Some(
                    prost_wkt_types::Duration::try_from(args.timeout)
                        .or_err(ErrorType::ParseError)?,
                ),
                need_back_to_source: false,
                disable_back_to_source: args.disable_back_to_source,
                certificate_chain: Vec::new(),
                prefetch: false,
            }),
        })
        .await
        .map_err(|err| {
            error!("download task failed: {}", err);
            err
        })?;

    // Initialize progress bar.
    let pb = ProgressBar::new(0);
    pb.set_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
        )
        .or_err(ErrorType::ParseError)?
        .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
            write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
        })
        .progress_chars("#>-"),
    );
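
    // With this template the bar renders roughly like the following (illustrative values only):
    //   [00:00:12] [#####>--------------] 25.00 MiB/100.00 MiB (2.1 MiB/s, 35.7s)
    // where {eta} is overridden above to print seconds with one decimal place.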

    // Download file.
    let mut downloaded = 0;
    let mut out_stream = response.into_inner();
    while let Some(message) = out_stream.message().await.map_err(|err| {
        error!("get message failed: {}", err);
        err
    })? {
        match message.response {
            Some(download_task_response::Response::DownloadTaskStartedResponse(response)) => {
                pb.set_length(response.content_length);
            }
            Some(download_task_response::Response::DownloadPieceFinishedResponse(response)) => {
                let piece = response.piece.ok_or(Error::InvalidParameter)?;
                downloaded += piece.length;
                let position = min(downloaded + piece.length, pb.length().unwrap_or(0));
                pb.set_position(position);
            }
            None => {}
        }
    }
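
    // Progress accounting above, with illustrative numbers: for a 100 MiB task split into
    // 4 MiB pieces, the first DownloadPieceFinishedResponse sets downloaded to 4 MiB and the
    // bar position to min(4 MiB + 4 MiB, 100 MiB) = 8 MiB; the min() clamp keeps the reported
    // position from ever exceeding the total length.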

    pb.finish_with_message("downloaded");
    Ok(())
}

// get_or_create_dfdaemon_download_client gets a dfdaemon download client or creates a new one.
async fn get_or_create_dfdaemon_download_client(
    config_path: PathBuf,
    endpoint: PathBuf,
    log_dir: PathBuf,
    log_level: Level,
    log_max_files: usize,
    lock_path: PathBuf,
) -> Result<DfdaemonDownloadClient> {
    // Get dfdaemon download client and check its health.
    match get_dfdaemon_download_client(endpoint.clone()).await {
        Ok(dfdaemon_download_client) => return Ok(dfdaemon_download_client),
        Err(err) => debug!("get dfdaemon download client failed: {}", err),
    }

    // Create a lock file to prevent multiple dfdaemon processes from being created.
    let mut f = LockFile::open(lock_path.as_path())?;
    f.lock()?;

    // Check dfdaemon download client again.
    match get_dfdaemon_download_client(endpoint.clone()).await {
        Ok(dfdaemon_download_client) => return Ok(dfdaemon_download_client),
        Err(err) => debug!("get dfdaemon download client failed: {}", err),
    }

    // Spawn a dfdaemon process.
    let child = spawn_dfdaemon(config_path, log_dir, log_level, log_max_files)?;
    info!("spawn dfdaemon process: {:?}", child);

    // Initialize the timeout of checking dfdaemon's health.
    let check_health_timeout = tokio::time::sleep(DEFAULT_DFDAEMON_CHECK_HEALTH_TIMEOUT);
    tokio::pin!(check_health_timeout);

    // Wait for dfdaemon's health.
    let mut interval = tokio::time::interval(DEFAULT_DFDAEMON_CHECK_HEALTH_INTERVAL);
    loop {
        tokio::select! {
            _ = interval.tick() => {
                match get_dfdaemon_download_client(endpoint.clone()).await {
                    Ok(dfdaemon_download_client) => {
                        f.unlock()?;
                        return Ok(dfdaemon_download_client);
                    }
                    Err(err) => debug!("get dfdaemon download client failed: {}", err),
                }
            }
            _ = &mut check_health_timeout => {
                return Err(Error::Unknown("get dfdaemon download client timeout".to_string()));
            }
        }
    }
}

// get_dfdaemon_download_client gets a dfdaemon download client and checks its health.
async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
    // Check dfdaemon's health.
    let health_client = HealthClient::new_unix(endpoint.clone()).await?;
    health_client.check_dfdaemon_download().await?;

    // Get dfdaemon download client.
    let dfdaemon_download_client = DfdaemonDownloadClient::new_unix(endpoint).await?;
    Ok(dfdaemon_download_client)
}

// spawn_dfdaemon spawns a dfdaemon process in the background.
fn spawn_dfdaemon(
    config_path: PathBuf,
    log_dir: PathBuf,
    log_level: Level,
    log_max_files: usize,
) -> Result<Child> {
    // Create dfdaemon command.
    let mut cmd = Command::new("dfdaemon");

    // Set command line arguments.
    cmd.arg("--config")
        .arg(config_path)
        .arg("--log-dir")
        .arg(log_dir)
        .arg("--log-level")
        .arg(log_level.to_string())
        .arg("--log-max-files")
        .arg(log_max_files.to_string());

    // Redirect stdin, stdout, stderr to /dev/null.
    cmd.stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    // Create a new session for dfdaemon by calling setsid.
    unsafe {
        cmd.pre_exec(|| {
            libc::setsid();
            Ok(())
        });
    }

    let child = cmd.spawn()?;
    Ok(child)
}
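
// The spawn above is roughly equivalent to launching dfdaemon detached from the terminal
// by hand; an illustrative shell counterpart (the paths are examples, not defaults taken
// from this file) would be:
//
//   setsid dfdaemon --config /etc/dragonfly/dfdaemon.yaml \
//     --log-dir /var/log/dragonfly --log-level info --log-max-files 24 \
//     </dev/null >/dev/null 2>/dev/null &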