feat: handle download with range parameter (#107)

feat: add calculate_piece_numbers_by_range to task

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-11-13 12:17:04 +08:00 committed by GitHub
parent b993a133d0
commit 81f1675e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 8 deletions

View File

@ -55,6 +55,9 @@ pub struct Task {
// uploaded_count is the count of the task uploaded by other peers. // uploaded_count is the count of the task uploaded by other peers.
pub uploaded_count: u64, pub uploaded_count: u64,
// content_length is the length of the task.
pub content_length: i64,
// updated_at is the time when the task metadata is updated. If the task is downloaded // updated_at is the time when the task metadata is updated. If the task is downloaded
// by other peers, it will also update updated_at. // by other peers, it will also update updated_at.
pub updated_at: NaiveDateTime, pub updated_at: NaiveDateTime,

View File

@ -104,9 +104,7 @@ impl Storage {
// Check the digest of the piece. // Check the digest of the piece.
if digest.to_string() != expected_digest { if digest.to_string() != expected_digest {
return Err(Error::PieceDigestMismatch( return Err(Error::PieceDigestMismatch(self.piece_id(task_id, number)));
self.metadata.piece_id(task_id, number),
));
} }
self.metadata.download_piece_finished( self.metadata.download_piece_finished(
@ -136,9 +134,7 @@ impl Storage {
self.metadata.upload_piece_finished(task_id, number)?; self.metadata.upload_piece_finished(task_id, number)?;
Ok(reader) Ok(reader)
} }
None => Err(Error::PieceNotFound( None => Err(Error::PieceNotFound(self.piece_id(task_id, number))),
self.metadata.piece_id(task_id, number),
)),
} }
} }
@ -152,4 +148,9 @@ impl Storage {
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> { pub fn get_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.metadata.get_pieces(task_id) self.metadata.get_pieces(task_id)
} }
// piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: i32) -> String {
self.metadata.piece_id(task_id, number)
}
} }

View File

@ -98,7 +98,19 @@ impl Task {
Some(task) => { Some(task) => {
// If the task is finished, return the file. // If the task is finished, return the file.
if task.is_finished() { if task.is_finished() {
let pieces = self.piece.get_all(task_id.as_str())?; let pieces = match download.range {
Some(range) => {
// Calculate the piece numbers to download.
let numbers = self
.piece
.calculate_numbers_by_range(task.piece_length, range);
// Get the pieces by numbers.
self.piece.get_by_numbers(task_id.as_str(), &numbers)?
}
None => self.piece.get_all(task_id.as_str())?,
};
for piece in pieces { for piece in pieces {
// Seek to the offset of the piece. // Seek to the offset of the piece.
f.seek(SeekFrom::Start(piece.offset)).await?; f.seek(SeekFrom::Start(piece.offset)).await?;

View File

@ -18,7 +18,7 @@ use crate::backend::http::{Request, HTTP};
use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient}; use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient};
use crate::storage::{metadata, Storage}; use crate::storage::{metadata, Storage};
use crate::{Error, HttpError, Result}; use crate::{Error, HttpError, Result};
use dragonfly_api::common::v2::Peer; use dragonfly_api::common::v2::{Peer, Range};
use dragonfly_api::dfdaemon::v2::{ use dragonfly_api::dfdaemon::v2::{
sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse, sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse,
SyncPiecesRequest, SyncPiecesRequest,
@ -66,6 +66,39 @@ impl Piece {
self.storage.get_pieces(task_id) self.storage.get_pieces(task_id)
} }
// get_by_numbers gets pieces by numbers from the local storage.
pub fn get_by_numbers(&self, task_id: &str, numbers: &[i32]) -> Result<Vec<metadata::Piece>> {
let mut pieces = Vec::new();
for number in numbers {
let piece = self
.storage
.get_piece(task_id, *number)?
.ok_or(Error::PieceNotFound(
self.storage.piece_id(task_id, *number),
))?;
pieces.push(piece);
}
Ok(pieces)
}
// calculate_numbers_by_range calculates the piece numbers to download.
pub fn calculate_numbers_by_range(&self, piece_length: i32, range: Range) -> Vec<i32> {
let mut numbers = Vec::new();
let mut number = 0;
let mut current_length = 0;
while current_length < range.start + range.length {
current_length = i64::from((number + 1) * piece_length);
if current_length > range.start {
numbers.push(number);
}
number += 1;
}
numbers
}
// download_from_local_peer downloads a piece from a local peer. // download_from_local_peer downloads a piece from a local peer.
pub async fn download_from_local_peer( pub async fn download_from_local_peer(
&self, &self,