diff --git a/src/storage/metadata.rs b/src/storage/metadata.rs index da675162..2cc1aa50 100644 --- a/src/storage/metadata.rs +++ b/src/storage/metadata.rs @@ -55,6 +55,9 @@ pub struct Task { // uploaded_count is the count of the task uploaded by other peers. pub uploaded_count: u64, + // content_length is the length of the task. + pub content_length: i64, + // updated_at is the time when the task metadata is updated. If the task is downloaded // by other peers, it will also update updated_at. pub updated_at: NaiveDateTime, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d124c219..18319888 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -104,9 +104,7 @@ impl Storage { // Check the digest of the piece. if digest.to_string() != expected_digest { - return Err(Error::PieceDigestMismatch( - self.metadata.piece_id(task_id, number), - )); + return Err(Error::PieceDigestMismatch(self.piece_id(task_id, number))); } self.metadata.download_piece_finished( @@ -136,9 +134,7 @@ impl Storage { self.metadata.upload_piece_finished(task_id, number)?; Ok(reader) } - None => Err(Error::PieceNotFound( - self.metadata.piece_id(task_id, number), - )), + None => Err(Error::PieceNotFound(self.piece_id(task_id, number))), } } @@ -152,4 +148,9 @@ impl Storage { pub fn get_pieces(&self, task_id: &str) -> Result> { self.metadata.get_pieces(task_id) } + + // piece_id returns the piece id. + pub fn piece_id(&self, task_id: &str, number: i32) -> String { + self.metadata.piece_id(task_id, number) + } } diff --git a/src/task/mod.rs b/src/task/mod.rs index 4c46d209..626fe121 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -98,7 +98,19 @@ impl Task { Some(task) => { // If the task is finished, return the file. if task.is_finished() { - let pieces = self.piece.get_all(task_id.as_str())?; + let pieces = match download.range { + Some(range) => { + // Calculate the piece numbers to download. + let numbers = self + .piece + .calculate_numbers_by_range(task.piece_length, range); + + // Get the pieces by numbers. + self.piece.get_by_numbers(task_id.as_str(), &numbers)? + } + None => self.piece.get_all(task_id.as_str())?, + }; + for piece in pieces { // Seek to the offset of the piece. f.seek(SeekFrom::Start(piece.offset)).await?; diff --git a/src/task/piece.rs b/src/task/piece.rs index f2ec16aa..1a3d4215 100644 --- a/src/task/piece.rs +++ b/src/task/piece.rs @@ -18,7 +18,7 @@ use crate::backend::http::{Request, HTTP}; use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient}; use crate::storage::{metadata, Storage}; use crate::{Error, HttpError, Result}; -use dragonfly_api::common::v2::Peer; +use dragonfly_api::common::v2::{Peer, Range}; use dragonfly_api::dfdaemon::v2::{ sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse, SyncPiecesRequest, @@ -66,6 +66,39 @@ impl Piece { self.storage.get_pieces(task_id) } + // get_by_numbers gets pieces by numbers from the local storage. + pub fn get_by_numbers(&self, task_id: &str, numbers: &[i32]) -> Result> { + let mut pieces = Vec::new(); + for number in numbers { + let piece = self + .storage + .get_piece(task_id, *number)? + .ok_or(Error::PieceNotFound( + self.storage.piece_id(task_id, *number), + ))?; + pieces.push(piece); + } + + Ok(pieces) + } + + // calculate_numbers_by_range calculates the piece numbers to download. + pub fn calculate_numbers_by_range(&self, piece_length: i32, range: Range) -> Vec { + let mut numbers = Vec::new(); + let mut number = 0; + let mut current_length = 0; + while current_length < range.start + range.length { + current_length = i64::from((number + 1) * piece_length); + if current_length > range.start { + numbers.push(number); + } + + number += 1; + } + + numbers + } + // download_from_local_peer downloads a piece from a local peer. pub async fn download_from_local_peer( &self,