mirror of https://github.com/tikv/client-rust.git
Merge pull request #128 from silathdiir/fix-issue-119-convert-futures-to-async-await
Convert part of futures to `async/await`
This commit is contained in:
commit
f3f3db349c
|
@ -78,23 +78,21 @@ impl KvClient for KvRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_errors_and_trace<Resp, RpcFuture>(
|
async fn map_errors_and_trace<Resp, RpcFuture>(
|
||||||
request_name: &'static str,
|
request_name: &'static str,
|
||||||
fut: ::grpcio::Result<RpcFuture>,
|
fut: ::grpcio::Result<RpcFuture>,
|
||||||
) -> impl Future<Output = Result<Resp>>
|
) -> Result<Resp>
|
||||||
where
|
where
|
||||||
Compat01As03<RpcFuture>: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
|
Compat01As03<RpcFuture>: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
|
||||||
Resp: HasError + Sized + Clone + Send + 'static,
|
Resp: HasError + Sized + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
let context = tikv_stats(request_name);
|
let res = match fut {
|
||||||
|
Ok(f) => Compat01As03::new(f).await,
|
||||||
|
Err(e) => Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
// FIXME should handle the error, not unwrap.
|
let context = tikv_stats(request_name);
|
||||||
Compat01As03::new(fut.unwrap())
|
context.done(res.map_err(|e| ErrorKind::Grpc(e).into()))
|
||||||
.map(|r| match r {
|
|
||||||
Err(e) => Err(ErrorKind::Grpc(e).into()),
|
|
||||||
Ok(r) => Ok(r),
|
|
||||||
})
|
|
||||||
.map(move |r| context.done(r))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(new)]
|
#[derive(new)]
|
||||||
|
|
|
@ -6,8 +6,6 @@ use crate::{
|
||||||
Result, Value,
|
Result, Value,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::future::Either;
|
|
||||||
use futures::prelude::*;
|
|
||||||
use std::{sync::Arc, u32};
|
use std::{sync::Arc, u32};
|
||||||
|
|
||||||
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
|
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
|
||||||
|
@ -48,8 +46,8 @@ impl Client {
|
||||||
/// # use tikv_client::{Config, RawClient};
|
/// # use tikv_client::{Config, RawClient};
|
||||||
/// # use futures::prelude::*;
|
/// # use futures::prelude::*;
|
||||||
/// # futures::executor::block_on(async {
|
/// # futures::executor::block_on(async {
|
||||||
/// let client = RawClient::new(Config::default()).unwrap();
|
/// let client = RawClient::new(Config::default()).unwrap().with_cf("write");
|
||||||
/// let get_request = client.with_cf("write").get("foo".to_owned());
|
/// let get_request = client.get("foo".to_owned());
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn with_cf(&self, cf: impl Into<ColumnFamily>) -> Client {
|
pub fn with_cf(&self, cf: impl Into<ColumnFamily>) -> Client {
|
||||||
|
@ -71,8 +69,8 @@ impl Client {
|
||||||
/// # use tikv_client::{Config, RawClient, ToOwnedRange};
|
/// # use tikv_client::{Config, RawClient, ToOwnedRange};
|
||||||
/// # use futures::prelude::*;
|
/// # use futures::prelude::*;
|
||||||
/// # futures::executor::block_on(async {
|
/// # futures::executor::block_on(async {
|
||||||
/// let client = RawClient::new(Config::default()).unwrap();
|
/// let client = RawClient::new(Config::default()).unwrap().with_key_only(true);
|
||||||
/// let scan_request = client.with_key_only(true).scan(("TiKV"..="TiDB").to_owned(), 2);
|
/// let scan_request = client.scan(("TiKV"..="TiDB").to_owned(), 2);
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn with_key_only(&self, key_only: bool) -> Client {
|
pub fn with_key_only(&self, key_only: bool) -> Client {
|
||||||
|
@ -98,8 +96,10 @@ impl Client {
|
||||||
/// let result: Option<Value> = req.await.unwrap();
|
/// let result: Option<Value> = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn get(&self, key: impl Into<Key>) -> impl Future<Output = Result<Option<Value>>> {
|
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
|
||||||
requests::new_raw_get_request(key, self.cf.clone()).execute(self.rpc.clone())
|
requests::new_raw_get_request(key, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'batch get' request.
|
/// Create a new 'batch get' request.
|
||||||
|
@ -117,11 +117,13 @@ impl Client {
|
||||||
/// let result: Vec<KvPair> = req.await.unwrap();
|
/// let result: Vec<KvPair> = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn batch_get(
|
pub async fn batch_get(
|
||||||
&self,
|
&self,
|
||||||
keys: impl IntoIterator<Item = impl Into<Key>>,
|
keys: impl IntoIterator<Item = impl Into<Key>>,
|
||||||
) -> impl Future<Output = Result<Vec<KvPair>>> {
|
) -> Result<Vec<KvPair>> {
|
||||||
requests::new_raw_batch_get_request(keys, self.cf.clone()).execute(self.rpc.clone())
|
requests::new_raw_batch_get_request(keys, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'put' request.
|
/// Create a new 'put' request.
|
||||||
|
@ -139,12 +141,10 @@ impl Client {
|
||||||
/// let result: () = req.await.unwrap();
|
/// let result: () = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn put(
|
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
|
||||||
&self,
|
requests::new_raw_put_request(key, value, self.cf.clone())
|
||||||
key: impl Into<Key>,
|
.execute(self.rpc.clone())
|
||||||
value: impl Into<Value>,
|
.await
|
||||||
) -> impl Future<Output = Result<()>> {
|
|
||||||
requests::new_raw_put_request(key, value, self.cf.clone()).execute(self.rpc.clone())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'batch put' request.
|
/// Create a new 'batch put' request.
|
||||||
|
@ -163,11 +163,13 @@ impl Client {
|
||||||
/// let result: () = req.await.unwrap();
|
/// let result: () = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn batch_put(
|
pub async fn batch_put(
|
||||||
&self,
|
&self,
|
||||||
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
|
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
|
||||||
) -> impl Future<Output = Result<()>> {
|
) -> Result<()> {
|
||||||
requests::new_raw_batch_put_request(pairs, self.cf.clone()).execute(self.rpc.clone())
|
requests::new_raw_batch_put_request(pairs, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'delete' request.
|
/// Create a new 'delete' request.
|
||||||
|
@ -184,8 +186,10 @@ impl Client {
|
||||||
/// let result: () = req.await.unwrap();
|
/// let result: () = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn delete(&self, key: impl Into<Key>) -> impl Future<Output = Result<()>> {
|
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
|
||||||
requests::new_raw_delete_request(key, self.cf.clone()).execute(self.rpc.clone())
|
requests::new_raw_delete_request(key, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'batch delete' request.
|
/// Create a new 'batch delete' request.
|
||||||
|
@ -202,11 +206,10 @@ impl Client {
|
||||||
/// let result: () = req.await.unwrap();
|
/// let result: () = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn batch_delete(
|
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
|
||||||
&self,
|
requests::new_raw_batch_delete_request(keys, self.cf.clone())
|
||||||
keys: impl IntoIterator<Item = impl Into<Key>>,
|
.execute(self.rpc.clone())
|
||||||
) -> impl Future<Output = Result<()>> {
|
.await
|
||||||
requests::new_raw_batch_delete_request(keys, self.cf.clone()).execute(self.rpc.clone())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'delete range' request.
|
/// Create a new 'delete range' request.
|
||||||
|
@ -223,8 +226,10 @@ impl Client {
|
||||||
/// let result: () = req.await.unwrap();
|
/// let result: () = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn delete_range(&self, range: impl Into<BoundRange>) -> impl Future<Output = Result<()>> {
|
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
|
||||||
requests::new_raw_delete_range_request(range, self.cf.clone()).execute(self.rpc.clone())
|
requests::new_raw_delete_range_request(range, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'scan' request.
|
/// Create a new 'scan' request.
|
||||||
|
@ -241,22 +246,14 @@ impl Client {
|
||||||
/// let result: Vec<KvPair> = req.await.unwrap();
|
/// let result: Vec<KvPair> = req.await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn scan(
|
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
|
||||||
&self,
|
|
||||||
range: impl Into<BoundRange>,
|
|
||||||
limit: u32,
|
|
||||||
) -> impl Future<Output = Result<Vec<KvPair>>> {
|
|
||||||
if limit > MAX_RAW_KV_SCAN_LIMIT {
|
if limit > MAX_RAW_KV_SCAN_LIMIT {
|
||||||
Either::Right(future::err(Error::max_scan_limit_exceeded(
|
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
|
||||||
limit,
|
|
||||||
MAX_RAW_KV_SCAN_LIMIT,
|
|
||||||
)))
|
|
||||||
} else {
|
|
||||||
Either::Left(
|
|
||||||
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
|
|
||||||
.execute(self.rpc.clone()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new 'batch scan' request.
|
/// Create a new 'batch scan' request.
|
||||||
|
@ -275,26 +272,20 @@ impl Client {
|
||||||
/// let result = req.await;
|
/// let result = req.await;
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub fn batch_scan(
|
pub async fn batch_scan(
|
||||||
&self,
|
&self,
|
||||||
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
|
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
|
||||||
each_limit: u32,
|
each_limit: u32,
|
||||||
) -> impl Future<Output = Result<Vec<KvPair>>> {
|
) -> Result<Vec<KvPair>> {
|
||||||
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
|
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
|
||||||
Either::Right(future::err(Error::max_scan_limit_exceeded(
|
return Err(Error::max_scan_limit_exceeded(
|
||||||
each_limit,
|
each_limit,
|
||||||
MAX_RAW_KV_SCAN_LIMIT,
|
MAX_RAW_KV_SCAN_LIMIT,
|
||||||
)))
|
));
|
||||||
} else {
|
|
||||||
Either::Left(
|
|
||||||
requests::new_raw_batch_scan_request(
|
|
||||||
ranges,
|
|
||||||
each_limit,
|
|
||||||
self.key_only,
|
|
||||||
self.cf.clone(),
|
|
||||||
)
|
|
||||||
.execute(self.rpc.clone()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
requests::new_raw_batch_scan_request(ranges, each_limit, self.key_only, self.cf.clone())
|
||||||
|
.execute(self.rpc.clone())
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue