mirror of https://github.com/tikv/client-rust.git
removing the lambda to simplify the retry logic
Signed-off-by: limbooverlambda <schakra1@gmail.com>
This commit is contained in:
parent
181cde120a
commit
3ee7efffda
|
@ -1,7 +1,6 @@
|
||||||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||||
use core::ops::Range;
|
use core::ops::Range;
|
||||||
use std::future::Future;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -14,7 +13,7 @@ use crate::config::Config;
|
||||||
use crate::pd::PdClient;
|
use crate::pd::PdClient;
|
||||||
use crate::pd::PdRpcClient;
|
use crate::pd::PdRpcClient;
|
||||||
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
|
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
|
||||||
use crate::proto::{errorpb, metapb};
|
use crate::proto::metapb;
|
||||||
use crate::raw::lowering::*;
|
use crate::raw::lowering::*;
|
||||||
use crate::request::CollectSingle;
|
use crate::request::CollectSingle;
|
||||||
use crate::request::EncodeKeyspace;
|
use crate::request::EncodeKeyspace;
|
||||||
|
@ -766,10 +765,6 @@ impl<PdC: PdClient> Client<PdC> {
|
||||||
let (start_key, end_key) = range.clone().into_keys();
|
let (start_key, end_key) = range.clone().into_keys();
|
||||||
let mut current_key: Key = start_key;
|
let mut current_key: Key = start_key;
|
||||||
|
|
||||||
let region_error_handler =
|
|
||||||
|pd_rpc_client: Arc<PdC>, err: errorpb::Error, store: RegionStore| {
|
|
||||||
Box::pin(plan::handle_region_error(pd_rpc_client, err, store))
|
|
||||||
} as _;
|
|
||||||
while current_limit > 0 {
|
while current_limit > 0 {
|
||||||
let scan_args = ScanInnerArgs {
|
let scan_args = ScanInnerArgs {
|
||||||
start_key: current_key.clone(),
|
start_key: current_key.clone(),
|
||||||
|
@ -779,7 +774,7 @@ impl<PdC: PdClient> Client<PdC> {
|
||||||
reverse,
|
reverse,
|
||||||
backoff: backoff.clone(),
|
backoff: backoff.clone(),
|
||||||
};
|
};
|
||||||
let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?;
|
let (res, next_key) = self.retryable_scan(scan_args).await?;
|
||||||
|
|
||||||
let mut kvs = res
|
let mut kvs = res
|
||||||
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
|
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
|
||||||
|
@ -805,18 +800,10 @@ impl<PdC: PdClient> Client<PdC> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn retryable_scan<'a, F>(
|
async fn retryable_scan(
|
||||||
&self,
|
&self,
|
||||||
mut scan_args: ScanInnerArgs,
|
mut scan_args: ScanInnerArgs,
|
||||||
mut error_handler: F,
|
) -> Result<(Option<RawScanResponse>, Key)> {
|
||||||
) -> Result<(Option<RawScanResponse>, Key)>
|
|
||||||
where
|
|
||||||
F: FnMut(
|
|
||||||
Arc<PdC>,
|
|
||||||
errorpb::Error,
|
|
||||||
RegionStore,
|
|
||||||
) -> Pin<Box<dyn Future<Output = Result<bool>>>>,
|
|
||||||
{
|
|
||||||
let start_key = scan_args.start_key;
|
let start_key = scan_args.start_key;
|
||||||
|
|
||||||
let region = self.rpc.clone().region_for_key(&start_key).await?;
|
let region = self.rpc.clone().region_for_key(&start_key).await?;
|
||||||
|
@ -834,7 +821,8 @@ impl<PdC: PdClient> Client<PdC> {
|
||||||
Ok(mut r) => {
|
Ok(mut r) => {
|
||||||
if let Some(err) = r.region_error() {
|
if let Some(err) = r.region_error() {
|
||||||
let status =
|
let status =
|
||||||
error_handler(self.rpc.clone(), err.clone(), store.clone()).await?;
|
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
|
||||||
|
.await?;
|
||||||
if status {
|
if status {
|
||||||
continue;
|
continue;
|
||||||
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
|
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
|
||||||
|
|
Loading…
Reference in New Issue