diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs
index 27ff19d0..62a94297 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -275,14 +275,14 @@ impl Storage {
     #[instrument(skip_all)]
     pub async fn download_piece_started(
         &self,
-        task_id: &str,
+        piece_id: &str,
         number: u32,
     ) -> Result<metadata::Piece> {
         // Wait for the piece to be finished.
-        match self.wait_for_piece_finished(task_id, number).await {
+        match self.wait_for_piece_finished(piece_id).await {
             Ok(piece) => Ok(piece),
             // If piece is not found or wait timeout, create piece metadata.
-            Err(_) => self.metadata.download_piece_started(task_id, number),
+            Err(_) => self.metadata.download_piece_started(piece_id, number),
         }
     }
 
@@ -290,8 +290,8 @@ impl Storage {
     #[instrument(skip_all)]
     pub async fn download_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
         &self,
+        piece_id: &str,
         task_id: &str,
-        number: u32,
         offset: u64,
         length: u64,
         reader: &mut R,
@@ -300,8 +300,7 @@ impl Storage {
         let digest = Digest::new(Algorithm::Crc32, response.hash);
 
         self.metadata.download_piece_finished(
-            task_id,
-            number,
+            piece_id,
             offset,
             length,
             digest.to_string().as_str(),
@@ -313,8 +312,8 @@ impl Storage {
     #[instrument(skip_all)]
     pub async fn download_piece_from_remote_peer_finished(
         &self,
+        piece_id: &str,
         task_id: &str,
-        number: u32,
         offset: u64,
         expected_digest: &str,
         parent_id: &str,
@@ -333,8 +332,7 @@ impl Storage {
         }
 
         self.metadata.download_piece_finished(
-            task_id,
-            number,
+            piece_id,
             offset,
             length,
             digest.to_string().as_str(),
@@ -344,8 +342,8 @@ impl Storage {
     /// download_piece_failed updates the metadata of the piece when the piece download fails.
     #[instrument(skip_all)]
-    pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
-        self.metadata.download_piece_failed(task_id, number)
+    pub fn download_piece_failed(&self, piece_id: &str) -> Result<()> {
+        self.metadata.download_piece_failed(piece_id)
     }
 
     /// upload_piece updates the metadata of the piece and
@@ -353,25 +351,25 @@ impl Storage {
     #[instrument(skip_all)]
     pub async fn upload_piece(
         &self,
+        piece_id: &str,
         task_id: &str,
-        number: u32,
         range: Option<Range>,
     ) -> Result<impl AsyncRead> {
         // Wait for the piece to be finished.
-        self.wait_for_piece_finished(task_id, number).await?;
+        self.wait_for_piece_finished(piece_id).await?;
 
         // Start uploading the task.
         self.metadata.upload_task_started(task_id)?;
 
         // Start uploading the piece.
-        if let Err(err) = self.metadata.upload_piece_started(task_id, number) {
+        if let Err(err) = self.metadata.upload_piece_started(piece_id) {
             // Failed uploading the task.
             self.metadata.upload_task_failed(task_id)?;
             return Err(err);
         }
 
         // Get the piece metadata and return the content of the piece.
-        match self.metadata.get_piece(task_id, number) {
+        match self.metadata.get_piece(piece_id) {
             Ok(Some(piece)) => {
                 match self
                     .content
@@ -383,7 +381,7 @@ impl Storage {
                         self.metadata.upload_task_finished(task_id)?;
 
                         // Finish uploading the piece.
-                        self.metadata.upload_piece_finished(task_id, number)?;
+                        self.metadata.upload_piece_finished(piece_id)?;
                         Ok(reader)
                     }
                     Err(err) => {
@@ -391,7 +389,7 @@ impl Storage {
                         self.metadata.upload_task_failed(task_id)?;
 
                         // Failed uploading the piece.
-                        self.metadata.upload_piece_failed(task_id, number)?;
+                        self.metadata.upload_piece_failed(piece_id)?;
                         Err(err)
                     }
                 }
@@ -401,15 +399,15 @@ impl Storage {
                 self.metadata.upload_task_failed(task_id)?;
 
                 // Failed uploading the piece.
-                self.metadata.upload_piece_failed(task_id, number)?;
-                Err(Error::PieceNotFound(self.piece_id(task_id, number)))
+                self.metadata.upload_piece_failed(piece_id)?;
+                Err(Error::PieceNotFound(piece_id.to_string()))
             }
             Err(err) => {
                 // Failed uploading the task.
                 self.metadata.upload_task_failed(task_id)?;
 
                 // Failed uploading the piece.
-                self.metadata.upload_piece_failed(task_id, number)?;
+                self.metadata.upload_piece_failed(piece_id)?;
                 Err(err)
             }
         }
@@ -417,8 +415,8 @@ impl Storage {
     /// get_piece returns the piece metadata.
     #[instrument(skip_all)]
-    pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
-        self.metadata.get_piece(task_id, number)
+    pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
+        self.metadata.get_piece(piece_id)
     }
 
     /// get_pieces returns the piece metadatas.
@@ -434,7 +432,7 @@ impl Storage {
     /// wait_for_piece_finished waits for the piece to be finished.
     #[instrument(skip_all)]
-    async fn wait_for_piece_finished(&self, task_id: &str, number: u32) -> Result<metadata::Piece> {
+    async fn wait_for_piece_finished(&self, piece_id: &str) -> Result<metadata::Piece> {
         // Initialize the timeout of piece.
         let piece_timeout = tokio::time::sleep(self.config.download.piece_timeout);
         tokio::pin!(piece_timeout);
@@ -446,8 +444,8 @@ impl Storage {
             tokio::select! {
                 _ = interval.tick() => {
                     let piece = self
-                        .get_piece(task_id, number)?
-                        .ok_or_else(|| Error::PieceNotFound(self.piece_id(task_id, number)))?;
+                        .get_piece(piece_id)?
+                        .ok_or_else(|| Error::PieceNotFound(piece_id.to_string()))?;
 
                     // If the piece is finished, return.
                     if piece.is_finished() {
@@ -461,8 +459,8 @@ impl Storage {
                     wait_for_piece_count += 1;
                 }
                 _ = &mut piece_timeout => {
-                    self.metadata.wait_for_piece_finished_failed(task_id, number).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
-                    return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number)));
+                    self.metadata.wait_for_piece_finished_failed(piece_id).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
+                    return Err(Error::WaitForPieceFinishedTimeout(piece_id.to_string()));
                 }
             }
         }
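The storage layer now takes a precomputed `piece_id` instead of re-deriving it from `(task_id, number)` inside every call, so the key is built once per operation and the same string feeds logs, spans, errors, and RocksDB lookups. A minimal sketch of the composition, assuming the `{task_id}-{number}` layout implied by the `format!("{}-{}", task_id, piece_number)` span line removed in `dfdaemon_upload.rs` later in this diff (the real helper is `Storage::piece_id`, whose body this diff does not show):

```rust
/// Hypothetical stand-in for Storage::piece_id, which is unchanged by
/// this diff: it composes the storage key for a piece once, and callers
/// thread the resulting string through download/upload/get calls.
fn piece_id(task_id: &str, number: u32) -> String {
    format!("{}-{}", task_id, number)
}

fn main() {
    let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c";
    let piece_id = piece_id(task_id, 1);
    // The 64-hex-char task id stays a shared prefix of every piece key.
    assert!(piece_id.starts_with(task_id));
}
```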
diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs
index 04982adc..7daf41fe 100644
--- a/dragonfly-client-storage/src/metadata.rs
+++ b/dragonfly-client-storage/src/metadata.rs
@@ -699,7 +699,7 @@ impl Metadata {
     /// download_piece_started updates the metadata of the piece when the piece download starts.
     #[instrument(skip_all)]
-    pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
+    pub fn download_piece_started(&self, piece_id: &str, number: u32) -> Result<Piece> {
         // Construct the piece metadata.
         let piece = Piece {
             number,
@@ -709,8 +709,7 @@
         };
 
         // Put the piece metadata.
-        self.db
-            .put(self.piece_id(task_id, number).as_bytes(), &piece)?;
+        self.db.put(piece_id.as_bytes(), &piece)?;
         Ok(piece)
     }
 
@@ -718,16 +717,13 @@
     #[instrument(skip_all)]
     pub fn download_piece_finished(
         &self,
-        task_id: &str,
-        number: u32,
+        piece_id: &str,
         offset: u64,
         length: u64,
         digest: &str,
         parent_id: Option<String>,
     ) -> Result<Piece> {
-        // Get the piece id.
-        let id = self.piece_id(task_id, number);
-        let piece = match self.db.get::<Piece>(id.as_bytes())? {
+        let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
             Some(mut piece) => {
                 piece.offset = offset;
                 piece.length = length;
@@ -737,84 +733,78 @@ impl Metadata {
                 piece.finished_at = Some(Utc::now().naive_utc());
                 piece
             }
-            None => return Err(Error::PieceNotFound(id)),
+            None => return Err(Error::PieceNotFound(piece_id.to_string())),
         };
 
-        self.db.put(id.as_bytes(), &piece)?;
+        self.db.put(piece_id.as_bytes(), &piece)?;
         Ok(piece)
     }
 
     /// download_piece_failed updates the metadata of the piece when the piece download fails.
     #[instrument(skip_all)]
-    pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
-        self.delete_piece(task_id, number)
+    pub fn download_piece_failed(&self, piece_id: &str) -> Result<()> {
+        self.delete_piece(piece_id)
     }
 
     /// wait_for_piece_finished_failed waits for the piece to be finished or failed.
     #[instrument(skip_all)]
-    pub fn wait_for_piece_finished_failed(&self, task_id: &str, number: u32) -> Result<()> {
-        self.delete_piece(task_id, number)
+    pub fn wait_for_piece_finished_failed(&self, piece_id: &str) -> Result<()> {
+        self.delete_piece(piece_id)
    }
 
     /// upload_piece_started updates the metadata of the piece when the piece upload starts.
     #[instrument(skip_all)]
-    pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
-        // Get the piece id.
-        let id = self.piece_id(task_id, number);
-        let piece = match self.db.get::<Piece>(id.as_bytes())? {
+    pub fn upload_piece_started(&self, piece_id: &str) -> Result<Piece> {
+        let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
             Some(mut piece) => {
                 piece.uploading_count += 1;
                 piece.updated_at = Utc::now().naive_utc();
                 piece
             }
-            None => return Err(Error::PieceNotFound(id)),
+            None => return Err(Error::PieceNotFound(piece_id.to_string())),
         };
 
-        self.db.put(id.as_bytes(), &piece)?;
+        self.db.put(piece_id.as_bytes(), &piece)?;
         Ok(piece)
     }
 
     /// upload_piece_finished updates the metadata of the piece when the piece upload finishes.
     #[instrument(skip_all)]
-    pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Piece> {
-        // Get the piece id.
-        let id = self.piece_id(task_id, number);
-        let piece = match self.db.get::<Piece>(id.as_bytes())? {
+    pub fn upload_piece_finished(&self, piece_id: &str) -> Result<Piece> {
+        let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
             Some(mut piece) => {
                 piece.uploading_count -= 1;
                 piece.uploaded_count += 1;
                 piece.updated_at = Utc::now().naive_utc();
                 piece
             }
-            None => return Err(Error::PieceNotFound(id)),
+            None => return Err(Error::PieceNotFound(piece_id.to_string())),
         };
 
-        self.db.put(id.as_bytes(), &piece)?;
+        self.db.put(piece_id.as_bytes(), &piece)?;
         Ok(piece)
     }
 
     /// upload_piece_failed updates the metadata of the piece when the piece upload fails.
     #[instrument(skip_all)]
-    pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
-        // Get the piece id.
-        let id = self.piece_id(task_id, number);
-        let piece = match self.db.get::<Piece>(id.as_bytes())? {
+    pub fn upload_piece_failed(&self, piece_id: &str) -> Result<Piece> {
+        let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
             Some(mut piece) => {
                 piece.uploading_count -= 1;
                 piece.updated_at = Utc::now().naive_utc();
                 piece
             }
-            None => return Err(Error::PieceNotFound(id)),
+            None => return Err(Error::PieceNotFound(piece_id.to_string())),
         };
 
-        self.db.put(id.as_bytes(), &piece)?;
+        self.db.put(piece_id.as_bytes(), &piece)?;
         Ok(piece)
     }
 
     /// get_piece gets the piece metadata.
     #[instrument(skip_all)]
-    pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
-        self.db.get(self.piece_id(task_id, number).as_bytes())
+    pub fn get_piece(&self, piece_id: &str) -> Result<Option<Piece>> {
+        self.db.get(piece_id.as_bytes())
     }
 
     /// get_pieces gets the piece metadatas.
@@ -836,10 +826,9 @@ impl Metadata {
     /// delete_piece deletes the piece metadata.
     #[instrument(skip_all)]
-    pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
-        info!("delete piece metadata {}", self.piece_id(task_id, number));
-        self.db
-            .delete::<Piece>(self.piece_id(task_id, number).as_bytes())
+    pub fn delete_piece(&self, piece_id: &str) -> Result<()> {
+        info!("delete piece metadata {}", piece_id);
+        self.db.delete::<Piece>(piece_id.as_bytes())
     }
 
     /// delete_pieces deletes the piece metadatas.
@@ -950,20 +939,20 @@ mod tests {
         // Test upload_task_started.
         metadata.upload_task_started(task_id).unwrap();
         let task = metadata.get_task(task_id).unwrap().unwrap();
-        assert_eq!(task.uploading_count, 1,);
+        assert_eq!(task.uploading_count, 1);
 
         // Test upload_task_finished.
         metadata.upload_task_finished(task_id).unwrap();
         let task = metadata.get_task(task_id).unwrap().unwrap();
-        assert_eq!(task.uploading_count, 0,);
-        assert_eq!(task.uploaded_count, 1,);
+        assert_eq!(task.uploading_count, 0);
+        assert_eq!(task.uploaded_count, 1);
 
         // Test upload_task_failed.
         let task = metadata.upload_task_started(task_id).unwrap();
         assert_eq!(task.uploading_count, 1);
         let task = metadata.upload_task_failed(task_id).unwrap();
-        assert_eq!(task.uploading_count, 0,);
-        assert_eq!(task.uploaded_count, 1,);
+        assert_eq!(task.uploading_count, 0);
+        assert_eq!(task.uploaded_count, 1);
 
         // Test get_tasks.
         let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1";
@@ -985,49 +974,64 @@ mod tests {
         let log_dir = dir.path().join("log");
         let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
         let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c";
+        let piece_id = metadata.piece_id(task_id, 1);
 
         // Test download_piece_started.
-        metadata.download_piece_started(task_id, 1).unwrap();
-        let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
-        assert_eq!(piece.number, 1,);
+        metadata
+            .download_piece_started(piece_id.as_str(), 1)
+            .unwrap();
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
+        assert_eq!(piece.number, 1);
 
         // Test download_piece_finished.
         metadata
-            .download_piece_finished(task_id, 1, 0, 1024, "digest1", None)
+            .download_piece_finished(piece_id.as_str(), 0, 1024, "digest1", None)
             .unwrap();
-        let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
-        assert_eq!(piece.length, 1024,);
-        assert_eq!(piece.digest, "digest1",);
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
+        assert_eq!(piece.length, 1024);
+        assert_eq!(piece.digest, "digest1");
 
         // Test get_pieces.
-        metadata.download_piece_started(task_id, 2).unwrap();
-        metadata.download_piece_started(task_id, 3).unwrap();
+        metadata
+            .download_piece_started(metadata.piece_id(task_id, 2).as_str(), 2)
+            .unwrap();
+        metadata
+            .download_piece_started(metadata.piece_id(task_id, 3).as_str(), 3)
+            .unwrap();
         let pieces = metadata.get_pieces(task_id).unwrap();
-        assert_eq!(pieces.len(), 3, "should get 3 pieces in total");
+        assert_eq!(pieces.len(), 3);
 
         // Test download_piece_failed.
-        metadata.download_piece_started(task_id, 2).unwrap();
-        metadata.download_piece_started(task_id, 3).unwrap();
-        metadata.download_piece_failed(task_id, 2).unwrap();
-        let piece = metadata.get_piece(task_id, 2).unwrap();
+        let piece_id = metadata.piece_id(task_id, 2);
+        metadata
+            .download_piece_started(piece_id.as_str(), 2)
+            .unwrap();
+        metadata
+            .download_piece_started(metadata.piece_id(task_id, 3).as_str(), 3)
+            .unwrap();
+        metadata.download_piece_failed(piece_id.as_str()).unwrap();
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap();
         assert!(piece.is_none());
 
         // Test upload_piece_started.
-        metadata.upload_piece_started(task_id, 3).unwrap();
-        let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
-        assert_eq!(piece.uploading_count, 1,);
+        let piece_id = metadata.piece_id(task_id, 3);
+        metadata.upload_piece_started(piece_id.as_str()).unwrap();
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
+        assert_eq!(piece.uploading_count, 1);
 
         // Test upload_piece_finished.
-        metadata.upload_piece_finished(task_id, 3).unwrap();
-        let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
-        assert_eq!(piece.uploading_count, 0,);
-        assert_eq!(piece.uploaded_count, 1,);
+        let piece_id = metadata.piece_id(task_id, 3);
+        metadata.upload_piece_finished(piece_id.as_str()).unwrap();
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
+        assert_eq!(piece.uploading_count, 0);
+        assert_eq!(piece.uploaded_count, 1);
 
         // Test upload_piece_failed.
-        metadata.upload_piece_started(task_id, 3).unwrap();
-        metadata.upload_piece_failed(task_id, 3).unwrap();
-        let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
-        assert_eq!(piece.uploading_count, 0,);
+        let piece_id = metadata.piece_id(task_id, 3);
+        metadata.upload_piece_started(piece_id.as_str()).unwrap();
+        metadata.upload_piece_failed(piece_id.as_str()).unwrap();
+        let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
+        assert_eq!(piece.uploading_count, 0);
 
         // Test delete_pieces.
         metadata.delete_pieces(task_id).unwrap();
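The tests above double as migration examples: derive the id once with `Metadata::piece_id`, then hand the same string to every lifecycle call. A condensed, hypothetical call site under that assumption (`record_piece_download` and the fixed offset/length/digest values are illustrative, not part of this diff, and the crate paths are assumed from the workspace layout):

```rust
use dragonfly_client_core::Result; // assumed crate paths
use dragonfly_client_storage::metadata::Metadata;

/// Hypothetical helper showing the new calling convention: the piece id
/// is computed once and reused for the start, finish, and lookup calls.
fn record_piece_download(metadata: &Metadata, task_id: &str, number: u32) -> Result<()> {
    let piece_id = metadata.piece_id(task_id, number);
    metadata.download_piece_started(piece_id.as_str(), number)?;
    metadata.download_piece_finished(piece_id.as_str(), 0, 1024, "digest1", None)?;
    assert!(metadata.get_piece(piece_id.as_str())?.is_some());
    Ok(())
}
```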
diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs
index af68d8ed..5c379f8d 100644
--- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs
+++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs
@@ -54,11 +54,11 @@ impl RocksdbStorageEngine {
     /// DEFAULT_MAX_BACKGROUND_JOBS is the default max background jobs for rocksdb, default is 2.
     const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 2;
 
-    /// DEFAULT_BLOCK_SIZE is the default block size for rocksdb, default is 128KB.
-    const DEFAULT_BLOCK_SIZE: usize = 128 * 1024;
+    /// DEFAULT_BLOCK_SIZE is the default block size for rocksdb, default is 64KB.
+    const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;
 
-    /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb, default is 512MB.
-    const DEFAULT_CACHE_SIZE: usize = 512 * 1024 * 1024;
+    /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb, default is 1GB.
+    const DEFAULT_CACHE_SIZE: usize = 1024 * 1024 * 1024;
 
     /// DEFAULT_LOG_MAX_SIZE is the default max log size for rocksdb, default is 64MB.
     const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024;
@@ -74,16 +74,21 @@ impl RocksdbStorageEngine {
         let mut options = rocksdb::Options::default();
         options.create_if_missing(true);
         options.create_missing_column_families(true);
-        options.increase_parallelism(num_cpus::get() as i32);
+
+        // Optimize compression.
+        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
+        options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
+
+        // Improve parallelism.
+        options.increase_parallelism(num_cpus::get() as i32);
         options.set_max_background_jobs(std::cmp::max(
-            num_cpus::get() as i32 / 2,
+            num_cpus::get() as i32,
             Self::DEFAULT_MAX_BACKGROUND_JOBS,
         ));
 
         // Set rocksdb log options.
         options.set_db_log_dir(log_dir);
-        options.set_log_level(rocksdb::LogLevel::Debug);
+        options.set_log_level(rocksdb::LogLevel::Info);
         options.set_max_log_file_size(Self::DEFAULT_LOG_MAX_SIZE);
         options.set_keep_log_file_num(Self::DEFAULT_LOG_MAX_FILES);
 
@@ -91,12 +96,14 @@ impl RocksdbStorageEngine {
         let mut block_options = rocksdb::BlockBasedOptions::default();
         block_options.set_block_cache(&rocksdb::Cache::new_lru_cache(Self::DEFAULT_CACHE_SIZE));
         block_options.set_block_size(Self::DEFAULT_BLOCK_SIZE);
+        block_options.set_cache_index_and_filter_blocks(true);
+        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
         options.set_block_based_table_factory(&block_options);
 
         // Initialize column family options.
         let mut cf_options = rocksdb::Options::default();
         cf_options.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(64));
-        cf_options.set_memtable_prefix_bloom_ratio(0.2);
+        cf_options.set_memtable_prefix_bloom_ratio(0.25);
         cf_options.optimize_level_style_compaction(Self::DEFAULT_MEMTABLE_MEMORY_BUDGET);
 
         // Initialize column families.
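These engine changes all serve small-value point lookups and per-task scans: 64KB blocks cost less to read for a single piece of metadata than 128KB ones, the 1GB LRU cache plus cached and pinned index/filter blocks keeps hot lookups in memory, and LZ4 with Zstd at the bottommost level trades a little CPU for smaller SSTs. The 64-byte fixed prefix matches the 64-hex-char task id, so every `{task_id}-{number}` key for a task lands under one prefix. A minimal, self-contained sketch (not the engine code above; assumes a recent `rocksdb` crate whose iterator yields `Result` items) of how such a prefix scan behaves:

```rust
use rocksdb::{Options, SliceTransform, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut options = Options::default();
    options.create_if_missing(true);
    // Same idea as the column family options above: treat the first
    // 64 bytes (the task id) as the key prefix.
    options.set_prefix_extractor(SliceTransform::create_fixed_prefix(64));

    let db = DB::open(&options, "/tmp/piece-prefix-demo")?;
    let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c";
    for number in 0..3u32 {
        db.put(format!("{}-{}", task_id, number).as_bytes(), b"piece metadata")?;
    }

    // All pieces of the task come back from a single prefix scan.
    for item in db.prefix_iterator(task_id.as_bytes()) {
        let (key, _value) = item?;
        println!("{}", String::from_utf8_lossy(&key));
    }
    Ok(())
}
```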
diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs
index 7f4a95e4..02350fef 100644
--- a/dragonfly-client/src/grpc/dfdaemon_upload.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs
@@ -652,10 +652,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         let mut has_started_piece = false;
         let mut finished_piece_numbers = Vec::new();
         for interested_piece_number in interested_piece_numbers.iter() {
-            let piece = match task_manager
-                .piece
-                .get(task_id.as_str(), *interested_piece_number)
-            {
+            let piece = match task_manager.piece.get(
+                task_manager
+                    .piece
+                    .id(task_id.as_str(), *interested_piece_number)
+                    .as_str(),
+            ) {
                 Ok(Some(piece)) => piece,
                 Ok(None) => continue,
                 Err(err) => {
@@ -765,17 +767,20 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         // Get the interested piece number from the request.
         let piece_number = request.piece_number;
 
+        // Generate the piece id.
+        let piece_id = self.task.piece.id(task_id.as_str(), piece_number);
+
         // Span record the host id, task id and piece number.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("remote_host_id", remote_host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
-        Span::current().record("piece_id", format!("{}-{}", task_id, piece_number).as_str());
+        Span::current().record("piece_id", piece_id.as_str());
 
         // Get the piece metadata from the local storage.
         let piece = self
             .task
             .piece
-            .get(task_id.as_str(), piece_number)
+            .get(piece_id.as_str())
             .map_err(|err| {
                 error!("upload piece metadata from local storage: {}", err);
                 Status::internal(err.to_string())
@@ -794,8 +799,8 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
             .task
             .piece
             .upload_from_local_peer_into_async_read(
+                piece_id.as_str(),
                 task_id.as_str(),
-                piece_number,
                 piece.length,
                 None,
                 false,
diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs
index e8e9c555..f9e1a166 100644
--- a/dragonfly-client/src/proxy/mod.rs
+++ b/dragonfly-client/src/proxy/mod.rs
@@ -740,8 +740,10 @@ async fn proxy_by_dfdaemon(
         let piece_reader = match task
             .piece
             .download_from_local_peer_into_async_read(
+                task.piece
+                    .id(message.task_id.as_str(), piece.number)
+                    .as_str(),
                 message.task_id.as_str(),
-                piece.number,
                 piece.length,
                 download_task_started_response.range,
                 true,
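Both serving paths now follow the same boundary pattern: build the id once per request, record it on the tracing span, and pass the identical string down to storage, so span fields and storage keys cannot drift apart. A condensed, self-contained sketch of that pattern (`record_piece_span` and the id layout are illustrative; the span wiring is the same `tracing` API the handlers use):

```rust
use tracing::{field, info, info_span, Span};

/// Hypothetical boundary helper: compose the id once, attach it to the
/// current span, and return it for the storage calls that follow.
fn record_piece_span(task_id: &str, number: u32) -> String {
    let piece_id = format!("{}-{}", task_id, number); // assumed id layout
    Span::current().record("piece_id", piece_id.as_str());
    piece_id
}

fn main() {
    // Declare the field empty up front, like #[instrument(skip_all, fields(piece_id))].
    let span = info_span!("download_piece", piece_id = field::Empty);
    let _guard = span.enter();

    let piece_id = record_piece_span(
        "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c",
        1,
    );
    info!("start to download piece {}", piece_id);
}
```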
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs
index 41871504..c6013297 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -841,14 +841,16 @@ impl PersistentCacheTask {
                     // Limit the concurrent download count.
                     let _permit = semaphore.acquire().await.unwrap();
 
+                    let piece_id = storage.piece_id(task_id.as_str(), number);
                     info!(
                         "start to download piece {} from remote peer {:?}",
-                        storage.piece_id(task_id.as_str(), number),
+                        piece_id,
                         parent.id.clone()
                     );
 
                     let metadata = piece_manager
                         .download_from_remote_peer(
+                            piece_id.as_str(),
                             host_id.as_str(),
                             task_id.as_str(),
                             number,
@@ -859,7 +861,7 @@ impl PersistentCacheTask {
                         .map_err(|err| {
                             error!(
                                 "download piece {} from remote peer {:?} error: {:?}",
-                                storage.piece_id(task_id.as_str(), number),
+                                piece_id,
                                 parent.id.clone(),
                                 err
                             );
@@ -930,8 +932,7 @@ impl PersistentCacheTask {
 
                     info!(
                         "finished piece {} from remote peer {:?}",
-                        storage.piece_id(task_id.as_str(), metadata.number),
-                        metadata.parent_id
+                        piece_id, metadata.parent_id
                     );
 
                     Ok(metadata)
@@ -1029,24 +1030,19 @@ impl PersistentCacheTask {
 
         // Download the piece from the local peer.
         for interested_piece in interested_pieces {
+            let piece_id = self
+                .storage
+                .piece_id(task.id.as_str(), interested_piece.number);
+
             // Get the piece metadata from the local storage.
-            let piece = match self.piece.get(task.id.as_str(), interested_piece.number) {
+            let piece = match self.piece.get(piece_id.as_str()) {
                 Ok(Some(piece)) => piece,
                 Ok(None) => {
-                    info!(
-                        "piece {} not found in local storage",
-                        self.storage
-                            .piece_id(task.id.as_str(), interested_piece.number)
-                    );
+                    info!("piece {} not found in local storage", piece_id);
                     continue;
                 }
                 Err(err) => {
-                    error!(
-                        "get piece {} from local storage error: {:?}",
-                        self.storage
-                            .piece_id(task.id.as_str(), interested_piece.number),
-                        err
-                    );
+                    error!("get piece {} from local storage error: {:?}", piece_id, err);
                    continue;
                 }
             };
@@ -1054,10 +1050,7 @@ impl PersistentCacheTask {
             // Fake the download from the local peer.
             self.piece
                 .download_from_local_peer(task.id.as_str(), piece.length);
-            info!(
-                "finished piece {} from local peer",
-                self.storage.piece_id(task.id.as_str(), piece.number)
-            );
+            info!("finished piece {} from local peer", piece_id);
 
             // Construct the piece.
             let piece = Piece {
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs
index 07390ba7..f504de2a 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -110,10 +110,16 @@ impl Piece {
         }
     }
 
+    /// id generates a new piece id.
+    #[instrument(skip_all)]
+    pub fn id(&self, task_id: &str, number: u32) -> String {
+        self.storage.piece_id(task_id, number)
+    }
+
     /// get gets a piece from the local storage.
     #[instrument(skip_all)]
-    pub fn get(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
-        self.storage.get_piece(task_id, number)
+    pub fn get(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
+        self.storage.get_piece(piece_id)
     }
 
     /// calculate_interested calculates the interested pieces by content_length and range.
@@ -302,14 +308,14 @@ impl Piece {
     #[instrument(skip_all, fields(piece_id))]
     pub async fn upload_from_local_peer_into_async_read(
         &self,
+        piece_id: &str,
         task_id: &str,
-        number: u32,
         length: u64,
         range: Option<Range>,
         disable_rate_limit: bool,
     ) -> Result<impl AsyncRead> {
         // Span record the piece_id.
-        Span::current().record("piece_id", self.storage.piece_id(task_id, number));
+        Span::current().record("piece_id", piece_id);
 
         // Acquire the upload rate limiter.
         if !disable_rate_limit {
@@ -318,7 +324,7 @@ impl Piece {
 
         // Upload the piece content.
         self.storage
-            .upload_piece(task_id, number, range)
+            .upload_piece(piece_id, task_id, range)
             .await
             .map(|reader| {
                 collect_upload_piece_traffic_metrics(
@@ -333,14 +339,14 @@ impl Piece {
     #[instrument(skip_all, fields(piece_id))]
     pub async fn download_from_local_peer_into_async_read(
         &self,
+        piece_id: &str,
         task_id: &str,
-        number: u32,
         length: u64,
         range: Option<Range>,
         disable_rate_limit: bool,
     ) -> Result<impl AsyncRead> {
         // Span record the piece_id.
-        Span::current().record("piece_id", self.storage.piece_id(task_id, number));
+        Span::current().record("piece_id", piece_id);
 
         // Acquire the download rate limiter.
         if !disable_rate_limit {
@@ -348,7 +354,7 @@ impl Piece {
         }
 
         // Upload the piece content.
-        self.storage.upload_piece(task_id, number, range).await
+        self.storage.upload_piece(piece_id, task_id, range).await
     }
 
     /// download_from_local_peer downloads a single piece from a local peer. Fake the download piece
@@ -366,6 +372,7 @@ impl Piece {
     #[instrument(skip_all, fields(piece_id))]
     pub async fn download_from_remote_peer(
         &self,
+        piece_id: &str,
         host_id: &str,
         task_id: &str,
         number: u32,
@@ -373,13 +380,16 @@ impl Piece {
         parent: piece_collector::CollectedParent,
     ) -> Result<metadata::Piece> {
         // Span record the piece_id.
-        Span::current().record("piece_id", self.storage.piece_id(task_id, number));
+        Span::current().record("piece_id", piece_id);
 
         // Acquire the download rate limiter.
         self.download_rate_limiter.acquire(length as usize).await;
 
         // Record the start of downloading piece.
-        let piece = self.storage.download_piece_started(task_id, number).await?;
+        let piece = self
+            .storage
+            .download_piece_started(piece_id, number)
+            .await?;
 
         // If the piece is downloaded by the other thread,
         // return the piece directly.
@@ -390,7 +400,7 @@ impl Piece {
 
         // Create a dfdaemon client.
         let host = parent.host.clone().ok_or_else(|| {
             error!("peer host is empty");
-            if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+            if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
 
@@ -406,7 +416,7 @@ impl Piece {
                 "create dfdaemon upload client from {}:{} failed: {}",
                 host.ip, host.port, err
             );
-            if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+            if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
 
@@ -426,7 +436,7 @@ impl Piece {
             .await
             .map_err(|err| {
                 error!("download piece failed: {}", err);
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+                if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                     error!("set piece metadata failed: {}", err)
                 };
 
@@ -435,7 +445,7 @@ impl Piece {
 
         let piece = response.piece.ok_or_else(|| {
             error!("piece is empty");
-            if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+            if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
 
@@ -445,7 +455,7 @@ impl Piece {
         // Get the piece content.
         let content = piece.content.ok_or_else(|| {
             error!("piece content is empty");
-            if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+            if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
 
@@ -455,8 +465,8 @@ impl Piece {
         // Record the finish of downloading piece.
         self.storage
             .download_piece_from_remote_peer_finished(
+                piece_id,
                 task_id,
-                number,
                 piece.offset,
                 piece.digest.as_str(),
                 parent.id.as_str(),
@@ -467,7 +477,7 @@ impl Piece {
                 // Record the failure of downloading the piece
                 // if storage fails to record the piece.
                 error!("download piece finished: {}", err);
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+                if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                     error!("set piece metadata failed: {}", err)
                 };
 
@@ -475,7 +485,7 @@ impl Piece {
 
         self.storage
-            .get_piece(task_id, number)?
+            .get_piece(piece_id)?
             .ok_or_else(|| {
                 error!("piece not found");
                 Error::PieceNotFound(number.to_string())
@@ -495,6 +505,7 @@ impl Piece {
     #[instrument(skip_all, fields(piece_id))]
     pub async fn download_from_source(
         &self,
+        piece_id: &str,
         task_id: &str,
         number: u32,
         url: &str,
@@ -504,13 +515,16 @@ impl Piece {
         object_storage: Option<ObjectStorage>,
     ) -> Result<metadata::Piece> {
         // Span record the piece_id.
-        Span::current().record("piece_id", self.storage.piece_id(task_id, number));
+        Span::current().record("piece_id", piece_id);
 
         // Acquire the download rate limiter.
         self.download_rate_limiter.acquire(length as usize).await;
 
         // Record the start of downloading piece.
-        let piece = self.storage.download_piece_started(task_id, number).await?;
+        let piece = self
+            .storage
+            .download_piece_started(piece_id, number)
+            .await?;
 
         // If the piece is downloaded by the other thread,
         // return the piece directly.
@@ -530,7 +544,7 @@ impl Piece {
 
         // Download the piece from the source.
         let backend = self.backend_factory.build(url).map_err(|err| {
             error!("build backend failed: {}", err);
-            if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+            if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
 
@@ -548,7 +562,7 @@ impl Piece {
         let mut response = backend
             .get(GetRequest {
                 task_id: task_id.to_string(),
-                piece_id: self.storage.piece_id(task_id, number),
+                piece_id: piece_id.to_string(),
                 url: url.to_string(),
                 range: Some(Range {
                     start: offset,
@@ -569,7 +583,7 @@ impl Piece {
                 // if the request fails.
                 error!("backend get failed: {}", err);
 
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+                if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                     error!("set piece metadata failed: {}", err)
                 };
 
@@ -594,7 +608,7 @@ impl Piece {
             let error_message = response.error_message.unwrap_or_default();
             error!("backend get failed: {} {}", error_message, buffer.as_str());
 
-            self.storage.download_piece_failed(task_id, number)?;
+            self.storage.download_piece_failed(piece_id)?;
             return Err(Error::BackendError(BackendError {
                 message: error_message,
                 status_code: Some(response.http_status_code.unwrap_or_default()),
@@ -612,8 +626,8 @@ impl Piece {
         // Record the finish of downloading piece.
         self.storage
             .download_piece_from_source_finished(
+                piece_id,
                 task_id,
-                number,
                 offset,
                 length,
                 &mut response.reader,
@@ -623,7 +637,7 @@ impl Piece {
                 // Record the failure of downloading the piece
                 // if storage fails to record the piece.
                 error!("download piece finished: {}", err);
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
+                if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                     error!("set piece metadata failed: {}", err)
                 };
 
@@ -631,7 +645,7 @@ impl Piece {
 
         self.storage
-            .get_piece(task_id, number)?
+            .get_piece(piece_id)?
             .ok_or_else(|| {
                 error!("piece not found");
                 Error::PieceNotFound(number.to_string())
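With `Piece::id` as the single composer, a caller derives the id once and reuses it for the download call, the metadata lookup, and every log line. A hypothetical wrapper showing that contract (the import paths and `fetch_from_remote_peer` are illustrative assumptions; the method signatures match the diff above):

```rust
use dragonfly_client::resource::{piece::Piece, piece_collector::CollectedParent}; // assumed paths
use dragonfly_client_core::Result;
use dragonfly_client_storage::metadata;

/// Hypothetical wrapper: derive the id once, then reuse the same string
/// for the download call and any later lookups or logging.
async fn fetch_from_remote_peer(
    piece_manager: &Piece,
    host_id: &str,
    task_id: &str,
    number: u32,
    length: u64,
    parent: CollectedParent,
) -> Result<metadata::Piece> {
    let piece_id = piece_manager.id(task_id, number);
    piece_manager
        .download_from_remote_peer(piece_id.as_str(), host_id, task_id, number, length, parent)
        .await
}
```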
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs
index 485ea5e3..322ee55c 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -964,14 +964,16 @@ impl Task {
                     // Limit the concurrent piece count.
                     let _permit = semaphore.acquire().await.unwrap();
 
+                    let piece_id = storage.piece_id(task_id.as_str(), number);
                     info!(
                         "start to download piece {} from remote peer {:?}",
-                        storage.piece_id(task_id.as_str(), number),
+                        piece_id,
                         parent.id.clone()
                     );
 
                     let metadata = piece_manager
                         .download_from_remote_peer(
+                            piece_id.as_str(),
                             host_id.as_str(),
                             task_id.as_str(),
                             number,
@@ -982,7 +984,7 @@ impl Task {
                         .map_err(|err| {
                             error!(
                                 "download piece {} from remote peer {:?} error: {:?}",
-                                storage.piece_id(task_id.as_str(), number),
+                                piece_id,
                                 parent.id.clone(),
                                 err
                             );
@@ -1026,8 +1028,7 @@ impl Task {
                        .map_err(|err| {
                             error!(
                                 "send DownloadPieceFinishedRequest for piece {} failed: {:?}",
-                                storage.piece_id(task_id.as_str(), number),
-                                err
+                                piece_id, err
                             );
                             interrupt.store(true, Ordering::SeqCst);
                             err
@@ -1054,8 +1055,7 @@ impl Task {
                         .map_err(|err| {
                             error!(
                                 "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
-                                storage.piece_id(task_id.as_str(), number),
-                                err
+                                piece_id, err
                             );
                             interrupt.store(true, Ordering::SeqCst);
                             err
@@ -1063,8 +1063,7 @@ impl Task {
 
                     info!(
                         "finished piece {} from remote peer {:?}",
-                        storage.piece_id(task_id.as_str(), metadata.number),
-                        metadata.parent_id
+                        piece_id, metadata.parent_id
                     );
 
                     let mut finished_pieces = finished_pieces.lock().unwrap();
@@ -1206,13 +1205,12 @@ impl Task {
                     // Limit the concurrent download count.
                     let _permit = semaphore.acquire().await.unwrap();
 
-                    info!(
-                        "start to download piece {} from source",
-                        storage.piece_id(task_id.as_str(), number)
-                    );
+                    let piece_id = storage.piece_id(task_id.as_str(), number);
+                    info!("start to download piece {} from source", piece_id);
 
                     let metadata = piece_manager
                         .download_from_source(
+                            piece_id.as_str(),
                             task_id.as_str(),
                             number,
                             url.as_str(),
@@ -1254,7 +1252,7 @@ impl Task {
                         REQUEST_TIMEOUT,
                     )
                     .await.map_err(|err| {
-                        error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err);
+                        error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", piece_id, err);
                         err
                     })?;
 
@@ -1279,17 +1277,12 @@ impl Task {
                         .map_err(|err| {
                             error!(
                                 "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
-                                storage.piece_id(task_id.as_str(), number),
-                                err
+                                piece_id, err
                             );
                             err
                         })?;
 
-                    info!(
-                        "finished piece {} from source",
-                        storage.piece_id(task_id.as_str(), piece.number)
-                    );
-
+                    info!("finished piece {} from source", piece_id);
                     Ok(metadata)
                 }
 
@@ -1423,24 +1416,19 @@ impl Task {
 
         // Download the piece from the local peer.
         for interested_piece in interested_pieces {
+            let piece_id = self
+                .storage
+                .piece_id(task.id.as_str(), interested_piece.number);
+
             // Get the piece metadata from the local storage.
-            let piece = match self.piece.get(task.id.as_str(), interested_piece.number) {
+            let piece = match self.piece.get(piece_id.as_str()) {
                 Ok(Some(piece)) => piece,
                 Ok(None) => {
-                    info!(
-                        "piece {} not found in local storage",
-                        self.storage
-                            .piece_id(task.id.as_str(), interested_piece.number)
-                    );
+                    info!("piece {} not found in local storage", piece_id);
                     continue;
                 }
                 Err(err) => {
-                    error!(
-                        "get piece {} from local storage error: {:?}",
-                        self.storage
-                            .piece_id(task.id.as_str(), interested_piece.number),
-                        err
-                    );
+                    error!("get piece {} from local storage error: {:?}", piece_id, err);
                     continue;
                 }
             };
@@ -1448,10 +1436,7 @@ impl Task {
             // Fake the download from the local peer.
             self.piece
                 .download_from_local_peer(task.id.as_str(), piece.length);
-            info!(
-                "finished piece {} from local peer",
-                self.storage.piece_id(task.id.as_str(), piece.number)
-            );
+            info!("finished piece {} from local peer", piece_id);
 
             // Construct the piece.
             let piece = Piece {
@@ -1487,8 +1472,7 @@ impl Task {
                 .map_err(|err| {
                     error!(
                         "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
-                        self.storage.piece_id(task.id.as_str(), piece.number),
-                        err
+                        piece_id, err
                     );
                     err
                 })?;
@@ -1545,13 +1529,12 @@ impl Task {
                     // Limit the concurrent download count.
                     let _permit = semaphore.acquire().await.unwrap();
 
-                    info!(
-                        "start to download piece {} from source",
-                        storage.piece_id(task_id.as_str(), number)
-                    );
+                    let piece_id = storage.piece_id(task_id.as_str(), number);
+                    info!("start to download piece {} from source", piece_id);
 
                     let metadata = piece_manager
                         .download_from_source(
+                            piece_id.as_str(),
                             task_id.as_str(),
                             number,
                             url.as_str(),
@@ -1596,16 +1579,12 @@ impl Task {
                         .map_err(|err| {
                             error!(
                                 "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
-                                storage.piece_id(task_id.as_str(), metadata.number),
-                                err
+                                piece_id, err
                             );
                             err
                         })?;
 
-                    info!(
-                        "finished piece {} from source",
-                        storage.piece_id(task_id.as_str(), metadata.number)
-                    );
+                    info!("finished piece {} from source", piece_id);
 
                     Ok(metadata)
                 }
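One observable effect of threading the id through: storage-layer errors such as `PieceNotFound` and `WaitForPieceFinishedTimeout` now carry the same precomputed id used as the RocksDB key, so a single grep for `{task_id}-{number}` matches log lines, span fields, and error payloads alike. A small self-contained sketch (the enum is a stand-in for the workspace's `dragonfly_client_core::Error`, shown only for illustration):

```rust
/// Stand-in for the workspace error type; illustrative only.
#[derive(Debug)]
enum Error {
    PieceNotFound(String),
}

fn main() {
    let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1";
    let piece_id = format!("{}-{}", task_id, 7); // assumed id layout
    // The error now carries the same string used as the storage key.
    let err = Error::PieceNotFound(piece_id);
    println!("{:?}", err);
}
```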