feat: get hashring by prefix string in task id (#203)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-01-08 22:20:20 +08:00 committed by GitHub
parent dc1da74722
commit 65769f452d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 10 deletions

View File

@ -117,7 +117,7 @@ fn default_scheduler_enable_back_to_source() -> bool {
// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager.
#[inline]
fn default_dynconfig_refresh_interval() -> Duration {
Duration::from_secs(1800)
Duration::from_secs(300)
}
// default_seed_peer_cluster_id is the default cluster id of seed peer.

View File

@ -263,7 +263,9 @@ impl SchedulerClient {
// Get the scheduler address from the hashring.
let addr = self.hashring.read().await;
let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?;
let addr = addr
.get(&key[0..5].to_string())
.ok_or_else(|| Error::HashRing(key.clone()))?;
info!("{} picked {:?}", key, addr);
let channel = match Channel::from_shared(format!("http://{}", addr.to_string()))
@ -307,7 +309,10 @@ impl SchedulerClient {
.iter()
.all(|available_scheduler| available_schedulers.contains(available_scheduler))
{
info!("available schedulers is not changed");
info!(
"available schedulers is not changed: {:?}",
data.available_schedulers
);
return Ok(());
}
drop(available_schedulers);

View File

@ -692,6 +692,7 @@ impl Task {
number: u32,
parent: Peer,
piece: Arc<piece::Piece>,
storage: Arc<Storage>,
semaphore: Arc<Semaphore>,
) -> ClientResult<metadata::Piece> {
let _permit = semaphore.acquire().await.map_err(|err| {
@ -708,7 +709,7 @@ impl Task {
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
number,
storage.piece_id(task_id.as_str(), number),
parent.id.clone(),
err
);
@ -726,6 +727,7 @@ impl Task {
collect_piece.number,
collect_piece.parent.clone(),
self.piece.clone(),
self.storage.clone(),
semaphore.clone(),
));
}
@ -752,7 +754,8 @@ impl Task {
info!(
"finished piece {} from remote peer {:?}",
metadata.number, metadata.parent_id
self.storage.piece_id(task_id, metadata.number),
metadata.parent_id
);
// Construct the piece.
@ -807,7 +810,9 @@ impl Task {
Ok(Err(Error::DownloadFromRemotePeerFailed(err))) => {
error!(
"download piece {} from remote peer {} error: {:?}",
err.piece_number, err.parent_id, err
self.storage.piece_id(task_id, err.piece_number),
err.parent_id,
err
);
// Send the download piece failed request.
@ -932,7 +937,10 @@ impl Task {
)
.await?;
info!("finished piece {} from source", metadata.number);
info!(
"finished piece {} from source",
self.storage.piece_id(task_id, metadata.number)
);
// Construct the piece.
let piece = Piece {
@ -1104,7 +1112,8 @@ impl Task {
Err(err) => {
info!(
"download piece {} from local peer error: {:?}",
interested_piece.number, err
self.storage.piece_id(task_id, interested_piece.number),
err
);
continue;
}
@ -1126,7 +1135,10 @@ impl Task {
)
.await?;
info!("finished piece {} from local peer", metadata.number);
info!(
"finished piece {} from local peer",
self.storage.piece_id(task_id, metadata.number)
);
// Construct the piece.
let piece = Piece {
@ -1244,7 +1256,10 @@ impl Task {
)
.await?;
info!("finished piece {} from source", metadata.number);
info!(
"finished piece {} from source",
self.storage.piece_id(task_id, metadata.number)
);
// Construct the piece.
let piece = Piece {