fix: solve the memory leak caused by rayon (#1102)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-04-18 17:49:05 +08:00 committed by GitHub
parent f3b1b67607
commit 2a0ef8ec19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 28 additions and 30 deletions

18
Cargo.lock generated
View File

@ -953,7 +953,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"anyhow",
"bytes",
@ -1022,7 +1022,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1053,7 +1053,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1081,7 +1081,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"headers 0.4.0",
"hyper 1.6.0",
@ -1099,7 +1099,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"anyhow",
"clap",
@ -1117,7 +1117,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"bincode",
"bytes",
@ -1133,7 +1133,6 @@ dependencies = [
"lru",
"num_cpus",
"prost-wkt-types",
"rayon",
"reqwest",
"rocksdb",
"serde",
@ -1146,7 +1145,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"base64 0.22.1",
"bytesize",
@ -1559,7 +1558,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.2.23"
version = "0.2.24"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",
@ -4558,7 +4557,6 @@ dependencies = [
"libc",
"memchr",
"ntapi",
"rayon",
"windows 0.57.0",
]

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.2.23"
version = "0.2.24"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.23" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.23" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.23" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.23" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.23" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.23" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.23" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.24" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.24" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.24" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.24" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.24" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.24" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.24" }
dragonfly-api = "=2.1.36"
thiserror = "1.0"
futures = "0.3.31"

View File

@ -30,7 +30,6 @@ bytes.workspace = true
bytesize.workspace = true
num_cpus = "1.0"
bincode = "1.3.3"
rayon = "1.10.0"
[dev-dependencies]
tempdir = "0.3"

View File

@ -18,7 +18,6 @@ use chrono::{NaiveDateTime, Utc};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::{digest, http::headermap_to_hashmap};
use rayon::prelude::*;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -527,7 +526,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
.collect::<Result<Vec<Box<[u8]>>>>()?;
tasks
.par_iter()
.iter()
.map(|task| Task::deserialize_from(task))
.collect()
}
@ -864,7 +863,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
.collect::<Result<Vec<Box<[u8]>>>>()?;
pieces
.par_iter()
.iter()
.map(|piece| Piece::deserialize_from(piece))
.collect()
}
@ -889,7 +888,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
.collect::<Result<Vec<Box<[u8]>>>>()?;
let piece_ids_refs = piece_ids
.par_iter()
.iter()
.map(|id| {
let id_ref = id.as_ref();
info!(

View File

@ -77,7 +77,7 @@ opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
pprof = { version = "0.14", features = ["flamegraph", "protobuf-codec"] }
prometheus = { version = "0.13", features = ["process"] }
tonic-health = "0.12.3"
sysinfo = "0.32.1"
sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
tower = { version = "0.4.13", features = ["limit", "load-shed", "buffer"] }
indicatif = "0.17.11"
hashring = "0.3.6"

View File

@ -26,7 +26,7 @@ use dragonfly_client_config::{
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::Result;
use std::env;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::sync::mpsc;
@ -122,9 +122,6 @@ pub struct SchedulerAnnouncer {
/// scheduler_client is the grpc client of the scheduler.
scheduler_client: Arc<SchedulerClient>,
// system is the system information.
system: Arc<Mutex<System>>,
/// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
@ -147,7 +144,6 @@ impl SchedulerAnnouncer {
config,
host_id,
scheduler_client,
system: Arc::new(Mutex::new(System::new_all())),
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
@ -209,7 +205,7 @@ impl SchedulerAnnouncer {
};
// Refresh the system information.
let mut sys = self.system.lock().unwrap();
let mut sys = System::new_all();
sys.refresh_all();
// Get the process information.

View File

@ -127,6 +127,7 @@ impl GC {
}
}
info!("evict by task ttl done");
Ok(())
}
@ -153,6 +154,8 @@ impl GC {
if let Err(err) = self.evict_task_space(need_evict_space as u64).await {
info!("failed to evict task by disk usage: {}", err);
}
info!("evict task by disk usage done");
}
Ok(())
@ -241,6 +244,7 @@ impl GC {
}
}
info!("evict by persistent cache task ttl done");
Ok(())
}
@ -270,6 +274,8 @@ impl GC {
{
info!("failed to evict task by disk usage: {}", err);
}
info!("evict persistent cache task by disk usage done");
}
Ok(())