feat: add piece manager (#105)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Authored by Gaius on 2023-11-07 10:49:13 +08:00, committed by GitHub
parent a0dfe7aa12
commit c2c1f648df
4 changed files with 254 additions and 190 deletions

@@ -120,7 +120,8 @@ impl Dfdaemon for DfdaemonServerHandler {
         // Get the piece numbers from the local storage.
         let piece_numbers = task
-            .get_pieces(task_id.as_str())
+            .piece
+            .get_all(task_id.as_str())
             .map_err(|e| {
                 error!("get piece numbers from local storage: {}", e);
                 Status::internal(e.to_string())
@@ -154,7 +155,7 @@ impl Dfdaemon for DfdaemonServerHandler {
         {
             for piece_number in piece_numbers {
                 // Get the piece metadata from the local storage.
-                let piece = match task.get_piece(&task_id, piece_number) {
+                let piece = match task.piece.get(&task_id, piece_number) {
                     Ok(piece) => piece,
                     Err(e) => {
                         error!("get piece metadata from local storage: {}", e);
@@ -163,7 +164,7 @@ impl Dfdaemon for DfdaemonServerHandler {
                 };

                 // Get the piece content from the local storage.
-                let mut reader = match task.download_piece_from_local_peer(&task_id, piece_number).await {
+                let mut reader = match task.piece.download_from_local_peer(&task_id, piece_number).await {
                     Ok(reader) => reader,
                     Err(e) => {
                         error!("get piece content from local peer: {}", e);

@@ -74,10 +74,10 @@ impl Storage {
         task_id: &str,
         number: i32,
         offset: u64,
+        length: u64,
         reader: &mut R,
     ) -> Result<u64> {
         let response = self.content.write_piece(task_id, offset, reader).await?;
-        let length = response.length;
         let digest = Digest::new(Algorithm::Sha256, response.hash);
         self.metadata.download_piece_finished(

@@ -14,20 +14,13 @@
  * limitations under the License.
  */

-use crate::backend::http::{Request, HTTP};
-use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient};
+use crate::backend::http::HTTP;
+use crate::grpc::scheduler::SchedulerClient;
 use crate::storage::{metadata, Storage};
-use crate::{Error, HttpError, Result};
-use dragonfly_api::common::v2::Peer;
-use dragonfly_api::dfdaemon::v2::{
-    sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse,
-    SyncPiecesRequest,
-};
-use reqwest::header::HeaderMap;
+use crate::Result;
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::io::{AsyncRead, AsyncReadExt};
-use tracing::error;
+
+pub mod piece;

 // Task represents a task manager.
 pub struct Task {
@@ -37,8 +30,8 @@ pub struct Task {
     // scheduler_client is the grpc client of the scheduler.
     pub scheduler_client: Arc<SchedulerClient>,

-    // http_client is the http client.
-    http_client: Arc<HTTP>,
+    // piece is the piece manager.
+    pub piece: piece::Piece,
 }

 // Task implements the task manager.
@@ -50,9 +43,13 @@ impl Task {
         http_client: Arc<HTTP>,
     ) -> Self {
         Self {
-            storage,
-            scheduler_client,
-            http_client,
+            storage: storage.clone(),
+            scheduler_client: scheduler_client.clone(),
+            piece: piece::Piece::new(
+                storage.clone(),
+                scheduler_client.clone(),
+                http_client.clone(),
+            ),
         }
     }
@@ -60,173 +57,4 @@ impl Task {
     pub fn get_task(&self, task_id: &str) -> Result<metadata::Task> {
         self.storage.get_task(task_id)
     }
-
-    // get_piece gets a piece from the local storage.
-    pub fn get_piece(&self, task_id: &str, number: i32) -> Result<metadata::Piece> {
-        self.storage.get_piece(task_id, number)
-    }
-
-    // get_pieces gets pieces from the local storage.
-    pub fn get_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
-        self.storage.get_pieces(task_id)
-    }
-
-    // download_piece_from_local_peer downloads a piece from a local peer.
-    pub async fn download_piece_from_local_peer(
-        &self,
-        task_id: &str,
-        number: i32,
-    ) -> Result<impl AsyncRead> {
-        self.storage.upload_piece(task_id, number).await
-    }
-
-    // download_piece_from_remote_peer downloads a piece from a remote peer.
-    pub async fn download_piece_from_remote_peer(
-        &self,
-        task_id: &str,
-        number: i32,
-        remote_peer: Peer,
-    ) -> Result<impl AsyncRead> {
-        // Create a dfdaemon client.
-        let host = remote_peer
-            .host
-            .clone()
-            .ok_or(Error::InvalidPeer(remote_peer.id))?;
-        let dfdaemon_client =
-            DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
-
-        // Record the start of downloading piece.
-        self.storage.download_piece_started(task_id, number)?;
-
-        // Construct the interested pieces request.
-        let in_stream = tokio_stream::once(SyncPiecesRequest {
-            task_id: task_id.to_string(),
-            request: Some(sync_pieces_request::Request::InterestedPiecesRequest(
-                InterestedPiecesRequest {
-                    piece_numbers: vec![number],
-                },
-            )),
-        });
-
-        // Send the interested pieces request.
-        let response = dfdaemon_client.sync_pieces(in_stream).await?;
-        let mut resp_stream = response.into_inner();
-        if let Some(message) = resp_stream.message().await? {
-            if let Some(sync_pieces_response::Response::InterestedPiecesResponse(
-                InterestedPiecesResponse { piece },
-            )) = message.response
-            {
-                if let Some(piece) = piece {
-                    // Record the finish of downloading piece.
-                    self.storage
-                        .download_piece_from_remote_peer_finished(
-                            task_id,
-                            number,
-                            piece.offset,
-                            piece.digest.clone(),
-                            &mut piece.content.as_slice(),
-                        )
-                        .await
-                        .map_err(|err| {
-                            // Record the failure of downloading piece,
-                            // If storage fails to record piece.
-                            error!("download piece finished: {}", err);
-                            if let Some(err) =
-                                self.storage.download_piece_failed(task_id, number).err()
-                            {
-                                error!("download piece failed: {}", err)
-                            };
-                            err
-                        })?;
-
-                    // Return reader of the piece.
-                    return self.storage.upload_piece(task_id, number).await;
-                }
-
-                // Record the failure of downloading piece,
-                // if the piece is not found.
-                error!("piece not found");
-                self.storage.download_piece_failed(task_id, number)?;
-                return Err(Error::UnexpectedResponse());
-            }
-
-            // Record the failure of downloading piece,
-            // if the response is not found.
-            error!("response not found");
-            self.storage.download_piece_failed(task_id, number)?;
-            return Err(Error::UnexpectedResponse());
-        }
-
-        // Record the failure of downloading piece,
-        // if the message is not found.
-        error!("message not found");
-        self.storage.download_piece_failed(task_id, number)?;
-        Err(Error::UnexpectedResponse())
-    }
-
-    // download_piece_from_source downloads a piece from the source.
-    pub async fn download_piece_from_source(
-        &self,
-        task_id: &str,
-        number: i32,
-        url: &str,
-        offset: u64,
-        header: HeaderMap,
-        timeout: Duration,
-    ) -> Result<impl AsyncRead> {
-        // Record the start of downloading piece.
-        self.storage.download_piece_started(task_id, number)?;
-
-        // Download the piece from the source.
-        let mut response = self
-            .http_client
-            .get(Request {
-                url: url.to_string(),
-                header,
-                timeout: Some(timeout),
-            })
-            .await
-            .map_err(|err| {
-                // Record the failure of downloading piece,
-                // if the request is failed.
-                error!("http error: {}", err);
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
-                    error!("download piece failed: {}", err)
-                };
-                err
-            })?;
-
-        // HTTP status code is not OK, handle the error.
-        if !response.status_code.is_success() {
-            // Record the failure of downloading piece,
-            // if the status code is not OK.
-            self.storage.download_piece_failed(task_id, number)?;
-
-            let mut buffer = String::new();
-            response.reader.read_to_string(&mut buffer).await?;
-            error!("http error {}: {}", response.status_code, buffer.as_str());
-            return Err(Error::HTTP(HttpError {
-                status_code: response.status_code,
-                header: response.header,
-                body: buffer,
-            }));
-        }
-
-        // Record the finish of downloading piece.
-        self.storage
-            .download_piece_from_source_finished(task_id, number, offset, &mut response.reader)
-            .await
-            .map_err(|err| {
-                // Record the failure of downloading piece,
-                // If storage fails to record piece.
-                error!("download piece finished: {}", err);
-                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
-                    error!("download piece failed: {}", err)
-                };
-                err
-            })?;
-
-        // Return reader of the piece.
-        self.storage.upload_piece(task_id, number).await
-    }
 }

src/task/piece.rs (new file, 235 additions)

@@ -0,0 +1,235 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::backend::http::{Request, HTTP};
use crate::grpc::{dfdaemon::DfdaemonClient, scheduler::SchedulerClient};
use crate::storage::{metadata, Storage};
use crate::{Error, HttpError, Result};
use dragonfly_api::common::v2::Peer;
use dragonfly_api::dfdaemon::v2::{
sync_pieces_request, sync_pieces_response, InterestedPiecesRequest, InterestedPiecesResponse,
SyncPiecesRequest,
};
use reqwest::header::HeaderMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::error;

// Piece represents a piece manager.
pub struct Piece {
    // storage is the local storage.
    storage: Arc<Storage>,

    // scheduler_client is the grpc client of the scheduler.
    pub scheduler_client: Arc<SchedulerClient>,

    // http_client is the http client.
    http_client: Arc<HTTP>,
}

// Piece implements the piece manager.
impl Piece {
    // new returns a new Piece.
    pub fn new(
        storage: Arc<Storage>,
        scheduler_client: Arc<SchedulerClient>,
        http_client: Arc<HTTP>,
    ) -> Self {
        Self {
            storage,
            scheduler_client,
            http_client,
        }
    }

    // get gets a piece from the local storage.
    pub fn get(&self, task_id: &str, number: i32) -> Result<metadata::Piece> {
        self.storage.get_piece(task_id, number)
    }

    // get_all gets all pieces from the local storage.
    pub fn get_all(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
        self.storage.get_pieces(task_id)
    }

    // download_from_local_peer downloads a piece from a local peer.
    pub async fn download_from_local_peer(
        &self,
        task_id: &str,
        number: i32,
    ) -> Result<impl AsyncRead> {
        self.storage.upload_piece(task_id, number).await
    }

    // download_from_remote_peer downloads a piece from a remote peer.
    pub async fn download_from_remote_peer(
        &self,
        task_id: &str,
        number: i32,
        remote_peer: Peer,
    ) -> Result<impl AsyncRead> {
        // Create a dfdaemon client.
        let host = remote_peer
            .host
            .clone()
            .ok_or(Error::InvalidPeer(remote_peer.id))?;
        let dfdaemon_client =
            DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;

        // Record the start of downloading piece.
        self.storage.download_piece_started(task_id, number)?;

        // Construct the interested pieces request.
        let in_stream = tokio_stream::once(SyncPiecesRequest {
            task_id: task_id.to_string(),
            request: Some(sync_pieces_request::Request::InterestedPiecesRequest(
                InterestedPiecesRequest {
                    piece_numbers: vec![number],
                },
            )),
        });

        // Send the interested pieces request.
        let response = dfdaemon_client.sync_pieces(in_stream).await?;
        let mut resp_stream = response.into_inner();
        if let Some(message) = resp_stream.message().await? {
            if let Some(sync_pieces_response::Response::InterestedPiecesResponse(
                InterestedPiecesResponse { piece },
            )) = message.response
            {
                if let Some(piece) = piece {
                    // Record the finish of downloading piece.
                    self.storage
                        .download_piece_from_remote_peer_finished(
                            task_id,
                            number,
                            piece.offset,
                            piece.digest.clone(),
                            &mut piece.content.as_slice(),
                        )
                        .await
                        .map_err(|err| {
                            // Record the failure of downloading piece,
                            // If storage fails to record piece.
                            error!("download piece finished: {}", err);
                            if let Some(err) =
                                self.storage.download_piece_failed(task_id, number).err()
                            {
                                error!("download piece failed: {}", err)
                            };
                            err
                        })?;

                    // Return reader of the piece.
                    return self.storage.upload_piece(task_id, number).await;
                }

                // Record the failure of downloading piece,
                // if the piece is not found.
                error!("piece not found");
                self.storage.download_piece_failed(task_id, number)?;
                return Err(Error::UnexpectedResponse());
            }

            // Record the failure of downloading piece,
            // if the response is not found.
            error!("response not found");
            self.storage.download_piece_failed(task_id, number)?;
            return Err(Error::UnexpectedResponse());
        }

        // Record the failure of downloading piece,
        // if the message is not found.
        error!("message not found");
        self.storage.download_piece_failed(task_id, number)?;
        Err(Error::UnexpectedResponse())
    }

    // download_from_source downloads a piece from the source.
    #[allow(clippy::too_many_arguments)]
    pub async fn download_from_source(
        &self,
        task_id: &str,
        number: i32,
        url: &str,
        offset: u64,
        length: u64,
        header: HeaderMap,
        timeout: Duration,
    ) -> Result<impl AsyncRead> {
        // Record the start of downloading piece.
        self.storage.download_piece_started(task_id, number)?;

        // Download the piece from the source.
        let mut response = self
            .http_client
            .get(Request {
                url: url.to_string(),
                header,
                timeout: Some(timeout),
            })
            .await
            .map_err(|err| {
                // Record the failure of downloading piece,
                // if the request is failed.
                error!("http error: {}", err);
                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
                    error!("download piece failed: {}", err)
                };
                err
            })?;

        // HTTP status code is not OK, handle the error.
        if !response.status_code.is_success() {
            // Record the failure of downloading piece,
            // if the status code is not OK.
            self.storage.download_piece_failed(task_id, number)?;

            let mut buffer = String::new();
            response.reader.read_to_string(&mut buffer).await?;
            error!("http error {}: {}", response.status_code, buffer.as_str());
            return Err(Error::HTTP(HttpError {
                status_code: response.status_code,
                header: response.header,
                body: buffer,
            }));
        }

        // Record the finish of downloading piece.
        self.storage
            .download_piece_from_source_finished(
                task_id,
                number,
                offset,
                length,
                &mut response.reader,
            )
            .await
            .map_err(|err| {
                // Record the failure of downloading piece,
                // If storage fails to record piece.
                error!("download piece finished: {}", err);
                if let Some(err) = self.storage.download_piece_failed(task_id, number).err() {
                    error!("download piece failed: {}", err)
                };
                err
            })?;

        // Return reader of the piece.
        self.storage.upload_piece(task_id, number).await
    }
}
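
A minimal sketch of how a caller might drive the new piece manager after this commit, using only the API surface shown in the diff (Task's public piece field and Piece::download_from_source). The module path crate::task::Task, the helper name fetch_first_piece, the 4 MiB piece length, and the 30-second timeout are illustrative assumptions, not part of this change:

    use std::time::Duration;

    use reqwest::header::HeaderMap;
    use tokio::io::AsyncReadExt;

    use crate::task::Task;
    use crate::Result;

    // Download piece 0 of a task directly from the source and buffer its bytes.
    async fn fetch_first_piece(task: &Task, task_id: &str, url: &str) -> Result<Vec<u8>> {
        let reader = task
            .piece
            .download_from_source(
                task_id,
                0,                       // piece number
                url,
                0,                       // byte offset of the piece within the task content
                4 * 1024 * 1024,         // piece length; 4 MiB is an assumed value
                HeaderMap::new(),        // no extra request headers
                Duration::from_secs(30), // per-piece download timeout (assumed)
            )
            .await?;

        // The returned reader streams the piece content back out of local storage.
        tokio::pin!(reader);
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        Ok(buf)
    }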