mirror of https://github.com/tikv/client-rust.git
Return live locks on resolve locks error (#417)
* API v2 part1 Signed-off-by: Ping Yu <yuping@pingcap.com> * inplace encoding Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * export proto Signed-off-by: Ping Yu <yuping@pingcap.com> * fix set_context Signed-off-by: Ping Yu <yuping@pingcap.com> * get live locks Signed-off-by: Ping Yu <yuping@pingcap.com> * wip Signed-off-by: Ping Yu <yuping@pingcap.com> * add Codec parameter to Transaction & Snapshot Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> --------- Signed-off-by: Ping Yu <yuping@pingcap.com>
This commit is contained in:
parent
4b0e844a40
commit
d42b31a09d
|
@ -4,6 +4,7 @@ use std::result;
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
use crate::proto::kvrpcpb;
|
||||||
use crate::BoundRange;
|
use crate::BoundRange;
|
||||||
|
|
||||||
/// An error originating from the TiKV client or dependencies.
|
/// An error originating from the TiKV client or dependencies.
|
||||||
|
@ -18,7 +19,7 @@ pub enum Error {
|
||||||
DuplicateKeyInsertion,
|
DuplicateKeyInsertion,
|
||||||
/// Failed to resolve a lock
|
/// Failed to resolve a lock
|
||||||
#[error("Failed to resolve lock")]
|
#[error("Failed to resolve lock")]
|
||||||
ResolveLockError,
|
ResolveLockError(Vec<kvrpcpb::LockInfo>),
|
||||||
/// Will raise this error when using a pessimistic txn only operation on an optimistic txn
|
/// Will raise this error when using a pessimistic txn only operation on an optimistic txn
|
||||||
#[error("Invalid operation for this type of transaction")]
|
#[error("Invalid operation for this type of transaction")]
|
||||||
InvalidTransactionType,
|
InvalidTransactionType,
|
||||||
|
|
|
@ -434,15 +434,16 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.backoff.is_none() {
|
if self.backoff.is_none() {
|
||||||
return Err(Error::ResolveLockError);
|
return Err(Error::ResolveLockError(locks));
|
||||||
}
|
}
|
||||||
|
|
||||||
let pd_client = self.pd_client.clone();
|
let pd_client = self.pd_client.clone();
|
||||||
if resolve_locks(locks, pd_client.clone()).await? {
|
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
|
||||||
|
if live_locks.is_empty() {
|
||||||
result = self.inner.execute().await?;
|
result = self.inner.execute().await?;
|
||||||
} else {
|
} else {
|
||||||
match clone.backoff.next_delay_duration() {
|
match clone.backoff.next_delay_duration() {
|
||||||
None => return Err(Error::ResolveLockError),
|
None => return Err(Error::ResolveLockError(live_locks)),
|
||||||
Some(delay_duration) => {
|
Some(delay_duration) => {
|
||||||
sleep(delay_duration).await;
|
sleep(delay_duration).await;
|
||||||
result = clone.inner.execute().await?;
|
result = clone.inner.execute().await?;
|
||||||
|
|
|
@ -34,7 +34,7 @@ use crate::Result;
|
||||||
|
|
||||||
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
|
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
|
||||||
|
|
||||||
/// _Resolves_ the given locks. Returns whether all the given locks are resolved.
|
/// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved.
|
||||||
///
|
///
|
||||||
/// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock,
|
/// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock,
|
||||||
/// which means the key is finally either committed or rolled back, before we read the value of
|
/// which means the key is finally either committed or rolled back, before we read the value of
|
||||||
|
@ -44,17 +44,15 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
|
||||||
pub async fn resolve_locks(
|
pub async fn resolve_locks(
|
||||||
locks: Vec<kvrpcpb::LockInfo>,
|
locks: Vec<kvrpcpb::LockInfo>,
|
||||||
pd_client: Arc<impl PdClient>,
|
pd_client: Arc<impl PdClient>,
|
||||||
) -> Result<bool> {
|
) -> Result<Vec<kvrpcpb::LockInfo> /* live_locks */> {
|
||||||
debug!("resolving locks");
|
debug!("resolving locks");
|
||||||
let ts = pd_client.clone().get_timestamp().await?;
|
let ts = pd_client.clone().get_timestamp().await?;
|
||||||
let mut has_live_locks = false;
|
let (expired_locks, live_locks) =
|
||||||
let expired_locks = locks.into_iter().filter(|lock| {
|
locks
|
||||||
let expired = ts.physical - Timestamp::from_version(lock.lock_version).physical
|
.into_iter()
|
||||||
>= lock.lock_ttl as i64;
|
.partition::<Vec<kvrpcpb::LockInfo>, _>(|lock| {
|
||||||
if !expired {
|
ts.physical - Timestamp::from_version(lock.lock_version).physical
|
||||||
has_live_locks = true;
|
>= lock.lock_ttl as i64
|
||||||
}
|
|
||||||
expired
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// records the commit version of each primary lock (representing the status of the transaction)
|
// records the commit version of each primary lock (representing the status of the transaction)
|
||||||
|
@ -103,7 +101,7 @@ pub async fn resolve_locks(
|
||||||
.or_insert_with(HashSet::new)
|
.or_insert_with(HashSet::new)
|
||||||
.insert(cleaned_region);
|
.insert(cleaned_region);
|
||||||
}
|
}
|
||||||
Ok(!has_live_locks)
|
Ok(live_locks)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_lock_with_retry(
|
async fn resolve_lock_with_retry(
|
||||||
|
@ -290,12 +288,12 @@ impl LockResolver {
|
||||||
}
|
}
|
||||||
|
|
||||||
match &status.kind {
|
match &status.kind {
|
||||||
TransactionStatusKind::Locked(..) => {
|
TransactionStatusKind::Locked(_, lock_info) => {
|
||||||
error!(
|
error!(
|
||||||
"cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
|
"cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
|
||||||
txn_id
|
txn_id
|
||||||
);
|
);
|
||||||
return Err(Error::ResolveLockError);
|
return Err(Error::ResolveLockError(vec![lock_info.clone()]));
|
||||||
}
|
}
|
||||||
TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()),
|
TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()),
|
||||||
TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0),
|
TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0),
|
||||||
|
|
|
@ -950,7 +950,7 @@ mod tests {
|
||||||
let input = vec![
|
let input = vec![
|
||||||
Ok(resp1),
|
Ok(resp1),
|
||||||
Ok(resp_empty_value),
|
Ok(resp_empty_value),
|
||||||
Err(ResolveLockError),
|
Err(ResolveLockError(vec![])),
|
||||||
Ok(resp_not_found),
|
Ok(resp_not_found),
|
||||||
];
|
];
|
||||||
let result = merger.merge(input);
|
let result = merger.merge(input);
|
||||||
|
@ -960,7 +960,7 @@ mod tests {
|
||||||
success_keys,
|
success_keys,
|
||||||
} = result.unwrap_err()
|
} = result.unwrap_err()
|
||||||
{
|
{
|
||||||
assert!(matches!(*inner, ResolveLockError));
|
assert!(matches!(*inner, ResolveLockError(_)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
success_keys,
|
success_keys,
|
||||||
vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]
|
vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]
|
||||||
|
|
Loading…
Reference in New Issue