diff --git a/src/pd/client.rs b/src/pd/client.rs index b03dbfa..25d1b33 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - // Returns a Steam which iterates over the contexts for ranges in the same region. + // Returns a Stream which iterates over the contexts for ranges in the same region. fn group_ranges_by_region( self: Arc, mut ranges: Vec, diff --git a/src/raw/client.rs b/src/raw/client.rs index a425b42..7d1dc23 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -250,9 +250,13 @@ impl Client { return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); } - requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) + let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) .execute(self.rpc.clone()) - .await + .await; + res.map(|mut s| { + s.truncate(limit as usize); + s + }) } /// Create a new 'batch scan' request. diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index ac930ed..b38e543 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -436,6 +436,61 @@ async fn raw_req() -> Fallible<()> { Fallible::Ok(()) } +/// Tests raw API when there are multiple regions. +/// Write large volumes of data to enforce region splitting. +/// In order to test `scan`, data is uniformly inserted. +/// +/// Ignoring this because we don't want to mess up transactional tests. +#[tokio::test] +#[serial] +#[ignore] +async fn raw_write_million() -> Fallible<()> { + const NUM_BITS_TXN: u32 = 9; + const NUM_BITS_KEY_PER_TXN: u32 = 10; + let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); + + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + + for i in 0..2u32.pow(NUM_BITS_TXN) { + let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN); + let keys = iter::repeat_with(|| { + let v = cur; + cur = cur.overflowing_add(interval).0; + v + }) + .map(|u| u.to_be_bytes().to_vec()) + .take(2usize.pow(NUM_BITS_KEY_PER_TXN)) + .collect::>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13 + client + .batch_put( + keys.iter() + .cloned() + .zip(iter::repeat(1u32.to_be_bytes().to_vec())), + ) + .await?; + + let res = client.batch_get(keys).await?; + assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN)); + } + + // test scan + let limit = 10; + let res = client.scan(vec![].., limit).await?; + assert_eq!(res.len(), limit as usize); + + // test batch_scan + for batch_num in 4..8 { + let res = client + .batch_scan(iter::repeat(vec![]..).take(batch_num), limit) + .await?; + assert_eq!(res.len(), limit as usize * batch_num); + } + + Fallible::Ok(()) +} + // helper function async fn get_u32(client: &RawClient, key: Vec) -> Fallible { let x = client.get(key).await?.unwrap();