fix: add rollback to txn; doesn't allow operation after successful commit/rollback

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2020-11-06 17:47:55 +08:00
parent c7eeff9e4a
commit 03014f19c9
2 changed files with 82 additions and 20 deletions

View File

@ -9,9 +9,16 @@ use crate::{
};
use derive_new::new;
use futures::{executor::ThreadPool, prelude::*, stream::BoxStream};
use std::{iter, mem, ops::RangeBounds, sync::Arc};
use std::{iter, ops::RangeBounds, sync::Arc};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
#[derive(PartialEq)]
enum TransactionStatus {
Normal,
Committed,
Rolledback,
}
/// A undo-able set of actions on the dataset.
///
/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a
@ -30,6 +37,7 @@ use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
/// # });
/// ```
pub struct Transaction {
status: TransactionStatus,
timestamp: Timestamp,
buffer: Buffer,
bg_worker: ThreadPool,
@ -48,6 +56,7 @@ impl Transaction {
is_pessimistic: bool,
) -> Transaction {
Transaction {
status: TransactionStatus::Normal,
timestamp,
buffer: Default::default(),
bg_worker,
@ -73,6 +82,7 @@ impl Transaction {
/// # });
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
self.check_status()?;
let key = key.into();
self.buffer
.get_or_else(key, |key| {
@ -98,6 +108,7 @@ impl Transaction {
/// # });
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
self.check_status()?;
if !self.is_pessimistic {
Err(ErrorKind::InvalidTransactionType.into())
} else {
@ -138,6 +149,7 @@ impl Transaction {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
self.check_status()?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
self.buffer
@ -174,6 +186,7 @@ impl Transaction {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
self.check_status()?;
if !self.is_pessimistic {
Err(ErrorKind::InvalidTransactionType.into())
} else {
@ -208,6 +221,7 @@ impl Transaction {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
self.check_status()?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
@ -240,6 +254,7 @@ impl Transaction {
/// # });
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.check_status()?;
let key = key.into();
if self.is_pessimistic {
self.pessimistic_lock(iter::once(key.clone())).await?;
@ -263,6 +278,7 @@ impl Transaction {
/// # });
/// ```
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
self.check_status()?;
let key = key.into();
if self.is_pessimistic {
self.pessimistic_lock(iter::once(key.clone())).await?;
@ -285,6 +301,7 @@ impl Transaction {
/// # });
/// ```
pub async fn lock_keys(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
self.check_status()?;
for key in keys {
self.buffer.lock(key.into());
}
@ -305,7 +322,8 @@ impl Transaction {
/// # });
/// ```
pub async fn commit(&mut self) -> Result<()> {
TwoPhaseCommitter::new(
self.check_status()?;
let res = TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.version(),
self.bg_worker.clone(),
@ -313,7 +331,33 @@ impl Transaction {
self.for_update_ts,
)
.commit()
.await
.await;
if res.is_ok() {
self.status = TransactionStatus::Committed;
}
res
}
/// Rollback the transaction.
///
/// If it succeeds, all mutations made by this transaciton will not take effect.
pub async fn rollback(&mut self) -> Result<()> {
self.check_status()?;
let res = TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
self.for_update_ts,
)
.rollback()
.await;
if res.is_ok() {
self.status = TransactionStatus::Rolledback;
}
res
}
/// Pessimistically lock the keys
@ -321,6 +365,7 @@ impl Transaction {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
self.check_status()?;
let mut keys: Vec<Vec<u8>> = keys
.into_iter()
.map(|it| it.into())
@ -341,6 +386,14 @@ impl Transaction {
.execute(self.rpc.clone(), PESSIMISTIC_BACKOFF)
.await
}
fn check_status(&self) -> Result<()> {
match self.status {
TransactionStatus::Normal => Ok(()),
TransactionStatus::Committed => Err(ErrorKind::OperationAfterCommitError.into()),
TransactionStatus::Rolledback => Err(ErrorKind::OperationAfterCommitError.into()),
}
}
}
/// The default TTL of a lock in milliseconds
@ -354,21 +407,17 @@ struct TwoPhaseCommitter {
rpc: Arc<PdRpcClient>,
for_update_ts: u64,
#[new(default)]
committed: bool,
#[new(default)]
undetermined: bool,
}
impl TwoPhaseCommitter {
async fn commit(mut self) -> Result<()> {
if self.mutations.is_empty() {
self.committed = true;
return Ok(());
}
self.prewrite().await?;
match self.commit_primary().await {
Ok(commit_version) => {
self.committed = true;
self.bg_worker
.clone()
.spawn_ok(self.commit_secondary(commit_version).map(|res| {
@ -433,14 +482,14 @@ impl TwoPhaseCommitter {
Ok(commit_version)
}
async fn commit_secondary(mut self, commit_version: u64) -> Result<()> {
let mutations = mem::take(&mut self.mutations);
async fn commit_secondary(self, commit_version: u64) -> Result<()> {
// No need to commit secondary keys when there is only one key
if mutations.len() == 1 {
if self.mutations.len() == 1 {
return Ok(());
}
let keys = mutations
let keys = self
.mutations
.into_iter()
.skip(1) // skip primary key
.map(|mutation| mutation.key.into())
@ -450,8 +499,8 @@ impl TwoPhaseCommitter {
.await
}
fn rollback(&mut self) -> impl Future<Output = Result<()>> + 'static {
let mutations = mem::take(&mut self.mutations);
fn rollback(&self) -> impl Future<Output = Result<()>> + 'static {
let mutations = self.mutations.clone();
let keys = mutations
.into_iter()
.map(|mutation| mutation.key.into())
@ -466,14 +515,24 @@ impl TwoPhaseCommitter {
}
}
impl Drop for TwoPhaseCommitter {
impl Drop for Transaction {
fn drop(&mut self) {
if !self.committed {
self.bg_worker.clone().spawn_ok(self.rollback().map(|res| {
if let Err(e) = res {
warn!("Failed to rollback: {}", e);
}
}))
if self.status == TransactionStatus::Normal {
self.bg_worker.spawn_ok(
TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
self.for_update_ts,
)
.rollback()
.map(|res| {
if let Err(e) = res {
warn!("Failed to rollback at Drop: {}", e);
}
}),
)
}
}
}

View File

@ -72,6 +72,9 @@ pub enum ErrorKind {
/// Will raise this error when using a pessimistic txn only operation on an optimistic txn
#[fail(display = "Invalid operation for this type of transaction")]
InvalidTransactionType,
/// It's not allowed to perform operations in a transaction after it has been committed or rolled back.
#[fail(display = "Cannot operate after transaction successfully committed or rolled back")]
OperationAfterCommitError,
}
impl Fail for Error {