diff --git a/examples/transaction.rs b/examples/transaction.rs index eeb6e28..a05495d 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -19,9 +19,9 @@ async fn get(client: &Client, key: Key) -> Option { txn.get(key).await.expect("Could not get value") } -async fn scan(client: &Client, range: impl Into) { +async fn scan(client: &Client, range: impl Into, limit: u32) { let mut txn = client.begin().await.expect("Could not begin a transaction"); - txn.scan(range) + txn.scan(range, limit) .await .expect("Could not scan key-value pairs in rnage") .for_each(|pair| println!("{:?}", pair)); diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index c7b937d..4fbc4e2 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -173,11 +173,13 @@ impl KvRequest for kvrpcpb::ScanRequest { pub fn new_mvcc_scan_request( range: impl Into, timestamp: Timestamp, + limit: u32, ) -> kvrpcpb::ScanRequest { let (start_key, end_key) = range.into().into_keys(); let mut req = kvrpcpb::ScanRequest::default(); req.set_start_key(start_key.into()); req.set_end_key(end_key.unwrap_or_default().into()); + req.set_limit(limit); req.set_version(timestamp.into_version()); req } diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index e4bcbbb..8d5a44b 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -28,8 +28,12 @@ impl Snapshot { self.transaction.batch_get(keys).await } - pub async fn scan(&self, range: impl Into) -> Result> { - self.transaction.scan(range).await + pub async fn scan( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + self.transaction.scan(range, limit).await } pub fn scan_reverse(&self, range: impl RangeBounds) -> BoxStream> { diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index f46c612..270fbb4 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -121,7 +121,7 @@ impl Transaction { /// let key1: Key = b"TiKV".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into(); /// let result: Vec = txn - /// .scan(key1..key2) + /// .scan(key1..key2, std::u32::MAX) /// .await /// .unwrap() /// .collect(); @@ -129,13 +129,18 @@ impl Transaction { /// txn.commit().await.unwrap(); /// # }); /// ``` - pub async fn scan(&self, range: impl Into) -> Result> { + pub async fn scan( + &self, + range: impl Into, + limit: u32, + ) -> Result> { // TODO: determine params and pass them to `new_mvcc_scan_request` - // - limit // - key_only let timestamp = self.timestamp; let rpc = self.rpc.clone(); - let pairs = new_mvcc_scan_request(range, timestamp).execute(rpc).await?; + let pairs = new_mvcc_scan_request(range, timestamp, limit) + .execute(rpc) + .await?; Ok(pairs.into_iter()) }