Compare commits

...

7 Commits

Author SHA1 Message Date
dependabot[bot] d4aaa03301
chore(deps): Bump github/codeql-action from 3.29.7 to 3.29.8 (#1282)
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 3.29.7 to 3.29.8.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](51f77329af...76621b61de)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 3.29.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:56 +08:00
dependabot[bot] d68dba20c1
chore(deps): Bump clap from 4.5.41 to 4.5.43 (#1283)
Bumps [clap](https://github.com/clap-rs/clap) from 4.5.41 to 4.5.43.
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.41...clap_complete-v4.5.43)

---
updated-dependencies:
- dependency-name: clap
  dependency-version: 4.5.43
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:31 +08:00
dependabot[bot] 8116538556
chore(deps): Bump glob from 0.3.2 to 0.3.3 (#1285)
Bumps [glob](https://github.com/rust-lang/glob) from 0.3.2 to 0.3.3.
- [Release notes](https://github.com/rust-lang/glob/releases)
- [Changelog](https://github.com/rust-lang/glob/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/glob/compare/v0.3.2...v0.3.3)

---
updated-dependencies:
- dependency-name: glob
  dependency-version: 0.3.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:00 +08:00
dependabot[bot] 3822048e6b
chore(deps): Bump actions/download-artifact from 4 to 5 (#1286)
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:38:35 +08:00
Gaius e05270e598
feat: get network speed for scheduler (#1279)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-11 22:24:16 +08:00
Chlins Zhang 2cca5c7b9b
refactor: remove the seed peer announcement to manager (#1261)
Signed-off-by: chlins <chlins.zhang@gmail.com>
2025-08-08 12:25:08 +08:00
this is my name 1f8a323665
docs:Modify Cache storage comments (#1277)
Modify Cache storage comments to clarify its usage scenarios.

Signed-off-by: fu220 <2863318196@qq.com>
2025-08-06 17:04:20 +08:00
12 changed files with 350 additions and 313 deletions

View File

@ -94,7 +94,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@51f77329afa6477de8c49fc9c7046c15b9a4e79d
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'
@ -189,7 +189,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@51f77329afa6477de8c49fc9c7046c15b9a4e79d
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'
@ -284,7 +284,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@51f77329afa6477de8c49fc9c7046c15b9a4e79d
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'

View File

@ -119,7 +119,7 @@ jobs:
contents: write
steps:
- name: Download Release Artifacts
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
path: releases
pattern: release-*

33
Cargo.lock generated
View File

@ -603,9 +603,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.41"
version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9"
checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318"
dependencies = [
"clap_builder",
"clap_derive",
@ -613,9 +613,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.41"
version = "4.5.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d"
checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8"
dependencies = [
"anstream",
"anstyle",
@ -625,9 +625,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.41"
version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491"
checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6"
dependencies = [
"heck",
"proc-macro2",
@ -978,9 +978,9 @@ dependencies = [
[[package]]
name = "dragonfly-api"
version = "2.1.55"
version = "2.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfca467dd19bb026b667a89fc3631213957899766dfc95ad62ac0539ce68939b"
checksum = "8d07e740a105d6dd2ce968318897beaf37ef8b8f581fbae3d0e227722857786b"
dependencies = [
"prost 0.13.5",
"prost-types 0.14.1",
@ -1024,6 +1024,7 @@ dependencies = [
"lazy_static",
"leaky-bucket",
"local-ip-address",
"openssl",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
@ -1200,6 +1201,7 @@ dependencies = [
"http-range-header",
"lazy_static",
"lru",
"openssl",
"pnet",
"rcgen",
"reqwest",
@ -1208,6 +1210,7 @@ dependencies = [
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"sha2",
"sysinfo",
"tempfile",
"tokio",
"tracing",
@ -1499,9 +1502,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "glob"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "h2"
@ -2863,6 +2866,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-src"
version = "300.5.1+3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "735230c832b28c000e3bc117119e6466a663ec73506bc0a9907ea4187508e42a"
dependencies = [
"cc",
]
[[package]]
name = "openssl-sys"
version = "0.9.107"
@ -2871,6 +2883,7 @@ checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07"
dependencies = [
"cc",
"libc",
"openssl-src",
"pkg-config",
"vcpkg",
]

View File

@ -29,7 +29,7 @@ dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.1
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.10" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.10" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.10" }
dragonfly-api = "2.1.55"
dragonfly-api = "2.1.57"
thiserror = "2.0"
futures = "0.3.31"
reqwest = { version = "0.12.4", features = [
@ -91,7 +91,7 @@ opendal = { version = "0.48.0", features = [
"services-cos",
"services-webhdfs",
] }
clap = { version = "4.5.41", features = ["derive"] }
clap = { version = "4.5.45", features = ["derive"] }
anyhow = "1.0.98"
toml_edit = "0.22.26"
toml = "0.8.23"
@ -106,6 +106,7 @@ fs2 = "0.4.3"
lazy_static = "1.5"
bytes = "1.10"
local-ip-address = "0.6.5"
sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
[profile.release]
opt-level = 3

View File

@ -226,18 +226,6 @@ fn default_storage_cache_capacity() -> ByteSize {
ByteSize::mib(64)
}
/// default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline]
fn default_seed_peer_cluster_id() -> u64 {
1
}
/// default_seed_peer_keepalive_interval is the default interval to keepalive with manager.
#[inline]
fn default_seed_peer_keepalive_interval() -> Duration {
Duration::from_secs(15)
}
/// default_gc_interval is the default interval to do gc.
#[inline]
fn default_gc_interval() -> Duration {
@ -924,18 +912,6 @@ pub struct SeedPeer {
/// kind is the type of seed peer.
#[serde(default, rename = "type")]
pub kind: HostType,
/// cluster_id is the cluster id of the seed peer cluster.
#[serde(default = "default_seed_peer_cluster_id", rename = "clusterID")]
#[validate(range(min = 1))]
pub cluster_id: u64,
/// keepalive_interval is the interval to keep alive with manager.
#[serde(
default = "default_seed_peer_keepalive_interval",
with = "humantime_serde"
)]
pub keepalive_interval: Duration,
}
/// SeedPeer implements Default.
@ -944,8 +920,6 @@ impl Default for SeedPeer {
SeedPeer {
enable: false,
kind: HostType::Normal,
cluster_id: default_seed_peer_cluster_id(),
keepalive_interval: default_seed_peer_keepalive_interval(),
}
}
}
@ -1027,31 +1001,33 @@ pub struct Storage {
/// cache_capacity is the cache capacity for downloading, default is 100.
///
/// Cache storage:
/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
/// 1. Users can preheat task by caching to memory (via CacheTask) or to disk (via Task).
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/dfdaemon.proto#L174.
/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
/// page cache for the first read.
///
/// ```text
/// 1.Preheat
/// |
/// |
/// +--------------------------------------------------+
/// | | Peer |
/// | | +-----------+ |
/// | | -- Partial -->| Cache | |
/// | | | +-----------+ |
/// | v | | | |
/// | Download | Miss | |
/// | Task -->| | --- Hit ------>|<-- 2.Download
/// | | | ^ |
/// | | v | |
/// | | +-----------+ | |
/// | -- Full -->| Disk |---------- |
/// | +-----------+ |
/// | |
/// +--------------------------------------------------+
/// ```
///```text
/// +--------+
/// │ Source │
/// +--------+
/// ^ ^ Preheat
/// │ │ |
/// +-----------------+ │ │ +----------------------------+
/// │ Other Peers │ │ │ │ Peer | │
/// │ │ │ │ │ v │
/// │ +----------+ │ │ │ │ +----------+ │
/// │ │ Cache |<--|----------|<-Miss--| Cache |--Hit-->|<----Download CacheTask
/// │ +----------+ │ │ │ +----------+ │
/// │ │ │ │ │
/// │ +----------+ │ │ │ +----------+ │
/// │ │ Disk |<--|----------|<-Miss--| Disk |--Hit-->|<----Download Task
/// │ +----------+ │ │ +----------+ │
/// │ │ │ ^ │
/// │ │ │ | │
/// +-----------------+ +----------------------------+
/// |
/// Preheat
///```
#[serde(with = "bytesize_serde", default = "default_storage_cache_capacity")]
pub cache_capacity: ByteSize,
}
@ -2013,11 +1989,6 @@ key: /etc/ssl/private/client.pem
let default_seed_peer = SeedPeer::default();
assert!(!default_seed_peer.enable);
assert_eq!(default_seed_peer.kind, HostType::Normal);
assert_eq!(default_seed_peer.cluster_id, 1);
assert_eq!(
default_seed_peer.keepalive_interval,
default_seed_peer_keepalive_interval()
);
}
#[test]
@ -2025,20 +1996,9 @@ key: /etc/ssl/private/client.pem
let valid_seed_peer = SeedPeer {
enable: true,
kind: HostType::Weak,
cluster_id: 5,
keepalive_interval: Duration::from_secs(90),
};
assert!(valid_seed_peer.validate().is_ok());
let invalid_seed_peer = SeedPeer {
enable: true,
kind: HostType::Weak,
cluster_id: 0,
keepalive_interval: Duration::from_secs(90),
};
assert!(invalid_seed_peer.validate().is_err());
}
#[test]
@ -2055,8 +2015,6 @@ key: /etc/ssl/private/client.pem
assert!(seed_peer.enable);
assert_eq!(seed_peer.kind, HostType::Super);
assert_eq!(seed_peer.cluster_id, 2);
assert_eq!(seed_peer.keepalive_interval, Duration::from_secs(60));
}
#[test]

View File

@ -76,31 +76,33 @@ impl Task {
/// Cache is the cache for storing piece content by LRU algorithm.
///
/// Cache storage:
/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
/// 1. Users can preheat task by caching to memory (via CacheTask) or to disk (via Task).
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/dfdaemon.proto#L174.
/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
/// page cache for the first read.
///
/// ```text
/// 1.Preheat
/// |
/// |
/// +--------------------------------------------------+
/// | | Peer |
/// | | +-----------+ |
/// | | -- Partial -->| Cache | |
/// | | | +-----------+ |
/// | v | | | |
/// | Download | Miss | |
/// | Task -->| | --- Hit ------>|<-- 2.Download
/// | | | ^ |
/// | | v | |
/// | | +-----------+ | |
/// | -- Full -->| Disk |---------- |
/// | +-----------+ |
/// | |
/// +--------------------------------------------------+
/// ```
///```text
/// +--------+
/// │ Source │
/// +--------+
/// ^ ^ Preheat
/// │ │ |
/// +-----------------+ │ │ +----------------------------+
/// │ Other Peers │ │ │ │ Peer | │
/// │ │ │ │ │ v │
/// │ +----------+ │ │ │ │ +----------+ │
/// │ │ Cache |<--|----------|<-Miss--| Cache |--Hit-->|<----Download CacheTask
/// │ +----------+ │ │ │ +----------+ │
/// │ │ │ │ │
/// │ +----------+ │ │ │ +----------+ │
/// │ │ Disk |<--|----------|<-Miss--| Disk |--Hit-->|<----Download Task
/// │ +----------+ │ │ +----------+ │
/// │ │ │ ^ │
/// │ │ │ | │
/// +-----------------+ +----------------------------+
/// |
/// Preheat
///```
/// Task is the metadata of the task.
#[derive(Clone)]
pub struct Cache {

View File

@ -23,6 +23,7 @@ rustls-pki-types.workspace = true
rustls-pemfile.workspace = true
sha2.workspace = true
uuid.workspace = true
sysinfo.workspace = true
hex.workspace = true
crc32fast.workspace = true
openssl.workspace = true

View File

@ -14,48 +14,15 @@
* limitations under the License.
*/
use bytesize::{ByteSize, MB};
use bytesize::ByteSize;
use pnet::datalink::{self, NetworkInterface};
use std::cmp::min;
use std::net::IpAddr;
#[cfg(not(target_os = "linux"))]
use tracing::warn;
/// get_interface_by_ip returns the name of the network interface that has the specified IP
/// address.
pub fn get_interface_by_ip(ip: IpAddr) -> Option<NetworkInterface> {
for interface in datalink::interfaces() {
for ip_network in interface.ips.iter() {
if ip_network.ip() == ip {
return Some(interface);
}
}
}
None
}
/// get_interface_speed_by_ip returns the speed of the network interface that has the specified IP
/// address in Mbps.
pub fn get_interface_speed(interface_name: &str) -> Option<u64> {
#[cfg(target_os = "linux")]
{
let speed_path = format!("/sys/class/net/{}/speed", interface_name);
std::fs::read_to_string(&speed_path)
.ok()
.and_then(|speed_str| speed_str.trim().parse::<u64>().ok())
}
#[cfg(not(target_os = "linux"))]
{
warn!(
"can not get interface {} speed on non-linux platform",
interface_name
);
None
}
}
use std::sync::Arc;
use std::time::Duration;
use sysinfo::Networks;
use tokio::sync::Mutex;
use tracing::{info, warn};
/// Interface represents a network interface with its information.
#[derive(Debug, Clone, Default)]
@ -63,23 +30,201 @@ pub struct Interface {
/// name is the name of the network interface.
pub name: String,
// bandwidth is the bandwidth of the network interface in Mbps.
/// bandwidth is the bandwidth of the network interface in bps.
pub bandwidth: u64,
// network_data_mutex is a mutex to protect access to network data.
network_data_mutex: Arc<Mutex<()>>,
}
/// get_interface_info returns the network interface information for the specified IP address.
pub fn get_interface_info(ip: IpAddr, rate_limit: ByteSize) -> Option<Interface> {
let rate_limit = rate_limit.as_u64() / MB * 8; // convert to Mbps
/// NetworkData represents the network data for a specific interface,
#[derive(Debug, Clone, Default)]
pub struct NetworkData {
/// max_rx_bandwidth is the maximum receive bandwidth of the interface in bps.
pub max_rx_bandwidth: u64,
let interface = get_interface_by_ip(ip)?;
match get_interface_speed(&interface.name) {
Some(speed) => Some(Interface {
name: interface.name,
bandwidth: min(speed, rate_limit),
}),
None => Some(Interface {
name: interface.name,
bandwidth: rate_limit,
}),
/// rx_bandwidth is the current receive bandwidth of the interface in bps.
pub rx_bandwidth: Option<u64>,
/// max_tx_bandwidth is the maximum transmit bandwidth of the interface in bps.
pub max_tx_bandwidth: u64,
/// tx_bandwidth is the current transmit bandwidth of the interface in bps.
pub tx_bandwidth: Option<u64>,
}
/// Interface methods provide functionality to get network interface information.
impl Interface {
/// DEFAULT_NETWORKS_REFRESH_INTERVAL is the default interval for refreshing network data.
const DEFAULT_NETWORKS_REFRESH_INTERVAL: Duration = Duration::from_secs(2);
/// new creates a new Interface instance based on the provided IP address and rate limit.
pub fn new(ip: IpAddr, rate_limit: ByteSize) -> Interface {
let rate_limit = Self::byte_size_to_bits(rate_limit); // convert to bps
let Some(interface) = Self::get_network_interface_by_ip(ip) else {
warn!(
"can not find interface for IP address {}, network interface unknown with bandwidth {} bps",
ip, rate_limit
);
return Interface {
name: "unknown".to_string(),
bandwidth: rate_limit,
network_data_mutex: Arc::new(Mutex::new(())),
};
};
match Self::get_speed(&interface.name) {
Some(speed) => {
let bandwidth = min(Self::megabits_to_bits(speed), rate_limit);
info!(
"network interface {} with bandwidth {} bps",
interface.name, bandwidth
);
Interface {
name: interface.name,
bandwidth,
network_data_mutex: Arc::new(Mutex::new(())),
}
}
None => {
warn!(
"can not get speed, network interface {} with bandwidth {} bps",
interface.name, rate_limit
);
Interface {
name: interface.name,
bandwidth: rate_limit,
network_data_mutex: Arc::new(Mutex::new(())),
}
}
}
}
/// get_network_data retrieves the network data for the interface.
pub async fn get_network_data(&self) -> NetworkData {
// Lock the mutex to ensure exclusive access to network data.
let _guard = self.network_data_mutex.lock().await;
// Initialize sysinfo network.
let mut networks = Networks::new_with_refreshed_list();
// Sleep to calculate the network traffic difference over
// the DEFAULT_NETWORKS_REFRESH_INTERVAL.
tokio::time::sleep(Self::DEFAULT_NETWORKS_REFRESH_INTERVAL).await;
// Refresh network information.
networks.refresh();
let Some(network_data) = networks.get(self.name.as_str()) else {
warn!("can not find network data for interface {}", self.name);
return NetworkData {
max_rx_bandwidth: self.bandwidth,
max_tx_bandwidth: self.bandwidth,
..Default::default()
};
};
// Calculate the receive and transmit bandwidth in bits per second.
let rx_bandwidth = (Self::bytes_to_bits(network_data.received()) as f64
/ Self::DEFAULT_NETWORKS_REFRESH_INTERVAL.as_secs_f64())
.round() as u64;
// Calculate the transmit bandwidth in bits per second.
let tx_bandwidth = (Self::bytes_to_bits(network_data.transmitted()) as f64
/ Self::DEFAULT_NETWORKS_REFRESH_INTERVAL.as_secs_f64())
.round() as u64;
NetworkData {
max_rx_bandwidth: self.bandwidth,
rx_bandwidth: Some(rx_bandwidth),
max_tx_bandwidth: self.bandwidth,
tx_bandwidth: Some(tx_bandwidth),
}
}
/// get_speed returns the speed of the network interface in Mbps.
pub fn get_speed(name: &str) -> Option<u64> {
#[cfg(target_os = "linux")]
{
let speed_path = format!("/sys/class/net/{}/speed", name);
std::fs::read_to_string(&speed_path)
.ok()
.and_then(|speed_str| speed_str.trim().parse::<u64>().ok())
}
#[cfg(not(target_os = "linux"))]
{
warn!("can not get interface {} speed on non-linux platform", name);
None
}
}
/// get_network_interface_by_ip returns the network interface that has the specified
/// IP address.
pub fn get_network_interface_by_ip(ip: IpAddr) -> Option<NetworkInterface> {
datalink::interfaces()
.into_iter()
.find(|interface| interface.ips.iter().any(|ip_net| ip_net.ip() == ip))
}
/// byte_size_to_bits converts a ByteSize to bits.
pub fn byte_size_to_bits(size: ByteSize) -> u64 {
size.as_u64() * 8
}
/// megabits_to_bit converts megabits to bits.
pub fn megabits_to_bits(size: u64) -> u64 {
size * 1_000_000 // 1 Mbit = 1,000,000 bits
}
/// bytes_to_bits converts bytes to bits.
pub fn bytes_to_bits(size: u64) -> u64 {
size * 8 // 1 byte = 8 bits
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytesize::ByteSize;
#[test]
fn test_byte_size_to_bits() {
let test_cases = vec![
(ByteSize::kb(1), 8_000u64),
(ByteSize::mb(1), 8_000_000u64),
(ByteSize::gb(1), 8_000_000_000u64),
(ByteSize::b(0), 0u64),
];
for (input, expected) in test_cases {
let result = Interface::byte_size_to_bits(input);
assert_eq!(result, expected);
}
}
#[test]
fn test_megabits_to_bits() {
let test_cases = vec![
(1u64, 1_000_000u64),
(1000u64, 1_000_000_000u64),
(0u64, 0u64),
];
for (input, expected) in test_cases {
let result = Interface::megabits_to_bits(input);
assert_eq!(result, expected);
}
}
#[test]
fn test_bytes_to_bits() {
let test_cases = vec![(1u64, 8u64), (1000u64, 8_000u64), (0u64, 0u64)];
for (input, expected) in test_cases {
let result = Interface::bytes_to_bits(input);
assert_eq!(result, expected);
}
}
}

View File

@ -62,6 +62,7 @@ fs2.workspace = true
lazy_static.workspace = true
futures.workspace = true
local-ip-address.workspace = true
sysinfo.workspace = true
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
tracing-panic = "0.1.2"
@ -74,7 +75,6 @@ rolling-file = "0.2.0"
pprof = { version = "0.15", features = ["flamegraph", "protobuf-codec"] }
prometheus = { version = "0.13", features = ["process"] }
tonic-health = "0.12.3"
sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
tower = { version = "0.4.13", features = ["limit", "load-shed", "buffer"] }
indicatif = "0.18.0"
hashring = "0.3.6"
@ -85,7 +85,7 @@ tabled = "0.20.0"
path-absolutize = "3.1.1"
dashmap = "6.1.0"
fastrand = "2.3.0"
glob = "0.3.2"
glob = "0.3.3"
console-subscriber = "0.4.1"
[dev-dependencies]

View File

@ -14,10 +14,9 @@
* limitations under the License.
*/
use crate::grpc::{manager::ManagerClient, scheduler::SchedulerClient};
use crate::grpc::scheduler::SchedulerClient;
use crate::shutdown;
use dragonfly_api::common::v2::{Build, Cpu, Disk, Host, Memory, Network};
use dragonfly_api::manager::v2::{DeleteSeedPeerRequest, SourceType, UpdateSeedPeerRequest};
use dragonfly_api::scheduler::v2::{AnnounceHostRequest, DeleteHostRequest};
use dragonfly_client_config::{
dfdaemon::{Config, HostType},
@ -25,89 +24,13 @@ use dragonfly_client_config::{
};
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::Result;
use dragonfly_client_util::net::Interface;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::sync::mpsc;
use tracing::{error, info, instrument};
/// ManagerAnnouncer is used to announce the dfdaemon information to the manager.
pub struct ManagerAnnouncer {
/// config is the configuration of the dfdaemon.
config: Arc<Config>,
/// manager_client is the grpc client of the manager.
manager_client: Arc<ManagerClient>,
/// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
/// _shutdown_complete is used to notify the announcer is shutdown.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
/// ManagerAnnouncer implements the manager announcer of the dfdaemon.
impl ManagerAnnouncer {
/// new creates a new manager announcer.
pub fn new(
config: Arc<Config>,
manager_client: Arc<ManagerClient>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
Self {
config,
manager_client,
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
/// run announces the dfdaemon information to the manager.
pub async fn run(&self) -> Result<()> {
// Clone the shutdown channel.
let mut shutdown = self.shutdown.clone();
// If the seed peer is enabled, we should announce the seed peer to the manager.
if self.config.seed_peer.enable {
// Register the seed peer to the manager.
self.manager_client
.update_seed_peer(UpdateSeedPeerRequest {
source_type: SourceType::SeedPeerSource.into(),
hostname: self.config.host.hostname.clone(),
r#type: self.config.seed_peer.kind.to_string(),
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
ip: self.config.host.ip.unwrap().to_string(),
port: self.config.upload.server.port as i32,
download_port: self.config.upload.server.port as i32,
seed_peer_cluster_id: self.config.seed_peer.cluster_id,
})
.await?;
// Announce to scheduler shutting down with signals.
shutdown.recv().await;
// Delete the seed peer from the manager.
self.manager_client
.delete_seed_peer(DeleteSeedPeerRequest {
source_type: SourceType::SeedPeerSource.into(),
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
seed_peer_cluster_id: self.config.seed_peer.cluster_id,
})
.await?;
info!("announce to manager shutting down");
} else {
shutdown.recv().await;
info!("announce to manager shutting down");
}
Ok(())
}
}
use tracing::{debug, error, info, instrument};
/// Announcer is used to announce the dfdaemon information to the manager and scheduler.
pub struct SchedulerAnnouncer {
@ -120,6 +43,9 @@ pub struct SchedulerAnnouncer {
/// scheduler_client is the grpc client of the scheduler.
scheduler_client: Arc<SchedulerClient>,
/// interface is the network interface.
interface: Arc<Interface>,
/// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
@ -134,6 +60,7 @@ impl SchedulerAnnouncer {
config: Arc<Config>,
host_id: String,
scheduler_client: Arc<SchedulerClient>,
interface: Arc<Interface>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Result<Self> {
@ -141,6 +68,7 @@ impl SchedulerAnnouncer {
config,
host_id,
scheduler_client,
interface,
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
@ -148,7 +76,7 @@ impl SchedulerAnnouncer {
// Initialize the scheduler announcer.
announcer
.scheduler_client
.init_announce_host(announcer.make_announce_host_request(Duration::ZERO)?)
.init_announce_host(announcer.make_announce_host_request(Duration::ZERO).await?)
.await?;
Ok(announcer)
}
@ -163,7 +91,7 @@ impl SchedulerAnnouncer {
loop {
tokio::select! {
_ = interval.tick() => {
let request = match self.make_announce_host_request(interval.period()) {
let request = match self.make_announce_host_request(interval.period()).await {
Ok(request) => request,
Err(err) => {
error!("make announce host request failed: {}", err);
@ -192,7 +120,7 @@ impl SchedulerAnnouncer {
/// make_announce_host_request makes the announce host request.
#[instrument(skip_all)]
fn make_announce_host_request(&self, interval: Duration) -> Result<AnnounceHostRequest> {
async fn make_announce_host_request(&self, interval: Duration) -> Result<AnnounceHostRequest> {
// If the seed peer is enabled, we should announce the seed peer to the scheduler.
let host_type = if self.config.seed_peer.enable {
self.config.seed_peer.kind
@ -228,25 +156,25 @@ impl SchedulerAnnouncer {
free: sys.free_memory(),
};
// Wait for getting the network data.
let network_data = self.interface.get_network_data().await;
debug!(
"network data: rx bandwidth {}/{} bps, tx bandwidth {}/{} bps",
network_data.rx_bandwidth.unwrap_or(0),
network_data.max_rx_bandwidth,
network_data.tx_bandwidth.unwrap_or(0),
network_data.max_tx_bandwidth
);
// Get the network information.
let network = Network {
// TODO: Get the count of the tcp connection.
tcp_connection_count: 0,
// TODO: Get the count of the upload tcp connection.
upload_tcp_connection_count: 0,
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
// TODO: Get the network download rate, refer to
// https://docs.rs/sysinfo/latest/sysinfo/struct.NetworkData.html#method.received.
download_rate: 0,
download_rate_limit: self.config.download.rate_limit.as_u64(),
// TODO: Get the network download rate, refer to
// https://docs.rs/sysinfo/latest/sysinfo/struct.NetworkData.html#method.transmitted
upload_rate: 0,
upload_rate_limit: self.config.upload.rate_limit.as_u64(),
max_rx_bandwidth: network_data.max_rx_bandwidth,
rx_bandwidth: network_data.rx_bandwidth,
max_tx_bandwidth: network_data.max_tx_bandwidth,
tx_bandwidth: network_data.tx_bandwidth,
..Default::default()
};
// Get the disk information.

View File

@ -15,7 +15,7 @@
*/
use clap::Parser;
use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer};
use dragonfly_client::announcer::SchedulerAnnouncer;
use dragonfly_client::dynconfig::Dynconfig;
use dragonfly_client::gc::GC;
use dragonfly_client::grpc::{
@ -32,7 +32,7 @@ use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::BackendFactory;
use dragonfly_client_config::{dfdaemon, VersionValueParser};
use dragonfly_client_storage::Storage;
use dragonfly_client_util::id_generator::IDGenerator;
use dragonfly_client_util::{id_generator::IDGenerator, net::Interface};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
@ -229,6 +229,9 @@ async fn main() -> Result<(), anyhow::Error> {
)?;
let persistent_cache_task = Arc::new(persistent_cache_task);
let interface = Interface::new(config.host.ip.unwrap(), config.upload.rate_limit);
let interface = Arc::new(interface);
// Initialize health server.
let health = Health::new(
SocketAddr::new(config.health.server.ip.unwrap(), config.health.server.port),
@ -258,19 +261,12 @@ async fn main() -> Result<(), anyhow::Error> {
shutdown_complete_tx.clone(),
);
// Initialize manager announcer.
let manager_announcer = ManagerAnnouncer::new(
config.clone(),
manager_client.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
// Initialize scheduler announcer.
let scheduler_announcer = SchedulerAnnouncer::new(
config.clone(),
id_generator.host_id(),
scheduler_client.clone(),
interface.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)
@ -285,6 +281,7 @@ async fn main() -> Result<(), anyhow::Error> {
SocketAddr::new(config.upload.server.ip.unwrap(), config.upload.server.port),
task.clone(),
persistent_cache_task.clone(),
interface.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
@ -333,10 +330,6 @@ async fn main() -> Result<(), anyhow::Error> {
info!("stats server exited");
},
_ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))} ) => {
info!("announcer manager exited");
},
_ = tokio::spawn(async move { scheduler_announcer.run().await }) => {
info!("announcer scheduler exited");
},

View File

@ -24,7 +24,6 @@ use crate::metrics::{
};
use crate::resource::{persistent_cache_task, task};
use crate::shutdown;
use bytesize::MB;
use dragonfly_api::common::v2::{
CacheTask, Host, Network, PersistentCacheTask, Piece, Priority, Task, TaskType,
};
@ -51,14 +50,13 @@ use dragonfly_client_core::{
use dragonfly_client_util::{
http::{get_range, hashmap_to_headermap, headermap_to_hashmap},
id_generator::TaskIDParameter,
net::{get_interface_info, Interface},
net::Interface,
};
use opentelemetry::Context;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use sysinfo::Networks;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
@ -70,7 +68,7 @@ use tonic::{
Code, Request, Response, Status,
};
use tower::ServiceBuilder;
use tracing::{error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use url::Url;
@ -90,6 +88,9 @@ pub struct DfdaemonUploadServer {
/// persistent_cache_task is the persistent cache task manager.
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
/// interface is the network interface.
interface: Arc<Interface>,
/// shutdown is used to shutdown the grpc server.
shutdown: shutdown::Shutdown,
@ -105,6 +106,7 @@ impl DfdaemonUploadServer {
addr: SocketAddr,
task: Arc<task::Task>,
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
interface: Arc<Interface>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
@ -112,6 +114,7 @@ impl DfdaemonUploadServer {
config,
addr,
task,
interface,
persistent_cache_task,
shutdown,
_shutdown_complete: shutdown_complete_tx,
@ -120,16 +123,12 @@ impl DfdaemonUploadServer {
/// run starts the upload server.
pub async fn run(&mut self, grpc_server_started_barrier: Arc<Barrier>) -> ClientResult<()> {
// Initialize the grpc service.
let interface =
get_interface_info(self.config.host.ip.unwrap(), self.config.upload.rate_limit);
let service = DfdaemonUploadGRPCServer::with_interceptor(
DfdaemonUploadServerHandler {
interface,
socket_path: self.config.download.server.socket_path.clone(),
task: self.task.clone(),
persistent_cache_task: self.persistent_cache_task.clone(),
interface: self.interface.clone(),
},
ExtractTracingInterceptor,
);
@ -205,9 +204,6 @@ impl DfdaemonUploadServer {
/// DfdaemonUploadServerHandler is the handler of the dfdaemon upload grpc service.
pub struct DfdaemonUploadServerHandler {
/// interface is the network interface.
interface: Option<Interface>,
/// socket_path is the path of the unix domain socket.
socket_path: PathBuf,
@ -216,6 +212,9 @@ pub struct DfdaemonUploadServerHandler {
/// persistent_cache_task is the persistent cache task manager.
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
/// interface is the network interface.
interface: Arc<Interface>,
}
/// DfdaemonUploadServerHandler implements the dfdaemon upload grpc service.
@ -1003,9 +1002,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
Span::current().set_parent(parent_ctx.clone());
};
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
// Clone the request.
let request = request.into_inner();
@ -1027,42 +1023,42 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Get local interface.
let interface = self.interface.clone();
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
// Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
// Initialize sysinfo network.
let mut networks = Networks::new_with_refreshed_list();
// Start the host info update loop.
loop {
// Sleep to calculate the network traffic difference over
// the DEFAULT_HOST_INFO_REFRESH_INTERVAL.
// Wait for the host info refresh interval.
tokio::time::sleep(DEFAULT_HOST_INFO_REFRESH_INTERVAL).await;
// Refresh network information.
networks.refresh();
// Init response.
let mut host = Host::default();
if let Some(interface) = &interface {
if let Some(network_data) = networks.get(&interface.name) {
host.network = Some(Network {
download_rate: network_data.received()
/ DEFAULT_HOST_INFO_REFRESH_INTERVAL.as_secs(),
// Convert bandwidth to bytes per second.
download_rate_limit: interface.bandwidth / 8 * MB,
upload_rate: network_data.transmitted()
/ DEFAULT_HOST_INFO_REFRESH_INTERVAL.as_secs(),
// Convert bandwidth to bytes per second.
upload_rate_limit: interface.bandwidth / 8 * MB,
..Default::default()
});
};
}
// Wait for getting the network data.
let network_data = interface.get_network_data().await;
debug!(
"network data: rx bandwidth {}/{} bps, tx bandwidth {}/{} bps",
network_data.rx_bandwidth.unwrap_or(0),
network_data.max_rx_bandwidth,
network_data.tx_bandwidth.unwrap_or(0),
network_data.max_tx_bandwidth
);
// Send host info.
match out_stream_tx.send(Ok(host.clone())).await {
match out_stream_tx
.send(Ok(Host {
network: Some(Network {
max_rx_bandwidth: network_data.max_rx_bandwidth,
rx_bandwidth: network_data.rx_bandwidth,
max_tx_bandwidth: network_data.max_tx_bandwidth,
tx_bandwidth: network_data.tx_bandwidth,
..Default::default()
}),
..Default::default()
}))
.await
{
Ok(_) => {}
Err(err) => {
error!(
@ -1070,7 +1066,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
remote_host_id, err
);
break;
return;
}
};
}