mirror of https://github.com/tikv/client-rust.git
transaction: Handle "commit ts expired" error (#491)
Signed-off-by: Ping Yu <yuping@pingcap.com>
This commit is contained in:
parent
fa7893173b
commit
ac9542152f
|
@ -103,7 +103,7 @@ pub enum Error {
|
||||||
#[error("{}", message)]
|
#[error("{}", message)]
|
||||||
InternalError { message: String },
|
InternalError { message: String },
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
StringError(String),
|
OtherError(String),
|
||||||
#[error("PessimisticLock error: {:?}", inner)]
|
#[error("PessimisticLock error: {:?}", inner)]
|
||||||
PessimisticLockError {
|
PessimisticLockError {
|
||||||
inner: Box<Error>,
|
inner: Box<Error>,
|
||||||
|
|
|
@ -13,7 +13,7 @@ pub use key::Key;
|
||||||
pub use kvpair::KvPair;
|
pub use kvpair::KvPair;
|
||||||
pub use value::Value;
|
pub use value::Value;
|
||||||
|
|
||||||
struct HexRepr<'a>(pub &'a [u8]);
|
pub struct HexRepr<'a>(pub &'a [u8]);
|
||||||
|
|
||||||
impl fmt::Display for HexRepr<'_> {
|
impl fmt::Display for HexRepr<'_> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
|
|
@ -117,7 +117,7 @@ impl<C: RetryClientTrait> RegionCache<C> {
|
||||||
return self.read_through_region_by_id(id).await;
|
return self.read_through_region_by_id(id).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(Error::StringError(format!(
|
Err(Error::OtherError(format!(
|
||||||
"Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times"
|
"Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times"
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
@ -315,7 +315,7 @@ mod test {
|
||||||
.filter(|(_, r)| r.contains(&key.clone().into()))
|
.filter(|(_, r)| r.contains(&key.clone().into()))
|
||||||
.map(|(_, r)| r.clone())
|
.map(|(_, r)| r.clone())
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
|
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_region_by_id(
|
async fn get_region_by_id(
|
||||||
|
@ -330,7 +330,7 @@ mod test {
|
||||||
.filter(|(id, _)| id == &&region_id)
|
.filter(|(id, _)| id == &&region_id)
|
||||||
.map(|(_, r)| r.clone())
|
.map(|(_, r)| r.clone())
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
|
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_store(
|
async fn get_store(
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||||
|
|
||||||
use std::fmt::Display;
|
|
||||||
|
|
||||||
use crate::proto::kvrpcpb;
|
use crate::proto::kvrpcpb;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
|
||||||
|
@ -162,11 +160,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
|
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
|
||||||
fn key_errors(&mut self) -> Option<Vec<Error>> {
|
fn key_errors(&mut self) -> Option<Vec<Error>> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => x.key_errors(),
|
Ok(x) => x.key_errors(),
|
||||||
Err(e) => Some(vec![Error::StringError(e.to_string())]),
|
Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)),
|
||||||
|
Err(e) => Some(vec![std::mem::replace(
|
||||||
|
e,
|
||||||
|
Error::OtherError("".to_string()), // placeholder, no use.
|
||||||
|
)]),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,12 +11,15 @@ use derive_new::new;
|
||||||
use fail::fail_point;
|
use fail::fail_point;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
use log::error;
|
||||||
|
use log::info;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
|
||||||
use crate::backoff::Backoff;
|
use crate::backoff::Backoff;
|
||||||
use crate::backoff::DEFAULT_REGION_BACKOFF;
|
use crate::backoff::DEFAULT_REGION_BACKOFF;
|
||||||
use crate::codec::ApiV1TxnCodec;
|
use crate::codec::ApiV1TxnCodec;
|
||||||
|
use crate::kv::HexRepr;
|
||||||
use crate::pd::PdClient;
|
use crate::pd::PdClient;
|
||||||
use crate::pd::PdRpcClient;
|
use crate::pd::PdRpcClient;
|
||||||
use crate::proto::kvrpcpb;
|
use crate::proto::kvrpcpb;
|
||||||
|
@ -1246,7 +1249,7 @@ impl<PdC: PdClient> Committer<PdC> {
|
||||||
let min_commit_ts = self.prewrite().await?;
|
let min_commit_ts = self.prewrite().await?;
|
||||||
|
|
||||||
fail_point!("after-prewrite", |_| {
|
fail_point!("after-prewrite", |_| {
|
||||||
Err(Error::StringError(
|
Err(Error::OtherError(
|
||||||
"failpoint: after-prewrite return error".to_owned(),
|
"failpoint: after-prewrite return error".to_owned(),
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
|
@ -1260,7 +1263,7 @@ impl<PdC: PdClient> Committer<PdC> {
|
||||||
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
|
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
|
||||||
min_commit_ts.unwrap()
|
min_commit_ts.unwrap()
|
||||||
} else {
|
} else {
|
||||||
match self.commit_primary().await {
|
match self.commit_primary_with_retry().await {
|
||||||
Ok(commit_ts) => commit_ts,
|
Ok(commit_ts) => commit_ts,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return if self.undetermined {
|
return if self.undetermined {
|
||||||
|
@ -1365,6 +1368,11 @@ impl<PdC: PdClient> Committer<PdC> {
|
||||||
.plan();
|
.plan();
|
||||||
plan.execute()
|
plan.execute()
|
||||||
.inspect_err(|e| {
|
.inspect_err(|e| {
|
||||||
|
debug!(
|
||||||
|
"commit primary error: {:?}, start_ts: {}",
|
||||||
|
e,
|
||||||
|
self.start_version.version()
|
||||||
|
);
|
||||||
// We don't know whether the transaction is committed or not if we fail to receive
|
// We don't know whether the transaction is committed or not if we fail to receive
|
||||||
// the response. Then, we mark the transaction as undetermined and propagate the
|
// the response. Then, we mark the transaction as undetermined and propagate the
|
||||||
// error to the user.
|
// error to the user.
|
||||||
|
@ -1377,6 +1385,48 @@ impl<PdC: PdClient> Committer<PdC> {
|
||||||
Ok(commit_version)
|
Ok(commit_version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
|
||||||
|
loop {
|
||||||
|
match self.commit_primary().await {
|
||||||
|
Ok(commit_version) => return Ok(commit_version),
|
||||||
|
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
|
||||||
|
Some(Error::KeyError(key_err)) => {
|
||||||
|
if let Some(expired) = key_err.commit_ts_expired {
|
||||||
|
// Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
|
||||||
|
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
|
||||||
|
self.start_version.version());
|
||||||
|
|
||||||
|
let primary_key = self.primary_key.as_ref().unwrap();
|
||||||
|
if primary_key != expired.key.as_ref() {
|
||||||
|
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
|
||||||
|
self.start_version.version(), HexRepr(&expired.key), primary_key);
|
||||||
|
return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not retry for a txn which has a too large min_commit_ts.
|
||||||
|
// 3600000 << 18 = 943718400000
|
||||||
|
if expired
|
||||||
|
.min_commit_ts
|
||||||
|
.saturating_sub(expired.attempted_commit_ts)
|
||||||
|
> 943718400000
|
||||||
|
{
|
||||||
|
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
|
||||||
|
expired.min_commit_ts, expired.attempted_commit_ts);
|
||||||
|
return Err(Error::OtherError(msg));
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return Err(Error::KeyError(key_err));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(err) => return Err(err),
|
||||||
|
None => unreachable!(),
|
||||||
|
},
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
|
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
|
||||||
debug!("committing secondary");
|
debug!("committing secondary");
|
||||||
let mutations_len = self.mutations.len();
|
let mutations_len = self.mutations.len();
|
||||||
|
@ -1394,7 +1444,7 @@ impl<PdC: PdClient> Committer<PdC> {
|
||||||
let percent = percent.unwrap().parse::<usize>().unwrap();
|
let percent = percent.unwrap().parse::<usize>().unwrap();
|
||||||
new_len = mutations_len * percent / 100;
|
new_len = mutations_len * percent / 100;
|
||||||
if new_len == 0 {
|
if new_len == 0 {
|
||||||
Err(Error::StringError(
|
Err(Error::OtherError(
|
||||||
"failpoint: before-commit-secondary return error".to_owned(),
|
"failpoint: before-commit-secondary return error".to_owned(),
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -10,16 +10,16 @@ use crate::common::Result;
|
||||||
pub async fn get_region_count() -> Result<u64> {
|
pub async fn get_region_count() -> Result<u64> {
|
||||||
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))
|
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Error::StringError(e.to_string()))?;
|
.map_err(|e| Error::OtherError(e.to_string()))?;
|
||||||
|
|
||||||
let body = res
|
let body = res
|
||||||
.text()
|
.text()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Error::StringError(e.to_string()))?;
|
.map_err(|e| Error::OtherError(e.to_string()))?;
|
||||||
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
|
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
|
||||||
panic!("invalid body: {:?}, error: {:?}", body, err);
|
panic!("invalid body: {:?}, error: {:?}", body, err);
|
||||||
});
|
});
|
||||||
value["count"]
|
value["count"]
|
||||||
.as_u64()
|
.as_u64()
|
||||||
.ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned()))
|
.ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned()))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue