The minimal region cache (#291)

Ziqian Qin 2021-07-21 15:33:42 +08:00 committed by GitHub
parent 4404c7e1f0
commit c14f23a545
29 changed files with 1328 additions and 478 deletions
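
At the heart of this change is a RegionCache that keeps, among other indexes, a BTreeMap from each cached region's start key to its version id; a lookup by key is a reverse range scan followed by a containment check, falling back to a PD query on a miss. Below is a minimal, self-contained sketch of just that lookup idea — the Region struct, the lookup helper, and the byte-vector keys are illustrative stand-ins, not the crate's actual types.

use std::collections::BTreeMap;

// Stand-in for RegionWithLeader: just the key range a region covers.
#[derive(Clone, Debug, PartialEq)]
struct Region {
    start_key: Vec<u8>,
    end_key: Vec<u8>, // empty means unbounded to the right
}

impl Region {
    fn contains(&self, key: &[u8]) -> bool {
        key >= self.start_key.as_slice()
            && (self.end_key.is_empty() || key < self.end_key.as_slice())
    }
}

// Cache lookup: take the entry with the largest start key <= key, then make
// sure the key actually falls inside that region (the cache may have gaps).
fn lookup<'a>(cache: &'a BTreeMap<Vec<u8>, Region>, key: &[u8]) -> Option<&'a Region> {
    cache
        .range(..=key.to_vec())
        .next_back()
        .map(|(_, region)| region)
        .filter(|region| region.contains(key))
}

fn main() {
    let mut cache = BTreeMap::new();
    cache.insert(vec![], Region { start_key: vec![], end_key: vec![10] });
    cache.insert(vec![10], Region { start_key: vec![10], end_key: vec![20] });

    assert!(lookup(&cache, &[5]).is_some());  // cache hit
    assert!(lookup(&cache, &[25]).is_none()); // miss: fall back to a PD query
    println!("lookup sketch ok");
}

In the real cache (src/region_cache.rs in this diff), a miss reads through to PD, and add_region then evicts any overlapping entries, preserving the "no intersecting regions" invariant stated in the new module.
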

View File

@ -20,6 +20,7 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: check
args: --all-targets --all-features
fmt:
name: rustfmt
@ -49,7 +50,7 @@ jobs:
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
args: --all-targets --all-features -- -D clippy::all
name: Clippy Output
unit-test:
name: unit test
@ -98,7 +99,6 @@ jobs:
path: |
~/.cargo/.crates.toml
~/.cargo/.crates2.json
~/.cargo/bin
~/.cargo/registry/index
~/.cargo/registry/cache
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('Cargo.lock') }}

View File

@ -34,7 +34,8 @@ serde_derive = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-term = { version = "2.4" }
thiserror = "1"
tokio = { version = "1.0", features = [ "sync", "time" ] }
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
async-recursion = "0.3"
tikv-client-common = { version = "0.1.0", path = "tikv-client-common" }
tikv-client-pd = { version = "0.1.0", path = "tikv-client-pd" }
@ -50,7 +51,7 @@ proptest = "1"
proptest-derive = "0.3"
serial_test = "0.5.0"
simple_logger = "1"
tokio = { version = "1.0", features = [ "sync", "rt-multi-thread", "macros" ] }
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]}
serde_json = "1"

View File

@ -3,9 +3,9 @@
default: check
check:
cargo check --all
cargo check --all --all-targets --all-features
cargo fmt -- --check
cargo clippy -- -D clippy::all
cargo clippy --all-targets --all-features -- -D clippy::all
unit-test:
cargo test --all

View File

@ -76,6 +76,11 @@ impl Backoff {
self.kind == BackoffKind::None
}
/// Returns the number of attempts made so far.
pub fn current_attempts(&self) -> u32 {
self.current_attempts
}
/// Don't wait. Usually indicates that we should not retry a request.
pub const fn no_backoff() -> Backoff {
Backoff {

View File

@ -104,6 +104,7 @@ mod pd;
#[doc(hidden)]
pub mod raw;
mod region;
mod region_cache;
mod stats;
mod store;
mod timestamp;

View File

@ -7,12 +7,13 @@
use crate::{
pd::{PdClient, PdRpcClient, RetryClient},
region::{Region, RegionId},
store::Store,
region::{RegionId, RegionWithLeader},
store::RegionStore,
Config, Error, Key, Result, Timestamp,
};
use async_trait::async_trait;
use derive_new::new;
use slog::{Drain, Logger};
use std::{any::Any, sync::Arc};
use tikv_client_proto::metapb;
use tikv_client_store::{KvClient, KvConnect, Request};
@ -21,8 +22,16 @@ use tikv_client_store::{KvClient, KvConnect, Request};
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
PdRpcClient::new(
&config,
config.clone(),
|_, _| MockKvConnect,
|e, sm| {
futures::future::ok(RetryClient::new_with_cluster(
@ -33,11 +42,13 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
))
},
false,
logger,
)
.await
.unwrap()
}
#[allow(clippy::type_complexity)]
#[derive(new, Default, Clone)]
pub struct MockKvClient {
pub addr: String,
@ -93,27 +104,31 @@ impl MockPdClient {
}
}
pub fn region1() -> Region {
let mut region = Region::default();
pub fn region1() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 1;
region.region.set_start_key(vec![0]);
region.region.set_end_key(vec![10]);
let mut leader = metapb::Peer::default();
leader.store_id = 41;
let leader = metapb::Peer {
store_id: 41,
..Default::default()
};
region.leader = Some(leader);
region
}
pub fn region2() -> Region {
let mut region = Region::default();
pub fn region2() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 2;
region.region.set_start_key(vec![10]);
region.region.set_end_key(vec![250, 250]);
let mut leader = metapb::Peer::default();
leader.store_id = 42;
let leader = metapb::Peer {
store_id: 42,
..Default::default()
};
region.leader = Some(leader);
region
@ -124,11 +139,11 @@ impl MockPdClient {
impl PdClient for MockPdClient {
type KvClient = MockKvClient;
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
Ok(Store::new(region, Arc::new(self.client.clone())))
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Ok(RegionStore::new(region, Arc::new(self.client.clone())))
}
async fn region_for_key(&self, key: &Key) -> Result<Region> {
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let bytes: &[_] = key.into();
let region = if bytes.is_empty() || bytes[0] < 10 {
Self::region1()
@ -139,11 +154,11 @@ impl PdClient for MockPdClient {
Ok(region)
}
async fn region_for_id(&self, id: RegionId) -> Result<Region> {
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
match id {
1 => Ok(Self::region1()),
2 => Ok(Self::region2()),
_ => Err(Error::RegionNotFound { region_id: id }),
_ => Err(Error::RegionNotFoundInResponse { region_id: id }),
}
}
@ -154,11 +169,21 @@ impl PdClient for MockPdClient {
async fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> Result<bool> {
unimplemented!()
}
async fn update_leader(
&self,
_ver_id: crate::region::RegionVerId,
_leader: metapb::Peer,
) -> Result<()> {
todo!()
}
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
}
pub fn mock_store() -> Store {
Store {
region: Region::default(),
pub fn mock_store() -> RegionStore {
RegionStore {
region_with_leader: RegionWithLeader::default(),
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
}
}

View File

@ -3,23 +3,21 @@
use crate::{
compat::stream_fn,
kv::codec,
pd::RetryClient,
region::{Region, RegionId},
store::Store,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
store::RegionStore,
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
};
use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use grpcio::{EnvBuilder, Environment};
use slog::{Drain, Logger};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
thread,
};
use slog::Logger;
use std::{collections::HashMap, sync::Arc, thread};
use tikv_client_pd::Cluster;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
use tokio::sync::RwLock;
const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
@ -46,25 +44,25 @@ pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;
/// In transactional API, `region` is decoded (keys in raw format).
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store>;
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore>;
/// In transactional API, the key and returned region are both decoded (keys in raw format).
async fn region_for_key(&self, key: &Key) -> Result<Region>;
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader>;
/// In transactional API, the returned region is decoded (keys in raw format)
async fn region_for_id(&self, id: RegionId) -> Result<Region>;
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader>;
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
/// In transactional API, `key` is in raw format
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<Store> {
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<RegionStore> {
let region = self.region_for_key(key).await?;
self.map_region_to_store(region).await
}
async fn store_for_id(self: Arc<Self>, id: RegionId) -> Result<Store> {
async fn store_for_id(self: Arc<Self>, id: RegionId) -> Result<RegionStore> {
let region = self.region_for_id(id).await?;
self.map_region_to_store(region).await
}
@ -101,7 +99,10 @@ pub trait PdClient: Send + Sync + 'static {
}
/// Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range(self: Arc<Self>, range: BoundRange) -> BoxStream<'static, Result<Store>> {
fn stores_for_range(
self: Arc<Self>,
range: BoundRange,
) -> BoxStream<'static, Result<RegionStore>> {
let (start_key, end_key) = range.into_keys();
stream_fn(Some(start_key), move |start_key| {
let end_key = end_key.clone();
@ -192,13 +193,17 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}
fn decode_region(mut region: Region, enable_codec: bool) -> Result<Region> {
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
if enable_codec {
codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?;
codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?;
}
Ok(region)
}
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
async fn invalidate_region_cache(&self, ver_id: RegionVerId);
}
/// This client converts requests for the logical TiKV cluster into requests
@ -208,6 +213,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
}
@ -215,26 +221,27 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
type KvClient = KvC::KvClient;
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
let store_id = region.get_store_id()?;
let store = self.pd.clone().get_store(store_id).await?;
let kv_client = self.kv_client(store.get_address())?;
Ok(Store::new(region, Arc::new(kv_client)))
let store = self.region_cache.get_store_by_id(store_id).await?;
let kv_client = self.kv_client(store.get_address()).await?;
Ok(RegionStore::new(region, Arc::new(kv_client)))
}
async fn region_for_key(&self, key: &Key) -> Result<Region> {
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded().into()
key.to_encoded()
} else {
key.clone().into()
key.clone()
};
let region = self.pd.clone().get_region(key).await?;
let region = self.region_cache.get_region_by_key(&key).await?;
Self::decode_region(region, enable_codec)
}
async fn region_for_id(&self, id: RegionId) -> Result<Region> {
let region = self.pd.clone().get_region_by_id(id).await?;
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
let region = self.region_cache.get_region_by_id(id).await?;
Self::decode_region(region, self.enable_codec)
}
@ -245,21 +252,31 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
self.pd.clone().update_safepoint(safepoint).await
}
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()> {
self.region_cache.update_leader(ver_id, leader).await
}
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}
}
impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: &Config,
config: Config,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient> {
PdRpcClient::new(
config,
config.clone(),
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
|env, security_mgr| {
RetryClient::connect(env, pd_endpoints, security_mgr, config.timeout)
},
enable_codec,
logger,
)
.await
}
@ -276,19 +293,17 @@ fn thread_name(prefix: &str) -> String {
impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: &Config,
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
{
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!());
info!(logger, "Logging ready!");
let env = Arc::new(
EnvBuilder::new()
.cq_count(CQ_COUNT)
@ -308,26 +323,30 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
let pd = Arc::new(pd(env.clone(), security_mgr.clone()).await?);
let kv_client_cache = Default::default();
Ok(PdRpcClient {
pd,
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
enable_codec,
region_cache: RegionCache::new(pd),
logger,
})
}
fn kv_client(&self, address: &str) -> Result<KvC::KvClient> {
if let Some(client) = self.kv_client_cache.read().unwrap().get(address) {
async fn kv_client(&self, address: &str) -> Result<KvC::KvClient> {
if let Some(client) = self.kv_client_cache.read().await.get(address) {
return Ok(client.clone());
};
info!(self.logger, "connect to tikv endpoint: {:?}", address);
self.kv_connect.connect(address).map(|client| {
match self.kv_connect.connect(address) {
Ok(client) => {
self.kv_client_cache
.write()
.unwrap()
.await
.insert(address.to_owned(), client.clone());
client
})
Ok(client)
}
Err(e) => Err(e),
}
}
}
@ -338,16 +357,16 @@ pub mod test {
use futures::{executor, executor::block_on};
#[test]
fn test_kv_client_caching() {
#[tokio::test]
async fn test_kv_client_caching() {
let client = block_on(pd_rpc_client());
let addr1 = "foo";
let addr2 = "bar";
let kv1 = client.kv_client(&addr1).unwrap();
let kv2 = client.kv_client(&addr2).unwrap();
let kv3 = client.kv_client(&addr2).unwrap();
let kv1 = client.kv_client(addr1).await.unwrap();
let kv2 = client.kv_client(addr2).await.unwrap();
let kv3 = client.kv_client(addr2).await.unwrap();
assert!(kv1.addr != kv2.addr);
assert_eq!(kv2.addr, kv3.addr);
}
@ -395,13 +414,13 @@ pub mod test {
let k3: Key = vec![11, 4].into();
let range1 = (k1, k2.clone()).into();
let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
assert_eq!(stream.next().unwrap().unwrap().region.id(), 1);
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
assert!(stream.next().is_none());
let range2 = (k2, k3).into();
let mut stream = executor::block_on_stream(client.stores_for_range(range2));
assert_eq!(stream.next().unwrap().unwrap().region.id(), 1);
assert_eq!(stream.next().unwrap().unwrap().region.id(), 2);
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
assert!(stream.next().is_none());
}

View File

@ -2,4 +2,4 @@ mod client;
mod retry;
pub use client::{PdClient, PdRpcClient};
pub use retry::RetryClient;
pub use retry::{RetryClient, RetryClientTrait};

View File

@ -3,7 +3,7 @@
//! A utility module for managing and retrying PD requests.
use crate::{
region::{Region, RegionId, StoreId},
region::{RegionId, RegionWithLeader, StoreId},
stats::pd_stats,
Error, Result, SecurityManager,
};
@ -28,6 +28,22 @@ const RECONNECT_INTERVAL_SEC: u64 = 1;
const MAX_REQUEST_COUNT: usize = 5;
const LEADER_CHANGE_RETRY: usize = 10;
#[async_trait]
pub trait RetryClientTrait {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// They do not know about encoding; the caller should take care of it.
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader>;
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader>;
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store>;
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>>;
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
}
/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
pub struct RetryClient<Cl = Cluster> {
// Tuple is the cluster and the time of the cluster's last reconnect.
@ -104,10 +120,13 @@ impl RetryClient<Cluster> {
timeout,
})
}
}
#[async_trait]
impl RetryClientTrait for RetryClient<Cluster> {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// They do not know about encoding; the caller should take care of it.
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
retry!(self, "get_region", |cluster| {
let key = key.clone();
async {
@ -121,16 +140,18 @@ impl RetryClient<Cluster> {
})
}
pub async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<Region> {
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
retry!(self, "get_region_by_id", |cluster| async {
cluster
.get_region_by_id(region_id, self.timeout)
.await
.and_then(|resp| region_from_response(resp, || Error::RegionNotFound { region_id }))
.and_then(|resp| {
region_from_response(resp, || Error::RegionNotFoundInResponse { region_id })
})
})
}
pub async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
retry!(self, "get_store", |cluster| async {
cluster
.get_store(id, self.timeout)
@ -140,7 +161,7 @@ impl RetryClient<Cluster> {
}
#[allow(dead_code)]
pub async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry!(self, "get_all_stores", |cluster| async {
cluster
.get_all_stores(self.timeout)
@ -149,11 +170,11 @@ impl RetryClient<Cluster> {
})
}
pub async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
}
pub async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
retry!(self, "update_gc_safepoint", |cluster| async {
cluster
.update_safepoint(safepoint, self.timeout)
@ -174,9 +195,9 @@ impl fmt::Debug for RetryClient {
fn region_from_response(
resp: pdpb::GetRegionResponse,
err: impl FnOnce() -> Error,
) -> Result<Region> {
) -> Result<RegionWithLeader> {
let region = resp.region.ok_or_else(err)?;
Ok(Region::new(region, resp.leader))
Ok(RegionWithLeader::new(region, resp.leader))
}
// A node-like thing that can be connected to.
@ -209,13 +230,16 @@ impl Reconnect for RetryClient<Cluster> {
mod test {
use super::*;
use futures::{executor, future::ready};
use std::sync::Mutex;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Mutex,
};
use tikv_client_common::internal_err;
#[test]
fn test_reconnect() {
struct MockClient {
reconnect_count: Mutex<usize>,
reconnect_count: AtomicUsize,
cluster: RwLock<((), Instant)>,
}
@ -224,7 +248,8 @@ mod test {
type Cl = ();
async fn reconnect(&self, _: u64) -> Result<()> {
*self.reconnect_count.lock().unwrap() += 1;
self.reconnect_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// Not actually unimplemented, we just don't care about the error.
Err(Error::Unimplemented)
}
@ -240,23 +265,35 @@ mod test {
executor::block_on(async {
let client = Arc::new(MockClient {
reconnect_count: Mutex::new(0),
reconnect_count: AtomicUsize::new(0),
cluster: RwLock::new(((), Instant::now())),
});
assert!(retry_err(client.clone()).await.is_err());
assert_eq!(*client.reconnect_count.lock().unwrap(), MAX_REQUEST_COUNT);
assert_eq!(
client
.reconnect_count
.load(std::sync::atomic::Ordering::SeqCst),
MAX_REQUEST_COUNT
);
*client.reconnect_count.lock().unwrap() = 0;
client
.reconnect_count
.store(0, std::sync::atomic::Ordering::SeqCst);
assert!(retry_ok(client.clone()).await.is_ok());
assert_eq!(*client.reconnect_count.lock().unwrap(), 0);
assert_eq!(
client
.reconnect_count
.load(std::sync::atomic::Ordering::SeqCst),
0
);
})
}
#[test]
fn test_retry() {
struct MockClient {
cluster: RwLock<(Mutex<usize>, Instant)>,
cluster: RwLock<(AtomicUsize, Instant)>,
}
#[async_trait]
@ -270,15 +307,13 @@ mod test {
async fn retry_max_err(
client: Arc<MockClient>,
max_retries: Arc<Mutex<usize>>,
max_retries: Arc<AtomicUsize>,
) -> Result<()> {
retry!(client, "test", |c| {
let mut c = c.lock().unwrap();
*c += 1;
c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
if max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
@ -288,15 +323,13 @@ mod test {
async fn retry_max_ok(
client: Arc<MockClient>,
max_retries: Arc<Mutex<usize>>,
max_retries: Arc<AtomicUsize>,
) -> Result<()> {
retry!(client, "test", |c| {
let mut c = c.lock().unwrap();
*c += 1;
c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
if max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
@ -306,23 +339,23 @@ mod test {
executor::block_on(async {
let client = Arc::new(MockClient {
cluster: RwLock::new((Mutex::new(0), Instant::now())),
cluster: RwLock::new((AtomicUsize::new(0), Instant::now())),
});
let max_retries = Arc::new(Mutex::new(1000));
let max_retries = Arc::new(AtomicUsize::new(1000));
assert!(retry_max_err(client.clone(), max_retries).await.is_err());
assert_eq!(
*client.cluster.read().await.0.lock().unwrap(),
client.cluster.read().await.0.load(Ordering::SeqCst),
LEADER_CHANGE_RETRY
);
let client = Arc::new(MockClient {
cluster: RwLock::new((Mutex::new(0), Instant::now())),
cluster: RwLock::new((AtomicUsize::new(0), Instant::now())),
});
let max_retries = Arc::new(Mutex::new(2));
let max_retries = Arc::new(AtomicUsize::new(2));
assert!(retry_max_ok(client.clone(), max_retries).await.is_ok());
assert_eq!(*client.cluster.read().await.0.lock().unwrap(), 2);
assert_eq!(client.cluster.read().await.0.load(Ordering::SeqCst), 2);
})
}
}

View File

@ -7,7 +7,7 @@ use crate::{
config::Config,
pd::PdRpcClient,
raw::lowering::*,
request::{Collect, Plan},
request::{Collect, CollectSingle, Plan},
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
use slog::{Drain, Logger};
@ -81,11 +81,18 @@ impl Client {
) -> Result<Client> {
let logger = optional_logger.unwrap_or_else(|| {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!())
Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
)
});
debug!(logger, "creating new raw client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?);
Ok(Client {
rpc,
cf: None,
@ -165,9 +172,8 @@ impl Client {
debug!(self.logger, "invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
@ -198,8 +204,7 @@ impl Client {
debug!(self.logger, "invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
plan.execute()
@ -227,9 +232,8 @@ impl Client {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.extract_error()
.plan();
plan.execute().await?;
@ -264,8 +268,7 @@ impl Client {
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
@ -293,9 +296,8 @@ impl Client {
debug!(self.logger, "invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.extract_error()
.plan();
plan.execute().await?;
@ -325,8 +327,7 @@ impl Client {
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
@ -353,8 +354,7 @@ impl Client {
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
@ -510,9 +510,8 @@ impl Client {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
@ -533,8 +532,7 @@ impl Client {
let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
let res = plan.execute().await;
@ -564,8 +562,7 @@ impl Client {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
plan.execute().await

View File

@ -2,9 +2,12 @@
use super::RawRpcRequest;
use crate::{
collect_first,
pd::PdClient,
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
store::{store_stream_for_keys, store_stream_for_ranges, Store},
request::{
Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore},
transaction::HasLocks,
util::iter::FlatMapOkIterExt,
ColumnFamily, KvPair, Result, Value,
@ -25,6 +28,9 @@ impl KvRequest for kvrpcpb::RawGetRequest {
type Response = kvrpcpb::RawGetResponse;
}
shardable_key!(kvrpcpb::RawGetRequest);
collect_first!(kvrpcpb::RawGetResponse);
impl SingleKey for kvrpcpb::RawGetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -91,6 +97,8 @@ impl KvRequest for kvrpcpb::RawPutRequest {
type Response = kvrpcpb::RawPutResponse;
}
shardable_key!(kvrpcpb::RawPutRequest);
collect_first!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -120,7 +128,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
let mut pairs = self.pairs.clone();
pairs.sort_by(|a, b| a.key.cmp(&b.key));
store_stream_for_keys(
@ -129,8 +137,8 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?);
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.set_pairs(shard);
Ok(())
}
@ -153,6 +161,8 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
type Response = kvrpcpb::RawDeleteResponse;
}
shardable_key!(kvrpcpb::RawDeleteRequest);
collect_first!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -254,12 +264,12 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?);
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.set_ranges(shard);
Ok(())
}
@ -297,6 +307,8 @@ impl KvRequest for kvrpcpb::RawCasRequest {
type Response = kvrpcpb::RawCasResponse;
}
shardable_key!(kvrpcpb::RawCasRequest);
collect_first!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -372,8 +384,10 @@ mod test {
let mut resp = kvrpcpb::RawScanResponse::default();
for i in req.start_key[0]..req.end_key[0] {
let mut kv = kvrpcpb::KvPair::default();
kv.key = vec![i];
let kv = kvrpcpb::KvPair {
key: vec![i],
..Default::default()
};
resp.kvs.push(kv);
}
@ -390,10 +404,9 @@ mod test {
key_only: true,
..Default::default()
};
let plan = crate::request::PlanBuilder::new(client.clone(), scan)
let plan = crate::request::PlanBuilder::new(client, scan)
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
let scan = executor::block_on(async { plan.execute().await }).unwrap();

View File

@ -1,3 +1,5 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{Error, Key, Result};
use derive_new::new;
use tikv_client_proto::{kvrpcpb, metapb};
@ -22,12 +24,14 @@ pub struct RegionVerId {
///
/// In TiKV all data is partitioned by range. Each partition is called a region.
#[derive(new, Clone, Default, Debug, PartialEq)]
pub struct Region {
pub struct RegionWithLeader {
pub region: metapb::Region,
pub leader: Option<metapb::Peer>,
}
impl Region {
impl Eq for RegionWithLeader {}
impl RegionWithLeader {
pub fn contains(&self, key: &Key) -> bool {
let key: &[u8] = key.into();
let start_key = self.region.get_start_key();

src/region_cache.rs (new file, 494 lines)
View File

@ -0,0 +1,494 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
pd::{RetryClient, RetryClientTrait},
region::{RegionId, RegionVerId, RegionWithLeader, StoreId},
Key, Result,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
use tikv_client_common::Error;
use tikv_client_pd::Cluster;
use tikv_client_proto::metapb::{self, Store};
use tokio::sync::{Notify, RwLock};
const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4;
struct RegionCacheMap {
/// RegionVerID -> Region. It stores the concrete cached regions.
/// RegionVerID is the unique identifier of a region *across time*.
// TODO: does it need TTL?
ver_id_to_region: HashMap<RegionVerId, RegionWithLeader>,
/// Start_key -> RegionVerID
///
/// Invariant: there are no intersecting regions in the map at any time.
key_to_ver_id: BTreeMap<Key, RegionVerId>,
/// RegionID -> RegionVerID. Note: regions with identical IDs are not necessarily
/// the same region; they can be different regions across time.
id_to_ver_id: HashMap<RegionId, RegionVerId>,
/// We don't want to spawn multiple queries for the same region id. If a
/// request is on its way, others will wait for its completion.
on_my_way_id: HashMap<RegionId, Arc<Notify>>,
}
impl RegionCacheMap {
fn new() -> RegionCacheMap {
RegionCacheMap {
ver_id_to_region: HashMap::new(),
key_to_ver_id: BTreeMap::new(),
id_to_ver_id: HashMap::new(),
on_my_way_id: HashMap::new(),
}
}
}
pub struct RegionCache<Client = RetryClient<Cluster>> {
region_cache: RwLock<RegionCacheMap>,
store_cache: RwLock<HashMap<StoreId, Store>>,
inner_client: Arc<Client>,
}
impl<Client> RegionCache<Client> {
pub fn new(inner_client: Arc<Client>) -> RegionCache<Client> {
RegionCache {
region_cache: RwLock::new(RegionCacheMap::new()),
store_cache: RwLock::new(HashMap::new()),
inner_client,
}
}
}
impl<C: RetryClientTrait> RegionCache<C> {
// Retrieve cache entry by key. If there's no entry, query PD and update cache.
pub async fn get_region_by_key(&self, key: &Key) -> Result<RegionWithLeader> {
let region_cache_guard = self.region_cache.read().await;
let res = {
region_cache_guard
.key_to_ver_id
.range(..=key)
.next_back()
.map(|(x, y)| (x.clone(), y.clone()))
};
if let Some((_, candidate_region_ver_id)) = res {
let region = region_cache_guard
.ver_id_to_region
.get(&candidate_region_ver_id)
.unwrap();
if region.contains(key) {
return Ok(region.clone());
}
}
drop(region_cache_guard);
self.read_through_region_by_key(key.clone()).await
}
// Retrieve cache entry by RegionId. If there's no entry, query PD and update cache.
pub async fn get_region_by_id(&self, id: RegionId) -> Result<RegionWithLeader> {
for _ in 0..=MAX_RETRY_WAITING_CONCURRENT_REQUEST {
let region_cache_guard = self.region_cache.read().await;
// check cache
let ver_id = region_cache_guard.id_to_ver_id.get(&id);
if let Some(ver_id) = ver_id {
let region = region_cache_guard.ver_id_to_region.get(ver_id).unwrap();
return Ok(region.clone());
}
// check concurrent requests
let notify = region_cache_guard.on_my_way_id.get(&id).cloned();
drop(region_cache_guard);
if let Some(n) = notify {
n.notified().await;
continue;
} else {
return self.read_through_region_by_id(id).await;
}
}
Err(Error::StringError(format!(
"Concurrent PD requests failed for {} times",
MAX_RETRY_WAITING_CONCURRENT_REQUEST
)))
}
pub async fn get_store_by_id(&self, id: StoreId) -> Result<Store> {
let store = self.store_cache.read().await.get(&id).cloned();
match store {
Some(store) => Ok(store),
None => self.read_through_store_by_id(id).await,
}
}
/// Force read through (query from PD) and update cache
pub async fn read_through_region_by_key(&self, key: Key) -> Result<RegionWithLeader> {
let region = self.inner_client.clone().get_region(key.into()).await?;
self.add_region(region.clone()).await;
Ok(region)
}
/// Force read through (query from PD) and update cache
async fn read_through_region_by_id(&self, id: RegionId) -> Result<RegionWithLeader> {
// put a notify to let others know the region id is being queried
let notify = Arc::new(Notify::new());
{
let mut region_cache_guard = self.region_cache.write().await;
region_cache_guard.on_my_way_id.insert(id, notify.clone());
}
let region = self.inner_client.clone().get_region_by_id(id).await?;
self.add_region(region.clone()).await;
// notify others
{
let mut region_cache_guard = self.region_cache.write().await;
notify.notify_waiters();
region_cache_guard.on_my_way_id.remove(&id);
}
Ok(region)
}
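The read-through above is coalesced per region id: the first caller parks a Notify in on_my_way_id, later callers wait on it and then re-check the cache, and the real code additionally bounds the number of such waits with MAX_RETRY_WAITING_CONCURRENT_REQUEST. A rough, self-contained sketch of that pattern follows; the Coalescer type, the u64-to-String payload, the sleep standing in for the PD RPC, and the timeout used as a safety net against a missed wakeup are all illustrative choices, not the crate's API.

use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{Notify, RwLock};

#[derive(Default)]
struct Coalescer {
    cache: RwLock<HashMap<u64, String>>,
    in_flight: RwLock<HashMap<u64, Arc<Notify>>>,
}

impl Coalescer {
    async fn get(self: Arc<Self>, id: u64) -> String {
        loop {
            // Fast path: the value is already cached.
            if let Some(v) = self.cache.read().await.get(&id) {
                return v.clone();
            }
            // Another task is already fetching this id: wait, then re-check the cache.
            let waiting = self.in_flight.read().await.get(&id).cloned();
            if let Some(n) = waiting {
                // The timeout is a safety net against a wakeup racing past us.
                let _ = tokio::time::timeout(Duration::from_millis(50), n.notified()).await;
                continue;
            }
            // We are the first caller: register a Notify so later callers wait on us.
            let notify = Arc::new(Notify::new());
            self.in_flight.write().await.insert(id, notify.clone());
            // Simulated expensive read-through (a PD RPC in the real client).
            tokio::time::sleep(Duration::from_millis(10)).await;
            let value = format!("region-{id}");
            self.cache.write().await.insert(id, value.clone());
            // Clear the marker and wake any waiters.
            self.in_flight.write().await.remove(&id);
            notify.notify_waiters();
            return value;
        }
    }
}

// Needs tokio = { version = "1", features = ["full"] } (or at least "sync",
// "time", "macros", "rt-multi-thread").
#[tokio::main]
async fn main() {
    let c = Arc::new(Coalescer::default());
    let handles: Vec<_> = (0..4).map(|_| tokio::spawn(c.clone().get(1))).collect();
    for h in handles {
        println!("{}", h.await.unwrap());
    }
}
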
async fn read_through_store_by_id(&self, id: StoreId) -> Result<Store> {
let store = self.inner_client.clone().get_store(id).await?;
self.store_cache.write().await.insert(id, store.clone());
Ok(store)
}
pub async fn add_region(&self, region: RegionWithLeader) {
// FIXME: will this be a performance bottleneck?
let mut cache = self.region_cache.write().await;
let end_key = region.end_key();
let mut to_be_removed: HashSet<RegionVerId> = HashSet::new();
if let Some(ver_id) = cache.id_to_ver_id.get(&region.id()) {
if ver_id != &region.ver_id() {
to_be_removed.insert(ver_id.clone());
}
}
let mut search_range = {
if end_key.is_empty() {
cache.key_to_ver_id.range(..)
} else {
cache.key_to_ver_id.range(..end_key)
}
};
while let Some((_, ver_id_in_cache)) = search_range.next_back() {
let region_in_cache = cache.ver_id_to_region.get(ver_id_in_cache).unwrap();
if region_in_cache.region.end_key > region.region.start_key {
to_be_removed.insert(ver_id_in_cache.clone());
} else {
break;
}
}
for ver_id in to_be_removed {
let region_to_remove = cache.ver_id_to_region.remove(&ver_id).unwrap();
cache.key_to_ver_id.remove(&region_to_remove.start_key());
cache.id_to_ver_id.remove(&region_to_remove.id());
}
cache
.key_to_ver_id
.insert(region.start_key(), region.ver_id());
cache.id_to_ver_id.insert(region.id(), region.ver_id());
cache.ver_id_to_region.insert(region.ver_id(), region);
}
pub async fn update_leader(
&self,
ver_id: crate::region::RegionVerId,
leader: metapb::Peer,
) -> Result<()> {
let mut cache = self.region_cache.write().await;
let region_entry = cache
.ver_id_to_region
.get_mut(&ver_id)
.ok_or(Error::EntryNotFoundInRegionCache)?;
region_entry.leader = Some(leader);
Ok(())
}
pub async fn invalidate_region_cache(&self, ver_id: crate::region::RegionVerId) {
let mut cache = self.region_cache.write().await;
let region_entry = cache.ver_id_to_region.get(&ver_id);
if let Some(region) = region_entry {
let id = region.id();
let start_key = region.start_key();
cache.ver_id_to_region.remove(&ver_id);
cache.id_to_ver_id.remove(&id);
cache.key_to_ver_id.remove(&start_key);
}
}
}
#[cfg(test)]
mod test {
use super::RegionCache;
use crate::{
pd::RetryClientTrait,
region::{RegionId, RegionWithLeader},
Key, Result,
};
use async_trait::async_trait;
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering::SeqCst},
Arc,
},
};
use tikv_client_common::Error;
use tikv_client_proto::metapb;
use tokio::sync::Mutex;
#[derive(Default)]
struct MockRetryClient {
pub regions: Mutex<HashMap<RegionId, RegionWithLeader>>,
pub get_region_count: AtomicU64,
}
#[async_trait]
impl RetryClientTrait for MockRetryClient {
async fn get_region(
self: Arc<Self>,
key: Vec<u8>,
) -> Result<crate::region::RegionWithLeader> {
self.get_region_count.fetch_add(1, SeqCst);
self.regions
.lock()
.await
.iter()
.filter(|(_, r)| r.contains(&key.clone().into()))
.map(|(_, r)| r.clone())
.next()
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
}
async fn get_region_by_id(
self: Arc<Self>,
region_id: crate::region::RegionId,
) -> Result<crate::region::RegionWithLeader> {
self.get_region_count.fetch_add(1, SeqCst);
self.regions
.lock()
.await
.iter()
.filter(|(id, _)| id == &&region_id)
.map(|(_, r)| r.clone())
.next()
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
}
async fn get_store(
self: Arc<Self>,
_id: crate::region::StoreId,
) -> Result<tikv_client_proto::metapb::Store> {
todo!()
}
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<tikv_client_proto::metapb::Store>> {
todo!()
}
async fn get_timestamp(self: Arc<Self>) -> Result<tikv_client_proto::pdpb::Timestamp> {
todo!()
}
async fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> Result<bool> {
todo!()
}
}
#[tokio::test]
async fn cache_is_used() -> Result<()> {
let retry_client = Arc::new(MockRetryClient::default());
let cache = RegionCache::new(retry_client.clone());
retry_client.regions.lock().await.insert(
1,
RegionWithLeader {
region: metapb::Region {
id: 1,
start_key: vec![],
end_key: vec![100],
..Default::default()
},
leader: Some(metapb::Peer {
store_id: 1,
..Default::default()
}),
},
);
retry_client.regions.lock().await.insert(
2,
RegionWithLeader {
region: metapb::Region {
id: 2,
start_key: vec![101],
end_key: vec![],
..Default::default()
},
leader: Some(metapb::Peer {
store_id: 2,
..Default::default()
}),
},
);
assert_eq!(retry_client.get_region_count.load(SeqCst), 0);
// first query, read through
assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into());
assert_eq!(retry_client.get_region_count.load(SeqCst), 1);
// should read from cache
assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into());
assert_eq!(retry_client.get_region_count.load(SeqCst), 1);
// invalidate, should read through
cache
.invalidate_region_cache(cache.get_region_by_id(1).await?.ver_id())
.await;
assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into());
assert_eq!(retry_client.get_region_count.load(SeqCst), 2);
// update leader should work
cache
.update_leader(
cache.get_region_by_id(2).await?.ver_id(),
metapb::Peer {
store_id: 102,
..Default::default()
},
)
.await?;
assert_eq!(
cache.get_region_by_id(2).await?.leader.unwrap().store_id,
102
);
Ok(())
}
#[tokio::test]
async fn test_add_disjoint_regions() {
let retry_client = Arc::new(MockRetryClient::default());
let cache = RegionCache::new(retry_client.clone());
let region1 = region(1, vec![], vec![10]);
let region2 = region(2, vec![10], vec![20]);
let region3 = region(3, vec![30], vec![]);
cache.add_region(region1.clone()).await;
cache.add_region(region2.clone()).await;
cache.add_region(region3.clone()).await;
let mut expected_cache = BTreeMap::new();
expected_cache.insert(vec![].into(), region1);
expected_cache.insert(vec![10].into(), region2);
expected_cache.insert(vec![30].into(), region3);
assert(&cache, &expected_cache).await
}
#[tokio::test]
async fn test_add_intersecting_regions() {
let retry_client = Arc::new(MockRetryClient::default());
let cache = RegionCache::new(retry_client.clone());
cache.add_region(region(1, vec![], vec![10])).await;
cache.add_region(region(2, vec![10], vec![20])).await;
cache.add_region(region(3, vec![30], vec![40])).await;
cache.add_region(region(4, vec![50], vec![60])).await;
cache.add_region(region(5, vec![20], vec![35])).await;
let mut expected_cache: BTreeMap<Key, _> = BTreeMap::new();
expected_cache.insert(vec![].into(), region(1, vec![], vec![10]));
expected_cache.insert(vec![10].into(), region(2, vec![10], vec![20]));
expected_cache.insert(vec![20].into(), region(5, vec![20], vec![35]));
expected_cache.insert(vec![50].into(), region(4, vec![50], vec![60]));
assert(&cache, &expected_cache).await;
cache.add_region(region(6, vec![15], vec![25])).await;
let mut expected_cache = BTreeMap::new();
expected_cache.insert(vec![].into(), region(1, vec![], vec![10]));
expected_cache.insert(vec![15].into(), region(6, vec![15], vec![25]));
expected_cache.insert(vec![50].into(), region(4, vec![50], vec![60]));
assert(&cache, &expected_cache).await;
cache.add_region(region(7, vec![20], vec![])).await;
let mut expected_cache = BTreeMap::new();
expected_cache.insert(vec![].into(), region(1, vec![], vec![10]));
expected_cache.insert(vec![20].into(), region(7, vec![20], vec![]));
assert(&cache, &expected_cache).await;
cache.add_region(region(8, vec![], vec![15])).await;
let mut expected_cache = BTreeMap::new();
expected_cache.insert(vec![].into(), region(8, vec![], vec![15]));
expected_cache.insert(vec![20].into(), region(7, vec![20], vec![]));
assert(&cache, &expected_cache).await;
}
#[tokio::test]
async fn test_get_region_by_key() -> Result<()> {
let retry_client = Arc::new(MockRetryClient::default());
let cache = RegionCache::new(retry_client.clone());
let region1 = region(1, vec![], vec![10]);
let region2 = region(2, vec![10], vec![20]);
let region3 = region(3, vec![30], vec![40]);
let region4 = region(4, vec![50], vec![]);
cache.add_region(region1.clone()).await;
cache.add_region(region2.clone()).await;
cache.add_region(region3.clone()).await;
cache.add_region(region4.clone()).await;
assert_eq!(
cache.get_region_by_key(&vec![].into()).await?,
region1.clone()
);
assert_eq!(
cache.get_region_by_key(&vec![5].into()).await?,
region1.clone()
);
assert_eq!(
cache.get_region_by_key(&vec![10].into()).await?,
region2.clone()
);
assert!(cache.get_region_by_key(&vec![20].into()).await.is_err());
assert!(cache.get_region_by_key(&vec![25].into()).await.is_err());
assert_eq!(cache.get_region_by_key(&vec![60].into()).await?, region4);
Ok(())
}
// a helper function to assert that the cache is in the expected state
async fn assert(
cache: &RegionCache<MockRetryClient>,
expected_cache: &BTreeMap<Key, RegionWithLeader>,
) {
let guard = cache.region_cache.read().await;
let mut actual_keys = guard.ver_id_to_region.values().collect::<Vec<_>>();
let mut expected_keys = expected_cache.values().collect::<Vec<_>>();
actual_keys.sort_by_cached_key(|r| r.id());
expected_keys.sort_by_cached_key(|r| r.id());
assert_eq!(actual_keys, expected_keys);
assert_eq!(
guard.key_to_ver_id.keys().collect::<HashSet<_>>(),
expected_cache.keys().collect::<HashSet<_>>()
)
}
fn region(id: RegionId, start_key: Vec<u8>, end_key: Vec<u8>) -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.set_id(id);
region.region.set_start_key(start_key);
region.region.set_end_key(end_key);
// We don't care about other fields here
region
}
}

View File

@ -6,13 +6,13 @@ use crate::{
};
use async_trait::async_trait;
use derive_new::new;
use tikv_client_store::{HasError, Request};
use tikv_client_store::{HasKeyErrors, Request};
pub use self::{
plan::{
Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError,
HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse,
ResolveLock, RetryRegion,
Collect, CollectAndMatchKey, CollectError, CollectSingle, DefaultProcessor, Dispatch,
ExtractError, HasKeys, Merge, MergeResponse, Plan, PreserveKey, Process, ProcessResponse,
ResolveLock, RetryableMultiRegion,
},
plan_builder::{PlanBuilder, SingleKey},
shard::Shardable,
@ -27,7 +27,7 @@ mod shard;
#[async_trait]
pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
/// The expected response to the request.
type Response: HasError + HasLocks + Clone + Send + 'static;
type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;
}
#[derive(Clone, Debug, new, Eq, PartialEq)]
@ -70,30 +70,29 @@ mod test {
transaction::lowering::new_commit_request,
Error, Key, Result,
};
use futures::executor;
use grpcio::CallOption;
use std::{
any::Any,
iter,
sync::{Arc, Mutex},
sync::{atomic::AtomicUsize, Arc},
};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient};
use tikv_client_store::HasRegionError;
#[test]
fn test_region_retry() {
#[tokio::test]
async fn test_region_retry() {
#[derive(Clone)]
struct MockRpcResponse;
impl HasError for MockRpcResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for MockRpcResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
None
}
}
impl HasRegionError for MockRpcResponse {
fn region_error(&mut self) -> Option<Error> {
Some(Error::RegionNotFound { region_id: 1 })
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
Some(tikv_client_proto::errorpb::Error::default())
}
}
@ -101,7 +100,7 @@ mod test {
#[derive(Clone)]
struct MockKvRequest {
test_invoking_count: Arc<Mutex<usize>>,
test_invoking_count: Arc<AtomicUsize>,
}
#[async_trait]
@ -136,11 +135,11 @@ mod test {
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
crate::Result<(Self::Shard, crate::store::Store)>,
crate::Result<(Self::Shard, crate::store::RegionStore)>,
> {
// Increases by 1 for each call.
let mut test_invoking_count = self.test_invoking_count.lock().unwrap();
*test_invoking_count += 1;
self.test_invoking_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
store_stream_for_keys(
Some(Key::from("mock_key".to_owned())).into_iter(),
pd_client.clone(),
@ -150,13 +149,13 @@ mod test {
fn apply_shard(
&mut self,
_shard: Self::Shard,
_store: &crate::store::Store,
_store: &crate::store::RegionStore,
) -> crate::Result<()> {
Ok(())
}
}
let invoking_count = Arc::new(Mutex::new(0));
let invoking_count = Arc::new(AtomicUsize::new(0));
let request = MockKvRequest {
test_invoking_count: invoking_count.clone(),
@ -168,18 +167,17 @@ mod test {
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
.multi_region()
.retry_region(Backoff::no_jitter_backoff(1, 1, 3))
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
.extract_error()
.plan();
let _ = executor::block_on(async { plan.execute().await });
let _ = plan.execute().await;
// Original call plus the 3 retries
assert_eq!(*invoking_count.lock().unwrap(), 4);
assert_eq!(invoking_count.load(std::sync::atomic::Ordering::SeqCst), 4);
}
#[test]
fn test_extract_error() {
#[tokio::test]
async fn test_extract_error() {
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| {
Ok(Box::new(kvrpcpb::CommitResponse {
@ -206,18 +204,16 @@ mod test {
// does not extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone())
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.plan();
assert!(executor::block_on(async { plan.execute().await }).is_ok());
assert!(plan.execute().await.is_ok());
// extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.extract_error()
.plan();
assert!(executor::block_on(async { plan.execute().await }).is_err());
assert!(plan.execute().await.is_err());
}
}

View File

@ -5,15 +5,18 @@ use crate::{
pd::PdClient,
request::{KvRequest, Shardable},
stats::tikv_stats,
store::RegionStore,
transaction::{resolve_locks, HasLocks},
util::iter::FlatMapOkIterExt,
Error, Key, KvPair, Result, Value,
};
use async_recursion::async_recursion;
use async_trait::async_trait;
use futures::{prelude::*, stream::StreamExt};
use futures::{future::try_join_all, stream::StreamExt};
use std::{marker::PhantomData, sync::Arc};
use tikv_client_proto::kvrpcpb;
use tikv_client_store::{HasError, HasRegionError, KvClient};
use tikv_client_proto::{errorpb::EpochNotMatch, kvrpcpb};
use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors, KvClient};
use tokio::sync::Semaphore;
/// A plan for how to execute a request. A user builds up a plan with various
/// options, then executes it.
@ -63,42 +66,214 @@ impl<Req: KvRequest + HasKeys> HasKeys for Dispatch<Req> {
}
}
pub struct MultiRegion<P: Plan, PdC: PdClient> {
const MULTI_REGION_CONCURRENCY: usize = 16;
pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
pub(super) inner: P,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
}
impl<P: Plan, PdC: PdClient> Clone for MultiRegion<P, PdC> {
impl<P: Plan + Shardable, PdC: PdClient> RetryableMultiRegion<P, PdC>
where
P::Result: HasKeyErrors + HasRegionError,
{
// A plan may involve multiple shards
#[async_recursion]
async fn single_plan_handler(
pd_client: Arc<PdC>,
current_plan: P,
backoff: Backoff,
permits: Arc<Semaphore>,
) -> Result<<Self as Plan>::Result> {
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
let mut handles = Vec::new();
for shard in shards {
let (shard, region_store) = shard?;
let mut clone = current_plan.clone();
clone.apply_shard(shard, &region_store)?;
let handle = tokio::spawn(Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
backoff.clone(),
permits.clone(),
));
handles.push(handle);
}
Ok(try_join_all(handles)
.await?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect())
}
#[async_recursion]
async fn single_shard_handler(
pd_client: Arc<PdC>,
plan: P,
region_store: RegionStore,
mut backoff: Backoff,
permits: Arc<Semaphore>,
) -> Result<<Self as Plan>::Result> {
// limit concurrent requests
let permit = permits.acquire().await.unwrap();
let mut resp = plan.execute().await?;
drop(permit);
if let Some(e) = resp.key_errors() {
Ok(vec![Err(Error::MultipleKeyErrors(e))])
} else if let Some(e) = resp.region_error() {
match backoff.next_delay_duration() {
Some(duration) => {
let region_error_resolved =
Self::handle_region_error(pd_client.clone(), e, region_store).await?;
// don't sleep if we have resolved the region error
if !region_error_resolved {
futures_timer::Delay::new(duration).await;
}
Self::single_plan_handler(pd_client, plan, backoff, permits).await
}
None => Err(Error::RegionError(e)),
}
} else {
Ok(vec![Ok(resp)])
}
}
// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
async fn handle_region_error(
pd_client: Arc<PdC>,
mut e: tikv_client_proto::errorpb::Error,
region_store: RegionStore,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
if e.has_not_leader() {
let not_leader = e.get_not_leader();
if not_leader.has_leader() {
match pd_client
.update_leader(
region_store.region_with_leader.ver_id(),
not_leader.get_leader().clone(),
)
.await
{
Ok(_) => Ok(true),
Err(e) => {
pd_client.invalidate_region_cache(ver_id).await;
Err(e)
}
}
} else {
// The peer doesn't know who the current leader is. Generally this is because
// the Raft group is in an election, but it's possible that the peer is
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}
} else if e.has_store_not_match() {
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
} else if e.has_epoch_not_match() {
Self::on_region_epoch_not_match(
pd_client.clone(),
region_store,
e.take_epoch_not_match(),
)
.await
} else if e.has_stale_command() || e.has_region_not_found() {
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
} else if e.has_server_is_busy()
|| e.has_raft_entry_too_large()
|| e.has_max_timestamp_not_synced()
{
Err(Error::RegionError(e))
} else {
// TODO: pass the logger around
// info!("unknwon region error: {:?}", e);
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}
}
// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
async fn on_region_epoch_not_match(
pd_client: Arc<PdC>,
region_store: RegionStore,
error: EpochNotMatch,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
if error.get_current_regions().is_empty() {
pd_client.invalidate_region_cache(ver_id).await;
return Ok(true);
}
for r in error.get_current_regions() {
if r.get_id() == region_store.region_with_leader.id() {
let returned_conf_ver = r.get_region_epoch().get_conf_ver();
let returned_version = r.get_region_epoch().get_version();
let current_conf_ver = region_store
.region_with_leader
.region
.get_region_epoch()
.get_conf_ver();
let current_version = region_store
.region_with_leader
.region
.get_region_epoch()
.get_version();
// Find whether the current region is ahead of TiKV's. If so, backoff.
if returned_conf_ver < current_conf_ver || returned_version < current_version {
return Ok(false);
}
}
}
// TODO: finer grained processing
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}
}
impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
fn clone(&self) -> Self {
MultiRegion {
RetryableMultiRegion {
inner: self.inner.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
}
}
}
#[async_trait]
impl<P: Plan + Shardable, PdC: PdClient> Plan for MultiRegion<P, PdC>
impl<P: Plan + Shardable, PdC: PdClient> Plan for RetryableMultiRegion<P, PdC>
where
P::Result: HasError,
P::Result: HasKeyErrors + HasRegionError,
{
type Result = Vec<Result<P::Result>>;
async fn execute(&self) -> Result<Self::Result> {
Ok(self
.inner
.shards(&self.pd_client)
.and_then(move |(shard, store)| async move {
let mut clone = self.inner.clone();
clone.apply_shard(shard, &store)?;
let mut response = clone.execute().await?;
match response.error() {
Some(e) => Err(e),
None => Ok(response),
}
})
.collect()
.await)
// Limit the maximum concurrency of multi-region requests. If there are
// too many concurrent requests, TiKV is more likely to return a "TiKV
// is busy" error.
let concurrency_permits = Arc::new(Semaphore::new(MULTI_REGION_CONCURRENCY));
Self::single_plan_handler(
self.pd_client.clone(),
self.inner.clone(),
self.backoff.clone(),
concurrency_permits.clone(),
)
.await
}
}
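For context, the concurrency cap above works by sharing a single Semaphore across all spawned shard handlers, with each handler holding a permit only while its request is in flight. A small standalone sketch of that pattern follows; the shard count, the sleep standing in for an RPC, and the use of futures::future::try_join_all mirror the shape of single_plan_handler, but the concrete values are arbitrary.

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

// Needs tokio = { version = "1", features = ["full"] } and futures = "0.3".
#[tokio::main]
async fn main() {
    // At most 16 simulated requests in flight at once, mirroring MULTI_REGION_CONCURRENCY.
    let permits = Arc::new(Semaphore::new(16));
    let mut handles = Vec::new();
    for shard in 0..100u32 {
        let permits = permits.clone();
        handles.push(tokio::spawn(async move {
            // Hold a permit only for the duration of the simulated RPC.
            let _permit = permits.acquire().await.expect("semaphore closed");
            tokio::time::sleep(Duration::from_millis(5)).await;
            shard // stand-in for the shard's response
        }));
    }
    // Gather all shard results, failing if any task panics, mirroring the
    // try_join_all over spawned handlers in single_plan_handler.
    let results = futures::future::try_join_all(handles).await.unwrap();
    assert_eq!(results.len(), 100);
}
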
@ -131,6 +306,25 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
#[derive(Clone, Copy)]
pub struct Collect;
/// A merge strategy that only takes the first element. It's used for requests
/// that should have exactly one response, e.g. a get request.
#[derive(Clone, Copy)]
pub struct CollectSingle;
#[macro_export]
macro_rules! collect_first {
($type_: ty) => {
impl Merge<$type_> for CollectSingle {
type Out = $type_;
fn merge(&self, mut input: Vec<Result<$type_>>) -> Result<Self::Out> {
assert!(input.len() == 1);
input.pop().unwrap()
}
}
};
}
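For illustration (not part of this change), the merge that `CollectSingle` performs, restated over plain std types; `collect_single` is a hypothetical name.

fn collect_single<T>(mut input: Vec<Result<T, String>>) -> Result<T, String> {
    // A request targeting exactly one region yields exactly one per-region result;
    // merging simply unwraps it.
    assert!(input.len() == 1);
    input.pop().unwrap()
}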
/// A merge strategy to be used with
/// [`preserve_keys`](super::plan_builder::PlanBuilder::preserve_keys).
/// It matches the keys preserved before and the values returned in the response.
@ -178,46 +372,6 @@ impl<In: Clone + Sync + Send + 'static, P: Plan<Result = In>, Pr: Process<In>> P
#[derive(Clone, Copy, Debug)]
pub struct DefaultProcessor;
pub struct RetryRegion<P: Plan, PdC: PdClient> {
pub inner: P,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
}
impl<P: Plan, PdC: PdClient> Clone for RetryRegion<P, PdC> {
fn clone(&self) -> Self {
RetryRegion {
inner: self.inner.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
}
}
}
#[async_trait]
impl<P: Plan, PdC: PdClient> Plan for RetryRegion<P, PdC>
where
P::Result: HasError,
{
type Result = P::Result;
async fn execute(&self) -> Result<Self::Result> {
let mut result = self.inner.execute().await?;
let mut clone = self.clone();
while let Some(region_error) = result.region_error() {
match clone.backoff.next_delay_duration() {
None => return Err(region_error),
Some(delay_duration) => {
futures_timer::Delay::new(delay_duration).await;
result = clone.inner.execute().await?;
}
}
}
Ok(result)
}
}
pub struct ResolveLock<P: Plan, PdC: PdClient> {
pub inner: P,
pub pd_client: Arc<PdC>,
@ -299,16 +453,18 @@ impl<P: Plan> Clone for ExtractError<P> {
#[async_trait]
impl<P: Plan> Plan for ExtractError<P>
where
P::Result: HasError,
P::Result: HasKeyErrors + HasRegionErrors,
{
type Result = P::Result;
async fn execute(&self) -> Result<Self::Result> {
let mut result = self.inner.execute().await?;
if let Some(error) = result.error() {
Err(error)
} else if let Some(error) = result.region_error() {
Err(error)
if let Some(errors) = result.key_errors() {
Err(Error::ExtractedErrors(errors))
} else if let Some(errors) = result.region_errors() {
Err(Error::ExtractedErrors(
errors.into_iter().map(Error::RegionError).collect(),
))
} else {
Ok(result)
}
@ -355,9 +511,9 @@ pub trait HasKeys {
#[derive(Debug, Clone)]
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);
impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
fn error(&mut self) -> Option<Error> {
self.0.error()
impl<Resp: HasKeyErrors> HasKeyErrors for ResponseAndKeys<Resp> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
self.0.key_errors()
}
}
@ -368,7 +524,7 @@ impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
}
impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
fn region_error(&mut self) -> Option<Error> {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
self.0.region_error()
}
}
@ -411,7 +567,10 @@ impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatc
mod test {
use super::*;
use crate::mock::{mock_store, MockPdClient};
use futures::stream::BoxStream;
use futures::{
stream::{self, BoxStream},
TryStreamExt,
};
use tikv_client_proto::kvrpcpb::BatchGetResponse;
#[derive(Clone)]
@ -432,35 +591,28 @@ mod test {
fn shards(
&self,
_: &Arc<impl crate::pd::PdClient>,
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> {
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> {
Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented)))
.map_ok(|_: u8| (42, mock_store()))
.boxed()
}
fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::Store) -> Result<()> {
fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_err() {
let plan = RetryRegion {
inner: MultiRegion {
let plan = RetryableMultiRegion {
inner: ResolveLock {
inner: ErrPlan,
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
},
pd_client: Arc::new(MockPdClient::default()),
},
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
};
plan.execute()
.await
.unwrap()
.iter()
.for_each(|r| assert!(r.is_err()));
assert!(plan.execute().await.is_err())
}
}

View File

@ -5,15 +5,15 @@ use crate::{
backoff::Backoff,
pd::PdClient,
request::{
DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse,
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse, Plan,
Process, ProcessResponse, ResolveLock, RetryableMultiRegion, Shardable,
},
store::Store,
store::RegionStore,
transaction::HasLocks,
Result,
};
use std::{marker::PhantomData, sync::Arc};
use tikv_client_store::HasError;
use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors};
/// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
@ -68,24 +68,6 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
}
}
/// If there is a region error, re-shard the request and re-resolve regions, then retry.
///
/// Note that this plan must wrap a multi-region plan if the request should be re-sharded.
pub fn retry_region(self, backoff: Backoff) -> PlanBuilder<PdC, RetryRegion<P, PdC>, Ph>
where
P::Result: HasError,
{
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: RetryRegion {
inner: self.plan,
backoff,
pd_client: self.pd_client,
},
phantom: PhantomData,
}
}
/// Merge the results of a request. Usually used where a request is sent to multiple regions
/// to combine the responses from each region.
pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>
@ -128,15 +110,19 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
where
P::Result: HasError,
P::Result: HasKeyErrors + HasRegionError,
{
/// Split the request into shards, sending a request to the region of each shard.
pub fn multi_region(self) -> PlanBuilder<PdC, MultiRegion<P, PdC>, Targetted> {
pub fn retry_multi_region(
self,
backoff: Backoff,
) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: MultiRegion {
plan: RetryableMultiRegion {
inner: self.plan,
pd_client: self.pd_client,
backoff,
},
phantom: PhantomData,
}
@ -144,9 +130,12 @@ where
}
impl<PdC: PdClient, R: KvRequest + SingleKey> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
/// Target the request at a single region.
/// Target the request at a single region. *Note*: a single-region plan
/// cannot automatically retry on region errors. It's only used for requests
/// that target a specific region rather than keys (e.g. ResolveLockRequest).
pub async fn single_region(self) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
let key = self.plan.request.key();
// TODO: retry when region error occurred
let store = self.pd_client.clone().store_for_key(key.into()).await?;
set_single_region_store(self.plan, store, self.pd_client)
}
@ -156,7 +145,7 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
/// Target the request at a single region; caller supplies the store to target.
pub async fn single_region_with_store(
self,
store: Store,
store: RegionStore,
) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
set_single_region_store(self.plan, store, self.pd_client)
}
@ -164,7 +153,7 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
impl<PdC: PdClient, P: Plan + HasKeys> PlanBuilder<PdC, P, NoTarget>
where
P::Result: HasError,
P::Result: HasKeyErrors,
{
pub fn preserve_keys(self) -> PlanBuilder<PdC, PreserveKey<P>, NoTarget> {
PlanBuilder {
@ -177,7 +166,7 @@ where
impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
where
P::Result: HasError,
P::Result: HasKeyErrors + HasRegionErrors,
{
pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
PlanBuilder {
@ -190,10 +179,11 @@ where
fn set_single_region_store<PdC: PdClient, R: KvRequest>(
mut plan: Dispatch<R>,
store: Store,
store: RegionStore,
pd_client: Arc<PdC>,
) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
plan.request.set_context(store.region.context()?);
plan.request
.set_context(store.region_with_leader.context()?);
plan.kv_client = Some(store.client);
Ok(PlanBuilder {
plan,

View File

@ -2,8 +2,8 @@
use crate::{
pd::PdClient,
request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion},
store::Store,
request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock},
store::RegionStore,
Result,
};
use futures::stream::BoxStream;
@ -16,11 +16,11 @@ macro_rules! impl_inner_shardable {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.inner.apply_shard(shard, store)
}
};
@ -32,9 +32,9 @@ pub trait Shardable {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>>;
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>;
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()>;
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
}
impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
@ -43,11 +43,11 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
self.request.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.kv_client = Some(store.client.clone());
self.request.apply_shard(shard, store)
}
@ -61,8 +61,37 @@ impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> {
impl_inner_shardable!();
}
impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
impl_inner_shardable!();
#[macro_export]
macro_rules! shardable_key {
($type_: ty) => {
impl Shardable for $type_ {
type Shard = Vec<Vec<u8>>;
fn shards(
&self,
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
crate::Result<(Self::Shard, crate::store::RegionStore)>,
> {
crate::store::store_stream_for_keys(
std::iter::once(self.key.clone()),
pd_client.clone(),
)
}
fn apply_shard(
&mut self,
mut shard: Self::Shard,
store: &crate::store::RegionStore,
) -> crate::Result<()> {
self.set_context(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.set_key(shard.pop().unwrap());
Ok(())
}
}
};
}
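For context, a usage sketch: the macro is invoked once per single-key request type, typically paired with `collect_first!` for its response, e.g.:

shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);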
#[macro_export]
@ -76,7 +105,7 @@ macro_rules! shardable_keys {
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
crate::Result<(Self::Shard, crate::store::Store)>,
crate::Result<(Self::Shard, crate::store::RegionStore)>,
> {
let mut keys = self.keys.clone();
keys.sort();
@ -86,9 +115,9 @@ macro_rules! shardable_keys {
fn apply_shard(
&mut self,
shard: Self::Shard,
store: &crate::store::Store,
store: &crate::store::RegionStore,
) -> crate::Result<()> {
self.set_context(store.region.context()?);
self.set_context(store.region_with_leader.context()?);
self.set_keys(shard.into_iter().map(Into::into).collect());
Ok(())
}
@ -105,7 +134,7 @@ macro_rules! shardable_range {
fn shards(
&self,
pd_client: &Arc<impl crate::pd::PdClient>,
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> {
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> {
let start_key = self.start_key.clone().into();
let end_key = self.end_key.clone().into();
crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
@ -114,9 +143,9 @@ macro_rules! shardable_range {
fn apply_shard(
&mut self,
shard: Self::Shard,
store: &crate::store::Store,
store: &crate::store::RegionStore,
) -> crate::Result<()> {
self.set_context(store.region.context()?);
self.set_context(store.region_with_leader.context()?);
self.set_start_key(shard.0.into());
self.set_end_key(shard.1.into());

View File

@ -1,6 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::PdClient, region::Region, BoundRange, Key, Result};
use crate::{pd::PdClient, region::RegionWithLeader, BoundRange, Key, Result};
use derive_new::new;
use futures::{prelude::*, stream::BoxStream};
use std::{
@ -10,17 +10,17 @@ use std::{
use tikv_client_proto::kvrpcpb;
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
#[derive(new)]
pub struct Store {
pub region: Region,
#[derive(new, Clone)]
pub struct RegionStore {
pub region_with_leader: RegionWithLeader,
pub client: Arc<dyn KvClient + Send + Sync>,
}
pub trait KvConnectStore: KvConnect {
fn connect_to_store(&self, region: Region, address: String) -> Result<Store> {
fn connect_to_store(&self, region: RegionWithLeader, address: String) -> Result<RegionStore> {
log::info!("connect to tikv endpoint: {:?}", &address);
let client = self.connect(address.as_str())?;
Ok(Store::new(region, Arc::new(client)))
Ok(RegionStore::new(region, Arc::new(client)))
}
}
@ -30,7 +30,7 @@ impl KvConnectStore for TikvConnect {}
pub fn store_stream_for_keys<K, KOut, PdC>(
key_data: impl Iterator<Item = K> + Send + Sync + 'static,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<KOut>, Store)>>
) -> BoxStream<'static, Result<(Vec<KOut>, RegionStore)>>
where
PdC: PdClient,
K: AsRef<Key> + Into<KOut> + Send + Sync + 'static,
@ -52,12 +52,12 @@ where
pub fn store_stream_for_range<PdC: PdClient>(
range: (Vec<u8>, Vec<u8>),
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), Store)>> {
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
let bnd_range = BoundRange::from(range.clone());
pd_client
.stores_for_range(bnd_range)
.map_ok(move |store| {
let region_range = store.region.range();
let region_range = store.region_with_leader.range();
let result_range = range_intersection(
region_range,
(range.0.clone().into(), range.1.clone().into()),
@ -70,12 +70,12 @@ pub fn store_stream_for_range<PdC: PdClient>(
pub fn store_stream_for_range_by_start_key<PdC: PdClient>(
start_key: Key,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<u8>, Store)>> {
) -> BoxStream<'static, Result<(Vec<u8>, RegionStore)>> {
let bnd_range = BoundRange::range_from(start_key.clone());
pd_client
.stores_for_range(bnd_range)
.map_ok(move |store| {
let region_range = store.region.range();
let region_range = store.region_with_leader.range();
(
range_intersection(region_range, (start_key.clone(), vec![].into()))
.0
@ -102,7 +102,7 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key)
pub fn store_stream_for_ranges<PdC: PdClient>(
ranges: Vec<kvrpcpb::KeyRange>,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, Store)>> {
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionStore)>> {
pd_client
.clone()
.group_ranges_by_region(ranges)

View File

@ -401,10 +401,7 @@ mod tests {
))
.unwrap()
.collect::<Vec<_>>(),
vec![KvPair(
Key::from(b"key1".to_vec()),
Value::from(b"value".to_vec()),
),]
vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec(),),]
);
}

View File

@ -89,11 +89,17 @@ impl Client {
) -> Result<Client> {
let logger = optional_logger.unwrap_or_else(|| {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!())
Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
)
});
debug!(logger, "creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?);
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?);
Ok(Client { pd, logger })
}
@ -220,8 +226,7 @@ impl Client {
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)
.plan();
let res: Vec<kvrpcpb::LockInfo> = plan.execute().await?;
@ -235,6 +240,7 @@ impl Client {
}
// resolve locks
// FIXME: (1) this is inefficient (2) region errors that occur here are not handled
resolve_locks(locks, self.pd.clone()).await?;
// update safepoint to PD

View File

@ -4,7 +4,7 @@ use crate::{
backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
pd::PdClient,
region::RegionVerId,
request::Plan,
request::{CollectSingle, Plan},
timestamp::TimestampExt,
transaction::requests,
Error, Result,
@ -63,10 +63,9 @@ pub async fn resolve_locks(
None => {
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
let commit_version = plan.execute().await?;
@ -102,24 +101,31 @@ async fn resolve_lock_with_retry(
for i in 0..RESOLVE_LOCK_RETRY_LIMIT {
debug!("resolving locks: attempt {}", (i + 1));
let store = pd_client.clone().store_for_key(key.into()).await?;
let ver_id = store.region.ver_id();
let ver_id = store.region_with_leader.ver_id();
let request = requests::new_resolve_lock_request(start_version, commit_version);
// The only place where single-region is used
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region_with_store(store)
.await?
.resolve_lock(Backoff::no_backoff())
.retry_region(Backoff::no_backoff())
.extract_error()
.plan();
match plan.execute().await {
Ok(_) => {
return Ok(ver_id);
}
Err(e @ Error::RegionError(_)) => {
// Retry on region error
error = Some(e);
Err(Error::ExtractedErrors(mut errors)) => {
// ResolveLockResponse can have at most 1 error
match errors.pop() {
e @ Some(Error::RegionError(_)) => {
error = e;
continue;
}
Some(e) => return Err(e),
None => unreachable!(),
}
}
Err(e) => return Err(e),
}
}
@ -136,20 +142,21 @@ pub trait HasLocks {
mod tests {
use super::*;
use crate::mock::{MockKvClient, MockPdClient};
use futures::executor;
use std::any::Any;
use tikv_client_proto::errorpb;
#[test]
fn test_resolve_lock_with_retry() {
#[tokio::test]
async fn test_resolve_lock_with_retry() {
// Test resolve lock within retry limit
fail::cfg("region-error", "9*return").unwrap();
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| {
fail::fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());
let resp = kvrpcpb::ResolveLockResponse {
region_error: Some(errorpb::Error::default()),
..Default::default()
};
Ok(Box::new(resp) as Box<dyn Any>)
});
Ok(Box::new(kvrpcpb::ResolveLockResponse::default()) as Box<dyn Any>)
@ -158,14 +165,16 @@ mod tests {
let key = vec![1];
let region1 = MockPdClient::region1();
let resolved_region =
executor::block_on(resolve_lock_with_retry(&key, 1, 2, client.clone())).unwrap();
let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone())
.await
.unwrap();
assert_eq!(region1.ver_id(), resolved_region);
// Test resolve lock over retry limit
fail::cfg("region-error", "10*return").unwrap();
let key = vec![100];
executor::block_on(resolve_lock_with_retry(&key, 3, 4, client))
resolve_lock_with_retry(&key, 3, 4, client)
.await
.expect_err("should return error");
}
}

View File

@ -1,11 +1,13 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
collect_first,
pd::PdClient,
request::{
Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey,
Collect, CollectSingle, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable,
SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore},
timestamp::TimestampExt,
transaction::HasLocks,
util::iter::FlatMapOkIterExt,
@ -13,7 +15,10 @@ use crate::{
};
use futures::stream::BoxStream;
use std::{collections::HashMap, iter, sync::Arc};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
use tikv_client_proto::{
kvrpcpb::{self, TxnHeartBeatResponse},
pdpb::Timestamp,
};
// implement HasLocks for a response type that has a `pairs` field,
// where locks can be extracted from both the `pairs` and `error` fields
@ -67,6 +72,8 @@ impl KvRequest for kvrpcpb::GetRequest {
type Response = kvrpcpb::GetResponse;
}
shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -154,6 +161,10 @@ pub fn new_resolve_lock_request(
req
}
// Note: ResolveLockRequest is a special case: it can be sent to a specified
// region without keys, so it's not Shardable, and we don't automatically retry
// on its region errors (at the Plan level). Region errors must be handled
// manually (at the upper level).
impl KvRequest for kvrpcpb::ResolveLockRequest {
type Response = kvrpcpb::ResolveLockResponse;
}
@ -170,6 +181,8 @@ impl KvRequest for kvrpcpb::CleanupRequest {
type Response = kvrpcpb::CleanupResponse;
}
shardable_key!(kvrpcpb::CleanupRequest);
collect_first!(kvrpcpb::CleanupResponse);
impl SingleKey for kvrpcpb::CleanupRequest {
fn key(&self) -> &Vec<u8> {
&self.key
@ -225,14 +238,14 @@ impl Shardable for kvrpcpb::PrewriteRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
let mut mutations = self.mutations.clone();
mutations.sort_by(|a, b| a.key.cmp(&b.key));
store_stream_for_keys(mutations.into_iter(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?);
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
// Only need to set secondary keys if we're sending the primary key.
if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
@ -348,14 +361,14 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
let mut mutations = self.mutations.clone();
mutations.sort_by(|a, b| a.key.cmp(&b.key));
store_stream_for_keys(mutations.into_iter(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?);
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.set_mutations(shard);
Ok(())
}
@ -428,12 +441,12 @@ impl Shardable for kvrpcpb::ScanLockRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone())
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?);
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.set_start_key(shard);
Ok(())
}
@ -466,6 +479,26 @@ impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
type Response = kvrpcpb::TxnHeartBeatResponse;
}
impl Shardable for kvrpcpb::TxnHeartBeatRequest {
type Shard = Vec<Vec<u8>>;
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
}
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.primary_lock = shard.pop().unwrap();
Ok(())
}
}
collect_first!(TxnHeartBeatResponse);
impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
fn key(&self) -> &Vec<u8> {
&self.primary_lock
@ -484,6 +517,24 @@ impl KvRequest for kvrpcpb::CheckTxnStatusRequest {
type Response = kvrpcpb::CheckTxnStatusResponse;
}
impl Shardable for kvrpcpb::CheckTxnStatusRequest {
type Shard = Vec<Vec<u8>>;
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
}
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.set_primary_key(shard.pop().unwrap());
Ok(())
}
}
impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
fn key(&self) -> &Vec<u8> {
&self.primary_key

View File

@ -1,9 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::Backoff,
backoff::{Backoff, DEFAULT_REGION_BACKOFF},
pd::{PdClient, PdRpcClient},
request::{Collect, CollectAndMatchKey, CollectError, Plan, PlanBuilder, RetryOptions},
request::{
Collect, CollectAndMatchKey, CollectError, CollectSingle, Plan, PlanBuilder, RetryOptions,
},
timestamp::TimestampExt,
transaction::{buffer::Buffer, lowering::*},
BoundRange, Error, Key, KvPair, Result, Value,
@ -119,10 +121,9 @@ impl<PdC: PdClient> Transaction<PdC> {
.get_or_else(key, |key| async move {
let request = new_get_request(key, timestamp);
let plan = PlanBuilder::new(rpc, request)
.single_region()
.await?
.resolve_lock(retry_options.lock_backoff)
.retry_region(retry_options.region_backoff)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
@ -252,8 +253,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let request = new_batch_get_request(keys, timestamp);
let plan = PlanBuilder::new(rpc, request)
.resolve_lock(retry_options.lock_backoff)
.multi_region()
.retry_region(retry_options.region_backoff)
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
.plan();
plan.execute()
@ -522,8 +522,8 @@ impl<PdC: PdClient> Transaction<PdC> {
}
}
TransactionKind::Pessimistic(_) => {
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
self.pessimistic_lock(keys.into_iter(), false).await?;
self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
.await?;
}
}
Ok(())
@ -665,10 +665,9 @@ impl<PdC: PdClient> Transaction<PdC> {
self.start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL,
);
let plan = PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_region(self.options.retry_options.region_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
@ -693,8 +692,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let request = new_scan_request(new_range, timestamp, new_limit, key_only);
let plan = PlanBuilder::new(rpc, request)
.resolve_lock(retry_options.lock_backoff)
.multi_region()
.retry_region(retry_options.region_backoff)
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
.plan();
plan.execute()
@ -750,8 +748,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.preserve_keys()
.multi_region()
.retry_region(self.options.retry_options.region_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectAndMatchKey)
.plan();
let pairs = plan.execute().await;
@ -826,9 +823,8 @@ impl<PdC: PdClient> Transaction<PdC> {
start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL,
);
let plan = PlanBuilder::new(rpc.clone(), request)
.single_region()
.await?
.retry_region(region_backoff.clone())
.retry_multi_region(region_backoff.clone())
.merge(CollectSingle)
.plan();
plan.execute().await?;
}
@ -1045,6 +1041,7 @@ impl HeartbeatOption {
/// The `commit` phase is to mark all written data as successfully committed.
///
/// The committer implements `prewrite`, `commit` and `rollback` functions.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
struct Committer<PdC: PdClient = PdRpcClient> {
primary_key: Option<Key>,
@ -1128,8 +1125,7 @@ impl<PdC: PdClient> Committer<PdC> {
let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.multi_region()
.retry_region(self.options.retry_options.region_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectError)
.extract_error()
.plan();
@ -1169,8 +1165,7 @@ impl<PdC: PdClient> Committer<PdC> {
);
let plan = PlanBuilder::new(self.rpc.clone(), req)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.multi_region()
.retry_region(self.options.retry_options.region_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.plan();
plan.execute()
@ -1207,8 +1202,7 @@ impl<PdC: PdClient> Committer<PdC> {
};
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.multi_region()
.retry_region(self.options.retry_options.region_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
@ -1229,8 +1223,7 @@ impl<PdC: PdClient> Committer<PdC> {
let req = new_batch_rollback_request(keys, self.start_version);
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.multi_region()
.retry_region(self.options.retry_options.region_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
@ -1239,8 +1232,7 @@ impl<PdC: PdClient> Committer<PdC> {
let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts);
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.multi_region()
.retry_region(self.options.retry_options.region_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
@ -1301,18 +1293,23 @@ mod tests {
#[tokio::test]
async fn test_optimistic_heartbeat() -> Result<(), io::Error> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!());
info!(logger, "Testing: test_optimistic_heartbeat");
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
let scenario = FailScenario::setup();
fail::cfg("after-prewrite", "sleep(1500)").unwrap();
let heartbeats = Arc::new(AtomicUsize::new(0));
let heartbeats_cloned = heartbeats.clone();
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
if let Some(_) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>)
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
} else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>)
} else {
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)
@ -1341,18 +1338,27 @@ mod tests {
#[tokio::test]
async fn test_pessimistic_heartbeat() -> Result<(), io::Error> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!());
info!(logger, "Testing: test_pessimistic_heartbeat");
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
let heartbeats = Arc::new(AtomicUsize::new(0));
let heartbeats_cloned = heartbeats.clone();
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
if let Some(_) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>)
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
} else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>)
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PessimisticLockRequest>() {
} else if req
.downcast_ref::<kvrpcpb::PessimisticLockRequest>()
.is_some()
{
Ok(Box::new(kvrpcpb::PessimisticLockResponse::default()) as Box<dyn Any>)
} else {
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)

View File

@ -25,11 +25,6 @@ pub async fn clear_tikv() {
// To test with multiple regions, prewrite some data. Tests that want to run
// against multiple regions should use keys in the corresponding ranges.
pub async fn init() -> Result<()> {
// ignore SetLoggerError
let _ = simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Warn)
.init();
if env::var(ENV_ENABLE_MULIT_REGION).is_ok() {
// 1000 keys: 0..1000
let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1))
@ -88,8 +83,14 @@ async fn ensure_region_split(
pub fn pd_addrs() -> Vec<String> {
env::var(ENV_PD_ADDRS)
.expect(&format!("Expected {}:", ENV_PD_ADDRS))
.split(",")
.unwrap_or_else(|_| {
info!(
"Environment variable {} is not found. Using {:?} as default.",
ENV_PD_ADDRS, "127.0.0.1:2379"
);
"127.0.0.1:2379".to_owned()
})
.split(',')
.map(From::from)
.collect()
}

View File

@ -309,8 +309,8 @@ async fn txn_read() -> Result<()> {
.collect::<Vec<_>>();
let mut txn = client.begin_pessimistic().await?;
let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
assert_eq!(res.len(), keys.len());
let res = txn.batch_get(keys.clone()).await?;
assert_eq!(res.count(), keys.len());
let res = txn.batch_get_for_update(keys.clone()).await?;
assert_eq!(res.len(), keys.len());
@ -322,7 +322,6 @@ async fn txn_read() -> Result<()> {
// FIXME: the test is temporarily ignored since it tends to fail when scheduling is frequent.
#[tokio::test]
#[serial]
#[ignore]
async fn txn_bank_transfer() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
@ -699,7 +698,7 @@ async fn txn_get_for_update() -> Result<()> {
assert!(t1.get_for_update(key1.clone()).await?.unwrap() == value1);
t1.commit().await?;
assert!(t2.batch_get(keys.clone()).await?.collect::<Vec<_>>().len() == 0);
assert!(t2.batch_get(keys.clone()).await?.count() == 0);
let res: HashMap<_, _> = t2
.batch_get_for_update(keys.clone())
.await?
@ -713,7 +712,7 @@ async fn txn_get_for_update() -> Result<()> {
assert!(t3.get_for_update(key1).await?.is_none());
assert!(t3.commit().await.is_err());
assert!(t4.batch_get_for_update(keys).await?.len() == 0);
assert!(t4.batch_get_for_update(keys).await?.is_empty());
assert!(t4.commit().await.is_err());
Ok(())

View File

@ -15,6 +15,7 @@ lazy_static = "1"
log = "0.4"
regex = "1"
tikv-client-proto = { version = "0.1.0", path = "../tikv-client-proto" }
tokio = "1"
[dev-dependencies]
clap = "2"
@ -22,4 +23,4 @@ fail = { version = "0.4", features = [ "failpoints" ] }
proptest = "1"
proptest-derive = "0.3"
tempfile = "3"
tokio = "1.0"
tokio = "1"

View File

@ -33,6 +33,10 @@ pub enum Error {
"The operation is not supported in current mode, please consider using RawClient with or without atomic mode"
)]
UnsupportedMode,
#[error("There is no current_regions in the EpochNotMatch error")]
NoCurrentRegions,
#[error("The specified entry is not found in the region cache")]
EntryNotFoundInRegionCache,
/// Wraps a `std::io::Error`.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
@ -51,18 +55,24 @@ pub enum Error {
/// Wraps `tikv_client_proto::kvrpcpb::KeyError`
#[error("{0:?}")]
KeyError(tikv_client_proto::kvrpcpb::KeyError),
/// Multiple errors
/// Multiple errors generated from the ExtractError plan.
#[error("Multiple errors: {0:?}")]
MultipleErrors(Vec<Error>),
ExtractedErrors(Vec<Error>),
/// Multiple key errors
#[error("Multiple key errors: {0:?}")]
MultipleKeyErrors(Vec<Error>),
/// Invalid ColumnFamily
#[error("Unsupported column family {}", _0)]
ColumnFamilyError(String),
/// Can't join tokio tasks
#[error("Failed to join tokio tasks")]
JoinError(#[from] tokio::task::JoinError),
/// No region is found for the given key.
#[error("Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
/// No region is found for the given id.
#[error("Region {} is not found", region_id)]
RegionNotFound { region_id: u64 },
/// No region is found for the given id. Note: this is distinct from the RegionNotFound error in errorpb.
#[error("Region {} is not found in the response", region_id)]
RegionNotFoundInResponse { region_id: u64 },
/// No leader is found for the given id.
#[error("Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },

View File

@ -4,18 +4,30 @@ use crate::Error;
use std::fmt::Display;
use tikv_client_proto::kvrpcpb;
// Those that can have a single region error
pub trait HasRegionError {
fn region_error(&mut self) -> Option<Error>;
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error>;
}
pub trait HasError: HasRegionError {
fn error(&mut self) -> Option<Error>;
// Those that can have multiple region errors
pub trait HasRegionErrors {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>>;
}
pub trait HasKeyErrors {
fn key_errors(&mut self) -> Option<Vec<Error>>;
}
impl<T: HasRegionError> HasRegionErrors for T {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>> {
self.region_error().map(|e| vec![e])
}
}
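A small sketch (not part of this change) of what the blanket impl above provides: any type that can surface one region error automatically satisfies `HasRegionErrors` with a one-element vector; `OneError` is a made-up type.

struct OneError(Option<tikv_client_proto::errorpb::Error>);

impl HasRegionError for OneError {
    fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
        self.0.take()
    }
}

// Via the blanket impl, `OneError::region_errors()` now returns Some(vec![e])
// whenever a region error is present, and None otherwise.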
macro_rules! has_region_error {
($type:ty) => {
impl HasRegionError for $type {
fn region_error(&mut self) -> Option<Error> {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
if self.has_region_error() {
Some(self.take_region_error().into())
} else {
@ -56,10 +68,10 @@ has_region_error!(kvrpcpb::RawCasResponse);
macro_rules! has_key_error {
($type:ty) => {
impl HasError for $type {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for $type {
fn key_errors(&mut self) -> Option<Vec<Error>> {
if self.has_error() {
Some(self.take_error().into())
Some(vec![self.take_error().into()])
} else {
None
}
@ -81,14 +93,14 @@ has_key_error!(kvrpcpb::CheckSecondaryLocksResponse);
macro_rules! has_str_error {
($type:ty) => {
impl HasError for $type {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for $type {
fn key_errors(&mut self) -> Option<Vec<Error>> {
if self.get_error().is_empty() {
None
} else {
Some(Error::KvError {
Some(vec![Error::KvError {
message: self.take_error(),
})
}])
}
}
}
@ -105,67 +117,67 @@ has_str_error!(kvrpcpb::RawCasResponse);
has_str_error!(kvrpcpb::ImportResponse);
has_str_error!(kvrpcpb::DeleteRangeResponse);
impl HasError for kvrpcpb::ScanResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::ScanResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take()))
}
}
impl HasError for kvrpcpb::BatchGetResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::BatchGetResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take()))
}
}
impl HasError for kvrpcpb::RawBatchGetResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::RawBatchGetResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take()))
}
}
impl HasError for kvrpcpb::RawScanResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::RawScanResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.kvs.iter_mut().map(|pair| pair.error.take()))
}
}
impl HasError for kvrpcpb::RawBatchScanResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::RawBatchScanResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.kvs.iter_mut().map(|pair| pair.error.take()))
}
}
impl HasError for kvrpcpb::PrewriteResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::PrewriteResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.take_errors().into_iter().map(Some))
}
}
impl HasError for kvrpcpb::PessimisticLockResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::PessimisticLockResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.take_errors().into_iter().map(Some))
}
}
impl HasError for kvrpcpb::PessimisticRollbackResponse {
fn error(&mut self) -> Option<Error> {
impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
extract_errors(self.take_errors().into_iter().map(Some))
}
}
impl<T: HasError, E: Display> HasError for Result<T, E> {
fn error(&mut self) -> Option<Error> {
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
match self {
Ok(x) => x.error(),
Err(e) => Some(Error::StringError(e.to_string())),
Ok(x) => x.key_errors(),
Err(e) => Some(vec![Error::StringError(e.to_string())]),
}
}
}
impl<T: HasError> HasError for Vec<T> {
fn error(&mut self) -> Option<Error> {
impl<T: HasKeyErrors> HasKeyErrors for Vec<T> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
for t in self {
if let Some(e) = t.error() {
if let Some(e) = t.key_errors() {
return Some(e);
}
}
@ -175,37 +187,36 @@ impl<T: HasError> HasError for Vec<T> {
}
impl<T: HasRegionError, E> HasRegionError for Result<T, E> {
fn region_error(&mut self) -> Option<Error> {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
self.as_mut().ok().and_then(|t| t.region_error())
}
}
impl<T: HasRegionError> HasRegionError for Vec<T> {
fn region_error(&mut self) -> Option<Error> {
for t in self {
if let Some(e) = t.region_error() {
return Some(e);
}
}
impl<T: HasRegionError> HasRegionErrors for Vec<T> {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>> {
let errors: Vec<_> = self.iter_mut().filter_map(|x| x.region_error()).collect();
if errors.is_empty() {
None
} else {
Some(errors)
}
}
}
fn extract_errors(error_iter: impl Iterator<Item = Option<kvrpcpb::KeyError>>) -> Option<Error> {
fn extract_errors(
error_iter: impl Iterator<Item = Option<kvrpcpb::KeyError>>,
) -> Option<Vec<Error>> {
let errors: Vec<Error> = error_iter.flatten().map(Into::into).collect();
if errors.is_empty() {
None
} else if errors.len() == 1 {
Some(errors.into_iter().next().unwrap())
} else {
Some(Error::MultipleErrors(errors))
Some(errors)
}
}
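A test-style sketch (not part of this change) of the helper's contract after this refactor: no key errors yield None, and any key errors come back as the full list; the test name is illustrative.

#[test]
fn extract_errors_contract() {
    // No key error in any pair -> None.
    assert!(extract_errors(std::iter::empty::<Option<kvrpcpb::KeyError>>()).is_none());
    // Any key error -> Some(all of them), no longer collapsed into a single Error.
    let errors = extract_errors(vec![None, Some(kvrpcpb::KeyError::default())].into_iter());
    assert_eq!(errors.map(|e| e.len()), Some(1));
}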
#[cfg(test)]
mod test {
use super::HasError;
use super::HasKeyErrors;
use tikv_client_common::{internal_err, Error};
use tikv_client_proto::kvrpcpb;
#[test]
@ -215,7 +226,7 @@ mod test {
error: None,
commit_version: 0,
});
assert!(resp.error().is_none());
assert!(resp.key_errors().is_none());
let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse {
region_error: None,
@ -232,9 +243,9 @@ mod test {
}),
commit_version: 0,
});
assert!(resp.error().is_some());
assert!(resp.key_errors().is_some());
let mut resp: Result<kvrpcpb::CommitResponse, _> = Err(internal_err!("some error"));
assert!(resp.error().is_some());
assert!(resp.key_errors().is_some());
}
}

View File

@ -7,7 +7,7 @@ mod request;
#[doc(inline)]
pub use crate::{
client::{KvClient, KvConnect, TikvConnect},
errors::{HasError, HasRegionError},
errors::{HasKeyErrors, HasRegionError, HasRegionErrors},
request::Request,
};
pub use tikv_client_common::{security::SecurityManager, Error, Result};