feat: download_task with spawn (#123)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Committed by Gaius on 2023-11-22 15:22:15 +08:00 via GitHub
parent 6eefe73218
commit d13c0af919
6 changed files with 344 additions and 207 deletions


@ -292,7 +292,7 @@ impl Dfdaemon for DfdaemonServerHandler {
// Get the piece content from the local storage.
let mut reader = match task
.piece
.download_from_local_peer(&task_id, None, interested_piece_number)
.download_from_local_peer(&task_id, interested_piece_number)
.await
{
Ok(reader) => reader,
@ -347,7 +347,7 @@ impl Dfdaemon for DfdaemonServerHandler {
type DownloadTaskStream = ReceiverStream<Result<DownloadTaskResponse, Status>>;
// download_task tells the dfdaemon to download the task.
#[instrument(skip_all, fields(task_id, peer_id), ret)]
#[instrument(skip_all, fields(task_id, peer_id))]
async fn download_task(
&self,
request: Request<DownloadTaskRequest>,
@ -431,58 +431,17 @@ impl Dfdaemon for DfdaemonServerHandler {
// Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
tokio::spawn(async move {
match task
.download_into_file(
task_id.as_str(),
host_id.as_str(),
peer_id.as_str(),
content_length,
header.clone(),
download.clone(),
)
.await
{
Ok(mut download_progress_rx) => {
while let Some(finished_piece) = download_progress_rx.recv().await {
out_stream_tx
.send(Ok(DownloadTaskResponse {
content_length,
piece: Some(Piece {
number: finished_piece.number,
parent_id: None,
offset: finished_piece.offset,
length: finished_piece.length,
digest: finished_piece.clone().digest,
content: None,
traffic_type: None,
cost: finished_piece.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(
finished_piece.created_at,
)),
}),
}))
.await
.unwrap_or_else(|e| {
error!("send to out stream: {}", e);
});
}
}
Err(e) => {
error!("download task: {}", e);
out_stream_tx
.send(Err(Status::internal(e.to_string())))
.await
.unwrap_or_else(|e| {
error!("send to out stream: {}", e);
});
// Download task failed.
task.download_task_failed(task_id.as_str())
.unwrap_or_else(|e| {
error!("download task failed: {}", e);
});
}
}
task.download_into_file(
task_id.as_str(),
host_id.as_str(),
peer_id.as_str(),
content_length,
header.clone(),
download.clone(),
out_stream_tx.clone(),
)
.await;
drop(out_stream_tx);
});
Ok(Response::new(ReceiverStream::new(out_stream_rx)))
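
The hunk above is the heart of this change: download_task no longer awaits a Receiver<metadata::Piece> from download_into_file and forwards pieces itself; it hands the response-stream sender to the task and spawns the whole download. A minimal sketch of that server-streaming pattern, with simplified names (PieceResponse and download_task_sketch are illustrative, not types from this crate):

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status};

// Illustrative stand-in for DownloadTaskResponse.
struct PieceResponse {
    number: u32,
}

// The spawned worker owns the Sender, so it reports both progress and
// failures through the stream instead of returning a Result to the handler.
async fn download_task_sketch() -> Response<ReceiverStream<Result<PieceResponse, Status>>> {
    let (tx, rx) = mpsc::channel(128);

    tokio::spawn(async move {
        for number in 0..3u32 {
            // On an internal error, send a Status over the same channel and stop.
            if tx.send(Ok(PieceResponse { number })).await.is_err() {
                // The receiver was dropped: the client went away, stop downloading.
                return;
            }
        }
        // Dropping the sender closes the stream and finishes the RPC.
        drop(tx);
    });

    Response::new(ReceiverStream::new(rx))
}

#[tokio::main]
async fn main() {
    let _stream = download_task_sketch().await;
}

Sending Err(Status::internal(...)) through the same channel is how the spawned task now surfaces failures (invalid timeout, unreadable output file, scheduler errors), as the download_into_file hunks further down show.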


@ -54,7 +54,7 @@ impl Content {
}
// read_piece reads the piece from the content.
#[instrument(skip(self, offset, length))]
#[instrument(skip_all)]
pub async fn read_piece(
&self,
task_id: &str,
@ -67,7 +67,7 @@ impl Content {
}
// write_piece writes the piece to the content.
#[instrument(skip(self, offset, reader))]
#[instrument(skip_all)]
pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,


@ -205,7 +205,7 @@ impl Metadata {
}
// download_task_started updates the metadata of the task when the task download starts.
#[instrument(skip(self, piece_length))]
#[instrument(skip_all)]
pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> {
let task = match self.get_task(id)? {
// If the task exists, update the updated_at.
@ -227,7 +227,7 @@ impl Metadata {
}
// set_task_content_length sets the content length of the task.
#[instrument(skip(self, content_length))]
#[instrument(skip_all)]
pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> {
if let Some(mut task) = self.get_task(id)? {
task.content_length = Some(content_length);
@ -238,7 +238,7 @@ impl Metadata {
}
// upload_task_finished updates the metadata of the task when the task upload finishes.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn upload_task_finished(&self, id: &str) -> Result<()> {
match self.get_task(id)? {
Some(mut task) => {
@ -251,7 +251,7 @@ impl Metadata {
}
// download_task_failed updates the metadata of the task when the task download fails.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_task_failed(&self, id: &str) -> Result<()> {
match self.get_task(id)? {
Some(_piece) => self.delete_task(id),
@ -260,7 +260,7 @@ impl Metadata {
}
// get_task gets the task metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_task(&self, id: &str) -> Result<Option<Task>> {
let handle = self.cf_handle(TASK_CF_NAME)?;
match self.db.get_cf(handle, id)? {
@ -270,7 +270,7 @@ impl Metadata {
}
// put_task puts the task metadata.
#[instrument(skip(self, task))]
#[instrument(skip_all)]
fn put_task(&self, id: &str, task: &Task) -> Result<()> {
let handle = self.cf_handle(TASK_CF_NAME)?;
let json = serde_json::to_string(&task)?;
@ -279,7 +279,7 @@ impl Metadata {
}
// delete_task deletes the task metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
fn delete_task(&self, id: &str) -> Result<()> {
let handle = self.cf_handle(TASK_CF_NAME)?;
self.db.delete_cf(handle, id.as_bytes())?;
@ -287,7 +287,7 @@ impl Metadata {
}
// download_piece_started updates the metadata of the piece when the piece download starts.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> {
self.put_piece(
task_id,
@ -301,7 +301,7 @@ impl Metadata {
}
// download_piece_finished updates the metadata of the piece when the piece download finishes.
#[instrument(skip(self, offset, length, digest))]
#[instrument(skip_all)]
pub fn download_piece_finished(
&self,
task_id: &str,
@ -324,7 +324,7 @@ impl Metadata {
}
// download_piece_failed updates the metadata of the piece when the piece download fails.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
match self.get_piece(task_id, number)? {
Some(_piece) => self.delete_piece(task_id, number),
@ -333,7 +333,7 @@ impl Metadata {
}
// upload_piece_finished updates the metadata of the piece when the piece upload finishes.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<()> {
match self.get_piece(task_id, number)? {
Some(mut piece) => {
@ -346,7 +346,7 @@ impl Metadata {
}
// get_piece gets the piece metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
let id = self.piece_id(task_id, number);
let handle = self.cf_handle(PIECE_CF_NAME)?;
@ -357,7 +357,7 @@ impl Metadata {
}
// get_pieces gets the pieces metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<Piece>> {
let handle = self.cf_handle(PIECE_CF_NAME)?;
let iter = self.db.prefix_iterator_cf(handle, task_id.as_bytes());
@ -374,7 +374,7 @@ impl Metadata {
}
// put_piece puts the piece metadata.
#[instrument(skip(self, piece))]
#[instrument(skip_all)]
fn put_piece(&self, task_id: &str, piece: &Piece) -> Result<()> {
let id = self.piece_id(task_id, piece.number);
let handle = self.cf_handle(PIECE_CF_NAME)?;
@ -384,7 +384,7 @@ impl Metadata {
}
// delete_piece deletes the piece metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
let id = self.piece_id(task_id, number);
let handle = self.cf_handle(PIECE_CF_NAME)?;
@ -393,7 +393,7 @@ impl Metadata {
}
// piece_id returns the piece id.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn piece_id(&self, task_id: &str, number: u32) -> String {
format!("{}-{}", task_id, number)
}
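
Most of the remaining hunks in this file (and in the storage and content modules) only touch the tracing instrumentation: explicit skip(self, ...) lists become skip_all, while a few spans, such as download_task above, keep selected context through fields(...). A small sketch of the difference, assuming the tracing crate; the method names here are illustrative, not the crate's:

use tracing::instrument;

struct Metadata;

impl Metadata {
    // skip(...) omits only the listed arguments; every remaining argument is
    // still recorded as a span field, so it must be representable as a
    // tracing Value/Debug value.
    #[instrument(skip(self, piece_length))]
    fn download_started_verbose(&self, id: &str, piece_length: u64) {
        let _ = (id, piece_length);
    }

    // skip_all records no arguments at all; only the span name (plus any
    // explicit fields(...)) remains, keeping spans uniform and cheap.
    #[instrument(skip_all)]
    fn download_started_quiet(&self, id: &str, piece_length: u64) {
        let _ = (id, piece_length);
    }
}

fn main() {
    Metadata.download_started_verbose("task-1", 4 * 1024 * 1024);
    Metadata.download_started_quiet("task-1", 4 * 1024 * 1024);
}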


@ -42,31 +42,31 @@ impl Storage {
}
// download_task_started updates the metadata of the task when the task download starts.
#[instrument(skip(self, piece_length))]
#[instrument(skip_all)]
pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> {
self.metadata.download_task_started(id, piece_length)
}
// set_task_content_length sets the content length of the task.
#[instrument(skip(self, content_length))]
#[instrument(skip_all)]
pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> {
self.metadata.set_task_content_length(id, content_length)
}
// download_task_failed updates the metadata of the task when the task download fails.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_task_failed(&self, id: &str) -> Result<()> {
self.metadata.download_task_failed(id)
}
// upload_task_finished updates the metadata of the task when the task upload finishes.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn upload_task_finished(&self, id: &str) -> Result<()> {
self.metadata.upload_task_finished(id)
}
// get_task returns the task metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_task(&self, id: &str) -> Result<Option<metadata::Task>> {
let task = self.metadata.get_task(id)?;
Ok(task)
@ -74,13 +74,13 @@ impl Storage {
// download_piece_started updates the metadata of the piece and writes
// the data of the piece to the file when the piece download starts.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_started(task_id, number)
}
// download_piece_from_source_finished is used for downloading piece from source.
#[instrument(skip(self, offset, length, reader))]
#[instrument(skip_all)]
pub async fn download_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
@ -103,7 +103,7 @@ impl Storage {
}
// download_piece_from_remote_peer_finished is used for downloading piece from remote peer.
#[instrument(skip(self, offset, expected_digest, reader))]
#[instrument(skip_all)]
pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
@ -132,14 +132,14 @@ impl Storage {
}
// download_piece_failed updates the metadata of the piece when the piece download fails.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_failed(task_id, number)
}
// upload_piece updates the metadata of the piece and
// returns the data of the piece.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub async fn upload_piece(&self, task_id: &str, number: u32) -> Result<impl AsyncRead> {
match self.metadata.get_piece(task_id, number)? {
Some(piece) => {
@ -155,20 +155,20 @@ impl Storage {
}
// get_piece returns the piece metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
let piece = self.metadata.get_piece(task_id, number)?;
Ok(piece)
}
// get_pieces returns the pieces metadata.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.metadata.get_pieces(task_id)
}
// piece_id returns the piece id.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn piece_id(&self, task_id: &str, number: u32) -> String {
self.metadata.piece_id(task_id, number)
}


@ -19,14 +19,15 @@ use crate::grpc::scheduler::SchedulerClient;
use crate::storage::{metadata, Storage};
use crate::utils::http::headermap_to_hashmap;
use crate::utils::id_generator::IDGenerator;
use crate::{Error, Result};
use crate::{Error, Result as ClientResult};
use dragonfly_api::common::v2::{Download, Piece, TrafficType};
use dragonfly_api::dfdaemon::v2::DownloadTaskResponse;
use dragonfly_api::scheduler::v2::{
announce_peer_request, announce_peer_response, download_piece_back_to_source_failed_request,
AnnouncePeerRequest, DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest,
DownloadPieceFailedRequest, DownloadPieceFinishedRequest, HttpResponse, RegisterPeerRequest,
};
use mpsc::Receiver;
use mpsc::Sender;
use reqwest::header::{self, HeaderMap};
use std::sync::Arc;
use std::time::Duration;
@ -37,6 +38,7 @@ use tokio::{
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
use tonic::Status;
use tracing::{error, info, instrument};
pub mod piece;
@ -86,24 +88,25 @@ impl Task {
// get gets the task metadata.
#[instrument(skip(self))]
pub fn get(&self, task_id: &str) -> Result<Option<metadata::Task>> {
pub fn get(&self, task_id: &str) -> ClientResult<Option<metadata::Task>> {
self.storage.get_task(task_id)
}
// download_task_started updates the metadata of the task when the task download starts.
#[instrument(skip(self, piece_length))]
pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> {
pub fn download_task_started(&self, id: &str, piece_length: u64) -> ClientResult<()> {
self.storage.download_task_started(id, piece_length)
}
// download_task_failed updates the metadata of the task when the task download fails.
#[instrument(skip(self))]
pub fn download_task_failed(&self, id: &str) -> Result<()> {
pub fn download_task_failed(&self, id: &str) -> ClientResult<()> {
self.storage.download_task_failed(id)
}
// download_into_file downloads a task into a file.
#[instrument(skip(self, content_length, header, download))]
#[instrument(skip(self, content_length, header, download, download_progress_tx))]
#[allow(clippy::too_many_arguments)]
pub async fn download_into_file(
&self,
task_id: &str,
@ -112,78 +115,154 @@ impl Task {
content_length: u64,
header: HeaderMap,
download: Download,
) -> Result<Receiver<metadata::Piece>> {
// Initialize the download progress channel.
let (download_progress_tx, download_progress_rx) = mpsc::channel(128);
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) {
// Convert the timeout.
let timeout: Option<Duration> = match download.timeout.clone() {
Some(timeout) => {
Some(Duration::try_from(timeout).map_err(|_| Error::InvalidParameter())?)
}
Some(timeout) => match Duration::try_from(timeout) {
Ok(timeout) => Some(timeout),
Err(err) => {
error!("convert timeout error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::invalid_argument("invalid timeout")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
},
None => None,
};
// Open the file.
let mut f = OpenOptions::new()
let mut f = match OpenOptions::new()
.create(true)
.write(true)
.open(download.output_path.as_str())
.await?;
.await
{
Ok(f) => f,
Err(err) => {
error!("open file error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::internal("open file error")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
};
// Calculate the interested pieces to download.
let interested_pieces = self.piece.calculate_interested(
task_id,
peer_id,
let interested_pieces = match self.piece.calculate_interested(
download.piece_length,
content_length,
download.range.clone(),
)?;
) {
Ok(interested_pieces) => interested_pieces,
Err(err) => {
error!("calculate interested pieces error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::invalid_argument(
"calculate interested pieces error",
)))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
};
info!("interested pieces: {:?}", interested_pieces);
// Get the task from the local storage.
let task = self
.get(task_id)?
.ok_or(Error::TaskNotFound(task_id.to_string()))?;
let task = match self.get(task_id) {
Ok(Some(task)) => task,
Ok(None) => {
error!("task not found");
if let Err(err) = download_progress_tx
.send(Err(Status::not_found("task not found")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
Err(err) => {
error!("get task error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::internal("get task error")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
};
// If the task is finished, return the file.
if task.is_finished() {
info!("task is finished, download the pieces from the local peer");
// Download the pieces from the local peer.
return self
match self
.download_partial_from_local_peer_into_file(
&mut f,
task_id,
peer_id,
interested_pieces,
interested_pieces.clone(),
content_length,
download_progress_tx.clone(),
)
.await;
.await
{
Ok(_) => {}
Err(err) => {
error!("download from local peer error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::internal("download from local peer error")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
};
}
info!("download the pieces from local peer");
// Download the pieces from the local peer.
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
while let Some(finished_piece) = self
let finished_pieces = match self
.download_partial_from_local_peer_into_file(
&mut f,
task_id,
peer_id,
interested_pieces.clone(),
content_length,
download_progress_tx.clone(),
)
.await?
.recv()
.await
{
info!("finished piece from local peer: {:?}", finished_piece);
Ok(finished_pieces) => finished_pieces,
Err(err) => {
error!("download from local peer error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::internal("download from local peer error")))
.await
{
error!("send download progress error: {:?}", err);
}
// Send the download progress.
download_progress_tx.send(finished_piece.clone()).await?;
// Store the finished piece.
finished_pieces.push(finished_piece);
}
return;
}
};
// Remove the finished pieces from the pieces.
let interested_pieces = self
@ -197,7 +276,7 @@ impl Task {
// Check if all pieces are downloaded.
if interested_pieces.is_empty() {
info!("all pieces are downloaded from local peer");
return Ok(download_progress_rx);
return;
};
info!("download the pieces with scheduler");
@ -210,31 +289,50 @@ impl Task {
host_id,
peer_id,
interested_pieces.clone(),
content_length,
download.clone(),
download_progress_tx.clone(),
)
.await
{
Ok(download_progress_rx) => Ok(download_progress_rx),
Ok(_) => {}
Err(err) => {
error!("download partial with scheduler into file error: {:?}", err);
error!("download with scheduler error: {:?}", err);
// Download the pieces from the source.
self.download_partial_from_source_into_file(
&mut f,
interested_pieces,
task_id,
peer_id,
download.url.clone(),
header.clone(),
timeout,
)
.await
match self
.download_partial_from_source_into_file(
&mut f,
task_id,
interested_pieces.clone(),
download.url.clone(),
header.clone(),
content_length,
timeout,
download_progress_tx.clone(),
)
.await
{
Ok(_) => {}
Err(err) => {
error!("download from source error: {:?}", err);
if let Err(err) = download_progress_tx
.send(Err(Status::internal("download from source error")))
.await
{
error!("send download progress error: {:?}", err);
}
return;
}
};
}
}
};
}
// download_partial_with_scheduler_into_file downloads a partial task with scheduler into a file.
#[instrument(skip(self, f, interested_pieces, download))]
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
async fn download_partial_with_scheduler_into_file(
&self,
f: &mut fs::File,
@ -242,11 +340,10 @@ impl Task {
host_id: &str,
peer_id: &str,
interested_pieces: Vec<metadata::Piece>,
content_length: u64,
download: Download,
) -> Result<Receiver<metadata::Piece>> {
// Initialize the download progress channel.
let (download_progress_tx, download_progress_rx) = mpsc::channel(128);
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) -> ClientResult<Vec<metadata::Piece>> {
// Convert the header.
let header: HeaderMap = (&download.header).try_into()?;
@ -296,7 +393,8 @@ impl Task {
announce_peer_response::Response::EmptyTaskResponse(response) => {
// If the task is empty, return an empty vector.
info!("empty task response: {:?}", response);
return Ok(download_progress_rx);
drop(in_stream_tx);
return Ok(Vec::new());
}
announce_peer_response::Response::NormalTaskResponse(response) => {
// If the task is normal, download the pieces from the remote peer.
@ -307,7 +405,6 @@ impl Task {
.piece
.collect_interested_from_remote_peer(
task_id,
peer_id,
interested_pieces.clone(),
candidate_parents,
)
@ -318,7 +415,6 @@ impl Task {
.piece
.download_from_remote_peer(
task_id,
peer_id,
collect_interested_piece.number,
collect_interested_piece.parent.clone(),
)
@ -365,6 +461,24 @@ impl Task {
.write_into_file_and_verify(&mut reader, f, metadata.digest.as_str())
.await?;
info!(
"finished piece {} from remote peer {}",
collect_interested_piece.parent.id, metadata.number
);
// Construct the piece.
let piece = Piece {
number: metadata.number,
parent_id: Some(collect_interested_piece.parent.id.clone()),
offset: metadata.offset,
length: metadata.length,
digest: metadata.digest.clone(),
content: None,
traffic_type: Some(TrafficType::RemotePeer as i32),
cost: metadata.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
};
// Send the download piece finished request.
in_stream_tx
.send(AnnouncePeerRequest {
@ -374,21 +488,7 @@ impl Task {
request: Some(
announce_peer_request::Request::DownloadPieceFinishedRequest(
DownloadPieceFinishedRequest {
piece: Some(Piece {
number: metadata.number,
parent_id: Some(
collect_interested_piece.parent.id.clone(),
),
offset: metadata.offset,
length: metadata.length,
digest: metadata.digest.clone(),
content: None,
traffic_type: Some(TrafficType::RemotePeer as i32),
cost: metadata.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(
metadata.created_at,
)),
}),
piece: Some(piece.clone()),
},
),
),
@ -396,11 +496,23 @@ impl Task {
.await?;
// Send the download progress.
download_progress_tx.send(metadata.clone()).await?;
download_progress_tx
.send(Ok(DownloadTaskResponse {
content_length,
piece: Some(piece.clone()),
}))
.await?;
// Store the finished piece.
finished_pieces.push(metadata.clone());
}
// Check if all pieces are downloaded.
if finished_pieces.len() == interested_pieces.len() {
info!("all pieces are downloaded with scheduler");
drop(in_stream_tx);
return Ok(finished_pieces);
}
}
announce_peer_response::Response::NeedBackToSourceResponse(response) => {
// If the task needs to go back to the source, download the pieces from the source.
@ -410,7 +522,7 @@ impl Task {
interested_pieces.clone(),
);
for interested_piece in interested_pieces {
for interested_piece in interested_pieces.clone() {
// Seek to the offset of the piece.
if let Err(err) = f.seek(SeekFrom::Start(interested_piece.offset)).await {
error!("seek error: {:?}", err);
@ -422,7 +534,6 @@ impl Task {
.piece
.download_from_source(
task_id,
peer_id,
interested_piece.number,
download.url.clone().as_str(),
interested_piece.offset,
@ -496,21 +607,58 @@ impl Task {
.write_into_file_and_verify(&mut reader, f, metadata.digest.as_str())
.await?;
info!("finished piece {} from source", metadata.number);
// Construct the piece.
let piece = Piece {
number: metadata.number,
parent_id: None,
offset: metadata.offset,
length: metadata.length,
digest: metadata.digest.clone(),
content: None,
traffic_type: Some(TrafficType::BackToSource as i32),
cost: metadata.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
};
// Send the download piece finished request.
in_stream_tx
.send(AnnouncePeerRequest {
host_id: host_id.to_string(),
task_id: task_id.to_string(),
peer_id: peer_id.to_string(),
request: Some(
announce_peer_request::Request::DownloadPieceFinishedRequest(
DownloadPieceFinishedRequest {
piece: Some(piece.clone()),
},
),
),
})
.await?;
// Send the download progress.
download_progress_tx.send(metadata).await?;
download_progress_tx
.send(Ok(DownloadTaskResponse {
content_length,
piece: Some(piece.clone()),
}))
.await?;
// Store the finished piece.
finished_pieces.push(interested_piece.clone());
finished_pieces.push(metadata.clone());
}
if finished_pieces.len() == interested_pieces.len() {
info!("all pieces are downloaded from source");
drop(in_stream_tx);
return Ok(finished_pieces);
}
}
}
}
// Check if all pieces are downloaded.
if finished_pieces.len() == interested_pieces.len() {
return Ok(download_progress_rx);
}
// If not all pieces are downloaded, return an error.
Err(Error::Unknown(
"not all pieces are downloaded with scheduler".to_string(),
@ -518,17 +666,15 @@ impl Task {
}
// download_partial_from_local_peer_into_file downloads a partial task from a local peer into a file.
#[instrument(skip(self, f, interested_pieces))]
#[instrument(skip_all)]
async fn download_partial_from_local_peer_into_file(
&self,
f: &mut fs::File,
task_id: &str,
peer_id: &str,
interested_pieces: Vec<metadata::Piece>,
) -> Result<Receiver<metadata::Piece>> {
// Initialize the download progress channel.
let (download_progress_tx, download_progress_rx) = mpsc::channel(128);
content_length: u64,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) -> ClientResult<Vec<metadata::Piece>> {
// Initialize the finished pieces.
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
@ -542,7 +688,7 @@ impl Task {
// Download the piece from the local peer.
let mut reader = match self
.piece
.download_from_local_peer(task_id, Some(peer_id), interested_piece.number)
.download_from_local_peer(task_id, interested_piece.number)
.await
{
Ok(reader) => reader,
@ -563,32 +709,50 @@ impl Task {
.write_into_file_and_verify(&mut reader, f, metadata.digest.as_str())
.await?;
info!("finished piece {} from local peer", metadata.number);
// Construct the piece.
let piece = Piece {
number: metadata.number,
parent_id: None,
offset: metadata.offset,
length: metadata.length,
digest: metadata.digest.clone(),
content: None,
traffic_type: Some(TrafficType::LocalPeer as i32),
cost: metadata.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
};
// Send the download progress.
download_progress_tx.send(metadata).await?;
download_progress_tx
.send(Ok(DownloadTaskResponse {
content_length,
piece: Some(piece.clone()),
}))
.await?;
// Store the finished piece.
finished_pieces.push(interested_piece.clone());
}
Ok(download_progress_rx)
Ok(finished_pieces)
}
// download_partial_from_source_into_file downloads a partial task from the source into a file.
#[instrument(skip(self, f, interested_pieces, url, header, timeout))]
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
async fn download_partial_from_source_into_file(
&self,
f: &mut fs::File,
interested_pieces: Vec<metadata::Piece>,
task_id: &str,
peer_id: &str,
interested_pieces: Vec<metadata::Piece>,
url: String,
header: HeaderMap,
content_length: u64,
timeout: Option<Duration>,
) -> Result<Receiver<metadata::Piece>> {
// Initialize the download progress channel.
let (download_progress_tx, download_progress_rx) = mpsc::channel(128);
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
) -> ClientResult<Vec<metadata::Piece>> {
// Initialize the finished pieces.
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
@ -599,7 +763,6 @@ impl Task {
.piece
.download_from_source(
task_id,
peer_id,
interested_piece.number,
url.as_str(),
interested_piece.offset,
@ -620,16 +783,37 @@ impl Task {
.write_into_file_and_verify(&mut reader, f, metadata.digest.as_str())
.await?;
info!("finished piece {} from source", metadata.number);
// Construct the piece.
let piece = Piece {
number: metadata.number,
parent_id: None,
offset: metadata.offset,
length: metadata.length,
digest: metadata.digest.clone(),
content: None,
traffic_type: Some(TrafficType::LocalPeer as i32),
cost: metadata.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
};
// Send the download progress.
download_progress_tx.send(metadata).await?;
download_progress_tx
.send(Ok(DownloadTaskResponse {
content_length,
piece: Some(piece.clone()),
}))
.await?;
// Store the finished piece.
finished_pieces.push(interested_piece.clone());
finished_pieces.push(metadata.clone());
}
// Check if all pieces are downloaded.
if finished_pieces.len() == interested_pieces.len() {
return Ok(download_progress_rx);
info!("all pieces are downloaded from source");
return Ok(finished_pieces);
}
// If not all pieces are downloaded, return an error.
@ -639,14 +823,14 @@ impl Task {
}
// get_content_length gets the content length of the task.
#[instrument(skip(self, url, header, timeout))]
#[instrument(skip_all)]
pub async fn get_content_length(
&self,
task_id: &str,
url: &str,
header: HeaderMap,
timeout: Option<Duration>,
) -> Result<u64> {
) -> ClientResult<u64> {
let task = self
.storage
.get_task(task_id)?

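Taken together, the download_into_file hunks above keep the original fallback order while switching to channel-based reporting: pieces already finished on the local peer are reused first, the scheduler/remote-peer path is tried next, and only when that fails does the client go back to the source. A condensed sketch of that control flow; the helper functions are placeholders, not this crate's real signatures:

// Placeholders standing in for the download_partial_* helpers.
async fn from_local_peer() -> Result<(), String> { Ok(()) }
async fn with_scheduler() -> Result<(), String> { Err("no candidate parents".into()) }
async fn from_source() -> Result<(), String> { Ok(()) }

async fn download_into_file_sketch() -> Result<(), String> {
    // 1. Reuse pieces that are already on the local peer.
    from_local_peer().await?;

    // 2. Download the remaining pieces through the scheduler.
    if let Err(err) = with_scheduler().await {
        eprintln!("download with scheduler error: {err}");
        // 3. Fall back to the origin for whatever is still missing.
        from_source().await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    if let Err(err) = download_into_file_sketch().await {
        eprintln!("download failed: {err}");
    }
}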

@ -117,11 +117,9 @@ impl Piece {
}
// calculate_interested calculates the interested pieces by content_length and range.
#[instrument(skip(self, piece_length, content_length, range))]
#[instrument(skip(self))]
pub fn calculate_interested(
&self,
task_id: &str,
peer_id: &str,
piece_length: u64,
content_length: u64,
range: Option<Range>,
@ -146,7 +144,7 @@ impl Piece {
// If offset is greater than content_length, break the loop.
if offset >= content_length {
let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?;
piece.length = piece_length + content_length - piece.offset;
piece.length = piece_length + content_length - offset;
pieces.push(piece);
break;
}
@ -156,7 +154,6 @@ impl Piece {
break;
}
offset = (number + 1) * piece_length;
if offset > range.start {
pieces.push(metadata::Piece {
number: number as u32,
@ -170,6 +167,7 @@ impl Piece {
});
}
offset = (number + 1) * piece_length;
number += 1;
}
@ -188,12 +186,11 @@ impl Piece {
// If offset is greater than content_length, break the loop.
if offset >= content_length {
let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?;
piece.length = piece_length + content_length - piece.offset;
piece.length = piece_length + content_length - offset;
pieces.push(piece);
break;
}
offset = (number + 1) * piece_length;
pieces.push(metadata::Piece {
number: number as u32,
offset,
@ -205,6 +202,7 @@ impl Piece {
finished_at: None,
});
offset = (number + 1) * piece_length;
number += 1;
}
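
The two calculate_interested hunks fix how the final piece is sized: offset is now advanced only after a piece is pushed, so each piece is recorded with its starting offset, and the trim of the last piece uses the running offset (the end of the last pushed piece) rather than piece.offset. A quick check of the arithmetic with illustrative numbers (piece_length = 4, content_length = 10):

fn main() {
    let piece_length: u64 = 4;
    let content_length: u64 = 10;

    // Pieces are pushed at starting offsets 0, 4 and 8; after the third push
    // the running offset is advanced to 12, which is >= content_length, so
    // the loop pops the last piece and trims it.
    let last_piece_offset: u64 = 8;
    let running_offset: u64 = 12;

    // New formula, using the running offset: 4 + 10 - 12 = 2 bytes.
    let trimmed = piece_length + content_length - running_offset;
    // Keeping the piece's own offset would give 4 + 10 - 8 = 6, i.e. a last
    // piece that extends past the end of the content.
    let wrong = piece_length + content_length - last_piece_offset;

    assert_eq!(trimmed, content_length - last_piece_offset); // 2 bytes remain
    assert_eq!(wrong, 6);
    println!("last piece length: {trimmed} (not {wrong})");
}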
@ -234,11 +232,10 @@ impl Piece {
}
// collect_interested_from_remote_peer collects the interested pieces from remote peers.
#[instrument(skip(self, interested_pieces, candidate_parents))]
#[instrument(skip_all)]
pub async fn collect_interested_from_remote_peer(
&self,
task_id: &str,
peer_id: &str,
interested_pieces: Vec<metadata::Piece>,
candidate_parents: Vec<Peer>,
) -> Vec<CollectPiece> {
@ -309,22 +306,20 @@ impl Piece {
}
// download_from_local_peer downloads a single piece from a local peer.
#[instrument(skip(self))]
#[instrument(skip_all, fields(number))]
pub async fn download_from_local_peer(
&self,
task_id: &str,
peer_id: Option<&str>,
number: u32,
) -> Result<impl AsyncRead> {
self.storage.upload_piece(task_id, number).await
}
// download_from_remote_peer downloads a single piece from a remote peer.
#[instrument(skip(self, remote_peer))]
#[instrument(skip_all, fields(number))]
pub async fn download_from_remote_peer(
&self,
task_id: &str,
peer_id: &str,
number: u32,
remote_peer: Peer,
) -> Result<impl AsyncRead> {
@ -408,12 +403,11 @@ impl Piece {
}
// download_from_source downloads a single piece from the source.
#[instrument(skip(self, url, offset, length, header, timeout))]
#[instrument(skip_all, fields(number))]
#[allow(clippy::too_many_arguments)]
pub async fn download_from_source(
&self,
task_id: &str,
peer_id: &str,
number: u32,
url: &str,
offset: u64,