feat: optimize generater of the piece id (#839)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-11-12 23:53:54 +08:00 committed by GitHub
parent da033a8219
commit 6d27c92ac6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 208 additions and 206 deletions

View File

@ -275,14 +275,14 @@ impl Storage {
#[instrument(skip_all)]
pub async fn download_piece_started(
&self,
task_id: &str,
piece_id: &str,
number: u32,
) -> Result<metadata::Piece> {
// Wait for the piece to be finished.
match self.wait_for_piece_finished(task_id, number).await {
match self.wait_for_piece_finished(piece_id).await {
Ok(piece) => Ok(piece),
// If piece is not found or wait timeout, create piece metadata.
Err(_) => self.metadata.download_piece_started(task_id, number),
Err(_) => self.metadata.download_piece_started(piece_id, number),
}
}
@ -290,8 +290,8 @@ impl Storage {
#[instrument(skip_all)]
pub async fn download_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
number: u32,
offset: u64,
length: u64,
reader: &mut R,
@ -300,8 +300,7 @@ impl Storage {
let digest = Digest::new(Algorithm::Crc32, response.hash);
self.metadata.download_piece_finished(
task_id,
number,
piece_id,
offset,
length,
digest.to_string().as_str(),
@ -313,8 +312,8 @@ impl Storage {
#[instrument(skip_all)]
pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
number: u32,
offset: u64,
expected_digest: &str,
parent_id: &str,
@ -333,8 +332,7 @@ impl Storage {
}
self.metadata.download_piece_finished(
task_id,
number,
piece_id,
offset,
length,
digest.to_string().as_str(),
@ -344,8 +342,8 @@ impl Storage {
/// download_piece_failed updates the metadata of the piece when the piece downloads failed.
#[instrument(skip_all)]
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_failed(task_id, number)
pub fn download_piece_failed(&self, piece_id: &str) -> Result<()> {
self.metadata.download_piece_failed(piece_id)
}
/// upload_piece updates the metadata of the piece and
@ -353,25 +351,25 @@ impl Storage {
#[instrument(skip_all)]
pub async fn upload_piece(
&self,
piece_id: &str,
task_id: &str,
number: u32,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Wait for the piece to be finished.
self.wait_for_piece_finished(task_id, number).await?;
self.wait_for_piece_finished(piece_id).await?;
// Start uploading the task.
self.metadata.upload_task_started(task_id)?;
// Start uploading the piece.
if let Err(err) = self.metadata.upload_piece_started(task_id, number) {
if let Err(err) = self.metadata.upload_piece_started(piece_id) {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
return Err(err);
}
// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(task_id, number) {
match self.metadata.get_piece(piece_id) {
Ok(Some(piece)) => {
match self
.content
@ -383,7 +381,7 @@ impl Storage {
self.metadata.upload_task_finished(task_id)?;
// Finish uploading the piece.
self.metadata.upload_piece_finished(task_id, number)?;
self.metadata.upload_piece_finished(piece_id)?;
Ok(reader)
}
Err(err) => {
@ -391,7 +389,7 @@ impl Storage {
self.metadata.upload_task_failed(task_id)?;
// Failed uploading the piece.
self.metadata.upload_piece_failed(task_id, number)?;
self.metadata.upload_piece_failed(piece_id)?;
Err(err)
}
}
@ -401,15 +399,15 @@ impl Storage {
self.metadata.upload_task_failed(task_id)?;
// Failed uploading the piece.
self.metadata.upload_piece_failed(task_id, number)?;
Err(Error::PieceNotFound(self.piece_id(task_id, number)))
self.metadata.upload_piece_failed(piece_id)?;
Err(Error::PieceNotFound(piece_id.to_string()))
}
Err(err) => {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
// Failed uploading the piece.
self.metadata.upload_piece_failed(task_id, number)?;
self.metadata.upload_piece_failed(piece_id)?;
Err(err)
}
}
@ -417,8 +415,8 @@ impl Storage {
/// get_piece returns the piece metadata.
#[instrument(skip_all)]
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
self.metadata.get_piece(task_id, number)
pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
self.metadata.get_piece(piece_id)
}
/// get_pieces returns the piece metadatas.
@ -434,7 +432,7 @@ impl Storage {
/// wait_for_piece_finished waits for the piece to be finished.
#[instrument(skip_all)]
async fn wait_for_piece_finished(&self, task_id: &str, number: u32) -> Result<metadata::Piece> {
async fn wait_for_piece_finished(&self, piece_id: &str) -> Result<metadata::Piece> {
// Initialize the timeout of piece.
let piece_timeout = tokio::time::sleep(self.config.download.piece_timeout);
tokio::pin!(piece_timeout);
@ -446,8 +444,8 @@ impl Storage {
tokio::select! {
_ = interval.tick() => {
let piece = self
.get_piece(task_id, number)?
.ok_or_else(|| Error::PieceNotFound(self.piece_id(task_id, number)))?;
.get_piece(piece_id)?
.ok_or_else(|| Error::PieceNotFound(piece_id.to_string()))?;
// If the piece is finished, return.
if piece.is_finished() {
@ -461,8 +459,8 @@ impl Storage {
wait_for_piece_count += 1;
}
_ = &mut piece_timeout => {
self.metadata.wait_for_piece_finished_failed(task_id, number).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number)));
self.metadata.wait_for_piece_finished_failed(piece_id).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(piece_id.to_string()));
}
}
}

View File

@ -699,7 +699,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
/// download_piece_started updates the metadata of the piece when the piece downloads started.
#[instrument(skip_all)]
pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
pub fn download_piece_started(&self, piece_id: &str, number: u32) -> Result<Piece> {
// Construct the piece metadata.
let piece = Piece {
number,
@ -709,8 +709,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
};
// Put the piece metadata.
self.db
.put(self.piece_id(task_id, number).as_bytes(), &piece)?;
self.db.put(piece_id.as_bytes(), &piece)?;
Ok(piece)
}
@ -718,16 +717,13 @@ impl<E: StorageEngineOwned> Metadata<E> {
#[instrument(skip_all)]
pub fn download_piece_finished(
&self,
task_id: &str,
number: u32,
piece_id: &str,
offset: u64,
length: u64,
digest: &str,
parent_id: Option<String>,
) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let piece = match self.db.get::<Piece>(id.as_bytes())? {
let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
Some(mut piece) => {
piece.offset = offset;
piece.length = length;
@ -737,84 +733,78 @@ impl<E: StorageEngineOwned> Metadata<E> {
piece.finished_at = Some(Utc::now().naive_utc());
piece
}
None => return Err(Error::PieceNotFound(id)),
None => return Err(Error::PieceNotFound(piece_id.to_string())),
};
self.db.put(id.as_bytes(), &piece)?;
self.db.put(piece_id.as_bytes(), &piece)?;
Ok(piece)
}
/// download_piece_failed updates the metadata of the piece when the piece downloads failed.
#[instrument(skip_all)]
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.delete_piece(task_id, number)
pub fn download_piece_failed(&self, piece_id: &str) -> Result<()> {
self.delete_piece(piece_id)
}
/// wait_for_piece_finished_failed waits for the piece to be finished or failed.
#[instrument(skip_all)]
pub fn wait_for_piece_finished_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.delete_piece(task_id, number)
pub fn wait_for_piece_finished_failed(&self, piece_id: &str) -> Result<()> {
self.delete_piece(piece_id)
}
/// upload_piece_started updates the metadata of the piece when piece uploads started.
#[instrument(skip_all)]
pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let piece = match self.db.get::<Piece>(id.as_bytes())? {
pub fn upload_piece_started(&self, piece_id: &str) -> Result<Piece> {
let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
None => return Err(Error::PieceNotFound(piece_id.to_string())),
};
self.db.put(id.as_bytes(), &piece)?;
self.db.put(piece_id.as_bytes(), &piece)?;
Ok(piece)
}
/// upload_piece_finished updates the metadata of the piece when piece uploads finished.
#[instrument(skip_all)]
pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let piece = match self.db.get::<Piece>(id.as_bytes())? {
pub fn upload_piece_finished(&self, piece_id: &str) -> Result<Piece> {
let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.uploaded_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
None => return Err(Error::PieceNotFound(piece_id.to_string())),
};
self.db.put(id.as_bytes(), &piece)?;
self.db.put(piece_id.as_bytes(), &piece)?;
Ok(piece)
}
/// upload_piece_failed updates the metadata of the piece when the piece uploads failed.
#[instrument(skip_all)]
pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let piece = match self.db.get::<Piece>(id.as_bytes())? {
pub fn upload_piece_failed(&self, piece_id: &str) -> Result<Piece> {
let piece = match self.db.get::<Piece>(piece_id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
None => return Err(Error::PieceNotFound(piece_id.to_string())),
};
self.db.put(id.as_bytes(), &piece)?;
self.db.put(piece_id.as_bytes(), &piece)?;
Ok(piece)
}
/// get_piece gets the piece metadata.
#[instrument(skip_all)]
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
self.db.get(self.piece_id(task_id, number).as_bytes())
pub fn get_piece(&self, piece_id: &str) -> Result<Option<Piece>> {
self.db.get(piece_id.as_bytes())
}
/// get_pieces gets the piece metadatas.
@ -836,10 +826,9 @@ impl<E: StorageEngineOwned> Metadata<E> {
/// delete_piece deletes the piece metadata.
#[instrument(skip_all)]
pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
info!("delete piece metadata {}", self.piece_id(task_id, number));
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
pub fn delete_piece(&self, piece_id: &str) -> Result<()> {
info!("delete piece metadata {}", piece_id);
self.db.delete::<Piece>(piece_id.as_bytes())
}
/// delete_pieces deletes the piece metadatas.
@ -950,20 +939,20 @@ mod tests {
// Test upload_task_started.
metadata.upload_task_started(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap().unwrap();
assert_eq!(task.uploading_count, 1,);
assert_eq!(task.uploading_count, 1);
// Test upload_task_finished.
metadata.upload_task_finished(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap().unwrap();
assert_eq!(task.uploading_count, 0,);
assert_eq!(task.uploaded_count, 1,);
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 1);
// Test upload_task_failed.
let task = metadata.upload_task_started(task_id).unwrap();
assert_eq!(task.uploading_count, 1);
let task = metadata.upload_task_failed(task_id).unwrap();
assert_eq!(task.uploading_count, 0,);
assert_eq!(task.uploaded_count, 1,);
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 1);
// Test get_tasks.
let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1";
@ -985,49 +974,64 @@ mod tests {
let log_dir = dir.path().join("log");
let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c";
let piece_id = metadata.piece_id(task_id, 1);
// Test download_piece_started.
metadata.download_piece_started(task_id, 1).unwrap();
let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
assert_eq!(piece.number, 1,);
metadata
.download_piece_started(piece_id.as_str(), 1)
.unwrap();
let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
assert_eq!(piece.number, 1);
// Test download_piece_finished.
metadata
.download_piece_finished(task_id, 1, 0, 1024, "digest1", None)
.download_piece_finished(piece_id.as_str(), 0, 1024, "digest1", None)
.unwrap();
let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
assert_eq!(piece.length, 1024,);
assert_eq!(piece.digest, "digest1",);
let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
assert_eq!(piece.length, 1024);
assert_eq!(piece.digest, "digest1");
// Test get_pieces.
metadata.download_piece_started(task_id, 2).unwrap();
metadata.download_piece_started(task_id, 3).unwrap();
metadata
.download_piece_started(metadata.piece_id(task_id, 2).as_str(), 2)
.unwrap();
metadata
.download_piece_started(metadata.piece_id(task_id, 3).as_str(), 3)
.unwrap();
let pieces = metadata.get_pieces(task_id).unwrap();
assert_eq!(pieces.len(), 3, "should get 3 pieces in total");
assert_eq!(pieces.len(), 3);
// Test download_piece_failed.
metadata.download_piece_started(task_id, 2).unwrap();
metadata.download_piece_started(task_id, 3).unwrap();
metadata.download_piece_failed(task_id, 2).unwrap();
let piece = metadata.get_piece(task_id, 2).unwrap();
let piece_id = metadata.piece_id(task_id, 2);
metadata
.download_piece_started(piece_id.as_str(), 2)
.unwrap();
metadata
.download_piece_started(metadata.piece_id(task_id, 3).as_str(), 3)
.unwrap();
metadata.download_piece_failed(piece_id.as_str()).unwrap();
let piece = metadata.get_piece(piece_id.as_str()).unwrap();
assert!(piece.is_none());
// Test upload_piece_started.
metadata.upload_piece_started(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(piece.uploading_count, 1,);
let piece_id = metadata.piece_id(task_id, 3);
metadata.upload_piece_started(piece_id.as_str()).unwrap();
let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
assert_eq!(piece.uploading_count, 1);
// Test upload_piece_finished.
metadata.upload_piece_finished(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(piece.uploading_count, 0,);
assert_eq!(piece.uploaded_count, 1,);
let piece_id = metadata.piece_id(task_id, 3);
metadata.upload_piece_finished(piece_id.as_str()).unwrap();
let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
assert_eq!(piece.uploading_count, 0);
assert_eq!(piece.uploaded_count, 1);
// Test upload_piece_failed.
metadata.upload_piece_started(task_id, 3).unwrap();
metadata.upload_piece_failed(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(piece.uploading_count, 0,);
let piece_id = metadata.piece_id(task_id, 3);
metadata.upload_piece_started(piece_id.as_str()).unwrap();
metadata.upload_piece_failed(piece_id.as_str()).unwrap();
let piece = metadata.get_piece(piece_id.as_str()).unwrap().unwrap();
assert_eq!(piece.uploading_count, 0);
// Test delete_pieces.
metadata.delete_pieces(task_id).unwrap();

View File

@ -54,11 +54,11 @@ impl RocksdbStorageEngine {
// DEFAULT_MAX_BACKGROUND_JOBS is the default max background jobs for rocksdb, default is 2.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 2;
/// DEFAULT_BLOCK_SIZE is the default block size for rocksdb, default is 128KB.
const DEFAULT_BLOCK_SIZE: usize = 128 * 1024;
/// DEFAULT_BLOCK_SIZE is the default block size for rocksdb, default is 64KB.
const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;
/// DEFAULT_CACHE_SIZE is the default cache size for rocksdb, default is 512MB.
const DEFAULT_CACHE_SIZE: usize = 512 * 1024 * 1024;
/// DEFAULT_CACHE_SIZE is the default cache size for rocksdb, default is 1GB.
const DEFAULT_CACHE_SIZE: usize = 1024 * 1024 * 1024;
/// DEFAULT_LOG_MAX_SIZE is the default max log size for rocksdb, default is 64MB.
const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024;
@ -74,16 +74,21 @@ impl RocksdbStorageEngine {
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
options.increase_parallelism(num_cpus::get() as i32);
// Optimize compression.
options.set_compression_type(rocksdb::DBCompressionType::Lz4);
options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
// Improved parallelism.
options.increase_parallelism(num_cpus::get() as i32);
options.set_max_background_jobs(std::cmp::max(
num_cpus::get() as i32 / 2,
num_cpus::get() as i32,
Self::DEFAULT_MAX_BACKGROUND_JOBS,
));
// Set rocksdb log options.
options.set_db_log_dir(log_dir);
options.set_log_level(rocksdb::LogLevel::Debug);
options.set_log_level(rocksdb::LogLevel::Info);
options.set_max_log_file_size(Self::DEFAULT_LOG_MAX_SIZE);
options.set_keep_log_file_num(Self::DEFAULT_LOG_MAX_FILES);
@ -91,12 +96,14 @@ impl RocksdbStorageEngine {
let mut block_options = rocksdb::BlockBasedOptions::default();
block_options.set_block_cache(&rocksdb::Cache::new_lru_cache(Self::DEFAULT_CACHE_SIZE));
block_options.set_block_size(Self::DEFAULT_BLOCK_SIZE);
block_options.set_cache_index_and_filter_blocks(true);
block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
options.set_block_based_table_factory(&block_options);
// Initialize column family options.
let mut cf_options = rocksdb::Options::default();
cf_options.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(64));
cf_options.set_memtable_prefix_bloom_ratio(0.2);
cf_options.set_memtable_prefix_bloom_ratio(0.25);
cf_options.optimize_level_style_compaction(Self::DEFAULT_MEMTABLE_MEMORY_BUDGET);
// Initialize column families.

View File

@ -652,10 +652,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let mut has_started_piece = false;
let mut finished_piece_numbers = Vec::new();
for interested_piece_number in interested_piece_numbers.iter() {
let piece = match task_manager
let piece = match task_manager.piece.get(
task_manager
.piece
.get(task_id.as_str(), *interested_piece_number)
{
.id(task_id.as_str(), *interested_piece_number)
.as_str(),
) {
Ok(Some(piece)) => piece,
Ok(None) => continue,
Err(err) => {
@ -765,17 +767,20 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Get the interested piece number from the request.
let piece_number = request.piece_number;
// Generate the piece id.
let piece_id = self.task.piece.id(task_id.as_str(), piece_number);
// Span record the host id, task id and piece number.
Span::current().record("host_id", host_id.as_str());
Span::current().record("remote_host_id", remote_host_id.as_str());
Span::current().record("task_id", task_id.as_str());
Span::current().record("piece_id", format!("{}-{}", task_id, piece_number).as_str());
Span::current().record("piece_id", piece_id.as_str());
// Get the piece metadata from the local storage.
let piece = self
.task
.piece
.get(task_id.as_str(), piece_number)
.get(piece_id.as_str())
.map_err(|err| {
error!("upload piece metadata from local storage: {}", err);
Status::internal(err.to_string())
@ -794,8 +799,8 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
.task
.piece
.upload_from_local_peer_into_async_read(
piece_id.as_str(),
task_id.as_str(),
piece_number,
piece.length,
None,
false,

View File

@ -740,8 +740,10 @@ async fn proxy_by_dfdaemon(
let piece_reader = match task
.piece
.download_from_local_peer_into_async_read(
task.piece
.id(message.task_id.as_str(), piece.number)
.as_str(),
message.task_id.as_str(),
piece.number,
piece.length,
download_task_started_response.range,
true,

View File

@ -841,14 +841,16 @@ impl PersistentCacheTask {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = storage.piece_id(task_id.as_str(), number);
info!(
"start to download piece {} from remote peer {:?}",
storage.piece_id(task_id.as_str(), number),
piece_id,
parent.id.clone()
);
let metadata = piece_manager
.download_from_remote_peer(
peer_id.as_str(),
host_id.as_str(),
task_id.as_str(),
number,
@ -859,7 +861,7 @@ impl PersistentCacheTask {
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
storage.piece_id(task_id.as_str(), number),
piece_id,
parent.id.clone(),
err
);
@ -930,8 +932,7 @@ impl PersistentCacheTask {
info!(
"finished piece {} from remote peer {:?}",
storage.piece_id(task_id.as_str(), metadata.number),
metadata.parent_id
piece_id, metadata.parent_id
);
Ok(metadata)
@ -1029,24 +1030,19 @@ impl PersistentCacheTask {
// Download the piece from the local peer.
for interested_piece in interested_pieces {
let piece_id = self
.storage
.piece_id(task.id.as_str(), interested_piece.number);
// Get the piece metadata from the local storage.
let piece = match self.piece.get(task.id.as_str(), interested_piece.number) {
let piece = match self.piece.get(piece_id.as_str()) {
Ok(Some(piece)) => piece,
Ok(None) => {
info!(
"piece {} not found in local storage",
self.storage
.piece_id(task.id.as_str(), interested_piece.number)
);
info!("piece {} not found in local storage", piece_id);
continue;
}
Err(err) => {
error!(
"get piece {} from local storage error: {:?}",
self.storage
.piece_id(task.id.as_str(), interested_piece.number),
err
);
error!("get piece {} from local storage error: {:?}", piece_id, err);
continue;
}
};
@ -1054,10 +1050,7 @@ impl PersistentCacheTask {
// Fake the download from the local peer.
self.piece
.download_from_local_peer(task.id.as_str(), piece.length);
info!(
"finished piece {} from local peer",
self.storage.piece_id(task.id.as_str(), piece.number)
);
info!("finished piece {} from local peer", piece_id);
// Construct the piece.
let piece = Piece {

View File

@ -110,10 +110,16 @@ impl Piece {
}
}
/// id generates a new piece id.
#[instrument(skip_all)]
pub fn id(&self, task_id: &str, number: u32) -> String {
self.storage.piece_id(task_id, number)
}
/// get gets a piece from the local storage.
#[instrument(skip_all)]
pub fn get(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
self.storage.get_piece(task_id, number)
pub fn get(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
self.storage.get_piece(piece_id)
}
/// calculate_interested calculates the interested pieces by content_length and range.
@ -302,14 +308,14 @@ impl Piece {
#[instrument(skip_all, fields(piece_id))]
pub async fn upload_from_local_peer_into_async_read(
&self,
piece_id: &str,
task_id: &str,
number: u32,
length: u64,
range: Option<Range>,
disable_rate_limit: bool,
) -> Result<impl AsyncRead> {
// Span record the piece_id.
Span::current().record("piece_id", self.storage.piece_id(task_id, number));
Span::current().record("piece_id", piece_id);
// Acquire the upload rate limiter.
if !disable_rate_limit {
@ -318,7 +324,7 @@ impl Piece {
// Upload the piece content.
self.storage
.upload_piece(task_id, number, range)
.upload_piece(piece_id, task_id, range)
.await
.map(|reader| {
collect_upload_piece_traffic_metrics(
@ -333,14 +339,14 @@ impl Piece {
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_local_peer_into_async_read(
&self,
piece_id: &str,
task_id: &str,
number: u32,
length: u64,
range: Option<Range>,
disable_rate_limit: bool,
) -> Result<impl AsyncRead> {
// Span record the piece_id.
Span::current().record("piece_id", self.storage.piece_id(task_id, number));
Span::current().record("piece_id", piece_id);
// Acquire the download rate limiter.
if !disable_rate_limit {
@ -348,7 +354,7 @@ impl Piece {
}
// Upload the piece content.
self.storage.upload_piece(task_id, number, range).await
self.storage.upload_piece(piece_id, task_id, range).await
}
/// download_from_local_peer downloads a single piece from a local peer. Fake the download piece
@ -366,6 +372,7 @@ impl Piece {
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_remote_peer(
&self,
piece_id: &str,
host_id: &str,
task_id: &str,
number: u32,
@ -373,13 +380,16 @@ impl Piece {
parent: piece_collector::CollectedParent,
) -> Result<metadata::Piece> {
// Span record the piece_id.
Span::current().record("piece_id", self.storage.piece_id(task_id, number));
Span::current().record("piece_id", piece_id);
// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
// Record the start of downloading piece.
let piece = self.storage.download_piece_started(task_id, number).await?;
let piece = self
.storage
.download_piece_started(piece_id, number)
.await?;
// If the piece is downloaded by the other thread,
// return the piece directly.
@ -390,7 +400,7 @@ impl Piece {
// Create a dfdaemon client.
let host = parent.host.clone().ok_or_else(|| {
error!("peer host is empty");
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -406,7 +416,7 @@ impl Piece {
"create dfdaemon upload client from {}:{} failed: {}",
host.ip, host.port, err
);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -426,7 +436,7 @@ impl Piece {
.await
.map_err(|err| {
error!("download piece failed: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -435,7 +445,7 @@ impl Piece {
let piece = response.piece.ok_or_else(|| {
error!("piece is empty");
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -445,7 +455,7 @@ impl Piece {
// Get the piece content.
let content = piece.content.ok_or_else(|| {
error!("piece content is empty");
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -455,8 +465,8 @@ impl Piece {
// Record the finish of downloading piece.
self.storage
.download_piece_from_remote_peer_finished(
piece_id,
task_id,
number,
piece.offset,
piece.digest.as_str(),
parent.id.as_str(),
@ -467,7 +477,7 @@ impl Piece {
// Record the failure of downloading piece,
// If storage fails to record piece.
error!("download piece finished: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -475,7 +485,7 @@ impl Piece {
})?;
self.storage
.get_piece(task_id, number)?
.get_piece(piece_id)?
.ok_or_else(|| {
error!("piece not found");
Error::PieceNotFound(number.to_string())
@ -495,6 +505,7 @@ impl Piece {
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_source(
&self,
piece_id: &str,
task_id: &str,
number: u32,
url: &str,
@ -504,13 +515,16 @@ impl Piece {
object_storage: Option<ObjectStorage>,
) -> Result<metadata::Piece> {
// Span record the piece_id.
Span::current().record("piece_id", self.storage.piece_id(task_id, number));
Span::current().record("piece_id", piece_id);
// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
// Record the start of downloading piece.
let piece = self.storage.download_piece_started(task_id, number).await?;
let piece = self
.storage
.download_piece_started(piece_id, number)
.await?;
// If the piece is downloaded by the other thread,
// return the piece directly.
@ -530,7 +544,7 @@ impl Piece {
// Download the piece from the source.
let backend = self.backend_factory.build(url).map_err(|err| {
error!("build backend failed: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -548,7 +562,7 @@ impl Piece {
let mut response = backend
.get(GetRequest {
task_id: task_id.to_string(),
piece_id: self.storage.piece_id(task_id, number),
piece_id: piece_id.to_string(),
url: url.to_string(),
range: Some(Range {
start: offset,
@ -569,7 +583,7 @@ impl Piece {
// if the request is failed.
error!("backend get failed: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -594,7 +608,7 @@ impl Piece {
let error_message = response.error_message.unwrap_or_default();
error!("backend get failed: {} {}", error_message, buffer.as_str());
self.storage.download_piece_failed(task_id, number)?;
self.storage.download_piece_failed(piece_id)?;
return Err(Error::BackendError(BackendError {
message: error_message,
status_code: Some(response.http_status_code.unwrap_or_default()),
@ -612,8 +626,8 @@ impl Piece {
// Record the finish of downloading piece.
self.storage
.download_piece_from_source_finished(
piece_id,
task_id,
number,
offset,
length,
&mut response.reader,
@ -623,7 +637,7 @@ impl Piece {
// Record the failure of downloading piece,
// If storage fails to record piece.
error!("download piece finished: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};
@ -631,7 +645,7 @@ impl Piece {
})?;
self.storage
.get_piece(task_id, number)?
.get_piece(piece_id)?
.ok_or_else(|| {
error!("piece not found");
Error::PieceNotFound(number.to_string())

View File

@ -964,14 +964,16 @@ impl Task {
// Limit the concurrent piece count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = storage.piece_id(task_id.as_str(), number);
info!(
"start to download piece {} from remote peer {:?}",
storage.piece_id(task_id.as_str(), number),
piece_id,
parent.id.clone()
);
let metadata = piece_manager
.download_from_remote_peer(
piece_id.as_str(),
host_id.as_str(),
task_id.as_str(),
number,
@ -982,7 +984,7 @@ impl Task {
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
storage.piece_id(task_id.as_str(), number),
piece_id,
parent.id.clone(),
err
);
@ -1026,8 +1028,7 @@ impl Task {
.map_err(|err| {
error!(
"send DownloadPieceFinishedRequest for piece {} failed: {:?}",
storage.piece_id(task_id.as_str(), number),
err
piece_id, err
);
interrupt.store(true, Ordering::SeqCst);
err
@ -1054,8 +1055,7 @@ impl Task {
.map_err(|err| {
error!(
"send DownloadPieceFinishedResponse for piece {} failed: {:?}",
storage.piece_id(task_id.as_str(), number),
err
piece_id, err
);
interrupt.store(true, Ordering::SeqCst);
err
@ -1063,8 +1063,7 @@ impl Task {
info!(
"finished piece {} from remote peer {:?}",
storage.piece_id(task_id.as_str(), metadata.number),
metadata.parent_id
piece_id, metadata.parent_id
);
let mut finished_pieces = finished_pieces.lock().unwrap();
@ -1206,13 +1205,12 @@ impl Task {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
info!(
"start to download piece {} from source",
storage.piece_id(task_id.as_str(), number)
);
let piece_id = storage.piece_id(task_id.as_str(), number);
info!("start to download piece {} from source", piece_id);
let metadata = piece_manager
.download_from_source(
piece_id.as_str(),
task_id.as_str(),
number,
url.as_str(),
@ -1254,7 +1252,7 @@ impl Task {
REQUEST_TIMEOUT,
)
.await.map_err(|err| {
error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err);
error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", piece_id, err);
err
})?;
@ -1279,17 +1277,12 @@ impl Task {
.map_err(|err| {
error!(
"send DownloadPieceFinishedResponse for piece {} failed: {:?}",
storage.piece_id(task_id.as_str(), number),
err
piece_id, err
);
err
})?;
info!(
"finished piece {} from source",
storage.piece_id(task_id.as_str(), piece.number)
);
info!("finished piece {} from source", piece_id);
Ok(metadata)
}
@ -1423,24 +1416,19 @@ impl Task {
// Download the piece from the local peer.
for interested_piece in interested_pieces {
let piece_id = self
.storage
.piece_id(task.id.as_str(), interested_piece.number);
// Get the piece metadata from the local storage.
let piece = match self.piece.get(task.id.as_str(), interested_piece.number) {
let piece = match self.piece.get(piece_id.as_str()) {
Ok(Some(piece)) => piece,
Ok(None) => {
info!(
"piece {} not found in local storage",
self.storage
.piece_id(task.id.as_str(), interested_piece.number)
);
info!("piece {} not found in local storage", piece_id);
continue;
}
Err(err) => {
error!(
"get piece {} from local storage error: {:?}",
self.storage
.piece_id(task.id.as_str(), interested_piece.number),
err
);
error!("get piece {} from local storage error: {:?}", piece_id, err);
continue;
}
};
@ -1448,10 +1436,7 @@ impl Task {
// Fake the download from the local peer.
self.piece
.download_from_local_peer(task.id.as_str(), piece.length);
info!(
"finished piece {} from local peer",
self.storage.piece_id(task.id.as_str(), piece.number)
);
info!("finished piece {} from local peer", piece_id,);
// Construct the piece.
let piece = Piece {
@ -1487,8 +1472,7 @@ impl Task {
.map_err(|err| {
error!(
"send DownloadPieceFinishedResponse for piece {} failed: {:?}",
self.storage.piece_id(task.id.as_str(), piece.number),
err
piece_id, err
);
err
})?;
@ -1545,13 +1529,12 @@ impl Task {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
info!(
"start to download piece {} from source",
storage.piece_id(task_id.as_str(), number)
);
let piece_id = storage.piece_id(task_id.as_str(), number);
info!("start to download piece {} from source", piece_id);
let metadata = piece_manager
.download_from_source(
piece_id.as_str(),
task_id.as_str(),
number,
url.as_str(),
@ -1596,16 +1579,12 @@ impl Task {
.map_err(|err| {
error!(
"send DownloadPieceFinishedResponse for piece {} failed: {:?}",
storage.piece_id(task_id.as_str(), metadata.number),
err
piece_id, err
);
err
})?;
info!(
"finished piece {} from source",
storage.piece_id(task_id.as_str(), metadata.number)
);
info!("finished piece {} from source", piece_id);
Ok(metadata)
}