fix: barrier blocks thread (#858)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-11-22 13:45:32 +08:00 committed by GitHub
parent 84f0b30ca9
commit 574f37633e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 38 additions and 35 deletions

View File

@ -36,8 +36,9 @@ use dragonfly_client_storage::Storage;
use dragonfly_client_util::id_generator::IDGenerator; use dragonfly_client_util::id_generator::IDGenerator;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Barrier}; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tracing::{error, info, Level}; use tracing::{error, info, Level};
#[cfg(not(target_env = "msvc"))] #[cfg(not(target_env = "msvc"))]
@ -297,24 +298,6 @@ async fn main() -> Result<(), anyhow::Error> {
// Wait for servers to exit or shutdown signal. // Wait for servers to exit or shutdown signal.
tokio::select! { tokio::select! {
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err));
})
} => {
info!("dfdaemon upload grpc server exited");
},
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err));
})
} => {
info!("dfdaemon download grpc unix server exited");
},
_ = tokio::spawn(async move { dynconfig.run().await }) => { _ = tokio::spawn(async move { dynconfig.run().await }) => {
info!("dynconfig manager exited"); info!("dynconfig manager exited");
}, },
@ -343,11 +326,29 @@ async fn main() -> Result<(), anyhow::Error> {
info!("garbage collector exited"); info!("garbage collector exited");
}, },
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err));
})
} => {
info!("dfdaemon upload grpc server exited");
},
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err));
})
} => {
info!("dfdaemon download grpc unix server exited");
},
_ = { _ = {
let barrier = grpc_server_started_barrier.clone(); let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Wait for grpc server started. // Wait for grpc server started.
barrier.wait(); barrier.wait().await;
proxy.run().await.unwrap_or_else(|err| error!("proxy server failed: {}", err)); proxy.run().await.unwrap_or_else(|err| error!("proxy server failed: {}", err));
}) })
} => { } => {

View File

@ -47,11 +47,12 @@ use dragonfly_client_util::{
}; };
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::fs; use tokio::fs;
use tokio::net::{UnixListener, UnixStream}; use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}; use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream};
use tonic::{ use tonic::{
transport::{Channel, Endpoint, Server, Uri}, transport::{Channel, Endpoint, Server, Uri},
@ -123,18 +124,12 @@ impl DfdaemonDownloadServer {
.await; .await;
// Start download grpc server with unix domain socket. // Start download grpc server with unix domain socket.
info!(
"download server listening on {}",
self.socket_path.display()
);
fs::create_dir_all(self.socket_path.parent().unwrap()).await?; fs::create_dir_all(self.socket_path.parent().unwrap()).await?;
let uds = UnixListener::bind(&self.socket_path)?; let uds = UnixListener::bind(&self.socket_path)?;
let uds_stream = UnixListenerStream::new(uds); let uds_stream = UnixListenerStream::new(uds);
let server = Server::builder() let server = Server::builder()
.tcp_nodelay(true)
.max_frame_size(super::MAX_FRAME_SIZE) .max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
.tcp_keepalive(Some(super::TCP_KEEPALIVE)) .tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
@ -148,9 +143,13 @@ impl DfdaemonDownloadServer {
}); });
// Notify the grpc server is started. // Notify the grpc server is started.
grpc_server_started_barrier.wait(); grpc_server_started_barrier.wait().await;
// Wait for the download grpc server to shutdown. // Wait for the download grpc server to shutdown.
info!(
"download server listening on {}",
self.socket_path.display()
);
server.await?; server.await?;
// Remove the unix domain socket file. // Remove the unix domain socket file.
@ -994,7 +993,6 @@ impl DfdaemonDownloadClient {
// Ignore the uri because it is not used. // Ignore the uri because it is not used.
let channel = Endpoint::try_from("http://[::]:50051") let channel = Endpoint::try_from("http://[::]:50051")
.unwrap() .unwrap()
.tcp_nodelay(true)
.buffer_size(super::BUFFER_SIZE) .buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT) .connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT) .timeout(super::REQUEST_TIMEOUT)

View File

@ -41,10 +41,11 @@ use dragonfly_client_core::{
use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap}; use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::{ use tonic::{
transport::{Channel, Server}, transport::{Channel, Server},
@ -121,13 +122,13 @@ impl DfdaemonUploadServer {
.await; .await;
// Start upload grpc server. // Start upload grpc server.
info!("upload server listening on {}", self.addr);
let mut server_builder = Server::builder(); let mut server_builder = Server::builder();
if let Ok(Some(server_tls_config)) = if let Ok(Some(server_tls_config)) =
self.config.upload.server.load_server_tls_config().await self.config.upload.server.load_server_tls_config().await
{ {
server_builder = server_builder.tls_config(server_tls_config)?; server_builder = server_builder.tls_config(server_tls_config)?;
} }
let server = server_builder let server = server_builder
.max_frame_size(super::MAX_FRAME_SIZE) .max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE) .initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
@ -142,9 +143,10 @@ impl DfdaemonUploadServer {
}); });
// Notify the grpc server is started. // Notify the grpc server is started.
grpc_server_started_barrier.wait(); grpc_server_started_barrier.wait().await;
// Wait for the upload grpc server to shutdown. // Wait for the upload grpc server to shutdown.
info!("upload server listening on {}", self.addr);
Ok(server.await?) Ok(server.await?)
} }
} }

View File

@ -49,8 +49,10 @@ use dragonfly_client_util::{
}; };
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{
use std::sync::{Arc, Mutex}; atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::{ use tokio::sync::{
mpsc::{self, Sender}, mpsc::{self, Sender},