Use .await syntax

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2019-05-10 14:13:06 +12:00
parent 6353dbcfe3
commit bb044e6a83
7 changed files with 175 additions and 143 deletions

View File

@ -27,14 +27,14 @@ async fn main() -> Result<()> {
// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
let unconnnected_client = Client::new(config);
let client = await!(unconnnected_client)?;
let client = unconnnected_client.await?;
// Requests are created from the connected client. These calls return structures which
// implement `Future`. This means the `Future` must be resolved before the action ever takes
// place.
//
// Here we set the key `TiKV` to have the value `Rust` associated with it.
await!(client.put(KEY, VALUE)).unwrap(); // Returns a `tikv_client::Error` on failure.
client.put(KEY, VALUE).await.unwrap(); // Returns a `tikv_client::Error` on failure.
println!("Put key {:?}, value {:?}.", KEY, VALUE);
// Unlike a standard Rust HashMap all calls take owned values. This is because under the hood
@ -46,17 +46,20 @@ async fn main() -> Result<()> {
//
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
let value: Option<Value> = await!(client.get(KEY))?;
let value: Option<Value> = client.get(KEY).await?;
assert_eq!(value, Some(Value::from(VALUE)));
println!("Get key {:?} returned value {:?}.", Key::from(KEY), value);
// You can also set the `ColumnFamily` used by the request.
// This is *advanced usage* and should have some special considerations.
await!(client.delete(KEY)).expect("Could not delete value");
client.delete(KEY).await.expect("Could not delete value");
println!("Key: {:?} deleted", Key::from(KEY));
// Here we check if the key has been deleted from the key-value store.
let value: Option<Value> = await!(client.get(KEY)).expect("Could not get just deleted entry");
let value: Option<Value> = client
.get(KEY)
.await
.expect("Could not get just deleted entry");
assert!(value.is_none());
// You can ask to write multiple key-values at the same time, it is much more
@ -66,18 +69,25 @@ async fn main() -> Result<()> {
KvPair::from(("k2", "v2")),
KvPair::from(("k3", "v3")),
];
await!(client.batch_put(pairs)).expect("Could not put pairs");
client.batch_put(pairs).await.expect("Could not put pairs");
// Same thing when you want to retrieve multiple values.
let keys = vec![Key::from("k1"), Key::from("k2")];
let values = await!(client.batch_get(keys.clone())).expect("Could not get values");
let values = client
.batch_get(keys.clone())
.await
.expect("Could not get values");
println!("Found values: {:?} for keys: {:?}", values, keys);
// Scanning a range of keys is also possible giving it two bounds
// it will returns all entries between these two.
let start = "k1";
let end = "k2";
let pairs = await!(client.scan(start..=end, 10).key_only()).expect("Could not scan");
let pairs = client
.scan(start..=end, 10)
.key_only()
.await
.expect("Could not scan");
let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!(&keys, &[Key::from("k1"), Key::from("k2")]);

View File

@ -18,27 +18,28 @@ use tikv_client::{
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin();
await!(future::join_all(
future::join_all(
pairs
.into_iter()
.map(Into::into)
.map(|p| txn.set(p.key().clone(), p.value().clone()))
))
.map(|p| txn.set(p.key().clone(), p.value().clone())),
)
.await
.into_iter()
.collect::<Result<Vec<()>, _>>()
.expect("Could not set key value pairs");
await!(txn.commit()).expect("Could not commit transaction");
txn.commit().await.expect("Could not commit transaction");
}
async fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
await!(txn.get(key)).expect("Could not get value")
txn.get(key).await.expect("Could not get value")
}
// Ignore a spurious warning from rustc (https://github.com/rust-lang/rust/issues/60566).
#[allow(unused_mut)]
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
await!(client
client
.begin()
.scan(range)
.into_stream()
@ -51,18 +52,21 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
true
})
})
.for_each(|pair| { future::ready(println!("{:?}", pair)) }));
.for_each(|pair| future::ready(println!("{:?}", pair)))
.await;
}
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin();
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = await!(stream::iter(keys.into_iter())
.then(|p| txn
.delete(p)
.unwrap_or_else(|e| panic!("error in delete: {:?}", e)))
.collect());
await!(txn.commit()).expect("Could not commit transaction");
let _: Vec<()> = stream::iter(keys.into_iter())
.then(|p| {
txn.delete(p)
.unwrap_or_else(|e| panic!("error in delete: {:?}", e))
})
.collect()
.await;
txn.commit().await.expect("Could not commit transaction");
}
#[runtime::main(runtime_tokio::Tokio)]
@ -79,26 +83,28 @@ async fn main() {
Config::new(args.pd)
};
let txn = await!(Client::new(config)).expect("Could not connect to tikv");
let txn = Client::new(config)
.await
.expect("Could not connect to tikv");
// set
let key1: Key = b"key1".to_vec().into();
let value1: Value = b"value1".to_vec().into();
let key2: Key = b"key2".to_vec().into();
let value2: Value = b"value2".to_vec().into();
await!(puts(&txn, vec![(key1, value1), (key2, value2)]));
puts(&txn, vec![(key1, value1), (key2, value2)]).await;
// get
let key1: Key = b"key1".to_vec().into();
let value1 = await!(get(&txn, key1.clone()));
let value1 = get(&txn, key1.clone()).await;
println!("{:?}", (key1, value1));
// scan
let key1: Key = b"key1".to_vec().into();
await!(scan(&txn, key1.., 10));
scan(&txn, key1.., 10).await;
// delete
let key1: Key = b"key1".to_vec().into();
let key2: Key = b"key2".to_vec().into();
await!(dels(&txn, vec![key1, key2]));
dels(&txn, vec![key1, key2]).await;
}

