diff --git a/src/pd/client.rs b/src/pd/client.rs index bdfb386..417fd19 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -85,7 +85,7 @@ pub trait PdClient: Send + Sync + 'static { fn group_keys_by_region( self: Arc, keys: impl Iterator + Send + Sync + 'static, - ) -> BoxStream<'static, Result<(RegionWithLeader, Vec)>> + ) -> BoxStream<'static, Result<(Vec, RegionWithLeader)>> where K: AsRef + Into + Send + Sync + 'static, K2: Send + Sync + 'static, @@ -103,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static { } grouped.push(keys.next().unwrap().into()); } - Ok(Some((keys, (region, grouped)))) + Ok(Some((keys, (grouped, region)))) } else { Ok(None) } @@ -113,10 +113,10 @@ pub trait PdClient: Send + Sync + 'static { } /// Returns a Stream which iterates over the contexts for each region covered by range. - fn stores_for_range( + fn regions_for_range( self: Arc, range: BoundRange, - ) -> BoxStream<'static, Result> { + ) -> BoxStream<'static, Result> { let (start_key, end_key) = range.into_keys(); stream_fn(Some(start_key), move |start_key| { let end_key = end_key.clone(); @@ -129,15 +129,14 @@ pub trait PdClient: Send + Sync + 'static { let region = this.region_for_key(&start_key).await?; let region_end = region.end_key(); - let store = this.map_region_to_store(region).await?; if end_key .map(|x| x <= region_end && !x.is_empty()) .unwrap_or(false) || region_end.is_empty() { - return Ok(Some((None, store))); + return Ok(Some((None, region))); } - Ok(Some((Some(region_end), store))) + Ok(Some((Some(region_end), region))) } }) .boxed() @@ -147,7 +146,7 @@ pub trait PdClient: Send + Sync + 'static { fn group_ranges_by_region( self: Arc, mut ranges: Vec, - ) -> BoxStream<'static, Result<(RegionWithLeader, Vec)>> { + ) -> BoxStream<'static, Result<(Vec, RegionWithLeader)>> { ranges.reverse(); stream_fn(Some(ranges), move |ranges| { let this = self.clone(); @@ -167,7 +166,7 @@ pub trait PdClient: Send + Sync + 'static { if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) { grouped.push(make_key_range(start_key.into(), region_end.clone().into())); ranges.push(make_key_range(region_end.into(), end_key.into())); - return Ok(Some((Some(ranges), (region, grouped)))); + return Ok(Some((Some(ranges), (grouped, region)))); } grouped.push(range); @@ -182,11 +181,11 @@ pub trait PdClient: Send + Sync + 'static { grouped .push(make_key_range(start_key.into(), region_end.clone().into())); ranges.push(make_key_range(region_end.into(), end_key.into())); - return Ok(Some((Some(ranges), (region, grouped)))); + return Ok(Some((Some(ranges), (grouped, region)))); } grouped.push(range); } - Ok(Some((Some(ranges), (region, grouped)))) + Ok(Some((Some(ranges), (grouped, region)))) } else { Ok(None) } @@ -397,7 +396,7 @@ pub mod test { let stream = Arc::new(client).group_keys_by_region(tasks.into_iter()); let mut stream = executor::block_on_stream(stream); - let result: Vec = stream.next().unwrap().unwrap().1; + let result: Vec = stream.next().unwrap().unwrap().0; assert_eq!( result, vec![ @@ -408,27 +407,27 @@ pub mod test { ] ); assert_eq!( - stream.next().unwrap().unwrap().1, + stream.next().unwrap().unwrap().0, vec![vec![12].into(), vec![11, 4].into()] ); assert!(stream.next().is_none()); } #[test] - fn test_stores_for_range() { + fn test_regions_for_range() { let client = Arc::new(MockPdClient::default()); let k1: Key = vec![1].into(); let k2: Key = vec![5, 2].into(); let k3: Key = vec![11, 4].into(); let range1 = (k1, k2.clone()).into(); - let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1)); - assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1); + let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1)); + assert_eq!(stream.next().unwrap().unwrap().id(), 1); assert!(stream.next().is_none()); let range2 = (k2, k3).into(); - let mut stream = executor::block_on_stream(client.stores_for_range(range2)); - assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1); - assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2); + let mut stream = executor::block_on_stream(client.regions_for_range(range2)); + assert_eq!(stream.next().unwrap().unwrap().id(), 1); + assert_eq!(stream.next().unwrap().unwrap().id(), 2); assert!(stream.next().is_none()); } @@ -453,20 +452,20 @@ pub mod test { let ranges3 = stream.next().unwrap().unwrap(); let ranges4 = stream.next().unwrap().unwrap(); - assert_eq!(ranges1.0.id(), 1); + assert_eq!(ranges1.1.id(), 1); assert_eq!( - ranges1.1, + ranges1.0, vec![ make_key_range(k1.clone(), k2.clone()), make_key_range(k1.clone(), k_split.clone()), ] ); - assert_eq!(ranges2.0.id(), 2); - assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]); - assert_eq!(ranges3.0.id(), 1); - assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]); - assert_eq!(ranges4.0.id(), 2); - assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]); + assert_eq!(ranges2.1.id(), 2); + assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]); + assert_eq!(ranges3.1.id(), 1); + assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]); + assert_eq!(ranges4.1.id(), 2); + assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]); assert!(stream.next().is_none()); let range1 = make_key_range(k1.clone(), k2.clone()); @@ -477,11 +476,11 @@ pub mod test { let ranges1 = stream.next().unwrap().unwrap(); let ranges2 = stream.next().unwrap().unwrap(); let ranges3 = stream.next().unwrap().unwrap(); - assert_eq!(ranges1.0.id(), 1); - assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]); - assert_eq!(ranges2.0.id(), 2); - assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]); - assert_eq!(ranges3.0.id(), 3); - assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]); + assert_eq!(ranges1.1.id(), 1); + assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]); + assert_eq!(ranges2.1.id(), 2); + assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]); + assert_eq!(ranges3.1.id(), 3); + assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]); } } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 4422c88..f6f32f3 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -22,8 +22,8 @@ use crate::request::SingleKey; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; -use crate::store::store_stream_for_keys; -use crate::store::store_stream_for_ranges; +use crate::store::region_stream_for_keys; +use crate::store::region_stream_for_ranges; use crate::store::RegionStore; use crate::store::Request; use crate::transaction::HasLocks; @@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { let kvs = self.pairs.clone(); let ttls = self.ttls.clone(); let mut kv_ttl: Vec = kvs @@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { .map(|(kv, ttl)| KvPairTTL(kv, ttl)) .collect(); kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); - store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) + region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard) { let (pairs, ttls) = shard.into_iter().unzip(); - self.set_leader(&store.region_with_leader)?; self.pairs = pairs; self.ttls = ttls; - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.ranges.clone(), pd_client.clone()) + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + region_stream_for_ranges(self.ranges.clone(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, shard: Self::Shard) { self.ranges = shard; - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone()) + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard) { + self.inner.ranges = shard; + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.set_leader(&store.region_with_leader)?; - self.inner.ranges.clone_from(&shard); - self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard); + self.inner.data = (self.data_builder)( + store.region_with_leader.region.clone(), + self.inner.ranges.clone(), + ); Ok(()) } } diff --git a/src/request/plan.rs b/src/request/plan.rs index 9a93bbe..be915f7 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -17,8 +17,8 @@ use crate::pd::PdClient; use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; -use crate::region::RegionVerId; use crate::region::StoreId; +use crate::region::{RegionVerId, RegionWithLeader}; use crate::request::shard::HasNextBatch; use crate::request::NextBatch; use crate::request::Shardable; @@ -119,10 +119,13 @@ where debug!("single_plan_handler, shards: {}", shards.len()); let mut handles = Vec::new(); for shard in shards { + let (shard, region) = shard?; + let mut clone = current_plan.clone(); + clone.apply_shard(shard); let handle = tokio::spawn(Self::single_shard_handler( pd_client.clone(), - current_plan.clone(), - shard, + clone, + region, backoff.clone(), permits.clone(), preserve_region_results, @@ -154,15 +157,20 @@ where async fn single_shard_handler( pd_client: Arc, mut plan: P, - shard: Result<(

