From 73b2e6643019fe6e02a7b730d4d6e80489410dcd Mon Sep 17 00:00:00 2001 From: Steven Gu Date: Sat, 25 Jan 2020 21:57:24 +0800 Subject: [PATCH 1/3] =?UTF-8?q?Fixes=20issue-119=EF=BC=9AConvert=20futures?= =?UTF-8?q?=20to=20`async/await`.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Steven Gu --- src/kv_client/mod.rs | 22 +++++++----- src/mock.rs | 22 ++++++++---- src/raw/client.rs | 85 ++++++++++++++++++++++++-------------------- 3 files changed, 74 insertions(+), 55 deletions(-) diff --git a/src/kv_client/mod.rs b/src/kv_client/mod.rs index 00937ef..eca1b07 100644 --- a/src/kv_client/mod.rs +++ b/src/kv_client/mod.rs @@ -78,23 +78,27 @@ impl KvClient for KvRpcClient { } } -fn map_errors_and_trace( +async fn map_errors_and_trace( request_name: &'static str, fut: ::grpcio::Result, -) -> impl Future> +) -> Result where Compat01As03: Future>, Resp: HasError + Sized + Clone + Send + 'static, { let context = tikv_stats(request_name); - // FIXME should handle the error, not unwrap. - Compat01As03::new(fut.unwrap()) - .map(|r| match r { - Err(e) => Err(ErrorKind::Grpc(e).into()), - Ok(r) => Ok(r), - }) - .map(move |r| context.done(r)) + let fut = match fut { + Err(e) => return context.done(Err(ErrorKind::Grpc(e).into())), + Ok(f) => f, + }; + + let res = match Compat01As03::new(fut).await { + Err(e) => Err(ErrorKind::Grpc(e).into()), + Ok(r) => Ok(r), + }; + + context.done(res) } #[derive(new)] diff --git a/src/mock.rs b/src/mock.rs index ae837c7..aabcc8a 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -14,7 +14,7 @@ use crate::{ }; use fail::fail_point; -use futures::future::{ready, BoxFuture, FutureExt}; +use futures::future::{BoxFuture, FutureExt}; use grpcio::CallOption; use kvproto::{errorpb, kvrpcpb, metapb}; use std::{sync::Arc, time::Duration}; @@ -102,13 +102,16 @@ impl PdClient for MockPdClient { self: Arc, region: Region, ) -> BoxFuture<'static, Result>> { - Box::pin(ready(Ok(Store::new( + let store = Store::new( region, MockKvClient { addr: String::new(), }, Duration::from_secs(60), - )))) + ); + + let future = async move { Ok(store) }; + future.boxed() } fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result> { @@ -119,7 +122,8 @@ impl PdClient for MockPdClient { Self::region2() }; - Box::pin(ready(Ok(region))) + let future = async move { Ok(region) }; + future.boxed() } fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result> { @@ -129,7 +133,8 @@ impl PdClient for MockPdClient { _ => Err(Error::region_not_found(id)), }; - Box::pin(ready(result)) + let future = async move { result }; + future.boxed() } fn get_timestamp(self: Arc) -> BoxFuture<'static, Result> { @@ -145,8 +150,11 @@ impl DispatchHook for kvrpcpb::ResolveLockRequest { fail_point!("region-error", |_| { let mut resp = kvrpcpb::ResolveLockResponse::default(); resp.region_error = Some(errorpb::Error::default()); - Some(ready(Ok(resp)).boxed()) + let future = async move { Ok(resp) }; + Some(future.boxed()) }); - Some(ready(Ok(kvrpcpb::ResolveLockResponse::default())).boxed()) + + let future = async { Ok(kvrpcpb::ResolveLockResponse::default()) }; + Some(future.boxed()) } } diff --git a/src/raw/client.rs b/src/raw/client.rs index 91824f8..acfd9e7 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -7,7 +7,7 @@ use crate::{ }; use futures::future::Either; -use futures::prelude::*; +use futures::prelude::future; use std::{sync::Arc, u32}; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -48,8 +48,8 @@ impl Client { /// # use tikv_client::{Config, RawClient}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// let client = RawClient::new(Config::default()).unwrap(); - /// let get_request = client.with_cf("write").get("foo".to_owned()); + /// let client = RawClient::new(Config::default()).unwrap().with_cf("write"); + /// let get_request = client.get("foo".to_owned()); /// # }); /// ``` pub fn with_cf(&self, cf: impl Into) -> Client { @@ -71,8 +71,8 @@ impl Client { /// # use tikv_client::{Config, RawClient, ToOwnedRange}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// let client = RawClient::new(Config::default()).unwrap(); - /// let scan_request = client.with_key_only(true).scan(("TiKV"..="TiDB").to_owned(), 2); + /// let client = RawClient::new(Config::default()).unwrap().with_key_only(true); + /// let scan_request = client.scan(("TiKV"..="TiDB").to_owned(), 2); /// # }); /// ``` pub fn with_key_only(&self, key_only: bool) -> Client { @@ -98,8 +98,10 @@ impl Client { /// let result: Option = req.await.unwrap(); /// # }); /// ``` - pub fn get(&self, key: impl Into) -> impl Future>> { - requests::new_raw_get_request(key, self.cf.clone()).execute(self.rpc.clone()) + pub async fn get(&self, key: impl Into) -> Result> { + requests::new_raw_get_request(key, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch get' request. @@ -117,11 +119,13 @@ impl Client { /// let result: Vec = req.await.unwrap(); /// # }); /// ``` - pub fn batch_get( + pub async fn batch_get( &self, keys: impl IntoIterator>, - ) -> impl Future>> { - requests::new_raw_batch_get_request(keys, self.cf.clone()).execute(self.rpc.clone()) + ) -> Result> { + requests::new_raw_batch_get_request(keys, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'put' request. @@ -139,12 +143,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn put( - &self, - key: impl Into, - value: impl Into, - ) -> impl Future> { - requests::new_raw_put_request(key, value, self.cf.clone()).execute(self.rpc.clone()) + pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { + requests::new_raw_put_request(key, value, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch put' request. @@ -163,11 +165,13 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn batch_put( + pub async fn batch_put( &self, pairs: impl IntoIterator>, - ) -> impl Future> { - requests::new_raw_batch_put_request(pairs, self.cf.clone()).execute(self.rpc.clone()) + ) -> Result<()> { + requests::new_raw_batch_put_request(pairs, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'delete' request. @@ -184,8 +188,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn delete(&self, key: impl Into) -> impl Future> { - requests::new_raw_delete_request(key, self.cf.clone()).execute(self.rpc.clone()) + pub async fn delete(&self, key: impl Into) -> Result<()> { + requests::new_raw_delete_request(key, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch delete' request. @@ -202,11 +208,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn batch_delete( - &self, - keys: impl IntoIterator>, - ) -> impl Future> { - requests::new_raw_batch_delete_request(keys, self.cf.clone()).execute(self.rpc.clone()) + pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { + requests::new_raw_batch_delete_request(keys, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'delete range' request. @@ -223,8 +228,10 @@ impl Client { /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn delete_range(&self, range: impl Into) -> impl Future> { - requests::new_raw_delete_range_request(range, self.cf.clone()).execute(self.rpc.clone()) + pub async fn delete_range(&self, range: impl Into) -> Result<()> { + requests::new_raw_delete_range_request(range, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'scan' request. @@ -241,12 +248,8 @@ impl Client { /// let result: Vec = req.await.unwrap(); /// # }); /// ``` - pub fn scan( - &self, - range: impl Into, - limit: u32, - ) -> impl Future>> { - if limit > MAX_RAW_KV_SCAN_LIMIT { + pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { + let request = if limit > MAX_RAW_KV_SCAN_LIMIT { Either::Right(future::err(Error::max_scan_limit_exceeded( limit, MAX_RAW_KV_SCAN_LIMIT, @@ -256,7 +259,9 @@ impl Client { requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) .execute(self.rpc.clone()), ) - } + }; + + request.await } /// Create a new 'batch scan' request. @@ -275,12 +280,12 @@ impl Client { /// let result = req.await; /// # }); /// ``` - pub fn batch_scan( + pub async fn batch_scan( &self, ranges: impl IntoIterator>, each_limit: u32, - ) -> impl Future>> { - if each_limit > MAX_RAW_KV_SCAN_LIMIT { + ) -> Result> { + let request = if each_limit > MAX_RAW_KV_SCAN_LIMIT { Either::Right(future::err(Error::max_scan_limit_exceeded( each_limit, MAX_RAW_KV_SCAN_LIMIT, @@ -295,6 +300,8 @@ impl Client { ) .execute(self.rpc.clone()), ) - } + }; + + request.await } } From e3836ade98e0ba7c11a79511c05766cf9baf921e Mon Sep 17 00:00:00 2001 From: Steven Gu Date: Mon, 3 Feb 2020 10:33:18 +0800 Subject: [PATCH 2/3] Reverts changing `Box::pin`. Signed-off-by: Steven Gu --- src/mock.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/mock.rs b/src/mock.rs index aabcc8a..ae837c7 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -14,7 +14,7 @@ use crate::{ }; use fail::fail_point; -use futures::future::{BoxFuture, FutureExt}; +use futures::future::{ready, BoxFuture, FutureExt}; use grpcio::CallOption; use kvproto::{errorpb, kvrpcpb, metapb}; use std::{sync::Arc, time::Duration}; @@ -102,16 +102,13 @@ impl PdClient for MockPdClient { self: Arc, region: Region, ) -> BoxFuture<'static, Result>> { - let store = Store::new( + Box::pin(ready(Ok(Store::new( region, MockKvClient { addr: String::new(), }, Duration::from_secs(60), - ); - - let future = async move { Ok(store) }; - future.boxed() + )))) } fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result> { @@ -122,8 +119,7 @@ impl PdClient for MockPdClient { Self::region2() }; - let future = async move { Ok(region) }; - future.boxed() + Box::pin(ready(Ok(region))) } fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result> { @@ -133,8 +129,7 @@ impl PdClient for MockPdClient { _ => Err(Error::region_not_found(id)), }; - let future = async move { result }; - future.boxed() + Box::pin(ready(result)) } fn get_timestamp(self: Arc) -> BoxFuture<'static, Result> { @@ -150,11 +145,8 @@ impl DispatchHook for kvrpcpb::ResolveLockRequest { fail_point!("region-error", |_| { let mut resp = kvrpcpb::ResolveLockResponse::default(); resp.region_error = Some(errorpb::Error::default()); - let future = async move { Ok(resp) }; - Some(future.boxed()) + Some(ready(Ok(resp)).boxed()) }); - - let future = async { Ok(kvrpcpb::ResolveLockResponse::default()) }; - Some(future.boxed()) + Some(ready(Ok(kvrpcpb::ResolveLockResponse::default())).boxed()) } } From 7f07f162be3db7e6b4521ab8e54c428f546a7a32 Mon Sep 17 00:00:00 2001 From: Steven Gu Date: Thu, 13 Feb 2020 00:43:20 +0800 Subject: [PATCH 3/3] Simplifies code according to the review. Signed-off-by: Steven Gu --- src/kv_client/mod.rs | 18 ++++++------------ src/raw/client.rs | 42 +++++++++++++----------------------------- 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/src/kv_client/mod.rs b/src/kv_client/mod.rs index eca1b07..959d93c 100644 --- a/src/kv_client/mod.rs +++ b/src/kv_client/mod.rs @@ -86,19 +86,13 @@ where Compat01As03: Future>, Resp: HasError + Sized + Clone + Send + 'static, { + let res = match fut { + Ok(f) => Compat01As03::new(f).await, + Err(e) => Err(e), + }; + let context = tikv_stats(request_name); - - let fut = match fut { - Err(e) => return context.done(Err(ErrorKind::Grpc(e).into())), - Ok(f) => f, - }; - - let res = match Compat01As03::new(fut).await { - Err(e) => Err(ErrorKind::Grpc(e).into()), - Ok(r) => Ok(r), - }; - - context.done(res) + context.done(res.map_err(|e| ErrorKind::Grpc(e).into())) } #[derive(new)] diff --git a/src/raw/client.rs b/src/raw/client.rs index acfd9e7..41c9994 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -6,8 +6,6 @@ use crate::{ Result, Value, }; -use futures::future::Either; -use futures::prelude::future; use std::{sync::Arc, u32}; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -249,19 +247,13 @@ impl Client { /// # }); /// ``` pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { - let request = if limit > MAX_RAW_KV_SCAN_LIMIT { - Either::Right(future::err(Error::max_scan_limit_exceeded( - limit, - MAX_RAW_KV_SCAN_LIMIT, - ))) - } else { - Either::Left( - requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) - .execute(self.rpc.clone()), - ) - }; + if limit > MAX_RAW_KV_SCAN_LIMIT { + return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); + } - request.await + requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) + .execute(self.rpc.clone()) + .await } /// Create a new 'batch scan' request. @@ -285,23 +277,15 @@ impl Client { ranges: impl IntoIterator>, each_limit: u32, ) -> Result> { - let request = if each_limit > MAX_RAW_KV_SCAN_LIMIT { - Either::Right(future::err(Error::max_scan_limit_exceeded( + if each_limit > MAX_RAW_KV_SCAN_LIMIT { + return Err(Error::max_scan_limit_exceeded( each_limit, MAX_RAW_KV_SCAN_LIMIT, - ))) - } else { - Either::Left( - requests::new_raw_batch_scan_request( - ranges, - each_limit, - self.key_only, - self.cf.clone(), - ) - .execute(self.rpc.clone()), - ) - }; + )); + } - request.await + requests::new_raw_batch_scan_request(ranges, each_limit, self.key_only, self.cf.clone()) + .execute(self.rpc.clone()) + .await } }