From fa21c38e32998334e9f44ff90799c1424830a50e Mon Sep 17 00:00:00 2001
From: Gaius
Date: Tue, 12 Dec 2023 11:00:54 +0800
Subject: [PATCH] feat: add piece collector for sync pieces from remote peers (#139)

Signed-off-by: Gaius
---
 Cargo.toml                  |   3 +-
 src/grpc/dfdaemon.rs        | 126 +++++++++++++++++------
 src/task/mod.rs             |  16 ++-
 src/task/piece.rs           | 102 +----------------
 src/task/piece_collector.rs | 199 ++++++++++++++++++++++++++++++++++++
 5 files changed, 303 insertions(+), 143 deletions(-)
 create mode 100644 src/task/piece_collector.rs

diff --git a/Cargo.toml b/Cargo.toml
index e138ff99..709116e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ local-ip-address = "0.5.3"
 rocksdb = "0.21.0"
 num_cpus = "1.0"
 chrono = { version = "0.4.26", features = ["serde"] }
-dragonfly-api = "2.0.59"
+dragonfly-api = "2.0.60"
 sysinfo = "0.29.6"
 sha2 = "0.10"
 hex = "0.4"
@@ -69,3 +69,4 @@ rand = "0.8.5"
 prost-wkt-types = "0.4"
 tower = "0.4.13"
 indicatif = "0.17.7"
+dashmap = "5.5.3"

diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs
index 50ae19fb..62dd31e7 100644
--- a/src/grpc/dfdaemon.rs
+++ b/src/grpc/dfdaemon.rs
@@ -23,8 +23,8 @@ use dragonfly_api::dfdaemon::v2::{
     dfdaemon_client::DfdaemonClient as DfdaemonGRPCClient,
     dfdaemon_server::{Dfdaemon, DfdaemonServer as DfdaemonGRPCServer},
     DeleteTaskRequest, DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest,
-    DownloadTaskResponse, GetPieceNumbersRequest, GetPieceNumbersResponse,
-    StatTaskRequest as DfdaemonStatTaskRequest, UploadTaskRequest,
+    DownloadTaskResponse, StatTaskRequest as DfdaemonStatTaskRequest, SyncPiecesRequest,
+    SyncPiecesResponse, UploadTaskRequest,
 };
 use dragonfly_api::scheduler::v2::StatTaskRequest as SchedulerStatTaskRequest;
 use std::net::SocketAddr;
@@ -43,6 +43,9 @@ use tonic::{
 use tower::service_fn;
 use tracing::{error, info, instrument, Instrument, Span};
 
+// DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL is the default interval for waiting for the piece to be finished.
+const DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL: Duration = Duration::from_millis(800);
+
 // DfdaemonUploadServer is the grpc server of the upload.
 pub struct DfdaemonUploadServer {
     // addr is the address of the grpc server.
@@ -191,44 +194,105 @@ pub struct DfdaemonServerHandler {
 // DfdaemonServerHandler implements the dfdaemon grpc service.
 #[tonic::async_trait]
 impl Dfdaemon for DfdaemonServerHandler {
+    // SyncPiecesStream is the stream of the sync pieces response.
+    type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;
+
-    // get_piece_numbers gets the piece numbers.
+    // sync_pieces syncs the piece metadata of the task.
     #[instrument(skip_all, fields(task_id))]
-    async fn get_piece_numbers(
+    async fn sync_pieces(
         &self,
-        request: Request<GetPieceNumbersRequest>,
-    ) -> Result<Response<GetPieceNumbersResponse>, Status> {
+        request: Request<SyncPiecesRequest>,
+    ) -> Result<Response<Self::SyncPiecesStream>, Status> {
         // Clone the request.
         let request = request.into_inner();
 
         // Get the task id from the request.
-        let task_id = request.task_id.clone();
+        let task_id = request.task_id;
 
         // Span record the task id.
-        Span::current().record("task_id", task_id.as_str());
+        Span::current().record("task_id", task_id.clone());
+
+        // Get the interested piece numbers from the request.
+        let mut interested_piece_numbers = request.interested_piece_numbers.clone();
 
         // Clone the task.
         let task = self.task.clone();
 
-        // Get the piece numbers from the local storage.
-        let piece_numbers: Vec<u32> = task
-            .piece
-            .get_all(task_id.as_str())
-            .map_err(|e| {
-                error!("get piece numbers from local storage: {}", e);
-                Status::internal(e.to_string())
-            })?
-            .iter()
-            .map(|piece| piece.number)
-            .collect();
-        info!("piece numbers: {:?}", piece_numbers);
+        // Initialize stream channel.
+        let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
+        tokio::spawn(
+            async move {
+                loop {
+                    let mut has_started_piece = false;
+                    let mut finished_piece_numbers = Vec::new();
+                    for interested_piece_number in interested_piece_numbers.iter() {
+                        let piece = match task.piece.get(task_id.as_str(), *interested_piece_number)
+                        {
+                            Ok(Some(piece)) => piece,
+                            Ok(None) => continue,
+                            Err(err) => {
+                                error!("get piece metadata: {}", err);
+                                out_stream_tx
+                                    .send(Err(Status::internal(err.to_string())))
+                                    .await
+                                    .unwrap_or_else(|err| {
+                                        error!("send piece metadata to stream: {}", err);
+                                    });
-
-        // Check whether the piece numbers is empty.
-        if piece_numbers.is_empty() {
-            error!("piece numbers not found");
-            return Err(Status::not_found("piece numbers not found"));
-        }
+
+                                drop(out_stream_tx);
+                                return;
+                            }
+                        };
-
-        Ok(Response::new(GetPieceNumbersResponse { piece_numbers }))
+                        // Send the piece metadata to the stream.
+                        if piece.is_finished() {
+                            info!("piece {} is finished", piece.number);
+                            out_stream_tx
+                                .send(Ok(SyncPiecesResponse {
+                                    piece_number: piece.number,
+                                }))
+                                .await
+                                .unwrap_or_else(|err| {
+                                    error!("send finished pieces to stream: {}", err);
+                                });
+
+                            // Add the finished piece number to the finished piece numbers.
+                            finished_piece_numbers.push(piece.number);
+                            continue;
+                        }
+
+                        // Check whether the piece is started.
+                        if piece.is_started() {
+                            has_started_piece = true;
+                        }
+                    }
+
+                    // Remove the finished piece numbers from the interested piece numbers.
+                    interested_piece_numbers
+                        .retain(|number| !finished_piece_numbers.contains(number));
+
+                    // If all the interested pieces are finished, return.
+                    if interested_piece_numbers.is_empty() {
+                        info!("all the interested pieces are finished");
+                        drop(out_stream_tx);
+                        return;
+                    }
+
+                    // If there is no started piece, return.
+                    if !has_started_piece {
+                        info!("there is no started piece");
+                        drop(out_stream_tx);
+                        return;
+                    }
+
+                    // Wait for the piece to be finished.
+                    tokio::time::sleep(DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL).await;
+                }
+            }
+            .in_current_span(),
+        );
+
+        Ok(Response::new(ReceiverStream::new(out_stream_rx)))
     }
 
     // sync_pieces syncs the pieces.
@@ -515,13 +579,13 @@ impl DfdaemonClient {
-    // get_piece_numbers gets the piece numbers.
+    // sync_pieces syncs the piece metadata from the remote peer.
     #[instrument(skip_all)]
-    pub async fn get_piece_numbers(
+    pub async fn sync_pieces(
         &self,
-        request: GetPieceNumbersRequest,
-    ) -> ClientResult<Vec<u32>> {
+        request: SyncPiecesRequest,
+    ) -> ClientResult<tonic::Response<tonic::codec::Streaming<SyncPiecesResponse>>> {
         let request = Self::make_request(request);
-        let response = self.client.clone().get_piece_numbers(request).await?;
-        Ok(response.into_inner().piece_numbers)
+        let response = self.client.clone().sync_pieces(request).await?;
+        Ok(response)
     }
 
     // sync_pieces syncs the pieces.
diff --git a/src/task/mod.rs b/src/task/mod.rs
index c97641c9..5ae2fc87 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -30,13 +30,15 @@ use dragonfly_api::scheduler::v2::{
     DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest,
     DownloadPieceFailedRequest, DownloadPieceFinishedRequest, HttpResponse, RegisterPeerRequest,
 };
-use mpsc::Sender;
 use reqwest::header::{self, HeaderMap};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::fs::{self, OpenOptions};
-use tokio::sync::{mpsc, Semaphore};
+use tokio::sync::{
+    mpsc::{self, Sender},
+    Semaphore,
+};
 use tokio::task::JoinSet;
 use tokio::time::sleep;
 use tokio_stream::wrappers::ReceiverStream;
@@ -45,6 +47,7 @@ use tonic::Status;
 use tracing::{error, info};
 
 pub mod piece;
+pub mod piece_collector;
 
 // Task represents a task manager.
 pub struct Task {
@@ -456,14 +459,7 @@ impl Task {
             announce_peer_response::Response::NormalTaskResponse(response) => {
                 // If the task is normal, download the pieces from the remote peer.
                 info!("normal task response: {:?}", response);
-                let collect_interested_pieces = self
-                    .piece
-                    .collect_interested_from_remote_peer(
-                        task_id,
-                        interested_pieces.clone(),
-                        response.candidate_parents,
-                    )
-                    .await;
+                let collect_interested_pieces = Vec::new();
 
                 // Download the pieces from the remote peer.
                 finished_pieces = self
diff --git a/src/task/piece.rs b/src/task/piece.rs
index 42d36e9e..45b866a9 100644
--- a/src/task/piece.rs
+++ b/src/task/piece.rs
@@ -22,12 +22,10 @@ use crate::utils::digest::{Algorithm, Digest as UtilsDigest};
 use crate::{Error, HTTPError, Result};
 use chrono::Utc;
 use dragonfly_api::common::v2::{Peer, Range};
-use dragonfly_api::dfdaemon::v2::{DownloadPieceRequest, GetPieceNumbersRequest};
-use rand::prelude::*;
+use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
 use reqwest::header::{self, HeaderMap};
 use sha2::{Digest, Sha256};
 use std::sync::Arc;
-use tokio::task::JoinSet;
 use tokio::{
     fs,
     io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom},
@@ -240,104 +238,6 @@ impl Piece {
             .collect::<Vec<metadata::Piece>>()
     }
 
-    // collect_interested_from_remote_peer collects the interested pieces from remote peers.
-    pub async fn collect_interested_from_remote_peer(
-        &self,
-        task_id: &str,
-        interested_pieces: Vec<metadata::Piece>,
-        candidate_parents: Vec<Peer>,
-    ) -> Vec<CollectPiece> {
-        // Initialize the collect pieces.
-        let mut collect_pieces: Vec<CollectPiece> = Vec::new();
-
-        let mut join_set = JoinSet::new();
-        for candidate_parent in candidate_parents {
-            async fn get_piece_numbers(
-                task_id: String,
-                candidate_parent: Peer,
-            ) -> Result<Vec<CollectPiece>> {
-                // If candidate_parent.host is None, skip it.
-                let candidate_parent_host = candidate_parent.host.clone().ok_or_else(|| {
-                    error!("peer {:?} host is empty", candidate_parent);
-                    Error::InvalidPeer(candidate_parent.id.clone())
-                })?;
-
-                // Initialize the collect pieces.
-                let mut collect_pieces: Vec<CollectPiece> = Vec::new();
-
-                // Create a dfdaemon client.
-                let dfdaemon_client = DfdaemonClient::new(format!(
-                    "http://{}:{}",
-                    candidate_parent_host.ip, candidate_parent_host.port
-                ))
-                .await
-                .map_err(|err| {
-                    error!("create dfdaemon client failed: {}", err);
-                    err
-                })?;
-
-                let collect_piece_numbers = dfdaemon_client
-                    .get_piece_numbers(GetPieceNumbersRequest {
-                        task_id: task_id.to_string(),
-                    })
-                    .await
-                    .map_err(|err| {
-                        error!("get piece numbers failed: {}", err);
-                        err
-                    })?;
-
-                // Construct the collect pieces.
-                for collect_piece_number in collect_piece_numbers {
-                    collect_pieces.push(CollectPiece {
-                        number: collect_piece_number,
-                        parent: candidate_parent.clone(),
-                    });
-                }
-
-                Ok(collect_pieces)
-            }
-
-            join_set.spawn(get_piece_numbers(
-                task_id.to_string(),
-                candidate_parent.clone(),
-            ));
-        }
-
-        while let Some(message) = join_set.join_next().await {
-            match message {
-                Ok(Ok(new_collect_pieces)) => {
-                    collect_pieces.extend(new_collect_pieces);
-                }
-                Ok(Err(err)) => {
-                    error!("get piece numbers failed: {}", err);
-                }
-                Err(err) => {
-                    error!("join set failed: {}", err);
-                }
-            }
-        }
-
-        // Shuffle the collect pieces.
-        collect_pieces.shuffle(&mut rand::thread_rng());
-
-        // Filter the collect pieces and remove the duplicate pieces.
-        let mut visited: Vec<u32> = Vec::new();
-        collect_pieces.retain(|collect_piece| {
-            interested_pieces
-                .iter()
-                .any(|interested_piece| interested_piece.number != collect_piece.number)
-                || match visited.contains(&collect_piece.number) {
-                    true => false,
-                    false => {
-                        visited.push(collect_piece.number);
-                        true
-                    }
-                }
-        });
-
-        collect_pieces
-    }
-
     // download_from_local_peer_into_async_read downloads a single piece from a local peer.
     pub async fn download_from_local_peer_into_async_read(
         &self,
diff --git a/src/task/piece_collector.rs b/src/task/piece_collector.rs
new file mode 100644
index 00000000..9944e0cd
--- /dev/null
+++ b/src/task/piece_collector.rs
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::grpc::dfdaemon::DfdaemonClient;
+use crate::storage::metadata;
+use crate::{Error, Result};
+use dashmap::{DashMap, DashSet};
+use dragonfly_api::common::v2::Peer;
+use dragonfly_api::dfdaemon::v2::SyncPiecesRequest;
+use std::sync::Arc;
+use tokio::sync::mpsc::{self, Receiver, Sender};
+use tokio::task::JoinSet;
+use tracing::{error, info};
+
+// CollectedPiece is the piece collected from a peer.
+pub struct CollectedPiece {
+    // number is the piece number.
+    pub number: u32,
+
+    // parent is the parent peer.
+    pub parent: Peer,
+}
+
+// PieceCollector is used to collect pieces from peers.
+pub struct PieceCollector {
+    // task_id is the id of the task.
+    task_id: String,
+
+    // peers is the peers to collect pieces from.
+    peers: Vec<Peer>,
+
+    // interested_pieces is the pieces interested by the collector.
+    interested_pieces: Vec<metadata::Piece>,
+
+    // collected_pieces is the pieces collected from peers.
+    collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
+}
+
+impl PieceCollector {
+    // new returns a new PieceCollector.
+    pub fn new(task_id: &str, interested_pieces: Vec<metadata::Piece>, peers: Vec<Peer>) -> Self {
+        // Initialize collected_pieces.
+        let collected_pieces = Arc::new(DashMap::new());
+        interested_pieces
+            .clone()
+            .into_iter()
+            .for_each(|interested_piece| {
+                collected_pieces.insert(interested_piece.number, DashSet::new());
+            });
+
+        Self {
+            task_id: task_id.to_string(),
+            peers,
+            interested_pieces,
+            collected_pieces,
+        }
+    }
+
+    // collect collects the pieces from peers and returns a receiver of the collected pieces.
+    pub async fn collect(&self) -> Receiver<CollectedPiece> {
+        let task_id = self.task_id.clone();
+        let peers = self.peers.clone();
+        let interested_pieces = self.interested_pieces.clone();
+        let collected_pieces = self.collected_pieces.clone();
+        let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);
+        tokio::spawn(async move {
+            Self::collect_from_remote_peers(
+                task_id.clone(),
+                peers.clone(),
+                interested_pieces.clone(),
+                collected_pieces.clone(),
+                collected_piece_tx,
+            )
+            .await;
+        });
+
+        collected_piece_rx
+    }
+
+    // collect_from_remote_peers collects the pieces from remote peers.
+    pub async fn collect_from_remote_peers(
+        task_id: String,
+        peers: Vec<Peer>,
+        interested_pieces: Vec<metadata::Piece>,
+        collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
+        collected_piece_tx: Sender<CollectedPiece>,
+    ) {
+        // Create a task to collect pieces from peers.
+        let mut join_set = JoinSet::new();
+        for peer in peers.iter() {
+            async fn sync_pieces(
+                task_id: String,
+                peer: Peer,
+                peers: Vec<Peer>,
+                interested_pieces: Vec<metadata::Piece>,
+                collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
+                collected_piece_tx: Sender<CollectedPiece>,
+            ) -> Result<Peer> {
+                // If candidate_parent.host is None, skip it.
+                let host = peer.host.clone().ok_or_else(|| {
+                    error!("peer {:?} host is empty", peer);
+                    Error::InvalidPeer(peer.id.clone())
+                })?;
+
+                // Create a dfdaemon client.
+                let dfdaemon_client =
+                    DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
+
+                let response = dfdaemon_client
+                    .sync_pieces(SyncPiecesRequest {
+                        task_id: task_id.to_string(),
+                        interested_piece_numbers: interested_pieces
+                            .iter()
+                            .map(|piece| piece.number)
+                            .collect(),
+                    })
+                    .await?;
+
+                let mut out_stream = response.into_inner();
+                while let Some(message) = out_stream.message().await? {
+                    collected_pieces
+                        .entry(message.piece_number)
+                        .and_modify(|peers| {
+                            peers.insert(peer.id.clone());
+                        });
+
+                    // Pick a parent that has the piece, cloning the id out of the map so
+                    // that the shard lock is released before the piece is removed below
+                    // (removing while holding a reference into a DashMap may deadlock).
+                    let parent_id = match collected_pieces.get(&message.piece_number) {
+                        Some(parents) => match parents.iter().next() {
+                            Some(parent) => parent.clone(),
+                            None => continue,
+                        },
+                        None => continue,
+                    };
+
+                    let number = message.piece_number;
+                    let parent = peers
+                        .iter()
+                        .find(|peer| peer.id == parent_id)
+                        .ok_or_else(|| {
+                            error!("parent {} not found", parent_id);
+                            Error::InvalidPeer(parent_id.clone())
+                        })?;
+
+                    collected_piece_tx
+                        .send(CollectedPiece {
+                            number,
+                            parent: parent.clone(),
+                        })
+                        .await?;
+
+                    collected_pieces.remove(&number);
+                }
+
+                Ok(peer)
+            }
+
+            join_set.spawn(sync_pieces(
+                task_id.clone(),
+                peer.clone(),
+                peers.clone(),
+                interested_pieces.clone(),
+                collected_pieces.clone(),
+                collected_piece_tx.clone(),
+            ));
+        }
+
+        // Wait for all tasks to finish.
+        while let Some(message) = join_set.join_next().await {
+            match message {
+                Ok(Ok(peer)) => {
+                    info!("peer {} sync pieces finished", peer.id);
+                }
+                Ok(Err(err)) => {
+                    error!("sync pieces failed: {}", err);
+                }
+                Err(err) => {
+                    error!("sync pieces failed: {}", err);
+                }
+            }
+
+            // If all pieces are collected, abort all tasks.
+            if collected_pieces.is_empty() {
+                join_set.abort_all();
+            }
+        }
+    }
+}
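
For reference, a minimal sketch of how a caller might drive the new PieceCollector.
The surrounding function, its arguments, and the final download step are assumptions
for illustration; only the PieceCollector API itself comes from this patch.

    use dragonfly_api::common::v2::Peer;
    use tracing::info;

    use crate::storage::metadata;
    use crate::task::piece_collector::PieceCollector;

    // Hypothetical caller: sync piece availability from the candidate parents and
    // react as soon as some parent reports a piece finished.
    async fn download_from_parents(
        task_id: &str,
        interested_pieces: Vec<metadata::Piece>,
        candidate_parents: Vec<Peer>,
    ) {
        // Build the collector over the candidate parents and the wanted pieces.
        let collector = PieceCollector::new(task_id, interested_pieces, candidate_parents);

        // collect() spawns the background sync tasks and returns an mpsc receiver
        // that yields pieces in the order parents report them finished.
        let mut collected_piece_rx = collector.collect().await;

        // Each CollectedPiece pairs a piece number with a parent that has finished it.
        while let Some(collected_piece) = collected_piece_rx.recv().await {
            info!(
                "piece {} is available from parent {}",
                collected_piece.number, collected_piece.parent.id
            );
            // The actual piece download (e.g. a DownloadPieceRequest to the parent)
            // would go here.
        }
    }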