mirror of https://github.com/tikv/client-rust.git

address review issues

Signed-off-by: haojinming <jinming.hao@pingcap.com>

parent 1628598ddc
commit 0c2a5d329a

src/store.rs (25 changed lines)
@@ -53,7 +53,11 @@ pub fn store_stream_for_range<PdC: PdClient>(
     range: (Vec<u8>, Vec<u8>),
     pd_client: Arc<PdC>,
 ) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
-    let bnd_range = BoundRange::from(range.clone());
+    let bnd_range = if range.1.is_empty() {
+        BoundRange::range_from(range.0.clone().into())
+    } else {
+        BoundRange::from(range.clone())
+    };
     pd_client
         .stores_for_range(bnd_range)
         .map_ok(move |store| {
@@ -67,25 +71,6 @@ pub fn store_stream_for_range<PdC: PdClient>(
         .boxed()
 }
 
-pub fn store_stream_for_range_by_start_key<PdC: PdClient>(
-    start_key: Key,
-    pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<(Vec<u8>, RegionStore)>> {
-    let bnd_range = BoundRange::range_from(start_key.clone());
-    pd_client
-        .stores_for_range(bnd_range)
-        .map_ok(move |store| {
-            let region_range = store.region_with_leader.range();
-            (
-                range_intersection(region_range, (start_key.clone(), vec![].into()))
-                    .0
-                    .into(),
-                store,
-            )
-        })
-        .boxed()
-}
-
 /// The range used for request should be the intersection of `region_range` and `range`.
 fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) {
     let (lower, upper) = region_range;
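The reworked `store_stream_for_range` now treats an empty end key as an open-ended upper bound. A minimal sketch of that convention, using hypothetical stand-in types rather than the crate's `BoundRange`:

// Hypothetical sketch, not the crate's API: an empty end key means
// "scan from `start` to the end of the keyspace".
#[derive(Debug, PartialEq)]
enum SketchRange {
    From(Vec<u8>),             // [start, +inf)
    Bounded(Vec<u8>, Vec<u8>), // [start, end)
}

fn to_range(range: (Vec<u8>, Vec<u8>)) -> SketchRange {
    if range.1.is_empty() {
        SketchRange::From(range.0)
    } else {
        SketchRange::Bounded(range.0, range.1)
    }
}

fn main() {
    assert_eq!(to_range((b"a".to_vec(), vec![])), SketchRange::From(b"a".to_vec()));
    assert_eq!(
        to_range((b"a".to_vec(), b"z".to_vec())),
        SketchRange::Bounded(b"a".to_vec(), b"z".to_vec())
    );
}

This is also what makes the dedicated `store_stream_for_range_by_start_key` helper redundant: an unbounded scan is just the empty-end-key case of the general helper, so the function is deleted above and its caller is switched over below.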
@@ -7,7 +7,7 @@ use crate::{
         Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge,
         NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
     },
-    store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore},
+    store::{store_stream_for_keys, store_stream_for_range, RegionStore},
     timestamp::TimestampExt,
     transaction::HasLocks,
     util::iter::FlatMapOkIterExt,
@@ -462,18 +462,21 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
 }
 
 impl Shardable for kvrpcpb::ScanLockRequest {
-    type Shard = Vec<u8>;
+    type Shard = (Vec<u8>, Vec<u8>);
 
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
     ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone())
+        store_stream_for_range(
+            (self.start_key.clone().into(), self.end_key.clone().into()),
+            pd_client.clone(),
+        )
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
         self.set_context(store.region_with_leader.context()?);
-        self.set_start_key(shard);
+        self.set_start_key(shard.0);
         Ok(())
     }
 }
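`ScanLockRequest`'s shard type widens from a bare start key to a `(start, end)` pair. A small sketch, with a hypothetical stand-in type, of what `apply_shard` does with the pair after this change: only the start key is written into the request, while the end key rides along with the shard for later use:

// Hypothetical stand-in for kvrpcpb::ScanLockRequest, for illustration only.
struct SketchScanLockReq {
    start_key: Vec<u8>,
}

fn apply_shard(req: &mut SketchScanLockReq, shard: (Vec<u8>, Vec<u8>)) {
    // Mirrors `self.set_start_key(shard.0)`: the per-shard start bound is applied;
    // the end bound (shard.1) is carried by the shard but not yet enforced here.
    req.start_key = shard.0;
}

fn main() {
    let mut req = SketchScanLockReq { start_key: Vec::new() };
    apply_shard(&mut req, (b"a".to_vec(), b"m".to_vec()));
    assert_eq!(req.start_key, b"a".to_vec());
}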
@@ -481,6 +484,7 @@ impl Shardable for kvrpcpb::ScanLockRequest {
 impl HasNextBatch for kvrpcpb::ScanLockResponse {
     fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> {
         self.get_locks().last().map(|lock| {
+            // TODO: if last key is larger or equal than ScanLockRequest.end_key, return None.
             let mut start_key: Vec<u8> = lock.get_key().to_vec();
             start_key.push(0);
             (start_key, vec![])
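The TODO added above describes the missing stop condition for lock paging: the next batch resumes from the last returned lock key plus a trailing zero byte, and should stop once that key reaches a non-empty `end_key`. A runnable sketch of that rule (an assumption about the intended behaviour, not code from this commit):

// Hypothetical helper illustrating the intended paging rule.
fn next_batch_start(last_lock_key: &[u8], end_key: &[u8]) -> Option<Vec<u8>> {
    // Stop paging once the last key has reached the (exclusive) end of the range;
    // an empty end_key means the range is unbounded, so paging continues.
    if !end_key.is_empty() && last_lock_key >= end_key {
        return None;
    }
    // Otherwise resume from the smallest key strictly greater than the last one.
    let mut start = last_lock_key.to_vec();
    start.push(0);
    Some(start)
}

fn main() {
    assert_eq!(next_batch_start(b"k1", b"z"), Some(b"k1\0".to_vec()));
    assert_eq!(next_batch_start(b"z", b"z"), None);
    assert_eq!(next_batch_start(b"k1", b""), Some(b"k1\0".to_vec()));
}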
@@ -209,7 +209,7 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
     let keys = write_data(&client, true, true).await?;
     assert_eq!(count_locks(&client).await?, keys.len());
 
-    info!(logger, "total keys {}", keys.len());
+    info!(logger, "total keys' count {}", keys.len());
     let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone().into_iter());
     sorted_keys.sort();
     let start_key = sorted_keys[1].clone();