feat: add piece collector for syncing pieces from remote peers (#139)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2023-12-12 11:00:54 +08:00 committed by GitHub
parent 29ceb3b030
commit fa21c38e32
5 changed files with 303 additions and 143 deletions

Cargo.toml

@ -55,7 +55,7 @@ local-ip-address = "0.5.3"
rocksdb = "0.21.0"
num_cpus = "1.0"
chrono = { version = "0.4.26", features = ["serde"] }
-dragonfly-api = "2.0.59"
+dragonfly-api = "2.0.60"
sysinfo = "0.29.6"
sha2 = "0.10"
hex = "0.4"
@ -69,3 +69,4 @@ rand = "0.8.5"
prost-wkt-types = "0.4"
tower = "0.4.13"
indicatif = "0.17.7"
+dashmap = "5.5.3"
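dashmap provides sharded concurrent maps; the new piece collector shares a DashMap<u32, DashSet<String>> across tokio tasks without wrapping it in a Mutex. A minimal sketch of that access pattern, with illustrative names that are not from this commit:

    use dashmap::{DashMap, DashSet};
    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        // Map each piece number to the set of peer ids that hold it.
        let holders: Arc<DashMap<u32, DashSet<String>>> = Arc::new(DashMap::new());
        holders.insert(1, DashSet::new());

        let writer = holders.clone();
        tokio::spawn(async move {
            // Update a shard-locked entry concurrently, no outer lock needed.
            writer.entry(1).and_modify(|peers| {
                peers.insert("peer-a".to_string());
            });
        })
        .await
        .unwrap();

        assert_eq!(holders.get(&1).unwrap().len(), 1);
    }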

src/grpc/dfdaemon.rs

@ -23,8 +23,8 @@ use dragonfly_api::dfdaemon::v2::{
dfdaemon_client::DfdaemonClient as DfdaemonGRPCClient,
dfdaemon_server::{Dfdaemon, DfdaemonServer as DfdaemonGRPCServer},
DeleteTaskRequest, DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest,
-DownloadTaskResponse, GetPieceNumbersRequest, GetPieceNumbersResponse,
-StatTaskRequest as DfdaemonStatTaskRequest, UploadTaskRequest,
+DownloadTaskResponse, StatTaskRequest as DfdaemonStatTaskRequest, SyncPiecesRequest,
+SyncPiecesResponse, UploadTaskRequest,
};
use dragonfly_api::scheduler::v2::StatTaskRequest as SchedulerStatTaskRequest;
use std::net::SocketAddr;
@ -43,6 +43,9 @@ use tonic::{
use tower::service_fn;
use tracing::{error, info, instrument, Instrument, Span};
+// DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL is the default interval for waiting for the piece to be finished.
+const DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL: Duration = Duration::from_millis(800);
// DfdaemonUploadServer is the grpc server of the upload.
pub struct DfdaemonUploadServer {
// addr is the address of the grpc server.
@ -191,44 +194,105 @@ pub struct DfdaemonServerHandler {
// DfdaemonServerHandler implements the dfdaemon grpc service.
#[tonic::async_trait]
impl Dfdaemon for DfdaemonServerHandler {
+// SyncPiecesStream is the stream of the sync pieces response.
+type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;
-// get_piece_numbers gets the piece numbers.
+// sync_pieces syncs the pieces.
#[instrument(skip_all, fields(task_id))]
-async fn get_piece_numbers(
+async fn sync_pieces(
&self,
-request: Request<GetPieceNumbersRequest>,
-) -> Result<Response<GetPieceNumbersResponse>, Status> {
+request: Request<SyncPiecesRequest>,
+) -> Result<Response<Self::SyncPiecesStream>, Status> {
// Clone the request.
let request = request.into_inner();
// Get the task id from the request.
-let task_id = request.task_id.clone();
+let task_id = request.task_id;
// Span record the task id.
-Span::current().record("task_id", task_id.as_str());
+Span::current().record("task_id", task_id.clone());
+// Get the interested piece numbers from the request.
+let mut interested_piece_numbers = request.interested_piece_numbers.clone();
// Clone the task.
let task = self.task.clone();
-// Get the piece numbers from the local storage.
-let piece_numbers: Vec<u32> = task
-.piece
-.get_all(task_id.as_str())
-.map_err(|e| {
-error!("get piece numbers from local storage: {}", e);
-Status::internal(e.to_string())
-})?
-.iter()
-.map(|piece| piece.number)
-.collect();
-info!("piece numbers: {:?}", piece_numbers);
-// Check whether the piece numbers is empty.
-if piece_numbers.is_empty() {
-error!("piece numbers not found");
-return Err(Status::not_found("piece numbers not found"));
-}
-Ok(Response::new(GetPieceNumbersResponse { piece_numbers }))
+// Initialize stream channel.
+let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
+tokio::spawn(
+async move {
+loop {
+let mut has_started_piece = false;
+let mut finished_piece_numbers = Vec::new();
+for interested_piece_number in interested_piece_numbers.iter() {
+let piece = match task.piece.get(task_id.as_str(), *interested_piece_number)
+{
+Ok(Some(piece)) => piece,
+Ok(None) => continue,
+Err(err) => {
+error!("get piece metadata: {}", err);
+out_stream_tx
+.send(Err(Status::internal(err.to_string())))
+.await
+.unwrap_or_else(|err| {
+error!("send piece metadata to stream: {}", err);
+});
+drop(out_stream_tx);
+return;
+}
+};
+// Send the piece metadata to the stream.
+if piece.is_finished() {
+info!("piece {} is finished", piece.number);
+out_stream_tx
+.send(Ok(SyncPiecesResponse {
+piece_number: piece.number,
+}))
+.await
+.unwrap_or_else(|err| {
+error!("send finished pieces to stream: {}", err);
+});
+// Add the finished piece number to the finished piece numbers.
+finished_piece_numbers.push(piece.number);
+continue;
+}
+// Check whether the piece is started.
+if piece.is_started() {
+has_started_piece = true;
+}
+}
+// Remove the finished piece numbers from the interested piece numbers.
+interested_piece_numbers
+.retain(|number| !finished_piece_numbers.contains(number));
+// If all the interested pieces are finished, return.
+if interested_piece_numbers.is_empty() {
+info!("all the interested pieces are finished");
+drop(out_stream_tx);
+return;
+}
+// If there is no started piece, return.
+if !has_started_piece {
+info!("there is no started piece");
+drop(out_stream_tx);
+return;
+}
+// Wait for the piece to be finished.
+tokio::time::sleep(DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL).await;
+}
+}
+.in_current_span(),
+);
+Ok(Response::new(ReceiverStream::new(out_stream_rx)))
}
// sync_pieces syncs the pieces.
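The server side above follows tonic's usual channel-backed server-streaming shape: a spawned producer task writes into an mpsc channel, and the receiver half is returned as a ReceiverStream. A stripped-down sketch of just that shape, using a hypothetical response type standing in for SyncPiecesResponse:

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use tonic::Status;

    // Hypothetical stand-in for SyncPiecesResponse.
    pub struct PieceNumberResponse {
        pub piece_number: u32,
    }

    // Must be called from within a tokio runtime.
    pub fn stream_piece_numbers(
        numbers: Vec<u32>,
    ) -> ReceiverStream<Result<PieceNumberResponse, Status>> {
        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            for number in numbers {
                // A send error means the client hung up; stop producing.
                if tx.send(Ok(PieceNumberResponse { piece_number: number })).await.is_err() {
                    return;
                }
            }
            // Dropping tx here ends the stream cleanly.
        });
        ReceiverStream::new(rx)
    }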
@ -515,13 +579,13 @@ impl DfdaemonClient {
-// get_piece_numbers gets the piece numbers.
+// sync_pieces syncs the pieces.
#[instrument(skip_all)]
-pub async fn get_piece_numbers(
+pub async fn sync_pieces(
&self,
-request: GetPieceNumbersRequest,
-) -> ClientResult<Vec<u32>> {
+request: SyncPiecesRequest,
+) -> ClientResult<tonic::Response<tonic::codec::Streaming<SyncPiecesResponse>>> {
let request = Self::make_request(request);
-let response = self.client.clone().get_piece_numbers(request).await?;
-Ok(response.into_inner().piece_numbers)
+let response = self.client.clone().sync_pieces(request).await?;
+Ok(response)
}
// sync_pieces syncs the pieces.
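Since the wrapper now returns the raw tonic streaming response, callers drain it with message() until the server closes the stream. A usage sketch, assuming an async context and the crate's ClientResult alias; probe_peer is a hypothetical helper, not part of this commit:

    use dragonfly_api::dfdaemon::v2::SyncPiecesRequest;

    // Ask one remote peer which of the given pieces it already has.
    async fn probe_peer(
        client: &DfdaemonClient,
        task_id: &str,
        piece_numbers: Vec<u32>,
    ) -> ClientResult<Vec<u32>> {
        let response = client
            .sync_pieces(SyncPiecesRequest {
                task_id: task_id.to_string(),
                interested_piece_numbers: piece_numbers,
            })
            .await?;

        // Drain the stream; it ends once the remote peer has reported all
        // interested pieces or has none of them in progress.
        let mut stream = response.into_inner();
        let mut available = Vec::new();
        while let Some(message) = stream.message().await? {
            available.push(message.piece_number);
        }
        Ok(available)
    }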

src/task/mod.rs

@ -30,13 +30,15 @@ use dragonfly_api::scheduler::v2::{
DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest, DownloadPieceFailedRequest,
DownloadPieceFinishedRequest, HttpResponse, RegisterPeerRequest,
};
-use mpsc::Sender;
use reqwest::header::{self, HeaderMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::{self, OpenOptions};
-use tokio::sync::{mpsc, Semaphore};
+use tokio::sync::{
+mpsc::{self, Sender},
+Semaphore,
+};
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
@ -45,6 +47,7 @@ use tonic::Status;
use tracing::{error, info};
pub mod piece;
+pub mod piece_collector;
// Task represents a task manager.
pub struct Task {
@ -456,14 +459,7 @@ impl Task {
announce_peer_response::Response::NormalTaskResponse(response) => {
// If the task is normal, download the pieces from the remote peer.
info!("normal task response: {:?}", response);
-let collect_interested_pieces = self
-.piece
-.collect_interested_from_remote_peer(
-task_id,
-interested_pieces.clone(),
-response.candidate_parents,
-)
-.await;
+let collect_interested_pieces = Vec::new();
// Download the pieces from the remote peer.
finished_pieces = self
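Note that the call site is stubbed out with an empty Vec, so this commit adds the collector without using it here; wiring it in presumably lands in a follow-up change. A hedged sketch of what that wiring could look like, based only on the PieceCollector API added below:

    // Hypothetical integration, not part of this commit.
    let piece_collector = piece_collector::PieceCollector::new(
        task_id,
        interested_pieces.clone(),
        response.candidate_parents.clone(),
    );
    let mut collected_piece_rx = piece_collector.collect().await;
    while let Some(collected_piece) = collected_piece_rx.recv().await {
        // Each collected piece names a parent peer to download it from.
        info!(
            "piece {} is available from parent {}",
            collected_piece.number, collected_piece.parent.id
        );
    }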

src/task/piece.rs

@ -22,12 +22,10 @@ use crate::utils::digest::{Algorithm, Digest as UtilsDigest};
use crate::{Error, HTTPError, Result};
use chrono::Utc;
use dragonfly_api::common::v2::{Peer, Range};
-use dragonfly_api::dfdaemon::v2::{DownloadPieceRequest, GetPieceNumbersRequest};
-use rand::prelude::*;
+use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
use reqwest::header::{self, HeaderMap};
use sha2::{Digest, Sha256};
use std::sync::Arc;
-use tokio::task::JoinSet;
use tokio::{
fs,
io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom},
@ -240,104 +238,6 @@ impl Piece {
.collect::<Vec<metadata::Piece>>()
}
-// collect_interested_from_remote_peer collects the interested pieces from remote peers.
-pub async fn collect_interested_from_remote_peer(
-&self,
-task_id: &str,
-interested_pieces: Vec<metadata::Piece>,
-candidate_parents: Vec<Peer>,
-) -> Vec<CollectPiece> {
-// Initialize the collect pieces.
-let mut collect_pieces: Vec<CollectPiece> = Vec::new();
-let mut join_set = JoinSet::new();
-for candidate_parent in candidate_parents {
-async fn get_piece_numbers(
-task_id: String,
-candidate_parent: Peer,
-) -> Result<Vec<CollectPiece>> {
-// If candidate_parent.host is None, skip it.
-let candidate_parent_host = candidate_parent.host.clone().ok_or_else(|| {
-error!("peer {:?} host is empty", candidate_parent);
-Error::InvalidPeer(candidate_parent.id.clone())
-})?;
-// Initialize the collect pieces.
-let mut collect_pieces: Vec<CollectPiece> = Vec::new();
-// Create a dfdaemon client.
-let dfdaemon_client = DfdaemonClient::new(format!(
-"http://{}:{}",
-candidate_parent_host.ip, candidate_parent_host.port
-))
-.await
-.map_err(|err| {
-error!("create dfdaemon client failed: {}", err);
-err
-})?;
-let collect_piece_numbers = dfdaemon_client
-.get_piece_numbers(GetPieceNumbersRequest {
-task_id: task_id.to_string(),
-})
-.await
-.map_err(|err| {
-error!("get piece numbers failed: {}", err);
-err
-})?;
-// Construct the collect pieces.
-for collect_piece_number in collect_piece_numbers {
-collect_pieces.push(CollectPiece {
-number: collect_piece_number,
-parent: candidate_parent.clone(),
-});
-}
-Ok(collect_pieces)
-}
-join_set.spawn(get_piece_numbers(
-task_id.to_string(),
-candidate_parent.clone(),
-));
-}
-while let Some(message) = join_set.join_next().await {
-match message {
-Ok(Ok(new_collect_pieces)) => {
-collect_pieces.extend(new_collect_pieces);
-}
-Ok(Err(err)) => {
-error!("get piece numbers failed: {}", err);
-}
-Err(err) => {
-error!("join set failed: {}", err);
-}
-}
-}
-// Shuffle the collect pieces.
-collect_pieces.shuffle(&mut rand::thread_rng());
-// Filter the collect pieces and remove the duplicate pieces.
-let mut visited: Vec<u32> = Vec::new();
-collect_pieces.retain(|collect_piece| {
-interested_pieces
-.iter()
-.any(|interested_piece| interested_piece.number != collect_piece.number)
-|| match visited.contains(&collect_piece.number) {
-true => false,
-false => {
-visited.push(collect_piece.number);
-true
-}
-}
-});
-collect_pieces
-}
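One remark on the removed filter: the retain predicate keeps an element whenever any interested piece has a different number, which is true for almost every element, so the interest check was effectively a no-op and only the visited-based dedup did real work. The presumable intent, sketched over the same collect_pieces and interested_pieces variables with a HashSet for the dedup:

    use std::collections::HashSet;

    // Presumed intent: keep only interested, not-yet-seen piece numbers.
    let mut visited: HashSet<u32> = HashSet::new();
    collect_pieces.retain(|collect_piece| {
        interested_pieces
            .iter()
            .any(|interested_piece| interested_piece.number == collect_piece.number)
            && visited.insert(collect_piece.number)
    });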
// download_from_local_peer_into_async_read downloads a single piece from a local peer.
pub async fn download_from_local_peer_into_async_read(
&self,

src/task/piece_collector.rs (new file, 199 lines)

@ -0,0 +1,199 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::grpc::dfdaemon::DfdaemonClient;
use crate::storage::metadata;
use crate::{Error, Result};
use dashmap::{DashMap, DashSet};
use dragonfly_api::common::v2::Peer;
use dragonfly_api::dfdaemon::v2::SyncPiecesRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
use tracing::{error, info};
// CollectedPiece is the piece collected from a peer.
pub struct CollectedPiece {
// number is the piece number.
pub number: u32,
// parent is the parent peer.
pub parent: Peer,
}
// PieceCollector is used to collect pieces from peers.
pub struct PieceCollector {
// task_id is the id of the task.
task_id: String,
// peers is the peers to collect pieces from.
peers: Vec<Peer>,
// interested_pieces is the pieces that the collector is interested in.
interested_pieces: Vec<metadata::Piece>,
// collected_pieces is the pieces collected from peers.
collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
}
impl PieceCollector {
// new returns a new PieceCollector.
pub fn new(task_id: &str, interested_pieces: Vec<metadata::Piece>, peers: Vec<Peer>) -> Self {
// Initialize collected_pieces.
let collected_pieces = Arc::new(DashMap::new());
interested_pieces
.clone()
.into_iter()
.for_each(|interested_piece| {
collected_pieces.insert(interested_piece.number, DashSet::new());
});
Self {
task_id: task_id.to_string(),
peers,
interested_pieces,
collected_pieces,
}
}
// collect runs the collector and returns a receiver of the collected pieces.
pub async fn collect(&self) -> Receiver<CollectedPiece> {
let task_id = self.task_id.clone();
let peers = self.peers.clone();
let interested_pieces = self.interested_pieces.clone();
let collected_pieces = self.collected_pieces.clone();
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);
tokio::spawn(async move {
Self::collect_from_remote_peers(
task_id.clone(),
peers.clone(),
interested_pieces.clone(),
collected_pieces.clone(),
collected_piece_tx,
)
.await;
});
collected_piece_rx
}
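collect() clones everything the producer needs and detaches it with tokio::spawn, so the caller only holds the receiver; dropping that receiver makes the producer's sends fail, which doubles as cancellation. The same shape in miniature, with illustrative names:

    use tokio::sync::mpsc::{self, Receiver};

    struct Ticker {
        limit: u32,
    }

    impl Ticker {
        // Return a receiver; a detached task owns the sending side.
        fn run(&self) -> Receiver<u32> {
            let limit = self.limit;
            let (tx, rx) = mpsc::channel(16);
            tokio::spawn(async move {
                for i in 0..limit {
                    // A send error means the receiver was dropped: stop early.
                    if tx.send(i).await.is_err() {
                        return;
                    }
                }
            });
            rx
        }
    }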
// collect_from_remote_peers collects the pieces from remote peers.
pub async fn collect_from_remote_peers(
task_id: String,
peers: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>,
) {
// Create a task to collect pieces from peers.
let mut join_set = JoinSet::new();
for peer in peers.iter() {
async fn sync_pieces(
task_id: String,
peer: Peer,
peers: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>,
) -> Result<Peer> {
// If the peer host is missing, return an error for this peer.
let host = peer.host.clone().ok_or_else(|| {
error!("peer {:?} host is empty", peer);
Error::InvalidPeer(peer.id.clone())
})?;
// Create a dfdaemon client.
let dfdaemon_client =
DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
let response = dfdaemon_client
.sync_pieces(SyncPiecesRequest {
task_id: task_id.to_string(),
interested_piece_numbers: interested_pieces
.iter()
.map(|piece| piece.number)
.collect(),
})
.await?;
let mut out_stream = response.into_inner();
while let Some(message) = out_stream.message().await? {
// collected_pieces was pre-populated with every interested piece
// number in new(), so and_modify is enough to record this peer as a
// holder; piece numbers we are not interested in are ignored.
collected_pieces
.entry(message.piece_number)
.and_modify(|peers| {
peers.insert(peer.id.clone());
});
// Clone the first recorded holder out of the map so the shard guard
// is dropped before awaiting or removing; holding a DashMap reference
// across those calls risks deadlocking on the same shard.
let parent_id = match collected_pieces.get(&message.piece_number) {
Some(parents) => match parents.iter().next() {
Some(parent) => parent.clone(),
None => continue,
},
None => continue,
};
let number = message.piece_number;
let parent = peers
.iter()
.find(|peer| peer.id == parent_id)
.ok_or_else(|| {
error!("parent {} not found", parent_id);
Error::InvalidPeer(parent_id.clone())
})?;
// Send the collected piece and stop tracking it.
collected_piece_tx
.send(CollectedPiece {
number,
parent: parent.clone(),
})
.await?;
collected_pieces.remove(&number);
}
Ok(peer)
}
join_set.spawn(sync_pieces(
task_id.clone(),
peer.clone(),
peers.clone(),
interested_pieces.clone(),
collected_pieces.clone(),
collected_piece_tx.clone(),
));
}
// Wait for all tasks to finish.
while let Some(message) = join_set.join_next().await {
match message {
Ok(Ok(peer)) => {
info!("peer {} sync pieces finished", peer.id);
}
Ok(Err(err)) => {
error!("sync pieces failed: {}", err);
}
Err(err) => {
error!("sync pieces failed: {}", err);
}
}
// If all pieces are collected, abort all tasks.
if collected_pieces.is_empty() {
join_set.abort_all();
}
}
}
}
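A smoke test could pin down the termination behavior: with no peers there are no producer tasks, every sender is dropped when collect_from_remote_peers returns, and the receiver observes a closed channel. A sketch of such a test (not part of this commit):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[tokio::test]
        async fn collect_with_no_peers_closes_channel() {
            let collector = PieceCollector::new("task-id", Vec::new(), Vec::new());
            let mut collected_piece_rx = collector.collect().await;
            // No peers means no senders survive, so recv() returns None.
            assert!(collected_piece_rx.recv().await.is_none());
        }
    }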