diff --git a/src/kv_client/client.rs b/src/kv_client/client.rs index dc63ea8..fddcd3e 100644 --- a/src/kv_client/client.rs +++ b/src/kv_client/client.rs @@ -13,7 +13,10 @@ use kvproto::tikvpb::TikvClient; use std::{sync::Arc, time::Duration}; use crate::{ - kv_client::HasError, pd::Region, raw::RawRequest, stats::tikv_stats, transaction::TxnInfo, + kv_client::{requests::KvRequest, HasError}, + pd::Region, + stats::tikv_stats, + transaction::TxnInfo, ErrorKind, Key, Result, }; @@ -25,7 +28,7 @@ pub struct KvRpcClient { } impl super::KvClient for KvRpcClient { - fn dispatch( + fn dispatch( &self, request: &T::RpcRequest, opt: CallOption, diff --git a/src/kv_client/mod.rs b/src/kv_client/mod.rs index 40ecac8..8098301 100644 --- a/src/kv_client/mod.rs +++ b/src/kv_client/mod.rs @@ -2,20 +2,21 @@ mod client; mod errors; +pub mod requests; pub use self::client::KvRpcClient; pub use self::errors::HasError; pub use kvproto::tikvpb::TikvClient; +use self::requests::{KvRequest, KvRpcRequest}; use crate::pd::Region; -use crate::raw::{ColumnFamily, RawRequest}; use crate::security::SecurityManager; use crate::Result; + use derive_new::new; use futures::future::BoxFuture; use grpcio::CallOption; use grpcio::Environment; -use kvproto::kvrpcpb; use std::sync::Arc; use std::time::Duration; @@ -51,7 +52,7 @@ impl KvConnect for TikvConnect { } pub trait KvClient { - fn dispatch( + fn dispatch( &self, request: &T::RpcRequest, opt: CallOption, @@ -70,7 +71,7 @@ impl Store { CallOption::default().timeout(self.timeout) } - pub fn request(&self) -> T { + pub fn request(&self) -> T { let mut request = T::default(); // FIXME propagate the error instead of using `expect` request.set_context( @@ -81,7 +82,7 @@ impl Store { request } - pub fn dispatch( + pub fn dispatch( &self, request: &T::RpcRequest, opt: CallOption, @@ -89,37 +90,3 @@ impl Store { self.client.dispatch::(request, opt) } } - -pub trait KvRawRequest: Default { - fn set_cf(&mut self, cf: String); - fn set_context(&mut self, context: kvrpcpb::Context); - - fn maybe_set_cf(&mut self, cf: Option) { - if let Some(cf) = cf { - self.set_cf(cf.to_string()); - } - } -} - -macro_rules! impl_raw_request { - ($name: ident) => { - impl KvRawRequest for kvrpcpb::$name { - fn set_cf(&mut self, cf: String) { - self.set_cf(cf); - } - fn set_context(&mut self, context: kvrpcpb::Context) { - self.set_context(context); - } - } - }; -} - -impl_raw_request!(RawGetRequest); -impl_raw_request!(RawBatchGetRequest); -impl_raw_request!(RawPutRequest); -impl_raw_request!(RawBatchPutRequest); -impl_raw_request!(RawDeleteRequest); -impl_raw_request!(RawBatchDeleteRequest); -impl_raw_request!(RawScanRequest); -impl_raw_request!(RawBatchScanRequest); -impl_raw_request!(RawDeleteRangeRequest); diff --git a/src/kv_client/requests/mod.rs b/src/kv_client/requests/mod.rs new file mode 100644 index 0000000..78b78a7 --- /dev/null +++ b/src/kv_client/requests/mod.rs @@ -0,0 +1,107 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +pub use raw::*; + +use crate::{ + kv_client::{HasError, KvClient, RpcFnType, Store}, + pd::PdClient, + Result, +}; + +use futures::future::BoxFuture; +use futures::prelude::*; +use futures::stream::BoxStream; +use grpcio::CallOption; +use kvproto::kvrpcpb; +use std::sync::Arc; + +mod mvcc; +mod raw; + +pub trait KvRequest: Sync + Send + 'static + Sized + Clone { + type Result; + type RpcRequest; + type RpcResponse: HasError + Clone + Send + 'static; + type KeyType; + const REQUEST_NAME: &'static str; + const RPC_FN: RpcFnType; + + fn execute( + mut self, + pd_client: Arc, + ) -> BoxFuture<'static, Result> { + let stores = self.store_stream(pd_client); + Self::reduce( + stores + .and_then(move |(key, store)| { + let request = self.clone().into_request(key, &store); + self.mock_dispatch(&request, store.call_options()) + .unwrap_or_else(|| store.dispatch::(&request, store.call_options())) + }) + .map_ok(move |r| Self::map_result(r)) + .boxed(), + ) + } + + fn store_stream( + &mut self, + pd_client: Arc, + ) -> BoxStream<'static, Result<(Self::KeyType, Store)>>; + + fn into_request( + self, + key: Self::KeyType, + store: &Store, + ) -> Self::RpcRequest; + + fn map_result(result: Self::RpcResponse) -> Self::Result; + + fn reduce( + results: BoxStream<'static, Result>, + ) -> BoxFuture<'static, Result>; +} + +/// Permits easy mocking of rpc calls. +pub trait MockDispatch: KvRequest { + fn mock_dispatch( + &self, + _request: &Self::RpcRequest, + _opt: CallOption, + ) -> Option>> { + None + } +} + +impl MockDispatch for T {} + +pub trait KvRpcRequest: Default { + fn set_context(&mut self, context: kvrpcpb::Context); +} + +macro_rules! impl_kv_rpc_request { + ($name: ident) => { + impl KvRpcRequest for kvrpcpb::$name { + fn set_context(&mut self, context: kvrpcpb::Context) { + self.set_context(context); + } + } + }; +} + +impl_kv_rpc_request!(RawGetRequest); +impl_kv_rpc_request!(RawBatchGetRequest); +impl_kv_rpc_request!(RawPutRequest); +impl_kv_rpc_request!(RawBatchPutRequest); +impl_kv_rpc_request!(RawDeleteRequest); +impl_kv_rpc_request!(RawBatchDeleteRequest); +impl_kv_rpc_request!(RawScanRequest); +impl_kv_rpc_request!(RawBatchScanRequest); +impl_kv_rpc_request!(RawDeleteRangeRequest); +impl_kv_rpc_request!(GetRequest); +impl_kv_rpc_request!(ScanRequest); +impl_kv_rpc_request!(PrewriteRequest); +impl_kv_rpc_request!(CommitRequest); +impl_kv_rpc_request!(CleanupRequest); +impl_kv_rpc_request!(BatchGetRequest); +impl_kv_rpc_request!(BatchRollbackRequest); +impl_kv_rpc_request!(ResolveLockRequest); diff --git a/src/kv_client/requests/mvcc.rs b/src/kv_client/requests/mvcc.rs new file mode 100644 index 0000000..10b40b7 --- /dev/null +++ b/src/kv_client/requests/mvcc.rs @@ -0,0 +1,175 @@ +use crate::{ + kv_client::{requests::KvRequest, KvClient, RpcFnType, Store}, + pd::PdClient, + BoundRange, Error, Key, KvPair, Result, Value, +}; +use futures::future::BoxFuture; +use futures::prelude::*; +use futures::stream::BoxStream; +use kvproto::kvrpcpb; +use kvproto::tikvpb::TikvClient; +use std::mem; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Get { + pub key: Key, + pub version: u64, +} + +impl KvRequest for Get { + type Result = Option; + type RpcRequest = kvrpcpb::GetRequest; + type RpcResponse = kvrpcpb::GetResponse; + type KeyType = Key; + const REQUEST_NAME: &'static str = "kv_get"; + const RPC_FN: RpcFnType = TikvClient::kv_get_async_opt; + + fn into_request( + self, + key: Self::KeyType, + store: &Store, + ) -> Self::RpcRequest { + let mut req = store.request::(); + req.set_key(key.into()); + + req + } + + fn store_stream( + &mut self, + pd_client: Arc, + ) -> BoxStream<'static, Result<(Self::KeyType, Store)>> { + let key = self.key.clone(); + pd_client + .store_for_key(&self.key) + .map_ok(move |store| (key, store)) + .into_stream() + .boxed() + } + + fn map_result(mut resp: Self::RpcResponse) -> Self::Result { + let result: Value = resp.take_value().into(); + if result.is_empty() { + None + } else { + Some(result) + } + } + + fn reduce( + results: BoxStream<'static, Result>, + ) -> BoxFuture<'static, Result> { + results + .into_future() + .map(|(f, _)| f.expect("no results should be impossible")) + .boxed() + } +} + +#[derive(Clone)] +pub struct BatchGet { + pub keys: Vec, + pub version: u64, +} + +impl KvRequest for BatchGet { + type Result = Vec; + type RpcRequest = kvrpcpb::BatchGetRequest; + type RpcResponse = kvrpcpb::BatchGetResponse; + type KeyType = Vec; + const REQUEST_NAME: &'static str = "kv_batch_get"; + const RPC_FN: RpcFnType = + TikvClient::kv_batch_get_async_opt; + + fn into_request( + self, + keys: Self::KeyType, + store: &Store, + ) -> Self::RpcRequest { + let mut req = store.request::(); + req.set_keys(keys.into_iter().map(Into::into).collect()); + + req + } + + fn store_stream( + &mut self, + pd_client: Arc, + ) -> BoxStream<'static, Result<(Self::KeyType, Store)>> { + let mut keys = Vec::new(); + mem::swap(&mut keys, &mut self.keys); + + pd_client + .clone() + .group_keys_by_region(keys.into_iter()) + .and_then(move |(region_id, key)| { + pd_client + .clone() + .store_for_id(region_id) + .map_ok(move |store| (key, store)) + }) + .boxed() + } + + fn map_result(mut resp: Self::RpcResponse) -> Self::Result { + resp.take_pairs().into_iter().map(Into::into).collect() + } + + fn reduce( + results: BoxStream<'static, Result>, + ) -> BoxFuture<'static, Result> { + results.try_concat().boxed() + } +} + +#[derive(Clone)] +pub struct Scan { + pub range: BoundRange, + // TODO this limit is currently treated as a per-region limit, not a total + // limit. + pub limit: u32, + pub key_only: bool, + pub reverse: bool, + pub version: u64, +} + +impl KvRequest for Scan { + type Result = Vec; + type RpcRequest = kvrpcpb::ScanRequest; + type RpcResponse = kvrpcpb::ScanResponse; + type KeyType = (Key, Key); + const REQUEST_NAME: &'static str = "kv_scan"; + const RPC_FN: RpcFnType = TikvClient::kv_scan_async_opt; + + fn into_request( + self, + (start_key, end_key): Self::KeyType, + store: &Store, + ) -> Self::RpcRequest { + let mut req = store.request::(); + req.set_start_key(start_key.into()); + req.set_end_key(end_key.into()); + req.set_limit(self.limit); + req.set_key_only(self.key_only); + + req + } + + fn store_stream( + &mut self, + _pd_client: Arc, + ) -> BoxStream<'static, Result<(Self::KeyType, Store)>> { + future::err(Error::unimplemented()).into_stream().boxed() + } + + fn map_result(mut resp: Self::RpcResponse) -> Self::Result { + resp.take_pairs().into_iter().map(Into::into).collect() + } + + fn reduce( + results: BoxStream<'static, Result>, + ) -> BoxFuture<'static, Result> { + results.try_concat().boxed() + } +} diff --git a/src/raw/requests.rs b/src/kv_client/requests/raw.rs similarity index 88% rename from src/raw/requests.rs rename to src/kv_client/requests/raw.rs index 9d9d6d4..f8538da 100644 --- a/src/raw/requests.rs +++ b/src/kv_client/requests/raw.rs @@ -1,84 +1,26 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - kv_client::{HasError, KvClient, KvRawRequest, RpcFnType, Store}, + kv_client::{requests::KvRequest, KvClient, RpcFnType, Store}, pd::PdClient, raw::ColumnFamily, BoundRange, Error, Key, KvPair, Result, Value, }; - use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::BoxStream; -use grpcio::CallOption; use kvproto::kvrpcpb; use kvproto::tikvpb::TikvClient; use std::mem; use std::sync::Arc; -pub trait RawRequest: Sync + Send + 'static + Sized + Clone { - type Result; - type RpcRequest; - type RpcResponse: HasError + Clone + Send + 'static; - type KeyType; - const REQUEST_NAME: &'static str; - const RPC_FN: RpcFnType; - - fn execute( - mut self, - pd_client: Arc, - ) -> BoxFuture<'static, Result> { - let stores = self.store_stream(pd_client); - Self::reduce( - stores - .and_then(move |(key, store)| { - let request = self.clone().into_request(key, &store); - self.mock_dispatch(&request, store.call_options()) - .unwrap_or_else(|| store.dispatch::(&request, store.call_options())) - }) - .map_ok(move |r| Self::map_result(r)) - .boxed(), - ) - } - - fn store_stream( - &mut self, - pd_client: Arc, - ) -> BoxStream<'static, Result<(Self::KeyType, Store)>>; - - fn into_request( - self, - key: Self::KeyType, - store: &Store, - ) -> Self::RpcRequest; - - fn map_result(result: Self::RpcResponse) -> Self::Result; - - fn reduce( - results: BoxStream<'static, Result>, - ) -> BoxFuture<'static, Result>; -} - -/// Permits easy mocking of rpc calls. -pub trait MockDispatch: RawRequest { - fn mock_dispatch( - &self, - _request: &Self::RpcRequest, - _opt: CallOption, - ) -> Option>> { - None - } -} - -impl MockDispatch for T {} - #[derive(Clone)] pub struct RawGet { pub key: Key, pub cf: Option, } -impl RawRequest for RawGet { +impl KvRequest for RawGet { type Result = Option; type RpcRequest = kvrpcpb::RawGetRequest; type RpcResponse = kvrpcpb::RawGetResponse; @@ -135,7 +77,7 @@ pub struct RawBatchGet { pub cf: Option, } -impl RawRequest for RawBatchGet { +impl KvRequest for RawBatchGet { type Result = Vec; type RpcRequest = kvrpcpb::RawBatchGetRequest; type RpcResponse = kvrpcpb::RawBatchGetResponse; @@ -213,7 +155,7 @@ impl RawPut { } } -impl RawRequest for RawPut { +impl KvRequest for RawPut { type Result = (); type RpcRequest = kvrpcpb::RawPutRequest; type RpcResponse = kvrpcpb::RawPutResponse; @@ -281,7 +223,7 @@ impl RawBatchPut { } } -impl RawRequest for RawBatchPut { +impl KvRequest for RawBatchPut { type Result = (); type RpcRequest = kvrpcpb::RawBatchPutRequest; type RpcResponse = kvrpcpb::RawBatchPutResponse; @@ -336,7 +278,7 @@ pub struct RawDelete { pub cf: Option, } -impl RawRequest for RawDelete { +impl KvRequest for RawDelete { type Result = (); type RpcRequest = kvrpcpb::RawDeleteRequest; type RpcResponse = kvrpcpb::RawDeleteResponse; @@ -386,7 +328,7 @@ pub struct RawBatchDelete { pub cf: Option, } -impl RawRequest for RawBatchDelete { +impl KvRequest for RawBatchDelete { type Result = (); type RpcRequest = kvrpcpb::RawBatchDeleteRequest; type RpcResponse = kvrpcpb::RawBatchDeleteResponse; @@ -441,7 +383,7 @@ pub struct RawDeleteRange { pub cf: Option, } -impl RawRequest for RawDeleteRange { +impl KvRequest for RawDeleteRange { type Result = (); type RpcRequest = kvrpcpb::RawDeleteRangeRequest; type RpcResponse = kvrpcpb::RawDeleteRangeResponse; @@ -501,7 +443,7 @@ pub struct RawScan { pub cf: Option, } -impl RawRequest for RawScan { +impl KvRequest for RawScan { type Result = Vec; type RpcRequest = kvrpcpb::RawScanRequest; type RpcResponse = kvrpcpb::RawScanResponse; @@ -559,7 +501,7 @@ pub struct RawBatchScan { pub cf: Option, } -impl RawRequest for RawBatchScan { +impl KvRequest for RawBatchScan { type Result = Vec; type RpcRequest = kvrpcpb::RawBatchScanRequest; type RpcResponse = kvrpcpb::RawBatchScanResponse; @@ -600,6 +542,36 @@ impl RawRequest for RawBatchScan { } } +trait RawRpcRequest { + fn set_cf(&mut self, cf: String); + + fn maybe_set_cf(&mut self, cf: Option) { + if let Some(cf) = cf { + self.set_cf(cf.to_string()); + } + } +} + +macro_rules! impl_raw_rpc_request { + ($name: ident) => { + impl RawRpcRequest for kvrpcpb::$name { + fn set_cf(&mut self, cf: String) { + self.set_cf(cf); + } + } + }; +} + +impl_raw_rpc_request!(RawGetRequest); +impl_raw_rpc_request!(RawBatchGetRequest); +impl_raw_rpc_request!(RawPutRequest); +impl_raw_rpc_request!(RawBatchPutRequest); +impl_raw_rpc_request!(RawDeleteRequest); +impl_raw_rpc_request!(RawBatchDeleteRequest); +impl_raw_rpc_request!(RawScanRequest); +impl_raw_rpc_request!(RawBatchScanRequest); +impl_raw_rpc_request!(RawDeleteRangeRequest); + #[cfg(test)] mod test { use super::*; diff --git a/src/pd/client.rs b/src/pd/client.rs index 036a851..1f0615c 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -204,7 +204,10 @@ impl PdRpcClient { #[cfg(test)] pub mod test { use super::*; - use crate::raw::{MockDispatch, RawRequest, RawScan}; + use crate::kv_client::{ + requests::{KvRequest, MockDispatch, RawScan}, + KvClient, + }; use crate::Error; use futures::executor; @@ -217,7 +220,7 @@ pub mod test { pub struct MockKvClient; impl KvClient for MockKvClient { - fn dispatch( + fn dispatch( &self, _request: &T::RpcRequest, _opt: CallOption, diff --git a/src/raw/client.rs b/src/raw/client.rs index 345ee4d..73e2fcd 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,13 +1,9 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use super::{ - requests::{ - RawBatchDelete, RawBatchGet, RawBatchPut, RawBatchScan, RawDelete, RawDeleteRange, RawGet, - RawPut, RawRequest, RawScan, - }, - ColumnFamily, +use super::ColumnFamily; +use crate::{ + kv_client::requests::*, pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value, }; -use crate::{pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value}; use futures::future::Either; use futures::prelude::*; diff --git a/src/raw/mod.rs b/src/raw/mod.rs index 27e23d2..e97c0ee 100644 --- a/src/raw/mod.rs +++ b/src/raw/mod.rs @@ -11,14 +11,10 @@ //! pub use self::client::Client; -pub(crate) use requests::RawRequest; -#[cfg(test)] -pub use requests::*; use std::fmt; mod client; -mod requests; /// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests. ///