mirror of https://github.com/tikv/client-rust.git
Address comments
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
f093c64f21
commit
16a2abc9a2
|
@ -42,3 +42,4 @@ runtime = { version = "0.3.0-alpha.7", default-features = false }
|
|||
runtime-tokio = "0.3.0-alpha.6"
|
||||
proptest = "0.9"
|
||||
proptest-derive = "0.1.0"
|
||||
fail = { version = "0.3", features = [ "failpoints" ] }
|
||||
|
|
26
src/mock.rs
26
src/mock.rs
|
@ -8,15 +8,15 @@
|
|||
use crate::{
|
||||
kv_client::{KvClient, KvConnect, Store},
|
||||
pd::{PdClient, PdRpcClient, Region, RegionId, RetryClient},
|
||||
request::KvRequest,
|
||||
request::{DispatchHook, KvRequest},
|
||||
transaction::Timestamp,
|
||||
Config, Error, Key, Result,
|
||||
};
|
||||
|
||||
use futures::future::ready;
|
||||
use futures::future::BoxFuture;
|
||||
use fail::fail_point;
|
||||
use futures::future::{ready, BoxFuture, FutureExt};
|
||||
use grpcio::CallOption;
|
||||
use kvproto::metapb;
|
||||
use kvproto::{errorpb, kvrpcpb, metapb};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
|
||||
|
@ -68,7 +68,7 @@ impl KvConnect for MockKvConnect {
|
|||
}
|
||||
|
||||
impl MockPdClient {
|
||||
fn region1() -> Region {
|
||||
pub fn region1() -> Region {
|
||||
let mut region = Region::default();
|
||||
region.region.id = 1;
|
||||
region.region.set_start_key(vec![0]);
|
||||
|
@ -81,7 +81,7 @@ impl MockPdClient {
|
|||
region
|
||||
}
|
||||
|
||||
fn region2() -> Region {
|
||||
pub fn region2() -> Region {
|
||||
let mut region = Region::default();
|
||||
region.region.id = 2;
|
||||
region.region.set_start_key(vec![10]);
|
||||
|
@ -136,3 +136,17 @@ impl PdClient for MockPdClient {
|
|||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl DispatchHook for kvrpcpb::ResolveLockRequest {
|
||||
fn dispatch_hook(
|
||||
&self,
|
||||
_opt: CallOption,
|
||||
) -> Option<BoxFuture<'static, Result<kvrpcpb::ResolveLockResponse>>> {
|
||||
fail_point!("region-error", |_| {
|
||||
let mut resp = kvrpcpb::ResolveLockResponse::default();
|
||||
resp.region_error = Some(errorpb::Error::default());
|
||||
Some(ready(Ok(resp)).boxed())
|
||||
});
|
||||
Some(ready(Ok(kvrpcpb::ResolveLockResponse::default())).boxed())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,15 @@ use kvproto::kvrpcpb;
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
|
||||
|
||||
/// _Resolves_ the given locks.
|
||||
///
|
||||
/// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock,
|
||||
/// which means the key is finally either committed or rolled back, before we read the value of
|
||||
/// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get
|
||||
/// its status (committed or rolled back). Then, we use the status of its primary lock to determine
|
||||
/// the status of the other keys in the same transaction.
|
||||
pub async fn resolve_locks(
|
||||
locks: Vec<kvrpcpb::LockInfo>,
|
||||
pd_client: Arc<impl PdClient>,
|
||||
|
@ -63,13 +72,15 @@ async fn resolve_lock_with_retry(
|
|||
commit_version: u64,
|
||||
pd_client: Arc<impl PdClient>,
|
||||
) -> Result<RegionVerId> {
|
||||
// TODO: Add backoff and retry limit
|
||||
loop {
|
||||
// TODO: Add backoff
|
||||
let mut error = None;
|
||||
for _ in 0..RESOLVE_LOCK_RETRY_LIMIT {
|
||||
let region = pd_client.region_for_key(&key).await?;
|
||||
let context = match region.context() {
|
||||
Ok(context) => context,
|
||||
Err(_) => {
|
||||
Err(e) => {
|
||||
// Retry if the region has no leader
|
||||
error = Some(e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
@ -83,12 +94,14 @@ async fn resolve_lock_with_retry(
|
|||
Err(e) => match e.kind() {
|
||||
ErrorKind::RegionError(_) => {
|
||||
// Retry on region error
|
||||
error = Some(e);
|
||||
continue;
|
||||
}
|
||||
_ => return Err(e),
|
||||
},
|
||||
}
|
||||
}
|
||||
Err(error.expect("no error is impossible"))
|
||||
}
|
||||
|
||||
pub trait HasLocks {
|
||||
|
@ -104,3 +117,30 @@ macro_rules! dummy_impl_has_locks {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::mock::MockPdClient;
|
||||
|
||||
use futures::executor;
|
||||
|
||||
#[test]
|
||||
fn test_resolve_lock_with_retry() {
|
||||
// Test resolve lock within retry limit
|
||||
fail::cfg("region-error", "9*return").unwrap();
|
||||
let client = Arc::new(MockPdClient);
|
||||
let key: Key = vec![1].into();
|
||||
let region1 = MockPdClient::region1();
|
||||
let resolved_region =
|
||||
executor::block_on(resolve_lock_with_retry(key, 1, 2, client.clone())).unwrap();
|
||||
assert_eq!(region1.ver_id(), resolved_region);
|
||||
|
||||
// Test resolve lock over retry limit
|
||||
fail::cfg("region-error", "10*return").unwrap();
|
||||
let client = Arc::new(MockPdClient);
|
||||
let key: Key = vec![100].into();
|
||||
executor::block_on(resolve_lock_with_retry(key, 3, 4, client.clone()))
|
||||
.expect_err("should return error");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue