Remove DispatchHook

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2020-10-29 13:06:00 +13:00
parent 79a2c87408
commit edf6a8055e
6 changed files with 97 additions and 105 deletions

View File

@ -7,13 +7,11 @@
use crate::{
pd::{PdClient, PdRpcClient, RetryClient},
request::DispatchHook,
Config, Error, Key, Result, Timestamp,
};
use async_trait::async_trait;
use fail::fail_point;
use futures::future::{ready, BoxFuture, FutureExt};
use kvproto::{errorpb, kvrpcpb, metapb};
use derive_new::new;
use kvproto::metapb;
use std::{any::Any, sync::Arc};
use tikv_client_store::{KvClient, KvConnect, Region, RegionId, Request, Store};
@ -38,21 +36,40 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
.unwrap()
}
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(new, Default, Clone)]
pub struct MockKvClient {
addr: String,
pub addr: String,
dispatch: Option<Arc<dyn Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static>>,
}
impl MockKvClient {
pub fn with_dispatch_hook<F>(dispatch: F) -> MockKvClient
where
F: Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static,
{
MockKvClient {
addr: String::new(),
dispatch: Some(Arc::new(dispatch)),
}
}
}
pub struct MockKvConnect;
pub struct MockCluster;
pub struct MockPdClient;
#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
}
#[async_trait]
impl KvClient for MockKvClient {
async fn dispatch(&self, _: &dyn Request) -> Result<Box<dyn Any>> {
unimplemented!()
async fn dispatch(&self, req: &dyn Request) -> Result<Box<dyn Any>> {
match &self.dispatch {
Some(f) => f(req.as_any()),
None => panic!("no dispatch hook set"),
}
}
}
@ -62,11 +79,18 @@ impl KvConnect for MockKvConnect {
fn connect(&self, address: &str) -> Result<Self::KvClient> {
Ok(MockKvClient {
addr: address.to_owned(),
dispatch: None,
})
}
}
impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
}
}
pub fn region1() -> Region {
let mut region = Region::default();
region.region.id = 1;
@ -99,12 +123,7 @@ impl PdClient for MockPdClient {
type KvClient = MockKvClient;
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
Ok(Store::new(
region,
Box::new(MockKvClient {
addr: String::new(),
}),
))
Ok(Store::new(region, Box::new(self.client.clone())))
}
async fn region_for_key(&self, key: &Key) -> Result<Region> {
@ -134,14 +153,3 @@ impl PdClient for MockPdClient {
unimplemented!()
}
}
impl DispatchHook for kvrpcpb::ResolveLockRequest {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<kvrpcpb::ResolveLockResponse>>> {
fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());
Some(ready(Ok(resp)).boxed())
});
Some(ready(Ok(kvrpcpb::ResolveLockResponse::default())).boxed())
}
}

View File

