diff --git a/src/backend/http.rs b/src/backend/http.rs index c3f36fc4..ad61cc57 100644 --- a/src/backend/http.rs +++ b/src/backend/http.rs @@ -42,13 +42,23 @@ pub struct Response { } // HTTP is the HTTP backend. -pub struct HTTP {} +pub struct HTTP { + // client is the http client. + client: reqwest::Client, +} // HTTP implements the http interface. impl HTTP { + // new returns a new HTTP. + pub fn new() -> Self { + Self { + client: reqwest::Client::new(), + } + } + // Get gets the content of the request. pub async fn get(&self, req: Request) -> Result> { - let mut request_builder = reqwest::Client::new().get(&req.url).headers(req.header); + let mut request_builder = self.client.get(&req.url).headers(req.header); if let Some(timeout) = req.timeout { request_builder = request_builder.timeout(timeout); } @@ -69,3 +79,11 @@ impl HTTP { }) } } + +// Default implements the Default trait. +impl Default for HTTP { + // default returns a new default HTTP. + fn default() -> Self { + Self::new() + } +} diff --git a/src/bin/dfdaemon/main.rs b/src/bin/dfdaemon/main.rs index 02f3f635..d7187f59 100644 --- a/src/bin/dfdaemon/main.rs +++ b/src/bin/dfdaemon/main.rs @@ -17,6 +17,7 @@ use anyhow::Context; use clap::Parser; use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer}; +use dragonfly_client::backend::http::HTTP; use dragonfly_client::config::dfdaemon; use dragonfly_client::dynconfig::Dynconfig; use dragonfly_client::grpc::{ @@ -26,6 +27,7 @@ use dragonfly_client::health::Health; use dragonfly_client::metrics::Metrics; use dragonfly_client::shutdown; use dragonfly_client::storage::Storage; +use dragonfly_client::task::Task; use dragonfly_client::tracing::init_tracing; use dragonfly_client::utils::id_generator::IDGenerator; use std::net::SocketAddr; @@ -87,7 +89,8 @@ async fn main() -> Result<(), anyhow::Error> { ); // Initialize storage. - let _storage = Storage::new(&config.server.data_dir)?; + let storage = Storage::new(&config.server.data_dir)?; + let storage = Arc::new(storage); // Initialize id generator. let id_generator = IDGenerator::new( @@ -95,12 +98,19 @@ async fn main() -> Result<(), anyhow::Error> { config.host.hostname.clone(), ); + // Initialize http client. + let http_client = HTTP::new(); + let http_client = Arc::new(http_client); + // Initialize manager client. let manager_client = ManagerClient::new(config.manager.addr.as_ref().unwrap()) .await .context("failed to initialize manager client")?; let manager_client = Arc::new(manager_client); + // Initialize task manager. + let _task = Task::new(storage.clone(), http_client.clone()); + // Initialize channel for graceful shutdown. let shutdown = shutdown::Shutdown::default(); let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::unbounded_channel(); diff --git a/src/lib.rs b/src/lib.rs index 4cc9125c..c0a61405 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +use reqwest::header::HeaderMap; + pub mod announcer; pub mod backend; pub mod config; @@ -60,6 +62,14 @@ pub enum Error { #[error(transparent)] Reqwest(#[from] reqwest::Error), + // Reqwest is the error for reqwest. + #[error(transparent)] + HTTP(HttpError), + + // HostNotFound is the error when the host is not found. + #[error{"host {0} not found"}] + HostNotFound(String), + // TaskNotFound is the error when the task is not found. #[error{"task {0} not found"}] TaskNotFound(String), @@ -92,6 +102,10 @@ pub enum Error { #[error("invalid uri {0}")] InvalidURI(String), + // InvalidPeer is the error when the peer is invalid. + #[error("invalid peer {0}")] + InvalidPeer(String), + // SchedulerClientNotFound is the error when the scheduler client is not found. #[error{"scheduler client not found"}] SchedulerClientNotFound(), @@ -101,5 +115,19 @@ pub enum Error { UnexpectedResponse(), } +// HttpError is the error for http. +#[derive(Debug, thiserror::Error)] +#[error("http error {status_code}: {body}")] +pub struct HttpError { + // status_code is the status code of the response. + pub status_code: reqwest::StatusCode, + + // header is the headers of the response. + pub header: HeaderMap, + + // body is the body of the response. + pub body: String, +} + // Result is the result for Client. pub type Result = std::result::Result; diff --git a/src/task/mod.rs b/src/task/mod.rs index 9959a264..3c71bc5a 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -14,33 +14,37 @@ * limitations under the License. */ +use crate::backend::http::{Request, HTTP}; use crate::grpc::dfdaemon::DfdaemonClient; use crate::storage::Storage; -use crate::{Error, Result}; +use crate::{Error, HttpError, Result}; +use dragonfly_api::common::v2::Peer; use dragonfly_api::dfdaemon::v2::{ sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse, SyncPiecesRequest, }; +use reqwest::header::HeaderMap; use std::sync::Arc; -use tokio::io::AsyncRead; +use std::time::Duration; +use tokio::io::{AsyncRead, AsyncReadExt}; use tracing::error; // Task represents a task manager. pub struct Task { - // manager_client is the grpc client of the manager. - dfdaemon_client: Arc, - // manager_client is the grpc client of the manager. storage: Arc, + + // http_client is the http client. + http_client: Arc, } // NewTask returns a new Task. impl Task { // new returns a new Task. - pub fn new(dfdaemon_client: Arc, storage: Arc) -> Self { + pub fn new(storage: Arc, http_client: Arc) -> Self { Self { - dfdaemon_client, storage, + http_client, } } @@ -49,7 +53,16 @@ impl Task { &self, task_id: &str, number: u32, + remote_peer: Peer, ) -> Result { + // Create a dfdaemon client. + let host = remote_peer + .host + .clone() + .ok_or(Error::InvalidPeer(remote_peer.id))?; + let dfdaemon_client = + DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?; + // Record the start of downloading piece. self.storage.download_piece_started(task_id, number)?; @@ -64,7 +77,7 @@ impl Task { }); // Send the interested pieces request. - let response = self.dfdaemon_client.sync_pieces(in_stream).await?; + let response = dfdaemon_client.sync_pieces(in_stream).await?; let mut resp_stream = response.into_inner(); if let Some(message) = resp_stream.message().await? { if let Some(response) = message.response { @@ -82,7 +95,18 @@ impl Task { &piece.digest, &mut piece.content.as_slice(), ) - .await?; + .await + .map_err(|err| { + // Record the failure of downloading piece, + // If storage fails to record piece. + error!("download piece finished: {}", err); + if let Some(err) = + self.storage.download_piece_failed(task_id, number).err() + { + error!("download piece failed: {}", err) + }; + err + })?; // Return reader of the piece. return self.storage.upload_piece(task_id, number).await; @@ -121,7 +145,69 @@ impl Task { } // download_piece_from_source downloads a piece from the source. - pub fn download_piece_from_source(&self) -> Result<()> { - unimplemented!() + pub async fn download_piece_from_source( + &self, + task_id: &str, + number: u32, + url: &str, + offset: u64, + header: HeaderMap, + timeout: Duration, + ) -> Result { + // Record the start of downloading piece. + self.storage.download_piece_started(task_id, number)?; + + // Download the piece from the source. + let mut response = self + .http_client + .get(Request { + url: url.to_string(), + header, + timeout: Some(timeout), + }) + .await + .map_err(|err| { + // Record the failure of downloading piece, + // if the request is failed. + error!("http error: {}", err); + if let Some(err) = self.storage.download_piece_failed(task_id, number).err() { + error!("download piece failed: {}", err) + }; + err + })?; + + // HTTP status code is not OK, handle the error. + if !response.status_code.is_success() { + // Record the failure of downloading piece, + // if the status code is not OK. + self.storage.download_piece_failed(task_id, number)?; + + let mut buffer = String::new(); + response.reader.read_to_string(&mut buffer).await?; + error!("http error {}: {}", response.status_code, buffer.as_str()); + return Err(Error::HTTP(HttpError { + status_code: response.status_code, + header: response.header, + body: buffer, + })); + } + + // TODO Calculate the digest of the piece. + // Record the finish of downloading piece. + self.storage + .download_piece_finished(task_id, number, offset, "", &mut response.reader) + .await + .map_err(|err| { + // Record the failure of downloading piece, + // If storage fails to record piece. + error!("download piece finished: {}", err); + if let Some(err) = self.storage.download_piece_failed(task_id, number).err() { + error!("download piece failed: {}", err) + }; + err + })?; + + // Return reader of the piece. + self.storage.upload_piece(task_id, number).await } }