mirror of https://github.com/tikv/client-rust.git
address review comments
Signed-off-by: haojinming <jinming.hao@pingcap.com>
This commit is contained in:
parent
768df3c48d
commit
b22710b3ef
|
@ -446,20 +446,13 @@ where
|
||||||
pub struct CleanupLocksResult {
|
pub struct CleanupLocksResult {
|
||||||
pub region_error: Option<errorpb::Error>,
|
pub region_error: Option<errorpb::Error>,
|
||||||
pub key_error: Option<Vec<Error>>,
|
pub key_error: Option<Vec<Error>>,
|
||||||
pub meet_locks: usize,
|
pub resolved_locks: usize,
|
||||||
// TODO: pub resolved_locks: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CleanupLocksResult {
|
|
||||||
fn has_error(&self) -> bool {
|
|
||||||
self.key_error.is_some() || self.region_error.is_some()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for CleanupLocksResult {
|
impl Clone for CleanupLocksResult {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
meet_locks: self.meet_locks,
|
resolved_locks: self.resolved_locks,
|
||||||
..Default::default() // Ignore errors, which should be extracted by `extract_error()`.
|
..Default::default() // Ignore errors, which should be extracted by `extract_error()`.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -484,14 +477,8 @@ impl Merge<CleanupLocksResult> for Collect {
|
||||||
input
|
input
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.fold(Ok(CleanupLocksResult::default()), |acc, x| {
|
.fold(Ok(CleanupLocksResult::default()), |acc, x| {
|
||||||
let new_ret = x?;
|
|
||||||
let new_lock_cnt = if new_ret.has_error() {
|
|
||||||
0
|
|
||||||
} else {
|
|
||||||
new_ret.meet_locks
|
|
||||||
};
|
|
||||||
Ok(CleanupLocksResult {
|
Ok(CleanupLocksResult {
|
||||||
meet_locks: acc?.meet_locks + new_lock_cnt,
|
resolved_locks: acc?.resolved_locks + x?.resolved_locks,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -593,7 +580,7 @@ where
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
result.meet_locks += lock_size;
|
result.resolved_locks += lock_size;
|
||||||
}
|
}
|
||||||
Err(Error::ExtractedErrors(mut errors)) => {
|
Err(Error::ExtractedErrors(mut errors)) => {
|
||||||
// Propagate errors to `retry_multi_region` for retry.
|
// Propagate errors to `retry_multi_region` for retry.
|
||||||
|
|
|
@ -242,7 +242,7 @@ impl Client {
|
||||||
batch_size: SCAN_LOCK_BATCH_SIZE,
|
batch_size: SCAN_LOCK_BATCH_SIZE,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
self.cleanup_locks(&safepoint, vec![].., options).await?;
|
self.cleanup_locks(.., &safepoint, options).await?;
|
||||||
|
|
||||||
// update safepoint to PD
|
// update safepoint to PD
|
||||||
let res: bool = self
|
let res: bool = self
|
||||||
|
@ -258,8 +258,8 @@ impl Client {
|
||||||
|
|
||||||
pub async fn cleanup_locks(
|
pub async fn cleanup_locks(
|
||||||
&self,
|
&self,
|
||||||
safepoint: &Timestamp,
|
|
||||||
range: impl Into<BoundRange>,
|
range: impl Into<BoundRange>,
|
||||||
|
safepoint: &Timestamp,
|
||||||
options: ResolveLocksOptions,
|
options: ResolveLocksOptions,
|
||||||
) -> Result<CleanupLocksResult> {
|
) -> Result<CleanupLocksResult> {
|
||||||
debug!(self.logger, "invoking cleanup async commit locks");
|
debug!(self.logger, "invoking cleanup async commit locks");
|
||||||
|
@ -270,8 +270,8 @@ impl Client {
|
||||||
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
|
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
|
||||||
.cleanup_locks(self.logger.clone(), ctx.clone(), options, backoff)
|
.cleanup_locks(self.logger.clone(), ctx.clone(), options, backoff)
|
||||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||||
.merge(crate::request::Collect)
|
|
||||||
.extract_error()
|
.extract_error()
|
||||||
|
.merge(crate::request::Collect)
|
||||||
.plan();
|
.plan();
|
||||||
plan.execute().await
|
plan.execute().await
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
|
||||||
|
|
||||||
init().await?;
|
init().await?;
|
||||||
let scenario = FailScenario::setup();
|
let scenario = FailScenario::setup();
|
||||||
let full_range = vec![]..;
|
let full_range = ..;
|
||||||
|
|
||||||
fail::cfg("after-prewrite", "return").unwrap();
|
fail::cfg("after-prewrite", "return").unwrap();
|
||||||
fail::cfg("before-cleanup-locks", "return").unwrap();
|
fail::cfg("before-cleanup-locks", "return").unwrap();
|
||||||
|
@ -113,10 +113,10 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
|
||||||
batch_size: 4,
|
batch_size: 4,
|
||||||
};
|
};
|
||||||
let res = client
|
let res = client
|
||||||
.cleanup_locks(&safepoint, full_range, options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
assert_eq!(res.meet_locks, keys.len());
|
assert_eq!(res.resolved_locks, keys.len());
|
||||||
assert_eq!(count_locks(&client).await?, keys.len());
|
assert_eq!(count_locks(&client).await?, keys.len());
|
||||||
|
|
||||||
scenario.teardown();
|
scenario.teardown();
|
||||||
|
@ -130,7 +130,7 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
|
||||||
|
|
||||||
init().await?;
|
init().await?;
|
||||||
let scenario = FailScenario::setup();
|
let scenario = FailScenario::setup();
|
||||||
let full_range = vec![]..;
|
let full_range = ..;
|
||||||
|
|
||||||
// no commit
|
// no commit
|
||||||
{
|
{
|
||||||
|
@ -150,7 +150,7 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range.clone(), options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
must_committed(&client, keys).await;
|
must_committed(&client, keys).await;
|
||||||
|
@ -177,7 +177,7 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range.clone(), options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
must_committed(&client, keys).await;
|
must_committed(&client, keys).await;
|
||||||
|
@ -196,7 +196,7 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range, options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
must_committed(&client, keys).await;
|
must_committed(&client, keys).await;
|
||||||
|
@ -240,17 +240,17 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let res = client
|
let res = client
|
||||||
.cleanup_locks(&safepoint, start_key..end_key, options)
|
.cleanup_locks(start_key..end_key, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
assert_eq!(res.meet_locks, keys.len() - 3);
|
assert_eq!(res.resolved_locks, keys.len() - 3);
|
||||||
|
|
||||||
// cleanup all locks to avoid affecting following cases.
|
// cleanup all locks to avoid affecting following cases.
|
||||||
let options = ResolveLocksOptions {
|
let options = ResolveLocksOptions {
|
||||||
async_commit_only: false,
|
async_commit_only: false,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client.cleanup_locks(&safepoint, vec![].., options).await?;
|
client.cleanup_locks(.., &safepoint, options).await?;
|
||||||
must_committed(&client, keys).await;
|
must_committed(&client, keys).await;
|
||||||
assert_eq!(count_locks(&client).await?, 0);
|
assert_eq!(count_locks(&client).await?, 0);
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
|
||||||
|
|
||||||
init().await?;
|
init().await?;
|
||||||
let scenario = FailScenario::setup();
|
let scenario = FailScenario::setup();
|
||||||
let full_range = vec![]..;
|
let full_range = ..;
|
||||||
|
|
||||||
// no commit
|
// no commit
|
||||||
{
|
{
|
||||||
|
@ -286,7 +286,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range.clone(), options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
assert_eq!(count_locks(&client).await?, keys.len());
|
assert_eq!(count_locks(&client).await?, keys.len());
|
||||||
}
|
}
|
||||||
|
@ -295,7 +295,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range.clone(), options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
must_rollbacked(&client, keys).await;
|
must_rollbacked(&client, keys).await;
|
||||||
|
@ -315,7 +315,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
client
|
client
|
||||||
.cleanup_locks(&safepoint, full_range, options)
|
.cleanup_locks(full_range, &safepoint, options)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
must_committed(&client, keys).await;
|
must_committed(&client, keys).await;
|
||||||
|
|
Loading…
Reference in New Issue