feat: add dynconfig to dfdaemon (#70)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-08-28 16:51:14 +08:00 committed by GitHub
parent 09fcb6eed6
commit c44225b018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 197 additions and 3 deletions

View File

@ -103,7 +103,6 @@ impl ManagerAnnouncer {
info!("announce to manager shutting down"); info!("announce to manager shutting down");
} else { } else {
// TODO Announce the peer to the manager.
self.shutdown.recv().await; self.shutdown.recv().await;
info!("announce to manager shutting down"); info!("announce to manager shutting down");
} }

View File

@ -17,6 +17,7 @@
use clap::Parser; use clap::Parser;
use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer}; use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer};
use dragonfly_client::config::dfdaemon; use dragonfly_client::config::dfdaemon;
use dragonfly_client::dynconfig::Dynconfig;
use dragonfly_client::grpc::{ use dragonfly_client::grpc::{
dfdaemon::DfdaemonServer, manager::ManagerClient, scheduler::SchedulerClient, dfdaemon::DfdaemonServer, manager::ManagerClient, scheduler::SchedulerClient,
}; };
@ -118,10 +119,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
shutdown_complete_tx.clone(), shutdown_complete_tx.clone(),
); );
// Initialize dynconfig server.
let mut dynconfig = Dynconfig::new(
config.clone(),
manager_client.clone(),
shutdown::Shutdown::new(notify_shutdown.subscribe()),
shutdown_complete_tx.clone(),
)
.await?;
// Initialize manager announcer. // Initialize manager announcer.
let mut manager_announcer = ManagerAnnouncer::new( let mut manager_announcer = ManagerAnnouncer::new(
config.clone(), config.clone(),
manager_client, manager_client.clone(),
shutdown::Shutdown::new(notify_shutdown.subscribe()), shutdown::Shutdown::new(notify_shutdown.subscribe()),
shutdown_complete_tx.clone(), shutdown_complete_tx.clone(),
); );
@ -152,6 +162,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("health server exited"); info!("health server exited");
}, },
_ = tokio::spawn(async move { dynconfig.run().await }) => {
info!("dynconfig manager exited");
},
_ = tokio::spawn(async move { manager_announcer.run().await }) => { _ = tokio::spawn(async move { manager_announcer.run().await }) => {
info!("announcer manager exited"); info!("announcer manager exited");
}, },

View File

