Add request based mvcc api

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-14 10:49:25 +08:00
parent a74e1abd1a
commit 332933fc9e
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
8 changed files with 341 additions and 122 deletions

View File

@ -13,7 +13,10 @@ use kvproto::tikvpb::TikvClient;
use std::{sync::Arc, time::Duration};
use crate::{
kv_client::HasError, pd::Region, raw::RawRequest, stats::tikv_stats, transaction::TxnInfo,
kv_client::{requests::KvRequest, HasError},
pd::Region,
stats::tikv_stats,
transaction::TxnInfo,
ErrorKind, Key, Result,
};
@ -25,7 +28,7 @@ pub struct KvRpcClient {
}
impl super::KvClient for KvRpcClient {
fn dispatch<T: RawRequest>(
fn dispatch<T: KvRequest>(
&self,
request: &T::RpcRequest,
opt: CallOption,

View File

@ -2,20 +2,21 @@
mod client;
mod errors;
pub mod requests;
pub use self::client::KvRpcClient;
pub use self::errors::HasError;
pub use kvproto::tikvpb::TikvClient;
use self::requests::{KvRequest, KvRpcRequest};
use crate::pd::Region;
use crate::raw::{ColumnFamily, RawRequest};
use crate::security::SecurityManager;
use crate::Result;
use derive_new::new;
use futures::future::BoxFuture;
use grpcio::CallOption;
use grpcio::Environment;
use kvproto::kvrpcpb;
use std::sync::Arc;
use std::time::Duration;
@ -51,7 +52,7 @@ impl KvConnect for TikvConnect {
}
pub trait KvClient {
fn dispatch<T: RawRequest>(
fn dispatch<T: KvRequest>(
&self,
request: &T::RpcRequest,
opt: CallOption,
@ -70,7 +71,7 @@ impl<Client: KvClient> Store<Client> {
CallOption::default().timeout(self.timeout)
}
pub fn request<T: KvRawRequest>(&self) -> T {
pub fn request<T: KvRpcRequest>(&self) -> T {
let mut request = T::default();
// FIXME propagate the error instead of using `expect`
request.set_context(
@ -81,7 +82,7 @@ impl<Client: KvClient> Store<Client> {
request
}
pub fn dispatch<T: RawRequest>(
pub fn dispatch<T: KvRequest>(
&self,
request: &T::RpcRequest,
opt: CallOption,
@ -89,37 +90,3 @@ impl<Client: KvClient> Store<Client> {
self.client.dispatch::<T>(request, opt)
}
}
pub trait KvRawRequest: Default {
fn set_cf(&mut self, cf: String);
fn set_context(&mut self, context: kvrpcpb::Context);
fn maybe_set_cf(&mut self, cf: Option<ColumnFamily>) {
if let Some(cf) = cf {
self.set_cf(cf.to_string());
}
}
}
macro_rules! impl_raw_request {
($name: ident) => {
impl KvRawRequest for kvrpcpb::$name {
fn set_cf(&mut self, cf: String) {
self.set_cf(cf);
}
fn set_context(&mut self, context: kvrpcpb::Context) {
self.set_context(context);
}
}
};
}
impl_raw_request!(RawGetRequest);
impl_raw_request!(RawBatchGetRequest);
impl_raw_request!(RawPutRequest);
impl_raw_request!(RawBatchPutRequest);
impl_raw_request!(RawDeleteRequest);
impl_raw_request!(RawBatchDeleteRequest);
impl_raw_request!(RawScanRequest);
impl_raw_request!(RawBatchScanRequest);
impl_raw_request!(RawDeleteRangeRequest);

View File

@ -0,0 +1,107 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
pub use raw::*;
use crate::{
kv_client::{HasError, KvClient, RpcFnType, Store},
pd::PdClient,
Result,
};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;
use grpcio::CallOption;
use kvproto::kvrpcpb;
use std::sync::Arc;
mod mvcc;
mod raw;
pub trait KvRequest: Sync + Send + 'static + Sized + Clone {
type Result;
type RpcRequest;
type RpcResponse: HasError + Clone + Send + 'static;
type KeyType;
const REQUEST_NAME: &'static str;
const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse>;
fn execute(
mut self,
pd_client: Arc<impl PdClient>,
) -> BoxFuture<'static, Result<Self::Result>> {
let stores = self.store_stream(pd_client);
Self::reduce(
stores
.and_then(move |(key, store)| {
let request = self.clone().into_request(key, &store);
self.mock_dispatch(&request, store.call_options())
.unwrap_or_else(|| store.dispatch::<Self>(&request, store.call_options()))
})
.map_ok(move |r| Self::map_result(r))
.boxed(),
)
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>>;
fn into_request<KvC: KvClient>(
self,
key: Self::KeyType,
store: &Store<KvC>,
) -> Self::RpcRequest;
fn map_result(result: Self::RpcResponse) -> Self::Result;
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>>;
}
/// Permits easy mocking of rpc calls.
pub trait MockDispatch: KvRequest {
fn mock_dispatch(
&self,
_request: &Self::RpcRequest,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
impl<T: KvRequest> MockDispatch for T {}
pub trait KvRpcRequest: Default {
fn set_context(&mut self, context: kvrpcpb::Context);
}
macro_rules! impl_kv_rpc_request {
($name: ident) => {
impl KvRpcRequest for kvrpcpb::$name {
fn set_context(&mut self, context: kvrpcpb::Context) {
self.set_context(context);
}
}
};
}
impl_kv_rpc_request!(RawGetRequest);
impl_kv_rpc_request!(RawBatchGetRequest);
impl_kv_rpc_request!(RawPutRequest);
impl_kv_rpc_request!(RawBatchPutRequest);
impl_kv_rpc_request!(RawDeleteRequest);
impl_kv_rpc_request!(RawBatchDeleteRequest);
impl_kv_rpc_request!(RawScanRequest);
impl_kv_rpc_request!(RawBatchScanRequest);
impl_kv_rpc_request!(RawDeleteRangeRequest);
impl_kv_rpc_request!(GetRequest);
impl_kv_rpc_request!(ScanRequest);
impl_kv_rpc_request!(PrewriteRequest);
impl_kv_rpc_request!(CommitRequest);
impl_kv_rpc_request!(CleanupRequest);
impl_kv_rpc_request!(BatchGetRequest);
impl_kv_rpc_request!(BatchRollbackRequest);
impl_kv_rpc_request!(ResolveLockRequest);

View File

@ -0,0 +1,175 @@
use crate::{
kv_client::{requests::KvRequest, KvClient, RpcFnType, Store},
pd::PdClient,
BoundRange, Error, Key, KvPair, Result, Value,
};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;
use kvproto::kvrpcpb;
use kvproto::tikvpb::TikvClient;
use std::mem;
use std::sync::Arc;
#[derive(Clone)]
pub struct Get {
pub key: Key,
pub version: u64,
}
impl KvRequest for Get {
type Result = Option<Value>;
type RpcRequest = kvrpcpb::GetRequest;
type RpcResponse = kvrpcpb::GetResponse;
type KeyType = Key;
const REQUEST_NAME: &'static str = "kv_get";
const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_get_async_opt;
fn into_request<KvC: KvClient>(
self,
key: Self::KeyType,
store: &Store<KvC>,
) -> Self::RpcRequest {
let mut req = store.request::<Self::RpcRequest>();
req.set_key(key.into());
req
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
let key = self.key.clone();
pd_client
.store_for_key(&self.key)
.map_ok(move |store| (key, store))
.into_stream()
.boxed()
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
let result: Value = resp.take_value().into();
if result.is_empty() {
None
} else {
Some(result)
}
}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.boxed()
}
}
#[derive(Clone)]
pub struct BatchGet {
pub keys: Vec<Key>,
pub version: u64,
}
impl KvRequest for BatchGet {
type Result = Vec<KvPair>;
type RpcRequest = kvrpcpb::BatchGetRequest;
type RpcResponse = kvrpcpb::BatchGetResponse;
type KeyType = Vec<Key>;
const REQUEST_NAME: &'static str = "kv_batch_get";
const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
TikvClient::kv_batch_get_async_opt;
fn into_request<KvC: KvClient>(
self,
keys: Self::KeyType,
store: &Store<KvC>,
) -> Self::RpcRequest {
let mut req = store.request::<Self::RpcRequest>();
req.set_keys(keys.into_iter().map(Into::into).collect());
req
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
let mut keys = Vec::new();
mem::swap(&mut keys, &mut self.keys);
pd_client
.clone()
.group_keys_by_region(keys.into_iter())
.and_then(move |(region_id, key)| {
pd_client
.clone()
.store_for_id(region_id)
.map_ok(move |store| (key, store))
})
.boxed()
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
resp.take_pairs().into_iter().map(Into::into).collect()
}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_concat().boxed()
}
}
#[derive(Clone)]
pub struct Scan {
pub range: BoundRange,
// TODO this limit is currently treated as a per-region limit, not a total
// limit.
pub limit: u32,
pub key_only: bool,
pub reverse: bool,
pub version: u64,
}
impl KvRequest for Scan {
type Result = Vec<KvPair>;
type RpcRequest = kvrpcpb::ScanRequest;
type RpcResponse = kvrpcpb::ScanResponse;
type KeyType = (Key, Key);
const REQUEST_NAME: &'static str = "kv_scan";
const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_scan_async_opt;
fn into_request<KvC: KvClient>(
self,
(start_key, end_key): Self::KeyType,
store: &Store<KvC>,
) -> Self::RpcRequest {
let mut req = store.request::<Self::RpcRequest>();
req.set_start_key(start_key.into());
req.set_end_key(end_key.into());
req.set_limit(self.limit);
req.set_key_only(self.key_only);
req
}
fn store_stream<PdC: PdClient>(
&mut self,
_pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
future::err(Error::unimplemented()).into_stream().boxed()
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
resp.take_pairs().into_iter().map(Into::into).collect()
}
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_concat().boxed()
}
}

View File

@ -1,84 +1,26 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
kv_client::{HasError, KvClient, KvRawRequest, RpcFnType, Store},
kv_client::{requests::KvRequest, KvClient, RpcFnType, Store},
pd::PdClient,
raw::ColumnFamily,
BoundRange, Error, Key, KvPair, Result, Value,
};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;
use grpcio::CallOption;
use kvproto::kvrpcpb;
use kvproto::tikvpb::TikvClient;
use std::mem;
use std::sync::Arc;
pub trait RawRequest: Sync + Send + 'static + Sized + Clone {
type Result;
type RpcRequest;
type RpcResponse: HasError + Clone + Send + 'static;
type KeyType;
const REQUEST_NAME: &'static str;
const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse>;
fn execute(
mut self,
pd_client: Arc<impl PdClient>,
) -> BoxFuture<'static, Result<Self::Result>> {
let stores = self.store_stream(pd_client);
Self::reduce(
stores
.and_then(move |(key, store)| {
let request = self.clone().into_request(key, &store);
self.mock_dispatch(&request, store.call_options())
.unwrap_or_else(|| store.dispatch::<Self>(&request, store.call_options()))
})
.map_ok(move |r| Self::map_result(r))
.boxed(),
)
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>>;
fn into_request<KvC: KvClient>(
self,
key: Self::KeyType,
store: &Store<KvC>,
) -> Self::RpcRequest;
fn map_result(result: Self::RpcResponse) -> Self::Result;
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>>;
}
/// Permits easy mocking of rpc calls.
pub trait MockDispatch: RawRequest {
fn mock_dispatch(
&self,
_request: &Self::RpcRequest,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
impl<T: RawRequest> MockDispatch for T {}
#[derive(Clone)]
pub struct RawGet {
pub key: Key,
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawGet {
impl KvRequest for RawGet {
type Result = Option<Value>;
type RpcRequest = kvrpcpb::RawGetRequest;
type RpcResponse = kvrpcpb::RawGetResponse;
@ -135,7 +77,7 @@ pub struct RawBatchGet {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawBatchGet {
impl KvRequest for RawBatchGet {
type Result = Vec<KvPair>;
type RpcRequest = kvrpcpb::RawBatchGetRequest;
type RpcResponse = kvrpcpb::RawBatchGetResponse;
@ -213,7 +155,7 @@ impl RawPut {
}
}
impl RawRequest for RawPut {
impl KvRequest for RawPut {
type Result = ();
type RpcRequest = kvrpcpb::RawPutRequest;
type RpcResponse = kvrpcpb::RawPutResponse;
@ -281,7 +223,7 @@ impl RawBatchPut {
}
}
impl RawRequest for RawBatchPut {
impl KvRequest for RawBatchPut {
type Result = ();
type RpcRequest = kvrpcpb::RawBatchPutRequest;
type RpcResponse = kvrpcpb::RawBatchPutResponse;
@ -336,7 +278,7 @@ pub struct RawDelete {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawDelete {
impl KvRequest for RawDelete {
type Result = ();
type RpcRequest = kvrpcpb::RawDeleteRequest;
type RpcResponse = kvrpcpb::RawDeleteResponse;
@ -386,7 +328,7 @@ pub struct RawBatchDelete {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawBatchDelete {
impl KvRequest for RawBatchDelete {
type Result = ();
type RpcRequest = kvrpcpb::RawBatchDeleteRequest;
type RpcResponse = kvrpcpb::RawBatchDeleteResponse;
@ -441,7 +383,7 @@ pub struct RawDeleteRange {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawDeleteRange {
impl KvRequest for RawDeleteRange {
type Result = ();
type RpcRequest = kvrpcpb::RawDeleteRangeRequest;
type RpcResponse = kvrpcpb::RawDeleteRangeResponse;
@ -501,7 +443,7 @@ pub struct RawScan {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawScan {
impl KvRequest for RawScan {
type Result = Vec<KvPair>;
type RpcRequest = kvrpcpb::RawScanRequest;
type RpcResponse = kvrpcpb::RawScanResponse;
@ -559,7 +501,7 @@ pub struct RawBatchScan {
pub cf: Option<ColumnFamily>,
}
impl RawRequest for RawBatchScan {
impl KvRequest for RawBatchScan {
type Result = Vec<KvPair>;
type RpcRequest = kvrpcpb::RawBatchScanRequest;
type RpcResponse = kvrpcpb::RawBatchScanResponse;
@ -600,6 +542,36 @@ impl RawRequest for RawBatchScan {
}
}
trait RawRpcRequest {
fn set_cf(&mut self, cf: String);
fn maybe_set_cf(&mut self, cf: Option<ColumnFamily>) {
if let Some(cf) = cf {
self.set_cf(cf.to_string());
}
}
}
macro_rules! impl_raw_rpc_request {
($name: ident) => {
impl RawRpcRequest for kvrpcpb::$name {
fn set_cf(&mut self, cf: String) {
self.set_cf(cf);
}
}
};
}
impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
impl_raw_rpc_request!(RawBatchDeleteRequest);
impl_raw_rpc_request!(RawScanRequest);
impl_raw_rpc_request!(RawBatchScanRequest);
impl_raw_rpc_request!(RawDeleteRangeRequest);
#[cfg(test)]
mod test {
use super::*;

View File

@ -204,7 +204,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdRpcClient<KvC> {
#[cfg(test)]
pub mod test {
use super::*;
use crate::raw::{MockDispatch, RawRequest, RawScan};
use crate::kv_client::{
requests::{KvRequest, MockDispatch, RawScan},
KvClient,
};
use crate::Error;
use futures::executor;
@ -217,7 +220,7 @@ pub mod test {
pub struct MockKvClient;
impl KvClient for MockKvClient {
fn dispatch<T: RawRequest>(
fn dispatch<T: KvRequest>(
&self,
_request: &T::RpcRequest,
_opt: CallOption,

View File

@ -1,13 +1,9 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::{
requests::{
RawBatchDelete, RawBatchGet, RawBatchPut, RawBatchScan, RawDelete, RawDeleteRange, RawGet,
RawPut, RawRequest, RawScan,
},
ColumnFamily,
use super::ColumnFamily;
use crate::{
kv_client::requests::*, pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value,
};
use crate::{pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value};
use futures::future::Either;
use futures::prelude::*;

View File

@ -11,14 +11,10 @@
//!
pub use self::client::Client;
pub(crate) use requests::RawRequest;
#[cfg(test)]
pub use requests::*;
use std::fmt;
mod client;
mod requests;
/// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.
///