diff --git a/src/storage/metadata.rs b/src/storage/metadata.rs index 9776eaec..4539ca5c 100644 --- a/src/storage/metadata.rs +++ b/src/storage/metadata.rs @@ -19,7 +19,6 @@ use crate::storage::{Error, Result}; use chrono::{NaiveDateTime, Utc}; use rocksdb::{BlockBasedOptions, Cache, ColumnFamily, Options, DB}; use serde::{Deserialize, Serialize}; -use std::fmt; use std::path::Path; use tracing::info; @@ -64,34 +63,6 @@ pub struct Task { pub created_at: NaiveDateTime, } -// PieceState is the state of the piece. -#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum PieceState { - // Running is the state of the piece which is running, - // piece is being downloaded from source or other peers. - #[default] - Running, - - // Succeeded is the state of the piece which is succeeded, piece downloaded successfully. - Succeeded, - - // Failed is the state of the piece which is failed. When the enableBackToSource is true in dfdaemon configuration, - // it means that peer back-to-source downloads failed, and when the enableBackToSource is false - // in dfdaemon configuration, it means that piece downloads from other peers failed. - Failed, -} - -// PieceState implements the fmt::Display interface. -impl fmt::Display for PieceState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::Running => write!(f, "Running"), - Self::Succeeded => write!(f, "Succeeded"), - Self::Failed => write!(f, "Failed"), - } - } -} - // Piece is the metadata of the piece. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Piece { @@ -107,9 +78,6 @@ pub struct Piece { // digest is the digest of the piece. pub digest: String, - // state is the state of the piece. - pub state: PieceState, - // uploaded_count is the count of the piece uploaded by other peers. pub uploaded_count: u64, @@ -204,7 +172,6 @@ impl Metadata { id, &Piece { number, - state: PieceState::Running, updated_at: Utc::now().naive_utc(), created_at: Utc::now().naive_utc(), ..Default::default() @@ -212,8 +179,8 @@ impl Metadata { ) } - // download_piece_succeeded updates the metadata of the piece when the piece downloads succeeded. - pub fn download_piece_succeeded( + // download_piece_finished updates the metadata of the piece when the piece downloads finished. + pub fn download_piece_finished( &self, id: &str, offset: u64, @@ -222,36 +189,9 @@ impl Metadata { ) -> Result<()> { match self.get_piece(id)? { Some(mut piece) => { - if piece.state != PieceState::Running { - return Err(Error::InvalidStateTransition( - piece.state.to_string(), - PieceState::Succeeded.to_string(), - )); - } - piece.offset = offset; piece.length = length; piece.digest = digest.to_string(); - piece.state = PieceState::Succeeded; - piece.updated_at = Utc::now().naive_utc(); - self.put_piece(id, &piece) - } - None => Err(Error::PieceNotFound(id.to_string())), - } - } - - // download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, id: &str) -> Result<()> { - match self.get_piece(id)? { - Some(mut piece) => { - if piece.state != PieceState::Running { - return Err(Error::InvalidStateTransition( - piece.state.to_string(), - PieceState::Failed.to_string(), - )); - } - - piece.state = PieceState::Failed; piece.updated_at = Utc::now().naive_utc(); self.put_piece(id, &piece) } @@ -263,10 +203,6 @@ impl Metadata { pub fn upload_piece_finished(&self, id: &str) -> Result<()> { match self.get_piece(id)? { Some(mut piece) => { - if piece.state != PieceState::Succeeded { - return Err(Error::InvalidState(piece.state.to_string())); - } - piece.uploaded_count += 1; piece.updated_at = Utc::now().naive_utc(); self.put_piece(id, &piece) @@ -284,14 +220,6 @@ impl Metadata { } } - // get_piece_state gets the piece state. - pub fn get_piece_state(&self, id: &str) -> Result> { - match self.get_piece(id)? { - Some(piece) => Ok(Some(piece.state)), - None => Ok(None), - } - } - // piece_id returns the piece id. pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 31c31295..caed0e8d 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -15,14 +15,10 @@ */ use std::path::Path; -use std::time::Duration; mod content; mod metadata; -// DEFAULT_RETRY_INTERVAL is the default retry interval for waiting piece download finished. -const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_millis(10); - // Error is the error for Storage. #[derive(Debug, thiserror::Error)] pub enum Error { @@ -105,8 +101,8 @@ impl Storage { self.metadata.download_piece_started(task_id, number) } - // download_piece_succeeded updates the metadata of the piece when the piece downloads succeeded. - pub fn download_piece_succeeded( + // download_piece_finished updates the metadata of the piece and writes the data of piece to file. + pub fn download_piece_finished( &self, task_id: &str, offset: u64, @@ -116,18 +112,12 @@ impl Storage { ) -> Result<()> { self.content.write_piece(task_id, offset, data)?; self.metadata - .download_piece_succeeded(task_id, offset, length, digest) + .download_piece_finished(task_id, offset, length, digest) } - // download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { - self.metadata - .download_piece_failed(self.metadata.piece_id(task_id, number).as_str()) - } - - // upload_piece updates the metadata of the piece when the piece uploads finished and + // upload_piece_finished updates the metadata of the piece when the piece uploads finished and // returns the data of the piece. - pub fn upload_piece(&self, task_id: &str, number: u32) -> Result> { + pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result> { let id = self.metadata.piece_id(task_id, number); match self.metadata.get_piece(&id)? { Some(piece) => { @@ -141,49 +131,9 @@ impl Storage { } } - // reuse_piece return the data of piece, if piece is running, waiting for piece download finished. - pub async fn reuse_piece(&self, task_id: &str, number: u32) -> Result> { - loop { - let id = self.metadata.piece_id(task_id, number); - match self - .metadata - .get_piece(self.metadata.piece_id(task_id, number).as_str()) - { - Ok(Some(piece)) => match piece.state { - // If the piece is succeeded, return the data of - // the piece and update the metadata. - metadata::PieceState::Succeeded => return self.upload_piece(task_id, number), - - // If the piece is failed, return the error. - metadata::PieceState::Failed => return Err(Error::PieceStateIsFailed(id)), - - // If the piece is running, poll the matedata of the piece - // until the piece download is finished. - metadata::PieceState::Running => {} - }, - // If the piece is not found, return the error. - Ok(None) => return Err(Error::PieceNotFound(id)), - Err(err) => return Err(err), - } - - // Sleep to avoid hot looping and wait for the piece to be download finished. - tokio::time::sleep(DEFAULT_RETRY_INTERVAL).await; - } - } - // get_piece returns the piece metadata. pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { self.metadata .get_piece(self.metadata.piece_id(task_id, number).as_str()) } - - // get_piece_state returns the piece state. - pub fn get_piece_state( - &self, - task_id: &str, - number: u32, - ) -> Result> { - self.metadata - .get_piece_state(self.metadata.piece_id(task_id, number).as_str()) - } }