From c89c0fc21a3b3aaccfee06e9d516f1335066deba Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 29 Aug 2019 13:58:53 +0800 Subject: [PATCH] Add background worker Signed-off-by: Yilin Chen --- examples/transaction.rs | 2 +- src/lib.rs | 4 +-- src/transaction/client.rs | 53 ++++++++++------------------------ src/transaction/mod.rs | 2 +- src/transaction/transaction.rs | 39 +++++++++++++++++++------ 5 files changed, 48 insertions(+), 52 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index 8379368..0b4d982 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -62,7 +62,7 @@ async fn main() { Config::new(args.pd) }; - let txn = Client::connect(config) + let txn = Client::new(config) .await .expect("Could not connect to tikv"); diff --git a/src/lib.rs b/src/lib.rs index 2e4bf4a..1618f3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,6 +116,4 @@ pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value}; #[doc(inline)] pub use crate::raw::{Client as RawClient, ColumnFamily}; #[doc(inline)] -pub use crate::transaction::{ - Client as TransactionClient, Connect, Snapshot, Timestamp, Transaction, -}; +pub use crate::transaction::{Client as TransactionClient, Snapshot, Timestamp, Transaction}; diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0394a1f..56a2a11 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -6,15 +6,15 @@ use crate::{ Config, Result, }; -use derive_new::new; -use futures::prelude::*; -use futures::task::{Context, Poll}; -use std::pin::Pin; +use futures::executor::ThreadPool; use std::sync::Arc; /// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster. pub struct Client { pd: Arc, + /// The thread pool for background tasks including committing secondary keys and failed + /// transaction cleanups. + bg_worker: ThreadPool, } impl Client { @@ -24,12 +24,15 @@ impl Client { /// use tikv_client::{Config, TransactionClient}; /// use futures::prelude::*; /// # futures::executor::block_on(async { - /// let connect = TransactionClient::connect(Config::default()); - /// let client = connect.await.unwrap(); + /// let client = TransactionClient::new(Config::default()).await.unwrap; /// # }); /// ``` - pub fn connect(config: Config) -> Connect { - Connect::new(config) + pub async fn new(config: Config) -> Result { + let bg_worker = ThreadPool::new()?; + // TODO: PdRpcClient::connect currently uses a blocking implementation. + // Make it asynchronous later. + let pd = Arc::new(PdRpcClient::connect(&config)?); + Ok(Client { pd, bg_worker }) } /// Creates a new [`Transaction`](Transaction). @@ -50,12 +53,12 @@ impl Client { /// ``` pub async fn begin(&self) -> Result { let timestamp = self.current_timestamp().await?; - Ok(Transaction::new(timestamp, self.pd.clone())) + Ok(self.new_transaction(timestamp)) } /// Creates a new [`Snapshot`](Snapshot) at the given time. pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot { - Snapshot::new(Transaction::new(timestamp, self.pd.clone())) + Snapshot::new(self.new_transaction(timestamp)) } /// Retrieves the current [`Timestamp`](Timestamp). @@ -72,34 +75,8 @@ impl Client { pub async fn current_timestamp(&self) -> Result { self.pd.clone().get_timestamp().await } -} -/// An unresolved [`Client`](Client) connection to a TiKV cluster. -/// -/// Once resolved it will result in a connected [`Client`](Client). -/// -/// ```rust,no_run -/// use tikv_client::{Config, TransactionClient, Connect}; -/// use futures::prelude::*; -/// -/// # futures::executor::block_on(async { -/// let connect: Connect = TransactionClient::connect(Config::default()); -/// let client: TransactionClient = connect.await.unwrap(); -/// # }); -/// ``` -#[derive(new)] -pub struct Connect { - config: Config, -} - -impl Future for Connect { - type Output = Result; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let config = &self.config; - // TODO: PdRpcClient::connect currently uses a blocking implementation. - // Make it asynchronous later. - let pd = Arc::new(PdRpcClient::connect(config)?); - Poll::Ready(Ok(Client { pd })) + fn new_transaction(&self, timestamp: Timestamp) -> Transaction { + Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone()) } } diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 1d6ea2c..3066d75 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -9,7 +9,7 @@ //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace. //! -pub use client::{Client, Connect}; +pub use client::Client; pub use lock::{resolve_locks, HasLocks}; pub use snapshot::Snapshot; pub use transaction::Transaction; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 3403764..f739fba 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -8,6 +8,8 @@ use crate::{ }; use derive_new::new; +use futures::executor::ThreadPool; +use futures::prelude::*; use futures::stream::BoxStream; use kvproto::kvrpcpb; use std::mem; @@ -37,6 +39,7 @@ pub struct Transaction { timestamp: Timestamp, #[new(default)] buffer: Buffer, + bg_worker: ThreadPool, rpc: Arc, } @@ -183,6 +186,7 @@ impl Transaction { TwoPhaseCommitter::new( self.buffer.to_proto_mutations(), self.timestamp.into_version(), + self.bg_worker.clone(), self.rpc.clone(), ) .commit() @@ -197,8 +201,11 @@ const DEFAULT_LOCK_TTL: u64 = 3000; struct TwoPhaseCommitter { mutations: Vec, start_version: u64, + bg_worker: ThreadPool, rpc: Arc, #[new(default)] + committed: bool, + #[new(default)] undetermined: bool, } @@ -208,18 +215,22 @@ impl TwoPhaseCommitter { return Ok(()); } self.prewrite().await?; - // FIXME: rollback when prewrite fails - // FIXME: commit secondary keys in background match self.commit_primary().await { Ok(commit_version) => { - let _ = self.commit_secondary(commit_version).await; + self.committed = true; + self.bg_worker + .clone() + .spawn_ok(self.commit_secondary(commit_version).map(|res| { + if let Err(e) = res { + warn!("Failed to commit secondary keys: {}", e); + } + })); Ok(()) } Err(e) => { if self.undetermined { Err(Error::undetermined_error(e)) } else { - let _ = self.rollback().await; Err(e) } } @@ -261,7 +272,7 @@ impl TwoPhaseCommitter { } } - async fn commit_secondary(&mut self, commit_version: u64) -> Result<()> { + async fn commit_secondary(mut self, commit_version: u64) -> Result<()> { let mutations = mem::replace(&mut self.mutations, Vec::default()); let keys = mutations .into_iter() @@ -273,14 +284,24 @@ impl TwoPhaseCommitter { .await } - async fn rollback(&mut self) -> Result<()> { + fn rollback(&mut self) -> impl Future> + 'static { let mutations = mem::replace(&mut self.mutations, Vec::default()); let keys = mutations .into_iter() .map(|mutation| mutation.key.into()) .collect(); - new_batch_rollback_request(keys, self.start_version) - .execute(self.rpc.clone()) - .await + new_batch_rollback_request(keys, self.start_version).execute(self.rpc.clone()) + } +} + +impl Drop for TwoPhaseCommitter { + fn drop(&mut self) { + if !self.committed { + self.bg_worker.clone().spawn_ok(self.rollback().map(|res| { + if let Err(e) = res { + warn!("Failed to rollback: {}", e); + } + })) + } } }