From d926300a458d4b4430ae1856c1619240b53ea3a0 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 29 Aug 2019 00:27:02 +0800 Subject: [PATCH] Add undetermined error Signed-off-by: Yilin Chen --- src/errors.rs | 15 +++++++++++--- src/transaction/transaction.rs | 38 ++++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 4cf19a1..622a9ab 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -7,7 +7,7 @@ use std::result; #[derive(Debug)] pub struct Error { - inner: Context, + inner: Box>, } /// An error originating from the TiKV client or dependencies. @@ -38,6 +38,9 @@ pub enum ErrorKind { /// No region is found for the given id. #[fail(display = "Leader of region {} is not found", region_id)] LeaderNotFound { region_id: u64 }, + /// Whether the transaction is committed or not is undetermined + #[fail(display = "Whether the transaction is committed or not is undetermined")] + UndeterminedError(#[fail(cause)] Error), /// Invalid key range to scan. Only left bounded intervals are supported. #[fail(display = "Only left bounded intervals are supported")] InvalidKeyRange, @@ -122,19 +125,25 @@ impl Error { pub(crate) fn multiple_errors(errors: Vec) -> Self { Error::from(ErrorKind::MultipleErrors(errors)) } + + pub(crate) fn undetermined_error(error: Error) -> Self { + Error::from(ErrorKind::UndeterminedError(error)) + } } impl From for Error { fn from(kind: ErrorKind) -> Error { Error { - inner: Context::new(kind), + inner: Box::new(Context::new(kind)), } } } impl From> for Error { fn from(inner: Context) -> Error { - Error { inner } + Error { + inner: Box::new(inner), + } } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 188d8e9..1e94d81 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -4,7 +4,7 @@ use crate::{ pd::{PdClient, PdRpcClient}, request::KvRequest, transaction::{buffer::Buffer, requests::*, Timestamp}, - ErrorKind, Key, KvPair, Result, Value, + Error, ErrorKind, Key, KvPair, Result, Value, }; use derive_new::new; @@ -195,6 +195,8 @@ struct TwoPhaseCommitter { mutations: Vec, start_version: u64, rpc: Arc, + #[new(default)] + undetermined: bool, } impl TwoPhaseCommitter { @@ -203,20 +205,20 @@ impl TwoPhaseCommitter { return Ok(()); } self.prewrite().await?; + // FIXME: rollback when prewrite fails + // FIXME: commit secondary keys in background match self.commit_primary().await { - // FIXME: commit or rollback in background Ok(commit_version) => { let _ = self.commit_secondary(commit_version).await; Ok(()) } Err(e) => { - match e.kind() { - ErrorKind::Io(_) | ErrorKind::Grpc(_) => {} - _ => { - let _ = self.rollback_secondary().await; - } + if self.undetermined { + Err(Error::undetermined_error(e)) + } else { + let _ = self.rollback().await; + Err(e) } - Err(e) } } } @@ -238,10 +240,21 @@ impl TwoPhaseCommitter { async fn commit_primary(&mut self) -> Result { let primary_key = vec![self.mutations[0].key.clone().into()]; let commit_version = self.rpc.clone().get_timestamp().await?.into_version(); - new_commit_request(primary_key, self.start_version, commit_version) + match new_commit_request(primary_key, self.start_version, commit_version) .execute(self.rpc.clone()) - .await?; - Ok(commit_version) + .await + { + Ok(_) => Ok(commit_version), + Err(e) => { + // We don't know whether the transaction is committed or not if we fail to receive + // the response. Then, we mark the transaction as undetermined and propagate the + // error to the user. + if let ErrorKind::Grpc(_) = e.kind() { + self.undetermined = true; + } + Err(e) + } + } } async fn commit_secondary(&mut self, commit_version: u64) -> Result<()> { @@ -256,11 +269,10 @@ impl TwoPhaseCommitter { .await } - async fn rollback_secondary(&mut self) -> Result<()> { + async fn rollback(&mut self) -> Result<()> { let mutations = mem::replace(&mut self.mutations, Vec::default()); let keys = mutations .into_iter() - .skip(1) // skip primary key .map(|mutation| mutation.key.into()) .collect(); new_batch_rollback_request(keys, self.start_version)