Add limit param for transactional scan

Signed-off-by: Weihang Lo <me@weihanglo.tw>
This commit is contained in:
Weihang Lo 2020-05-05 07:38:38 +08:00
parent 35eeceb82c
commit 963e3b3368
No known key found for this signature in database
GPG Key ID: D7DBF189825E82E7
4 changed files with 19 additions and 8 deletions

View File

@ -19,9 +19,9 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
txn.get(key).await.expect("Could not get value") txn.get(key).await.expect("Could not get value")
} }
async fn scan(client: &Client, range: impl Into<BoundRange>) { async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
let mut txn = client.begin().await.expect("Could not begin a transaction"); let mut txn = client.begin().await.expect("Could not begin a transaction");
txn.scan(range) txn.scan(range, limit)
.await .await
.expect("Could not scan key-value pairs in rnage") .expect("Could not scan key-value pairs in rnage")
.for_each(|pair| println!("{:?}", pair)); .for_each(|pair| println!("{:?}", pair));

View File

@ -173,11 +173,13 @@ impl KvRequest for kvrpcpb::ScanRequest {
pub fn new_mvcc_scan_request( pub fn new_mvcc_scan_request(
range: impl Into<BoundRange>, range: impl Into<BoundRange>,
timestamp: Timestamp, timestamp: Timestamp,
limit: u32,
) -> kvrpcpb::ScanRequest { ) -> kvrpcpb::ScanRequest {
let (start_key, end_key) = range.into().into_keys(); let (start_key, end_key) = range.into().into_keys();
let mut req = kvrpcpb::ScanRequest::default(); let mut req = kvrpcpb::ScanRequest::default();
req.set_start_key(start_key.into()); req.set_start_key(start_key.into());
req.set_end_key(end_key.unwrap_or_default().into()); req.set_end_key(end_key.unwrap_or_default().into());
req.set_limit(limit);
req.set_version(timestamp.into_version()); req.set_version(timestamp.into_version());
req req
} }

View File

@ -28,8 +28,12 @@ impl Snapshot {
self.transaction.batch_get(keys).await self.transaction.batch_get(keys).await
} }
pub async fn scan(&self, range: impl Into<BoundRange>) -> Result<impl Iterator<Item = KvPair>> { pub async fn scan(
self.transaction.scan(range).await &self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
self.transaction.scan(range, limit).await
} }
pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> { pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {

View File

@ -121,7 +121,7 @@ impl Transaction {
/// let key1: Key = b"TiKV".to_vec().into(); /// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<KvPair> = txn /// let result: Vec<KvPair> = txn
/// .scan(key1..key2) /// .scan(key1..key2, std::u32::MAX)
/// .await /// .await
/// .unwrap() /// .unwrap()
/// .collect(); /// .collect();
@ -129,13 +129,18 @@ impl Transaction {
/// txn.commit().await.unwrap(); /// txn.commit().await.unwrap();
/// # }); /// # });
/// ``` /// ```
pub async fn scan(&self, range: impl Into<BoundRange>) -> Result<impl Iterator<Item = KvPair>> { pub async fn scan(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
// TODO: determine params and pass them to `new_mvcc_scan_request` // TODO: determine params and pass them to `new_mvcc_scan_request`
// - limit
// - key_only // - key_only
let timestamp = self.timestamp; let timestamp = self.timestamp;
let rpc = self.rpc.clone(); let rpc = self.rpc.clone();
let pairs = new_mvcc_scan_request(range, timestamp).execute(rpc).await?; let pairs = new_mvcc_scan_request(range, timestamp, limit)
.execute(rpc)
.await?;
Ok(pairs.into_iter()) Ok(pairs.into_iter())
} }