fix raw scan (#409)

* fix raw scan

Signed-off-by: Smityz <smityz@qq.com>

* fix

Signed-off-by: Smityz <smityz@qq.com>

---------

Signed-off-by: Smityz <smityz@qq.com>
This commit is contained in:
Smilencer 2023-07-23 02:59:32 +08:00 committed by GitHub
parent 8b3ada28ee
commit abf22ba680
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 18 deletions

View File

@ -4,6 +4,8 @@ use std::result;
use thiserror::Error;
use crate::BoundRange;
/// An error originating from the TiKV client or dependencies.
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
@ -80,6 +82,8 @@ pub enum Error {
/// No region is found for the given key.
#[error("Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
#[error("Region is not found for range: {:?}", range)]
RegionForRangeNotFound { range: BoundRange },
/// No region is found for the given id. note: distinguish it with the RegionNotFound error in errorpb.
#[error("Region {} is not found in the response", region_id)]
RegionNotFoundInResponse { region_id: u64 },

View File

@ -67,15 +67,15 @@ use crate::proto::kvrpcpb;
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct BoundRange {
from: Bound<Key>,
to: Bound<Key>,
pub from: Bound<Key>,
pub to: Bound<Key>,
}
impl BoundRange {
/// Create a new BoundRange.
///
/// The caller must ensure that `from` is not `Unbounded`.
fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
pub fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
BoundRange { from, to }
}

View File

@ -5,6 +5,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::u32;
use futures::StreamExt;
use log::debug;
use crate::backoff::DEFAULT_REGION_BACKOFF;
@ -591,17 +592,54 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}
let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
let res = plan.execute().await;
res.map(|mut s| {
s.truncate(limit as usize);
s
})
let mut result = Vec::new();
let mut cur_range = range.into();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;
while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);
// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => return Ok(result),
};
cur_limit -= res_len as u32;
} else {
break;
}
}
// limit is a soft limit, so we need check the number of results
result.truncate(limit as usize);
Ok(result)
}
async fn batch_scan_inner(

View File

@ -590,10 +590,98 @@ async fn raw_write_million() -> Result<()> {
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
}
// test scan
let limit = 10;
let res = client.scan(vec![].., limit).await?;
assert_eq!(res.len(), limit as usize);
// test scan, key range from [0,0,0,0] to [255.0.0.0]
let mut limit = 2000;
let mut r = client.scan(.., limit).await?;
assert_eq!(r.len(), 256);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), 156);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 195);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 196);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 251);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);
limit = 3;
let mut r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);
limit = 0;
r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);
// test batch_scan
for batch_num in 1..4 {