feat: add Task encryption

Signed-off-by: chohee <cc5281@126.com>
This commit is contained in:
chohee 2025-08-11 23:14:40 +08:00
parent 0e073e6367
commit ceee92f27e
15 changed files with 215 additions and 68 deletions

1
Cargo.lock generated
View File

@ -1011,6 +1011,7 @@ name = "dragonfly-client"
version = "1.0.24"
dependencies = [
"anyhow",
"base64 0.22.1",
"bytes",
"bytesize",
"chrono",

View File

@ -118,6 +118,7 @@ tonic-health = "0.12.3"
hashring = "0.3.6"
reqwest-tracing = "0.5"
mocktail = "0.3.0"
base64 = "0.22.1"
[profile.release]
opt-level = 3

View File

@ -31,10 +31,10 @@ bytes.workspace = true
bytesize.workspace = true
leaky-bucket.workspace = true
vortex-protocol.workspace = true
base64.workspace = true
num_cpus = "1.17"
bincode = "1.3.3"
walkdir = "2.5.0"
base64 = "0.22.1"
# encryption
# chacha20poly1305 = { version = "0.10", features = ["std"] }

View File

@ -216,6 +216,11 @@ impl Content {
/// 2.2. If the hard link fails, copy the task content to the destination once the task is finished, then return immediately.
#[instrument(skip_all)]
pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> {
// check
if self.config.storage.encryption.enable {
panic!("HARD LINK is not compatible with encryption");
}
let task_path = self.get_task_path(task_id);
if let Err(err) = fs::hard_link(task_path.clone(), to).await {
if err.kind() == std::io::ErrorKind::AlreadyExists {
@ -292,6 +297,8 @@ impl Content {
offset: u64,
length: u64,
range: Option<Range>,
// Added
piece_id: &str,
) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
@ -310,7 +317,19 @@ impl Content {
error!("seek {:?} failed: {}", task_path, err);
})?;
Ok(f_reader.take(target_length))
let limited_reader = f_reader.take(target_length);
if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let decrypt_reader = DecryptReader::new(
limited_reader,
key,
piece_id
);
Ok(Either::Left(decrypt_reader))
} else {
Ok(Either::Right(limited_reader))
}
}
/// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
@ -366,6 +385,8 @@ impl Content {
offset: u64,
expected_length: u64,
reader: &mut R,
// ADDED
piece_id: &str,
) -> Result<WritePieceResponse> {
// Open the file and seek to the offset.
let task_path = self.get_task_path(task_id);
@ -387,11 +408,24 @@ impl Content {
// Copy the piece to the file while updating the CRC32 value.
let mut hasher = crc32fast::Hasher::new();
let mut tee = InspectReader::new(reader, |bytes| {
let tee = InspectReader::new(reader, |bytes| {
hasher.update(bytes);
});
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
let mut tee_wrapper = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let encrypt_reader = EncryptReader::new(
tee,
key,
piece_id
);
Either::Left(encrypt_reader)
} else {
Either::Right(tee)
};
let length = io::copy(&mut tee_wrapper, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;
@ -534,10 +568,11 @@ impl Content {
error!("seek {:?} failed: {}", task_path, err);
})?;
let limited_reader = f_reader.take(target_length);
if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let limited_reader = f_reader.take(target_length);
let decrypt_reader = DecryptReader::new(
limited_reader,
key,
@ -547,7 +582,7 @@ impl Content {
return Ok(Either::Left(decrypt_reader));
}
Ok(Either::Right(f_reader.take(target_length)))
Ok(Either::Right(limited_reader))
}
// TODO encryption?
@ -633,7 +668,7 @@ impl Content {
hasher.update(bytes);
});
let mut tee_warpper = if self.config.storage.encryption.enable {
let mut tee_wrapper = if self.config.storage.encryption.enable {
let key = self.key.as_ref().expect("should have key when encryption enabled");
let encrypt_reader = EncryptReader::new(
@ -646,7 +681,7 @@ impl Content {
Either::Right(tee)
};
let length = io::copy(&mut tee_warpper, &mut writer).await.inspect_err(|err| {
let length = io::copy(&mut tee_wrapper, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;
@ -785,11 +820,11 @@ mod tests {
let data = b"hello, world!";
let mut reader = Cursor::new(data);
content
.write_piece(task_id, 0, 13, &mut reader)
.write_piece(task_id, 0, 13, &mut reader, "")
.await
.unwrap();
let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
let mut reader = content.read_piece(task_id, 0, 13, None, "").await.unwrap();
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await.unwrap();
assert_eq!(buffer, data);
@ -803,6 +838,7 @@ mod tests {
start: 0,
length: 5,
}),
"",
)
.await
.unwrap();
@ -823,7 +859,7 @@ mod tests {
let data = b"test";
let mut reader = Cursor::new(data);
let response = content
.write_piece(task_id, 0, 4, &mut reader)
.write_piece(task_id, 0, 4, &mut reader, "")
.await
.unwrap();
assert_eq!(response.length, 4);

View File

@ -2,11 +2,11 @@ use aes::Aes256;
use ctr::Ctr128BE;
use ctr::cipher::{StreamCipher, KeyIvInit};
use super::EncryptAlgo;
use super::EncryptionAlgorithm;
pub type Aes256Ctr = Ctr128BE<Aes256>;
impl EncryptAlgo for Aes256Ctr {
impl EncryptionAlgorithm for Aes256Ctr {
const NONCE_SIZE: usize = 16;
const KEY_SIZE: usize = 32;

View File

@ -2,9 +2,11 @@ mod aes_ctr;
pub use aes_ctr::Aes256Ctr;
pub trait EncryptAlgo {
const NONCE_SIZE: usize;
pub trait EncryptionAlgorithm {
/// Bytes of key
const KEY_SIZE: usize;
/// Bytes of nonce
const NONCE_SIZE: usize;
fn new(key: &[u8], nonce: &[u8]) -> Self;

View File

@ -3,9 +3,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::ReadBuf;
use crate::encrypt::{EncryptAlgo, Aes256Ctr};
use crate::encrypt::{EncryptionAlgorithm, Aes256Ctr};
pub struct EncryptReader<R, A: EncryptAlgo> {
pub struct EncryptReader<R: AsyncRead, A: EncryptionAlgorithm> {
inner: R,
cipher: A,
}
@ -21,7 +21,7 @@ fn parse_piece_id(piece_id: &str) -> Option<(&str, u32)> {
})
}
// impl<R, A: EncryptAlgo> EncryptReader<R, A> {
// impl<R, A: EncryptionAlgorithm> EncryptReader<R, A> {
// pub fn new(inner: R, key: &[u8], piece_id: &str) -> Self {
// let (task_id, piece_num) = parse_piece_id(piece_id)
// .expect("should have task_id and piece_num");
@ -33,22 +33,22 @@ fn parse_piece_id(piece_id: &str) -> Option<(&str, u32)> {
// }
// }
impl<R> EncryptReader<R, Aes256Ctr> {
impl<R: AsyncRead> EncryptReader<R, Aes256Ctr> {
/// default for Aes256Ctr
pub fn new(inner: R, key: &[u8], piece_id: &str) -> Self {
let (task_id, piece_num) = parse_piece_id(piece_id)
.expect("should have task_id and piece_num");
let nonce = Aes256Ctr::build_nonce(task_id, piece_num);
let cipher = <Aes256Ctr as EncryptAlgo>::new(key, &nonce);
let cipher = <Aes256Ctr as EncryptionAlgorithm>::new(key, &nonce);
Self { inner, cipher }
}
}
impl<R, A: EncryptAlgo> Unpin for EncryptReader<R, A> where R: Unpin {}
impl<R: AsyncRead, A: EncryptionAlgorithm> Unpin for EncryptReader<R, A> where R: Unpin {}
impl<R: AsyncRead + Unpin, A: EncryptAlgo> AsyncRead for EncryptReader<R, A> {
impl<R: AsyncRead + Unpin, A: EncryptionAlgorithm> AsyncRead for EncryptReader<R, A> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,

View File

@ -1,6 +1,6 @@
mod algorithm;
mod cryptor;
use algorithm::{EncryptAlgo, Aes256Ctr};
use algorithm::{EncryptionAlgorithm, Aes256Ctr};
pub use cryptor::{EncryptReader, DecryptReader};

View File

@ -25,7 +25,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::AsyncRead;
use tokio::time::sleep;
use tokio_util::either::Either;
use tokio_util::io::InspectReader;
@ -550,7 +550,7 @@ impl Storage {
) -> Result<metadata::Piece> {
let response = self
.content
.write_piece(task_id, offset, length, reader)
.write_piece(task_id, offset, length, reader, piece_id)
.await?;
let digest = Digest::new(Algorithm::Crc32, response.hash);
@ -602,7 +602,7 @@ impl Storage {
) -> Result<metadata::Piece> {
let response = self
.content
.write_piece(task_id, offset, length, reader)
.write_piece(task_id, offset, length, reader, piece_id)
.await?;
let length = response.length;
@ -669,7 +669,7 @@ impl Storage {
match self
.content
.read_piece(task_id, piece.offset, piece.length, range)
.read_piece(task_id, piece.offset, piece.length, range, piece_id)
.await
{
Ok(reader) => {

View File

@ -45,6 +45,7 @@ reqwest-tracing.workspace = true
reqwest-middleware.workspace = true
rustix = { version = "1.1.2", features = ["fs"] }
base64 = "0.22.1"
base64.workspace = true
pnet = "0.35.0"
protobuf = "3.7.2"

View File

@ -70,6 +70,7 @@ vortex-protocol.workspace = true
dashmap.workspace = true
tonic-health.workspace = true
hashring.workspace = true
base64.workspace = true
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
tracing-panic = "0.1.2"

View File

@ -42,6 +42,7 @@ use termion::{color, style};
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tracing::{error, info, Level};
use base64::{prelude::BASE64_STANDARD, Engine as _};
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
@ -166,21 +167,7 @@ async fn main() -> Result<(), anyhow::Error> {
// Get key from Manager
let key = if config.storage.encryption.enable {
let source_type = if config.seed_peer.enable {
SourceType::SeedPeerSource.into()
} else {
SourceType::PeerSource.into()
};
// Request a key from Manager
let key = manager_client.request_encryption_key(
RequestEncryptionKeyRequest {
source_type: source_type,
hostname: config.host.hostname.clone(),
ip: config.host.ip.unwrap().to_string(),
}
).await?;
info!("Key response: \n{:x?}", key);
let key = get_key_from_manager(&config, &manager_client).await;
Some(key)
} else {
None
@ -475,3 +462,26 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}
async fn get_key_from_manager(config: &dfdaemon::Config, client: &ManagerClient) -> Vec<u8>{
let source_type = if config.seed_peer.enable {
SourceType::SeedPeerSource.into()
} else {
SourceType::PeerSource.into()
};
// Request a key from Manager
let key = client.request_encryption_key(
RequestEncryptionKeyRequest {
source_type: source_type,
hostname: config.host.hostname.clone(),
ip: config.host.ip.unwrap().to_string(),
}
).await
.expect("Fail to get key from Manager");
let key_base64 = BASE64_STANDARD.encode(&key);
info!("Key response(base64): {} \n (hex): {:x?}", key_base64, key);
key
}

View File

@ -444,6 +444,10 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.unwrap_or_else(|err| error!("send download progress error: {:?}", err));
}
// clone config for async
let config_clone = self.config.clone();
tokio::spawn(
async move {
match task_manager_clone
@ -483,8 +487,25 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
return;
}
// TODO: encryption is not compatible with hardlink
let encryption_enable = config_clone.storage.encryption.enable;
assert!(!encryption_enable || !download_clone.force_hard_link);
if let Some(output_path) = &download_clone.output_path {
if !download_clone.force_hard_link {
// if encryption is enabled, copy to path instead of hard link
if encryption_enable {
let output_path = Path::new(output_path.as_str());
if output_path.exists() {
return;
}
let _res = task_manager_clone.export_encryption_file(
task_clone.id.as_ref(),
output_path
)
.await
.map_err(|e| {error!("export encryption file: {}", e); return;})
.expect("export should success");
} else if !download_clone.force_hard_link {
let output_path = Path::new(output_path.as_str());
if output_path.exists() {
match task_manager_clone
@ -1031,6 +1052,9 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.unwrap_or_else(|err| error!("send download progress error: {:?}", err));
}
// clone config for async
let config_clone = self.config.clone();
tokio::spawn(
async move {
match task_manager_clone
@ -1070,11 +1094,12 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
}
// TODO: encryption is not compatible with hardlink
assert!(!task_manager_clone.encrypted() || !request_clone.force_hard_link);
let encryption_enable = config_clone.storage.encryption.enable;
assert!(!encryption_enable || !request_clone.force_hard_link);
if let Some(output_path) = &request_clone.output_path {
// if encryption is enabled, copy to path instead of hard link
if task_manager_clone.encrypted() {
if encryption_enable {
let output_path = Path::new(output_path.as_str());
if output_path.exists() {
return;

View File

@ -900,14 +900,11 @@ impl PersistentCacheTask {
}
};
// // Remove the finished pieces from the pieces.
// let remaining_interested_pieces = self.piece.remove_finished_from_interested(
// finished_pieces.clone(),
// interested_pieces.clone(),
// );
// TODO: The remove function useless? Cause `finished_pieces` MUST be empty here
let remaining_interested_pieces = interested_pieces.clone();
// Remove the finished pieces from the pieces.
let remaining_interested_pieces = self.piece.remove_finished_from_interested(
finished_pieces.clone(),
interested_pieces.clone(),
);
// Download the pieces from the parent.
let partial_finished_pieces = match self
@ -1453,11 +1450,6 @@ impl PersistentCacheTask {
self.storage.delete_persistent_cache_task(task_id).await
}
/// whether the encrytion is enabled
pub fn encrypted(&self) -> bool {
self.config.storage.encryption.enable
}
/// export encryption file to path
pub async fn export_encryption_file(&self, task_id: &str, to: &Path) -> ClientResult<()> {
let task = self.storage.get_persistent_cache_task(task_id)?

View File

@ -57,7 +57,7 @@ use std::sync::{
Arc, Mutex,
};
use std::time::Instant;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::sync::{
mpsc::{self, Sender},
Semaphore,
@ -141,6 +141,12 @@ impl Task {
let task = self.storage.prepare_download_task_started(id).await?;
if task.content_length.is_some() && task.piece_length.is_some() {
// Omit HARD-LINK when use encryption
if self.config.storage.encryption.enable {
info!("Omit HARD-LINK when encryption is enabled");
return Ok(task);
}
// Attempt to create a hard link from the task file to the output path.
//
// Behavior based on force_hard_link setting:
@ -259,6 +265,12 @@ impl Task {
.download_task_started(id, piece_length, content_length, response.http_header)
.await;
// Omit HARD-LINK when use encryption
if self.config.storage.encryption.enable {
info!("Omit HARD-LINK when encryption is enabled");
return task;
}
// Attempt to create a hard link from the task file to the output path.
//
// Behavior based on force_hard_link setting:
@ -726,14 +738,11 @@ impl Task {
}
};
// // Remove the finished pieces from the pieces.
// let remaining_interested_pieces = self.piece.remove_finished_from_interested(
// finished_pieces.clone(),
// interested_pieces.clone(),
// );
// TODO: The remove function useless? Cause `finished_pieces` MUST be empty here
let remaining_interested_pieces = interested_pieces.clone();
// Remove the finished pieces from the pieces.
let remaining_interested_pieces = self.piece.remove_finished_from_interested(
finished_pieces.clone(),
interested_pieces.clone(),
);
// Download the pieces from the parent.
let partial_finished_pieces = match self
@ -1994,6 +2003,75 @@ impl Task {
}
}
}
/// export encryption file to path
pub async fn export_encryption_file(&self, task_id: &str, to: &Path) -> ClientResult<()> {
let task = self.storage.get_task(task_id)?
.ok_or_else(|| Error::TaskNotFound(task_id.to_string()))?;
if !task.is_finished() {
// TODO errors
return Err(Error::Unknown(format!("Task {} is not finished", task_id)));
}
// get all pieces
let mut pieces = self.storage.get_pieces(task_id)?;
// TODO: need sort?
pieces.sort_by_key(|p| p.number);
if pieces.is_empty() {
return Err(Error::Unknown(format!("No pieces found for task {}", task_id)));
}
info!("Exporting encrypted file for task {} with {} pieces to {:?}",
task_id, pieces.len(), to);
let output_file = tokio::fs::File::create(to).await
// .or_err(ErrorType::StorageError)?;
.map_err(|e| Error::Unknown(format!("Failed to create output file: {}", e)))?;
let mut writer = BufWriter::new(output_file);
// process every piece
for piece in pieces {
if !piece.is_finished() {
// warn!("Piece {} is not finished, skipping", piece.number);
// continue;
panic!("Piece {} is not finished", piece.number);
}
debug!("Processing piece {} (offset: {}, length: {})",
piece.number, piece.offset, piece.length);
// read and decrypt piece
let piece_id = self.piece.id(task_id, piece.number);
let mut reader = self.storage.upload_piece(
piece_id.as_str(),
task_id,
None, // read whole piece
).await?;
// write decrypted content to file
let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await
// .or_err(ErrorType::StorageError)?;
.map_err(|e| Error::Unknown(format!("Failed to copy piece {}: {}", piece.number, e)))?;
if bytes_copied != piece.length {
return Err(Error::Unknown(format!(
"Piece {} length mismatch: expected {}, got {}",
piece.number, piece.length, bytes_copied
)));
}
debug!("Successfully processed piece {} ({} bytes)", piece.number, bytes_copied);
}
writer.flush().await
// .or_err(ErrorType::StorageError)?;
.map_err(|e| Error::Unknown(format!("Failed to flush output file: {}", e)))?;
info!("Successfully exported encrypted file for task {} to {:?}", task_id, to);
Ok(())
}
}
#[cfg(test)]