feat: add download_piece_from_source to task (#90)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-09-14 17:50:05 +08:00 committed by GitHub
parent c22e170be1
commit 1d033d2a61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 156 additions and 14 deletions

View File

@ -42,13 +42,23 @@ pub struct Response<R: AsyncRead> {
}
// HTTP is the HTTP backend.
pub struct HTTP {}
pub struct HTTP {
// client is the http client.
client: reqwest::Client,
}
// HTTP implements the http interface.
impl HTTP {
// new returns a new HTTP.
pub fn new() -> Self {
Self {
client: reqwest::Client::new(),
}
}
// Get gets the content of the request.
pub async fn get(&self, req: Request) -> Result<Response<impl AsyncRead>> {
let mut request_builder = reqwest::Client::new().get(&req.url).headers(req.header);
let mut request_builder = self.client.get(&req.url).headers(req.header);
if let Some(timeout) = req.timeout {
request_builder = request_builder.timeout(timeout);
}
@ -69,3 +79,11 @@ impl HTTP {
})
}
}
// Default implements the Default trait.
impl Default for HTTP {
// default returns a new default HTTP.
fn default() -> Self {
Self::new()
}
}

View File

@ -17,6 +17,7 @@
use anyhow::Context;
use clap::Parser;
use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer};
use dragonfly_client::backend::http::HTTP;
use dragonfly_client::config::dfdaemon;
use dragonfly_client::dynconfig::Dynconfig;
use dragonfly_client::grpc::{
@ -26,6 +27,7 @@ use dragonfly_client::health::Health;
use dragonfly_client::metrics::Metrics;
use dragonfly_client::shutdown;
use dragonfly_client::storage::Storage;
use dragonfly_client::task::Task;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client::utils::id_generator::IDGenerator;
use std::net::SocketAddr;
@ -87,7 +89,8 @@ async fn main() -> Result<(), anyhow::Error> {
);
// Initialize storage.
let _storage = Storage::new(&config.server.data_dir)?;
let storage = Storage::new(&config.server.data_dir)?;
let storage = Arc::new(storage);
// Initialize id generator.
let id_generator = IDGenerator::new(
@ -95,12 +98,19 @@ async fn main() -> Result<(), anyhow::Error> {
config.host.hostname.clone(),
);
// Initialize http client.
let http_client = HTTP::new();
let http_client = Arc::new(http_client);
// Initialize manager client.
let manager_client = ManagerClient::new(config.manager.addr.as_ref().unwrap())
.await
.context("failed to initialize manager client")?;
let manager_client = Arc::new(manager_client);
// Initialize task manager.
let _task = Task::new(storage.clone(), http_client.clone());
// Initialize channel for graceful shutdown.
let shutdown = shutdown::Shutdown::default();
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::unbounded_channel();

View File

@ -14,6 +14,8 @@
* limitations under the License.
*/
use reqwest::header::HeaderMap;
pub mod announcer;
pub mod backend;
pub mod config;
@ -60,6 +62,14 @@ pub enum Error {
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
// Reqwest is the error for reqwest.
#[error(transparent)]
HTTP(HttpError),
// HostNotFound is the error when the host is not found.
#[error{"host {0} not found"}]
HostNotFound(String),
// TaskNotFound is the error when the task is not found.
#[error{"task {0} not found"}]
TaskNotFound(String),
@ -92,6 +102,10 @@ pub enum Error {
#[error("invalid uri {0}")]
InvalidURI(String),
// InvalidPeer is the error when the peer is invalid.
#[error("invalid peer {0}")]
InvalidPeer(String),
// SchedulerClientNotFound is the error when the scheduler client is not found.
#[error{"scheduler client not found"}]
SchedulerClientNotFound(),
@ -101,5 +115,19 @@ pub enum Error {
UnexpectedResponse(),
}
// HttpError is the error for http.
#[derive(Debug, thiserror::Error)]
#[error("http error {status_code}: {body}")]
pub struct HttpError {
// status_code is the status code of the response.
pub status_code: reqwest::StatusCode,
// header is the headers of the response.
pub header: HeaderMap,
// body is the body of the response.
pub body: String,
}
// Result is the result for Client.
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -14,33 +14,37 @@
* limitations under the License.
*/
use crate::backend::http::{Request, HTTP};
use crate::grpc::dfdaemon::DfdaemonClient;
use crate::storage::Storage;
use crate::{Error, Result};
use crate::{Error, HttpError, Result};
use dragonfly_api::common::v2::Peer;
use dragonfly_api::dfdaemon::v2::{
sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse,
SyncPiecesRequest,
};
use reqwest::header::HeaderMap;
use std::sync::Arc;
use tokio::io::AsyncRead;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::error;
// Task represents a task manager.
pub struct Task {
// manager_client is the grpc client of the manager.
dfdaemon_client: Arc<DfdaemonClient>,
// manager_client is the grpc client of the manager.
storage: Arc<Storage>,
// http_client is the http client.
http_client: Arc<HTTP>,
}
// NewTask returns a new Task.
impl Task {
// new returns a new Task.
pub fn new(dfdaemon_client: Arc<DfdaemonClient>, storage: Arc<Storage>) -> Self {
pub fn new(storage: Arc<Storage>, http_client: Arc<HTTP>) -> Self {
Self {
dfdaemon_client,
storage,
http_client,
}
}
@ -49,7 +53,16 @@ impl Task {
&self,
task_id: &str,
number: u32,
remote_peer: Peer,
) -> Result<impl AsyncRead> {
// Create a dfdaemon client.
let host = remote_peer
.host
.clone()
.ok_or(Error::InvalidPeer(remote_peer.id))?;
let dfdaemon_client =
DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
// Record the start of downloading piece.
self.storage.download_piece_started(task_id, number)?;
@ -64,7 +77,7 @@ impl Task {
});
// Send the interested pieces request.
let response = self.dfdaemon_client.sync_pieces(in_stream).await?;
let response = dfdaemon_client.sync_pieces(in_stream).await?;
let mut resp_stream = response.into_inner();
if let Some(message) = resp_stream.message().await? {
if let Some(response) = message.response {
@ -82,7 +95,18 @@ impl Task {
&piece.digest,
&mut piece.content.as_slice(),
)
.await?;
.await
.map_err(|err| {
// Record the failure of downloading piece,
// If storage fails to record piece.
error!("download piece finished: {}", err);
if let Some(err) =
self.storage.download_piece_failed(task_id, number).err()
{
error!("download piece failed: {}", err)
};
err
})?;
// Return reader of the piece.
return self.storage.upload_piece(task_id, number).await;
@ -121,7 +145,69 @@ impl Task {
}
// download_piece_from_source downloads a piece from the source.
pub fn download_piece_from_source(&self) -> Result<()> {
unimplemented!()
pub async fn download_piece_from_source(
&self,
task_id: &str,
number: u32,
url: &str,
offset: u64,
header: HeaderMap,
timeout: Duration,
) -> Result<impl AsyncRead> {
// Record the start of downloading piece.
self.storage.download_piece_started(task_id, number)?;
// Download the piece from the source.
let mut response = self
.http_client
.get(Request {
url: url.to_string(),
header,
timeout: Some(timeout),
})
.await
.map_err(|err| {
// Record the failure of downloading piece,
// if the request is failed.
error!("http error: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
error!("download piece failed: {}", err)
};
err
})?;
// HTTP status code is not OK, handle the error.
if !response.status_code.is_success() {
// Record the failure of downloading piece,
// if the status code is not OK.
self.storage.download_piece_failed(task_id, number)?;
let mut buffer = String::new();
response.reader.read_to_string(&mut buffer).await?;
error!("http error {}: {}", response.status_code, buffer.as_str());
return Err(Error::HTTP(HttpError {
status_code: response.status_code,
header: response.header,
body: buffer,
}));
}
// TODO Calculate the digest of the piece.
// Record the finish of downloading piece.
self.storage
.download_piece_finished(task_id, number, offset, "", &mut response.reader)
.await
.map_err(|err| {
// Record the failure of downloading piece,
// If storage fails to record piece.
error!("download piece finished: {}", err);
if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
error!("download piece failed: {}", err)
};
err
})?;
// Return reader of the piece.
self.storage.upload_piece(task_id, number).await
}
}