plan: Handle no leader and invalidate store region (#484)

close tikv/client-rust#479

Signed-off-by: Ping Yu <yuping@pingcap.com>
This commit is contained in:
Ping Yu 2025-05-22 10:38:07 +08:00 committed by GitHub
parent 32b0cff3aa
commit 7c78aadf44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 282 additions and 192 deletions

View File

@ -61,6 +61,7 @@ jobs:
CARGO_INCREMENTAL: 0
NEXTEST_PROFILE: ci
TIKV_VERSION: v8.5.1
RUST_LOG: info
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

View File

@ -57,7 +57,6 @@ reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
tempfile = "3.6"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }

View File

@ -5,6 +5,7 @@ use std::result;
use thiserror::Error;
use crate::proto::kvrpcpb;
use crate::region::RegionVerId;
use crate::BoundRange;
/// An error originating from the TiKV client or dependencies.
@ -90,8 +91,8 @@ pub enum Error {
#[error("Region {} is not found in the response", region_id)]
RegionNotFoundInResponse { region_id: u64 },
/// No leader is found for the given id.
#[error("Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },
#[error("Leader of region {} is not found", region.id)]
LeaderNotFound { region: RegionVerId },
/// Scan limit exceeds the maximum
#[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
MaxScanLimitExceeded { limit: u32, max_limit: u32 },

View File

@ -216,6 +216,8 @@ impl PdClient for MockPdClient {
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {}
async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
unimplemented!()
}

View File

@ -20,6 +20,7 @@ use crate::proto::metapb;
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region::StoreId;
use crate::region_cache::RegionCache;
use crate::store::KvConnect;
use crate::store::RegionStore;
@ -84,7 +85,7 @@ pub trait PdClient: Send + Sync + 'static {
fn group_keys_by_region<K, K2>(
self: Arc<Self>,
keys: impl Iterator<Item = K> + Send + Sync + 'static,
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
) -> BoxStream<'static, Result<(Vec<K2>, RegionWithLeader)>>
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
@ -102,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
}
grouped.push(keys.next().unwrap().into());
}
Ok(Some((keys, (region, grouped))))
Ok(Some((keys, (grouped, region))))
} else {
Ok(None)
}
@ -112,10 +113,10 @@ pub trait PdClient: Send + Sync + 'static {
}
/// Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range(
fn regions_for_range(
self: Arc<Self>,
range: BoundRange,
) -> BoxStream<'static, Result<RegionStore>> {
) -> BoxStream<'static, Result<RegionWithLeader>> {
let (start_key, end_key) = range.into_keys();
stream_fn(Some(start_key), move |start_key| {
let end_key = end_key.clone();
@ -128,15 +129,14 @@ pub trait PdClient: Send + Sync + 'static {
let region = this.region_for_key(&start_key).await?;
let region_end = region.end_key();
let store = this.map_region_to_store(region).await?;
if end_key
.map(|x| x <= region_end && !x.is_empty())
.unwrap_or(false)
|| region_end.is_empty()
{
return Ok(Some((None, store)));
return Ok(Some((None, region)));
}
Ok(Some((Some(region_end), store)))
Ok(Some((Some(region_end), region)))
}
})
.boxed()
@ -146,7 +146,7 @@ pub trait PdClient: Send + Sync + 'static {
fn group_ranges_by_region(
self: Arc<Self>,
mut ranges: Vec<kvrpcpb::KeyRange>,
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
ranges.reverse();
stream_fn(Some(ranges), move |ranges| {
let this = self.clone();
@ -166,7 +166,7 @@ pub trait PdClient: Send + Sync + 'static {
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
ranges.push(make_key_range(region_end.into(), end_key.into()));
return Ok(Some((Some(ranges), (region, grouped))));
return Ok(Some((Some(ranges), (grouped, region))));
}
grouped.push(range);
@ -181,11 +181,11 @@ pub trait PdClient: Send + Sync + 'static {
grouped
.push(make_key_range(start_key.into(), region_end.clone().into()));
ranges.push(make_key_range(region_end.into(), end_key.into()));
return Ok(Some((Some(ranges), (region, grouped))));
return Ok(Some((Some(ranges), (grouped, region))));
}
grouped.push(range);
}
Ok(Some((Some(ranges), (region, grouped))))
Ok(Some((Some(ranges), (grouped, region))))
} else {
Ok(None)
}
@ -205,6 +205,8 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
async fn invalidate_region_cache(&self, ver_id: RegionVerId);
async fn invalidate_store_cache(&self, store_id: StoreId);
}
/// This client converts requests for the logical TiKV cluster into requests
@ -271,6 +273,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
self.region_cache.invalidate_region_cache(ver_id).await
}
async fn invalidate_store_cache(&self, store_id: StoreId) {
self.region_cache.invalidate_store_cache(store_id).await
}
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
self.pd.load_keyspace(keyspace).await
}
@ -390,7 +396,7 @@ pub mod test {
let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
let mut stream = executor::block_on_stream(stream);
let result: Vec<Key> = stream.next().unwrap().unwrap().1;
let result: Vec<Key> = stream.next().unwrap().unwrap().0;
assert_eq!(
result,
vec![
@ -401,27 +407,27 @@ pub mod test {
]
);
assert_eq!(
stream.next().unwrap().unwrap().1,
stream.next().unwrap().unwrap().0,
vec![vec![12].into(), vec![11, 4].into()]
);
assert!(stream.next().is_none());
}
#[test]
fn test_stores_for_range() {
fn test_regions_for_range() {
let client = Arc::new(MockPdClient::default());
let k1: Key = vec![1].into();
let k2: Key = vec![5, 2].into();
let k3: Key = vec![11, 4].into();
let range1 = (k1, k2.clone()).into();
let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1));
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
assert!(stream.next().is_none());
let range2 = (k2, k3).into();
let mut stream = executor::block_on_stream(client.stores_for_range(range2));
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
let mut stream = executor::block_on_stream(client.regions_for_range(range2));
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
assert_eq!(stream.next().unwrap().unwrap().id(), 2);
assert!(stream.next().is_none());
}
@ -446,20 +452,20 @@ pub mod test {
let ranges3 = stream.next().unwrap().unwrap();
let ranges4 = stream.next().unwrap().unwrap();
assert_eq!(ranges1.0.id(), 1);
assert_eq!(ranges1.1.id(), 1);
assert_eq!(
ranges1.1,
ranges1.0,
vec![
make_key_range(k1.clone(), k2.clone()),
make_key_range(k1.clone(), k_split.clone()),
]
);
assert_eq!(ranges2.0.id(), 2);
assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]);
assert_eq!(ranges3.0.id(), 1);
assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]);
assert_eq!(ranges4.0.id(), 2);
assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]);
assert_eq!(ranges2.1.id(), 2);
assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]);
assert_eq!(ranges3.1.id(), 1);
assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]);
assert_eq!(ranges4.1.id(), 2);
assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]);
assert!(stream.next().is_none());
let range1 = make_key_range(k1.clone(), k2.clone());
@ -470,11 +476,11 @@ pub mod test {
let ranges1 = stream.next().unwrap().unwrap();
let ranges2 = stream.next().unwrap().unwrap();
let ranges3 = stream.next().unwrap().unwrap();
assert_eq!(ranges1.0.id(), 1);
assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]);
assert_eq!(ranges2.0.id(), 2);
assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]);
assert_eq!(ranges3.0.id(), 3);
assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]);
assert_eq!(ranges1.1.id(), 1);
assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]);
assert_eq!(ranges2.1.id(), 2);
assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]);
assert_eq!(ranges3.1.id(), 3);
assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]);
}
}

