apply_shard + apply_store

Signed-off-by: Ping Yu <yuping@pingcap.com>
Ping Yu 2025-05-20 18:57:16 +08:00
parent 48df203e04
commit a2972981a5
6 changed files with 194 additions and 168 deletions
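Summary: Shardable::apply_shard(shard, store) is split into an infallible apply_shard(shard), which rewrites only the request payload, and a new apply_store(store), which binds the resolved leader and KV client. The PD sharding helpers are renamed to match: stores_for_range becomes regions_for_range, and store_stream_for_keys / store_stream_for_range / store_stream_for_ranges become region_stream_for_*. All of them now yield RegionWithLeader instead of RegionStore, the grouping streams swap their tuple order to (shard, region), and store resolution now happens once per shard in single_shard_handler, via map_region_to_store, just before dispatch.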

View File

@@ -85,7 +85,7 @@ pub trait PdClient: Send + Sync + 'static {
     fn group_keys_by_region<K, K2>(
         self: Arc<Self>,
         keys: impl Iterator<Item = K> + Send + Sync + 'static,
-    ) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
+    ) -> BoxStream<'static, Result<(Vec<K2>, RegionWithLeader)>>
     where
         K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
         K2: Send + Sync + 'static,
@@ -103,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
                     }
                     grouped.push(keys.next().unwrap().into());
                 }
-                Ok(Some((keys, (region, grouped))))
+                Ok(Some((keys, (grouped, region))))
             } else {
                 Ok(None)
             }
@@ -113,10 +113,10 @@ pub trait PdClient: Send + Sync + 'static {
     }

     /// Returns a Stream which iterates over the contexts for each region covered by range.
-    fn stores_for_range(
+    fn regions_for_range(
         self: Arc<Self>,
         range: BoundRange,
-    ) -> BoxStream<'static, Result<RegionStore>> {
+    ) -> BoxStream<'static, Result<RegionWithLeader>> {
         let (start_key, end_key) = range.into_keys();
         stream_fn(Some(start_key), move |start_key| {
             let end_key = end_key.clone();
@@ -129,15 +129,14 @@ pub trait PdClient: Send + Sync + 'static {
                 let region = this.region_for_key(&start_key).await?;
                 let region_end = region.end_key();
-                let store = this.map_region_to_store(region).await?;
                 if end_key
                     .map(|x| x <= region_end && !x.is_empty())
                     .unwrap_or(false)
                     || region_end.is_empty()
                 {
-                    return Ok(Some((None, store)));
+                    return Ok(Some((None, region)));
                 }
-                Ok(Some((Some(region_end), store)))
+                Ok(Some((Some(region_end), region)))
             }
         })
         .boxed()
@@ -147,7 +146,7 @@ pub trait PdClient: Send + Sync + 'static {
     fn group_ranges_by_region(
         self: Arc<Self>,
         mut ranges: Vec<kvrpcpb::KeyRange>,
-    ) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
+    ) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
         ranges.reverse();
         stream_fn(Some(ranges), move |ranges| {
             let this = self.clone();
@@ -167,7 +166,7 @@ pub trait PdClient: Send + Sync + 'static {
                     if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
                         grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
                         ranges.push(make_key_range(region_end.into(), end_key.into()));
-                        return Ok(Some((Some(ranges), (region, grouped))));
+                        return Ok(Some((Some(ranges), (grouped, region))));
                     }
                     grouped.push(range);
@@ -182,11 +181,11 @@ pub trait PdClient: Send + Sync + 'static {
                         grouped
                             .push(make_key_range(start_key.into(), region_end.clone().into()));
                         ranges.push(make_key_range(region_end.into(), end_key.into()));
-                        return Ok(Some((Some(ranges), (region, grouped))));
+                        return Ok(Some((Some(ranges), (grouped, region))));
                     }
                     grouped.push(range);
                 }
-                Ok(Some((Some(ranges), (region, grouped))))
+                Ok(Some((Some(ranges), (grouped, region))))
             } else {
                 Ok(None)
             }
@@ -397,7 +396,7 @@ pub mod test {
         let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
         let mut stream = executor::block_on_stream(stream);

-        let result: Vec<Key> = stream.next().unwrap().unwrap().1;
+        let result: Vec<Key> = stream.next().unwrap().unwrap().0;
         assert_eq!(
             result,
             vec![
@@ -408,27 +407,27 @@ pub mod test {
             ]
         );
         assert_eq!(
-            stream.next().unwrap().unwrap().1,
+            stream.next().unwrap().unwrap().0,
             vec![vec![12].into(), vec![11, 4].into()]
         );
         assert!(stream.next().is_none());
     }

     #[test]
-    fn test_stores_for_range() {
+    fn test_regions_for_range() {
         let client = Arc::new(MockPdClient::default());
         let k1: Key = vec![1].into();
         let k2: Key = vec![5, 2].into();
         let k3: Key = vec![11, 4].into();
         let range1 = (k1, k2.clone()).into();
-        let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
+        let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1));
+        assert_eq!(stream.next().unwrap().unwrap().id(), 1);
         assert!(stream.next().is_none());

         let range2 = (k2, k3).into();
-        let mut stream = executor::block_on_stream(client.stores_for_range(range2));
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
+        let mut stream = executor::block_on_stream(client.regions_for_range(range2));
+        assert_eq!(stream.next().unwrap().unwrap().id(), 1);
+        assert_eq!(stream.next().unwrap().unwrap().id(), 2);
         assert!(stream.next().is_none());
     }
@@ -453,20 +452,20 @@ pub mod test {
         let ranges3 = stream.next().unwrap().unwrap();
         let ranges4 = stream.next().unwrap().unwrap();
-        assert_eq!(ranges1.0.id(), 1);
+        assert_eq!(ranges1.1.id(), 1);
         assert_eq!(
-            ranges1.1,
+            ranges1.0,
             vec![
                 make_key_range(k1.clone(), k2.clone()),
                 make_key_range(k1.clone(), k_split.clone()),
             ]
         );
-        assert_eq!(ranges2.0.id(), 2);
-        assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]);
-        assert_eq!(ranges3.0.id(), 1);
-        assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]);
-        assert_eq!(ranges4.0.id(), 2);
-        assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]);
+        assert_eq!(ranges2.1.id(), 2);
+        assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]);
+        assert_eq!(ranges3.1.id(), 1);
+        assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]);
+        assert_eq!(ranges4.1.id(), 2);
+        assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]);
         assert!(stream.next().is_none());

         let range1 = make_key_range(k1.clone(), k2.clone());
@@ -477,11 +476,11 @@ pub mod test {
         let ranges1 = stream.next().unwrap().unwrap();
         let ranges2 = stream.next().unwrap().unwrap();
         let ranges3 = stream.next().unwrap().unwrap();
-        assert_eq!(ranges1.0.id(), 1);
-        assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]);
-        assert_eq!(ranges2.0.id(), 2);
-        assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]);
-        assert_eq!(ranges3.0.id(), 3);
-        assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]);
+        assert_eq!(ranges1.1.id(), 1);
+        assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]);
+        assert_eq!(ranges2.1.id(), 2);
+        assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]);
+        assert_eq!(ranges3.1.id(), 3);
+        assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]);
     }
 }
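The renamed regions_for_range no longer resolves stores inside the stream. A minimal sketch (not part of this commit; the helper function is hypothetical) of how a caller that still needs a store would consume it:

// Hypothetical helper, for illustration only: iterate regions, then resolve
// each region's store explicitly, mirroring what single_shard_handler now does.
async fn regions_then_stores(pd_client: Arc<impl PdClient>, range: BoundRange) -> Result<()> {
    use futures::StreamExt;

    let mut regions = pd_client.clone().regions_for_range(range);
    while let Some(region) = regions.next().await {
        let region = region?; // Result<RegionWithLeader>
        // Store lookup is now an explicit, separate step.
        let store = pd_client.clone().map_region_to_store(region).await?;
        let _ = store; // dispatch the request to `store` here
    }
    Ok(())
}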

View File

@@ -22,8 +22,8 @@ use crate::request::SingleKey;
 use crate::shardable_key;
 use crate::shardable_keys;
 use crate::shardable_range;
-use crate::store::store_stream_for_keys;
-use crate::store::store_stream_for_ranges;
+use crate::store::region_stream_for_keys;
+use crate::store::region_stream_for_ranges;
 use crate::store::RegionStore;
 use crate::store::Request;
 use crate::transaction::HasLocks;
@@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let kvs = self.pairs.clone();
         let ttls = self.ttls.clone();
         let mut kv_ttl: Vec<KvPairTTL> = kvs
@@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
             .map(|(kv, ttl)| KvPairTTL(kv, ttl))
             .collect();
         kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
-        store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
+        region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
         let (pairs, ttls) = shard.into_iter().unzip();
-        self.set_leader(&store.region_with_leader)?;
         self.pairs = pairs;
         self.ttls = ttls;
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
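With the split, a shard is applied in two phases. A sketch (illustrative only; the function is hypothetical but the names come from this diff) of the assumed call order:

// Illustrative only: the two phases as the plan driver now sequences them.
async fn dispatch_one_shard<P, PdC>(
    pd_client: Arc<PdC>,
    mut req: P,
    shard: P::Shard,
    region: RegionWithLeader,
) -> Result<P>
where
    P: Shardable,
    PdC: PdClient,
{
    req.apply_shard(shard); // phase 1: payload only, cannot fail
    let store = pd_client.map_region_to_store(region).await?; // resolve the leader's store
    req.apply_store(&store)?; // phase 2: bind leader/client; the fallible step
    Ok(req)
}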
@@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.ranges = shard;
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
@@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.inner.ranges = shard;
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.set_leader(&store.region_with_leader)?;
-        self.inner.ranges.clone_from(&shard);
-        self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard);
+        self.inner.data = (self.data_builder)(
+            store.region_with_leader.region.clone(),
+            self.inner.ranges.clone(),
+        );
         Ok(())
     }
 }
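RawCoprocessorRequest is the one case where the payload depends on the region itself: apply_shard only stashes the ranges, and the data_builder call moves into apply_store, where the resolved region is available.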

View File

@@ -17,8 +17,8 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
-use crate::region::RegionVerId;
 use crate::region::StoreId;
+use crate::region::{RegionVerId, RegionWithLeader};
 use crate::request::shard::HasNextBatch;
 use crate::request::NextBatch;
 use crate::request::Shardable;
@@ -119,10 +119,13 @@
         debug!("single_plan_handler, shards: {}", shards.len());
         let mut handles = Vec::new();
         for shard in shards {
+            let (shard, region) = shard?;
+            let mut clone = current_plan.clone();
+            clone.apply_shard(shard);
             let handle = tokio::spawn(Self::single_shard_handler(
                 pd_client.clone(),
-                current_plan.clone(),
-                shard,
+                clone,
+                region,
                 backoff.clone(),
                 permits.clone(),
                 preserve_region_results,
@@ -154,15 +157,20 @@
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
         mut plan: P,
-        shard: Result<(<P as Shardable>::Shard, RegionStore)>,
+        region: RegionWithLeader,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
         debug!("single_shard_handler");
-        let region_store = match shard.and_then(|(shard, region_store)| {
-            plan.apply_shard(shard, &region_store).map(|_| region_store)
-        }) {
+        let region_store = match pd_client
+            .clone()
+            .map_region_to_store(region)
+            .await
+            .and_then(|region_store| {
+                plan.apply_store(&region_store)?;
+                Ok(region_store)
+            }) {
             Ok(region_store) => region_store,
             Err(Error::LeaderNotFound { region }) => {
                 debug!(
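Note the ordering here: apply_shard already ran in single_plan_handler before the task was spawned, so the spawned handler only maps the region to a store and calls apply_store. Presumably this is what lets a leader-change retry re-resolve the store without recomputing the shard.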
@@ -930,11 +938,13 @@
         fn shards(
             &self,
             _: &Arc<impl crate::pd::PdClient>,
-        ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> {
+        ) -> BoxStream<'static, crate::Result<(Self::Shard, RegionWithLeader)>> {
             Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))).boxed()
         }

-        fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> {
+        fn apply_shard(&mut self, _: Self::Shard) {}
+
+        fn apply_store(&mut self, _: &crate::store::RegionStore) -> Result<()> {
             Ok(())
         }
     }

View File

@@ -6,6 +6,7 @@ use futures::stream::BoxStream;
 use super::plan::PreserveShard;
 use crate::pd::PdClient;
+use crate::region::RegionWithLeader;
 use crate::request::plan::CleanupLocks;
 use crate::request::Dispatch;
 use crate::request::KvRequest;
@@ -23,12 +24,16 @@ macro_rules! impl_inner_shardable {
         fn shards(
             &self,
             pd_client: &Arc<impl PdClient>,
-        ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+        ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
             self.inner.shards(pd_client)
         }

-        fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-            self.inner.apply_shard(shard, store)
+        fn apply_shard(&mut self, shard: Self::Shard) {
+            self.inner.apply_shard(shard);
         }
+
+        fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+            self.inner.apply_store(store)
+        }
     };
 }
@@ -39,9 +44,11 @@ pub trait Shardable {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>;
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>;

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
+    fn apply_shard(&mut self, shard: Self::Shard);
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
 }
pub trait Batchable {
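To make the new Shardable contract concrete, here is a minimal sketch of an implementation. MyScanRequest is hypothetical and not part of this commit, and it is assumed to expose set_leader the way the kvrpcpb request types in this diff do; the method shapes follow the trait above:

impl Shardable for MyScanRequest {
    type Shard = (Vec<u8>, Vec<u8>);

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        region_stream_for_range((self.start.clone(), self.end.clone()), pd_client.clone())
    }

    // Payload only; cannot fail.
    fn apply_shard(&mut self, shard: Self::Shard) {
        (self.start, self.end) = shard;
    }

    // Leader binding against the resolved store; the only fallible step.
    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}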
@@ -88,13 +95,17 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         self.request.shards(pd_client)
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.request.apply_shard(shard);
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.kv_client = Some(store.client.clone());
-        self.request.apply_shard(shard, store)
+        self.request.apply_store(store)
     }
 }
@@ -110,13 +121,17 @@ impl<P: Plan + Shardable> Shardable for PreserveShard<P> {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         self.inner.shards(pd_client)
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.shard = Some(shard.clone());
-        self.inner.apply_shard(shard, store)
+        self.inner.apply_shard(shard)
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.inner.apply_store(store)
+    }
 }
@@ -130,13 +145,17 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for CleanupLocks<P, PdC> {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         self.inner.shards(pd_client)
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.inner.apply_shard(shard)
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.store = Some(store.clone());
-        self.inner.apply_shard(shard, store)
+        self.inner.apply_store(store)
     }
 }
@@ -152,23 +171,21 @@ macro_rules! shardable_key {
                 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
             ) -> futures::stream::BoxStream<
                 'static,
-                $crate::Result<(Self::Shard, $crate::store::RegionStore)>,
+                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
             > {
-                $crate::store::store_stream_for_keys(
+                $crate::store::region_stream_for_keys(
                     std::iter::once(self.key.clone()),
                     pd_client.clone(),
                 )
             }

-            fn apply_shard(
-                &mut self,
-                mut shard: Self::Shard,
-                store: &$crate::store::RegionStore,
-            ) -> $crate::Result<()> {
-                self.set_leader(&store.region_with_leader)?;
+            fn apply_shard(&mut self, mut shard: Self::Shard) {
                 assert!(shard.len() == 1);
                 self.key = shard.pop().unwrap();
-                Ok(())
             }
+
+            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+                self.set_leader(&store.region_with_leader)
+            }
         }
     };
@@ -186,21 +203,19 @@ macro_rules! shardable_keys {
                 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
             ) -> futures::stream::BoxStream<
                 'static,
-                $crate::Result<(Self::Shard, $crate::store::RegionStore)>,
+                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
             > {
                 let mut keys = self.keys.clone();
                 keys.sort();
-                $crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone())
+                $crate::store::region_stream_for_keys(keys.into_iter(), pd_client.clone())
             }

-            fn apply_shard(
-                &mut self,
-                shard: Self::Shard,
-                store: &$crate::store::RegionStore,
-            ) -> $crate::Result<()> {
-                self.set_leader(&store.region_with_leader)?;
+            fn apply_shard(&mut self, shard: Self::Shard) {
                 self.keys = shard.into_iter().map(Into::into).collect();
-                Ok(())
             }
+
+            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+                self.set_leader(&store.region_with_leader)
+            }
         }
     };
@@ -242,7 +257,8 @@ macro_rules! shardable_range {
         fn shards(
             &self,
             pd_client: &Arc<impl $crate::pd::PdClient>,
-        ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
+        ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>>
+        {
             let mut start_key = self.start_key.clone().into();
             let mut end_key = self.end_key.clone().into();
             // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
@@ -250,16 +266,10 @@
             if self.is_reverse() {
                 std::mem::swap(&mut start_key, &mut end_key);
             }
-            $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
+            $crate::store::region_stream_for_range((start_key, end_key), pd_client.clone())
         }

-        fn apply_shard(
-            &mut self,
-            shard: Self::Shard,
-            store: &$crate::store::RegionStore,
-        ) -> $crate::Result<()> {
-            self.set_leader(&store.region_with_leader)?;
+        fn apply_shard(&mut self, shard: Self::Shard) {
             // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
             // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
             self.start_key = shard.0;
@@ -267,7 +277,10 @@
             if self.is_reverse() {
                 std::mem::swap(&mut self.start_key, &mut self.end_key);
             }
-            Ok(())
         }
+
+        fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+            self.set_leader(&store.region_with_leader)
+        }
     }
 };

View File

@@ -38,46 +38,37 @@ pub struct Store {
 }

 /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
-pub fn store_stream_for_keys<K, KOut, PdC>(
+pub fn region_stream_for_keys<K, KOut, PdC>(
     key_data: impl Iterator<Item = K> + Send + Sync + 'static,
     pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<(Vec<KOut>, RegionStore)>>
+) -> BoxStream<'static, Result<(Vec<KOut>, RegionWithLeader)>>
 where
     PdC: PdClient,
     K: AsRef<Key> + Into<KOut> + Send + Sync + 'static,
     KOut: Send + Sync + 'static,
 {
-    pd_client
-        .clone()
-        .group_keys_by_region(key_data)
-        .and_then(move |(region, key)| {
-            pd_client
-                .clone()
-                .map_region_to_store(region)
-                .map_ok(move |store| (key, store))
-        })
-        .boxed()
+    pd_client.clone().group_keys_by_region(key_data)
 }

 #[allow(clippy::type_complexity)]
-pub fn store_stream_for_range<PdC: PdClient>(
+pub fn region_stream_for_range<PdC: PdClient>(
     range: (Vec<u8>, Vec<u8>),
     pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
+) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionWithLeader)>> {
     let bnd_range = if range.1.is_empty() {
         BoundRange::range_from(range.0.clone().into())
     } else {
         BoundRange::from(range.clone())
     };
     pd_client
-        .stores_for_range(bnd_range)
-        .map_ok(move |store| {
-            let region_range = store.region_with_leader.range();
+        .regions_for_range(bnd_range)
+        .map_ok(move |region| {
+            let region_range = region.range();
             let result_range = range_intersection(
                 region_range,
                 (range.0.clone().into(), range.1.clone().into()),
             );
-            ((result_range.0.into(), result_range.1.into()), store)
+            ((result_range.0.into(), result_range.1.into()), region)
         })
         .boxed()
 }
@@ -95,18 +86,9 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key)
     (max(lower, range.0), up)
 }

-pub fn store_stream_for_ranges<PdC: PdClient>(
+pub fn region_stream_for_ranges<PdC: PdClient>(
     ranges: Vec<kvrpcpb::KeyRange>,
     pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionStore)>> {
-    pd_client
-        .clone()
-        .group_ranges_by_region(ranges)
-        .and_then(move |(region, range)| {
-            pd_client
-                .clone()
-                .map_region_to_store(region)
-                .map_ok(move |store| (range, store))
-        })
-        .boxed()
+) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
+    pd_client.clone().group_ranges_by_region(ranges)
 }
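After this change, region_stream_for_keys and region_stream_for_ranges are thin wrappers over group_keys_by_region and group_ranges_by_region: the per-region map_region_to_store hop that the old helpers spliced into the stream is gone, and the store lookup instead happens once per shard in single_shard_handler.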

View File

@@ -19,6 +19,7 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse;
 use crate::proto::kvrpcpb::TxnInfo;
 use crate::proto::kvrpcpb::{self};
 use crate::proto::pdpb::Timestamp;
+use crate::region::RegionWithLeader;
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::CollectWithShard;
@@ -37,10 +38,10 @@ use crate::reversible_range_request;
 use crate::shardable_key;
 use crate::shardable_keys;
 use crate::shardable_range;
-use crate::store::store_stream_for_range;
 use crate::store::RegionStore;
 use crate::store::Request;
-use crate::store::{store_stream_for_keys, Store};
+use crate::store::Store;
+use crate::store::{region_stream_for_keys, region_stream_for_range};
 use crate::timestamp::TimestampExt;
 use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction;
 use crate::transaction::HasLocks;
@@ -283,26 +284,24 @@ impl Shardable for kvrpcpb::PrewriteRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut mutations = self.mutations.clone();
         mutations.sort_by(|a, b| a.key.cmp(&b.key));

-        store_stream_for_keys(mutations.into_iter(), pd_client.clone())
+        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
             .flat_map(|result| match result {
-                Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
+                Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
                     mutations,
                     TXN_COMMIT_BATCH_SIZE,
                 ))
-                .map(move |batch| Ok((batch, store.clone())))
+                .map(move |batch| Ok((batch, region.clone())))
                 .boxed(),
                 Err(e) => stream::iter(Err(e)).boxed(),
             })
             .boxed()
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         // Only need to set secondary keys if we're sending the primary key.
         if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
             self.secondaries = vec![];
@@ -314,7 +313,10 @@
         }
         self.mutations = shard;
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
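For PrewriteRequest (and CommitRequest below), the shard stream still re-batches each region's keys via batches(..., TXN_COMMIT_BATCH_SIZE); each batch now clones a plain RegionWithLeader rather than a RegionStore, so no store client needs to be resolved until a batch is actually dispatched.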
@@ -351,15 +353,15 @@ impl Shardable for kvrpcpb::CommitRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut keys = self.keys.clone();
         keys.sort();

-        store_stream_for_keys(keys.into_iter(), pd_client.clone())
+        region_stream_for_keys(keys.into_iter(), pd_client.clone())
             .flat_map(|result| match result {
-                Ok((keys, store)) => {
+                Ok((keys, region)) => {
                     stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
-                        .map(move |batch| Ok((batch, store.clone())))
+                        .map(move |batch| Ok((batch, region.clone())))
                         .boxed()
                 }
                 Err(e) => stream::iter(Err(e)).boxed(),
@@ -367,10 +369,12 @@ impl Shardable for kvrpcpb::CommitRequest {
             .boxed()
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.keys = shard.into_iter().map(Into::into).collect();
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
@@ -452,16 +456,18 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut mutations = self.mutations.clone();
         mutations.sort_by(|a, b| a.key.cmp(&b.key));
-        store_stream_for_keys(mutations.into_iter(), pd_client.clone())
+        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.mutations = shard;
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
@@ -552,17 +558,19 @@ impl Shardable for kvrpcpb::ScanLockRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_range(
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_range(
             (self.start_key.clone(), self.end_key.clone()),
             pd_client.clone(),
         )
     }

-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.start_key = shard.0;
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
@@ -616,15 +624,17 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
     }

-    fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, mut shard: Self::Shard) {
         assert!(shard.len() == 1);
         self.primary_lock = shard.pop().unwrap();
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }
@@ -674,15 +684,17 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
     }

-    fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, mut shard: Self::Shard) {
         assert!(shard.len() == 1);
         self.primary_key = shard.pop().unwrap();
-        Ok(())
     }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
+    }
 }