feat: S3 Support for Single File Download (#577)

Signed-off-by: Zyyeric <eric1976808123@gmail.com>
This commit is contained in:
Zyyeric 2024-07-16 10:51:36 +08:00 committed by GitHub
parent c3987cece2
commit cd6f981ca0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 425 additions and 1 deletions

83
Cargo.lock generated
View File

@ -774,6 +774,26 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "constant_time_eq"
version = "0.3.0"
@ -814,6 +834,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32c"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
dependencies = [
"rustc_version",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -982,6 +1011,15 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "dlv-list"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
dependencies = [
"const-random",
]
[[package]]
name = "dragonfly-api"
version = "2.0.134"
@ -2687,6 +2725,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"chrono",
"crc32c",
"flagset",
"futures",
"getrandom",
@ -2841,6 +2880,16 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ordered-multimap"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79"
dependencies = [
"dlv-list",
"hashbrown 0.14.3",
]
[[package]]
name = "overload"
version = "0.1.1"
@ -3528,8 +3577,10 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
"quick-xml 0.31.0",
"rand 0.8.5",
"reqwest",
"rust-ini",
"serde",
"serde_json",
"sha1",
@ -3642,6 +3693,17 @@ dependencies = [
"chrono",
]
[[package]]
name = "rust-ini"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41"
dependencies = [
"cfg-if",
"ordered-multimap",
"trim-in-place",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
@ -3654,6 +3716,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rusticata-macros"
version = "4.1.0"
@ -3818,6 +3889,12 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.204"
@ -4652,6 +4729,12 @@ dependencies = [
"tracing-log 0.2.0",
]
[[package]]
name = "trim-in-place"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
[[package]]
name = "try-lock"
version = "0.2.5"

View File

@ -70,7 +70,10 @@ humantime = "2.1.0"
prost-wkt-types = "0.4"
chrono = { version = "0.4.35", features = ["serde"] }
openssl = { version = "0.10", features = ["vendored"] }
opendal = { version = "0.47.3", features = ["services-oss"]}
opendal = { version = "0.47.3", features = [
"services-s3",
"services-oss"
] }
clap = { version = "4.5.9", features = ["derive"] }
anyhow = "1.0.86"
toml_edit = "0.22.14"

View File

@ -31,6 +31,7 @@ use url::Url;
pub mod http;
pub mod oss;
pub mod s3;
// NAME is the name of the package.
pub const NAME: &str = "backend";
@ -232,6 +233,10 @@ impl BackendFactory {
.insert("https".to_string(), Box::new(http::HTTP::new()));
info!("load [https] builtin backend ");
self.backends
.insert("s3".to_string(), Box::new(s3::S3::new()));
info!("load [s3] builtin backend");
self.backends
.insert("oss".to_string(), Box::new(oss::OSS::new()));
info!("load [oss] builtin backend ");
@ -286,4 +291,12 @@ mod tests {
let backend = backend_factory.build("http://example.com");
assert!(backend.is_ok());
}
#[test]
fn should_return_s3_backend() {
let backend_factory =
BackendFactory::new(Some(Path::new("/var/lib/dragonfly/plugins/backend/"))).unwrap();
let backend = backend_factory.build("s3://example.com");
assert!(backend.is_ok());
}
}

View File

@ -0,0 +1,322 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::Duration;
use dragonfly_client_core::*;
use error::BackendError;
use opendal::layers::LoggingLayer;
use opendal::{raw::HttpClient, Metakey, Operator};
use tokio_util::io::StreamReader;
use tracing::info;
use url::Url;
use crate::*;
// ParsedURL is a struct that contains the parsed URL, bucket, and path.
#[derive(Debug)]
pub struct ParsedURL {
// url is the requested URL of the S3.
url: Url,
// bucket is the bucket of the S3.
bucket: String,
// key is the key of the S3.
key: String,
}
// ParsedURL implements the ParsedURL trait.
impl ParsedURL {
// is_dir returns true if the URL path ends with a slash.
fn is_dir(&self) -> bool {
self.url.path().ends_with('/')
}
// make_url_by_entry_path makes a URL by the entry path when the URL is a directory.
fn make_url_by_entry_path(&self, entry_path: &str) -> Url {
let mut url = self.url.clone();
url.set_path(entry_path);
url
}
}
// ParsedURL implements the TryFrom trait for the URL.
//
// The S3 URl should be in the format of `s3://<bucket>/<path>`.
impl TryFrom<Url> for ParsedURL {
type Error = Error;
// try_from parses the URL and returns a ParsedURL.
fn try_from(url: Url) -> std::result::Result<Self, Self::Error> {
let bucket = url
.host_str()
.ok_or_else(|| Error::InvalidURI(url.to_string()))?
.to_string();
let key = url
.path()
.strip_prefix('/')
.ok_or_else(|| Error::InvalidURI(url.to_string()))?
.to_string();
Ok(Self { url, bucket, key })
}
}
// S3 is a struct that implements the backend trait.
#[derive(Default)]
pub struct S3;
// S3 implements the S3 trait.
impl S3 {
/// Returns S3 that implements the Backend trait.
pub fn new() -> S3 {
Self
}
// operator initializes the operator with the parsed URL and object storage.
pub fn operator(
&self,
parsed_url: &ParsedURL,
object_storage: Option<ObjectStorage>,
timeout: Duration,
) -> Result<Operator> {
// Retrieve the access key ID, access key secret, and session token (optional) from the object storage.
let Some(ObjectStorage {
access_key_id,
access_key_secret,
session_token,
}) = object_storage
else {
error!("need access_key_id and access_key_secret");
return Err(Error::BackendError(BackendError {
message: "need access_key_id and access_key_secret".to_string(),
status_code: None,
header: None,
}));
};
// Create a reqwest http client.
let client = reqwest::Client::builder().timeout(timeout).build()?;
// Set up operator builder.
let mut builder = opendal::services::S3::default();
builder
.access_key_id(&access_key_id)
.secret_access_key(&access_key_secret)
.http_client(HttpClient::with(client))
.bucket(&parsed_url.bucket);
// Configure the session token if it is provided.
if let Some(token) = session_token.as_deref() {
builder.security_token(token);
}
let operator = Operator::new(builder)
.map_err(|err| {
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?
.layer(LoggingLayer::default())
.finish();
Ok(operator)
}
}
// Backend implements the Backend trait.
#[tonic::async_trait]
impl crate::Backend for S3 {
//head gets the header of the request.
async fn head(&self, request: HeadRequest) -> Result<HeadResponse> {
info!(
"head request {} {}: {:?}",
request.task_id, request.url, request.http_header
);
// Parse the URL and convert it to a ParsedURL for create the S3 operator.
let url: Url = request
.url
.parse()
.map_err(|_| Error::InvalidURI(request.url.clone()))?;
let parsed_url: ParsedURL = url.try_into().map_err(|err| {
error!(
"parse head request url failed {} {}: {}",
request.task_id, request.url, err
);
err
})?;
// Initialize the operator with the parsed URL, object storage, and timeout.
let operator = self.operator(&parsed_url, request.object_storage, request.timeout)?;
// Get the entries if url point to a directory.
let entries = if parsed_url.is_dir() {
Some(
operator
.list_with(&parsed_url.key)
.recursive(true)
.metakey(Metakey::ContentLength | Metakey::Mode)
.await // Do the list op here.
.map_err(|err| {
error!(
"list request failed {} {}: {}",
request.task_id, request.url, err
);
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?
.into_iter()
.map(|entry| {
let metadata = entry.metadata();
DirEntry {
url: parsed_url.make_url_by_entry_path(entry.path()).to_string(),
content_length: metadata.content_length() as usize,
is_dir: metadata.is_dir(),
}
})
.collect(),
)
} else {
None
};
// Stat the object to get the response from the S3.
let response = operator.stat_with(&parsed_url.key).await.map_err(|err| {
error!(
"stat request failed {} {}: {}",
request.task_id, request.url, err
);
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?;
info!(
"head response {} {}: {}",
request.task_id,
request.url,
response.content_length()
);
Ok(HeadResponse {
success: true,
content_length: Some(response.content_length()),
http_header: None,
http_status_code: None,
error_message: None,
entries,
})
}
// Returns content of requested file.
async fn get(&self, request: GetRequest) -> Result<GetResponse<Body>> {
info!(
"get request {} {}: {:?}",
request.piece_id, request.url, request.http_header
);
// Parse the URL and convert it to a ParsedURL for create the S3 operator.
let url: Url = request
.url
.parse()
.map_err(|_| Error::InvalidURI(request.url.clone()))?;
let parsed_url: ParsedURL = url.try_into().map_err(|err| {
error!(
"parse head request url failed {} {}: {}",
request.piece_id, request.url, err
);
err
})?;
// Initialize the operator with the parsed URL, object storage, and timeout.
let operator_reader = self
.operator(&parsed_url, request.object_storage, request.timeout)?
.reader(&parsed_url.key)
.await
.map_err(|err| {
error!(
"get request failed {} {}: {}",
request.piece_id, request.url, err
);
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?;
let stream = match request.range {
Some(range) => operator_reader
.into_bytes_stream(range.start..range.start + range.length)
.await
.map_err(|err| {
error!(
"get request failed {} {}: {}",
request.piece_id, request.url, err
);
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?,
None => operator_reader.into_bytes_stream(..).await.map_err(|err| {
error!(
"get request failed {} {}: {}",
request.piece_id, request.url, err
);
Error::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?,
};
Ok(crate::GetResponse {
success: true,
http_header: None,
http_status_code: Some(reqwest::StatusCode::OK),
reader: Box::new(StreamReader::new(stream)),
error_message: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use url::Url;
#[test]
fn test_valid_s3_url() {
let url = Url::parse("s3://my-bucket/path/to/object.txt").unwrap();
let parsed_url = ParsedURL::try_from(url).unwrap();
assert_eq!(parsed_url.bucket, "my-bucket");
assert_eq!(parsed_url.key, "path/to/object.txt");
}
}

View File

@ -150,6 +150,9 @@ struct Args {
)]
access_key_secret: Option<String>,
#[arg(long, help = "Specify the session token for AWS S3")]
session_token: Option<String>,
#[arg(
short = 'l',
long,