feat: add cache_peer apis and cache_task apis for scheduler client (#550)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2024-06-20 15:31:59 +08:00 committed by GitHub
parent 213a54ecaf
commit 6b4c38cf8e
3 changed files with 154 additions and 58 deletions
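The new scheduler-client surface mirrors the existing peer/task helpers. As a rough sketch of how a caller might exercise the unary cache task methods added below (assuming an already-constructed SchedulerClient from this crate; the function name example_cache_task_calls is illustrative, and the request fields are left at their prost defaults because the full message layout is not part of this diff):

use dragonfly_api::scheduler::v2::{DeleteCacheTaskRequest, StatCacheTaskRequest};
use dragonfly_client_core::Result;

// Sketch only: drives the unary cache task APIs added by this commit.
async fn example_cache_task_calls(
    scheduler_client: &SchedulerClient,
    task_id: &str,
) -> Result<()> {
    // Ask the scheduler for the current state of the cache task.
    let cache_task = scheduler_client
        .stat_cache_task(task_id, StatCacheTaskRequest::default())
        .await?;
    println!("cache task: {:?}", cache_task);

    // Tell the scheduler that the cache task is being deleted.
    scheduler_client
        .delete_cache_task(task_id, DeleteCacheTaskRequest::default())
        .await?;
    Ok(())
}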

Cargo.lock (generated)

@@ -950,9 +950,9 @@ dependencies = [
[[package]]
name = "dragonfly-api"
version = "2.0.123"
version = "2.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b83bf914712fc12df2129ed54ed889dbdddf0714af3013b68e85f9f7b844f7"
checksum = "1ab958fa6959f459c170f87cccae5adae0010fc3be09b9f98a433b898a7f4158"
dependencies = [
"prost 0.11.9",
"prost-types 0.12.6",

Cargo.toml

@@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.8
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.81" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.81" }
thiserror = "1.0"
-dragonfly-api = "2.0.123"
+dragonfly-api = "2.0.124"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.2", features = ["full"] }


@@ -16,12 +16,14 @@
// use crate::dynconfig::Dynconfig;
use crate::dynconfig::Dynconfig;
-use dragonfly_api::common::v2::{Peer, Task};
+use dragonfly_api::common::v2::{CachePeer, CacheTask, Peer, Task};
use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{
-    scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
-    AnnouncePeerRequest, AnnouncePeerResponse, DeleteHostRequest, DeletePeerRequest,
-    DeleteTaskRequest, StatPeerRequest, StatTaskRequest,
+    scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceCachePeerRequest,
+    AnnounceCachePeerResponse, AnnounceHostRequest, AnnouncePeerRequest, AnnouncePeerResponse,
+    DeleteCachePeerRequest, DeleteCacheTaskRequest, DeleteHostRequest, DeletePeerRequest,
+    DeleteTaskRequest, StatCachePeerRequest, StatCacheTaskRequest, StatPeerRequest,
+    StatTaskRequest, UploadCacheTaskRequest,
};
use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
use dragonfly_client_core::{Error, Result};
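The imports above pull in the request and response types for the new bidirectional announce_cache_peer RPC defined later in this diff. A minimal sketch of driving that stream, assuming tokio_stream is available to the caller, with the request body left at its prost default and all names here illustrative rather than part of the commit:

use dragonfly_api::scheduler::v2::AnnounceCachePeerRequest;
use dragonfly_client_core::Result;

// Sketch only: any `Stream<Item = AnnounceCachePeerRequest> + Send + 'static`
// satisfies tonic's `IntoStreamingRequest`, so a one-shot iterator stream works.
async fn example_announce_cache_peer(
    scheduler_client: &SchedulerClient,
    task_id: &str,
    peer_id: &str,
) -> Result<()> {
    let outbound = tokio_stream::iter(vec![AnnounceCachePeerRequest::default()]);

    // Open the bidirectional stream against the scheduler chosen for this task.
    let response = scheduler_client
        .announce_cache_peer(task_id, peer_id, outbound)
        .await?;

    // Drain scheduler responses until the server closes the stream.
    let mut inbound = response.into_inner();
    while let Some(message) = inbound.message().await? {
        println!("scheduler response: {:?}", message);
    }
    Ok(())
}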
@@ -134,57 +136,6 @@ impl SchedulerClient {
        Ok(())
    }
-    // init_announce_host announces the host to the scheduler.
-    #[instrument(skip(self))]
-    pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
-        let mut join_set = JoinSet::new();
-        let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
-        let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
-        drop(available_scheduler_addrs);
-        for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
-            let request = Self::make_request(request.clone());
-            async fn announce_host(
-                addr: SocketAddr,
-                request: tonic::Request<AnnounceHostRequest>,
-            ) -> Result<()> {
-                info!("announce host to {:?}", addr);
-                // Connect to the scheduler.
-                let channel = Channel::from_shared(format!("http://{}", addr))
-                    .map_err(|_| Error::InvalidURI(addr.to_string()))?
-                    .connect_timeout(super::CONNECT_TIMEOUT)
-                    .connect()
-                    .await
-                    .map_err(|err| {
-                        error!("connect to {} failed: {}", addr.to_string(), err);
-                        err
-                    })
-                    .or_err(ErrorType::ConnectError)?;
-                let mut client = SchedulerGRPCClient::new(channel);
-                client.announce_host(request).await?;
-                Ok(())
-            }
-            join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span());
-        }
-        while let Some(message) = join_set
-            .join_next()
-            .await
-            .transpose()
-            .or_err(ErrorType::AsyncRuntimeError)?
-        {
-            if let Err(err) = message {
-                error!("failed to init announce host: {}", err);
-                return Err(err);
-            }
-        }
-        Ok(())
-    }
    // announce_host announces the host to the scheduler.
    #[instrument(skip(self))]
    pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
@@ -239,6 +190,57 @@ impl SchedulerClient {
        Ok(())
    }
+    // init_announce_host announces the host to the scheduler.
+    #[instrument(skip(self))]
+    pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
+        let mut join_set = JoinSet::new();
+        let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
+        let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
+        drop(available_scheduler_addrs);
+        for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
+            let request = Self::make_request(request.clone());
+            async fn announce_host(
+                addr: SocketAddr,
+                request: tonic::Request<AnnounceHostRequest>,
+            ) -> Result<()> {
+                info!("announce host to {:?}", addr);
+                // Connect to the scheduler.
+                let channel = Channel::from_shared(format!("http://{}", addr))
+                    .map_err(|_| Error::InvalidURI(addr.to_string()))?
+                    .connect_timeout(super::CONNECT_TIMEOUT)
+                    .connect()
+                    .await
+                    .map_err(|err| {
+                        error!("connect to {} failed: {}", addr.to_string(), err);
+                        err
+                    })
+                    .or_err(ErrorType::ConnectError)?;
+                let mut client = SchedulerGRPCClient::new(channel);
+                client.announce_host(request).await?;
+                Ok(())
+            }
+            join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span());
+        }
+        while let Some(message) = join_set
+            .join_next()
+            .await
+            .transpose()
+            .or_err(ErrorType::AsyncRuntimeError)?
+        {
+            if let Err(err) = message {
+                error!("failed to init announce host: {}", err);
+                return Err(err);
+            }
+        }
+        Ok(())
+    }
    // delete_host tells the scheduler that the host is deleting.
    #[instrument(skip(self))]
    pub async fn delete_host(&self, request: DeleteHostRequest) -> Result<()> {
@@ -293,6 +295,100 @@
        Ok(())
    }
+    // announce_cache_peer announces the cache peer to the scheduler.
+    #[instrument(skip_all)]
+    pub async fn announce_cache_peer(
+        &self,
+        task_id: &str,
+        peer_id: &str,
+        request: impl tonic::IntoStreamingRequest<Message = AnnounceCachePeerRequest>,
+    ) -> Result<tonic::Response<tonic::codec::Streaming<AnnounceCachePeerResponse>>> {
+        let response = self
+            .client(task_id, Some(peer_id))
+            .await?
+            .announce_cache_peer(request)
+            .await?;
+        Ok(response)
+    }
+    // stat_cache_peer gets the status of the cache peer.
+    #[instrument(skip(self))]
+    pub async fn stat_cache_peer(
+        &self,
+        task_id: &str,
+        request: StatCachePeerRequest,
+    ) -> Result<CachePeer> {
+        let request = Self::make_request(request);
+        let response = self
+            .client(task_id, None)
+            .await?
+            .stat_cache_peer(request)
+            .await?;
+        Ok(response.into_inner())
+    }
+    // delete_cache_peer tells the scheduler that the cache peer is deleting.
+    #[instrument(skip(self))]
+    pub async fn delete_cache_peer(
+        &self,
+        task_id: &str,
+        request: DeleteCachePeerRequest,
+    ) -> Result<()> {
+        let request = Self::make_request(request);
+        self.client(task_id, None)
+            .await?
+            .delete_cache_peer(request)
+            .await?;
+        Ok(())
+    }
+    // upload_cache_task uploads the metadata of the cache task.
+    #[instrument(skip(self))]
+    pub async fn upload_cache_task(
+        &self,
+        task_id: &str,
+        request: UploadCacheTaskRequest,
+    ) -> Result<CacheTask> {
+        let request = Self::make_request(request);
+        let response = self
+            .client(task_id, None)
+            .await?
+            .upload_cache_task(request)
+            .await?;
+        Ok(response.into_inner())
+    }
+    // stat_cache_task gets the status of the cache task.
+    #[instrument(skip(self))]
+    pub async fn stat_cache_task(
+        &self,
+        task_id: &str,
+        request: StatCacheTaskRequest,
+    ) -> Result<CacheTask> {
+        let request = Self::make_request(request);
+        let response = self
+            .client(task_id, None)
+            .await?
+            .stat_cache_task(request)
+            .await?;
+        Ok(response.into_inner())
+    }
+    // delete_cache_task tells the scheduler that the cache task is deleting.
+    #[instrument(skip(self))]
+    pub async fn delete_cache_task(
+        &self,
+        task_id: &str,
+        request: DeleteCacheTaskRequest,
+    ) -> Result<()> {
+        let request = Self::make_request(request);
+        self.client(task_id, None)
+            .await?
+            .delete_cache_task(request)
+            .await?;
+        Ok(())
+    }
    // client gets the grpc client of the scheduler.
    #[instrument(skip(self))]
    async fn client(