Add background worker

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-29 13:58:53 +08:00
parent 0f93a3c756
commit c89c0fc21a
5 changed files with 48 additions and 52 deletions

View File

@ -62,7 +62,7 @@ async fn main() {
Config::new(args.pd)
};
let txn = Client::connect(config)
let txn = Client::new(config)
.await
.expect("Could not connect to tikv");

View File

@ -116,6 +116,4 @@ pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)]
pub use crate::transaction::{
Client as TransactionClient, Connect, Snapshot, Timestamp, Transaction,
};
pub use crate::transaction::{Client as TransactionClient, Snapshot, Timestamp, Transaction};

View File

@ -6,15 +6,15 @@ use crate::{
Config, Result,
};
use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
use futures::executor::ThreadPool;
use std::sync::Arc;
/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client {
pd: Arc<PdRpcClient>,
/// The thread pool for background tasks including committing secondary keys and failed
/// transaction cleanups.
bg_worker: ThreadPool,
}
impl Client {
@ -24,12 +24,15 @@ impl Client {
/// use tikv_client::{Config, TransactionClient};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = TransactionClient::connect(Config::default());
/// let client = connect.await.unwrap();
/// let client = TransactionClient::new(Config::default()).await.unwrap;
/// # });
/// ```
pub fn connect(config: Config) -> Connect {
Connect::new(config)
pub async fn new(config: Config) -> Result<Client> {
let bg_worker = ThreadPool::new()?;
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(&config)?);
Ok(Client { pd, bg_worker })
}
/// Creates a new [`Transaction`](Transaction).
@ -50,12 +53,12 @@ impl Client {
/// ```
pub async fn begin(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(Transaction::new(timestamp, self.pd.clone()))
Ok(self.new_transaction(timestamp))
}
/// Creates a new [`Snapshot`](Snapshot) at the given time.
pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot {
Snapshot::new(Transaction::new(timestamp, self.pd.clone()))
Snapshot::new(self.new_transaction(timestamp))
}
/// Retrieves the current [`Timestamp`](Timestamp).
@ -72,34 +75,8 @@ impl Client {
pub async fn current_timestamp(&self) -> Result<Timestamp> {
self.pd.clone().get_timestamp().await
}
}
/// An unresolved [`Client`](Client) connection to a TiKV cluster.
///
/// Once resolved it will result in a connected [`Client`](Client).
///
/// ```rust,no_run
/// use tikv_client::{Config, TransactionClient, Connect};
/// use futures::prelude::*;
///
/// # futures::executor::block_on(async {
/// let connect: Connect = TransactionClient::connect(Config::default());
/// let client: TransactionClient = connect.await.unwrap();
/// # });
/// ```
#[derive(new)]
pub struct Connect {
config: Config,
}
impl Future for Connect {
type Output = Result<Client>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let config = &self.config;
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(config)?);
Poll::Ready(Ok(Client { pd }))
fn new_transaction(&self, timestamp: Timestamp) -> Transaction {
Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone())
}
}

View File

@ -9,7 +9,7 @@
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
//!
pub use client::{Client, Connect};
pub use client::Client;
pub use lock::{resolve_locks, HasLocks};
pub use snapshot::Snapshot;
pub use transaction::Transaction;

View File

@ -8,6 +8,8 @@ use crate::{
};
use derive_new::new;
use futures::executor::ThreadPool;
use futures::prelude::*;
use futures::stream::BoxStream;
use kvproto::kvrpcpb;
use std::mem;
@ -37,6 +39,7 @@ pub struct Transaction {
timestamp: Timestamp,
#[new(default)]
buffer: Buffer,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
}
@ -183,6 +186,7 @@ impl Transaction {
TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.into_version(),
self.bg_worker.clone(),
self.rpc.clone(),
)
.commit()
@ -197,8 +201,11 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
struct TwoPhaseCommitter {
mutations: Vec<kvrpcpb::Mutation>,
start_version: u64,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
#[new(default)]
committed: bool,
#[new(default)]
undetermined: bool,
}
@ -208,18 +215,22 @@ impl TwoPhaseCommitter {
return Ok(());
}
self.prewrite().await?;
// FIXME: rollback when prewrite fails
// FIXME: commit secondary keys in background
match self.commit_primary().await {
Ok(commit_version) => {
let _ = self.commit_secondary(commit_version).await;
self.committed = true;
self.bg_worker
.clone()
.spawn_ok(self.commit_secondary(commit_version).map(|res| {
if let Err(e) = res {
warn!("Failed to commit secondary keys: {}", e);
}
}));
Ok(())
}
Err(e) => {
if self.undetermined {
Err(Error::undetermined_error(e))
} else {
let _ = self.rollback().await;
Err(e)
}
}
@ -261,7 +272,7 @@ impl TwoPhaseCommitter {
}
}
async fn commit_secondary(&mut self, commit_version: u64) -> Result<()> {
async fn commit_secondary(mut self, commit_version: u64) -> Result<()> {
let mutations = mem::replace(&mut self.mutations, Vec::default());
let keys = mutations
.into_iter()
@ -273,14 +284,24 @@ impl TwoPhaseCommitter {
.await
}
async fn rollback(&mut self) -> Result<()> {
fn rollback(&mut self) -> impl Future<Output = Result<()>> + 'static {
let mutations = mem::replace(&mut self.mutations, Vec::default());
let keys = mutations
.into_iter()
.map(|mutation| mutation.key.into())
.collect();
new_batch_rollback_request(keys, self.start_version)
.execute(self.rpc.clone())
.await
new_batch_rollback_request(keys, self.start_version).execute(self.rpc.clone())
}
}
impl Drop for TwoPhaseCommitter {
fn drop(&mut self) {
if !self.committed {
self.bg_worker.clone().spawn_ok(self.rollback().map(|res| {
if let Err(e) = res {
warn!("Failed to rollback: {}", e);
}
}))
}
}
}