feat: remove state for piece (#50)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-07-03 12:07:55 +08:00 committed by GitHub
parent 9491454c1c
commit a477dba0d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 129 deletions

View File

@ -19,7 +19,6 @@ use crate::storage::{Error, Result};
use chrono::{NaiveDateTime, Utc};
use rocksdb::{BlockBasedOptions, Cache, ColumnFamily, Options, DB};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::Path;
use tracing::info;
@ -64,34 +63,6 @@ pub struct Task {
pub created_at: NaiveDateTime,
}
// PieceState is the state of the piece.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PieceState {
// Running is the state of the piece which is running,
// piece is being downloaded from source or other peers.
#[default]
Running,
// Succeeded is the state of the piece which is succeeded, piece downloaded successfully.
Succeeded,
// Failed is the state of the piece which is failed. When the enableBackToSource is true in dfdaemon configuration,
// it means that peer back-to-source downloads failed, and when the enableBackToSource is false
// in dfdaemon configuration, it means that piece downloads from other peers failed.
Failed,
}
// PieceState implements the fmt::Display interface.
impl fmt::Display for PieceState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Running => write!(f, "Running"),
Self::Succeeded => write!(f, "Succeeded"),
Self::Failed => write!(f, "Failed"),
}
}
}
// Piece is the metadata of the piece.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Piece {
@ -107,9 +78,6 @@ pub struct Piece {
// digest is the digest of the piece.
pub digest: String,
// state is the state of the piece.
pub state: PieceState,
// uploaded_count is the count of the piece uploaded by other peers.
pub uploaded_count: u64,
@ -204,7 +172,6 @@ impl Metadata {
id,
&Piece {
number,
state: PieceState::Running,
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
@ -212,8 +179,8 @@ impl Metadata {
)
}
// download_piece_succeeded updates the metadata of the piece when the piece downloads succeeded.
pub fn download_piece_succeeded(
// download_piece_finished updates the metadata of the piece when the piece downloads finished.
pub fn download_piece_finished(
&self,
id: &str,
offset: u64,
@ -222,36 +189,9 @@ impl Metadata {
) -> Result<()> {
match self.get_piece(id)? {
Some(mut piece) => {
if piece.state != PieceState::Running {
return Err(Error::InvalidStateTransition(
piece.state.to_string(),
PieceState::Succeeded.to_string(),
));
}
piece.offset = offset;
piece.length = length;
piece.digest = digest.to_string();
piece.state = PieceState::Succeeded;
piece.updated_at = Utc::now().naive_utc();
self.put_piece(id, &piece)
}
None => Err(Error::PieceNotFound(id.to_string())),
}
}
// download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, id: &str) -> Result<()> {
match self.get_piece(id)? {
Some(mut piece) => {
if piece.state != PieceState::Running {
return Err(Error::InvalidStateTransition(
piece.state.to_string(),
PieceState::Failed.to_string(),
));
}
piece.state = PieceState::Failed;
piece.updated_at = Utc::now().naive_utc();
self.put_piece(id, &piece)
}
@ -263,10 +203,6 @@ impl Metadata {
pub fn upload_piece_finished(&self, id: &str) -> Result<()> {
match self.get_piece(id)? {
Some(mut piece) => {
if piece.state != PieceState::Succeeded {
return Err(Error::InvalidState(piece.state.to_string()));
}
piece.uploaded_count += 1;
piece.updated_at = Utc::now().naive_utc();
self.put_piece(id, &piece)
@ -284,14 +220,6 @@ impl Metadata {
}
}
// get_piece_state gets the piece state.
pub fn get_piece_state(&self, id: &str) -> Result<Option<PieceState>> {
match self.get_piece(id)? {
Some(piece) => Ok(Some(piece.state)),
None => Ok(None),
}
}
// piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: u32) -> String {
format!("{}-{}", task_id, number)

View File

@ -15,14 +15,10 @@
*/
use std::path::Path;
use std::time::Duration;
mod content;
mod metadata;
// DEFAULT_RETRY_INTERVAL is the default retry interval for waiting piece download finished.
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_millis(10);
// Error is the error for Storage.
#[derive(Debug, thiserror::Error)]
pub enum Error {
@ -105,8 +101,8 @@ impl Storage {
self.metadata.download_piece_started(task_id, number)
}
// download_piece_succeeded updates the metadata of the piece when the piece downloads succeeded.
pub fn download_piece_succeeded(
// download_piece_finished updates the metadata of the piece and writes the data of piece to file.
pub fn download_piece_finished(
&self,
task_id: &str,
offset: u64,
@ -116,18 +112,12 @@ impl Storage {
) -> Result<()> {
self.content.write_piece(task_id, offset, data)?;
self.metadata
.download_piece_succeeded(task_id, offset, length, digest)
.download_piece_finished(task_id, offset, length, digest)
}
// download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata
.download_piece_failed(self.metadata.piece_id(task_id, number).as_str())
}
// upload_piece updates the metadata of the piece when the piece uploads finished and
// upload_piece_finished updates the metadata of the piece when the piece uploads finished and
// returns the data of the piece.
pub fn upload_piece(&self, task_id: &str, number: u32) -> Result<Vec<u8>> {
pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Vec<u8>> {
let id = self.metadata.piece_id(task_id, number);
match self.metadata.get_piece(&id)? {
Some(piece) => {
@ -141,49 +131,9 @@ impl Storage {
}
}
// reuse_piece return the data of piece, if piece is running, waiting for piece download finished.
pub async fn reuse_piece(&self, task_id: &str, number: u32) -> Result<Vec<u8>> {
loop {
let id = self.metadata.piece_id(task_id, number);
match self
.metadata
.get_piece(self.metadata.piece_id(task_id, number).as_str())
{
Ok(Some(piece)) => match piece.state {
// If the piece is succeeded, return the data of
// the piece and update the metadata.
metadata::PieceState::Succeeded => return self.upload_piece(task_id, number),
// If the piece is failed, return the error.
metadata::PieceState::Failed => return Err(Error::PieceStateIsFailed(id)),
// If the piece is running, poll the matedata of the piece
// until the piece download is finished.
metadata::PieceState::Running => {}
},
// If the piece is not found, return the error.
Ok(None) => return Err(Error::PieceNotFound(id)),
Err(err) => return Err(err),
}
// Sleep to avoid hot looping and wait for the piece to be download finished.
tokio::time::sleep(DEFAULT_RETRY_INTERVAL).await;
}
}
// get_piece returns the piece metadata.
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
self.metadata
.get_piece(self.metadata.piece_id(task_id, number).as_str())
}
// get_piece_state returns the piece state.
pub fn get_piece_state(
&self,
task_id: &str,
number: u32,
) -> Result<Option<metadata::PieceState>> {
self.metadata
.get_piece_state(self.metadata.piece_id(task_id, number).as_str())
}
}