@ -84,7 +84,7 @@ pub fn default_scheduler_schedule_timeout() -> Duration {
// default_dynconfig_refresh_interval is the default interval to // default_dynconfig_refresh_interval is the default interval to
// refresh dynamic configuration from manager. // refresh dynamic configuration from manager.
pub fn default_dynconfig_refresh_interval() -> Duration { pub fn default_dynconfig_refresh_interval() -> Duration {
Duration::from_secs(600) Duration::from_secs(1800)
} }
// default_seed_peer_keepalive_interval is the default interval to keepalive with manager. // default_seed_peer_keepalive_interval is the default interval to keepalive with manager.

View File

@ -32,6 +32,9 @@ pub const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
// CARGO_PKG_RUSTC_VERSION is the minimum Rust version supported by the package, not the current Rust version. // CARGO_PKG_RUSTC_VERSION is the minimum Rust version supported by the package, not the current Rust version.
pub const CARGO_PKG_RUSTC_VERSION: &str = env!("CARGO_PKG_RUST_VERSION"); pub const CARGO_PKG_RUSTC_VERSION: &str = env!("CARGO_PKG_RUST_VERSION");
// GIT_HASH is the git hash of the package.
pub const GIT_HASH: Option<&str> = option_env!("GIT_HASH");
// default_root_dir is the default root directory for client. // default_root_dir is the default root directory for client.
pub fn default_root_dir() -> PathBuf { pub fn default_root_dir() -> PathBuf {
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]

172
src/dynconfig/mod.rs Normal file
View File

@ -0,0 +1,172 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::config::{dfdaemon::Config, CARGO_PKG_VERSION, GIT_HASH};
use crate::grpc::manager::ManagerClient;
use crate::shutdown;
use crate::{Error, Result};
use dragonfly_api::manager::v2::{
GetObjectStorageRequest, ListSchedulersRequest, ListSchedulersResponse, ObjectStorage,
SourceType,
};
use tokio::sync::mpsc;
use tracing::{error, info};
// Data is the dynamic configuration of the dfdaemon.
pub struct Data {
// schedulers is the schedulers of the dfdaemon.
schedulers: ListSchedulersResponse,
// object_storage is the object storage configuration of the dfdaemon.
object_storage: Option<ObjectStorage>,
}
// Dynconfig supports dynamic configuration of the client.
pub struct Dynconfig {
// config is the configuration of the dfdaemon.
config: Config,
// data is the dynamic configuration of the dfdaemon.
data: Data,
// manager_client is the grpc client of the manager.
manager_client: ManagerClient,
// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
// _shutdown_complete is used to notify the announcer is shutdown.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
// Dynconfig is the implementation of Dynconfig.
impl Dynconfig {
// new creates a new Dynconfig.
pub async fn new(
config: Config,
manager_client: ManagerClient,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Result<Self> {
// Create a new Dynconfig.
let mut dc = Dynconfig {
config,
data: Data {
schedulers: ListSchedulersResponse::default(),
object_storage: None,
},
manager_client,
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
// Get the initial dynamic configuration.
dc.refresh().await?;
Ok(dc)
}
// run starts the dynconfig server.
pub async fn run(&mut self) -> Result<()> {
// Start the refresh loop.
let mut interval = tokio::time::interval(self.config.dynconfig.refresh_interval);
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(err) = self.refresh().await {
error!("refresh dynconfig failed: {}", err);
};
}
_ = self.shutdown.recv() => {
// Dynconfig server shutting down with signals.
info!("dynconfig server shutting down");
return Ok(());
}
}
}
}
// get_schedulers returns the schedulers of the dfdaemon.
pub fn get_schedulers(&self) -> ListSchedulersResponse {
self.data.schedulers.clone()
}
// get_object_storage returns the object storage configuration of the dfdaemon.
pub fn get_object_storage(&self) -> Option<ObjectStorage> {
self.data.object_storage.clone()
}
// refresh refreshes the dynamic configuration of the dfdaemon.
async fn refresh(&mut self) -> Result<()> {
self.data = self.get().await?;
Ok(())
}
// get returns the dynamic configuration of the dfdaemon.
async fn get(&mut self) -> Result<Data> {
// Get the source type.
let source_type = if self.config.seed_peer.enable {
SourceType::SeedPeerSource.into()
} else {
SourceType::PeerSource.into()
};
// Get the schedulers from the manager.
let schedulers = self
.manager_client
.list_schedulers(ListSchedulersRequest {
source_type,
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
version: CARGO_PKG_VERSION.to_string(),
commit: GIT_HASH.unwrap_or("").to_string(),
})
.await?;
// Get the object storage configuration from the manager.
match self
.manager_client
.get_object_storage(GetObjectStorageRequest {
source_type,
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
})
.await
{
Ok(object_storage) => Ok(Data {
schedulers,
object_storage: Some(object_storage),
}),
Err(err) => {
// If the object storage is not found, return the schedulers only.
if let Error::TonicStatus(status) = err {
if status.code() == tonic::Code::NotFound {
return Ok(Data {
schedulers,
object_storage: None,
});
}
Err(Error::TonicStatus(status))
} else {
Err(err)
}
}
}
}
}

View File

@ -145,6 +145,7 @@ impl Dfdaemon for DfdaemonServerHandler {
} }
// DfdaemonClient is a wrapper of DfdaemonGRPCClient. // DfdaemonClient is a wrapper of DfdaemonGRPCClient.
#[derive(Clone)]
pub struct DfdaemonClient { pub struct DfdaemonClient {
// client is the grpc client of the dfdaemon. // client is the grpc client of the dfdaemon.
pub client: DfdaemonGRPCClient<Channel>, pub client: DfdaemonGRPCClient<Channel>,

View File

@ -21,6 +21,7 @@ use tonic_health::pb::{
}; };
// HealthClient is a wrapper of HealthGRPCClient. // HealthClient is a wrapper of HealthGRPCClient.
#[derive(Clone)]
pub struct HealthClient { pub struct HealthClient {
// client is the grpc client of the certificate. // client is the grpc client of the certificate.
client: HealthGRPCClient<Channel>, client: HealthGRPCClient<Channel>,

View File

@ -23,6 +23,7 @@ use dragonfly_api::manager::v2::{
use tonic::transport::Channel; use tonic::transport::Channel;
// ManagerClient is a wrapper of ManagerGRPCClient. // ManagerClient is a wrapper of ManagerGRPCClient.
#[derive(Clone)]
pub struct ManagerClient { pub struct ManagerClient {
// client is the grpc client of the manager. // client is the grpc client of the manager.
pub client: ManagerGRPCClient<Channel>, pub client: ManagerGRPCClient<Channel>,

View File

@ -24,6 +24,7 @@ use dragonfly_api::scheduler::v2::{
use tonic::transport::Channel; use tonic::transport::Channel;
// SchedulerClient is a wrapper of SchedulerGRPCClient. // SchedulerClient is a wrapper of SchedulerGRPCClient.
#[derive(Clone)]
pub struct SchedulerClient { pub struct SchedulerClient {
// client is the grpc client of the scehduler. // client is the grpc client of the scehduler.
pub client: SchedulerGRPCClient<Channel>, pub client: SchedulerGRPCClient<Channel>,

View File

@ -22,6 +22,7 @@ use dragonfly_api::security::{
use tonic::transport::Channel; use tonic::transport::Channel;
// CertificateClient is a wrapper of CertificateGRPCClient. // CertificateClient is a wrapper of CertificateGRPCClient.
#[derive(Clone)]
pub struct CertificateClient { pub struct CertificateClient {
// client is the grpc client of the certificate. // client is the grpc client of the certificate.
pub client: CertificateGRPCClient<Channel>, pub client: CertificateGRPCClient<Channel>,

View File

@ -18,6 +18,7 @@ pub mod announcer;
pub mod backend; pub mod backend;
pub mod config; pub mod config;
pub mod downloader; pub mod downloader;
pub mod dynconfig;
pub mod grpc; pub mod grpc;
pub mod health; pub mod health;
pub mod metrics; pub mod metrics;