Merge pull request #179 from ekexium/fix-raw-scan-limit

Fix limit problem in raw_scan and unbounded problem in batch_scan
This commit is contained in:
Nick Cameron 2020-10-13 09:31:56 +13:00 committed by GitHub
commit d9935c2936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 5 deletions

View File

@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}
// Returns a Steam which iterates over the contexts for ranges in the same region.
// Returns a Stream which iterates over the contexts for ranges in the same region.
fn group_ranges_by_region(
self: Arc<Self>,
mut ranges: Vec<BoundRange>,
@ -153,7 +153,10 @@ pub trait PdClient: Send + Sync + 'static {
let region_end = region.end_key();
let mut grouped = vec![];
if !region_end.is_empty()
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
&& end_key
.clone()
.map(|x| x > region_end || x.is_empty())
.unwrap_or(true)
{
grouped.push((start_key, region_end.clone()).into());
ranges.push((region_end, end_key).into());
@ -168,7 +171,10 @@ pub trait PdClient: Send + Sync + 'static {
break;
}
if !region_end.is_empty()
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
&& end_key
.clone()
.map(|x| x > region_end || x.is_empty())
.unwrap_or(true)
{
grouped.push((start_key, region_end.clone()).into());
ranges.push((region_end, end_key).into());

View File

@ -252,9 +252,13 @@ impl Client {
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
}
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
.execute(self.rpc.clone())
.await
.await;
res.map(|mut s| {
s.truncate(limit as usize);
s
})
}
/// Create a new 'batch scan' request.

View File

@ -438,6 +438,63 @@ async fn raw_req() -> Fallible<()> {
Fallible::Ok(())
}
/// Tests raw API when there are multiple regions.
/// Write large volumes of data to enforce region splitting.
/// In order to test `scan`, data is uniformly inserted.
///
/// Ignoring this because we don't want to mess up transactional tests.
#[tokio::test]
#[serial]
#[ignore]
async fn raw_write_million() -> Fallible<()> {
    const NUM_BITS_TXN: u32 = 9;
    const NUM_BITS_KEY_PER_TXN: u32 = 10;
    // Spacing between consecutive keys so the whole u32 key space is
    // covered uniformly across all iterations.
    let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);

    clear_tikv().await?;
    let config = Config::new(pd_addrs());
    let client = RawClient::new(config).await?;

    for txn in 0..2u32.pow(NUM_BITS_TXN) {
        // Start of this iteration's slice of the key space.
        let mut next_key = txn * 2u32.pow(32 - NUM_BITS_TXN);
        // Each iteration puts 2^NUM_BITS_KEY_PER_TXN evenly spaced keys,
        // encoded big-endian so lexical order matches numeric order.
        let keys: Vec<Vec<u8>> = iter::repeat_with(|| {
            let current = next_key;
            next_key = next_key.overflowing_add(interval).0;
            current
        })
        .map(|k| k.to_be_bytes().to_vec())
        .take(2usize.pow(NUM_BITS_KEY_PER_TXN))
        .collect();

        client
            .batch_put(
                keys.iter()
                    .cloned()
                    .zip(iter::repeat(1u32.to_be_bytes().to_vec())),
            )
            .await?;

        let fetched = client.batch_get(keys).await?;
        assert_eq!(fetched.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
    }

    // test scan: `limit` must be respected even when results span regions.
    let limit = 10;
    let scanned = client.scan(vec![].., limit).await?;
    assert_eq!(scanned.len(), limit as usize);

    // test batch_scan with 1..=3 overlapping unbounded ranges.
    for batch_num in 1..4 {
        let _ = client
            .batch_scan(iter::repeat(vec![]..).take(batch_num), limit)
            .await?;
        // FIXME: the `each_limit` parameter does not work as expected.
        // It limits the entries on each region of each range, instead of each range.
        // assert_eq!(res.len(), limit as usize * batch_num);
    }
    Fallible::Ok(())
}
// helper function
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> {
let x = client.get(key).await?.unwrap();