View File

@ -22,8 +22,8 @@ use crate::request::SingleKey;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_ranges;
use crate::store::region_stream_for_keys;
use crate::store::region_stream_for_ranges;
use crate::store::RegionStore;
use crate::store::Request;
use crate::transaction::HasLocks;
@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let kvs = self.pairs.clone();
let ttls = self.ttls.clone();
let mut kv_ttl: Vec<KvPairTTL> = kvs
@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
.map(|(kv, ttl)| KvPairTTL(kv, ttl))
.collect();
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
let (pairs, ttls) = shard.into_iter().unzip();
self.set_leader(&store.region_with_leader)?;
self.pairs = pairs;
self.ttls = ttls;
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.ranges = shard;
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
self.inner.ranges = shard;
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
self.inner.ranges.clone_from(&shard);
self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard);
self.inner.data = (self.data_builder)(
store.region_with_leader.region.clone(),
self.inner.ranges.clone(),
);
Ok(())
}
}

View File

@ -73,7 +73,7 @@ impl RegionWithLeader {
.as_ref()
.cloned()
.ok_or_else(|| Error::LeaderNotFound {
region_id: self.id(),
region: self.ver_id(),
})
.map(|s| s.store_id)
}

View File

@ -233,6 +233,11 @@ impl<C: RetryClientTrait> RegionCache<C> {
}
}
pub async fn invalidate_store_cache(&self, store_id: StoreId) {
let mut cache = self.store_cache.write().await;
cache.remove(&store_id);
}
pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
let stores = self
.inner_client

