diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index c7956917..235ce657 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use std::time::Instant; use tokio::fs::{self, File, OpenOptions}; use tokio::io::{ - self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, SeekFrom + self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom }; use tokio_util::io::InspectReader; use tracing::{error, info, instrument, warn}; @@ -243,7 +243,8 @@ impl Content { #[instrument(skip_all)] pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> { if self.config.storage.encryption.enable { - self.export_encrypted_file_by_range(task_id, to, true, None).await?; + let task_path = self.get_task_path(task_id); + self.export_encrypted_file(task_id, task_path.as_path(), to, None).await?; } else { fs::copy(self.get_task_path(task_id), to).await?; } @@ -255,7 +256,8 @@ impl Content { #[instrument(skip_all)] async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> { if self.config.storage.encryption.enable { - self.export_encrypted_file_by_range(task_id, to, true, Some(range)).await?; + let task_path = self.get_task_path(task_id); + self.export_encrypted_file(task_id, task_path.as_path(), to, Some(range)).await?; return Ok(()); } // Ensure the parent directory of the destination exists. @@ -332,12 +334,13 @@ impl Content { limited_reader, key, task_id, + // same as f_reader target_offset, ); - Ok(Either::Left(decrypt_reader)) - } else { - Ok(Either::Right(limited_reader)) + + return Ok(Either::Left(decrypt_reader)); } + Ok(Either::Right(limited_reader)) } /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the @@ -350,10 +353,8 @@ impl Content { length: u64, range: Option, ) -> Result<(impl AsyncRead, impl AsyncRead)> { - // TODO: encryption - todo!("encryption"); let task_path = self.get_task_path(task_id); - + // Calculate the target offset and length based on the range. let (target_offset, target_length) = calculate_piece_range(offset, length, range); @@ -370,6 +371,21 @@ impl Content { })?; let range_reader = f_range_reader.take(target_length); + // encryption + let range_reader = if self.config.storage.encryption.enable { + let key = self.key.as_ref().expect("should have key when encryption enabled"); + let decrypt_reader = DecryptReader::new( + range_reader, + key, + task_id, + // same as f_range_reader + target_offset + ); + Either::Left(decrypt_reader) + } else { + Either::Right(range_reader) + }; + // Create full reader of the piece. let f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); @@ -384,6 +400,21 @@ impl Content { })?; let reader = f_reader.take(length); + // encryption + let reader = if self.config.storage.encryption.enable { + let key = self.key.as_ref().expect("should have key when encryption enabled"); + let decrypt_reader = DecryptReader::new( + reader, + key, + task_id, + // same as f_reader + offset + ); + Either::Left(decrypt_reader) + } else { + Either::Right(reader) + }; + Ok((range_reader, reader)) } @@ -544,7 +575,8 @@ impl Content { #[instrument(skip_all)] pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { if self.config.storage.encryption.enable { - self.export_encrypted_file_by_range(task_id, to, false, None).await?; + let task_path = self.get_persistent_cache_task_path(task_id); + self.export_encrypted_file(task_id, task_path.as_path(), to, None).await?; } else { fs::copy(self.get_persistent_cache_task_path(task_id), to).await?; } @@ -587,6 +619,7 @@ impl Content { limited_reader, key, task_id, + // same as f_reader target_offset, ); @@ -606,8 +639,6 @@ impl Content { length: u64, range: Option, ) -> Result<(impl AsyncRead, impl AsyncRead)> { - // TODO: encryption - todo!("encryption"); let task_path = self.get_persistent_cache_task_path(task_id); // Calculate the target offset and length based on the range. @@ -626,6 +657,21 @@ impl Content { })?; let range_reader = f_range_reader.take(target_length); + // encryption + let range_reader = if self.config.storage.encryption.enable { + let key = self.key.as_ref().expect("should have key when encryption enabled"); + let decrypt_reader = DecryptReader::new( + range_reader, + key, + task_id, + // same as f_range_reader + target_offset + ); + Either::Left(decrypt_reader) + } else { + Either::Right(range_reader) + }; + // Create full reader of the piece. let f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); @@ -640,6 +686,21 @@ impl Content { })?; let reader = f_reader.take(length); + // encryption + let reader = if self.config.storage.encryption.enable { + let key = self.key.as_ref().expect("should have key when encryption enabled"); + let decrypt_reader = DecryptReader::new( + reader, + key, + task_id, + // same as f_reader + offset + ); + Either::Left(decrypt_reader) + } else { + Either::Right(reader) + }; + Ok((range_reader, reader)) } @@ -735,17 +796,10 @@ impl Content { .join(task_id) } - /// TODO: copy encrypt file - async fn export_encrypted_file_by_range(&self, task_id: &str, to: &Path, is_task: bool, range: Option) -> Result<()> { - let task_path = if is_task { - self.get_task_path(task_id) - } else { - self.get_persistent_cache_task_path(task_id) - }; - + /// export_encrypted_file decrypts the encrypted file to the destination. + async fn export_encrypted_file(&self, task_id: &str, task_path: &Path, to: &Path, range: Option) -> Result<()> { // source file - // TODO: difference with OpenOption? - let mut src_file = File::open(task_path.as_path()).await.inspect_err(|err| { + let mut src_file = File::open(task_path).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); })?; @@ -758,7 +812,6 @@ impl Content { (Either::Right(src_file), 0) }; - // TODO: need buf? let src_buf_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, src_file); let key = self.key.as_ref().expect("should have key when encryption enabled"); let decrypt_reader = DecryptReader::new( @@ -779,11 +832,10 @@ impl Content { error!("open {:?} failed: {}", to, err); })?; - // ALT - // TODO: need buf? let mut reader = decrypt_reader; let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, dest_file); + // timer let start = Instant::now(); let length = io::copy(&mut reader, &mut writer).await.inspect_err(|err| { @@ -794,22 +846,14 @@ impl Content { error!("flush {:?} failed: {}", task_path, err); })?; + // timer let duration = start.elapsed(); - let task_type = if is_task { - "Task" - } else { - "PersistentCacheTask" - }; info!( - "{} copy decrypted {} B = {} KB = {} MB to {:?}, costing {:?}", - task_type, length, length / 1024, length / 1024 / 1024, to, duration + "copy decrypted {} B = {} KB = {} MB to {:?}, cost {:?}", + length, length / 1024, length / 1024 / 1024, to, duration ); - // ALT - // let writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, dest_file); - // let _len = file_copy(decrypt_reader, writer, self.config.storage.write_buffer_size).await?; - Ok(()) } } @@ -827,39 +871,6 @@ pub fn calculate_piece_range(offset: u64, length: u64, range: Option) -> } } -// TODO: better then tokio::io::copy? -pub async fn file_copy(mut reader: R, mut writer: W, buf_size: usize) -> Result -where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, -{ - let start = Instant::now(); - // const BUF_SIZE: usize = 8; - // let mut buf = vec![0u8; BUF_SIZE * 1024 * 1024]; - let mut buf = vec![0u8; buf_size]; - let mut total = 0; - - loop { - let n = reader.read(&mut buf).await?; - if n == 0 { - break; - } - writer.write_all(&buf[..n]).await?; - total += n as u64; - } - - writer.flush().await.unwrap(); - - let elapsed = start.elapsed(); - - let total_kb = total / 1024; - let total_mb = total / 1024 / 1024; - // let total_mb = total as f64 / (1024.0 * 1024.0); - info!("diy file copy {}B = {}KB = {}MB, costing {:?}", total, total_kb, total_mb, elapsed); - - Ok(total) -} - #[cfg(test)] mod tests { use super::*;