fix limit of raw_scan

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2020-09-29 15:20:42 +08:00
parent ad8ef075af
commit bcd18b3e53
3 changed files with 62 additions and 3 deletions

View File

@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static {
.boxed() .boxed()
} }
// Returns a Steam which iterates over the contexts for ranges in the same region. // Returns a Stream which iterates over the contexts for ranges in the same region.
fn group_ranges_by_region( fn group_ranges_by_region(
self: Arc<Self>, self: Arc<Self>,
mut ranges: Vec<BoundRange>, mut ranges: Vec<BoundRange>,

View File

@ -250,9 +250,13 @@ impl Client {
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
} }
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
.execute(self.rpc.clone()) .execute(self.rpc.clone())
.await .await;
res.map(|mut s| {
s.truncate(limit as usize);
s
})
} }
/// Create a new 'batch scan' request. /// Create a new 'batch scan' request.

View File

@ -436,6 +436,61 @@ async fn raw_req() -> Fallible<()> {
Fallible::Ok(()) Fallible::Ok(())
} }
/// Tests raw API when there are multiple regions.
/// Write large volumes of data to enforce region splitting.
/// In order to test `scan`, data is uniformly inserted.
///
/// Ignoring this because we don't want to mess up transactional tests.
#[tokio::test]
#[serial]
#[ignore]
async fn raw_write_million() -> Fallible<()> {
const NUM_BITS_TXN: u32 = 9;
const NUM_BITS_KEY_PER_TXN: u32 = 10;
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
clear_tikv().await?;
let config = Config::new(pd_addrs());
let client = RawClient::new(config).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) {
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
let keys = iter::repeat_with(|| {
let v = cur;
cur = cur.overflowing_add(interval).0;
v
})
.map(|u| u.to_be_bytes().to_vec())
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
client
.batch_put(
keys.iter()
.cloned()
.zip(iter::repeat(1u32.to_be_bytes().to_vec())),
)
.await?;
let res = client.batch_get(keys).await?;
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
}
// test scan
let limit = 10;
let res = client.scan(vec![].., limit).await?;
assert_eq!(res.len(), limit as usize);
// test batch_scan
for batch_num in 4..8 {
let res = client
.batch_scan(iter::repeat(vec![]..).take(batch_num), limit)
.await?;
assert_eq!(res.len(), limit as usize * batch_num);
}
Fallible::Ok(())
}
// helper function // helper function
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> { async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> {
let x = client.get(key).await?.unwrap(); let x = client.get(key).await?.unwrap();