refactor: export encrypted file

Signed-off-by: chohee <cc5281@126.com>
This commit is contained in:
chohee 2025-09-01 23:07:11 +08:00
parent c68de9d437
commit 090da4d591
1 changed files with 80 additions and 69 deletions

View File

@ -27,7 +27,7 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::fs::{self, File, OpenOptions}; use tokio::fs::{self, File, OpenOptions};
use tokio::io::{ use tokio::io::{
self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, SeekFrom self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom
}; };
use tokio_util::io::InspectReader; use tokio_util::io::InspectReader;
use tracing::{error, info, instrument, warn}; use tracing::{error, info, instrument, warn};
@ -243,7 +243,8 @@ impl Content {
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> { pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> {
if self.config.storage.encryption.enable { if self.config.storage.encryption.enable {
self.export_encrypted_file_by_range(task_id, to, true, None).await?; let task_path = self.get_task_path(task_id);
self.export_encrypted_file(task_id, task_path.as_path(), to, None).await?;
} else { } else {
fs::copy(self.get_task_path(task_id), to).await?; fs::copy(self.get_task_path(task_id), to).await?;
} }
@ -255,7 +256,8 @@ impl Content {
#[instrument(skip_all)] #[instrument(skip_all)]
async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> { async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> {
if self.config.storage.encryption.enable { if self.config.storage.encryption.enable {
self.export_encrypted_file_by_range(task_id, to, true, Some(range)).await?; let task_path = self.get_task_path(task_id);
self.export_encrypted_file(task_id, task_path.as_path(), to, Some(range)).await?;
return Ok(()); return Ok(());
} }
// Ensure the parent directory of the destination exists. // Ensure the parent directory of the destination exists.
@ -332,12 +334,13 @@ impl Content {
limited_reader, limited_reader,
key, key,
task_id, task_id,
// same as f_reader
target_offset, target_offset,
); );
Ok(Either::Left(decrypt_reader))
} else { return Ok(Either::Left(decrypt_reader));
Ok(Either::Right(limited_reader))
} }
Ok(Either::Right(limited_reader))
} }
/// read_piece_with_dual_read return two readers, one is the range reader, and the other is the /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
@ -350,8 +353,6 @@ impl Content {
length: u64, length: u64,
range: Option<Range>, range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> { ) -> Result<(impl AsyncRead, impl AsyncRead)> {
// TODO: encryption
todo!("encryption");
let task_path = self.get_task_path(task_id); let task_path = self.get_task_path(task_id);
// Calculate the target offset and length based on the range. // Calculate the target offset and length based on the range.
@ -370,6 +371,21 @@ impl Content {
})?; })?;
let range_reader = f_range_reader.take(target_length); let range_reader = f_range_reader.take(target_length);
// encryption
let range_reader = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new(
range_reader,
key,
task_id,
// same as f_range_reader
target_offset
);
Either::Left(decrypt_reader)
} else {
Either::Right(range_reader)
};
// Create full reader of the piece. // Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| { let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err); error!("open {:?} failed: {}", task_path, err);
@ -384,6 +400,21 @@ impl Content {
})?; })?;
let reader = f_reader.take(length); let reader = f_reader.take(length);
// encryption
let reader = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new(
reader,
key,
task_id,
// same as f_reader
offset
);
Either::Left(decrypt_reader)
} else {
Either::Right(reader)
};
Ok((range_reader, reader)) Ok((range_reader, reader))
} }
@ -544,7 +575,8 @@ impl Content {
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
if self.config.storage.encryption.enable { if self.config.storage.encryption.enable {
self.export_encrypted_file_by_range(task_id, to, false, None).await?; let task_path = self.get_persistent_cache_task_path(task_id);
self.export_encrypted_file(task_id, task_path.as_path(), to, None).await?;
} else { } else {
fs::copy(self.get_persistent_cache_task_path(task_id), to).await?; fs::copy(self.get_persistent_cache_task_path(task_id), to).await?;
} }
@ -587,6 +619,7 @@ impl Content {
limited_reader, limited_reader,
key, key,
task_id, task_id,
// same as f_reader
target_offset, target_offset,
); );
@ -606,8 +639,6 @@ impl Content {
length: u64, length: u64,
range: Option<Range>, range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> { ) -> Result<(impl AsyncRead, impl AsyncRead)> {
// TODO: encryption
todo!("encryption");
let task_path = self.get_persistent_cache_task_path(task_id); let task_path = self.get_persistent_cache_task_path(task_id);
// Calculate the target offset and length based on the range. // Calculate the target offset and length based on the range.
@ -626,6 +657,21 @@ impl Content {
})?; })?;
let range_reader = f_range_reader.take(target_length); let range_reader = f_range_reader.take(target_length);
// encryption
let range_reader = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new(
range_reader,
key,
task_id,
// same as f_range_reader
target_offset
);
Either::Left(decrypt_reader)
} else {
Either::Right(range_reader)
};
// Create full reader of the piece. // Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| { let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err); error!("open {:?} failed: {}", task_path, err);
@ -640,6 +686,21 @@ impl Content {
})?; })?;
let reader = f_reader.take(length); let reader = f_reader.take(length);
// encryption
let reader = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new(
reader,
key,
task_id,
// same as f_reader
offset
);
Either::Left(decrypt_reader)
} else {
Either::Right(reader)
};
Ok((range_reader, reader)) Ok((range_reader, reader))
} }
@ -735,17 +796,10 @@ impl Content {
.join(task_id) .join(task_id)
} }
/// TODO: copy encrypt file /// export_encrypted_file decrypts the encrypted file to the destination.
async fn export_encrypted_file_by_range(&self, task_id: &str, to: &Path, is_task: bool, range: Option<Range>) -> Result<()> { async fn export_encrypted_file(&self, task_id: &str, task_path: &Path, to: &Path, range: Option<Range>) -> Result<()> {
let task_path = if is_task {
self.get_task_path(task_id)
} else {
self.get_persistent_cache_task_path(task_id)
};
// source file // source file
// TODO: difference with OpenOption? let mut src_file = File::open(task_path).await.inspect_err(|err| {
let mut src_file = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err); error!("open {:?} failed: {}", task_path, err);
})?; })?;
@ -758,7 +812,6 @@ impl Content {
(Either::Right(src_file), 0) (Either::Right(src_file), 0)
}; };
// TODO: need buf?
let src_buf_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, src_file); let src_buf_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, src_file);
let key = self.key.as_ref().expect("should have key when encryption enabled"); let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new( let decrypt_reader = DecryptReader::new(
@ -779,11 +832,10 @@ impl Content {
error!("open {:?} failed: {}", to, err); error!("open {:?} failed: {}", to, err);
})?; })?;
// ALT
// TODO: need buf?
let mut reader = decrypt_reader; let mut reader = decrypt_reader;
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, dest_file); let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, dest_file);
// timer
let start = Instant::now(); let start = Instant::now();
let length = io::copy(&mut reader, &mut writer).await.inspect_err(|err| { let length = io::copy(&mut reader, &mut writer).await.inspect_err(|err| {
@ -794,22 +846,14 @@ impl Content {
error!("flush {:?} failed: {}", task_path, err); error!("flush {:?} failed: {}", task_path, err);
})?; })?;
// timer
let duration = start.elapsed(); let duration = start.elapsed();
let task_type = if is_task {
"Task"
} else {
"PersistentCacheTask"
};
info!( info!(
"{} copy decrypted {} B = {} KB = {} MB to {:?}, costing {:?}", "copy decrypted {} B = {} KB = {} MB to {:?}, cost {:?}",
task_type, length, length / 1024, length / 1024 / 1024, to, duration length, length / 1024, length / 1024 / 1024, to, duration
); );
// ALT
// let writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, dest_file);
// let _len = file_copy(decrypt_reader, writer, self.config.storage.write_buffer_size).await?;
Ok(()) Ok(())
} }
} }
@ -827,39 +871,6 @@ pub fn calculate_piece_range(offset: u64, length: u64, range: Option<Range>) ->
} }
} }
// TODO: better then tokio::io::copy?
pub async fn file_copy<R, W>(mut reader: R, mut writer: W, buf_size: usize) -> Result<u64>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let start = Instant::now();
// const BUF_SIZE: usize = 8;
// let mut buf = vec![0u8; BUF_SIZE * 1024 * 1024];
let mut buf = vec![0u8; buf_size];
let mut total = 0;
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
writer.write_all(&buf[..n]).await?;
total += n as u64;
}
writer.flush().await.unwrap();
let elapsed = start.elapsed();
let total_kb = total / 1024;
let total_mb = total / 1024 / 1024;
// let total_mb = total as f64 / (1024.0 * 1024.0);
info!("diy file copy {}B = {}KB = {}MB, costing {:?}", total, total_kb, total_mb, elapsed);
Ok(total)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;