View File

@ -81,7 +81,7 @@
//! ([raw](raw/struct.Client.html), [transactional](transaction/struct.Client.html)).
//!
//! ```rust
//! # #![feature(async_await, await_macro)]
//! # #![feature(async_await)]
//! # use tikv_client::{*, raw::*};
//! # use futures::prelude::*;
//!
@ -96,7 +96,7 @@
//! let connect = Client::new(config);
//!
//! // Resolve the connection into a client.
//! let client = await!(connect.into_future());
//! let client = connect.into_future().await;
//! # });
//! ```
//!

View File

@ -29,12 +29,12 @@ impl Client {
/// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// # });
/// ```
#[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))]
@ -53,15 +53,15 @@ impl Client {
/// given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Value, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let key = "TiKV";
/// let req = connected_client.get(key);
/// let result: Option<Value> = await!(req).unwrap();
/// let result: Option<Value> = req.await.unwrap();
/// # });
/// ```
pub fn get(&self, key: impl Into<Key>) -> Get {
@ -74,15 +74,15 @@ impl Client {
/// given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{KvPair, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let keys = vec!["TiKV", "TiDB"];
/// let req = connected_client.batch_get(keys);
/// let result: Vec<KvPair> = await!(req).unwrap();
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
@ -97,16 +97,16 @@ impl Client {
/// Once resolved this request will result in the setting of the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Value, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let key = "TiKV";
/// let val = "TiKV";
/// let req = connected_client.put(key, val);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Put {
@ -118,17 +118,17 @@ impl Client {
/// Once resolved this request will result in the setting of the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let kvpair1 = ("PD", "Go");
/// let kvpair2 = ("TiKV", "Rust");
/// let iterable = vec![kvpair1, kvpair2];
/// let req = connected_client.batch_put(iterable);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn batch_put(&self, pairs: impl IntoIterator<Item = impl Into<KvPair>>) -> BatchPut {
@ -143,15 +143,15 @@ impl Client {
/// Once resolved this request will result in the deletion of the given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let key = "TiKV";
/// let req = connected_client.delete(key);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn delete(&self, key: impl Into<Key>) -> Delete {
@ -163,15 +163,15 @@ impl Client {
/// Once resolved this request will result in the deletion of the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let keys = vec!["TiKV", "TiDB"];
/// let req = connected_client.batch_delete(keys);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchDelete {
@ -186,15 +186,15 @@ impl Client {
/// Once resolved this request will result in a scanner over the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{KvPair, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = connected_client.scan(inclusive_range, 2);
/// let result: Vec<KvPair> = await!(req).unwrap();
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
pub fn scan(&self, range: impl KeyRange, limit: u32) -> Scan {
@ -206,17 +206,17 @@ impl Client {
/// Once resolved this request will result in a set of scanners over the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let inclusive_range1 = "TiDB"..="TiKV";
/// let inclusive_range2 = "TiKV"..="TiSpark";
/// let iterable = vec![inclusive_range1, inclusive_range2];
/// let req = connected_client.batch_scan(iterable, 2);
/// let result = await!(req);
/// let result = req.await;
/// # });
/// ```
pub fn batch_scan(
@ -238,15 +238,15 @@ impl Client {
/// Once resolved this request will result in the deletion of all keys over the given range.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, raw::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = connected_client.delete_range(inclusive_range);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn delete_range(&self, range: impl KeyRange) -> DeleteRange {
@ -259,13 +259,13 @@ impl Client {
/// Once resolved it will result in a connected [`Client`](struct.Client.html).
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, raw::{Client, Connect}};
/// use futures::prelude::*;
///
/// # futures::executor::block_on(async {
/// let connect: Connect = Client::new(Config::default());
/// let client: Client = await!(connect).unwrap();
/// let client: Client = connect.await.unwrap();
/// # });
/// ```
pub struct Connect {

View File

@ -21,12 +21,12 @@ impl Client {
/// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// # });
/// ```
#[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))]
@ -39,16 +39,16 @@ impl Client {
/// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set).
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// let transaction = client.begin();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = await!(commit).unwrap();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub fn begin(&self) -> Transaction {
@ -58,17 +58,17 @@ impl Client {
/// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// let timestamp = client.current_timestamp();
/// let transaction = client.begin_with_timestamp(timestamp);
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = await!(commit).unwrap();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction {
@ -78,12 +78,12 @@ impl Client {
/// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp).
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// let snapshot = client.snapshot();
/// // ... Issue some commands.
/// # });
@ -95,12 +95,12 @@ impl Client {
/// Retrieve the current [`Timestamp`](struct.Timestamp.html).
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// let timestamp = client.current_timestamp();
/// # });
/// ```
@ -114,13 +114,13 @@ impl Client {
/// Once resolved it will result in a connected [`Client`](struct.Client.html).
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::{Client, Connect}};
/// use futures::prelude::*;
///
/// # futures::executor::block_on(async {
/// let connect: Connect = Client::new(Config::default());
/// let client: Client = await!(connect).unwrap();
/// let client: Client = connect.await.unwrap();
/// # });
/// ```
pub struct Connect {
@ -192,12 +192,12 @@ impl Transaction {
/// Create a new transaction operating on the given snapshot.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// use tikv_client::{Config, transaction::Client};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let connect = Client::new(Config::default());
/// let client = await!(connect).unwrap();
/// let client = connect.await.unwrap();
/// let txn = client.begin();
/// # });
/// ```
@ -210,16 +210,16 @@ impl Transaction {
/// Once committed, it is no longer possible to `rollback` the actions in the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn commit(self) -> Commit {
@ -229,16 +229,16 @@ impl Transaction {
/// Rollback the actions of the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// // ... Do some actions.
/// let req = txn.rollback();
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn rollback(self) -> Rollback {
@ -248,16 +248,16 @@ impl Transaction {
/// Lock the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// // ... Do some actions.
/// let req = txn.lock_keys(vec!["TiKV", "Rust"]);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) -> LockKeys {
@ -271,12 +271,12 @@ impl Transaction {
/// Returns the timestamp which the transaction started at.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::{Client, Timestamp}};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// // ... Do some actions.
/// let ts: Timestamp = txn.start_ts();
@ -289,12 +289,12 @@ impl Transaction {
/// Get the `Snapshot` the transaction is operating on.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::{Client, Snapshot}};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// // ... Do some actions.
/// let snap: Snapshot = txn.snapshot();
@ -307,12 +307,12 @@ impl Transaction {
/// Set the isolation level of the transaction.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Config, transaction::{Client, IsolationLevel}};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connect = Client::new(Config::default());
/// # let connected_client = await!(connect).unwrap();
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
/// # });
@ -327,18 +327,18 @@ impl Transaction {
/// given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let key = "TiKV";
/// let req = txn.get(key);
/// let result: Value = await!(req).unwrap();
/// let result: Value = req.await.unwrap();
/// // Finish the transaction...
/// await!(txn.commit()).unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn get(&self, key: impl Into<Key>) -> Get {
@ -351,18 +351,18 @@ impl Transaction {
/// given keys.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{KvPair, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let keys = vec!["TiKV", "TiDB"];
/// let req = txn.batch_get(keys);
/// let result: Vec<KvPair> = await!(req).unwrap();
/// let result: Vec<KvPair> = req.await.unwrap();
/// // Finish the transaction...
/// await!(txn.commit()).unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
@ -382,19 +382,19 @@ impl Transaction {
/// Once resolved this request will result in the setting of the value associated with the given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Value, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let key = "TiKV";
/// let val = "TiKV";
/// let req = txn.set(key, val);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// // Finish the transaction...
/// await!(txn.commit()).unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn set(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Set {
@ -406,18 +406,18 @@ impl Transaction {
/// Once resolved this request will result in the deletion of the given key.
///
/// ```rust,no_run
/// # #![feature(async_await, await_macro)]
/// # #![feature(async_await)]
/// # use tikv_client::{Key, Config, transaction::Client};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::new(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = await!(connecting_client).unwrap();
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let key = "TiKV";
/// let req = txn.delete(key);
/// let result: () = await!(req).unwrap();
/// let result: () = req.await.unwrap();
/// // Finish the transaction...
/// await!(txn.commit()).unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub fn delete(&mut self, key: impl Into<Key>) -> Delete {

View File

@ -15,12 +15,17 @@ fn generate_value(id: i32) -> Value {
async fn wipe_all(client: &Client) {
let test_key_start = generate_key(0);
let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1);
await!(client.delete_range(test_key_start..test_key_end)).expect("Could not delete test keys");
client
.delete_range(test_key_start..test_key_end)
.await
.expect("Could not delete test keys");
}
async fn connect() -> Client {
let client = await!(Client::new(Config::new(pd_addr()))).expect("Could not connect to tikv");
await!(wipe_all(&client));
let client = Client::new(Config::new(pd_addr()))
.await
.expect("Could not connect to tikv");
wipe_all(&client).await;
client
}
@ -28,11 +33,11 @@ async fn test_empty(client: &Client) {
let test_key_start = generate_key(0);
let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1);
assert!(
await!(client.scan(test_key_start..test_key_end, NUM_TEST_KEYS))
.expect("Could not scan")
.is_empty()
);
assert!(client
.scan(test_key_start..test_key_end, NUM_TEST_KEYS)
.await
.expect("Could not scan")
.is_empty());
}
async fn test_existence<'a>(
@ -46,7 +51,9 @@ async fn test_existence<'a>(
for pair in existing_pairs.iter().map(Clone::clone) {
let (key, value) = pair.into_inner();
assert_eq!(
await!(client.get(key))
client
.get(key)
.await
.expect("Could not get value")
.expect("key doesn't exist"),
value.clone(),
@ -54,7 +61,7 @@ async fn test_existence<'a>(
}
for key in not_existing_keys.clone().into_iter() {
let r = await!(client.get(key)).expect("Cound not get value");
let r = client.get(key).await.expect("Cound not get value");
assert!(r.is_none());
}
@ -70,76 +77,85 @@ async fn test_existence<'a>(
all_keys.extend_from_slice(&not_existing_keys);
assert_eq!(
await!(client.batch_get(all_keys)).expect("Could not get value in batch"),
client
.batch_get(all_keys)
.await
.expect("Could not get value in batch"),
existing_pairs,
);
assert_eq!(
await!(client.batch_get(not_existing_keys)).expect("Could not get value in batch"),
client
.batch_get(not_existing_keys)
.await
.expect("Could not get value in batch"),
Vec::new(),
);
assert_eq!(
await!(client.scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS))
client
.scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS)
.await
.expect("Could not scan"),
existing_pairs,
);
assert_eq!(
await!(client
client
.scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS)
.key_only())
.expect("Could not scan"),
.key_only()
.await
.expect("Could not scan"),
existing_key_only_pairs,
);
}
#[runtime::test(runtime_tokio::Tokio)]
async fn basic_raw_test() {
let client = await!(connect());
let client = connect().await;
await!(test_empty(&client));
test_empty(&client).await;
assert!(await!(client.put(generate_key(0), generate_value(0))).is_ok());
assert!(client.put(generate_key(0), generate_value(0)).await.is_ok());
let existing = &[KvPair::new(generate_key(0), generate_value(0))];
await!(test_existence(
&client,
existing,
vec![generate_key(1), generate_key(2)],
));
test_existence(&client, existing, vec![generate_key(1), generate_key(2)]).await;
let empty_pairs = Vec::new();
assert!(await!(client.delete(generate_key(0))).is_ok());
await!(test_existence(
assert!(client.delete(generate_key(0)).await.is_ok());
test_existence(
&client,
&empty_pairs,
vec![generate_key(0), generate_key(1), generate_key(2)],
));
)
.await;
let pairs: Vec<KvPair> = (0..10)
.map(|i| KvPair::new(generate_key(i), generate_value(i)))
.collect();
assert!(await!(client.batch_put(pairs.clone())).is_ok());
await!(test_existence(
assert!(client.batch_put(pairs.clone()).await.is_ok());
test_existence(
&client,
&pairs,
vec![generate_key(10), generate_key(11), generate_key(12)],
));
)
.await;
let keys: Vec<Key> = vec![generate_key(8), generate_key(9)];
assert!(await!(client.batch_delete(keys)).is_ok());
assert!(client.batch_delete(keys).await.is_ok());
let mut pairs = pairs;
pairs.truncate(8);
await!(test_existence(
test_existence(
&client,
&pairs,
vec![generate_key(8), generate_key(9), generate_key(10)],
));
)
.await;
await!(wipe_all(&client));
await!(test_existence(
wipe_all(&client).await;
test_existence(
&client,
&empty_pairs,
pairs.into_iter().map(|x| x.into_inner().0).collect(),
));
)
.await;
}

View File

@ -1,6 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(async_await, await_macro)]
#![feature(async_await)]
#[cfg(feature = "integration-tests")]
mod integration_tests;