diff --git a/Cargo.lock b/Cargo.lock index 1cc35708..a19de9fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,6 +774,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -814,6 +834,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -982,6 +1011,15 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "dragonfly-api" version = "2.0.134" @@ -2687,6 +2725,7 @@ dependencies = [ "base64 0.22.1", "bytes", "chrono", + "crc32c", "flagset", "futures", "getrandom", @@ -2841,6 +2880,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.3", +] + [[package]] name = "overload" version = "0.1.1" @@ -3528,8 +3577,10 @@ dependencies = [ "log", "once_cell", "percent-encoding", + "quick-xml 0.31.0", "rand 0.8.5", "reqwest", + "rust-ini", "serde", "serde_json", "sha1", @@ -3642,6 +3693,17 @@ dependencies = [ "chrono", ] +[[package]] +name = "rust-ini" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -3654,6 +3716,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rusticata-macros" version = "4.1.0" @@ -3818,6 +3889,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.204" @@ -4652,6 +4729,12 @@ dependencies = [ "tracing-log 0.2.0", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 8f4f6953..c768cfc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,10 @@ humantime = "2.1.0" prost-wkt-types = "0.4" chrono = { version = "0.4.35", features = ["serde"] } openssl = { version = "0.10", features = ["vendored"] } -opendal = { version = "0.47.3", features = ["services-oss"]} +opendal = { version = "0.47.3", features = [ + "services-s3", + "services-oss" +] } clap = { version = "4.5.9", features = ["derive"] } anyhow = "1.0.86" toml_edit = "0.22.14" diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index 8d75bdfe..5f7ea44c 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -31,6 +31,7 @@ use url::Url; pub mod http; pub mod oss; +pub mod s3; // NAME is the name of the package. pub const NAME: &str = "backend"; @@ -232,6 +233,10 @@ impl BackendFactory { .insert("https".to_string(), Box::new(http::HTTP::new())); info!("load [https] builtin backend "); + self.backends + .insert("s3".to_string(), Box::new(s3::S3::new())); + info!("load [s3] builtin backend"); + self.backends .insert("oss".to_string(), Box::new(oss::OSS::new())); info!("load [oss] builtin backend "); @@ -286,4 +291,12 @@ mod tests { let backend = backend_factory.build("http://example.com"); assert!(backend.is_ok()); } + + #[test] + fn should_return_s3_backend() { + let backend_factory = + BackendFactory::new(Some(Path::new("/var/lib/dragonfly/plugins/backend/"))).unwrap(); + let backend = backend_factory.build("s3://example.com"); + assert!(backend.is_ok()); + } } diff --git a/dragonfly-client-backend/src/s3.rs b/dragonfly-client-backend/src/s3.rs new file mode 100644 index 00000000..d09dba6b --- /dev/null +++ b/dragonfly-client-backend/src/s3.rs @@ -0,0 +1,322 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::time::Duration; + +use dragonfly_client_core::*; +use error::BackendError; +use opendal::layers::LoggingLayer; +use opendal::{raw::HttpClient, Metakey, Operator}; +use tokio_util::io::StreamReader; +use tracing::info; +use url::Url; + +use crate::*; + +// ParsedURL is a struct that contains the parsed URL, bucket, and path. +#[derive(Debug)] +pub struct ParsedURL { + // url is the requested URL of the S3. + url: Url, + + // bucket is the bucket of the S3. + bucket: String, + + // key is the key of the S3. + key: String, +} + +// ParsedURL implements the ParsedURL trait. +impl ParsedURL { + // is_dir returns true if the URL path ends with a slash. + fn is_dir(&self) -> bool { + self.url.path().ends_with('/') + } + + // make_url_by_entry_path makes a URL by the entry path when the URL is a directory. + fn make_url_by_entry_path(&self, entry_path: &str) -> Url { + let mut url = self.url.clone(); + url.set_path(entry_path); + url + } +} + +// ParsedURL implements the TryFrom trait for the URL. +// +// The S3 URl should be in the format of `s3:///`. +impl TryFrom for ParsedURL { + type Error = Error; + + // try_from parses the URL and returns a ParsedURL. + fn try_from(url: Url) -> std::result::Result { + let bucket = url + .host_str() + .ok_or_else(|| Error::InvalidURI(url.to_string()))? + .to_string(); + + let key = url + .path() + .strip_prefix('/') + .ok_or_else(|| Error::InvalidURI(url.to_string()))? + .to_string(); + + Ok(Self { url, bucket, key }) + } +} + +// S3 is a struct that implements the backend trait. +#[derive(Default)] +pub struct S3; + +// S3 implements the S3 trait. +impl S3 { + /// Returns S3 that implements the Backend trait. + pub fn new() -> S3 { + Self + } + + // operator initializes the operator with the parsed URL and object storage. + pub fn operator( + &self, + parsed_url: &ParsedURL, + object_storage: Option, + timeout: Duration, + ) -> Result { + // Retrieve the access key ID, access key secret, and session token (optional) from the object storage. + let Some(ObjectStorage { + access_key_id, + access_key_secret, + session_token, + }) = object_storage + else { + error!("need access_key_id and access_key_secret"); + return Err(Error::BackendError(BackendError { + message: "need access_key_id and access_key_secret".to_string(), + status_code: None, + header: None, + })); + }; + + // Create a reqwest http client. + let client = reqwest::Client::builder().timeout(timeout).build()?; + + // Set up operator builder. + let mut builder = opendal::services::S3::default(); + + builder + .access_key_id(&access_key_id) + .secret_access_key(&access_key_secret) + .http_client(HttpClient::with(client)) + .bucket(&parsed_url.bucket); + + // Configure the session token if it is provided. + if let Some(token) = session_token.as_deref() { + builder.security_token(token); + } + + let operator = Operator::new(builder) + .map_err(|err| { + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })? + .layer(LoggingLayer::default()) + .finish(); + + Ok(operator) + } +} + +// Backend implements the Backend trait. +#[tonic::async_trait] +impl crate::Backend for S3 { + //head gets the header of the request. + async fn head(&self, request: HeadRequest) -> Result { + info!( + "head request {} {}: {:?}", + request.task_id, request.url, request.http_header + ); + + // Parse the URL and convert it to a ParsedURL for create the S3 operator. + let url: Url = request + .url + .parse() + .map_err(|_| Error::InvalidURI(request.url.clone()))?; + let parsed_url: ParsedURL = url.try_into().map_err(|err| { + error!( + "parse head request url failed {} {}: {}", + request.task_id, request.url, err + ); + err + })?; + + // Initialize the operator with the parsed URL, object storage, and timeout. + let operator = self.operator(&parsed_url, request.object_storage, request.timeout)?; + + // Get the entries if url point to a directory. + let entries = if parsed_url.is_dir() { + Some( + operator + .list_with(&parsed_url.key) + .recursive(true) + .metakey(Metakey::ContentLength | Metakey::Mode) + .await // Do the list op here. + .map_err(|err| { + error!( + "list request failed {} {}: {}", + request.task_id, request.url, err + ); + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })? + .into_iter() + .map(|entry| { + let metadata = entry.metadata(); + DirEntry { + url: parsed_url.make_url_by_entry_path(entry.path()).to_string(), + content_length: metadata.content_length() as usize, + is_dir: metadata.is_dir(), + } + }) + .collect(), + ) + } else { + None + }; + + // Stat the object to get the response from the S3. + let response = operator.stat_with(&parsed_url.key).await.map_err(|err| { + error!( + "stat request failed {} {}: {}", + request.task_id, request.url, err + ); + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })?; + + info!( + "head response {} {}: {}", + request.task_id, + request.url, + response.content_length() + ); + + Ok(HeadResponse { + success: true, + content_length: Some(response.content_length()), + http_header: None, + http_status_code: None, + error_message: None, + entries, + }) + } + + // Returns content of requested file. + async fn get(&self, request: GetRequest) -> Result> { + info!( + "get request {} {}: {:?}", + request.piece_id, request.url, request.http_header + ); + + // Parse the URL and convert it to a ParsedURL for create the S3 operator. + let url: Url = request + .url + .parse() + .map_err(|_| Error::InvalidURI(request.url.clone()))?; + let parsed_url: ParsedURL = url.try_into().map_err(|err| { + error!( + "parse head request url failed {} {}: {}", + request.piece_id, request.url, err + ); + err + })?; + + // Initialize the operator with the parsed URL, object storage, and timeout. + let operator_reader = self + .operator(&parsed_url, request.object_storage, request.timeout)? + .reader(&parsed_url.key) + .await + .map_err(|err| { + error!( + "get request failed {} {}: {}", + request.piece_id, request.url, err + ); + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })?; + + let stream = match request.range { + Some(range) => operator_reader + .into_bytes_stream(range.start..range.start + range.length) + .await + .map_err(|err| { + error!( + "get request failed {} {}: {}", + request.piece_id, request.url, err + ); + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })?, + None => operator_reader.into_bytes_stream(..).await.map_err(|err| { + error!( + "get request failed {} {}: {}", + request.piece_id, request.url, err + ); + Error::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, + }) + })?, + }; + + Ok(crate::GetResponse { + success: true, + http_header: None, + http_status_code: Some(reqwest::StatusCode::OK), + reader: Box::new(StreamReader::new(stream)), + error_message: None, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use url::Url; + + #[test] + fn test_valid_s3_url() { + let url = Url::parse("s3://my-bucket/path/to/object.txt").unwrap(); + let parsed_url = ParsedURL::try_from(url).unwrap(); + assert_eq!(parsed_url.bucket, "my-bucket"); + assert_eq!(parsed_url.key, "path/to/object.txt"); + } +} diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 5c5687aa..fa60071f 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -150,6 +150,9 @@ struct Args { )] access_key_secret: Option, + #[arg(long, help = "Specify the session token for AWS S3")] + session_token: Option, + #[arg( short = 'l', long,