Remove explicit Futures for transaction

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-07-31 13:53:07 +08:00
parent 94f080e269
commit 78fa34fd37
6 changed files with 151 additions and 335 deletions

View File

@ -7,23 +7,14 @@ mod common;
use crate::common::parse_args;
use futures::prelude::*;
use std::ops::RangeBounds;
use tikv_client::{
transaction::{Client, IsolationLevel},
Config, Key, KvPair, Value,
};
use tikv_client::{transaction::Client, Config, Key, KvPair, Value};
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
future::join_all(
pairs
.into_iter()
.map(Into::into)
.map(|p| txn.set(p.key().clone(), p.value().clone())),
)
.await
.into_iter()
.collect::<Result<Vec<()>, _>>()
.expect("Could not set key value pairs");
for pair in pairs {
let (key, value) = pair.into().into();
txn.set(key, value);
}
txn.commit().await.expect("Could not commit transaction");
}
@ -56,14 +47,9 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = stream::iter(keys.into_iter())
.then(|p| {
txn.delete(p)
.unwrap_or_else(|e| panic!("error in delete: {:?}", e))
})
.collect()
.await;
for key in keys {
txn.delete(key);
}
txn.commit().await.expect("Could not commit transaction");
}

View File

@ -87,7 +87,7 @@ impl Region {
}
}
#[derive(Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub struct Timestamp {
pub physical: i64,
pub logical: i64,

View File

@ -42,14 +42,15 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let transaction = client.begin().await.unwrap();
/// let mut transaction = client.begin().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub async fn begin(&self) -> Result<Transaction> {
unimplemented!()
let snapshot = self.snapshot().await?;
Ok(Transaction::new(snapshot))
}
/// Gets the latest [`Snapshot`](Snapshot).
@ -66,7 +67,8 @@ impl Client {
/// # });
/// ```
pub async fn snapshot(&self) -> Result<Snapshot> {
unimplemented!()
let timestamp = self.current_timestamp().await?;
self.snapshot_at(timestamp).await
}
/// Gets a [`Snapshot`](Snapshot) at the given point in time.
@ -83,8 +85,8 @@ impl Client {
/// // ... Issue some commands.
/// # });
/// ```
pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result<Snapshot> {
unimplemented!()
pub async fn snapshot_at(&self, timestamp: Timestamp) -> Result<Snapshot> {
Ok(Snapshot::new(timestamp))
}
/// Retrieves the current [`Timestamp`](Timestamp).

View File

@ -10,8 +10,8 @@
//!
pub use self::client::{Client, Connect};
pub use self::requests::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set};
pub use self::transaction::{IsolationLevel, Snapshot, Transaction, TxnInfo};
pub use self::requests::Scanner;
pub use self::transaction::{Snapshot, Transaction, TxnInfo};
pub use super::rpc::Timestamp;
use crate::{Key, Value};

View File

@ -1,9 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::Transaction;
use crate::{Error, Key, KvPair, Value};
use crate::{Error, KvPair};
use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
@ -20,127 +18,3 @@ impl Stream for Scanner {
unimplemented!()
}
}
/// An unresolved [`Transaction::get`](Transaction::get) request.
///
/// Once resolved this request will result in the fetching of the value associated with the given
/// key.
#[derive(new)]
pub struct Get {
key: Key,
}
impl Future for Get {
type Output = Result<Value, Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
unimplemented!()
}
}
/// An unresolved [`Transaction::batch_get`](Transaction::batch_get) request.
///
/// Once resolved this request will result in the fetching of the values associated with the given
/// keys.
#[derive(new)]
pub struct BatchGet {
keys: Vec<Key>,
}
impl Future for BatchGet {
type Output = Result<Vec<KvPair>, Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _keys = &self.keys;
unimplemented!()
}
}
/// An unresolved [`Transaction::commit`](Transaction::commit) request.
///
/// Once resolved this request will result in the committing of the transaction.
#[derive(new)]
pub struct Commit {
txn: Transaction,
}
impl Future for Commit {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _txn = &self.txn;
unimplemented!()
}
}
/// An unresolved [`Transaction::rollback`](Transaction::rollback) request.
///
/// Once resolved this request will result in the rolling back of the transaction.
#[derive(new)]
pub struct Rollback {
txn: Transaction,
}
impl Future for Rollback {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _txn = &self.txn;
unimplemented!()
}
}
/// An unresolved [`Transaction::lock_keys`](Transaction::lock_keys) request.
///
/// Once resolved this request will result in the locking of the given keys.
#[derive(new)]
pub struct LockKeys {
keys: Vec<Key>,
}
impl Future for LockKeys {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _keys = &self.keys;
unimplemented!()
}
}
/// An unresolved [`Transaction::set`](Transaction::set) request.
///
/// Once resolved this request will result in the setting of the value associated with the given
/// key.
#[derive(new)]
pub struct Set {
key: Key,
value: Value,
}
impl Future for Set {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
let _value = &self.value;
unimplemented!()
}
}
/// An unresolved [`Transaction::delete`](Transaction::delete) request.
///
/// Once resolved this request will result in the deletion of the given key.
#[derive(new)]
pub struct Delete {
key: Key,
}
impl Future for Delete {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
unimplemented!()
}
}

View File