@ -327,13 +327,13 @@ pub mod test {
let kv1 = client.kv_client(&addr1).unwrap();
let kv2 = client.kv_client(&addr2).unwrap();
let kv3 = client.kv_client(&addr2).unwrap();
assert!(kv1 != kv2);
assert_eq!(kv2, kv3);
assert!(kv1.addr != kv2.addr);
assert_eq!(kv2.addr, kv3.addr);
}
#[test]
fn test_group_keys_by_region() {
let client = MockPdClient;
let client = MockPdClient::default();
// FIXME This only works if the keys are in order of regions. Not sure if
// that is a reasonable constraint.
@ -367,7 +367,7 @@ pub mod test {
#[test]
fn test_stores_for_range() {
let client = Arc::new(MockPdClient);
let client = Arc::new(MockPdClient::default());
let k1: Key = vec![1].into();
let k2: Key = vec![5, 2].into();
let k3: Key = vec![11, 4].into();
@ -385,7 +385,7 @@ pub mod test {
#[test]
fn test_group_ranges_by_region() {
let client = Arc::new(MockPdClient);
let client = Arc::new(MockPdClient::default());
let k1: Key = vec![1].into();
let k2: Key = vec![5, 2].into();
let k3: Key = vec![11, 4].into();

View File

@ -473,36 +473,33 @@ impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
#[cfg(test)]
mod test {
use super::*;
use crate::{mock::MockPdClient, request::DispatchHook};
use crate::request::OPTIMISTIC_BACKOFF;
use futures::{
executor,
future::{ready, BoxFuture},
use crate::{
mock::{MockKvClient, MockPdClient},
request::OPTIMISTIC_BACKOFF,
};
use futures::executor;
use kvproto::kvrpcpb;
impl DispatchHook for kvrpcpb::RawScanRequest {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
assert!(self.key_only);
assert_eq!(self.limit, 10);
let mut resp = kvrpcpb::RawScanResponse::default();
for i in self.start_key[0]..self.end_key[0] {
let mut kv = kvrpcpb::KvPair::default();
kv.key = vec![i];
resp.kvs.push(kv);
}
Some(Box::pin(ready(Ok(resp))))
}
}
use std::any::Any;
#[test]
#[ignore]
fn test_raw_scan() {
let client = Arc::new(MockPdClient);
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|req: &dyn Any| {
let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap();
assert!(req.key_only);
assert_eq!(req.limit, 10);
let mut resp = kvrpcpb::RawScanResponse::default();
for i in req.start_key[0]..req.end_key[0] {
let mut kv = kvrpcpb::KvPair::default();
kv.key = vec![i];
resp.kvs.push(kv);
}
Ok(Box::new(resp) as Box<dyn Any>)
},
)));
let start: Key = vec![1].into();
let end: Key = vec![50].into();

View File

@ -25,7 +25,7 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
/// several regions or a single RPC request is too large. Most of the fields in these requests
/// share the same content while `KeyData`, which contains keys (and associated data if any),
/// is the part which differs among the requests.
type KeyData;
type KeyData: Send;
fn execute(
self,
@ -61,28 +61,13 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
stores
.and_then(move |(key_data, store)| {
let request = self.make_rpc_request(key_data, &store);
let (req, err) = match request {
Ok(r) => (Some(r), None),
Err(e) => (None, Some(e)),
};
self.dispatch_hook()
.unwrap_or_else({
let req = req.clone();
|| {
async move {
match (req, err) {
(Some(req), None) => store.dispatch(&req).await.map(|r| *r),
(None, Some(err)) => Err(err),
_ => unreachable!(),
}
}
.boxed()
}
})
.map_ok(move |response| (req, response))
async move {
let request = request?;
let response = store.dispatch::<_, Self::RpcResponse>(&request).await?;
Ok((request, *response))
}
})
.map_ok(move |(request, mut response)| {
let request = request.unwrap();
if let Some(region_error) = response.region_error() {
return request.on_region_error(
region_error,
@ -270,20 +255,6 @@ pub fn store_stream_for_ranges<PdC: PdClient>(
.boxed()
}
/// Permits easy mocking of rpc calls.
pub trait DispatchHook: KvRequest {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
impl<T: KvRequest> DispatchHook for T {
#[cfg(test)]
default fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
pub trait KvRpcRequest: Default {
fn set_context(&mut self, context: kvrpcpb::Context);
}
@ -322,18 +293,18 @@ impl_kv_rpc_request!(PessimisticLockRequest);
#[cfg(test)]
mod test {
use super::*;
use crate::mock::MockPdClient;
use crate::mock::{MockKvClient, MockPdClient};
use async_trait::async_trait;
use futures::executor;
use grpcio::CallOption;
use kvproto::tikvpb::TikvClient;
use std::{any::Any, sync::Mutex};
use tikv_client_common::stats::RequestStats;
use tikv_client_common::stats::{tikv_stats, RequestStats};
#[test]
fn test_region_retry() {
#[derive(Clone)]
struct MockRpcResponse {}
struct MockRpcResponse;
impl HasError for MockRpcResponse {
fn error(&mut self) -> Option<Error> {
@ -357,17 +328,15 @@ mod test {
#[async_trait]
impl Request for MockKvRequest {
async fn dispatch(&self, _: &TikvClient, _: CallOption) -> Result<Box<dyn Any>> {
unreachable!()
Ok(Box::new(MockRpcResponse {}))
}
fn stats(&self) -> RequestStats {
unreachable!()
tikv_stats("mock")
}
}
impl DispatchHook for MockKvRequest {
fn dispatch_hook(&self) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
Some(future::ok(MockRpcResponse {}).boxed())
fn as_any(&self) -> &dyn Any {
self
}
}
@ -408,7 +377,9 @@ mod test {
test_invoking_count: invoking_count.clone(),
};
let pd_client = Arc::new(MockPdClient);
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
)));
let region_backoff = NoJitterBackoff::new(1, 1, 3);
let lock_backoff = NoJitterBackoff::new(1, 1, 3);
let stream = request.retry_response_stream(pd_client, region_backoff, lock_backoff);

View File

@ -123,15 +123,27 @@ pub trait HasLocks {
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::MockPdClient;
use crate::mock::{MockKvClient, MockPdClient};
use futures::executor;
use kvproto::errorpb;
use std::any::Any;
#[test]
fn test_resolve_lock_with_retry() {
// Test resolve lock within retry limit
fail::cfg("region-error", "9*return").unwrap();
let client = Arc::new(MockPdClient);
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| {
fail::fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());
Ok(Box::new(resp) as Box<dyn Any>)
});
Ok(Box::new(kvrpcpb::ResolveLockResponse::default()) as Box<dyn Any>)
},
)));
let key: Key = vec![1].into();
let region1 = MockPdClient::region1();
let resolved_region =
@ -140,7 +152,6 @@ mod tests {
// Test resolve lock over retry limit
fail::cfg("region-error", "10*return").unwrap();
let client = Arc::new(MockPdClient);
let key: Key = vec![100].into();
executor::block_on(resolve_lock_with_retry(key, 3, 4, client.clone()))
.expect_err("should return error");

View File

@ -8,9 +8,10 @@ use std::any::Any;
use tikv_client_common::stats::tikv_stats;
#[async_trait]
pub trait Request: Sync + Send + 'static {
pub trait Request: Any + Sync + Send + 'static {
async fn dispatch(&self, client: &TikvClient, options: CallOption) -> Result<Box<dyn Any>>;
fn stats(&self) -> RequestStats;
fn as_any(&self) -> &dyn Any;
}
macro_rules! impl_request {
@ -32,6 +33,10 @@ macro_rules! impl_request {
fn stats(&self) -> RequestStats {
tikv_stats($label)
}
fn as_any(&self) -> &dyn Any {
self
}
}
};
}