feat: support unix socket for dfdaemon download client (#115)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-11-17 18:28:21 +08:00 committed by GitHub
parent 2d5bc7c4c8
commit 2d01c0ac70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 7 deletions

View File

@ -67,3 +67,4 @@ base16ct = { version = "0.2", features = ["alloc"] }
http = "0.2"
rand = "0.8.5"
prost-wkt-types = "0.4"
tower = "0.4.13"

View File

@ -15,12 +15,19 @@
*/
use clap::Parser;
use dragonfly_api::common::v2::Download;
use dragonfly_api::common::v2::TaskType;
use dragonfly_api::dfdaemon::v2::DownloadTaskRequest;
use dragonfly_client::config::dfdaemon;
use dragonfly_client::config::dfget;
use dragonfly_client::grpc::dfdaemon::DfdaemonClient;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client::Error;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use tracing::Level;
use url::Url;
#[derive(Debug, Parser)]
#[command(
@ -31,6 +38,9 @@ use tracing::Level;
long_about = "A download command line based on P2P technology in Dragonfly that can download resources of different protocols."
)]
struct Args {
#[arg(help = "Specify the URL to download")]
url: Url,
#[arg(
short = 'o',
long = "output",
@ -68,7 +78,7 @@ struct Args {
default_value_t = 6,
help = "Set the priority for scheduling task"
)]
priority: u32,
priority: i32,
#[arg(
long = "application",
@ -85,18 +95,19 @@ struct Args {
tag: String,
#[arg(
short = 'H',
long = "header",
required = false,
help = "Set the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'"
)]
header: Vec<String>,
headers: Vec<String>,
#[arg(
long = "filter",
required = false,
help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filter='signature' --filter='timeout'"
)]
filter: Vec<String>,
filters: Vec<String>,
#[arg(
long = "disable-back-to-source",
@ -121,10 +132,48 @@ struct Args {
log_dir: PathBuf,
}
fn main() {
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Parse command line arguments.
let args = Args::parse();
// Initialize tracting.
let _guards = init_tracing(dfget::NAME, &args.log_dir, args.log_level, None);
// Create dfdaemon client.
let client = DfdaemonClient::new_unix(args.endpoint).await.unwrap();
let response = client
.download_task(DownloadTaskRequest {
download: Some(Download {
url: args.url.to_string(),
digest: Some(args.digest),
range: None,
r#type: TaskType::Dfdaemon as i32,
tag: Some(args.tag),
application: Some(args.application),
priority: args.priority,
filters: args.filters,
// TODO: Support to parse headers with clap.
header: HashMap::new(),
// TODO: Support to set piece_length.
piece_length: 0,
output_path: args.output.into_os_string().into_string().unwrap(),
// TODO: Support to set timeout.
timeout: None,
// TODO: Support to set download_rate_limit.
download_rate_limit: None,
need_back_to_source: false,
}),
})
.await?;
// TODO: Support to print progress.
let mut out_stream = response.into_inner();
while let Some(message) = out_stream.message().await? {
let piece = message.piece.ok_or(Error::InvalidParameter())?;
println!("{:?}", piece)
}
Ok(())
}

View File

@ -32,14 +32,15 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::UnixListener;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream};
use tonic::codec::CompressionEncoding;
use tonic::{
transport::{Channel, Server},
transport::{Channel, Endpoint, Server, Uri},
Request, Response, Status,
};
use tower::service_fn;
use tracing::{error, info};
// DfdaemonServer is the grpc server of the dfdaemon.
@ -90,7 +91,10 @@ impl DfdaemonServer {
// Initialize the grpc service.
let service = DfdaemonGRPCServer::new(DfdaemonServerHandler {
task: self.task.clone(),
});
})
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(usize::MAX);
// Clone the shutdown channel.
let mut shutdown = self.shutdown.clone();
@ -397,6 +401,22 @@ impl DfdaemonClient {
Ok(Self { client })
}
// new_unix creates a new DfdaemonClient with unix domain socket.
pub async fn new_unix(socket_path: PathBuf) -> ClientResult<Self> {
// Ignore the uri because it is not used.
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
UnixStream::connect(socket_path.clone())
}))
.await?;
let client = DfdaemonGRPCClient::new(channel)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(usize::MAX);
Ok(Self { client })
}
// get_piece_numbers gets the piece numbers.
pub async fn get_piece_numbers(
&self,