::Shard, RegionStore)>, + region: RegionWithLeader, mut backoff: Backoff, permits: Arc, preserve_region_results: bool, ) -> Result<::Result> { debug!("single_shard_handler"); - let region_store = match shard.and_then(|(shard, region_store)| { - plan.apply_shard(shard, ®ion_store).map(|_| region_store) - }) { + let region_store = match pd_client + .clone() + .map_region_to_store(region) + .await + .and_then(|region_store| { + plan.apply_store(®ion_store)?; + Ok(region_store) + }) { Ok(region_store) => region_store, Err(Error::LeaderNotFound { region }) => { debug!( @@ -930,11 +938,13 @@ mod test { fn shards( &self, _: &Arc, - ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> { + ) -> BoxStream<'static, crate::Result<(Self::Shard, RegionWithLeader)>> { Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))).boxed() } - fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> { + fn apply_shard(&mut self, _: Self::Shard) {} + + fn apply_store(&mut self, _: &crate::store::RegionStore) -> Result<()> { Ok(()) } } diff --git a/src/request/shard.rs b/src/request/shard.rs index 1f116f7..a68f446 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -6,6 +6,7 @@ use futures::stream::BoxStream; use super::plan::PreserveShard; use crate::pd::PdClient; +use crate::region::RegionWithLeader; use crate::request::plan::CleanupLocks; use crate::request::Dispatch; use crate::request::KvRequest; @@ -23,12 +24,16 @@ macro_rules! impl_inner_shardable { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { self.inner.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.inner.apply_shard(shard, store) + fn apply_shard(&mut self, shard: Self::Shard) { + self.inner.apply_shard(shard); + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.inner.apply_store(store) } }; } @@ -39,9 +44,11 @@ pub trait Shardable { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>; + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>; - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>; + fn apply_shard(&mut self, shard: Self::Shard); + + fn apply_store(&mut self, store: &RegionStore) -> Result<()>; } pub trait Batchable { @@ -88,13 +95,17 @@ impl Shardable for Dispatch { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { self.request.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard) { + self.request.apply_shard(shard); + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.kv_client = Some(store.client.clone()); - self.request.apply_shard(shard, store) + self.request.apply_store(store) } } @@ -110,13 +121,17 @@ impl Shardable for PreserveShard

{ fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { self.inner.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard) { self.shard = Some(shard.clone()); - self.inner.apply_shard(shard, store) + self.inner.apply_shard(shard) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.inner.apply_store(store) } } @@ -130,13 +145,17 @@ impl Shardable for CleanupLocks { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { self.inner.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard) { + self.inner.apply_shard(shard) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.store = Some(store.clone()); - self.inner.apply_shard(shard, store) + self.inner.apply_store(store) } } @@ -152,23 +171,21 @@ macro_rules! shardable_key { pd_client: &std::sync::Arc, ) -> futures::stream::BoxStream< 'static, - $crate::Result<(Self::Shard, $crate::store::RegionStore)>, + $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>, > { - $crate::store::store_stream_for_keys( + $crate::store::region_stream_for_keys( std::iter::once(self.key.clone()), pd_client.clone(), ) } - fn apply_shard( - &mut self, - mut shard: Self::Shard, - store: &$crate::store::RegionStore, - ) -> $crate::Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, mut shard: Self::Shard) { assert!(shard.len() == 1); self.key = shard.pop().unwrap(); - Ok(()) + } + + fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> { + self.set_leader(&store.region_with_leader) } } }; @@ -186,21 +203,19 @@ macro_rules! shardable_keys { pd_client: &std::sync::Arc, ) -> futures::stream::BoxStream< 'static, - $crate::Result<(Self::Shard, $crate::store::RegionStore)>, + $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>, > { let mut keys = self.keys.clone(); keys.sort(); - $crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone()) + $crate::store::region_stream_for_keys(keys.into_iter(), pd_client.clone()) } - fn apply_shard( - &mut self, - shard: Self::Shard, - store: &$crate::store::RegionStore, - ) -> $crate::Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, shard: Self::Shard) { self.keys = shard.into_iter().map(Into::into).collect(); - Ok(()) + } + + fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> { + self.set_leader(&store.region_with_leader) } } }; @@ -242,7 +257,8 @@ macro_rules! shardable_range { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> { + ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>> + { let mut start_key = self.start_key.clone().into(); let mut end_key = self.end_key.clone().into(); // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. @@ -250,16 +266,10 @@ macro_rules! shardable_range { if self.is_reverse() { std::mem::swap(&mut start_key, &mut end_key); } - $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) + $crate::store::region_stream_for_range((start_key, end_key), pd_client.clone()) } - fn apply_shard( - &mut self, - shard: Self::Shard, - store: &$crate::store::RegionStore, - ) -> $crate::Result<()> { - self.set_leader(&store.region_with_leader)?; - + fn apply_shard(&mut self, shard: Self::Shard) { // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. self.start_key = shard.0; @@ -267,7 +277,10 @@ macro_rules! shardable_range { if self.is_reverse() { std::mem::swap(&mut self.start_key, &mut self.end_key); } - Ok(()) + } + + fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> { + self.set_leader(&store.region_with_leader) } } }; diff --git a/src/store/mod.rs b/src/store/mod.rs index f21373b..fb6ed70 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -38,46 +38,37 @@ pub struct Store { } /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order -pub fn store_stream_for_keys( +pub fn region_stream_for_keys( key_data: impl Iterator + Send + Sync + 'static, pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, RegionStore)>> +) -> BoxStream<'static, Result<(Vec, RegionWithLeader)>> where PdC: PdClient, K: AsRef + Into + Send + Sync + 'static, KOut: Send + Sync + 'static, { - pd_client - .clone() - .group_keys_by_region(key_data) - .and_then(move |(region, key)| { - pd_client - .clone() - .map_region_to_store(region) - .map_ok(move |store| (key, store)) - }) - .boxed() + pd_client.clone().group_keys_by_region(key_data) } #[allow(clippy::type_complexity)] -pub fn store_stream_for_range( +pub fn region_stream_for_range( range: (Vec, Vec), pd_client: Arc, -) -> BoxStream<'static, Result<((Vec, Vec), RegionStore)>> { +) -> BoxStream<'static, Result<((Vec, Vec), RegionWithLeader)>> { let bnd_range = if range.1.is_empty() { BoundRange::range_from(range.0.clone().into()) } else { BoundRange::from(range.clone()) }; pd_client - .stores_for_range(bnd_range) - .map_ok(move |store| { - let region_range = store.region_with_leader.range(); + .regions_for_range(bnd_range) + .map_ok(move |region| { + let region_range = region.range(); let result_range = range_intersection( region_range, (range.0.clone().into(), range.1.clone().into()), ); - ((result_range.0.into(), result_range.1.into()), store) + ((result_range.0.into(), result_range.1.into()), region) }) .boxed() } @@ -95,18 +86,9 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) (max(lower, range.0), up) } -pub fn store_stream_for_ranges( +pub fn region_stream_for_ranges( ranges: Vec, pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, RegionStore)>> { - pd_client - .clone() - .group_ranges_by_region(ranges) - .and_then(move |(region, range)| { - pd_client - .clone() - .map_region_to_store(region) - .map_ok(move |store| (range, store)) - }) - .boxed() +) -> BoxStream<'static, Result<(Vec, RegionWithLeader)>> { + pd_client.clone().group_ranges_by_region(ranges) } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6a5538d..ebbb52a 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -19,6 +19,7 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::kvrpcpb::{self}; use crate::proto::pdpb::Timestamp; +use crate::region::RegionWithLeader; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::CollectWithShard; @@ -37,10 +38,10 @@ use crate::reversible_range_request; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; -use crate::store::store_stream_for_range; use crate::store::RegionStore; use crate::store::Request; -use crate::store::{store_stream_for_keys, Store}; +use crate::store::Store; +use crate::store::{region_stream_for_keys, region_stream_for_range}; use crate::timestamp::TimestampExt; use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction; use crate::transaction::HasLocks; @@ -283,26 +284,24 @@ impl Shardable for kvrpcpb::PrewriteRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { let mut mutations = self.mutations.clone(); mutations.sort_by(|a, b| a.key.cmp(&b.key)); - store_stream_for_keys(mutations.into_iter(), pd_client.clone()) + region_stream_for_keys(mutations.into_iter(), pd_client.clone()) .flat_map(|result| match result { - Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches( + Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches( mutations, TXN_COMMIT_BATCH_SIZE, )) - .map(move |batch| Ok((batch, store.clone()))) + .map(move |batch| Ok((batch, region.clone()))) .boxed(), Err(e) => stream::iter(Err(e)).boxed(), }) .boxed() } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; - + fn apply_shard(&mut self, shard: Self::Shard) { // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { self.secondaries = vec![]; @@ -314,7 +313,10 @@ impl Shardable for kvrpcpb::PrewriteRequest { } self.mutations = shard; - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -351,15 +353,15 @@ impl Shardable for kvrpcpb::CommitRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { let mut keys = self.keys.clone(); keys.sort(); - store_stream_for_keys(keys.into_iter(), pd_client.clone()) + region_stream_for_keys(keys.into_iter(), pd_client.clone()) .flat_map(|result| match result { - Ok((keys, store)) => { + Ok((keys, region)) => { stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE)) - .map(move |batch| Ok((batch, store.clone()))) + .map(move |batch| Ok((batch, region.clone()))) .boxed() } Err(e) => stream::iter(Err(e)).boxed(), @@ -367,10 +369,12 @@ impl Shardable for kvrpcpb::CommitRequest { .boxed() } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, shard: Self::Shard) { self.keys = shard.into_iter().map(Into::into).collect(); - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -452,16 +456,18 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { let mut mutations = self.mutations.clone(); mutations.sort_by(|a, b| a.key.cmp(&b.key)); - store_stream_for_keys(mutations.into_iter(), pd_client.clone()) + region_stream_for_keys(mutations.into_iter(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, shard: Self::Shard) { self.mutations = shard; - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -552,17 +558,19 @@ impl Shardable for kvrpcpb::ScanLockRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_range( + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + region_stream_for_range( (self.start_key.clone(), self.end_key.clone()), pd_client.clone(), ) } - fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, shard: Self::Shard) { self.start_key = shard.0; - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -616,15 +624,17 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) } - fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, mut shard: Self::Shard) { assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } } @@ -674,15 +684,17 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) } - fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_leader(&store.region_with_leader)?; + fn apply_shard(&mut self, mut shard: Self::Shard) { assert!(shard.len() == 1); self.primary_key = shard.pop().unwrap(); - Ok(()) + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) } }