handle no leader error

Signed-off-by: Ping Yu <yuping@pingcap.com>
Ping Yu 2025-03-12 12:24:21 +08:00
parent 83ad8119f5
commit 1a14267135
3 changed files with 34 additions and 17 deletions

View File

@@ -5,6 +5,7 @@ use std::result;
 use thiserror::Error;
 
 use crate::proto::kvrpcpb;
+use crate::region::RegionVerId;
 use crate::BoundRange;
 
 /// An error originating from the TiKV client or dependencies.
@@ -89,8 +90,8 @@ pub enum Error {
     #[error("Region {} is not found in the response", region_id)]
     RegionNotFoundInResponse { region_id: u64 },
     /// No leader is found for the given id.
-    #[error("Leader of region {} is not found", region_id)]
-    LeaderNotFound { region_id: u64 },
+    #[error("Leader of region {} is not found", region.id)]
+    LeaderNotFound { region: RegionVerId },
     /// Scan limit exceeds the maximum
     #[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
     MaxScanLimitExceeded { limit: u32, max_limit: u32 },

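For context on the variant change above: the error now carries the region's full version identity rather than a bare numeric id, which is what later lets the retry path invalidate the exact cached region. A minimal standalone sketch of the new shape, using toy types rather than the crate's real definitions (the real `RegionVerId` lives in `crate::region`; `thiserror` is assumed as a dependency):

```rust
use thiserror::Error; // same derive the crate uses for its Error enum

/// Toy stand-in for the crate's RegionVerId: region id plus epoch versions.
#[derive(Debug, Clone)]
pub struct RegionVerId {
    pub id: u64,
    pub conf_ver: u64,
    pub ver: u64,
}

#[derive(Debug, Error)]
pub enum Error {
    /// Before this commit: `LeaderNotFound { region_id: u64 }`.
    /// After: the whole version id travels with the error.
    #[error("Leader of region {} is not found", region.id)]
    LeaderNotFound { region: RegionVerId },
}

fn main() {
    let err = Error::LeaderNotFound {
        region: RegionVerId { id: 2, conf_ver: 5, ver: 7 },
    };
    println!("{err}"); // prints: Leader of region 2 is not found
}
```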
View File

@@ -47,7 +47,7 @@ impl RegionWithLeader {
         self.leader
             .as_ref()
             .ok_or(Error::LeaderNotFound {
-                region_id: self.region.id,
+                region: self.ver_id(),
             })
             .map(|l| {
                 let mut ctx = kvrpcpb::Context::default();
@@ -89,7 +89,7 @@ impl RegionWithLeader {
             .as_ref()
             .cloned()
             .ok_or_else(|| Error::LeaderNotFound {
-                region_id: self.id(),
+                region: self.ver_id(),
             })
             .map(|s| s.store_id)
     }

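Both hunks above are the same `Option` → `Result` conversion: a missing leader becomes `Error::LeaderNotFound`, now carrying the value of `self.ver_id()` instead of a bare region id. A self-contained illustration of that pattern with toy types (field and method names here are placeholders, not the crate's real API):

```rust
#[derive(Debug, Clone)]
struct RegionVerId {
    id: u64,
    conf_ver: u64,
    ver: u64,
}

#[derive(Debug)]
enum Error {
    LeaderNotFound { region: RegionVerId },
}

#[derive(Debug, Clone)]
struct Peer {
    store_id: u64,
}

struct RegionWithLeader {
    ver_id: RegionVerId,
    leader: Option<Peer>,
}

impl RegionWithLeader {
    fn ver_id(&self) -> RegionVerId {
        self.ver_id.clone()
    }

    /// Mirrors the second hunk: `ok_or_else` turns "no leader" into an
    /// error that carries the full version id of the affected region.
    fn leader_store_id(&self) -> Result<u64, Error> {
        self.leader
            .as_ref()
            .cloned()
            .ok_or_else(|| Error::LeaderNotFound {
                region: self.ver_id(),
            })
            .map(|peer| peer.store_id)
    }
}

fn main() {
    let region = RegionWithLeader {
        ver_id: RegionVerId { id: 2, conf_ver: 5, ver: 7 },
        leader: None,
    };
    // Prints: Err(LeaderNotFound { region: RegionVerId { id: 2, conf_ver: 5, ver: 7 } })
    println!("{:?}", region.leader_store_id());
}
```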
View File

@@ -17,6 +17,7 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
+use crate::region::RegionVerId;
 use crate::request::shard::HasNextBatch;
 use crate::request::NextBatch;
 use crate::request::Shardable;
@@ -115,13 +116,10 @@ where
         let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
         let mut handles = Vec::new();
         for shard in shards {
-            let (shard, region_store) = shard?;
-            let mut clone = current_plan.clone();
-            clone.apply_shard(shard, &region_store)?;
             let handle = tokio::spawn(Self::single_shard_handler(
                 pd_client.clone(),
-                clone,
-                region_store,
+                current_plan.clone(),
+                shard,
                 backoff.clone(),
                 permits.clone(),
                 preserve_region_results,
@@ -152,12 +150,31 @@ where
     #[async_recursion]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
-        plan: P,
-        region_store: RegionStore,
+        mut plan: P,
+        shard: Result<(<P as Shardable>::Shard, RegionStore)>,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
+        let region_store = match shard.and_then(|(shard, region_store)| {
+            plan.apply_shard(shard, &region_store).map(|_| region_store)
+        }) {
+            Ok(region_store) => region_store,
+            Err(Error::LeaderNotFound { region }) => {
+                return Self::handle_other_error(
+                    pd_client,
+                    plan,
+                    region.clone(),
+                    backoff,
+                    permits,
+                    preserve_region_results,
+                    Error::LeaderNotFound { region },
+                )
+                .await
+            }
+            Err(err) => return Err(err),
+        };
+
         // limit concurrent requests
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
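The heart of this hunk: the `Result` produced by sharding is no longer unwrapped with `?` in the dispatch loop, but resolved inside the per-shard handler, so a `LeaderNotFound` raised by `apply_shard` can be routed to the shared backoff handler instead of failing the whole multi-region request. A stripped-down, synchronous sketch of that control flow (all names and types here are placeholders, not the crate's API):

```rust
#[derive(Debug, Clone)]
struct RegionVerId {
    id: u64,
}

#[derive(Debug)]
enum Error {
    LeaderNotFound { region: RegionVerId },
    #[allow(dead_code)]
    Other(String),
}

type Result<T> = std::result::Result<T, Error>;

/// Placeholder for "invalidate the cached region, back off, and retry".
fn handle_other_error(region: RegionVerId, err: Error) -> Result<String> {
    println!("invalidating cache for region {} after {err:?}", region.id);
    Ok("retried".to_string())
}

/// Pretend region 2 currently has no leader.
fn apply_shard(_shard: &str, region: &RegionVerId) -> Result<()> {
    if region.id == 2 {
        Err(Error::LeaderNotFound { region: region.clone() })
    } else {
        Ok(())
    }
}

fn single_shard_handler(shard: Result<(String, RegionVerId)>) -> Result<String> {
    // Resolve the shard result here rather than with `?` in the caller, so a
    // missing leader is diverted to the retry path instead of propagating.
    let region = match shard.and_then(|(shard, region)| {
        apply_shard(&shard, &region).map(|_| region)
    }) {
        Ok(region) => region,
        Err(Error::LeaderNotFound { region }) => {
            return handle_other_error(region.clone(), Error::LeaderNotFound { region })
        }
        Err(err) => return Err(err),
    };
    Ok(format!("executed on region {}", region.id))
}

fn main() {
    println!("{:?}", single_shard_handler(Ok(("a".into(), RegionVerId { id: 1 }))));
    println!("{:?}", single_shard_handler(Ok(("b".into(), RegionVerId { id: 2 }))));
}
```

In the real code the handler is async and spawned with `tokio::spawn`, and the retry path also needs the backoff, semaphore, and `pd_client` handles, which is why those arguments are passed through unchanged.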
@@ -166,10 +183,10 @@ where
         let mut resp = match res {
             Ok(resp) => resp,
             Err(e) if is_grpc_error(&e) => {
-                return Self::handle_grpc_error(
+                return Self::handle_other_error(
                     pd_client,
                     plan,
-                    region_store,
+                    region_store.region_with_leader.ver_id(),
                     backoff,
                     permits,
                     preserve_region_results,
@@ -303,18 +320,17 @@ where
         Ok(false)
     }
 
-    async fn handle_grpc_error(
+    async fn handle_other_error(
         pd_client: Arc<PdC>,
         plan: P,
-        region_store: RegionStore,
+        region: RegionVerId,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
         e: Error,
     ) -> Result<<Self as Plan>::Result> {
         debug!("handle grpc error: {:?}", e);
-        let ver_id = region_store.region_with_leader.ver_id();
-        pd_client.invalidate_region_cache(ver_id).await;
+        pd_client.invalidate_region_cache(region).await;
         match backoff.next_delay_duration() {
             Some(duration) => {
                 sleep(duration).await;