View File

@ -103,7 +103,7 @@ mod test {
use crate::proto::pdpb::Timestamp;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::region::RegionWithLeader;
use crate::store::store_stream_for_keys;
use crate::store::region_stream_for_keys;
use crate::store::HasRegionError;
use crate::transaction::lowering::new_commit_request;
use crate::Error;
@ -168,22 +168,20 @@ mod test {
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
crate::Result<(Self::Shard, crate::store::RegionStore)>,
crate::Result<(Self::Shard, crate::region::RegionWithLeader)>,
> {
// Increases by 1 for each call.
self.test_invoking_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
store_stream_for_keys(
region_stream_for_keys(
Some(Key::from("mock_key".to_owned())).into_iter(),
pd_client.clone(),
)
}
fn apply_shard(
&mut self,
_shard: Self::Shard,
_store: &crate::store::RegionStore,
) -> crate::Result<()> {
fn apply_shard(&mut self, _shard: Self::Shard) {}
fn apply_store(&mut self, _store: &crate::store::RegionStore) -> crate::Result<()> {
Ok(())
}
}

View File

@ -17,6 +17,8 @@ use crate::pd::PdClient;
use crate::proto::errorpb;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb;
use crate::region::StoreId;
use crate::region::{RegionVerId, RegionWithLeader};
use crate::request::shard::HasNextBatch;
use crate::request::NextBatch;
use crate::request::Shardable;
@ -114,15 +116,16 @@ where
preserve_region_results: bool,
) -> Result<<Self as Plan>::Result> {
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
debug!("single_plan_handler, shards: {}", shards.len());
let mut handles = Vec::new();
for shard in shards {
let (shard, region_store) = shard?;
let (shard, region) = shard?;
let mut clone = current_plan.clone();
clone.apply_shard(shard, &region_store)?;
clone.apply_shard(shard);
let handle = tokio::spawn(Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
region,
backoff.clone(),
permits.clone(),
preserve_region_results,
@ -153,12 +156,45 @@ where
#[async_recursion]
async fn single_shard_handler(
pd_client: Arc<PdC>,
plan: P,
region_store: RegionStore,
mut plan: P,
region: RegionWithLeader,
mut backoff: Backoff,
permits: Arc<Semaphore>,
preserve_region_results: bool,
) -> Result<<Self as Plan>::Result> {
debug!("single_shard_handler");
let region_store = match pd_client
.clone()
.map_region_to_store(region)
.await
.and_then(|region_store| {
plan.apply_store(&region_store)?;
Ok(region_store)
}) {
Ok(region_store) => region_store,
Err(Error::LeaderNotFound { region }) => {
debug!(
"single_shard_handler::sharding: leader not found: {:?}",
region
);
return Self::handle_other_error(
pd_client,
plan,
region.clone(),
None,
backoff,
permits,
preserve_region_results,
Error::LeaderNotFound { region },
)
.await;
}
Err(err) => {
debug!("single_shard_handler::sharding, error: {:?}", err);
return Err(err);
}
};
// limit concurrent requests
let permit = permits.acquire().await.unwrap();
let res = plan.execute().await;
@ -167,10 +203,12 @@ where
let mut resp = match res {
Ok(resp) => resp,
Err(e) if is_grpc_error(&e) => {
return Self::handle_grpc_error(
debug!("single_shard_handler:execute: grpc error: {:?}", e);
return Self::handle_other_error(
pd_client,
plan,
region_store,
region_store.region_with_leader.ver_id(),
region_store.region_with_leader.get_store_id().ok(),
backoff,
permits,
preserve_region_results,
@ -178,12 +216,17 @@ where
)
.await;
}
Err(e) => return Err(e),
Err(e) => {
debug!("single_shard_handler:execute: error: {:?}", e);
return Err(e);
}
};
if let Some(e) = resp.key_errors() {
debug!("single_shard_handler:execute: key errors: {:?}", e);
Ok(vec![Err(Error::MultipleKeyErrors(e))])
} else if let Some(e) = resp.region_error() {
debug!("single_shard_handler:execute: region error: {:?}", e);
match backoff.next_delay_duration() {
Some(duration) => {
let region_error_resolved =
@ -208,18 +251,24 @@ where
}
}
async fn handle_grpc_error(
#[allow(clippy::too_many_arguments)]
async fn handle_other_error(
pd_client: Arc<PdC>,
plan: P,
region_store: RegionStore,
region: RegionVerId,
store: Option<StoreId>,
mut backoff: Backoff,
permits: Arc<Semaphore>,
preserve_region_results: bool,
e: Error,
) -> Result<<Self as Plan>::Result> {
debug!("handle grpc error: {:?}", e);
let ver_id = region_store.region_with_leader.ver_id();
pd_client.invalidate_region_cache(ver_id).await;
debug!("handle_other_error: {:?}", e);
pd_client.invalidate_region_cache(region).await;
if is_grpc_error(&e) {
if let Some(store_id) = store {
pd_client.invalidate_store_cache(store_id).await;
}
}
match backoff.next_delay_duration() {
Some(duration) => {
sleep(duration).await;
@ -246,7 +295,9 @@ pub(crate) async fn handle_region_error<PdC: PdClient>(
e: errorpb::Error,
region_store: RegionStore,
) -> Result<bool> {
debug!("handle_region_error: {:?}", e);
let ver_id = region_store.region_with_leader.ver_id();
let store_id = region_store.region_with_leader.get_store_id();
if let Some(not_leader) = e.not_leader {
if let Some(leader) = not_leader.leader {
match pd_client
@ -269,6 +320,9 @@ pub(crate) async fn handle_region_error<PdC: PdClient>(
}
} else if e.store_not_match.is_some() {
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
} else if e.epoch_not_match.is_some() {
on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
@ -284,6 +338,9 @@ pub(crate) async fn handle_region_error<PdC: PdClient>(
// TODO: pass the logger around
// info!("unknwon region error: {:?}", e);
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
}
}
@ -881,11 +938,13 @@ mod test {
fn shards(
&self,
_: &Arc<impl crate::pd::PdClient>,
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> {
) -> BoxStream<'static, crate::Result<(Self::Shard, RegionWithLeader)>> {
Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))).boxed()
}
fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> {
fn apply_shard(&mut self, _: Self::Shard) {}
fn apply_store(&mut self, _: &crate::store::RegionStore) -> Result<()> {
Ok(())
}
}

View File

@ -6,6 +6,7 @@ use futures::stream::BoxStream;
use super::plan::PreserveShard;
use crate::pd::PdClient;
use crate::region::RegionWithLeader;
use crate::request::plan::CleanupLocks;
use crate::request::Dispatch;
use crate::request::KvRequest;
@ -23,12 +24,16 @@ macro_rules! impl_inner_shardable {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.inner.apply_shard(shard, store)
fn apply_shard(&mut self, shard: Self::Shard) {
self.inner.apply_shard(shard);
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.inner.apply_store(store)
}
};
}
@ -39,9 +44,11 @@ pub trait Shardable {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>;
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>;
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
fn apply_shard(&mut self, shard: Self::Shard);
fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
}
pub trait Batchable {
@ -88,13 +95,17 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
self.request.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
self.request.apply_shard(shard);
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.kv_client = Some(store.client.clone());
self.request.apply_shard(shard, store)
self.request.apply_store(store)
}
}
@ -110,13 +121,17 @@ impl<P: Plan + Shardable> Shardable for PreserveShard<P> {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
self.shard = Some(shard.clone());
self.inner.apply_shard(shard, store)
self.inner.apply_shard(shard)
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.inner.apply_store(store)
}
}
@ -130,13 +145,17 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for CleanupLocks<P, PdC> {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
self.inner.apply_shard(shard)
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.store = Some(store.clone());
self.inner.apply_shard(shard, store)
self.inner.apply_store(store)
}
}
@ -152,23 +171,21 @@ macro_rules! shardable_key {
pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
$crate::Result<(Self::Shard, $crate::store::RegionStore)>,
$crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
> {
$crate::store::store_stream_for_keys(
$crate::store::region_stream_for_keys(
std::iter::once(self.key.clone()),
pd_client.clone(),
)
}
fn apply_shard(
&mut self,
mut shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, mut shard: Self::Shard) {
assert!(shard.len() == 1);
self.key = shard.pop().unwrap();
Ok(())
}
fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)
}
}
};
@ -186,21 +203,19 @@ macro_rules! shardable_keys {
pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
$crate::Result<(Self::Shard, $crate::store::RegionStore)>,
$crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
> {
let mut keys = self.keys.clone();
keys.sort();
$crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone())
$crate::store::region_stream_for_keys(keys.into_iter(), pd_client.clone())
}
fn apply_shard(
&mut self,
shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.keys = shard.into_iter().map(Into::into).collect();
Ok(())
}
fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)
}
}
};
@ -242,7 +257,8 @@ macro_rules! shardable_range {
fn shards(
&self,
pd_client: &Arc<impl $crate::pd::PdClient>,
) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>>
{
let mut start_key = self.start_key.clone().into();
let mut end_key = self.end_key.clone().into();
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
@ -250,16 +266,10 @@ macro_rules! shardable_range {
if self.is_reverse() {
std::mem::swap(&mut start_key, &mut end_key);
}
$crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
$crate::store::region_stream_for_range((start_key, end_key), pd_client.clone())
}
fn apply_shard(
&mut self,
shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
self.start_key = shard.0;
@ -267,7 +277,10 @@ macro_rules! shardable_range {
if self.is_reverse() {
std::mem::swap(&mut self.start_key, &mut self.end_key);
}
Ok(())
}
fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
self.set_leader(&store.region_with_leader)
}
}
};

View File

@ -38,46 +38,37 @@ pub struct Store {
}
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
pub fn store_stream_for_keys<K, KOut, PdC>(
pub fn region_stream_for_keys<K, KOut, PdC>(
key_data: impl Iterator<Item = K> + Send + Sync + 'static,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<KOut>, RegionStore)>>
) -> BoxStream<'static, Result<(Vec<KOut>, RegionWithLeader)>>
where
PdC: PdClient,
K: AsRef<Key> + Into<KOut> + Send + Sync + 'static,
KOut: Send + Sync + 'static,
{
pd_client
.clone()
.group_keys_by_region(key_data)
.and_then(move |(region, key)| {
pd_client
.clone()
.map_region_to_store(region)
.map_ok(move |store| (key, store))
})
.boxed()
pd_client.clone().group_keys_by_region(key_data)
}
#[allow(clippy::type_complexity)]
pub fn store_stream_for_range<PdC: PdClient>(
pub fn region_stream_for_range<PdC: PdClient>(
range: (Vec<u8>, Vec<u8>),
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionWithLeader)>> {
let bnd_range = if range.1.is_empty() {
BoundRange::range_from(range.0.clone().into())
} else {
BoundRange::from(range.clone())
};
pd_client
.stores_for_range(bnd_range)
.map_ok(move |store| {
let region_range = store.region_with_leader.range();
.regions_for_range(bnd_range)
.map_ok(move |region| {
let region_range = region.range();
let result_range = range_intersection(
region_range,
(range.0.clone().into(), range.1.clone().into()),
);
((result_range.0.into(), result_range.1.into()), store)
((result_range.0.into(), result_range.1.into()), region)
})
.boxed()
}
@ -95,18 +86,9 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key)
(max(lower, range.0), up)
}
pub fn store_stream_for_ranges<PdC: PdClient>(
pub fn region_stream_for_ranges<PdC: PdClient>(
ranges: Vec<kvrpcpb::KeyRange>,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionStore)>> {
pd_client
.clone()
.group_ranges_by_region(ranges)
.and_then(move |(region, range)| {
pd_client
.clone()
.map_region_to_store(region)
.map_ok(move |store| (range, store))
})
.boxed()
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
pd_client.clone().group_ranges_by_region(ranges)
}

View File

@ -56,7 +56,7 @@ macro_rules! impl_request {
fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
let ctx = self.context.get_or_insert(kvrpcpb::Context::default());
let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound {
region_id: leader.region.id,
region: leader.ver_id(),
})?;
ctx.region_id = leader.region.id;
ctx.region_epoch = leader.region.region_epoch.clone();

View File

@ -19,6 +19,7 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse;
use crate::proto::kvrpcpb::TxnInfo;
use crate::proto::kvrpcpb::{self};
use crate::proto::pdpb::Timestamp;
use crate::region::RegionWithLeader;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::CollectWithShard;
@ -37,10 +38,10 @@ use crate::reversible_range_request;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_range;
use crate::store::RegionStore;
use crate::store::Request;
use crate::store::{store_stream_for_keys, Store};
use crate::store::Store;
use crate::store::{region_stream_for_keys, region_stream_for_range};
use crate::timestamp::TimestampExt;
use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction;
use crate::transaction::HasLocks;
@ -283,26 +284,24 @@ impl Shardable for kvrpcpb::PrewriteRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let mut mutations = self.mutations.clone();
mutations.sort_by(|a, b| a.key.cmp(&b.key));
store_stream_for_keys(mutations.into_iter(), pd_client.clone())
region_stream_for_keys(mutations.into_iter(), pd_client.clone())
.flat_map(|result| match result {
Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
mutations,
TXN_COMMIT_BATCH_SIZE,
))
.map(move |batch| Ok((batch, store.clone())))
.map(move |batch| Ok((batch, region.clone())))
.boxed(),
Err(e) => stream::iter(Err(e)).boxed(),
})
.boxed()
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
// Only need to set secondary keys if we're sending the primary key.
if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
self.secondaries = vec![];
@ -314,7 +313,10 @@ impl Shardable for kvrpcpb::PrewriteRequest {
}
self.mutations = shard;
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -351,15 +353,15 @@ impl Shardable for kvrpcpb::CommitRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let mut keys = self.keys.clone();
keys.sort();
store_stream_for_keys(keys.into_iter(), pd_client.clone())
region_stream_for_keys(keys.into_iter(), pd_client.clone())
.flat_map(|result| match result {
Ok((keys, store)) => {
Ok((keys, region)) => {
stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
.map(move |batch| Ok((batch, store.clone())))
.map(move |batch| Ok((batch, region.clone())))
.boxed()
}
Err(e) => stream::iter(Err(e)).boxed(),
@ -367,10 +369,12 @@ impl Shardable for kvrpcpb::CommitRequest {
.boxed()
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.keys = shard.into_iter().map(Into::into).collect();
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -452,16 +456,18 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let mut mutations = self.mutations.clone();
mutations.sort_by(|a, b| a.key.cmp(&b.key));
store_stream_for_keys(mutations.into_iter(), pd_client.clone())
region_stream_for_keys(mutations.into_iter(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.mutations = shard;
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -552,17 +558,19 @@ impl Shardable for kvrpcpb::ScanLockRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_range(
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_range(
(self.start_key.clone(), self.end_key.clone()),
pd_client.clone(),
)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.start_key = shard.0;
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -616,15 +624,17 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
}
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, mut shard: Self::Shard) {
assert!(shard.len() == 1);
self.primary_lock = shard.pop().unwrap();
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
@ -674,15 +684,17 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
}
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, mut shard: Self::Shard) {
assert!(shard.len() == 1);
self.primary_key = shard.pop().unwrap();
Ok(())
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}

View File

@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use derive_new::new;
use log::debug;
use log::{debug, trace};
use crate::BoundRange;
use crate::Key;
@ -25,7 +25,7 @@ pub struct Snapshot {
impl Snapshot {
/// Get the value associated with the given key.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking get request on snapshot");
trace!("invoking get request on snapshot");
self.transaction.get(key).await
}

View File

@ -9,8 +9,8 @@ use std::time::Instant;
use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use log::debug;
use log::warn;
use log::{debug, trace};
use tokio::time::Duration;
use crate::backoff::Backoff;
@ -132,7 +132,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking transactional get request");
trace!("invoking transactional get request");
self.check_allow_operation().await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
@ -461,7 +461,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking transactional put request");
trace!("invoking transactional put request");
self.check_allow_operation().await?;
let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
if self.is_pessimistic() {

View File

@ -46,6 +46,8 @@ pub async fn clear_tikv() {
// To test with multiple regions, prewrite some data. Tests that hope to test
// with multiple regions should use keys in the corresponding ranges.
pub async fn init() -> Result<()> {
let _ = env_logger::try_init();
if env::var(ENV_ENABLE_MULIT_REGION).is_ok() {
// 1000 keys: 0..1000
let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1))