feat: add indicatif to dfget and change length to uint32 (#119)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-11-20 21:42:24 +08:00 committed by GitHub
parent 829491a04f
commit af1e5c2784
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 151 additions and 113 deletions

View File

@ -55,7 +55,7 @@ local-ip-address = "0.5.3"
rocksdb = "0.21.0" rocksdb = "0.21.0"
num_cpus = "1.0" num_cpus = "1.0"
chrono = { version = "0.4.26", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] }
dragonfly-api = "2.0.53" dragonfly-api = "2.0.56"
sysinfo = "0.29.6" sysinfo = "0.29.6"
sha2 = "0.10" sha2 = "0.10"
hex = "0.4" hex = "0.4"
@ -68,3 +68,4 @@ http = "0.2"
rand = "0.8.5" rand = "0.8.5"
prost-wkt-types = "0.4" prost-wkt-types = "0.4"
tower = "0.4.13" tower = "0.4.13"
indicatif = "0.17.7"

View File

@ -23,9 +23,11 @@ use dragonfly_client::config::dfget;
use dragonfly_client::grpc::dfdaemon::DfdaemonClient; use dragonfly_client::grpc::dfdaemon::DfdaemonClient;
use dragonfly_client::tracing::init_tracing; use dragonfly_client::tracing::init_tracing;
use dragonfly_client::Error; use dragonfly_client::Error;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use std::{cmp::min, fmt::Write};
use tracing::Level; use tracing::Level;
use url::Url; use url::Url;
@ -69,7 +71,7 @@ struct Args {
default_value_t = 4194304, default_value_t = 4194304,
help = "Set the byte length of the piece" help = "Set the byte length of the piece"
)] )]
piece_length: i32, piece_length: u64,
#[arg( #[arg(
long = "download-rate-limit", long = "download-rate-limit",
@ -178,13 +180,32 @@ async fn main() -> Result<(), anyhow::Error> {
}) })
.await?; .await?;
// TODO: Support to print progress. // Initialize progress bar.
let pb = ProgressBar::new(0);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
)
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
})
.progress_chars("#>-"),
);
// Dwonload file.
let mut downloaded = 0;
let mut out_stream = response.into_inner(); let mut out_stream = response.into_inner();
while let Some(message) = out_stream.message().await? { while let Some(message) = out_stream.message().await? {
let piece = message.piece.ok_or(Error::InvalidParameter())?; let piece = message.piece.ok_or(Error::InvalidParameter())?;
println!("{:?}", piece) pb.set_length(message.content_length);
downloaded += piece.length;
let position = min(downloaded + piece.length, message.content_length);
pb.set_position(position);
} }
pb.finish_with_message("downloaded");
Ok(()) Ok(())
} }

View File

