mirror of https://github.com/tikv/client-rust.git
Add undetermined error
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
d677b32c12
commit
d926300a45
|
@ -7,7 +7,7 @@ use std::result;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
inner: Context<ErrorKind>,
|
||||
inner: Box<Context<ErrorKind>>,
|
||||
}
|
||||
|
||||
/// An error originating from the TiKV client or dependencies.
|
||||
|
@ -38,6 +38,9 @@ pub enum ErrorKind {
|
|||
/// No region is found for the given id.
|
||||
#[fail(display = "Leader of region {} is not found", region_id)]
|
||||
LeaderNotFound { region_id: u64 },
|
||||
/// Whether the transaction is committed or not is undetermined
|
||||
#[fail(display = "Whether the transaction is committed or not is undetermined")]
|
||||
UndeterminedError(#[fail(cause)] Error),
|
||||
/// Invalid key range to scan. Only left bounded intervals are supported.
|
||||
#[fail(display = "Only left bounded intervals are supported")]
|
||||
InvalidKeyRange,
|
||||
|
@ -122,19 +125,25 @@ impl Error {
|
|||
pub(crate) fn multiple_errors(errors: Vec<Error>) -> Self {
|
||||
Error::from(ErrorKind::MultipleErrors(errors))
|
||||
}
|
||||
|
||||
pub(crate) fn undetermined_error(error: Error) -> Self {
|
||||
Error::from(ErrorKind::UndeterminedError(error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ErrorKind> for Error {
|
||||
fn from(kind: ErrorKind) -> Error {
|
||||
Error {
|
||||
inner: Context::new(kind),
|
||||
inner: Box::new(Context::new(kind)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Context<ErrorKind>> for Error {
|
||||
fn from(inner: Context<ErrorKind>) -> Error {
|
||||
Error { inner }
|
||||
Error {
|
||||
inner: Box::new(inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
|||
pd::{PdClient, PdRpcClient},
|
||||
request::KvRequest,
|
||||
transaction::{buffer::Buffer, requests::*, Timestamp},
|
||||
ErrorKind, Key, KvPair, Result, Value,
|
||||
Error, ErrorKind, Key, KvPair, Result, Value,
|
||||
};
|
||||
|
||||
use derive_new::new;
|
||||
|
@ -195,6 +195,8 @@ struct TwoPhaseCommitter {
|
|||
mutations: Vec<kvrpcpb::Mutation>,
|
||||
start_version: u64,
|
||||
rpc: Arc<PdRpcClient>,
|
||||
#[new(default)]
|
||||
undetermined: bool,
|
||||
}
|
||||
|
||||
impl TwoPhaseCommitter {
|
||||
|
@ -203,20 +205,20 @@ impl TwoPhaseCommitter {
|
|||
return Ok(());
|
||||
}
|
||||
self.prewrite().await?;
|
||||
// FIXME: rollback when prewrite fails
|
||||
// FIXME: commit secondary keys in background
|
||||
match self.commit_primary().await {
|
||||
// FIXME: commit or rollback in background
|
||||
Ok(commit_version) => {
|
||||
let _ = self.commit_secondary(commit_version).await;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
ErrorKind::Io(_) | ErrorKind::Grpc(_) => {}
|
||||
_ => {
|
||||
let _ = self.rollback_secondary().await;
|
||||
}
|
||||
if self.undetermined {
|
||||
Err(Error::undetermined_error(e))
|
||||
} else {
|
||||
let _ = self.rollback().await;
|
||||
Err(e)
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -238,10 +240,21 @@ impl TwoPhaseCommitter {
|
|||
async fn commit_primary(&mut self) -> Result<u64> {
|
||||
let primary_key = vec![self.mutations[0].key.clone().into()];
|
||||
let commit_version = self.rpc.clone().get_timestamp().await?.into_version();
|
||||
new_commit_request(primary_key, self.start_version, commit_version)
|
||||
match new_commit_request(primary_key, self.start_version, commit_version)
|
||||
.execute(self.rpc.clone())
|
||||
.await?;
|
||||
Ok(commit_version)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(commit_version),
|
||||
Err(e) => {
|
||||
// We don't know whether the transaction is committed or not if we fail to receive
|
||||
// the response. Then, we mark the transaction as undetermined and propagate the
|
||||
// error to the user.
|
||||
if let ErrorKind::Grpc(_) = e.kind() {
|
||||
self.undetermined = true;
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn commit_secondary(&mut self, commit_version: u64) -> Result<()> {
|
||||
|
@ -256,11 +269,10 @@ impl TwoPhaseCommitter {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn rollback_secondary(&mut self) -> Result<()> {
|
||||
async fn rollback(&mut self) -> Result<()> {
|
||||
let mutations = mem::replace(&mut self.mutations, Vec::default());
|
||||
let keys = mutations
|
||||
.into_iter()
|
||||
.skip(1) // skip primary key
|
||||
.map(|mutation| mutation.key.into())
|
||||
.collect();
|
||||
new_batch_rollback_request(keys, self.start_version)
|
||||
|
|
Loading…
Reference in New Issue