feat: optimize mutex in dynconfig and add keepalive to hyper proxy (#513)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-05-30 15:05:28 +08:00 committed by GitHub
parent 71a8892057
commit e2a907a05c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 73 additions and 53 deletions

16
Cargo.lock generated
View File

@ -959,7 +959,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client" name = "dragonfly-client"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -1023,7 +1023,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-backend" name = "dragonfly-client-backend"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"dragonfly-api", "dragonfly-api",
"dragonfly-client-core", "dragonfly-client-core",
@ -1042,7 +1042,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-config" name = "dragonfly-client-config"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"dragonfly-client-core", "dragonfly-client-core",
"home", "home",
@ -1061,7 +1061,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-core" name = "dragonfly-client-core"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"libloading", "libloading",
"reqwest", "reqwest",
@ -1072,7 +1072,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-init" name = "dragonfly-client-init"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@ -1088,7 +1088,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-storage" name = "dragonfly-client-storage"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"base16ct", "base16ct",
"blake3", "blake3",
@ -1112,7 +1112,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-util" name = "dragonfly-client-util"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"dragonfly-api", "dragonfly-api",
"dragonfly-client-core", "dragonfly-client-core",
@ -1570,7 +1570,7 @@ dependencies = [
[[package]] [[package]]
name = "hdfs" name = "hdfs"
version = "0.1.72" version = "0.1.73"
dependencies = [ dependencies = [
"dragonfly-client-backend", "dragonfly-client-backend",
"dragonfly-client-core", "dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.1.72" version = "0.1.73"
authors = ["The Dragonfly Developers"] authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/" homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git" repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021" edition = "2021"
[workspace.dependencies] [workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.72" } dragonfly-client = { path = "dragonfly-client", version = "0.1.73" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.72" } dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.73" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.72" } dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.73" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.72" } dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.73" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.72" } dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.73" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.72" } dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.73" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.72" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.73" }
thiserror = "1.0" thiserror = "1.0"
dragonfly-api = "2.0.114" dragonfly-api = "2.0.114"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }

View File

@ -275,7 +275,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
error!("missing content length in the response"); error!("missing content length in the response");
return Err(Status::internal("missing content length in the response")); return Err(Status::internal("missing content length in the response"));
}; };
info!("content length: {}", content_length); info!("content length: {}", content_length);
// Download's range priority is higher than the request header's range. // Download's range priority is higher than the request header's range.

View File

@ -85,10 +85,11 @@ impl SchedulerClient {
pub async fn announce_peer( pub async fn announce_peer(
&self, &self,
task_id: &str, task_id: &str,
peer_id: &str,
request: impl tonic::IntoStreamingRequest<Message = AnnouncePeerRequest>, request: impl tonic::IntoStreamingRequest<Message = AnnouncePeerRequest>,
) -> Result<tonic::Response<tonic::codec::Streaming<AnnouncePeerResponse>>> { ) -> Result<tonic::Response<tonic::codec::Streaming<AnnouncePeerResponse>>> {
let response = self let response = self
.client(task_id.to_string()) .client(task_id, Some(peer_id))
.await? .await?
.announce_peer(request) .announce_peer(request)
.await?; .await?;
@ -99,11 +100,7 @@ impl SchedulerClient {
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn stat_peer(&self, task_id: &str, request: StatPeerRequest) -> Result<Peer> { pub async fn stat_peer(&self, task_id: &str, request: StatPeerRequest) -> Result<Peer> {
let request = Self::make_request(request); let request = Self::make_request(request);
let response = self let response = self.client(task_id, None).await?.stat_peer(request).await?;
.client(task_id.to_string())
.await?
.stat_peer(request)
.await?;
Ok(response.into_inner()) Ok(response.into_inner())
} }
@ -111,7 +108,7 @@ impl SchedulerClient {
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn leave_peer(&self, task_id: &str, request: LeavePeerRequest) -> Result<()> { pub async fn leave_peer(&self, task_id: &str, request: LeavePeerRequest) -> Result<()> {
let request = Self::make_request(request); let request = Self::make_request(request);
self.client(task_id.to_string()) self.client(task_id, None)
.await? .await?
.leave_peer(request) .leave_peer(request)
.await?; .await?;
@ -127,7 +124,7 @@ impl SchedulerClient {
) -> Result<ExchangePeerResponse> { ) -> Result<ExchangePeerResponse> {
let request = Self::make_request(request); let request = Self::make_request(request);
let response = self let response = self
.client(task_id.to_string()) .client(task_id, None)
.await? .await?
.exchange_peer(request) .exchange_peer(request)
.await?; .await?;
@ -138,11 +135,7 @@ impl SchedulerClient {
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn stat_task(&self, task_id: &str, request: StatTaskRequest) -> Result<Task> { pub async fn stat_task(&self, task_id: &str, request: StatTaskRequest) -> Result<Task> {
let request = Self::make_request(request); let request = Self::make_request(request);
let response = self let response = self.client(task_id, None).await?.stat_task(request).await?;
.client(task_id.to_string())
.await?
.stat_task(request)
.await?;
Ok(response.into_inner()) Ok(response.into_inner())
} }
@ -150,7 +143,7 @@ impl SchedulerClient {
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> { pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> {
let request = Self::make_request(request); let request = Self::make_request(request);
self.client(task_id.to_string()) self.client(task_id, None)
.await? .await?
.leave_task(request) .leave_task(request)
.await?; .await?;
@ -162,7 +155,10 @@ impl SchedulerClient {
pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() { let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);
for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn announce_host( async fn announce_host(
addr: SocketAddr, addr: SocketAddr,
@ -214,7 +210,10 @@ impl SchedulerClient {
// Announce the host to the scheduler. // Announce the host to the scheduler.
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() { let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);
for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn announce_host( async fn announce_host(
addr: SocketAddr, addr: SocketAddr,
@ -265,7 +264,10 @@ impl SchedulerClient {
// Leave the host from the scheduler. // Leave the host from the scheduler.
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() { let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);
for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn leave_host( async fn leave_host(
addr: SocketAddr, addr: SocketAddr,
@ -309,16 +311,21 @@ impl SchedulerClient {
// client gets the grpc client of the scheduler. // client gets the grpc client of the scheduler.
#[instrument(skip(self))] #[instrument(skip(self))]
async fn client(&self, key: String) -> Result<SchedulerGRPCClient<Channel>> { async fn client(
&self,
task_id: &str,
peer_id: Option<&str>,
) -> Result<SchedulerGRPCClient<Channel>> {
// Update scheduler addresses of the client. // Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?; self.update_available_scheduler_addrs().await?;
// Get the scheduler address from the hashring. // Get the scheduler address from the hashring.
let addr = self.hashring.read().await; let addrs = self.hashring.read().await;
let addr = addr let addr = *addrs
.get(&key[0..5].to_string()) .get(&task_id[0..5].to_string())
.ok_or_else(|| Error::HashRing(key.clone()))?; .ok_or_else(|| Error::HashRing(task_id.to_string()))?;
info!("{} picked {:?}", key, addr); drop(addrs);
info!("picked {:?}", addr);
let channel = match Channel::from_shared(format!("http://{}", addr)) let channel = match Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))? .map_err(|_| Error::InvalidURI(addr.to_string()))?
@ -347,38 +354,40 @@ impl SchedulerClient {
async fn update_available_scheduler_addrs(&self) -> Result<()> { async fn update_available_scheduler_addrs(&self) -> Result<()> {
// Get the endpoints of available schedulers. // Get the endpoints of available schedulers.
let data = self.dynconfig.data.read().await; let data = self.dynconfig.data.read().await;
let data_available_schedulers_clone = data.available_schedulers.clone();
drop(data);
// Check if the available schedulers is empty. // Check if the available schedulers is empty.
if data.available_schedulers.is_empty() { if data_available_schedulers_clone.is_empty() {
return Err(Error::AvailableSchedulersNotFound); return Err(Error::AvailableSchedulersNotFound);
} }
// Get the available schedulers. // Get the available schedulers.
let available_schedulers = self.available_schedulers.read().await; let available_schedulers = self.available_schedulers.read().await;
let available_schedulers_clone = available_schedulers.clone();
drop(available_schedulers);
// Check if the available schedulers is not changed. // Check if the available schedulers is not changed.
if data.available_schedulers.len() == available_schedulers.len() if data_available_schedulers_clone.len() == available_schedulers_clone.len()
&& data && data_available_schedulers_clone
.available_schedulers
.iter() .iter()
.zip(available_schedulers.iter()) .zip(available_schedulers_clone.iter())
.all(|(a, b)| a == b) .all(|(a, b)| a == b)
{ {
info!( info!(
"available schedulers is not changed: {:?}", "available schedulers is not changed: {:?}",
data.available_schedulers data_available_schedulers_clone
.iter() .iter()
.map(|s| s.ip.clone()) .map(|s| s.ip.clone())
.collect::<Vec<String>>() .collect::<Vec<String>>()
); );
return Ok(()); return Ok(());
} }
drop(available_schedulers);
let mut new_available_schedulers = Vec::new(); let mut new_available_schedulers = Vec::new();
let mut new_available_scheduler_addrs = Vec::new(); let mut new_available_scheduler_addrs = Vec::new();
let mut new_hashring = HashRing::new(); let mut new_hashring = HashRing::new();
for available_scheduler in data.available_schedulers.iter() { for available_scheduler in data_available_schedulers_clone.iter() {
let ip = match IpAddr::from_str(&available_scheduler.ip) { let ip = match IpAddr::from_str(&available_scheduler.ip) {
Ok(ip) => ip, Ok(ip) => ip,
Err(err) => { Err(err) => {
@ -403,20 +412,25 @@ impl SchedulerClient {
// Update the available schedulers. // Update the available schedulers.
let mut available_schedulers = self.available_schedulers.write().await; let mut available_schedulers = self.available_schedulers.write().await;
*available_schedulers = new_available_schedulers; *available_schedulers = new_available_schedulers;
drop(available_schedulers);
// Update the addresses of available schedulers. // Update the addresses of available schedulers.
let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await; let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;
*available_scheduler_addrs = new_available_scheduler_addrs; *available_scheduler_addrs = new_available_scheduler_addrs;
drop(available_scheduler_addrs);
// Update the hashring. // Update the hashring.
let mut hashring = self.hashring.write().await; let mut hashring = self.hashring.write().await;
*hashring = new_hashring; *hashring = new_hashring;
drop(hashring);
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
info!( info!(
"refresh available scheduler addresses: {:?}", "refresh available scheduler addresses: {:?}",
available_scheduler_addrs available_scheduler_addrs
.iter() .iter()
.map(|s| s.ip().to_string()) .map(|s| s.ip().to_string())
.collect::<Vec<String>>() .collect::<Vec<String>>(),
); );
Ok(()) Ok(())
} }

View File

@ -175,6 +175,7 @@ impl Proxy {
let server_ca_cert = self.server_ca_cert.clone(); let server_ca_cert = self.server_ca_cert.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new() if let Err(err) = http1::Builder::new()
.keep_alive(true)
.preserve_header_case(true) .preserve_header_case(true)
.title_case_headers(true) .title_case_headers(true)
.serve_connection( .serve_connection(
@ -184,7 +185,7 @@ impl Proxy {
.with_upgrades() .with_upgrades()
.await .await
{ {
error!("failed to serve connection: {}", err); error!("failed to serve connection from {}: {}", remote_address, err);
} }
}); });
} }

View File

@ -418,7 +418,7 @@ impl Task {
let mut finished_pieces: Vec<metadata::Piece> = Vec::new(); let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
// Initialize stream channel. // Initialize stream channel.
let (in_stream_tx, in_stream_rx) = mpsc::channel(1024); let (in_stream_tx, in_stream_rx) = mpsc::channel(4096);
// Send the register peer request. // Send the register peer request.
in_stream_tx in_stream_tx
@ -444,10 +444,16 @@ impl Task {
// Initialize the stream. // Initialize the stream.
let in_stream = ReceiverStream::new(in_stream_rx); let in_stream = ReceiverStream::new(in_stream_rx);
let request = Request::new(in_stream);
let response = self let response = self
.scheduler_client .scheduler_client
.announce_peer(task.id.as_str(), Request::new(in_stream)) .announce_peer(task.id.as_str(), peer_id, request)
.await?; .await
.map_err(|err| {
error!("announce peer failed: {:?}", err);
err
})?;
info!("announced peer has been connected");
let out_stream = response let out_stream = response
.into_inner() .into_inner()