From c44225b0180d2ffba01922b9528175b0a7ea9a60 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 28 Aug 2023 16:51:14 +0800 Subject: [PATCH] feat: add dynconfig to dfdaemon (#70) Signed-off-by: Gaius --- src/announcer/mod.rs | 1 - src/bin/dfdaemon/main.rs | 16 +++- src/config/dfdaemon.rs | 2 +- src/config/mod.rs | 3 + src/dynconfig/mod.rs | 172 +++++++++++++++++++++++++++++++++++++++ src/grpc/dfdaemon.rs | 1 + src/grpc/health.rs | 1 + src/grpc/manager.rs | 1 + src/grpc/scheduler.rs | 1 + src/grpc/security.rs | 1 + src/lib.rs | 1 + 11 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 src/dynconfig/mod.rs diff --git a/src/announcer/mod.rs b/src/announcer/mod.rs index 67e3145d..9b3ef82c 100644 --- a/src/announcer/mod.rs +++ b/src/announcer/mod.rs @@ -103,7 +103,6 @@ impl ManagerAnnouncer { info!("announce to manager shutting down"); } else { - // TODO Announce the peer to the manager. self.shutdown.recv().await; info!("announce to manager shutting down"); } diff --git a/src/bin/dfdaemon/main.rs b/src/bin/dfdaemon/main.rs index 88b57f36..963171a5 100644 --- a/src/bin/dfdaemon/main.rs +++ b/src/bin/dfdaemon/main.rs @@ -17,6 +17,7 @@ use clap::Parser; use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer}; use dragonfly_client::config::dfdaemon; +use dragonfly_client::dynconfig::Dynconfig; use dragonfly_client::grpc::{ dfdaemon::DfdaemonServer, manager::ManagerClient, scheduler::SchedulerClient, }; @@ -118,10 +119,19 @@ async fn main() -> Result<(), Box> { shutdown_complete_tx.clone(), ); + // Initialize dynconfig server. + let mut dynconfig = Dynconfig::new( + config.clone(), + manager_client.clone(), + shutdown::Shutdown::new(notify_shutdown.subscribe()), + shutdown_complete_tx.clone(), + ) + .await?; + // Initialize manager announcer. let mut manager_announcer = ManagerAnnouncer::new( config.clone(), - manager_client, + manager_client.clone(), shutdown::Shutdown::new(notify_shutdown.subscribe()), shutdown_complete_tx.clone(), ); @@ -152,6 +162,10 @@ async fn main() -> Result<(), Box> { info!("health server exited"); }, + _ = tokio::spawn(async move { dynconfig.run().await }) => { + info!("dynconfig manager exited"); + }, + _ = tokio::spawn(async move { manager_announcer.run().await }) => { info!("announcer manager exited"); }, diff --git a/src/config/dfdaemon.rs b/src/config/dfdaemon.rs index c7851906..bf0fc659 100644 --- a/src/config/dfdaemon.rs +++ b/src/config/dfdaemon.rs @@ -84,7 +84,7 @@ pub fn default_scheduler_schedule_timeout() -> Duration { // default_dynconfig_refresh_interval is the default interval to // refresh dynamic configuration from manager. pub fn default_dynconfig_refresh_interval() -> Duration { - Duration::from_secs(600) + Duration::from_secs(1800) } // default_seed_peer_keepalive_interval is the default interval to keepalive with manager. diff --git a/src/config/mod.rs b/src/config/mod.rs index a5eb7c96..08b21717 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -32,6 +32,9 @@ pub const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); // CARGO_PKG_RUSTC_VERSION is the minimum Rust version supported by the package, not the current Rust version. pub const CARGO_PKG_RUSTC_VERSION: &str = env!("CARGO_PKG_RUST_VERSION"); +// GIT_HASH is the git hash of the package. +pub const GIT_HASH: Option<&str> = option_env!("GIT_HASH"); + // default_root_dir is the default root directory for client. pub fn default_root_dir() -> PathBuf { #[cfg(target_os = "linux")] diff --git a/src/dynconfig/mod.rs b/src/dynconfig/mod.rs new file mode 100644 index 00000000..9696f038 --- /dev/null +++ b/src/dynconfig/mod.rs @@ -0,0 +1,172 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::config::{dfdaemon::Config, CARGO_PKG_VERSION, GIT_HASH}; +use crate::grpc::manager::ManagerClient; +use crate::shutdown; +use crate::{Error, Result}; +use dragonfly_api::manager::v2::{ + GetObjectStorageRequest, ListSchedulersRequest, ListSchedulersResponse, ObjectStorage, + SourceType, +}; +use tokio::sync::mpsc; +use tracing::{error, info}; + +// Data is the dynamic configuration of the dfdaemon. +pub struct Data { + // schedulers is the schedulers of the dfdaemon. + schedulers: ListSchedulersResponse, + + // object_storage is the object storage configuration of the dfdaemon. + object_storage: Option, +} + +// Dynconfig supports dynamic configuration of the client. +pub struct Dynconfig { + // config is the configuration of the dfdaemon. + config: Config, + + // data is the dynamic configuration of the dfdaemon. + data: Data, + + // manager_client is the grpc client of the manager. + manager_client: ManagerClient, + + // shutdown is used to shutdown the announcer. + shutdown: shutdown::Shutdown, + + // _shutdown_complete is used to notify the announcer is shutdown. + _shutdown_complete: mpsc::UnboundedSender<()>, +} + +// Dynconfig is the implementation of Dynconfig. +impl Dynconfig { + // new creates a new Dynconfig. + pub async fn new( + config: Config, + manager_client: ManagerClient, + shutdown: shutdown::Shutdown, + shutdown_complete_tx: mpsc::UnboundedSender<()>, + ) -> Result { + // Create a new Dynconfig. + let mut dc = Dynconfig { + config, + data: Data { + schedulers: ListSchedulersResponse::default(), + object_storage: None, + }, + manager_client, + shutdown, + _shutdown_complete: shutdown_complete_tx, + }; + + // Get the initial dynamic configuration. + dc.refresh().await?; + Ok(dc) + } + + // run starts the dynconfig server. + pub async fn run(&mut self) -> Result<()> { + // Start the refresh loop. + let mut interval = tokio::time::interval(self.config.dynconfig.refresh_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + if let Err(err) = self.refresh().await { + error!("refresh dynconfig failed: {}", err); + }; + } + _ = self.shutdown.recv() => { + // Dynconfig server shutting down with signals. + info!("dynconfig server shutting down"); + return Ok(()); + } + } + } + } + + // get_schedulers returns the schedulers of the dfdaemon. + pub fn get_schedulers(&self) -> ListSchedulersResponse { + self.data.schedulers.clone() + } + + // get_object_storage returns the object storage configuration of the dfdaemon. + pub fn get_object_storage(&self) -> Option { + self.data.object_storage.clone() + } + + // refresh refreshes the dynamic configuration of the dfdaemon. + async fn refresh(&mut self) -> Result<()> { + self.data = self.get().await?; + Ok(()) + } + + // get returns the dynamic configuration of the dfdaemon. + async fn get(&mut self) -> Result { + // Get the source type. + let source_type = if self.config.seed_peer.enable { + SourceType::SeedPeerSource.into() + } else { + SourceType::PeerSource.into() + }; + + // Get the schedulers from the manager. + let schedulers = self + .manager_client + .list_schedulers(ListSchedulersRequest { + source_type, + hostname: self.config.host.hostname.clone(), + ip: self.config.host.ip.unwrap().to_string(), + idc: self.config.host.idc.clone(), + location: self.config.host.location.clone(), + version: CARGO_PKG_VERSION.to_string(), + commit: GIT_HASH.unwrap_or("").to_string(), + }) + .await?; + + // Get the object storage configuration from the manager. + match self + .manager_client + .get_object_storage(GetObjectStorageRequest { + source_type, + hostname: self.config.host.hostname.clone(), + ip: self.config.host.ip.unwrap().to_string(), + }) + .await + { + Ok(object_storage) => Ok(Data { + schedulers, + object_storage: Some(object_storage), + }), + Err(err) => { + // If the object storage is not found, return the schedulers only. + if let Error::TonicStatus(status) = err { + if status.code() == tonic::Code::NotFound { + return Ok(Data { + schedulers, + object_storage: None, + }); + } + + Err(Error::TonicStatus(status)) + } else { + Err(err) + } + } + } + } +} diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs index 7704e573..e1d83a5c 100644 --- a/src/grpc/dfdaemon.rs +++ b/src/grpc/dfdaemon.rs @@ -145,6 +145,7 @@ impl Dfdaemon for DfdaemonServerHandler { } // DfdaemonClient is a wrapper of DfdaemonGRPCClient. +#[derive(Clone)] pub struct DfdaemonClient { // client is the grpc client of the dfdaemon. pub client: DfdaemonGRPCClient, diff --git a/src/grpc/health.rs b/src/grpc/health.rs index a5de24fe..eb794269 100644 --- a/src/grpc/health.rs +++ b/src/grpc/health.rs @@ -21,6 +21,7 @@ use tonic_health::pb::{ }; // HealthClient is a wrapper of HealthGRPCClient. +#[derive(Clone)] pub struct HealthClient { // client is the grpc client of the certificate. client: HealthGRPCClient, diff --git a/src/grpc/manager.rs b/src/grpc/manager.rs index 2380ff6a..03018933 100644 --- a/src/grpc/manager.rs +++ b/src/grpc/manager.rs @@ -23,6 +23,7 @@ use dragonfly_api::manager::v2::{ use tonic::transport::Channel; // ManagerClient is a wrapper of ManagerGRPCClient. +#[derive(Clone)] pub struct ManagerClient { // client is the grpc client of the manager. pub client: ManagerGRPCClient, diff --git a/src/grpc/scheduler.rs b/src/grpc/scheduler.rs index 00351c07..d8445c2a 100644 --- a/src/grpc/scheduler.rs +++ b/src/grpc/scheduler.rs @@ -24,6 +24,7 @@ use dragonfly_api::scheduler::v2::{ use tonic::transport::Channel; // SchedulerClient is a wrapper of SchedulerGRPCClient. +#[derive(Clone)] pub struct SchedulerClient { // client is the grpc client of the scehduler. pub client: SchedulerGRPCClient, diff --git a/src/grpc/security.rs b/src/grpc/security.rs index 62fa1981..635c0aba 100644 --- a/src/grpc/security.rs +++ b/src/grpc/security.rs @@ -22,6 +22,7 @@ use dragonfly_api::security::{ use tonic::transport::Channel; // CertificateClient is a wrapper of CertificateGRPCClient. +#[derive(Clone)] pub struct CertificateClient { // client is the grpc client of the certificate. pub client: CertificateGRPCClient, diff --git a/src/lib.rs b/src/lib.rs index 9604f3fb..d3c9023e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod announcer; pub mod backend; pub mod config; pub mod downloader; +pub mod dynconfig; pub mod grpc; pub mod health; pub mod metrics;