refactor: support hdfs for backend (#852)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-11-19 14:46:23 +08:00 committed by GitHub
parent 8ce9f23d4c
commit 9c6d16a5ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 63 additions and 38 deletions

View File

@ -23,17 +23,26 @@ use tokio_util::io::StreamReader;
use tracing::{error, info, instrument};
use url::Url;
const NAMENODE_DEFAULT_WEB_PORT: u16 = 9870;
/// HDFS_SCHEME is the scheme of the HDFS.
pub const HDFS_SCHEME: &str = "hdfs";
/// DEFAULT_NAMENODE_PORT is the default port of the HDFS namenode.
const DEFAULT_NAMENODE_PORT: u16 = 9870;
/// Hdfs is a struct that implements the Backend trait.
pub struct Hdfs {}
pub struct Hdfs {
/// scheme is the scheme of the HDFS.
scheme: String,
}
/// Hdfs implements the Backend trait.
impl Hdfs {
/// new returns a new HDFS backend.
#[instrument(skip_all)]
pub fn new() -> Self {
Self {}
Self {
scheme: HDFS_SCHEME.to_string(),
}
}
/// operator initializes the operator with the parsed URL and HDFS config.
@ -44,7 +53,7 @@ impl Hdfs {
.host_str()
.ok_or_else(|| ClientError::InvalidURI(url.to_string()))?
.to_string();
let port = url.port().unwrap_or(NAMENODE_DEFAULT_WEB_PORT);
let port = url.port().unwrap_or(DEFAULT_NAMENODE_PORT);
// Initialize the HDFS operator.
let mut builder = opendal::services::Webhdfs::default();
@ -54,7 +63,7 @@ impl Hdfs {
// If HDFS config is not None, set the config for builder.
if let Some(config) = config {
if let Some(delegation_token) = config.delegation_token {
if let Some(delegation_token) = &config.delegation_token {
builder = builder.delegation(delegation_token.as_str());
}
}
@ -69,7 +78,7 @@ impl super::Backend for Hdfs {
/// scheme returns the scheme of the HDFS backend.
#[instrument(skip_all)]
fn scheme(&self) -> String {
"hdfs".to_string()
self.scheme.clone()
}
/// head gets the header of the request.
@ -239,9 +248,6 @@ mod tests {
let url: Url = Url::parse("hdfs://127.0.0.1:9870/file").unwrap();
let operator = Hdfs::new().operator(url, None);
// If HDFS is running on localhost, the following code can be used to check the operator.
// operator.unwrap().check().await.unwrap();
assert!(
operator.is_ok(),
"can not get hdfs operator, due to: {}",

View File

@ -22,6 +22,12 @@ use std::io::{Error as IOError, ErrorKind};
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument};
/// HTTP_SCHEME is the HTTP scheme.
pub const HTTP_SCHEME: &str = "http";
/// HTTPS_SCHEME is the HTTPS scheme.
pub const HTTPS_SCHEME: &str = "https";
/// HTTP is the HTTP backend.
pub struct HTTP {
/// scheme is the scheme of the HTTP backend.
@ -173,13 +179,16 @@ impl super::Backend for HTTP {
impl Default for HTTP {
/// default returns a new default HTTP.
fn default() -> Self {
Self::new("http")
Self::new(HTTP_SCHEME)
}
}
#[cfg(test)]
mod tests {
use crate::{http::HTTP, Backend, GetRequest, HeadRequest};
use crate::{
http::{HTTP, HTTPS_SCHEME, HTTP_SCHEME},
Backend, GetRequest, HeadRequest,
};
use dragonfly_client_util::tls::{load_certs_from_pem, load_key_from_pem};
use hyper_util::rt::{TokioExecutor, TokioIo};
use reqwest::{header::HeaderMap, StatusCode};
@ -348,7 +357,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;
let resp = HTTP::new("http")
let resp = HTTP::new(HTTP_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: format!("{}/head", server.uri()),
@ -376,7 +385,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;
let resp = HTTP::new("http")
let resp = HTTP::new(HTTP_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: format!("{}/head", server.uri()),
@ -404,7 +413,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;
let mut resp = HTTP::new("http")
let mut resp = HTTP::new(HTTP_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@ -426,7 +435,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_head_response_with_self_signed_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@ -445,7 +454,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_return_error_response_when_head_with_wrong_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@ -463,7 +472,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_response_with_self_signed_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let mut resp = HTTP::new("https")
let mut resp = HTTP::new(HTTPS_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@ -485,7 +494,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_return_error_response_when_get_with_wrong_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@ -505,7 +514,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_head_response_with_no_verifier() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@ -524,7 +533,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_response_with_no_verifier() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let http_backend = HTTP::new("https");
let http_backend = HTTP::new(HTTPS_SCHEME);
let mut resp = http_backend
.get(GetRequest {
task_id: "test".to_string(),

View File

@ -23,6 +23,7 @@ use libloading::Library;
use reqwest::header::HeaderMap;
use rustls_pki_types::CertificateDer;
use std::path::Path;
use std::str::FromStr;
use std::{collections::HashMap, pin::Pin, time::Duration};
use std::{fmt::Debug, fs};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -227,6 +228,12 @@ impl BackendFactory {
Ok(backend_factory)
}
/// supported_download_directory returns whether the scheme supports directory download.
#[instrument(skip_all)]
pub fn supported_download_directory(scheme: &str) -> bool {
object_storage::Scheme::from_str(scheme).is_ok() || scheme == hdfs::HDFS_SCHEME
}
/// build returns the backend by the scheme of the url.
#[instrument(skip_all)]
pub fn build(&self, url: &str) -> Result<&(dyn Backend + Send + Sync)> {
@ -241,12 +248,16 @@ impl BackendFactory {
/// load_builtin_backends loads the builtin backends.
#[instrument(skip_all)]
fn load_builtin_backends(&mut self) {
self.backends
.insert("http".to_string(), Box::new(http::HTTP::new("http")));
self.backends.insert(
"http".to_string(),
Box::new(http::HTTP::new(http::HTTP_SCHEME)),
);
info!("load [http] builtin backend");
self.backends
.insert("https".to_string(), Box::new(http::HTTP::new("https")));
self.backends.insert(
"https".to_string(),
Box::new(http::HTTP::new(http::HTTPS_SCHEME)),
);
info!("load [https] builtin backend");
self.backends.insert(

View File

@ -25,7 +25,7 @@ use dragonfly_client::metrics::{
collect_backend_request_started_metrics,
};
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest};
use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry, HeadRequest};
use dragonfly_client_config::VersionValueParser;
use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::error::{BackendError, ErrorType, OrErr};
@ -55,6 +55,9 @@ The full documentation is here: https://d7y.io/docs/next/reference/commands/clie
Examples:
# Download a file from HTTP server.
$ dfget https://<host>:<port>/<path> -O /tmp/file.txt
# Download a file from HDFS.
$ dfget hdfs://<host>:<port>/<path> -O /tmp/file.txt --hdfs-delegation-token=<delegation_token>
# Download a file from Amazon Simple Storage Service(S3).
$ dfget s3://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret>
@ -204,7 +207,7 @@ struct Args {
long,
help = "Specify the delegation token for Hadoop Distributed File System(HDFS)"
)]
storage_delegation_token: Option<String>,
hdfs_delegation_token: Option<String>,
#[arg(
long,
@ -574,24 +577,19 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -
// then download all files in the directory. Otherwise, download the single file.
let scheme = args.url.scheme();
if args.url.path().ends_with('/') {
if !is_support_directory_download(scheme) {
if !BackendFactory::supported_download_directory(scheme) {
return Err(Error::Unsupported(format!("{} download directory", scheme)));
};
return download_dir(args, dfdaemon_download_client).await;
};
download(args, ProgressBar::new(0), dfdaemon_download_client).await
}
/// is_support_directory_download checks whether the scheme supports directory download.
fn is_support_directory_download(scheme: &str) -> bool {
// Only object storage protocol and hdfs protocol supports directory download.
object_storage::Scheme::from_str(scheme).is_ok() || scheme == "hdfs"
}
/// download_dir downloads all files in the directory.
async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> {
// Initialize the object storage and the hdfs.
// Initialize the object storage config and the hdfs config.
let object_storage = Some(ObjectStorage {
access_key_id: args.storage_access_key_id.clone(),
access_key_secret: args.storage_access_key_secret.clone(),
@ -601,8 +599,9 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
credential_path: args.storage_credential_path.clone(),
predefined_acl: args.storage_predefined_acl.clone(),
});
let hdfs = Some(Hdfs {
delegation_token: args.storage_delegation_token.clone(),
delegation_token: args.hdfs_delegation_token.clone(),
});
// Get all entries in the directory. If the directory is empty, then return directly.
@ -702,10 +701,10 @@ async fn download(
Err(_) => None,
};
// Only initialize hdfs when the scheme is HDFS protocol.
// Only initialize HDFS when the scheme is HDFS protocol.
let hdfs = match args.url.scheme() {
"hdfs" => Some(Hdfs {
delegation_token: args.storage_delegation_token.clone(),
hdfs::HDFS_SCHEME => Some(Hdfs {
delegation_token: args.hdfs_delegation_token.clone(),
}),
_ => None,
};