Tidy up API of tikv-client-store

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2020-10-28 14:14:48 +13:00
parent 744f45fda9
commit 0b2eb449a6
10 changed files with 76 additions and 149 deletions

1
Cargo.lock generated
View File

@ -1921,6 +1921,7 @@ dependencies = [
name = "tikv-client-store"
version = "0.0.0"
dependencies = [
"async-trait",
"derive-new",
"futures 0.3.5",
"grpcio",

View File

@ -30,7 +30,7 @@ regex = "1"
serde = "1.0"
serde_derive = "1.0"
futures-timer = "3.0"
async-trait = "0.1.27"
async-trait = "0.1"
tokio = { version = "0.2", features = ["sync"] }
tikv-client-common = { path = "tikv-client-common" }

View File

@ -13,7 +13,7 @@ use std::{path::PathBuf, time::Duration};
///
/// By default, this client will use an insecure connection over instead of one protected by
/// Transport Layer Security (TLS). Your deployment may have chosen to rely on security measures
/// such as a private network, or a VPN layer to provid secure transmission.
/// such as a private network, or a VPN layer to provide secure transmission.
///
/// To use a TLS secured connection, use the `with_security` function to set the required
/// parameters.

View File

@ -13,10 +13,9 @@ use crate::{
use async_trait::async_trait;
use fail::fail_point;
use futures::future::{ready, BoxFuture, FutureExt};
use grpcio::CallOption;
use kvproto::{errorpb, kvrpcpb, metapb, tikvpb::TikvClient};
use std::{future::Future, sync::Arc, time::Duration};
use tikv_client_store::{HasError, KvClient, KvConnect, Region, RegionId, Store};
use kvproto::{errorpb, kvrpcpb, metapb};
use std::sync::Arc;
use tikv_client_store::{HasError, KvClient, KvConnect, Region, RegionId, RpcFnType, Store};
/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
@ -50,23 +49,15 @@ pub struct MockCluster;
pub struct MockPdClient;
#[async_trait]
impl KvClient for MockKvClient {
fn dispatch<Resp, RpcFuture>(
&self,
_request_name: &'static str,
_fut: grpcio::Result<RpcFuture>,
) -> BoxFuture<'static, Result<Resp>>
async fn dispatch<Req, Resp>(&self, _fun: RpcFnType<Req, Resp>, _request: Req) -> Result<Resp>
where
RpcFuture: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
Req: Send + Sync + 'static,
Resp: HasError + Sized + Clone + Send + 'static,
RpcFuture: Send + 'static,
{
unimplemented!()
}
fn get_rpc_client(&self) -> Arc<TikvClient> {
unimplemented!()
}
}
impl KvConnect for MockKvConnect {
@ -117,7 +108,6 @@ impl PdClient for MockPdClient {
MockKvClient {
addr: String::new(),
},
Duration::from_secs(60),
))
}
@ -150,10 +140,7 @@ impl PdClient for MockPdClient {
}
impl DispatchHook for kvrpcpb::ResolveLockRequest {
fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<kvrpcpb::ResolveLockResponse>>> {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<kvrpcpb::ResolveLockResponse>>> {
fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());

View File

@ -11,7 +11,6 @@ use std::{
collections::HashMap,
sync::{Arc, RwLock},
thread,
time::Duration,
};
use tikv_client_common::{kv::codec, Timestamp};
use tikv_client_pd::Cluster;
@ -198,7 +197,6 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
timeout: Duration,
enable_codec: bool,
}
@ -207,11 +205,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
type KvClient = KvC::KvClient;
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store<KvC::KvClient>> {
let timeout = self.timeout;
let store_id = region.get_store_id()?;
let store = self.pd.clone().get_store(store_id).await?;
let kv_client = self.kv_client(store.get_address())?;
Ok(Store::new(region, kv_client, timeout))
Ok(Store::new(region, kv_client))
}
async fn region_for_key(&self, key: &Key) -> Result<Region> {
@ -243,7 +240,7 @@ impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(config: &Config, enable_codec: bool) -> Result<PdRpcClient> {
PdRpcClient::new(
config,
|env, security_mgr| TikvConnect::new(env, security_mgr),
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
|env, security_mgr| {
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
},
@ -296,7 +293,6 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pd,
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
timeout: config.timeout,
enable_codec,
})
}

View File

@ -535,14 +535,10 @@ mod test {
executor,
future::{ready, BoxFuture},
};
use grpcio::CallOption;
use kvproto::kvrpcpb;
impl DispatchHook for kvrpcpb::RawScanRequest {
fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
assert!(self.key_only);
assert_eq!(self.limit, 10);

View File

@ -7,7 +7,6 @@ use crate::{
BoundRange, Error, ErrorKind, Key, Result,
};
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
use grpcio::CallOption;
use kvproto::kvrpcpb;
use std::{
cmp::{max, min},
@ -19,7 +18,7 @@ const DEFAULT_REGION_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10)
pub const OPTIMISTIC_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10);
pub const PESSIMISTIC_BACKOFF: NoBackoff = NoBackoff;
pub trait KvRequest: Sync + Send + 'static + Sized {
pub trait KvRequest: Clone + Sync + Send + 'static + Sized {
type Result;
type RpcResponse: HasError + HasLocks + Clone + Send + 'static;
/// A single `KvRequest` can be divided into a number of RPC requests because the keys span
@ -68,14 +67,21 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
Ok(r) => (Some(r), None),
Err(e) => (None, Some(e)),
};
self.dispatch_hook(store.call_options())
.unwrap_or_else(|| match (&req, err) {
(Some(req), None) => store.dispatch(
Self::REQUEST_NAME,
Self::RPC_FN(&store.client.get_rpc_client(), req, store.call_options()),
),
(None, Some(err)) => future::err(err).boxed(),
_ => unreachable!(),
self.dispatch_hook()
.unwrap_or_else({
let req = req.clone();
|| {
async move {
match (req, err) {
(Some(req), None) => {
store.dispatch(Self::REQUEST_NAME, Self::RPC_FN, req).await
}
(None, Some(err)) => Err(err),
_ => unreachable!(),
}
}
.boxed()
}
})
.map_ok(move |response| (req, response))
})
@ -274,20 +280,14 @@ pub fn store_stream_for_ranges<PdC: PdClient>(
/// Permits easy mocking of rpc calls.
pub trait DispatchHook: KvRequest {
fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
impl<T: KvRequest> DispatchHook for T {
#[cfg(test)]
default fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
default fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
@ -332,6 +332,7 @@ mod test {
use super::*;
use crate::mock::MockPdClient;
use futures::executor;
use grpcio::CallOption;
use kvproto::tikvpb::TikvClient;
use std::sync::Mutex;
@ -354,6 +355,7 @@ mod test {
impl HasLocks for MockRpcResponse {}
#[derive(Clone)]
struct MockKvRequest {
test_invoking_count: Arc<Mutex<usize>>,
}
@ -368,10 +370,7 @@ mod test {
}
impl DispatchHook for MockKvRequest {
fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
Some(future::ok(MockRpcResponse {}).boxed())
}
}

View File

@ -3,12 +3,11 @@ name = "tikv-client-store"
version = "0.0.0"
edition = "2018"
[dependencies]
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", features = ["prost-codec"], default-features = false }
async-trait = "0.1"
derive-new = "0.5"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.6", features = ["secure", "prost-codec"], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", features = ["prost-codec"], default-features = false }
log = "0.4"
tikv-client-common = { path = "../tikv-client-common" }

View File

@ -1,32 +1,35 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(type_alias_impl_trait)]
#[macro_use]
extern crate log;
mod errors;
pub mod region;
mod store_builder;
pub use crate::errors::{HasError, HasRegionError};
#[doc(inline)]
pub use crate::region::{Region, RegionId, RegionVerId, StoreId};
#[doc(inline)]
pub use crate::store_builder::StoreBuilder;
pub use kvproto::tikvpb::TikvClient;
pub use tikv_client_common::{
security::SecurityManager, stats::tikv_stats, Error, ErrorKind, Key, Result,
};
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Key, Result};
use async_trait::async_trait;
use derive_new::new;
use futures::{future::BoxFuture, prelude::*};
use grpcio::{CallOption, Environment};
use std::{sync::Arc, time::Duration};
use tikv_client_common::stats::tikv_stats;
/// A trait for connecting to TiKV stores.
pub trait KvConnect: Sized + Send + Sync + 'static {
type KvClient: KvClient + Clone + Send + Sync + 'static;
fn connect(&self, address: &str) -> Result<Self::KvClient>;
fn connect_to_store(&self, region: Region, address: String) -> Result<Store<Self::KvClient>> {
info!("connect to tikv endpoint: {:?}", &address);
let client = self.connect(address.as_str())?;
Ok(Store::new(region, client))
}
}
pub type RpcFnType<Req, Resp> =
@ -41,55 +44,49 @@ pub type RpcFnType<Req, Resp> =
pub struct TikvConnect {
env: Arc<Environment>,
security_mgr: Arc<SecurityManager>,
timeout: Duration,
}
impl KvConnect for TikvConnect {
type KvClient = KvRpcClient;
type KvClient = impl KvClient + Clone + Send + Sync + 'static;
fn connect(&self, address: &str) -> Result<KvRpcClient> {
fn connect(&self, address: &str) -> Result<Self::KvClient> {
self.security_mgr
.connect(self.env.clone(), address, TikvClient::new)
.map(|c| KvRpcClient::new(Arc::new(c)))
.map(|c| KvRpcClient::new(Arc::new(c), self.timeout))
}
}
#[async_trait]
pub trait KvClient {
fn dispatch<Resp, RpcFuture>(
&self,
request_name: &'static str,
fut: ::grpcio::Result<RpcFuture>,
) -> BoxFuture<'static, Result<Resp>>
async fn dispatch<Req, Resp>(&self, fun: RpcFnType<Req, Resp>, request: Req) -> Result<Resp>
where
RpcFuture: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
Resp: HasError + Sized + Clone + Send + 'static,
RpcFuture: Send + 'static;
fn get_rpc_client(&self) -> Arc<TikvClient>;
Req: Send + Sync + 'static,
Resp: HasError + Sized + Clone + Send + 'static;
}
/// This client handles requests for a single TiKV node. It converts the data
/// types and abstractions of the client program into the grpc data types.
#[derive(new, Clone)]
pub struct KvRpcClient {
pub rpc_client: Arc<TikvClient>,
struct KvRpcClient {
rpc_client: Arc<TikvClient>,
timeout: Duration,
}
#[async_trait]
impl KvClient for KvRpcClient {
fn dispatch<Resp, RpcFuture>(
&self,
request_name: &'static str,
fut: ::grpcio::Result<RpcFuture>,
) -> BoxFuture<'static, Result<Resp>>
async fn dispatch<Req, Resp>(&self, fun: RpcFnType<Req, Resp>, request: Req) -> Result<Resp>
where
RpcFuture: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
Req: Send + Sync + 'static,
Resp: HasError + Sized + Clone + Send + 'static,
RpcFuture: Send + 'static,
{
map_errors_and_trace(request_name, fut).boxed()
}
fn get_rpc_client(&self) -> Arc<TikvClient> {
self.rpc_client.clone()
fun(
&self.rpc_client,
&request,
CallOption::default().timeout(self.timeout),
)?
.await
.map_err(|e| ErrorKind::Grpc(e).into())
}
}
@ -97,51 +94,21 @@ impl KvClient for KvRpcClient {
pub struct Store<Client: KvClient> {
pub region: Region,
pub client: Client,
timeout: Duration,
}
impl<Client: KvClient> Store<Client> {
pub fn from_builder<T>(builder: StoreBuilder, connect: Arc<T>) -> Result<Store<Client>>
where
Client: KvClient + Clone + Send + Sync + 'static,
T: KvConnect<KvClient = Client>,
{
info!("connect to tikv endpoint: {:?}", &builder.address);
let client = connect.connect(builder.address.as_str())?;
Ok(Store::new(builder.region, client, builder.timeout))
}
pub fn call_options(&self) -> CallOption {
CallOption::default().timeout(self.timeout)
}
pub fn dispatch<Resp, RpcFuture>(
pub async fn dispatch<Req, Resp>(
&self,
request_name: &'static str,
fut: ::grpcio::Result<RpcFuture>,
) -> BoxFuture<'static, Result<Resp>>
fun: RpcFnType<Req, Resp>,
request: Req,
) -> Result<Resp>
where
RpcFuture: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
Req: Send + Sync + 'static,
Resp: HasError + Sized + Clone + Send + 'static,
RpcFuture: Send + 'static,
{
self.client.dispatch(request_name, fut)
let result = self.client.dispatch(fun, request).await;
tikv_stats(request_name).done(result)
}
}
async fn map_errors_and_trace<Resp, RpcFuture>(
request_name: &'static str,
fut: ::grpcio::Result<RpcFuture>,
) -> Result<Resp>
where
RpcFuture: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
Resp: HasError + Sized + Clone + Send + 'static,
{
let res = match fut {
Ok(f) => f.await,
Err(e) => Err(e),
};
let context = tikv_stats(request_name);
context.done(res.map_err(|e| ErrorKind::Grpc(e).into()))
}

View File

@ -1,18 +0,0 @@
use crate::Region;
use std::time::Duration;
pub struct StoreBuilder {
pub region: Region,
pub address: String,
pub timeout: Duration,
}
impl StoreBuilder {
pub fn new(region: Region, address: String, timeout: Duration) -> StoreBuilder {
StoreBuilder {
region,
address,
timeout,
}
}
}