diff --git a/src/kv_client/mod.rs b/src/kv_client/mod.rs index 00937ef..959d93c 100644 --- a/src/kv_client/mod.rs +++ b/src/kv_client/mod.rs @@ -78,23 +78,21 @@ impl KvClient for KvRpcClient { } } -fn map_errors_and_trace( +async fn map_errors_and_trace( request_name: &'static str, fut: ::grpcio::Result, -) -> impl Future> +) -> Result where Compat01As03: Future>, Resp: HasError + Sized + Clone + Send + 'static, { - let context = tikv_stats(request_name); + let res = match fut { + Ok(f) => Compat01As03::new(f).await, + Err(e) => Err(e), + }; - // FIXME should handle the error, not unwrap. - Compat01As03::new(fut.unwrap()) - .map(|r| match r { - Err(e) => Err(ErrorKind::Grpc(e).into()), - Ok(r) => Ok(r), - }) - .map(move |r| context.done(r)) + let context = tikv_stats(request_name); + context.done(res.map_err(|e| ErrorKind::Grpc(e).into())) } #[derive(new)] diff --git a/src/raw/client.rs b/src/raw/client.rs index 91824f8..41c9994 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -6,8 +6,6 @@ use crate::{ Result, Value, }; -use futures::future::Either; -use futures::prelude::*; use std::{sync::Arc, u32}; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -48,8 +46,8 @@ impl Client { /// # use tikv_client::{Config, RawClient}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// let client = RawClient::new(Config::default()).unwrap(); - /// let get_request = client.with_cf("write").get("foo".to_owned()); + /// let client = RawClient::new(Config::default()).unwrap().with_cf("write"); + /// let get_request = client.get("foo".to_owned()); /// # }); /// ``` pub fn with_cf(&self, cf: impl Into) -> Client { @@ -71,8 +69,8 @@ impl Client { /// # use tikv_client::{Config, RawClient, ToOwnedRange}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// let client = RawClient::new(Config::default()).unwrap(); - /// let scan_request = client.with_key_only(true).scan(("TiKV"..="TiDB").to_owned(), 2); + /// let client = RawClient::new(Config::default()).unwrap().with_key_only(true); + /// let scan_request = client.scan(("TiKV"..="TiDB").to_owned(), 2); /// # }); /// ``` pub fn with_key_only(&self, key_only: bool) -> Client { @@ -98,8 +96,10 @@ impl Client { /// let result: Option = req.await.unwrap(); /// # }); /// ``` - pub fn get(&self, key: impl Into) -> impl Future>> { - requests::new_raw_get_request(key, self.cf.clone()).execute(self.rpc.clone()) + pub async fn get(&self, key: impl Into) -> Result> { + requests::new_raw_get_request(key, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch get' request. @@ -117,11 +117,13 @@ impl Client { /// let result: Vec = req.await.unwrap(); /// # }); /// ``` - pub fn batch_get( + pub async fn batch_get( &self, keys: impl IntoIterator>, - ) -> impl Future>> { - requests::new_raw_batch_get_request(keys, self.cf.clone()).execute(self.rpc.clone()) + ) -> Result> { + requests::new_raw_batch_get_request(keys, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'put' request. @@ -139,12 +141,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn put( - &self, - key: impl Into, - value: impl Into, - ) -> impl Future> { - requests::new_raw_put_request(key, value, self.cf.clone()).execute(self.rpc.clone()) + pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { + requests::new_raw_put_request(key, value, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch put' request. @@ -163,11 +163,13 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn batch_put( + pub async fn batch_put( &self, pairs: impl IntoIterator>, - ) -> impl Future> { - requests::new_raw_batch_put_request(pairs, self.cf.clone()).execute(self.rpc.clone()) + ) -> Result<()> { + requests::new_raw_batch_put_request(pairs, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'delete' request. @@ -184,8 +186,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn delete(&self, key: impl Into) -> impl Future> { - requests::new_raw_delete_request(key, self.cf.clone()).execute(self.rpc.clone()) + pub async fn delete(&self, key: impl Into) -> Result<()> { + requests::new_raw_delete_request(key, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch delete' request. @@ -202,11 +206,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn batch_delete( - &self, - keys: impl IntoIterator>, - ) -> impl Future> { - requests::new_raw_batch_delete_request(keys, self.cf.clone()).execute(self.rpc.clone()) + pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { + requests::new_raw_batch_delete_request(keys, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'delete range' request. @@ -223,8 +226,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn delete_range(&self, range: impl Into) -> impl Future> { - requests::new_raw_delete_range_request(range, self.cf.clone()).execute(self.rpc.clone()) + pub async fn delete_range(&self, range: impl Into) -> Result<()> { + requests::new_raw_delete_range_request(range, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'scan' request. @@ -241,22 +246,14 @@ impl Client { /// let result: Vec = req.await.unwrap(); /// # }); /// ``` - pub fn scan( - &self, - range: impl Into, - limit: u32, - ) -> impl Future>> { + pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { if limit > MAX_RAW_KV_SCAN_LIMIT { - Either::Right(future::err(Error::max_scan_limit_exceeded( - limit, - MAX_RAW_KV_SCAN_LIMIT, - ))) - } else { - Either::Left( - requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) - .execute(self.rpc.clone()), - ) + return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); } + + requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch scan' request. @@ -275,26 +272,20 @@ impl Client { /// let result = req.await; /// # }); /// ``` - pub fn batch_scan( + pub async fn batch_scan( &self, ranges: impl IntoIterator>, each_limit: u32, - ) -> impl Future>> { + ) -> Result> { if each_limit > MAX_RAW_KV_SCAN_LIMIT { - Either::Right(future::err(Error::max_scan_limit_exceeded( + return Err(Error::max_scan_limit_exceeded( each_limit, MAX_RAW_KV_SCAN_LIMIT, - ))) - } else { - Either::Left( - requests::new_raw_batch_scan_request( - ranges, - each_limit, - self.key_only, - self.cf.clone(), - ) - .execute(self.rpc.clone()), - ) + )); } + + requests::new_raw_batch_scan_request(ranges, each_limit, self.key_only, self.cf.clone()) + .execute(self.rpc.clone()) + .await } }