adding retryable to scan

limbooverlambda 2024-06-17 14:08:33 -07:00 committed by Supratik Chakraborty
parent 54fd72001b
commit dddf5198ac
2 changed files with 197 additions and 146 deletions
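In outline, `scan_inner` in the raw client no longer iterates a pre-resolved stream of region stores. It now advances key by key through a new `retryable_scan` helper, which resolves the region for the current start key, sends a `RawScanRequest` to that region's store, and on a region error asks `plan::handle_region_error` whether the error was resolved: if so it retries immediately, otherwise it sleeps for the next `DEFAULT_STORE_BACKOFF` delay and retries, giving up with a `RegionError` once the backoff is exhausted. In-flight region scans are capped by a 16-permit `Semaphore`. The following is a minimal, self-contained sketch of that retry shape only; `Backoff`, `scan_one_region`, and `handle_region_error` below are hypothetical stand-ins, and the commit's `#[async_recursion]` recursion is written as a loop here.

// Hypothetical sketch of the retry shape used by the new retryable_scan:
// bounded concurrency via a semaphore, retry immediately when the region
// error is resolved, otherwise back off and retry until the budget runs out.
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Semaphore;
use tokio::time::sleep;

// Stand-in for the client's Backoff: yields a delay until the budget is spent.
struct Backoff {
    delays: Vec<Duration>,
}

impl Backoff {
    fn next_delay_duration(&mut self) -> Option<Duration> {
        self.delays.pop()
    }
}

// Stand-in for one region-scoped scan attempt; `Ok(None)` models a region error.
async fn scan_one_region(attempt: u32) -> Result<Option<Vec<(Vec<u8>, Vec<u8>)>>, String> {
    // Pretend the first attempt hits a stale region and later attempts succeed.
    Ok(if attempt == 0 { None } else { Some(vec![]) })
}

// Model of plan::handle_region_error: true = resolved, retry now; false = back off first.
async fn handle_region_error(attempt: u32) -> Result<bool, String> {
    Ok(attempt > 0)
}

async fn retryable_scan(
    permits: Arc<Semaphore>,
    mut backoff: Backoff,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, String> {
    let mut attempt = 0;
    loop {
        // Cap in-flight region scans, mirroring Semaphore::new(16) in the commit.
        let permit = permits.acquire().await.map_err(|e| e.to_string())?;
        let resp = scan_one_region(attempt).await?;
        drop(permit);
        match resp {
            Some(kvs) => return Ok(kvs),
            None => {
                if handle_region_error(attempt).await? {
                    // Error resolved (e.g. leader updated): retry immediately.
                } else if let Some(delay) = backoff.next_delay_duration() {
                    // Unresolved: wait out the backoff delay, then retry.
                    sleep(delay).await;
                } else {
                    return Err("region error and backoff exhausted".into());
                }
                attempt += 1;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(16));
    let backoff = Backoff {
        delays: vec![Duration::from_millis(10); 3],
    };
    let kvs = retryable_scan(permits, backoff).await.unwrap();
    println!("scanned {} pairs", kvs.len());
}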

View File

@@ -1,30 +1,35 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+use async_recursion::async_recursion;
 use core::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::u32;
-use futures::StreamExt;
 use log::debug;
+use tokio::sync::Semaphore;
+use tokio::time::sleep;
-use crate::backoff::DEFAULT_REGION_BACKOFF;
+use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::common::Error;
 use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
+use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
 use crate::proto::metapb;
 use crate::raw::lowering::*;
-use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::EncodeKeyspace;
 use crate::request::KeyMode;
 use crate::request::Keyspace;
 use crate::request::Plan;
 use crate::request::TruncateKeyspace;
+use crate::request::{plan, Collect};
+use crate::store::{HasRegionError, RegionStore};
 use crate::Backoff;
 use crate::BoundRange;
 use crate::ColumnFamily;
+use crate::Error::RegionError;
 use crate::Key;
 use crate::KvPair;
 use crate::Result;
@@ -756,57 +761,42 @@ impl<PdC: PdClient> Client<PdC> {
                 max_limit: MAX_RAW_KV_SCAN_LIMIT,
             });
         }
-        let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
+        let backoff = DEFAULT_STORE_BACKOFF;
+        let permits = Arc::new(Semaphore::new(16));
+        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
         let mut result = Vec::new();
-        let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
-        let mut region_store =
-            scan_regions
-                .next()
-                .await
-                .ok_or(Error::RegionForRangeNotFound {
-                    range: (cur_range.clone()),
-                })??;
-        let mut cur_limit = limit;
-        while cur_limit > 0 {
-            let request = new_raw_scan_request(
-                cur_range.clone(),
-                cur_limit,
-                key_only,
-                reverse,
-                self.cf.clone(),
-            );
-            let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
-                .single_region_with_store(region_store.clone())
-                .await?
-                .plan()
-                .execute()
-                .await?;
-            let mut region_scan_res = resp
-                .kvs
-                .into_iter()
-                .map(Into::into)
-                .collect::<Vec<KvPair>>();
-            let res_len = region_scan_res.len();
-            result.append(&mut region_scan_res);
-            // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
-            if res_len < cur_limit as usize {
-                region_store = match scan_regions.next().await {
-                    Some(Ok(rs)) => {
-                        cur_range = BoundRange::new(
-                            std::ops::Bound::Included(region_store.region_with_leader.range().1),
-                            cur_range.to,
-                        );
-                        rs
-                    }
-                    Some(Err(e)) => return Err(e),
-                    None => break,
-                };
-                cur_limit -= res_len as u32;
-            } else {
-                break;
-            }
-        }
+        let mut current_limit = limit;
+        let (start_key, end_key) = range.clone().into_keys();
+        let mut current_key: Option<Key> = Some(start_key);
+        while current_limit > 0 {
+            let scan_args = ScanInnerArgs {
+                start_key: current_key.clone(),
+                range: range.clone(),
+                limit,
+                key_only,
+                reverse,
+                permits: permits.clone(),
+                backoff: backoff.clone(),
+            };
+            let (res, next_key) = self.retryable_scan(scan_args).await?;
+            let mut kvs = res
+                .map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
+                .unwrap_or(Vec::new());
+            if !kvs.is_empty() {
+                current_limit -= kvs.len() as u32;
+                result.append(&mut kvs);
+            }
+            if end_key
+                .as_ref()
+                .map(|ek| ek <= next_key.as_ref() && !ek.is_empty())
+                .unwrap_or(false)
+                || next_key.is_empty()
+            {
+                break;
+            } else {
+                current_key = Some(next_key);
+            }
+        }
@@ -819,6 +809,61 @@ impl<PdC: PdClient> Client<PdC> {
         Ok(result)
     }
 
+    #[async_recursion]
+    async fn retryable_scan(
+        &self,
+        mut scan_args: ScanInnerArgs,
+    ) -> Result<(Option<RawScanResponse>, Key)> {
+        let start_key = match scan_args.start_key {
+            None => return Ok((None, Key::EMPTY)),
+            Some(ref sk) => sk,
+        };
+        let permit = scan_args.permits.acquire().await.unwrap();
+        let region = self.rpc.clone().region_for_key(start_key).await?;
+        let store = self.rpc.clone().store_for_id(region.id()).await?;
+        let request = new_raw_scan_request(
+            scan_args.range.clone(),
+            scan_args.limit,
+            scan_args.key_only,
+            scan_args.reverse,
+            self.cf.clone(),
+        );
+        let resp = self.do_store_scan(store.clone(), request).await;
+        drop(permit);
+        match resp {
+            Ok(mut r) => {
+                if let Some(err) = r.region_error() {
+                    let status =
+                        plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
+                            .await?;
+                    return if status {
+                        self.retryable_scan(scan_args.clone()).await
+                    } else if let Some(duration) = scan_args.backoff.next_delay_duration() {
+                        sleep(duration).await;
+                        self.retryable_scan(scan_args.clone()).await
+                    } else {
+                        Err(RegionError(Box::new(err)))
+                    };
+                }
+                Ok((Some(r), region.end_key()))
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    async fn do_store_scan(
+        &self,
+        store: RegionStore,
+        scan_request: RawScanRequest,
+    ) -> Result<RawScanResponse> {
+        crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
+            .single_region_with_store(store.clone())
+            .await?
+            .plan()
+            .execute()
+            .await
+    }
+
     async fn batch_scan_inner(
         &self,
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
@@ -865,6 +910,17 @@ impl<PdC: PdClient> Client<PdC> {
     }
 }
 
+#[derive(Clone)]
+struct ScanInnerArgs {
+    start_key: Option<Key>,
+    range: BoundRange,
+    limit: u32,
+    key_only: bool,
+    reverse: bool,
+    permits: Arc<Semaphore>,
+    backoff: Backoff,
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
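The loop in `scan_inner` above leans on `retryable_scan` returning the scanned region's end key along with the response; that key becomes the next start key, and the scan stops once it is empty (the last region) or at or past the requested end key. Below is a minimal sketch of that region-walking contract under simplified assumptions: `Key` is a plain byte vector and `scan_region` is a hypothetical stand-in for one `retryable_scan` call.

// Hypothetical sketch of how the rewritten scan_inner walks regions: each call
// returns one region's batch plus that region's end key, which becomes the
// next start key until the range end (or the last region) is reached.
type Key = Vec<u8>;

// Stand-in for one retryable_scan call: (kv pairs, end key of the region scanned).
fn scan_region(start: &Key) -> (Vec<(Key, Key)>, Key) {
    // Pretend the keyspace is split into two regions at "m".
    if start.as_slice() < b"m".as_slice() {
        (vec![(b"a".to_vec(), b"1".to_vec())], b"m".to_vec())
    } else {
        // An empty end key models the last region.
        (vec![(b"x".to_vec(), b"2".to_vec())], Vec::new())
    }
}

fn scan(start_key: Key, end_key: Option<Key>, mut limit: u32) -> Vec<(Key, Key)> {
    let mut result = Vec::new();
    let mut current_key = start_key;
    while limit > 0 {
        let (mut kvs, next_key) = scan_region(&current_key);
        limit = limit.saturating_sub(kvs.len() as u32);
        result.append(&mut kvs);
        // Stop if we ran past the requested range or hit the last region.
        let past_end = end_key
            .as_ref()
            .map(|ek| !ek.is_empty() && ek <= &next_key)
            .unwrap_or(false);
        if past_end || next_key.is_empty() {
            break;
        }
        current_key = next_key;
    }
    result
}

fn main() {
    let kvs = scan(b"a".to_vec(), None, 10);
    println!("{} pairs across regions", kvs.len());
}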

View File

@@ -187,7 +187,7 @@ where
                 match backoff.next_delay_duration() {
                     Some(duration) => {
                         let region_error_resolved =
-                            Self::handle_region_error(pd_client.clone(), e, region_store).await?;
+                            handle_region_error(pd_client.clone(), e, region_store).await?;
                         // don't sleep if we have resolved the region error
                         if !region_error_resolved {
                             sleep(duration).await;
@@ -208,102 +208,6 @@ where
         }
     }
 
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn handle_region_error(
-        pd_client: Arc<PdC>,
-        e: errorpb::Error,
-        region_store: RegionStore,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if let Some(not_leader) = e.not_leader {
-            if let Some(leader) = not_leader.leader {
-                match pd_client
-                    .update_leader(region_store.region_with_leader.ver_id(), leader)
-                    .await
-                {
-                    Ok(_) => Ok(true),
-                    Err(e) => {
-                        pd_client.invalidate_region_cache(ver_id).await;
-                        Err(e)
-                    }
-                }
-            } else {
-                // The peer doesn't know who is the current leader. Generally it's because
-                // the Raft group is in an election, but it's possible that the peer is
-                // isolated and removed from the Raft group. So it's necessary to reload
-                // the region from PD.
-                pd_client.invalidate_region_cache(ver_id).await;
-                Ok(false)
-            }
-        } else if e.store_not_match.is_some() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.epoch_not_match.is_some() {
-            Self::on_region_epoch_not_match(
-                pd_client.clone(),
-                region_store,
-                e.epoch_not_match.unwrap(),
-            )
-            .await
-        } else if e.stale_command.is_some() || e.region_not_found.is_some() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.server_is_busy.is_some()
-            || e.raft_entry_too_large.is_some()
-            || e.max_timestamp_not_synced.is_some()
-        {
-            Err(Error::RegionError(Box::new(e)))
-        } else {
-            // TODO: pass the logger around
-            // info!("unknwon region error: {:?}", e);
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        }
-    }
-
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn on_region_epoch_not_match(
-        pd_client: Arc<PdC>,
-        region_store: RegionStore,
-        error: EpochNotMatch,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if error.current_regions.is_empty() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            return Ok(true);
-        }
-        for r in error.current_regions {
-            if r.id == region_store.region_with_leader.id() {
-                let region_epoch = r.region_epoch.unwrap();
-                let returned_conf_ver = region_epoch.conf_ver;
-                let returned_version = region_epoch.version;
-                let current_region_epoch = region_store
-                    .region_with_leader
-                    .region
-                    .region_epoch
-                    .clone()
-                    .unwrap();
-                let current_conf_ver = current_region_epoch.conf_ver;
-                let current_version = current_region_epoch.version;
-                // Find whether the current region is ahead of TiKV's. If so, backoff.
-                if returned_conf_ver < current_conf_ver || returned_version < current_version {
-                    return Ok(false);
-                }
-            }
-        }
-        // TODO: finer grained processing
-        pd_client.invalidate_region_cache(ver_id).await;
-        Ok(false)
-    }
-
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -333,6 +237,97 @@ where
     }
 }
 
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): backoff, and then retry
+// 3. Err(Error): can't be resolved, return the error to upper level
+pub(crate) async fn handle_region_error<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    e: errorpb::Error,
+    region_store: RegionStore,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if let Some(not_leader) = e.not_leader {
+        if let Some(leader) = not_leader.leader {
+            match pd_client
+                .update_leader(region_store.region_with_leader.ver_id(), leader)
+                .await
+            {
+                Ok(_) => Ok(true),
+                Err(e) => {
+                    pd_client.invalidate_region_cache(ver_id).await;
+                    Err(e)
+                }
+            }
+        } else {
+            // The peer doesn't know who is the current leader. Generally it's because
+            // the Raft group is in an election, but it's possible that the peer is
+            // isolated and removed from the Raft group. So it's necessary to reload
+            // the region from PD.
+            pd_client.invalidate_region_cache(ver_id).await;
+            Ok(false)
+        }
+    } else if e.store_not_match.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.epoch_not_match.is_some() {
+        on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
+    } else if e.stale_command.is_some() || e.region_not_found.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.server_is_busy.is_some()
+        || e.raft_entry_too_large.is_some()
+        || e.max_timestamp_not_synced.is_some()
+    {
+        Err(Error::RegionError(Box::new(e)))
+    } else {
+        // TODO: pass the logger around
+        // info!("unknwon region error: {:?}", e);
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    }
+}
+
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): backoff, and then retry
+// 3. Err(Error): can't be resolved, return the error to upper level
+pub(crate) async fn on_region_epoch_not_match<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    region_store: RegionStore,
+    error: EpochNotMatch,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if error.current_regions.is_empty() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        return Ok(true);
+    }
+    for r in error.current_regions {
+        if r.id == region_store.region_with_leader.id() {
+            let region_epoch = r.region_epoch.unwrap();
+            let returned_conf_ver = region_epoch.conf_ver;
+            let returned_version = region_epoch.version;
+            let current_region_epoch = region_store
+                .region_with_leader
+                .region
+                .region_epoch
+                .clone()
+                .unwrap();
+            let current_conf_ver = current_region_epoch.conf_ver;
+            let current_version = current_region_epoch.version;
+            // Find whether the current region is ahead of TiKV's. If so, backoff.
+            if returned_conf_ver < current_conf_ver || returned_version < current_version {
+                return Ok(false);
+            }
+        }
+    }
+    // TODO: finer grained processing
+    pd_client.invalidate_region_cache(ver_id).await;
+    Ok(false)
+}
+
 impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
     fn clone(&self) -> Self {
         RetryableMultiRegion {
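
`handle_region_error` and `on_region_epoch_not_match` move out of `RetryableMultiRegion` and become `pub(crate)` free functions so the raw client's `retryable_scan` can call them through `plan::`. Their contract is the tri-state documented above: `Ok(true)` means retry immediately, `Ok(false)` means back off and then retry, and `Err` is propagated. The epoch check in particular returns `Ok(false)` when the epoch reported by TiKV is older than the cached one. A minimal distillation of that comparison, with a hypothetical `Epoch` struct standing in for the protobuf type:

// Hypothetical distillation of the epoch check in on_region_epoch_not_match:
// if the epoch TiKV reports is older than the one we hold, the local cache is
// ahead of the store, so the caller should back off rather than retry at once.
struct Epoch {
    conf_ver: u64,
    version: u64,
}

fn should_backoff(returned: &Epoch, current: &Epoch) -> bool {
    returned.conf_ver < current.conf_ver || returned.version < current.version
}

fn main() {
    let returned = Epoch { conf_ver: 3, version: 7 };
    let current = Epoch { conf_ver: 3, version: 8 };
    // TiKV reported an older version than we cached, so back off (Ok(false) in the commit).
    assert!(should_backoff(&returned, &current));
    println!("backoff: {}", should_backoff(&returned, &current));
}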