Use async/await in the transaction APIs (#91)

Use async/await in the transaction APIs
This commit is contained in:
Shirly 2019-07-31 17:05:50 +08:00 committed by GitHub
commit 91bc7f637f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 43 additions and 32 deletions

View File

@ -6,7 +6,7 @@ os:
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
- osx
rust:
# Requires nightly for now, stable can be re-enabled when 1.36 is stable.
# Requires nightly for now, stable can be re-enabled when async/await is stable.
# - stable
- nightly
env:

View File

@ -14,7 +14,9 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F
## Using the client
The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility).
The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.
Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
To use this crate in your project, add it as a dependency in your `Cargo.toml`:
@ -28,8 +30,6 @@ The client requires a Git dependency until we can [publish it](https://github.co
There are [examples](examples) which show how to use the client in a Rust program.
The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
## Access the documentation
We recommend using the cargo-generated documentation to browse and understand the API. We've done
@ -52,7 +52,7 @@ To check what version of Rust you are using, run
rustc --version
```
You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
```bash
rustup toolchain install nightly

View File

@ -13,7 +13,7 @@ use tikv_client::{
};
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin();
let mut txn = client.begin().await.expect("Could not begin a transaction");
future::join_all(
pairs
.into_iter()
@ -28,7 +28,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
}
async fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
let txn = client.begin().await.expect("Could not begin a transaction");
txn.get(key).await.expect("Could not get value")
}
@ -37,6 +37,8 @@ async fn get(client: &Client, key: Key) -> Value {
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
client
.begin()
.await
.expect("Could not begin a transaction")
.scan(range)
.into_stream()
.take_while(move |r| {
@ -53,7 +55,7 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
}
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin();
let mut txn = client.begin().await.expect("Could not begin a transaction");
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = stream::iter(keys.into_iter())
.then(|p| {

View File

@ -1 +1 @@
nightly-2019-07-16
nightly-2019-07-31

View File

@ -3,6 +3,7 @@
// Long and nested future chains can quickly result in large generic types.
#![type_length_limit = "16777216"]
#![allow(clippy::redundant_closure)]
#![feature(async_await)]
//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
//! distributed transactional Key-Value database written in Rust.

View File

@ -20,7 +20,7 @@ use crate::{
kv::BoundRange,
raw::ColumnFamily,
rpc::{
pd::{PdClient, Region, RegionId, RetryClient, StoreId},
pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp},
security::SecurityManager,
tikv::KvClient,
Address, RawContext, Store, TxnContext,
@ -225,6 +225,10 @@ impl<PdC: PdClient> RpcClient<PdC> {
future::err(Error::unimplemented())
}
pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
Arc::clone(&self.pd).get_timestamp()
}
// Returns a Steam which iterates over the contexts for each region covered by range.
fn regions_for_range(
self: Arc<Self>,

View File

@ -1,15 +1,19 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::{Snapshot, Timestamp, Transaction};
use crate::{Config, Error};
use crate::rpc::RpcClient;
use crate::{Config, Result};
use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
use std::sync::Arc;
/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client;
pub struct Client {
rpc: Arc<RpcClient>,
}
impl Client {
/// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves.
@ -38,13 +42,13 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let transaction = client.begin();
/// let transaction = client.begin().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub fn begin(&self) -> Transaction {
pub async fn begin(&self) -> Result<Transaction> {
unimplemented!()
}
@ -57,11 +61,11 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let snapshot = client.snapshot();
/// let snapshot = client.snapshot().await.unwrap();
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot(&self) -> Snapshot {
pub async fn snapshot(&self) -> Result<Snapshot> {
unimplemented!()
}
@ -79,7 +83,7 @@ impl Client {
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot {
pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result<Snapshot> {
unimplemented!()
}
@ -92,11 +96,11 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let timestamp = client.current_timestamp();
/// let timestamp = client.current_timestamp().await.unwrap();
/// # });
/// ```
pub fn current_timestamp(&self) -> Timestamp {
unimplemented!()
pub async fn current_timestamp(&self) -> Result<Timestamp> {
self.rpc.clone().get_timestamp().await
}
}
@ -120,7 +124,7 @@ pub struct Connect {
}
impl Future for Connect {
type Output = Result<Client, Error>;
type Output = Result<Client>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _config = &self.config;

View File

@ -22,7 +22,7 @@ use std::ops::RangeBounds;
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let txn = client.begin();
/// let txn = client.begin().await.unwrap();
/// # });
/// ```
#[derive(new)]
@ -42,7 +42,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: () = req.await.unwrap();
@ -61,7 +61,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.rollback();
/// let result: () = req.await.unwrap();
@ -80,7 +80,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// let result: () = req.await.unwrap();
@ -103,7 +103,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let ts: Timestamp = txn.start_ts();
/// # });
@ -121,7 +121,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let snap: Snapshot = txn.snapshot();
/// # });
@ -139,7 +139,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
/// # });
/// ```
@ -159,7 +159,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.get(key);
/// let result: Value = req.await.unwrap();
@ -183,7 +183,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let req = txn.batch_get(keys);
/// let result: Vec<KvPair> = req.await.unwrap();
@ -214,7 +214,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// let req = txn.set(key, val);
@ -238,7 +238,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.delete(key);
/// let result: () = req.await.unwrap();