@ -16,6 +16,7 @@
use crate::shutdown; use crate::shutdown;
use crate::task; use crate::task;
use crate::utils::http::hashmap_to_headermap;
use crate::Result as ClientResult; use crate::Result as ClientResult;
use dragonfly_api::common::v2::{Piece, Task}; use dragonfly_api::common::v2::{Piece, Task};
use dragonfly_api::dfdaemon::v2::{ use dragonfly_api::dfdaemon::v2::{
@ -154,7 +155,7 @@ impl Dfdaemon for DfdaemonServerHandler {
let task = self.task.clone(); let task = self.task.clone();
// Get the piece numbers from the local storage. // Get the piece numbers from the local storage.
let piece_numbers: Vec<i32> = task let piece_numbers: Vec<u32> = task
.piece .piece
.get_all(task_id.as_str()) .get_all(task_id.as_str())
.map_err(|e| { .map_err(|e| {
@ -298,14 +299,67 @@ impl Dfdaemon for DfdaemonServerHandler {
// Clone the task. // Clone the task.
let task = self.task.clone(); let task = self.task.clone();
// Generate the task id.
let task_id = task
.id_generator
.task_id(
download.url.as_str(),
download.digest.as_deref(),
download.tag.as_deref(),
download.application.as_deref(),
download.piece_length,
download.filters.clone(),
)
.map_err(|e| {
error!("generate task id: {}", e);
Status::invalid_argument(e.to_string())
})?;
// Generate the host id.
let host_id = task.id_generator.host_id();
// Generate the peer id.
let peer_id = task.id_generator.peer_id();
// Convert the header.
let header = hashmap_to_headermap(&download.header).map_err(|e| {
error!("convert header: {}", e);
Status::invalid_argument(e.to_string())
})?;
// Get the content length.
let content_length = task
.get_content_length(
task_id.as_str(),
download.url.as_str(),
header.clone(),
None,
)
.await
.map_err(|e| {
error!("get content length: {}", e);
Status::internal(e.to_string())
})?;
// Initialize stream channel. // Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(128); let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
tokio::spawn(async move { tokio::spawn(async move {
match task.download_into_file(download).await { match task
.download_into_file(
task_id.as_str(),
host_id.as_str(),
peer_id.as_str(),
content_length,
header.clone(),
download.clone(),
)
.await
{
Ok(mut download_progress_rx) => { Ok(mut download_progress_rx) => {
while let Some(finished_piece) = download_progress_rx.recv().await { while let Some(finished_piece) = download_progress_rx.recv().await {
out_stream_tx out_stream_tx
.send(Ok(DownloadTaskResponse { .send(Ok(DownloadTaskResponse {
content_length,
piece: Some(Piece { piece: Some(Piece {
number: finished_piece.number, number: finished_piece.number,
parent_id: None, parent_id: None,
@ -419,7 +473,7 @@ impl DfdaemonClient {
pub async fn get_piece_numbers( pub async fn get_piece_numbers(
&self, &self,
request: GetPieceNumbersRequest, request: GetPieceNumbersRequest,
) -> ClientResult<Vec<i32>> { ) -> ClientResult<Vec<u32>> {
let mut request = tonic::Request::new(request); let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT); request.set_timeout(super::REQUEST_TIMEOUT);

View File

@ -51,13 +51,13 @@ pub struct Task {
pub id: String, pub id: String,
// piece_length is the length of the piece. // piece_length is the length of the piece.
pub piece_length: i32, pub piece_length: u64,
// uploaded_count is the count of the task uploaded by other peers. // uploaded_count is the count of the task uploaded by other peers.
pub uploaded_count: u64, pub uploaded_count: u64,
// content_length is the length of the task. // content_length is the length of the task.
pub content_length: Option<i64>, pub content_length: Option<u64>,
// updated_at is the time when the task metadata is updated. If the task is downloaded // updated_at is the time when the task metadata is updated. If the task is downloaded
// by other peers, it will also update updated_at. // by other peers, it will also update updated_at.
@ -87,7 +87,7 @@ impl Task {
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Piece { pub struct Piece {
// number is the piece number. // number is the piece number.
pub number: i32, pub number: u32,
// offset is the offset of the piece in the task. // offset is the offset of the piece in the task.
pub offset: u64, pub offset: u64,
@ -203,7 +203,7 @@ impl Metadata {
} }
// download_task_started updates the metadata of the task when the task downloads started. // download_task_started updates the metadata of the task when the task downloads started.
pub fn download_task_started(&self, id: &str, piece_length: i32) -> Result<()> { pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> {
let task = match self.get_task(id)? { let task = match self.get_task(id)? {
// If the task exists, update the updated_at. // If the task exists, update the updated_at.
Some(mut task) => { Some(mut task) => {
@ -224,7 +224,7 @@ impl Metadata {
} }
// set_task_content_length sets the content length of the task. // set_task_content_length sets the content length of the task.
pub fn set_task_content_length(&self, id: &str, content_length: i64) -> Result<()> { pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> {
if let Some(mut task) = self.get_task(id)? { if let Some(mut task) = self.get_task(id)? {
task.content_length = Some(content_length); task.content_length = Some(content_length);
return self.put_task(id, &task); return self.put_task(id, &task);
@ -278,7 +278,7 @@ impl Metadata {
} }
// download_piece_started updates the metadata of the piece when the piece downloads started. // download_piece_started updates the metadata of the piece when the piece downloads started.
pub fn download_piece_started(&self, task_id: &str, number: i32) -> Result<()> { pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> {
self.put_piece( self.put_piece(
task_id, task_id,
&Piece { &Piece {
@ -294,7 +294,7 @@ impl Metadata {
pub fn download_piece_finished( pub fn download_piece_finished(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
offset: u64, offset: u64,
length: u64, length: u64,
digest: &str, digest: &str,
@ -313,7 +313,7 @@ impl Metadata {
} }
// download_piece_failed updates the metadata of the piece when the piece downloads failed. // download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: i32) -> Result<()> { pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
match self.get_piece(task_id, number)? { match self.get_piece(task_id, number)? {
Some(_piece) => self.delete_piece(task_id, number), Some(_piece) => self.delete_piece(task_id, number),
None => Err(Error::PieceNotFound(self.piece_id(task_id, number))), None => Err(Error::PieceNotFound(self.piece_id(task_id, number))),
@ -321,7 +321,7 @@ impl Metadata {
} }
// upload_piece_finished updates the metadata of the piece when piece uploads finished. // upload_piece_finished updates the metadata of the piece when piece uploads finished.
pub fn upload_piece_finished(&self, task_id: &str, number: i32) -> Result<()> { pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<()> {
match self.get_piece(task_id, number)? { match self.get_piece(task_id, number)? {
Some(mut piece) => { Some(mut piece) => {
piece.uploaded_count += 1; piece.uploaded_count += 1;
@ -333,7 +333,7 @@ impl Metadata {
} }
// get_piece gets the piece metadata. // get_piece gets the piece metadata.
pub fn get_piece(&self, task_id: &str, number: i32) -> Result<Option<Piece>> { pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle(PIECE_CF_NAME)?;
match self.db.get_cf(handle, id.as_bytes())? { match self.db.get_cf(handle, id.as_bytes())? {
@ -368,7 +368,7 @@ impl Metadata {
} }
// delete_piece deletes the piece metadata. // delete_piece deletes the piece metadata.
fn delete_piece(&self, task_id: &str, number: i32) -> Result<()> { fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle(PIECE_CF_NAME)?;
self.db.delete_cf(handle, id.as_bytes())?; self.db.delete_cf(handle, id.as_bytes())?;
@ -376,7 +376,7 @@ impl Metadata {
} }
// piece_id returns the piece id. // piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: i32) -> String { pub fn piece_id(&self, task_id: &str, number: u32) -> String {
format!("{}-{}", task_id, number) format!("{}-{}", task_id, number)
} }

View File

@ -41,12 +41,12 @@ impl Storage {
} }
// download_task_started updates the metadata of the task when the task downloads started. // download_task_started updates the metadata of the task when the task downloads started.
pub fn download_task_started(&self, id: &str, piece_length: i32) -> Result<()> { pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> {
self.metadata.download_task_started(id, piece_length) self.metadata.download_task_started(id, piece_length)
} }
// set_task_content_length sets the content length of the task. // set_task_content_length sets the content length of the task.
pub fn set_task_content_length(&self, id: &str, content_length: i64) -> Result<()> { pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> {
self.metadata.set_task_content_length(id, content_length) self.metadata.set_task_content_length(id, content_length)
} }
@ -68,7 +68,7 @@ impl Storage {
// download_piece_started updates the metadata of the piece and writes // download_piece_started updates the metadata of the piece and writes
// the data of piece to file when the piece downloads started. // the data of piece to file when the piece downloads started.
pub fn download_piece_started(&self, task_id: &str, number: i32) -> Result<()> { pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_started(task_id, number) self.metadata.download_piece_started(task_id, number)
} }
@ -76,7 +76,7 @@ impl Storage {
pub async fn download_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>( pub async fn download_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
offset: u64, offset: u64,
length: u64, length: u64,
reader: &mut R, reader: &mut R,
@ -98,7 +98,7 @@ impl Storage {
pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>( pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
offset: u64, offset: u64,
expected_digest: &str, expected_digest: &str,
reader: &mut R, reader: &mut R,
@ -123,13 +123,13 @@ impl Storage {
} }
// download_piece_failed updates the metadata of the piece when the piece downloads failed. // download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: i32) -> Result<()> { pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_failed(task_id, number) self.metadata.download_piece_failed(task_id, number)
} }
// upload_piece updates the metadata of the piece and // upload_piece updates the metadata of the piece and
// returns the data of the piece. // returns the data of the piece.
pub async fn upload_piece(&self, task_id: &str, number: i32) -> Result<impl AsyncRead> { pub async fn upload_piece(&self, task_id: &str, number: u32) -> Result<impl AsyncRead> {
match self.metadata.get_piece(task_id, number)? { match self.metadata.get_piece(task_id, number)? {
Some(piece) => { Some(piece) => {
let reader = self let reader = self
@ -144,7 +144,7 @@ impl Storage {
} }
// get_piece returns the piece metadata. // get_piece returns the piece metadata.
pub fn get_piece(&self, task_id: &str, number: i32) -> Result<Option<metadata::Piece>> { pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
let piece = self.metadata.get_piece(task_id, number)?; let piece = self.metadata.get_piece(task_id, number)?;
Ok(piece) Ok(piece)
} }
@ -155,7 +155,7 @@ impl Storage {
} }
// piece_id returns the piece id. // piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: i32) -> String { pub fn piece_id(&self, task_id: &str, number: u32) -> String {
self.metadata.piece_id(task_id, number) self.metadata.piece_id(task_id, number)
} }
} }

View File

@ -17,7 +17,7 @@
use crate::backend::http::{Request as HTTPRequest, HTTP}; use crate::backend::http::{Request as HTTPRequest, HTTP};
use crate::grpc::scheduler::SchedulerClient; use crate::grpc::scheduler::SchedulerClient;
use crate::storage::{metadata, Storage}; use crate::storage::{metadata, Storage};
use crate::utils::http::{hashmap_to_headermap, headermap_to_hashmap}; use crate::utils::http::headermap_to_hashmap;
use crate::utils::id_generator::IDGenerator; use crate::utils::id_generator::IDGenerator;
use crate::{Error, Result}; use crate::{Error, Result};
use dragonfly_api::common::v2::{Download, Piece, TrafficType}; use dragonfly_api::common::v2::{Download, Piece, TrafficType};
@ -44,7 +44,7 @@ pub mod piece;
// Task represents a task manager. // Task represents a task manager.
pub struct Task { pub struct Task {
// id_generator is the id generator. // id_generator is the id generator.
id_generator: Arc<IDGenerator>, pub id_generator: Arc<IDGenerator>,
// manager_client is the grpc client of the manager. // manager_client is the grpc client of the manager.
storage: Arc<Storage>, storage: Arc<Storage>,
@ -92,33 +92,16 @@ impl Task {
// download_into_file downloads a task into a file. // download_into_file downloads a task into a file.
pub async fn download_into_file( pub async fn download_into_file(
&self, &self,
task_id: &str,
host_id: &str,
peer_id: &str,
content_length: u64,
header: HeaderMap,
download: Download, download: Download,
) -> Result<Receiver<metadata::Piece>> { ) -> Result<Receiver<metadata::Piece>> {
// Initialize the download progress channel. // Initialize the download progress channel.
let (download_progress_tx, download_progress_rx) = mpsc::channel(128); let (download_progress_tx, download_progress_rx) = mpsc::channel(128);
// Generate the host id.
let host_id = self.id_generator.host_id();
// Generate the task id.
let task_id = self.id_generator.task_id(
download.url.clone(),
download.digest.clone(),
download.tag.clone(),
download.application.clone(),
download.piece_length,
download.filters.clone(),
)?;
// Generate the peer id.
let peer_id = self.id_generator.peer_id();
// Get the output path.
let output_path = download.output_path.clone();
// Convert the header.
let header = hashmap_to_headermap(&download.header)?;
// Convert the timeout. // Convert the timeout.
let timeout: Option<Duration> = match download.timeout.clone() { let timeout: Option<Duration> = match download.timeout.clone() {
Some(timeout) => { Some(timeout) => {
@ -130,17 +113,7 @@ impl Task {
// Open the file. // Open the file.
let mut f = OpenOptions::new() let mut f = OpenOptions::new()
.write(true) .write(true)
.open(output_path.as_str()) .open(download.output_path.as_str())
.await?;
// Get the content length of the task.
let content_length = self
.get_content_length(
task_id.as_str(),
download.url.as_str(),
header.clone(),
timeout,
)
.await?; .await?;
// Calculate the interested pieces to download. // Calculate the interested pieces to download.
@ -151,7 +124,7 @@ impl Task {
)?; )?;
// Get the task from the local storage. // Get the task from the local storage.
let task = self.get(task_id.as_str())?; let task = self.get(task_id)?;
match task { match task {
Some(task) => { Some(task) => {
// If the task is finished, return the file. // If the task is finished, return the file.
@ -160,7 +133,7 @@ impl Task {
return self return self
.download_partial_from_local_peer_into_file( .download_partial_from_local_peer_into_file(
&mut f, &mut f,
task_id.as_str(), task_id,
interested_pieces, interested_pieces,
) )
.await; .await;
@ -171,7 +144,7 @@ impl Task {
while let Some(finished_piece) = self while let Some(finished_piece) = self
.download_partial_from_local_peer_into_file( .download_partial_from_local_peer_into_file(
&mut f, &mut f,
task_id.as_str(), task_id,
interested_pieces.clone(), interested_pieces.clone(),
) )
.await? .await?
@ -199,9 +172,9 @@ impl Task {
match self match self
.download_partial_with_scheduler_into_file( .download_partial_with_scheduler_into_file(
&mut f, &mut f,
task_id.as_str(), task_id,
host_id.as_str(), host_id,
peer_id.as_str(), peer_id,
interested_pieces.clone(), interested_pieces.clone(),
download.clone(), download.clone(),
) )
@ -215,7 +188,7 @@ impl Task {
self.download_partial_from_source_into_file( self.download_partial_from_source_into_file(
&mut f, &mut f,
interested_pieces, interested_pieces,
task_id.as_str(), task_id,
download.url.clone(), download.url.clone(),
header.clone(), header.clone(),
timeout, timeout,
@ -229,9 +202,9 @@ impl Task {
match self match self
.download_partial_with_scheduler_into_file( .download_partial_with_scheduler_into_file(
&mut f, &mut f,
task_id.as_str(), task_id,
host_id.as_str(), host_id,
peer_id.as_str(), peer_id,
interested_pieces.clone(), interested_pieces.clone(),
download.clone(), download.clone(),
) )
@ -245,7 +218,7 @@ impl Task {
self.download_partial_from_source_into_file( self.download_partial_from_source_into_file(
&mut f, &mut f,
interested_pieces, interested_pieces,
task_id.as_str(), task_id,
download.url.clone(), download.url.clone(),
header.clone(), header.clone(),
timeout, timeout,
@ -653,13 +626,13 @@ impl Task {
} }
// get_content_length gets the content length of the task. // get_content_length gets the content length of the task.
async fn get_content_length( pub async fn get_content_length(
&self, &self,
task_id: &str, task_id: &str,
url: &str, url: &str,
header: HeaderMap, header: HeaderMap,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<i64> { ) -> Result<u64> {
let task = self let task = self
.storage .storage
.get_task(task_id)? .get_task(task_id)?
@ -686,7 +659,7 @@ impl Task {
.ok_or(Error::InvalidContentLength())? .ok_or(Error::InvalidContentLength())?
.to_str() .to_str()
.map_err(|_| Error::InvalidContentLength())? .map_err(|_| Error::InvalidContentLength())?
.parse::<i64>() .parse::<u64>()
.map_err(|_| Error::InvalidContentLength())?; .map_err(|_| Error::InvalidContentLength())?;
// Set the content length of the task. // Set the content length of the task.

View File

@ -40,7 +40,7 @@ use tracing::error;
// CollectPiece represents a piece to collect. // CollectPiece represents a piece to collect.
pub struct CollectPiece { pub struct CollectPiece {
// number is the piece number. // number is the piece number.
pub number: i32, pub number: u32,
// parent is the parent peer. // parent is the parent peer.
pub parent: Peer, pub parent: Peer,
@ -74,7 +74,7 @@ impl Piece {
} }
// get gets a piece from the local storage. // get gets a piece from the local storage.
pub fn get(&self, task_id: &str, number: i32) -> Result<Option<metadata::Piece>> { pub fn get(&self, task_id: &str, number: u32) -> Result<Option<metadata::Piece>> {
self.storage.get_piece(task_id, number) self.storage.get_piece(task_id, number)
} }
@ -116,20 +116,10 @@ impl Piece {
// calculate_interested calculates the interested pieces by content_length and range. // calculate_interested calculates the interested pieces by content_length and range.
pub fn calculate_interested( pub fn calculate_interested(
&self, &self,
piece_length: i32, piece_length: u64,
content_length: i64, content_length: u64,
range: Option<Range>, range: Option<Range>,
) -> Result<Vec<metadata::Piece>> { ) -> Result<Vec<metadata::Piece>> {
// piece_length must be greater than 0.
if piece_length <= 0 {
return Err(Error::InvalidParameter());
}
// content_length must be greater than 0.
if content_length < 0 {
return Err(Error::InvalidContentLength());
}
// If content_length is 0, return empty piece. // If content_length is 0, return empty piece.
if content_length == 0 { if content_length == 0 {
return Ok(Vec::new()); return Ok(Vec::new());
@ -137,7 +127,7 @@ impl Piece {
// If range is not None, calculate the pieces by range. // If range is not None, calculate the pieces by range.
if let Some(range) = range { if let Some(range) = range {
if range.start < 0 || range.length <= 0 { if range.length == 0 {
return Err(Error::InvalidParameter()); return Err(Error::InvalidParameter());
} }
@ -148,8 +138,7 @@ impl Piece {
// If offset is greater than content_length, break the loop. // If offset is greater than content_length, break the loop.
if offset >= content_length { if offset >= content_length {
let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?;
piece.length = piece.length = piece_length + content_length - piece.offset;
(piece_length + content_length as i32 - piece.offset as i32) as u64;
pieces.push(piece); pieces.push(piece);
break; break;
} }
@ -159,12 +148,12 @@ impl Piece {
break; break;
} }
offset = i64::from((number + 1) * piece_length); offset = (number + 1) * piece_length;
if offset > range.start { if offset > range.start {
pieces.push(metadata::Piece { pieces.push(metadata::Piece {
number, number: number as u32,
offset: offset as u64, offset,
length: piece_length as u64, length: piece_length,
digest: "".to_string(), digest: "".to_string(),
uploaded_count: 0, uploaded_count: 0,
updated_at: Utc::now().naive_utc(), updated_at: Utc::now().naive_utc(),
@ -187,16 +176,16 @@ impl Piece {
// If offset is greater than content_length, break the loop. // If offset is greater than content_length, break the loop.
if offset >= content_length { if offset >= content_length {
let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?;
piece.length = (piece_length + content_length as i32 - piece.offset as i32) as u64; piece.length = piece_length + content_length - piece.offset;
pieces.push(piece); pieces.push(piece);
break; break;
} }
offset = i64::from((number + 1) * piece_length); offset = (number + 1) * piece_length;
pieces.push(metadata::Piece { pieces.push(metadata::Piece {
number, number: number as u32,
offset: offset as u64, offset,
length: piece_length as u64, length: piece_length,
digest: "".to_string(), digest: "".to_string(),
uploaded_count: 0, uploaded_count: 0,
updated_at: Utc::now().naive_utc(), updated_at: Utc::now().naive_utc(),
@ -283,7 +272,7 @@ impl Piece {
collect_pieces.shuffle(&mut rand::thread_rng()); collect_pieces.shuffle(&mut rand::thread_rng());
// Filter the collect pieces and remove the duplicate pieces. // Filter the collect pieces and remove the duplicate pieces.
let mut visited: Vec<i32> = Vec::new(); let mut visited: Vec<u32> = Vec::new();
collect_pieces.retain(|collect_piece| { collect_pieces.retain(|collect_piece| {
interested_pieces interested_pieces
.iter() .iter()
@ -304,7 +293,7 @@ impl Piece {
pub async fn download_from_local_peer( pub async fn download_from_local_peer(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
) -> Result<impl AsyncRead> { ) -> Result<impl AsyncRead> {
self.storage.upload_piece(task_id, number).await self.storage.upload_piece(task_id, number).await
} }
@ -313,7 +302,7 @@ impl Piece {
pub async fn download_from_remote_peer( pub async fn download_from_remote_peer(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
remote_peer: Peer, remote_peer: Peer,
) -> Result<impl AsyncRead> { ) -> Result<impl AsyncRead> {
// Create a dfdaemon client. // Create a dfdaemon client.
@ -400,7 +389,7 @@ impl Piece {
pub async fn download_from_source( pub async fn download_from_source(
&self, &self,
task_id: &str, task_id: &str,
number: i32, number: u32,
url: &str, url: &str,
offset: u64, offset: u64,
length: u64, length: u64,

View File

@ -43,15 +43,15 @@ impl IDGenerator {
// task_id generates the task id. // task_id generates the task id.
pub fn task_id( pub fn task_id(
&self, &self,
url: String, url: &str,
digest: Option<String>, digest: Option<&str>,
tag: Option<String>, tag: Option<&str>,
application: Option<String>, application: Option<&str>,
piece_length: i32, piece_length: u64,
filters: Vec<String>, filters: Vec<String>,
) -> Result<String> { ) -> Result<String> {
// Filter the query parameters. // Filter the query parameters.
let url = Url::parse(url.as_str())?; let url = Url::parse(url)?;
let query = url let query = url
.query_pairs() .query_pairs()
.filter(|(k, _)| filters.contains(&k.to_string())); .filter(|(k, _)| filters.contains(&k.to_string()));