From 2d01c0ac7012c3b5a48ec0fef92ead50f7c1d15f Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 17 Nov 2023 18:28:21 +0800 Subject: [PATCH] feat: support unix socket for dfdaemon download client (#115) Signed-off-by: Gaius --- Cargo.toml | 1 + src/bin/dfget/main.rs | 57 ++++++++++++++++++++++++++++++++++++++++--- src/grpc/dfdaemon.rs | 26 +++++++++++++++++--- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0b59c69..d6e94eaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,3 +67,4 @@ base16ct = { version = "0.2", features = ["alloc"] } http = "0.2" rand = "0.8.5" prost-wkt-types = "0.4" +tower = "0.4.13" diff --git a/src/bin/dfget/main.rs b/src/bin/dfget/main.rs index 0aed11cf..0c40ace8 100644 --- a/src/bin/dfget/main.rs +++ b/src/bin/dfget/main.rs @@ -15,12 +15,19 @@ */ use clap::Parser; +use dragonfly_api::common::v2::Download; +use dragonfly_api::common::v2::TaskType; +use dragonfly_api::dfdaemon::v2::DownloadTaskRequest; use dragonfly_client::config::dfdaemon; use dragonfly_client::config::dfget; +use dragonfly_client::grpc::dfdaemon::DfdaemonClient; use dragonfly_client::tracing::init_tracing; +use dragonfly_client::Error; +use std::collections::HashMap; use std::path::PathBuf; use std::time::Duration; use tracing::Level; +use url::Url; #[derive(Debug, Parser)] #[command( @@ -31,6 +38,9 @@ use tracing::Level; long_about = "A download command line based on P2P technology in Dragonfly that can download resources of different protocols." )] struct Args { + #[arg(help = "Specify the URL to download")] + url: Url, + #[arg( short = 'o', long = "output", @@ -68,7 +78,7 @@ struct Args { default_value_t = 6, help = "Set the priority for scheduling task" )] - priority: u32, + priority: i32, #[arg( long = "application", @@ -85,18 +95,19 @@ struct Args { tag: String, #[arg( + short = 'H', long = "header", required = false, help = "Set the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'" )] - header: Vec, + headers: Vec, #[arg( long = "filter", required = false, help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filter='signature' --filter='timeout'" )] - filter: Vec, + filters: Vec, #[arg( long = "disable-back-to-source", @@ -121,10 +132,48 @@ struct Args { log_dir: PathBuf, } -fn main() { +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { // Parse command line arguments. let args = Args::parse(); // Initialize tracting. let _guards = init_tracing(dfget::NAME, &args.log_dir, args.log_level, None); + + // Create dfdaemon client. + let client = DfdaemonClient::new_unix(args.endpoint).await.unwrap(); + + let response = client + .download_task(DownloadTaskRequest { + download: Some(Download { + url: args.url.to_string(), + digest: Some(args.digest), + range: None, + r#type: TaskType::Dfdaemon as i32, + tag: Some(args.tag), + application: Some(args.application), + priority: args.priority, + filters: args.filters, + // TODO: Support to parse headers with clap. + header: HashMap::new(), + // TODO: Support to set piece_length. + piece_length: 0, + output_path: args.output.into_os_string().into_string().unwrap(), + // TODO: Support to set timeout. + timeout: None, + // TODO: Support to set download_rate_limit. + download_rate_limit: None, + need_back_to_source: false, + }), + }) + .await?; + + // TODO: Support to print progress. + let mut out_stream = response.into_inner(); + while let Some(message) = out_stream.message().await? { + let piece = message.piece.ok_or(Error::InvalidParameter())?; + println!("{:?}", piece) + } + + Ok(()) } diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs index 2d1440d0..a20ae248 100644 --- a/src/grpc/dfdaemon.rs +++ b/src/grpc/dfdaemon.rs @@ -32,14 +32,15 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncReadExt; -use tokio::net::UnixListener; +use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}; use tonic::codec::CompressionEncoding; use tonic::{ - transport::{Channel, Server}, + transport::{Channel, Endpoint, Server, Uri}, Request, Response, Status, }; +use tower::service_fn; use tracing::{error, info}; // DfdaemonServer is the grpc server of the dfdaemon. @@ -90,7 +91,10 @@ impl DfdaemonServer { // Initialize the grpc service. let service = DfdaemonGRPCServer::new(DfdaemonServerHandler { task: self.task.clone(), - }); + }) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); // Clone the shutdown channel. let mut shutdown = self.shutdown.clone(); @@ -397,6 +401,22 @@ impl DfdaemonClient { Ok(Self { client }) } + // new_unix creates a new DfdaemonClient with unix domain socket. + pub async fn new_unix(socket_path: PathBuf) -> ClientResult { + // Ignore the uri because it is not used. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })) + .await?; + let client = DfdaemonGRPCClient::new(channel) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); + Ok(Self { client }) + } + // get_piece_numbers gets the piece numbers. pub async fn get_piece_numbers( &self,