From 0c2a5d329aa31dcb8414b0f86b971f297115ebe4 Mon Sep 17 00:00:00 2001 From: haojinming Date: Wed, 4 Jan 2023 11:07:42 +0800 Subject: [PATCH] address review issues Signed-off-by: haojinming --- src/store.rs | 25 +++++-------------------- src/transaction/requests.rs | 12 ++++++++---- tests/failpoint_tests.rs | 2 +- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/src/store.rs b/src/store.rs index 44c56a0..7e6eabd 100644 --- a/src/store.rs +++ b/src/store.rs @@ -53,7 +53,11 @@ pub fn store_stream_for_range<PdC: PdClient>( range: (Vec<u8>, Vec<u8>), pd_client: Arc<PdC>, ) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> { - let bnd_range = BoundRange::from(range.clone()); + let bnd_range = if range.1.is_empty() { + BoundRange::range_from(range.0.clone().into()) + } else { + BoundRange::from(range.clone()) + }; pd_client .stores_for_range(bnd_range) .map_ok(move |store| { @@ -67,25 +71,6 @@ pub fn store_stream_for_range<PdC: PdClient>( .boxed() } -pub fn store_stream_for_range_by_start_key<PdC: PdClient>( - start_key: Key, - pd_client: Arc<PdC>, -) -> BoxStream<'static, Result<(Vec<u8>, RegionStore)>> { - let bnd_range = BoundRange::range_from(start_key.clone()); - pd_client - .stores_for_range(bnd_range) - .map_ok(move |store| { - let region_range = store.region_with_leader.range(); - ( - range_intersection(region_range, (start_key.clone(), vec![].into())) - .0 - .into(), - store, - ) - }) - .boxed() -} - /// The range used for request should be the intersection of `region_range` and `range`.
fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) { let (lower, upper) = region_range; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 99e11ff..79002d8 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -7,7 +7,7 @@ use crate::{ Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge, NextBatch, Process, ResponseWithShard, Shardable, SingleKey, }, - store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore}, + store::{store_stream_for_keys, store_stream_for_range, RegionStore}, timestamp::TimestampExt, transaction::HasLocks, util::iter::FlatMapOkIterExt, @@ -462,18 +462,21 @@ impl KvRequest for kvrpcpb::ScanLockRequest { } impl Shardable for kvrpcpb::ScanLockRequest { - type Shard = Vec<u8>; + type Shard = (Vec<u8>, Vec<u8>); fn shards( &self, pd_client: &Arc<impl PdClient>, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone()) + store_stream_for_range( + (self.start_key.clone().into(), self.end_key.clone().into()), + pd_client.clone(), + ) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.set_context(store.region_with_leader.context()?); - self.set_start_key(shard); + self.set_start_key(shard.0); Ok(()) } } @@ -481,6 +484,7 @@ impl Shardable for kvrpcpb::ScanLockRequest { impl HasNextBatch for kvrpcpb::ScanLockResponse { fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> { self.get_locks().last().map(|lock| { + // TODO: if last key is greater than or equal to ScanLockRequest.end_key, return None.
let mut start_key: Vec<u8> = lock.get_key().to_vec(); start_key.push(0); (start_key, vec![]) diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index f281d65..52b0905 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -209,7 +209,7 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> { let keys = write_data(&client, true, true).await?; assert_eq!(count_locks(&client).await?, keys.len()); - info!(logger, "total keys {}", keys.len()); + info!(logger, "total keys' count {}", keys.len()); let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone().into_iter()); sorted_keys.sort(); let start_key = sorted_keys[1].clone();