@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set, Timestamp};
use crate::{Key, Value};
use super::{Scanner, Timestamp};
use crate::{Key, KvPair, Result, Value};
use derive_new::new;
use std::ops::RangeBounds;
@ -31,9 +31,101 @@ pub struct Transaction {
}
impl Transaction {
/// Commit the actions of the transaction.
/// Gets the value associated with the given key.
///
/// Once committed, it is no longer possible to `rollback` the actions in the transaction.
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.get(key);
/// let result: Value = req.await.unwrap();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub async fn get(&self, _key: impl Into<Key>) -> Result<Value> {
unimplemented!()
}
/// Gets the values associated with the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{KvPair, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let req = txn.batch_get(keys);
/// let result: Vec<KvPair> = req.await.unwrap();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub async fn batch_get(
&self,
_keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
unimplemented!()
}
pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner {
unimplemented!()
}
pub fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> Scanner {
unimplemented!()
}
/// Sets the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// txn.set(key, val);
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn set(&mut self, _key: impl Into<Key>, _value: impl Into<Value>) {
unimplemented!()
}
/// Deletes the given key.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// txn.delete(key);
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn delete(&mut self, _key: impl Into<Key>) {
unimplemented!()
}
/// Locks the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -42,36 +134,17 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin().await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: () = req.await.unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn commit(self) -> Commit {
Commit::new(self)
pub fn lock_keys(&mut self, _keys: impl IntoIterator<Item = impl Into<Key>>) {
unimplemented!()
}
/// Rollback the actions of the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.rollback();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn rollback(self) -> Rollback {
Rollback::new(self)
}
/// Lock the given keys.
/// Commits the actions of the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -82,15 +155,11 @@ impl Transaction {
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// let req = txn.commit();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) -> LockKeys {
LockKeys::new(keys.into_iter().map(|v| v.into()).collect())
}
pub fn is_readonly(&self) -> bool {
pub async fn commit(&mut self) -> Result<()> {
unimplemented!()
}
@ -109,10 +178,10 @@ impl Transaction {
/// # });
/// ```
pub fn start_ts(&self) -> Timestamp {
unimplemented!()
self.snapshot().timestamp
}
/// Get the `Snapshot` the transaction is operating on.
/// Gets the `Snapshot` the transaction is operating on.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -123,34 +192,27 @@ impl Transaction {
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let snap: Snapshot = txn.snapshot();
/// let snap: &Snapshot = txn.snapshot();
/// # });
/// ```
pub fn snapshot(&self) -> Snapshot {
unimplemented!()
pub fn snapshot(&self) -> &Snapshot {
&self.snapshot
}
}
/// Set the isolation level of the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::{Client, IsolationLevel}};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
/// # });
/// ```
pub fn set_isolation_level(&mut self, _level: IsolationLevel) {
unimplemented!()
}
pub struct TxnInfo {
pub txn: u64,
pub status: u64,
}
/// Create a new [`Get`](Get) request.
///
/// Once resolved this request will result in the fetching of the value associated with the
/// given key.
/// A snapshot of dataset at a particular point in time.
#[derive(new)]
pub struct Snapshot {
timestamp: Timestamp,
}
impl Snapshot {
/// Gets the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -159,22 +221,17 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let snapshot = connected_client.snapshot().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.get(key);
/// let req = snapshot.get(key);
/// let result: Value = req.await.unwrap();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn get(&self, key: impl Into<Key>) -> Get {
self.snapshot.get(key.into())
pub async fn get(&self, _key: impl Into<Key>) -> Result<Value> {
unimplemented!()
}
/// Create a new [`BatchGet`](BatchGet) request.
///
/// Once resolved this request will result in the fetching of the values associated with the
/// given keys.
/// Gets the values associated with the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -191,83 +248,11 @@ impl Transaction {
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
self.snapshot.batch_get(keys)
}
pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
self.snapshot.scan(range)
}
pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
self.snapshot.scan_reverse(range)
}
/// Create a new [`Set`](Set) request.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// let req = txn.set(key, val);
/// let result: () = req.await.unwrap();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn set(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Set {
Set::new(key.into(), value.into())
}
/// Create a new [`Delete`](Delete) request.
///
/// Once resolved this request will result in the deletion of the given key.
///
/// ```rust,no_run
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.delete(key);
/// let result: () = req.await.unwrap();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn delete(&mut self, key: impl Into<Key>) -> Delete {
Delete::new(key.into())
}
}
pub struct TxnInfo {
pub txn: u64,
pub status: u64,
}
/// A snapshot of dataset at a particular point in time.
pub struct Snapshot {
_timestamp: Timestamp,
}
impl Snapshot {
pub fn get(&self, key: impl Into<Key>) -> Get {
Get::new(key.into())
}
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
BatchGet::new(keys.into_iter().map(|v| v.into()).collect())
pub async fn batch_get(
&self,
_keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
unimplemented!()
}
pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
@ -280,34 +265,3 @@ impl Snapshot {
unimplemented!()
}
}
/// The isolation level guarantees provided by the transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum IsolationLevel {
/// Consistent reads and conflict free writes.
///
/// Snapshot isolation guarantees:
/// * All reads will see the last committed value of the data at the snapshot timestamp.
/// * The transaction will only successfully commit if no updates to the data have created a
/// conflict with concurrent updates made sine the snapshot.
///
/// Using this level means:
/// * Lost updates don't occur.
/// * Dirty reads don't occur.
/// * Non-repeatable reads don't occur.
/// * Phantom reads don't occur.
SnapshotIsolation,
/// Reads may not be consistent, but writes are conflict free.
///
/// Read committed guarantees:
/// * All reads are committed at the moment it is read.
/// not repeatable.
/// * Write locks are only released at the end of the transaction.
///
/// Using this level means:
/// * Lost updates don't occur.
/// * Dirty reads don't occur.
/// * Non-repeatable reads may occur.
/// * Phantom reads may occur.
ReadCommitted,
}