diff --git a/Cargo.lock b/Cargo.lock
index ab4e928b..0f749f7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1011,6 +1011,7 @@ name = "dragonfly-client"
 version = "1.0.24"
 dependencies = [
  "anyhow",
+ "base64 0.22.1",
  "bytes",
  "bytesize",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 31c9e66b..734ea921 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -118,6 +118,7 @@ tonic-health = "0.12.3"
 hashring = "0.3.6"
 reqwest-tracing = "0.5"
 mocktail = "0.3.0"
+base64 = "0.22.1"
 
 [profile.release]
 opt-level = 3
diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml
index f85adbb9..0e7a1b96 100644
--- a/dragonfly-client-storage/Cargo.toml
+++ b/dragonfly-client-storage/Cargo.toml
@@ -31,10 +31,10 @@ bytes.workspace = true
 bytesize.workspace = true
 leaky-bucket.workspace = true
 vortex-protocol.workspace = true
+base64.workspace = true
 num_cpus = "1.17"
 bincode = "1.3.3"
 walkdir = "2.5.0"
-base64 = "0.22.1"
 
 # encryption
 # chacha20poly1305 = { version = "0.10", features = ["std"] }
diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs
index 7ef56e91..57524496 100644
--- a/dragonfly-client-storage/src/content.rs
+++ b/dragonfly-client-storage/src/content.rs
@@ -216,6 +216,11 @@ impl Content {
     /// 2.2. If the hard link fails, copy the task content to the destination once the task is finished, then return immediately.
     #[instrument(skip_all)]
     pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> {
+        // Hard links expose the raw on-disk content, which is ciphertext when encryption is enabled.
+        if self.config.storage.encryption.enable {
+            panic!("HARD LINK is not compatible with encryption");
+        }
+
         let task_path = self.get_task_path(task_id);
         if let Err(err) = fs::hard_link(task_path.clone(), to).await {
             if err.kind() == std::io::ErrorKind::AlreadyExists {
@@ -292,6 +297,8 @@ impl Content {
         offset: u64,
         length: u64,
         range: Option<Range>,
+        // The piece id is used to derive the per-piece nonce for decryption.
+        piece_id: &str,
     ) -> Result<impl AsyncRead> {
         let task_path = self.get_task_path(task_id);
 
@@ -310,7 +317,19 @@ impl Content {
             error!("seek {:?} failed: {}", task_path, err);
         })?;
 
-        Ok(f_reader.take(target_length))
+        let limited_reader = f_reader.take(target_length);
+
+        if self.config.storage.encryption.enable {
+            let key = self.key.as_ref().expect("should have key when encryption enabled");
+            let decrypt_reader = DecryptReader::new(
+                limited_reader,
+                key,
+                piece_id
+            );
+            Ok(Either::Left(decrypt_reader))
+        } else {
+            Ok(Either::Right(limited_reader))
+        }
     }
 
     /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
@@ -366,6 +385,8 @@ impl Content {
         offset: u64,
         expected_length: u64,
         reader: &mut R,
+        // The piece id is used to derive the per-piece nonce for encryption.
+        piece_id: &str,
     ) -> Result<WritePieceResponse> {
         // Open the file and seek to the offset.
         let task_path = self.get_task_path(task_id);
@@ -387,11 +408,24 @@ impl Content {
 
         // Copy the piece to the file while updating the CRC32 value.
         let mut hasher = crc32fast::Hasher::new();
-        let mut tee = InspectReader::new(reader, |bytes| {
+        let tee = InspectReader::new(reader, |bytes| {
             hasher.update(bytes);
         });
 
-        let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
+        let mut tee_wrapper = if self.config.storage.encryption.enable {
+            let key = self.key.as_ref().expect("should have key when encryption enabled");
+
+            let encrypt_reader = EncryptReader::new(
+                tee,
+                key,
+                piece_id
+            );
+            Either::Left(encrypt_reader)
+        } else {
+            Either::Right(tee)
+        };
+
+        let length = io::copy(&mut tee_wrapper, &mut writer).await.inspect_err(|err| {
             error!("copy {:?} failed: {}", task_path, err);
         })?;
 
@@ -533,11 +567,12 @@ impl Content {
             .inspect_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
             })?;
-        
+
+        let limited_reader = f_reader.take(target_length);
+
         if self.config.storage.encryption.enable {
             let key = self.key.as_ref().expect("should have key when encryption enabled");
 
-            let limited_reader = f_reader.take(target_length);
             let decrypt_reader = DecryptReader::new(
                 limited_reader,
                 key,
@@ -547,7 +582,7 @@ impl Content {
             return Ok(Either::Left(decrypt_reader));
         }
 
-        Ok(Either::Right(f_reader.take(target_length)))
+        Ok(Either::Right(limited_reader))
     }
 
     // TODO encryption?
@@ -633,7 +668,7 @@ impl Content {
             hasher.update(bytes);
         });
 
-        let mut tee_warpper = if self.config.storage.encryption.enable {
+        let mut tee_wrapper = if self.config.storage.encryption.enable {
             let key = self.key.as_ref().expect("should have key when encryption enabled");
 
             let encrypt_reader = EncryptReader::new(
@@ -646,7 +681,7 @@ impl Content {
             Either::Right(tee)
         };
 
-        let length = io::copy(&mut tee_warpper, &mut writer).await.inspect_err(|err| {
+        let length = io::copy(&mut tee_wrapper, &mut writer).await.inspect_err(|err| {
             error!("copy {:?} failed: {}", task_path, err);
         })?;
 
@@ -785,11 +820,11 @@ mod tests {
         let data = b"hello, world!";
         let mut reader = Cursor::new(data);
         content
-            .write_piece(task_id, 0, 13, &mut reader)
+            .write_piece(task_id, 0, 13, &mut reader, "")
             .await
             .unwrap();
 
-        let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
+        let mut reader = content.read_piece(task_id, 0, 13, None, "").await.unwrap();
         let mut buffer = Vec::new();
         reader.read_to_end(&mut buffer).await.unwrap();
         assert_eq!(buffer, data);
@@ -803,6 +838,7 @@ mod tests {
                 start: 0,
                 length: 5,
             }),
+            "",
         )
         .await
         .unwrap();
@@ -823,7 +859,7 @@ mod tests {
         let data = b"test";
         let mut reader = Cursor::new(data);
         let response = content
-            .write_piece(task_id, 0, 4, &mut reader)
+            .write_piece(task_id, 0, 4, &mut reader, "")
             .await
             .unwrap();
         assert_eq!(response.length, 4);
diff --git a/dragonfly-client-storage/src/encrypt/algorithm/aes_ctr.rs b/dragonfly-client-storage/src/encrypt/algorithm/aes_ctr.rs
index 715d748f..23e7a694 100644
--- a/dragonfly-client-storage/src/encrypt/algorithm/aes_ctr.rs
+++ b/dragonfly-client-storage/src/encrypt/algorithm/aes_ctr.rs
@@ -2,11 +2,11 @@ use aes::Aes256;
 use ctr::Ctr128BE;
 use ctr::cipher::{StreamCipher, KeyIvInit};
 
-use super::EncryptAlgo;
+use super::EncryptionAlgorithm;
 
 pub type Aes256Ctr = Ctr128BE<Aes256>;
 
-impl EncryptAlgo for Aes256Ctr {
+impl EncryptionAlgorithm for Aes256Ctr {
     const NONCE_SIZE: usize = 16;
     const KEY_SIZE: usize = 32;
 
diff --git a/dragonfly-client-storage/src/encrypt/algorithm/mod.rs b/dragonfly-client-storage/src/encrypt/algorithm/mod.rs
index 7aa63aba..50292ac6 100644
--- a/dragonfly-client-storage/src/encrypt/algorithm/mod.rs
+++ b/dragonfly-client-storage/src/encrypt/algorithm/mod.rs
@@ -2,9 +2,11 @@ mod aes_ctr;
 
 pub use aes_ctr::Aes256Ctr;
 
-pub trait EncryptAlgo {
-    const NONCE_SIZE: usize;
+pub trait EncryptionAlgorithm {
+    /// Bytes of key
     const KEY_SIZE: usize;
+    /// Bytes of nonce
+    const NONCE_SIZE: usize;
 
     fn new(key: &[u8], nonce: &[u8]) -> Self;
 
diff --git a/dragonfly-client-storage/src/encrypt/cryptor/reader.rs b/dragonfly-client-storage/src/encrypt/cryptor/reader.rs
index 83361a1b..a769bc47 100644
--- a/dragonfly-client-storage/src/encrypt/cryptor/reader.rs
+++ b/dragonfly-client-storage/src/encrypt/cryptor/reader.rs
@@ -3,9 +3,9 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::io::ReadBuf;
 
-use crate::encrypt::{EncryptAlgo, Aes256Ctr};
+use crate::encrypt::{EncryptionAlgorithm, Aes256Ctr};
 
-pub struct EncryptReader {
+pub struct EncryptReader {
     inner: R,
     cipher: A,
 }
@@ -21,7 +21,7 @@ fn parse_piece_id(piece_id: &str) -> Option<(&str, u32)> {
     })
 }
 
-// impl EncryptReader {
+// impl EncryptReader {
 //     pub fn new(inner: R, key: &[u8], piece_id: &str) -> Self {
 //         let (task_id, piece_num) = parse_piece_id(piece_id)
 //             .expect("should have task_id and piece_num");
@@ -33,22 +33,22 @@ fn parse_piece_id(piece_id: &str) -> Option<(&str, u32)> {
 //     }
 // }
 
-impl EncryptReader {
+impl EncryptReader {
     /// default for Aes256Ctr
     pub fn new(inner: R, key: &[u8], piece_id: &str) -> Self {
         let (task_id, piece_num) = parse_piece_id(piece_id)
             .expect("should have task_id and piece_num");
 
         let nonce = Aes256Ctr::build_nonce(task_id, piece_num);
-        let cipher = <Aes256Ctr as EncryptAlgo>::new(key, &nonce);
+        let cipher = <Aes256Ctr as EncryptionAlgorithm>::new(key, &nonce);
 
         Self { inner, cipher }
     }
 }
 
-impl Unpin for EncryptReader where R: Unpin {}
+impl Unpin for EncryptReader where R: Unpin {}
 
-impl AsyncRead for EncryptReader {
+impl AsyncRead for EncryptReader {
     fn poll_read(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
diff --git a/dragonfly-client-storage/src/encrypt/mod.rs b/dragonfly-client-storage/src/encrypt/mod.rs
index be0150fb..aaf7bf11 100644
--- a/dragonfly-client-storage/src/encrypt/mod.rs
+++ b/dragonfly-client-storage/src/encrypt/mod.rs
@@ -1,6 +1,6 @@
 mod algorithm;
 mod cryptor;
 
-use algorithm::{EncryptAlgo, Aes256Ctr};
+use algorithm::{EncryptionAlgorithm, Aes256Ctr};
 
 pub use cryptor::{EncryptReader, DecryptReader};
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs
index 1257ec60..1b07a6f5 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -25,7 +25,7 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::io::AsyncRead;
 use tokio::time::sleep;
 use tokio_util::either::Either;
 use tokio_util::io::InspectReader;
@@ -550,7 +550,7 @@ impl Storage {
     ) -> Result<metadata::Piece> {
         let response = self
            .content
-            .write_piece(task_id, offset, length, reader)
+            .write_piece(task_id, offset, length, reader, piece_id)
             .await?;
 
         let digest = Digest::new(Algorithm::Crc32, response.hash);
@@ -602,7 +602,7 @@ impl Storage {
     ) -> Result<metadata::Piece> {
         let response = self
            .content
-            .write_piece(task_id, offset, length, reader)
+            .write_piece(task_id, offset, length, reader, piece_id)
             .await?;
 
         let length = response.length;
@@ -669,7 +669,7 @@ impl Storage {
 
         match self
            .content
-            .read_piece(task_id, piece.offset, piece.length, range)
+            .read_piece(task_id, piece.offset, piece.length, range, piece_id)
             .await
         {
             Ok(reader) => {
diff --git a/dragonfly-client-util/Cargo.toml b/dragonfly-client-util/Cargo.toml
index 4039aa7a..d5c8593a 100644
--- a/dragonfly-client-util/Cargo.toml
+++ b/dragonfly-client-util/Cargo.toml
@@ -45,6 +45,6 @@ reqwest-tracing.workspace = true
 reqwest-middleware.workspace = true
 rustix = { version = "1.1.2", features = ["fs"] }
-base64 = "0.22.1"
+base64.workspace = true
 pnet = "0.35.0"
 protobuf = "3.7.2"
 
diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml
index 56889a6b..a33e1b6a 100644
--- a/dragonfly-client/Cargo.toml
+++ b/dragonfly-client/Cargo.toml
@@ -70,6 +70,7 @@ vortex-protocol.workspace = true
 dashmap.workspace = true
 tonic-health.workspace = true
 hashring.workspace = true
+base64.workspace = true
 tracing-appender = "0.2.3"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
 tracing-panic = "0.1.2"
diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs
index 906ec407..409edf76 100644
--- a/dragonfly-client/src/bin/dfdaemon/main.rs
+++ b/dragonfly-client/src/bin/dfdaemon/main.rs
@@ -42,6 +42,7 @@ use termion::{color, style};
 use tokio::sync::mpsc;
 use tokio::sync::Barrier;
 use tracing::{error, info, Level};
+use base64::{prelude::BASE64_STANDARD, Engine as _};
 
 #[cfg(not(target_env = "msvc"))]
 #[global_allocator]
@@ -166,21 +167,7 @@ async fn main() -> Result<(), anyhow::Error> {
 
     // Get key from Manager
     let key = if config.storage.encryption.enable {
-        let source_type = if config.seed_peer.enable {
-            SourceType::SeedPeerSource.into()
-        } else {
-            SourceType::PeerSource.into()
-        };
-        // Request a key from Manager
-        let key = manager_client.request_encryption_key(
-            RequestEncryptionKeyRequest {
-                source_type: source_type,
-                hostname: config.host.hostname.clone(),
-                ip: config.host.ip.unwrap().to_string(),
-            }
-        ).await?;
-
-        info!("Key response: \n{:x?}", key);
+        let key = get_key_from_manager(&config, &manager_client).await;
         Some(key)
     } else {
         None
@@ -475,3 +462,26 @@ async fn main() -> Result<(), anyhow::Error> {
 
     Ok(())
 }
+
+async fn get_key_from_manager(config: &dfdaemon::Config, client: &ManagerClient) -> Vec<u8> {
+    let source_type = if config.seed_peer.enable {
+        SourceType::SeedPeerSource.into()
+    } else {
+        SourceType::PeerSource.into()
+    };
+    // Request a key from Manager.
+    let key = client.request_encryption_key(
+        RequestEncryptionKeyRequest {
+            source_type: source_type,
+            hostname: config.host.hostname.clone(),
+            ip: config.host.ip.unwrap().to_string(),
+        }
+    ).await
+    .expect("failed to get encryption key from Manager");
+
+    let key_base64 = BASE64_STANDARD.encode(&key);
+
+    info!("Key response (base64): {} \n(hex): {:x?}", key_base64, key);
+
+    key
+}
diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs
index c3170e71..4da08383 100644
--- a/dragonfly-client/src/grpc/dfdaemon_download.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_download.rs
@@ -444,6 +444,10 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
                     .unwrap_or_else(|err| error!("send download progress error: {:?}", err));
             }
 
+
+            // Clone the config so the spawned task below can read the encryption settings.
+            let config_clone = self.config.clone();
+
             tokio::spawn(
                 async move {
                     match task_manager_clone
@@ -483,8 +487,25 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
                        return;
                    }
 
+                    // TODO: encryption is not compatible with hardlink
+                    let encryption_enable = config_clone.storage.encryption.enable;
+                    assert!(!encryption_enable || !download_clone.force_hard_link);
+
                    if let Some(output_path) = &download_clone.output_path {
-                        if !download_clone.force_hard_link {
+                        // if encryption is enabled, copy to path instead of hard link
+                        if encryption_enable {
+                            let output_path = Path::new(output_path.as_str());
+                            if output_path.exists() {
+                                return;
+                            }
+                            let _res = task_manager_clone.export_encryption_file(
+                                task_clone.id.as_ref(),
+                                output_path
+                            )
+                            .await
+                            .map_err(|e| {error!("export encryption file: {}", e); return;})
+                            .expect("export should succeed");
+                        } else if !download_clone.force_hard_link {
                             let output_path = Path::new(output_path.as_str());
                             if output_path.exists() {
                                 match task_manager_clone
@@ -1031,6 +1052,9 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
                     .unwrap_or_else(|err| error!("send download progress error: {:?}", err));
             }
 
+            // Clone the config so the spawned task below can read the encryption settings.
+            let config_clone = self.config.clone();
+
             tokio::spawn(
                 async move {
                     match task_manager_clone
@@ -1070,11 +1094,12 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
                    }
 
                    // TODO: encryption is not compatible with hardlink
-                    assert!(!task_manager_clone.encrypted() || !request_clone.force_hard_link);
+                    let encryption_enable = config_clone.storage.encryption.enable;
+                    assert!(!encryption_enable || !request_clone.force_hard_link);
 
                    if let Some(output_path) = &request_clone.output_path {
                         // if encryption is enabled, copy to path instead of hard link
-                        if task_manager_clone.encrypted() {
+                        if encryption_enable {
                             let output_path = Path::new(output_path.as_str());
                             if output_path.exists() {
                                 return;
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs
index a6e5eae2..95b88cc5 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -900,14 +900,11 @@ impl PersistentCacheTask {
             }
         };
 
-        // // Remove the finished pieces from the pieces.
-        // let remaining_interested_pieces = self.piece.remove_finished_from_interested(
-        //     finished_pieces.clone(),
-        //     interested_pieces.clone(),
-        // );
-
-        // TODO: The remove function useless? Cause `finished_pieces` MUST be empty here
-        let remaining_interested_pieces = interested_pieces.clone();
+        // Remove the finished pieces from the pieces.
+        let remaining_interested_pieces = self.piece.remove_finished_from_interested(
+            finished_pieces.clone(),
+            interested_pieces.clone(),
+        );
 
         // Download the pieces from the parent.
         let partial_finished_pieces = match self
@@ -1453,11 +1450,6 @@ impl PersistentCacheTask {
         self.storage.delete_persistent_cache_task(task_id).await
     }
 
-    /// whether the encrytion is enabled
-    pub fn encrypted(&self) -> bool {
-        self.config.storage.encryption.enable
-    }
-
     /// export encryption file to path
     pub async fn export_encryption_file(&self, task_id: &str, to: &Path) -> ClientResult<()> {
         let task = self.storage.get_persistent_cache_task(task_id)?
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs
index 23d43b30..56e251e9 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -57,7 +57,7 @@ use std::sync::{
     Arc, Mutex,
 };
 use std::time::Instant;
-use tokio::io::AsyncReadExt;
+use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
 use tokio::sync::{
     mpsc::{self, Sender},
     Semaphore,
@@ -141,6 +141,12 @@ impl Task {
         let task = self.storage.prepare_download_task_started(id).await?;
 
         if task.content_length.is_some() && task.piece_length.is_some() {
+            // Omit the hard link when encryption is enabled, since the on-disk content is ciphertext.
+            if self.config.storage.encryption.enable {
+                info!("Omit HARD-LINK when encryption is enabled");
+                return Ok(task);
+            }
+
             // Attempt to create a hard link from the task file to the output path.
             //
             // Behavior based on force_hard_link setting:
@@ -259,6 +265,12 @@ impl Task {
             .download_task_started(id, piece_length, content_length, response.http_header)
             .await;
 
+        // Omit the hard link when encryption is enabled, since the on-disk content is ciphertext.
+        if self.config.storage.encryption.enable {
+            info!("Omit HARD-LINK when encryption is enabled");
+            return task;
+        }
+
         // Attempt to create a hard link from the task file to the output path.
         //
         // Behavior based on force_hard_link setting:
@@ -726,14 +738,11 @@ impl Task {
             }
         };
 
-        // // Remove the finished pieces from the pieces.
-        // let remaining_interested_pieces = self.piece.remove_finished_from_interested(
-        //     finished_pieces.clone(),
-        //     interested_pieces.clone(),
-        // );
-
-        // TODO: The remove function useless? Cause `finished_pieces` MUST be empty here
-        let remaining_interested_pieces = interested_pieces.clone();
+        // Remove the finished pieces from the pieces.
+        let remaining_interested_pieces = self.piece.remove_finished_from_interested(
+            finished_pieces.clone(),
+            interested_pieces.clone(),
+        );
 
         // Download the pieces from the parent.
         let partial_finished_pieces = match self
@@ -1994,6 +2003,75 @@ impl Task {
             }
         }
     }
+
+    /// export_encryption_file exports the decrypted task content to the given path.
+    pub async fn export_encryption_file(&self, task_id: &str, to: &Path) -> ClientResult<()> {
+        let task = self.storage.get_task(task_id)?
+            .ok_or_else(|| Error::TaskNotFound(task_id.to_string()))?;
+
+        if !task.is_finished() {
+            // TODO: use a dedicated error variant instead of Unknown.
+            return Err(Error::Unknown(format!("Task {} is not finished", task_id)));
+        }
+
+        // Get all pieces of the task.
+        let mut pieces = self.storage.get_pieces(task_id)?;
+        // TODO: is the sort necessary?
+        pieces.sort_by_key(|p| p.number);
+
+        if pieces.is_empty() {
+            return Err(Error::Unknown(format!("No pieces found for task {}", task_id)));
+        }
+
+        info!("Exporting encrypted file for task {} with {} pieces to {:?}",
+            task_id, pieces.len(), to);
+
+        let output_file = tokio::fs::File::create(to).await
+            // .or_err(ErrorType::StorageError)?;
+            .map_err(|e| Error::Unknown(format!("Failed to create output file: {}", e)))?;
+        let mut writer = BufWriter::new(output_file);
+
+        // Process every piece in order.
+        for piece in pieces {
+            if !piece.is_finished() {
+                // warn!("Piece {} is not finished, skipping", piece.number);
+                // continue;
+                panic!("Piece {} is not finished", piece.number);
+            }
+
+            debug!("Processing piece {} (offset: {}, length: {})",
+                piece.number, piece.offset, piece.length);
+
+            // Read and decrypt the piece.
+            let piece_id = self.piece.id(task_id, piece.number);
+            let mut reader = self.storage.upload_piece(
+                piece_id.as_str(),
+                task_id,
+                None, // read the whole piece
+            ).await?;
+
+            // Write the decrypted content to the output file.
+            let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await
+                // .or_err(ErrorType::StorageError)?;
+                .map_err(|e| Error::Unknown(format!("Failed to copy piece {}: {}", piece.number, e)))?;
+
+            if bytes_copied != piece.length {
+                return Err(Error::Unknown(format!(
+                    "Piece {} length mismatch: expected {}, got {}",
+                    piece.number, piece.length, bytes_copied
+                )));
+            }
+
+            debug!("Successfully processed piece {} ({} bytes)", piece.number, bytes_copied);
+        }
+
+        writer.flush().await
+            // .or_err(ErrorType::StorageError)?;
+            .map_err(|e| Error::Unknown(format!("Failed to flush output file: {}", e)))?;
+
+        info!("Successfully exported encrypted file for task {} to {:?}", task_id, to);
+        Ok(())
+    }
 }
 
 #[cfg(test)]
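
Note (not part of the patch): a minimal round-trip sketch of the piece-level encryption this diff introduces. It is a sketch under assumptions, not the crate's confirmed public API: it assumes the `encrypt` module's re-exports (`EncryptReader`, `DecryptReader`) are reachable from outside `dragonfly-client-storage`, that `DecryptReader::new` takes `(inner, key, piece_id)` like `EncryptReader::new` does in `content.rs`, and that a piece id has the `"<task_id>-<piece_number>"` shape implied by `parse_piece_id`. Because AES-256-CTR is a symmetric stream cipher, encrypting and decrypting with the same 32-byte key (`Aes256Ctr::KEY_SIZE`) and the same per-piece nonce apply the same keystream XOR, which is why the export path can recover plaintext simply by reading pieces back through `DecryptReader`.

// Hypothetical round-trip check; the module path and the piece-id format are assumptions.
use std::io::Cursor;

use dragonfly_client_storage::encrypt::{DecryptReader, EncryptReader};
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let key = vec![0x42u8; 32]; // Aes256Ctr::KEY_SIZE == 32
    let piece_id = "sometaskid-0"; // assumed "<task_id>-<piece_number>" layout
    let plaintext = b"hello, dragonfly".to_vec();

    // Encrypt while reading, as write_piece() does by wrapping the InspectReader tee.
    let mut ciphertext = Vec::new();
    EncryptReader::new(Cursor::new(plaintext.clone()), &key, piece_id)
        .read_to_end(&mut ciphertext)
        .await?;
    assert_ne!(ciphertext, plaintext);

    // Decrypt while reading, as read_piece() does for uploads and for export_encryption_file().
    let mut decrypted = Vec::new();
    DecryptReader::new(Cursor::new(ciphertext), &key, piece_id)
        .read_to_end(&mut decrypted)
        .await?;
    assert_eq!(decrypted, plaintext);

    Ok(())
}

One design note this relies on: CTR mode keeps ciphertext the same length as plaintext, which is why `read_piece` can keep using byte offsets and `take(target_length)` unchanged on the encrypted on-disk content.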