From 1a14267135194bb10a1279ceb36b90becab341bc Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Wed, 12 Mar 2025 12:24:21 +0800
Subject: [PATCH] handle no leader error

Signed-off-by: Ping Yu
---
 src/common/errors.rs |  5 +++--
 src/region.rs        |  4 ++--
 src/request/plan.rs  | 42 +++++++++++++++++++++++++++++-------------
 3 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/src/common/errors.rs b/src/common/errors.rs
index 246aff0..1798be7 100644
--- a/src/common/errors.rs
+++ b/src/common/errors.rs
@@ -5,6 +5,7 @@ use std::result;
 use thiserror::Error;
 
 use crate::proto::kvrpcpb;
+use crate::region::RegionVerId;
 use crate::BoundRange;
 
 /// An error originating from the TiKV client or dependencies.
@@ -89,8 +90,8 @@ pub enum Error {
     #[error("Region {} is not found in the response", region_id)]
     RegionNotFoundInResponse { region_id: u64 },
     /// No leader is found for the given id.
-    #[error("Leader of region {} is not found", region_id)]
-    LeaderNotFound { region_id: u64 },
+    #[error("Leader of region {} is not found", region.id)]
+    LeaderNotFound { region: RegionVerId },
     /// Scan limit exceeds the maximum
     #[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
     MaxScanLimitExceeded { limit: u32, max_limit: u32 },
diff --git a/src/region.rs b/src/region.rs
index 8e58522..2a5f45d 100644
--- a/src/region.rs
+++ b/src/region.rs
@@ -47,7 +47,7 @@ impl RegionWithLeader {
         self.leader
             .as_ref()
             .ok_or(Error::LeaderNotFound {
-                region_id: self.region.id,
+                region: self.ver_id(),
             })
             .map(|l| {
                 let mut ctx = kvrpcpb::Context::default();
@@ -89,7 +89,7 @@ impl RegionWithLeader {
             .as_ref()
             .cloned()
             .ok_or_else(|| Error::LeaderNotFound {
-                region_id: self.id(),
+                region: self.ver_id(),
             })
             .map(|s| s.store_id)
     }
diff --git a/src/request/plan.rs b/src/request/plan.rs
index a3da3ec..26c04db 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -17,6 +17,7 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
+use crate::region::RegionVerId;
 use crate::request::shard::HasNextBatch;
 use crate::request::NextBatch;
 use crate::request::Shardable;
@@ -115,13 +116,10 @@ where
         let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
         let mut handles = Vec::new();
         for shard in shards {
-            let (shard, region_store) = shard?;
-            let mut clone = current_plan.clone();
-            clone.apply_shard(shard, &region_store)?;
             let handle = tokio::spawn(Self::single_shard_handler(
                 pd_client.clone(),
-                clone,
-                region_store,
+                current_plan.clone(),
+                shard,
                 backoff.clone(),
                 permits.clone(),
                 preserve_region_results,
@@ -152,12 +150,31 @@ where
     #[async_recursion]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
-        plan: P,
-        region_store: RegionStore,
+        mut plan: P,
+        shard: Result<(<P as Shardable>::Shard, RegionStore)>,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
+        let region_store = match shard.and_then(|(shard, region_store)| {
+            plan.apply_shard(shard, &region_store).map(|_| region_store)
+        }) {
+            Ok(region_store) => region_store,
+            Err(Error::LeaderNotFound { region }) => {
+                return Self::handle_other_error(
+                    pd_client,
+                    plan,
+                    region.clone(),
+                    backoff,
+                    permits,
+                    preserve_region_results,
+                    Error::LeaderNotFound { region },
+                )
+                .await
+            }
+            Err(err) => return Err(err),
+        };
+
         // limit concurrent requests
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
@@ -166,10 +183,10 @@ where
         let mut resp = match res {
             Ok(resp) => resp,
             Err(e) if is_grpc_error(&e) => {
-                return Self::handle_grpc_error(
+                return Self::handle_other_error(
                     pd_client,
                     plan,
-                    region_store,
+                    region_store.region_with_leader.ver_id(),
                     backoff,
                     permits,
                     preserve_region_results,
@@ -303,18 +320,17 @@ where
         Ok(false)
     }
 
-    async fn handle_grpc_error(
+    async fn handle_other_error(
         pd_client: Arc<PdC>,
         plan: P,
-        region_store: RegionStore,
+        region: RegionVerId,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
         e: Error,
     ) -> Result<<Self as Plan>::Result> {
         debug!("handle grpc error: {:?}", e);
-        let ver_id = region_store.region_with_leader.ver_id();
-        pd_client.invalidate_region_cache(ver_id).await;
+        pd_client.invalidate_region_cache(region).await;
         match backoff.next_delay_duration() {
             Some(duration) => {
                 sleep(duration).await;