feat: wait for piece finished before update piece metadata (#535)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-06-13 15:51:54 +08:00 committed by GitHub
parent 3fa0e87059
commit 832af14c91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 36 additions and 31 deletions

16
Cargo.lock generated
View File

@ -965,7 +965,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"anyhow",
"bytes",
@ -1029,7 +1029,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1049,7 +1049,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"dragonfly-client-core",
"home",
@ -1068,7 +1068,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"hyper 1.2.0",
"hyper-util",
@ -1082,7 +1082,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"anyhow",
"clap",
@ -1098,7 +1098,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"base16ct",
"blake3",
@ -1122,7 +1122,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1581,7 +1581,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.1.79"
version = "0.1.80"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.1.79"
version = "0.1.80"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.79" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.79" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.79" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.79" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.79" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.79" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.79" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.80" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.80" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.80" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.80" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.80" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.80" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.80" }
thiserror = "1.0"
dragonfly-api = "2.0.115"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }

View File

@ -23,7 +23,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tracing::info;
use tracing::{error, info};
pub mod content;
pub mod metadata;
@ -210,6 +210,9 @@ impl Storage {
number: u32,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Wait for the piece to be finished.
self.wait_for_piece_finished(task_id, number).await?;
// Start uploading the task.
self.metadata.upload_task_started(task_id)?;
@ -220,16 +223,6 @@ impl Storage {
return Err(err);
}
// Wait for the piece to be finished.
if let Err(err) = self.wait_for_piece_finished(task_id, number).await {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
// Failed uploading the piece.
self.metadata.upload_piece_failed(task_id, number)?;
return Err(err);
}
// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(task_id, number)? {
Some(piece) => {
@ -294,7 +287,6 @@ impl Storage {
loop {
tokio::select! {
_ = interval.tick() => {
let piece = self
.get_piece(task_id, number)?
.ok_or_else(|| Error::PieceNotFound(self.piece_id(task_id, number)))?;
@ -311,6 +303,7 @@ impl Storage {
wait_for_piece_count += 1;
}
_ = &mut piece_timeout => {
self.metadata.wait_for_piece_finished_failed(task_id, number).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number)));
}
}

View File

@ -24,7 +24,7 @@ use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tracing::error;
use tracing::{error, info};
use crate::storage_engine::{rocksdb::RocksdbStorageEngine, DatabaseObject, StorageEngineOwned};
@ -391,6 +391,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
/// delete_task deletes the task metadata.
pub fn delete_task(&self, task_id: &str) -> Result<()> {
info!("delete task metadata {}", task_id);
self.db.delete::<Task>(task_id.as_bytes())
}
@ -441,8 +442,12 @@ impl<E: StorageEngineOwned> Metadata<E> {
/// download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
self.delete_piece(task_id, number)
}
// wait_for_piece_finished_failed waits for the piece to be finished or failed.
pub fn wait_for_piece_finished_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.delete_piece(task_id, number)
}
/// upload_piece_started updates the metadata of the piece when piece uploads started.
@ -508,6 +513,13 @@ impl<E: StorageEngineOwned> Metadata<E> {
iter.map(|ele| ele.map(|(_, piece)| piece)).collect()
}
/// delete_piece deletes the piece metadata.
pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
info!("delete piece metadata {}", self.piece_id(task_id, number));
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
}
/// delete_pieces deletes the piece metadatas.
pub fn delete_pieces(&self, task_id: &str) -> Result<()> {
let iter = self.db.prefix_iter::<Piece>(task_id.as_bytes())?;