Merge pull request #109 from sticnarf/2pc

Implement 2PC
Nick Cameron 2019-09-26 12:37:05 +12:00 committed by GitHub
commit 9d1256fba6
12 changed files with 480 additions and 134 deletions

View File

@ -11,7 +11,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
let mut txn = client.begin().await.expect("Could not begin a transaction");
for pair in pairs {
let (key, value) = pair.into().into();
txn.set(key, value);
txn.set(key, value).await.expect("Could not set key value");
}
txn.commit().await.expect("Could not commit transaction");
}
@ -43,7 +43,7 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
for key in keys {
txn.delete(key);
txn.delete(key).await.expect("Could not delete the key");
}
txn.commit().await.expect("Could not commit transaction");
}
@ -62,7 +62,7 @@ async fn main() {
Config::new(args.pd)
};
let txn = Client::connect(config)
let txn = Client::new(config)
.await
.expect("Could not connect to tikv");

View File

@ -7,7 +7,7 @@ use std::result;
#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
inner: Box<Context<ErrorKind>>,
}
/// An error originating from the TiKV client or dependencies.
@ -38,6 +38,9 @@ pub enum ErrorKind {
/// No region is found for the given id.
#[fail(display = "Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },
/// Whether the transaction is committed or not is undetermined
#[fail(display = "Whether the transaction is committed or not is undetermined")]
UndeterminedError(#[fail(cause)] Error),
/// Invalid key range to scan. Only left bounded intervals are supported.
#[fail(display = "Only left bounded intervals are supported")]
InvalidKeyRange,
@ -122,19 +125,25 @@ impl Error {
pub(crate) fn multiple_errors(errors: Vec<Error>) -> Self {
Error::from(ErrorKind::MultipleErrors(errors))
}
pub(crate) fn undetermined_error(error: Error) -> Self {
Error::from(ErrorKind::UndeterminedError(error))
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error {
inner: Context::new(kind),
inner: Box::new(Context::new(kind)),
}
}
}
impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner }
Error {
inner: Box::new(inner),
}
}
}
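
Note on this change: boxing the Context keeps Error (and therefore every Result<T> in the crate) a single pointer wide, and UndeterminedError wraps the original error when a commit may or may not have taken effect. A minimal sketch of how a caller could distinguish that case, assuming ErrorKind and Error::kind() are publicly re-exported as they are used inside the crate in this commit:

use tikv_client::ErrorKind;

// Sketch only: report a commit whose outcome is unknown differently from a
// plain failure. `txn` is a tikv_client::Transaction with buffered writes.
async fn commit_and_report(mut txn: tikv_client::Transaction) {
    match txn.commit().await {
        Ok(()) => println!("committed"),
        Err(err) => match err.kind() {
            // The commit RPC for the primary key failed after it may already
            // have been applied, so neither success nor rollback can be assumed.
            ErrorKind::UndeterminedError(_) => eprintln!("commit outcome unknown: {}", err),
            _ => eprintln!("commit failed: {}", err),
        },
    }
}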

View File

@ -51,6 +51,7 @@ use std::{fmt, u8};
/// can be passed directly to those functions.
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[cfg_attr(test, derive(Arbitrary))]
#[repr(transparent)]
pub struct Key(
#[cfg_attr(
test,
@ -128,6 +129,12 @@ impl AsRef<Key> for Key {
}
}
impl AsRef<Key> for Vec<u8> {
fn as_ref(&self) -> &Key {
unsafe { &*(self as *const Vec<u8> as *const Key) }
}
}
impl fmt::Debug for Key {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Key({})", HexRepr(&self.0))

View File

@ -54,7 +54,7 @@
//! Regardless of which API you choose, you'll need to connect your client
//! ([raw](raw::Client), [transactional](transaction::Client)).
//!
//! ```rust
//! ```rust,no_run
//! # use tikv_client::*;
//! # use futures::prelude::*;
//!
@ -65,11 +65,8 @@
//! "192.168.0.101:2379",
//! ]).with_security("root.ca", "internal.cert", "internal.key");
//!
//! // Get an unresolved connection.
//! let connect = TransactionClient::connect(config);
//!
//! // Resolve the connection into a client.
//! let client = connect.into_future().await;
//! // Get a transactional client.
//! let client = TransactionClient::new(config).await.unwrap();
//! # });
//! ```
//!
@ -116,6 +113,4 @@ pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)]
pub use crate::transaction::{
Client as TransactionClient, Connect, Snapshot, Timestamp, Transaction,
};
pub use crate::transaction::{Client as TransactionClient, Snapshot, Timestamp, Transaction};

View File

@ -5,6 +5,7 @@ use crate::{
kv_client::{KvClient, RpcFnType, Store},
pd::PdClient,
request::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest},
transaction::HasLocks,
BoundRange, ColumnFamily, Error, Key, KvPair, Result, Value,
};
@ -482,15 +483,15 @@ impl_raw_rpc_request!(RawScanRequest);
impl_raw_rpc_request!(RawBatchScanRequest);
impl_raw_rpc_request!(RawDeleteRangeRequest);
dummy_impl_has_locks!(RawGetResponse);
dummy_impl_has_locks!(RawBatchGetResponse);
dummy_impl_has_locks!(RawPutResponse);
dummy_impl_has_locks!(RawBatchPutResponse);
dummy_impl_has_locks!(RawDeleteResponse);
dummy_impl_has_locks!(RawBatchDeleteResponse);
dummy_impl_has_locks!(RawScanResponse);
dummy_impl_has_locks!(RawBatchScanResponse);
dummy_impl_has_locks!(RawDeleteRangeResponse);
impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
impl HasLocks for kvrpcpb::RawPutResponse {}
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
impl HasLocks for kvrpcpb::RawDeleteResponse {}
impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
impl HasLocks for kvrpcpb::RawScanResponse {}
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
#[cfg(test)]
mod test {

View File

@ -99,6 +99,16 @@ impl Buffer {
self.mutations.lock().unwrap().insert(key, Mutation::Del);
}
/// Converts the buffered mutations to the proto buffer version
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
self.mutations
.lock()
.unwrap()
.iter()
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
.collect()
}
fn get_from_mutations(&self, key: &Key) -> MutationValue {
self.mutations
.lock()
@ -123,21 +133,18 @@ enum Mutation {
}
impl Mutation {
#[allow(dead_code)]
fn into_proto_with_key(self, key: Key) -> Option<kvrpcpb::Mutation> {
let mut pb = kvrpcpb::Mutation {
key: key.into(),
..Default::default()
};
fn to_proto_with_key(&self, key: &Key) -> Option<kvrpcpb::Mutation> {
let mut pb = kvrpcpb::Mutation::default();
match self {
Mutation::Cached(_) => return None,
Mutation::Put(v) => {
pb.set_op(kvrpcpb::Op::Put);
pb.set_value(v.into());
pb.set_value(v.clone().into());
}
Mutation::Del => pb.set_op(kvrpcpb::Op::Del),
Mutation::Lock => pb.set_op(kvrpcpb::Op::Lock),
};
pb.set_key(key.clone().into());
Some(pb)
}
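
Note on the change above: the conversion now borrows the buffered mutations (to_proto_with_key takes &self and &Key) instead of consuming them, since the buffer stays live for further reads within the transaction, and cached read values are skipped because they carry no write. A standalone sketch of the mapping, using local stand-ins for the buffer's private types:

use kvproto::kvrpcpb;

// Stand-in for the buffer's private Mutation enum.
enum BufferedMutation {
    Cached(Vec<u8>), // value cached from a read: nothing to send to TiKV
    Put(Vec<u8>),
    Del,
    Lock,
}

// Borrow the key and mutation; only real writes become proto mutations.
fn to_proto(key: &[u8], mutation: &BufferedMutation) -> Option<kvrpcpb::Mutation> {
    let mut pb = kvrpcpb::Mutation::default();
    match mutation {
        BufferedMutation::Cached(_) => return None,
        BufferedMutation::Put(v) => {
            pb.set_op(kvrpcpb::Op::Put);
            pb.set_value(v.clone());
        }
        BufferedMutation::Del => pb.set_op(kvrpcpb::Op::Del),
        BufferedMutation::Lock => pb.set_op(kvrpcpb::Op::Lock),
    }
    pb.set_key(key.to_vec());
    Some(pb)
}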

View File

@ -6,15 +6,15 @@ use crate::{
Config, Result,
};
use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
use futures::executor::ThreadPool;
use std::sync::Arc;
/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client {
pd: Arc<PdRpcClient>,
/// The thread pool for background tasks including committing secondary keys and failed
/// transaction cleanups.
bg_worker: ThreadPool,
}
impl Client {
@ -24,12 +24,15 @@ impl Client {
/// use tikv_client::{Config, TransactionClient};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = TransactionClient::connect(Config::default());
/// let client = connect.await.unwrap();
/// let client = TransactionClient::new(Config::default()).await.unwrap();
/// # });
/// ```
pub fn connect(config: Config) -> Connect {
Connect::new(config)
pub async fn new(config: Config) -> Result<Client> {
let bg_worker = ThreadPool::new()?;
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(&config)?);
Ok(Client { pd, bg_worker })
}
/// Creates a new [`Transaction`](Transaction).
@ -40,8 +43,7 @@ impl Client {
/// use tikv_client::{Config, TransactionClient};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = TransactionClient::connect(Config::default());
/// let client = connect.await.unwrap();
/// let client = TransactionClient::new(Config::default()).await.unwrap();
/// let mut transaction = client.begin().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
@ -50,12 +52,12 @@ impl Client {
/// ```
pub async fn begin(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(Transaction::new(timestamp, self.pd.clone()))
Ok(self.new_transaction(timestamp))
}
/// Creates a new [`Snapshot`](Snapshot) at the given time.
pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot {
Snapshot::new(Transaction::new(timestamp, self.pd.clone()))
Snapshot::new(self.new_transaction(timestamp))
}
/// Retrieves the current [`Timestamp`](Timestamp).
@ -64,42 +66,15 @@ impl Client {
/// use tikv_client::{Config, TransactionClient};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = TransactionClient::connect(Config::default());
/// let client = connect.await.unwrap();
/// let client = TransactionClient::new(Config::default()).await.unwrap();
/// let timestamp = client.current_timestamp().await.unwrap();
/// # });
/// ```
pub async fn current_timestamp(&self) -> Result<Timestamp> {
self.pd.clone().get_timestamp().await
}
}
/// An unresolved [`Client`](Client) connection to a TiKV cluster.
///
/// Once resolved it will result in a connected [`Client`](Client).
///
/// ```rust,no_run
/// use tikv_client::{Config, TransactionClient, Connect};
/// use futures::prelude::*;
///
/// # futures::executor::block_on(async {
/// let connect: Connect = TransactionClient::connect(Config::default());
/// let client: TransactionClient = connect.await.unwrap();
/// # });
/// ```
#[derive(new)]
pub struct Connect {
config: Config,
}
impl Future for Connect {
type Output = Result<Client>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let config = &self.config;
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(config)?);
Poll::Ready(Ok(Client { pd }))
fn new_transaction(&self, timestamp: Timestamp) -> Transaction {
Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone())
}
}
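
Note on this file: the hand-written Connect future (which did blocking work inside poll) is replaced by a plain async fn new, and the client now carries a futures ThreadPool so commit() can hand secondary-key commits and failed-transaction rollbacks to a background worker. A minimal usage sketch of the new API; the PD address is a placeholder:

use futures::executor::block_on;
use tikv_client::{Config, TransactionClient};

fn main() -> tikv_client::Result<()> {
    block_on(async {
        // Connect, start a transaction, buffer a write, then run 2PC on commit.
        let client = TransactionClient::new(Config::new(vec!["127.0.0.1:2379"])).await?;
        let mut txn = client.begin().await?;
        txn.set("TiKV".to_owned(), "Rust".to_owned()).await?;
        txn.commit().await
    })
}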

View File

@ -111,17 +111,9 @@ async fn resolve_lock_with_retry(
}
pub trait HasLocks {
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo>;
}
macro_rules! dummy_impl_has_locks {
($t: tt) => {
impl crate::transaction::HasLocks for kvrpcpb::$t {
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
Vec::new()
}
}
};
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
Vec::new()
}
}
#[cfg(test)]
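
Note on this change: a default method on HasLocks replaces the dummy_impl_has_locks! macro, so response types that never carry locks can opt in with an empty impl, as the raw and transaction request files in this commit now do. A tiny standalone illustration of the pattern:

struct LockInfo;

trait HasLocks {
    // Default: no locks to report.
    fn take_locks(&mut self) -> Vec<LockInfo> {
        Vec::new()
    }
}

struct RawGetResponse;
impl HasLocks for RawGetResponse {} // empty impl uses the default

fn main() {
    let mut resp = RawGetResponse;
    assert!(resp.take_locks().is_empty());
}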

View File

@ -9,7 +9,7 @@
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
//!
pub use client::{Client, Connect};
pub use client::Client;
pub use lock::{resolve_locks, HasLocks};
pub use snapshot::Snapshot;
pub use transaction::Transaction;

View File

@ -294,8 +294,177 @@ pub fn new_cleanup_request(key: impl Into<Key>, start_version: u64) -> kvrpcpb::
req
}
dummy_impl_has_locks!(PrewriteResponse);
dummy_impl_has_locks!(CommitResponse);
dummy_impl_has_locks!(CleanupResponse);
dummy_impl_has_locks!(BatchRollbackResponse);
dummy_impl_has_locks!(ResolveLockResponse);
impl AsRef<Key> for kvrpcpb::Mutation {
fn as_ref(&self) -> &Key {
self.key.as_ref()
}
}
impl KvRequest for kvrpcpb::PrewriteRequest {
type Result = ();
type RpcResponse = kvrpcpb::PrewriteResponse;
type KeyData = Vec<kvrpcpb::Mutation>;
const REQUEST_NAME: &'static str = "kv_prewrite";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_prewrite_async_opt;
fn make_rpc_request<KvC: KvClient>(
&self,
mutations: Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = store.request::<Self>();
req.set_mutations(mutations);
req.set_primary_lock(self.primary_lock.clone());
req.set_start_version(self.start_version);
req.set_lock_ttl(self.lock_ttl);
req.set_skip_constraint_check(self.skip_constraint_check);
req.set_txn_size(self.txn_size);
req
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
let mutations = mem::replace(&mut self.mutations, Vec::default());
store_stream_for_keys(mutations, pd_client)
}
fn map_result(_: Self::RpcResponse) -> Self::Result {}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.boxed()
}
}
impl HasLocks for kvrpcpb::PrewriteResponse {
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
self.errors
.iter_mut()
.filter_map(|error| error.locked.take())
.collect()
}
}
pub fn new_prewrite_request(
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Key,
start_version: u64,
lock_ttl: u64,
) -> kvrpcpb::PrewriteRequest {
let mut req = kvrpcpb::PrewriteRequest::default();
req.set_mutations(mutations);
req.set_primary_lock(primary_lock.into());
req.set_start_version(start_version);
req.set_lock_ttl(lock_ttl);
// TODO: Lite resolve lock is currently disabled
req.set_txn_size(std::u64::MAX);
req
}
impl KvRequest for kvrpcpb::CommitRequest {
type Result = ();
type RpcResponse = kvrpcpb::CommitResponse;
type KeyData = Vec<Vec<u8>>;
const REQUEST_NAME: &'static str = "kv_commit";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_commit_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = store.request::<Self>();
req.set_keys(keys);
req.set_start_version(self.start_version);
req.set_commit_version(self.commit_version);
req
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
let keys = mem::replace(&mut self.keys, Vec::default());
store_stream_for_keys(keys, pd_client)
}
fn map_result(_: Self::RpcResponse) -> Self::Result {}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.boxed()
}
}
pub fn new_commit_request(
keys: Vec<Key>,
start_version: u64,
commit_version: u64,
) -> kvrpcpb::CommitRequest {
let mut req = kvrpcpb::CommitRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_start_version(start_version);
req.set_commit_version(commit_version);
req
}
impl KvRequest for kvrpcpb::BatchRollbackRequest {
type Result = ();
type RpcResponse = kvrpcpb::BatchRollbackResponse;
type KeyData = Vec<Vec<u8>>;
const REQUEST_NAME: &'static str = "kv_batch_rollback";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_rollback_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = store.request::<Self>();
req.set_keys(keys);
req.set_start_version(self.start_version);
req
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
let keys = mem::replace(&mut self.keys, Vec::default());
store_stream_for_keys(keys, pd_client)
}
fn map_result(_: Self::RpcResponse) -> Self::Result {}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.boxed()
}
}
pub fn new_batch_rollback_request(
keys: Vec<Key>,
start_version: u64,
) -> kvrpcpb::BatchRollbackRequest {
let mut req = kvrpcpb::BatchRollbackRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_start_version(start_version);
req
}
impl HasLocks for kvrpcpb::CommitResponse {}
impl HasLocks for kvrpcpb::CleanupResponse {}
impl HasLocks for kvrpcpb::BatchRollbackResponse {}
impl HasLocks for kvrpcpb::ResolveLockResponse {}
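
Note on the new request impls above: prewrite, commit, and batch-rollback reuse the generic KvRequest machinery, where store_stream_for_keys shards the mutations or keys by region, make_rpc_request rebuilds a per-store request, and reduce collapses the per-store unit results. A crate-internal sketch of how the new constructors could compose with execute for the two phases; the TTL and version values are placeholders:

// Crate-internal sketch; assumes the imports at the top of this file plus
// crate::pd::PdRpcClient, and placeholder TTL/version values.
async fn two_phase_sketch(
    rpc: std::sync::Arc<crate::pd::PdRpcClient>,
    mutations: Vec<kvrpcpb::Mutation>,
    start_version: u64,
    commit_version: u64,
) -> crate::Result<()> {
    // Phase one: prewrite every mutation, naming the first key as the primary lock.
    let primary: crate::Key = mutations[0].key.clone().into();
    new_prewrite_request(mutations, primary.clone(), start_version, 3000)
        .execute(rpc.clone())
        .await?;
    // Phase two: commit the primary key at a commit version obtained from PD.
    new_commit_request(vec![primary], start_version, commit_version)
        .execute(rpc)
        .await
}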

View File

@ -1,18 +1,18 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
pd::PdRpcClient,
pd::{PdClient, PdRpcClient},
request::KvRequest,
transaction::{
buffer::Buffer,
requests::{new_mvcc_get_batch_request, new_mvcc_get_request},
Timestamp,
},
Key, KvPair, Result, Value,
transaction::{buffer::Buffer, requests::*, Timestamp},
Error, ErrorKind, Key, KvPair, Result, Value,
};
use derive_new::new;
use futures::executor::ThreadPool;
use futures::prelude::*;
use futures::stream::BoxStream;
use kvproto::kvrpcpb;
use std::mem;
use std::ops::RangeBounds;
use std::sync::Arc;
@ -29,29 +29,39 @@ use std::sync::Arc;
/// use tikv_client::{Config, TransactionClient};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = TransactionClient::connect(Config::default());
/// let client = connect.await.unwrap();
/// let client = TransactionClient::new(Config::default()).await.unwrap();
/// let txn = client.begin().await.unwrap();
/// # });
/// ```
#[derive(new)]
pub struct Transaction {
timestamp: Timestamp,
#[new(default)]
buffer: Buffer,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
}
impl Transaction {
pub(crate) fn new(
timestamp: Timestamp,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
) -> Transaction {
Transaction {
timestamp,
buffer: Default::default(),
bg_worker,
rpc,
}
}
/// Gets the value associated with the given key.
///
/// ```rust,no_run
/// # use tikv_client::{Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"])).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Option<Value> = txn.get(key).await.unwrap();
/// // Finish the transaction...
@ -74,9 +84,8 @@ impl Transaction {
/// # use futures::prelude::*;
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"])).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = txn
/// .batch_get(keys)
@ -114,9 +123,8 @@ impl Transaction {
/// # use tikv_client::{Key, Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"])).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// txn.set(key, val);
@ -124,8 +132,9 @@ impl Transaction {
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn set(&self, key: impl Into<Key>, value: impl Into<Value>) {
pub async fn set(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.buffer.put(key.into(), value.into());
Ok(())
}
/// Deletes the given key.
@ -134,17 +143,17 @@ impl Transaction {
/// # use tikv_client::{Key, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"])).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// txn.delete(key);
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn delete(&self, key: impl Into<Key>) {
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
self.buffer.delete(key.into());
Ok(())
}
/// Locks the given keys.
@ -153,18 +162,18 @@ impl Transaction {
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::default()).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// // ... Do some actions.
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn lock_keys(&self, keys: impl IntoIterator<Item = impl Into<Key>>) {
pub async fn lock_keys(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
for key in keys {
self.buffer.lock(key.into());
}
Ok(())
}
/// Commits the actions of the transaction.
@ -173,31 +182,135 @@ impl Transaction {
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// # let client = Client::new(Config::default()).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub async fn commit(&mut self) -> Result<()> {
TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.into_version(),
self.bg_worker.clone(),
self.rpc.clone(),
)
.commit()
.await
}
}
/// The default TTL of a lock in milliseconds
const DEFAULT_LOCK_TTL: u64 = 3000;
#[derive(new)]
struct TwoPhaseCommitter {
mutations: Vec<kvrpcpb::Mutation>,
start_version: u64,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
#[new(default)]
committed: bool,
#[new(default)]
undetermined: bool,
}
impl TwoPhaseCommitter {
async fn commit(mut self) -> Result<()> {
if self.mutations.is_empty() {
return Ok(());
}
self.prewrite().await?;
self.commit_primary().await?;
// FIXME: return from this method once the primary key is committed
let _ = self.commit_secondary().await;
Ok(())
match self.commit_primary().await {
Ok(commit_version) => {
self.committed = true;
self.bg_worker
.clone()
.spawn_ok(self.commit_secondary(commit_version).map(|res| {
if let Err(e) = res {
warn!("Failed to commit secondary keys: {}", e);
}
}));
Ok(())
}
Err(e) => {
if self.undetermined {
Err(Error::undetermined_error(e))
} else {
Err(e)
}
}
}
}
async fn prewrite(&mut self) -> Result<()> {
unimplemented!()
let primary_lock = self.mutations[0].key.clone().into();
// TODO: calculate TTL for big transactions
let lock_ttl = DEFAULT_LOCK_TTL;
new_prewrite_request(
self.mutations.clone(),
primary_lock,
self.start_version,
lock_ttl,
)
.execute(self.rpc.clone())
.await
}
async fn commit_primary(&mut self) -> Result<()> {
unimplemented!()
/// Commits the primary key and returns the commit version
async fn commit_primary(&mut self) -> Result<u64> {
let primary_key = vec![self.mutations[0].key.clone().into()];
let commit_version = self.rpc.clone().get_timestamp().await?.into_version();
new_commit_request(primary_key, self.start_version, commit_version)
.execute(self.rpc.clone())
.inspect_err(|e| {
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
// error to the user.
if let ErrorKind::Grpc(_) = e.kind() {
self.undetermined = true;
}
})
.await?;
Ok(commit_version)
}
async fn commit_secondary(&mut self) -> Result<()> {
unimplemented!()
async fn commit_secondary(mut self, commit_version: u64) -> Result<()> {
let mutations = mem::replace(&mut self.mutations, Vec::default());
// No need to commit secondary keys when there is only one key
if mutations.len() == 1 {
return Ok(());
}
let keys = mutations
.into_iter()
.skip(1) // skip primary key
.map(|mutation| mutation.key.into())
.collect();
new_commit_request(keys, self.start_version, commit_version)
.execute(self.rpc.clone())
.await
}
fn rollback(&mut self) -> impl Future<Output = Result<()>> + 'static {
let mutations = mem::replace(&mut self.mutations, Vec::default());
let keys = mutations
.into_iter()
.map(|mutation| mutation.key.into())
.collect();
new_batch_rollback_request(keys, self.start_version).execute(self.rpc.clone())
}
}
impl Drop for TwoPhaseCommitter {
fn drop(&mut self) {
if !self.committed {
self.bg_worker.clone().spawn_ok(self.rollback().map(|res| {
if let Err(e) = res {
warn!("Failed to rollback: {}", e);
}
}))
}
}
}
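
Summary of the flow implemented above: commit() prewrites every buffered mutation with the first key as the primary lock, fetches a commit version from PD, commits the primary key synchronously (marking the result undetermined if the commit RPC fails at the gRPC layer), then hands the secondary-key commit to the client's background thread pool; a committer dropped before committing spawns a rollback instead. An illustrative, standalone sketch of the background-completion pattern used for the secondary keys:

use futures::executor::{block_on, ThreadPool};
use futures::future;
use futures::prelude::*;

fn main() {
    let bg_worker = ThreadPool::new().expect("could not build thread pool");

    block_on(async {
        // Stand-in for committing the primary key: awaited before returning.
        future::ready(Result::<(), String>::Ok(()))
            .await
            .expect("primary commit failed");

        // Stand-in for committing the secondary keys: spawned on the pool and
        // only logged on failure, so the caller's commit future resolves early.
        bg_worker.spawn_ok(future::ready(Result::<(), String>::Ok(())).map(|res| {
            if let Err(e) = res {
                eprintln!("Failed to commit secondary keys: {}", e);
            }
        }));
    });
}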

View File

@ -1,10 +1,11 @@
#![cfg(feature = "integration-tests")]
use failure::Fallible;
use futures::executor::ThreadPool;
use futures::executor::{block_on, ThreadPool};
use futures::prelude::*;
use std::collections::HashMap;
use std::env;
use tikv_client::{Config, Result, TransactionClient};
use tikv_client::{Config, Key, Result, TransactionClient, Value};
#[test]
fn get_timestamp() -> Fallible<()> {
@ -12,7 +13,7 @@ fn get_timestamp() -> Fallible<()> {
let mut pool = ThreadPool::new()?;
let config = Config::new(pd_addrs());
let fut = async {
let client = TransactionClient::connect(config).await?;
let client = TransactionClient::new(config).await?;
Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await)
};
// Calculate each version of retrieved timestamp
@ -29,6 +30,83 @@ fn get_timestamp() -> Fallible<()> {
Ok(())
}
#[test]
fn crud() -> Fallible<()> {
let config = Config::new(pd_addrs());
block_on(async move {
let client = TransactionClient::new(config).await?;
let mut txn = client.begin().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
assert_eq!(
txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter(|(_, v)| v.is_some())
.count(),
0
);
txn.set("foo".to_owned(), "bar".to_owned()).await?;
txn.set("bar".to_owned(), "foo".to_owned()).await?;
// Read buffered values
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.commit().await?;
// Read from TiKV then update and delete
let mut txn = client.begin().await?;
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.set("foo".to_owned(), "foo".to_owned()).await?;
txn.delete("bar".to_owned()).await?;
txn.commit().await?;
// Read again from TiKV
let txn = client.begin().await?;
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
assert_eq!(batch_get_res.get(&Key::from("bar".to_owned())), None);
Fallible::Ok(())
})
}
const ENV_PD_ADDRS: &str = "PD_ADDRS";
fn pd_addrs() -> Vec<String> {