mirror of https://github.com/tikv/client-rust.git
PR feedback along with a unit test
Signed-off-by: limbooverlambda <schakra1@gmail.com>
This commit is contained in:
parent
6e52912724
commit
da1785ed7f
|
@ -1,12 +1,11 @@
|
|||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use async_recursion::async_recursion;
|
||||
use core::ops::Range;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use log::debug;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
|
||||
|
@ -15,7 +14,7 @@ use crate::config::Config;
|
|||
use crate::pd::PdClient;
|
||||
use crate::pd::PdRpcClient;
|
||||
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
|
||||
use crate::proto::metapb;
|
||||
use crate::proto::{errorpb, metapb};
|
||||
use crate::raw::lowering::*;
|
||||
use crate::request::CollectSingle;
|
||||
use crate::request::EncodeKeyspace;
|
||||
|
@ -761,12 +760,16 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
});
|
||||
}
|
||||
let backoff = DEFAULT_STORE_BACKOFF;
|
||||
let permits = Arc::new(Semaphore::new(16));
|
||||
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
|
||||
let mut result = Vec::new();
|
||||
let mut current_limit = limit;
|
||||
let (start_key, end_key) = range.clone().into_keys();
|
||||
let mut current_key: Option<Key> = Some(start_key);
|
||||
let mut current_key: Key = start_key;
|
||||
|
||||
let region_error_handler =
|
||||
|pd_rpc_client: Arc<PdC>, err: errorpb::Error, store: RegionStore| {
|
||||
Box::pin(plan::handle_region_error(pd_rpc_client, err, store))
|
||||
} as _;
|
||||
while current_limit > 0 {
|
||||
let scan_args = ScanInnerArgs {
|
||||
start_key: current_key.clone(),
|
||||
|
@ -774,10 +777,9 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
limit,
|
||||
key_only,
|
||||
reverse,
|
||||
permits: permits.clone(),
|
||||
backoff: backoff.clone(),
|
||||
};
|
||||
let (res, next_key) = self.retryable_scan(scan_args).await?;
|
||||
let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?;
|
||||
|
||||
let mut kvs = res
|
||||
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
|
||||
|
@ -789,13 +791,13 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
}
|
||||
if end_key
|
||||
.as_ref()
|
||||
.map(|ek| ek <= next_key.as_ref() && !ek.is_empty())
|
||||
.map(|ek| ek <= next_key.as_ref())
|
||||
.unwrap_or(false)
|
||||
|| next_key.is_empty()
|
||||
{
|
||||
break;
|
||||
} else {
|
||||
current_key = Some(next_key);
|
||||
current_key = next_key;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -808,17 +810,21 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
async fn retryable_scan(
|
||||
async fn retryable_scan<'a, F>(
|
||||
&self,
|
||||
mut scan_args: ScanInnerArgs,
|
||||
) -> Result<(Option<RawScanResponse>, Key)> {
|
||||
let start_key = match scan_args.start_key {
|
||||
None => return Ok((None, Key::EMPTY)),
|
||||
Some(ref sk) => sk,
|
||||
};
|
||||
let permit = scan_args.permits.acquire().await.unwrap();
|
||||
let region = self.rpc.clone().region_for_key(start_key).await?;
|
||||
mut error_handler: F,
|
||||
) -> Result<(Option<RawScanResponse>, Key)>
|
||||
where
|
||||
F: FnMut(
|
||||
Arc<PdC>,
|
||||
errorpb::Error,
|
||||
RegionStore,
|
||||
) -> Pin<Box<dyn Future<Output = Result<bool>>>>,
|
||||
{
|
||||
let start_key = scan_args.start_key;
|
||||
|
||||
let region = self.rpc.clone().region_for_key(&start_key).await?;
|
||||
let store = self.rpc.clone().store_for_id(region.id()).await?;
|
||||
let request = new_raw_scan_request(
|
||||
scan_args.range.clone(),
|
||||
|
@ -827,26 +833,26 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
scan_args.reverse,
|
||||
self.cf.clone(),
|
||||
);
|
||||
let resp = self.do_store_scan(store.clone(), request).await;
|
||||
drop(permit);
|
||||
match resp {
|
||||
Ok(mut r) => {
|
||||
if let Some(err) = r.region_error() {
|
||||
let status =
|
||||
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
|
||||
.await?;
|
||||
return if status {
|
||||
self.retryable_scan(scan_args.clone()).await
|
||||
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
|
||||
sleep(duration).await;
|
||||
self.retryable_scan(scan_args.clone()).await
|
||||
} else {
|
||||
Err(RegionError(Box::new(err)))
|
||||
};
|
||||
loop {
|
||||
let resp = self.do_store_scan(store.clone(), request.clone()).await;
|
||||
return match resp {
|
||||
Ok(mut r) => {
|
||||
if let Some(err) = r.region_error() {
|
||||
let status =
|
||||
error_handler(self.rpc.clone(), err.clone(), store.clone()).await?;
|
||||
if status {
|
||||
continue;
|
||||
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
|
||||
sleep(duration).await;
|
||||
continue;
|
||||
} else {
|
||||
return Err(RegionError(Box::new(err)));
|
||||
}
|
||||
}
|
||||
Ok((Some(r), region.end_key()))
|
||||
}
|
||||
Ok((Some(r), region.end_key()))
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
Err(err) => Err(err),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -911,23 +917,24 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
|
||||
#[derive(Clone)]
|
||||
struct ScanInnerArgs {
|
||||
start_key: Option<Key>,
|
||||
start_key: Key,
|
||||
range: BoundRange,
|
||||
limit: u32,
|
||||
key_only: bool,
|
||||
reverse: bool,
|
||||
permits: Arc<Semaphore>,
|
||||
backoff: Backoff,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::any::Any;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::mock::MockKvClient;
|
||||
use crate::mock::MockPdClient;
|
||||
use crate::proto::errorpb::EpochNotMatch;
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::Result;
|
||||
|
||||
|
@ -1009,4 +1016,63 @@ mod tests {
|
|||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_raw_scan_retryer() -> Result<()> {
|
||||
let epoch_not_match_error = EpochNotMatch {
|
||||
current_regions: vec![],
|
||||
};
|
||||
let region_error = errorpb::Error {
|
||||
epoch_not_match: Some(epoch_not_match_error),
|
||||
..Default::default()
|
||||
};
|
||||
let flag = Arc::new(AtomicBool::new(true));
|
||||
let tikv_flag = flag.clone();
|
||||
let error_handler_flag = flag.clone();
|
||||
let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| {
|
||||
if let Some(_) = req.downcast_ref::<RawScanRequest>() {
|
||||
let v = tikv_flag.clone().load(Ordering::Relaxed);
|
||||
let resp = if v {
|
||||
RawScanResponse {
|
||||
region_error: Some(region_error.clone()),
|
||||
..Default::default()
|
||||
}
|
||||
} else {
|
||||
RawScanResponse {
|
||||
..Default::default()
|
||||
}
|
||||
};
|
||||
Ok(Box::new(resp) as Box<dyn Any>)
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
});
|
||||
let mock_pd_client = MockPdClient::new(mock_tikv_client);
|
||||
let client = Client {
|
||||
rpc: Arc::new(mock_pd_client),
|
||||
cf: Some(ColumnFamily::Default),
|
||||
backoff: DEFAULT_REGION_BACKOFF,
|
||||
atomic: false,
|
||||
keyspace: Keyspace::Enable { keyspace_id: 0 },
|
||||
};
|
||||
|
||||
let scan_args = ScanInnerArgs {
|
||||
start_key: "k1".to_string().into(),
|
||||
range: BoundRange::from("k1".to_owned().."k2".to_owned()),
|
||||
limit: 10,
|
||||
key_only: false,
|
||||
reverse: false,
|
||||
backoff: Backoff::no_backoff(),
|
||||
};
|
||||
let error_handler = |_, _, _| {
|
||||
error_handler_flag.clone().store(false, Ordering::Relaxed);
|
||||
Box::pin(async move {
|
||||
let res: Result<bool> = Ok(true);
|
||||
res
|
||||
})
|
||||
} as _;
|
||||
client.retryable_scan(scan_args, error_handler).await?;
|
||||
assert!(!error_handler_flag.load(Ordering::Relaxed));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue