From 26e1a566e3c52e5e955df76354338406ef42c7c9 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 9 Sep 2020 15:21:15 +0800 Subject: [PATCH] add a mock PD Signed-off-by: ekexium --- Cargo.lock | 1 + mock-tikv/Cargo.toml | 3 +- mock-tikv/src/lib.rs | 15 +- mock-tikv/src/pd.rs | 313 ++++++++++++++++++++++++++++++++ mock-tikv/src/server.rs | 44 +---- mock-tikv/src/store.rs | 10 +- src/mock/mock_raw.rs | 228 ----------------------- src/mock/mock_rpcpd.rs | 71 -------- src/mock/mod.rs | 4 - src/proptests/mod.rs | 6 +- tests/mock_tikv_tests.rs | 98 ++++++++++ tikv-client-pd/src/cluster.rs | 1 - tikv-client-pd/src/timestamp.rs | 8 +- 13 files changed, 454 insertions(+), 348 deletions(-) create mode 100644 mock-tikv/src/pd.rs delete mode 100644 src/mock/mock_raw.rs delete mode 100644 src/mock/mock_rpcpd.rs create mode 100644 tests/mock_tikv_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 1a964f4..6e3d775 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -899,6 +899,7 @@ dependencies = [ "futures 0.3.5", "grpcio", "kvproto", + "log", "tikv-client-common", ] diff --git a/mock-tikv/Cargo.toml b/mock-tikv/Cargo.toml index c9ae470..fa81f51 100644 --- a/mock-tikv/Cargo.toml +++ b/mock-tikv/Cargo.toml @@ -8,4 +8,5 @@ futures = "0.3" grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false } kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "1e28226154c374788f38d3a542fc505cd74720f3", features = [ "prost-codec" ], default-features = false } derive-new = "0.5.8" -tikv-client-common = { path = "../tikv-client-common"} \ No newline at end of file +tikv-client-common = { path = "../tikv-client-common"} +log = "0.4" diff --git a/mock-tikv/src/lib.rs b/mock-tikv/src/lib.rs index 1fc38f4..cf43679 100644 --- a/mock-tikv/src/lib.rs +++ b/mock-tikv/src/lib.rs @@ -1,6 +1,19 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +mod pd; mod server; mod store; -pub use server::{start_server, MockTikv, PORT}; +pub use pd::{start_mock_pd_server, MockPd, MOCK_PD_PORT}; +pub use server::{start_mock_tikv_server, MockTikv, MOCK_TIKV_PORT}; pub use store::KvStore; + +#[macro_export] +macro_rules! spawn_unary_success { + ($ctx:ident, $req:ident, $resp:ident, $sink:ident) => { + let f = $sink + .success($resp) + .map_err(move |e| panic!("failed to reply {:?}: {:?}", $req, e)) + .map(|_| ()); + $ctx.spawn(f); + }; +} diff --git a/mock-tikv/src/pd.rs b/mock-tikv/src/pd.rs new file mode 100644 index 0000000..d8b88d0 --- /dev/null +++ b/mock-tikv/src/pd.rs @@ -0,0 +1,313 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::{spawn_unary_success, MOCK_TIKV_PORT}; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use grpcio::{Environment, Server, ServerBuilder, WriteFlags}; +use kvproto::pdpb::*; +use std::sync::Arc; + +pub const MOCK_PD_PORT: u16 = 50021; +/// This is mock pd server, used with mock tikv server. +#[derive(Debug, Clone)] +pub struct MockPd { + ts: i64, +} + +impl MockPd { + fn new() -> MockPd { + MockPd { ts: 0 } + } + + fn region() -> kvproto::metapb::Region { + let mut meta_region = kvproto::metapb::Region::default(); + meta_region.set_end_key(vec![0xff; 20]); + meta_region.set_start_key(vec![0x00]); + meta_region.set_id(0); + meta_region.set_peers(vec![Self::leader()]); + meta_region + } + + fn leader() -> kvproto::metapb::Peer { + kvproto::metapb::Peer::default() + } + + fn store() -> kvproto::metapb::Store { + let mut store = kvproto::metapb::Store::default(); + store.set_address(format!("localhost:{}", MOCK_TIKV_PORT)); + // TODO: start_timestamp? + store + } +} + +pub fn start_mock_pd_server() -> Server { + let env = Arc::new(Environment::new(1)); + let mut server = ServerBuilder::new(env) + .register_service(create_pd(MockPd::new())) + .bind("localhost", MOCK_PD_PORT) + .build() + .unwrap(); + server.start(); + server +} + +impl Pd for MockPd { + fn get_members( + &mut self, + ctx: ::grpcio::RpcContext, + req: GetMembersRequest, + sink: ::grpcio::UnarySink, + ) { + let mut resp = GetMembersResponse::default(); + resp.set_header(ResponseHeader::default()); + let mut member = Member::default(); + member.set_name("mock tikv".to_owned()); + member.set_member_id(0); + member.set_client_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]); + // member.set_peer_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]); + resp.set_members(vec![member.clone()]); + resp.set_leader(member); + spawn_unary_success!(ctx, req, resp, sink); + } + + fn tso( + &mut self, + ctx: ::grpcio::RpcContext, + stream: ::grpcio::RequestStream, + sink: ::grpcio::DuplexSink, + ) { + let f = stream + .map(|_| { + let mut resp = TsoResponse::default(); + // TODO: make ts monotonic + resp.set_timestamp(Timestamp::default()); + Ok((resp, WriteFlags::default())) + }) + .forward(sink) + .map(|_| ()); + ctx.spawn(f); + } + + fn bootstrap( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: BootstrapRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn is_bootstrapped( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: IsBootstrappedRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn alloc_id( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: AllocIdRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn get_store( + &mut self, + ctx: ::grpcio::RpcContext, + req: GetStoreRequest, + sink: ::grpcio::UnarySink, + ) { + let mut resp = GetStoreResponse::default(); + resp.set_store(Self::store()); + spawn_unary_success!(ctx, req, resp, sink); + } + + fn put_store( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: PutStoreRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn get_all_stores( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: GetAllStoresRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn store_heartbeat( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: StoreHeartbeatRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn region_heartbeat( + &mut self, + _ctx: ::grpcio::RpcContext, + _stream: ::grpcio::RequestStream, + _sink: ::grpcio::DuplexSink, + ) { + todo!() + } + + fn get_region( + &mut self, + ctx: ::grpcio::RpcContext, + req: GetRegionRequest, + sink: ::grpcio::UnarySink, + ) { + let mut resp = GetRegionResponse::default(); + resp.set_region(Self::region()); + resp.set_leader(Self::leader()); + spawn_unary_success!(ctx, req, resp, sink); + } + + fn get_prev_region( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: GetRegionRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn get_region_by_id( + &mut self, + ctx: ::grpcio::RpcContext, + req: GetRegionByIdRequest, + sink: ::grpcio::UnarySink, + ) { + let mut resp = GetRegionResponse::default(); + resp.set_region(Self::region()); + resp.set_leader(Self::leader()); + spawn_unary_success!(ctx, req, resp, sink); + } + + fn scan_regions( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: ScanRegionsRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn ask_split( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: AskSplitRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn report_split( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: ReportSplitRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn ask_batch_split( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: AskBatchSplitRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn report_batch_split( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: ReportBatchSplitRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn get_cluster_config( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: GetClusterConfigRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn put_cluster_config( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: PutClusterConfigRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn scatter_region( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: ScatterRegionRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn get_gc_safe_point( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: GetGcSafePointRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn update_gc_safe_point( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: UpdateGcSafePointRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn update_service_gc_safe_point( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: UpdateServiceGcSafePointRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } + + fn sync_regions( + &mut self, + _ctx: ::grpcio::RpcContext, + _stream: ::grpcio::RequestStream, + _sink: ::grpcio::DuplexSink, + ) { + todo!() + } + + fn get_operator( + &mut self, + _ctx: ::grpcio::RpcContext, + _req: GetOperatorRequest, + _sink: ::grpcio::UnarySink, + ) { + todo!() + } +} diff --git a/mock-tikv/src/server.rs b/mock-tikv/src/server.rs index f9e0a68..268646d 100644 --- a/mock-tikv/src/server.rs +++ b/mock-tikv/src/server.rs @@ -1,19 +1,19 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use crate::KvStore; +use crate::{spawn_unary_success, KvStore}; use derive_new::new; use futures::{FutureExt, TryFutureExt}; use grpcio::{Environment, Server, ServerBuilder}; use kvproto::{kvrpcpb::*, tikvpb::*}; use std::sync::Arc; -pub const PORT: u16 = 50019; +pub const MOCK_TIKV_PORT: u16 = 50019; -pub fn start_server() -> Server { +pub fn start_mock_tikv_server() -> Server { let env = Arc::new(Environment::new(1)); let mut server = ServerBuilder::new(env) .register_service(create_tikv(MockTikv::new(KvStore::new()))) - .bind("localhost", PORT) + .bind("localhost", MOCK_TIKV_PORT) .build() .unwrap(); server.start(); @@ -178,11 +178,7 @@ impl Tikv for MockTikv { ) { let mut resp = RawGetResponse::default(); resp.set_value(self.inner.raw_get(req.get_key())); - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_batch_get( @@ -193,11 +189,7 @@ impl Tikv for MockTikv { ) { let mut resp = kvproto::kvrpcpb::RawBatchGetResponse::default(); resp.set_pairs(self.inner.raw_batch_get(req.get_keys())); - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_put( @@ -208,11 +200,7 @@ impl Tikv for MockTikv { ) { self.inner.raw_put(req.get_key(), req.get_value()); let resp = RawPutResponse::default(); - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_batch_put( @@ -224,11 +212,7 @@ impl Tikv for MockTikv { let pairs = req.get_pairs(); self.inner.raw_batch_put(pairs); let resp = RawBatchPutResponse::default(); - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_delete( @@ -243,11 +227,7 @@ impl Tikv for MockTikv { if res.is_err() { resp.set_error("Key not exist".to_owned()); } - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_batch_delete( @@ -265,11 +245,7 @@ impl Tikv for MockTikv { res.err().unwrap().join(", ") )); } - let f = sink - .success(resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e)) - .map(|_| ()); - ctx.spawn(f) + spawn_unary_success!(ctx, req, resp, sink); } fn raw_scan( diff --git a/mock-tikv/src/store.rs b/mock-tikv/src/store.rs index f694bdc..cfdf374 100644 --- a/mock-tikv/src/store.rs +++ b/mock-tikv/src/store.rs @@ -11,6 +11,12 @@ pub struct KvStore { data: Arc, Vec>>>, } +impl Default for KvStore { + fn default() -> Self { + Self::new() + } +} + impl KvStore { pub fn new() -> KvStore { KvStore { @@ -18,10 +24,6 @@ impl KvStore { } } - pub fn default() -> KvStore { - Self::new() - } - pub fn raw_get(&self, key: &[u8]) -> Vec { let data = self.data.read().unwrap(); data.get(key).unwrap_or(&vec![]).to_vec() diff --git a/src/mock/mock_raw.rs b/src/mock/mock_raw.rs deleted file mode 100644 index 0482e66..0000000 --- a/src/mock/mock_raw.rs +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - -use super::MockRpcPdClient; -use crate::{raw::requests, request::KvRequest, ColumnFamily}; -use grpcio::Environment; -pub use mock_tikv::{start_server, MockTikv, PORT}; -use std::{sync::Arc, u32}; -use tikv_client_common::{ - security::SecurityManager, BoundRange, Config, Error, Key, KvPair, Result, Value, -}; -use tikv_client_store::{KvConnect, TikvConnect}; - -const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; - -// TODO: remove this struct later. This is only used to verify mock tikv server. -#[derive(Clone)] -pub struct MockRawClient { - rpc: Arc, - cf: Option, - key_only: bool, -} - -impl MockRawClient { - pub async fn new(_config: Config) -> Result { - let rpc_client = TikvConnect::new( - Arc::new(Environment::new(1)), - Arc::new(SecurityManager::default()), - ) - .connect(format!("localhost:{}", PORT).as_str()) - .unwrap(); - let rpc = Arc::new(MockRpcPdClient::new(rpc_client)); - Ok(MockRawClient { - rpc, - cf: None, - key_only: false, - }) - } - - pub fn with_cf(&self, cf: ColumnFamily) -> MockRawClient { - MockRawClient { - rpc: self.rpc.clone(), - cf: Some(cf), - key_only: self.key_only, - } - } - - pub fn with_key_only(&self, key_only: bool) -> MockRawClient { - MockRawClient { - rpc: self.rpc.clone(), - cf: self.cf.clone(), - key_only, - } - } - - pub async fn get(&self, key: impl Into) -> Result> { - requests::new_raw_get_request(key, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn batch_get( - &self, - keys: impl IntoIterator>, - ) -> Result> { - requests::new_raw_batch_get_request(keys, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { - requests::new_raw_put_request(key, value, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn batch_put( - &self, - pairs: impl IntoIterator>, - ) -> Result<()> { - requests::new_raw_batch_put_request(pairs, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn delete(&self, key: impl Into) -> Result<()> { - requests::new_raw_delete_request(key, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { - requests::new_raw_batch_delete_request(keys, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn delete_range(&self, range: impl Into) -> Result<()> { - requests::new_raw_delete_range_request(range, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { - if limit > MAX_RAW_KV_SCAN_LIMIT { - return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); - } - - requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } - - pub async fn batch_scan( - &self, - ranges: impl IntoIterator>, - each_limit: u32, - ) -> Result> { - if each_limit > MAX_RAW_KV_SCAN_LIMIT { - return Err(Error::max_scan_limit_exceeded( - each_limit, - MAX_RAW_KV_SCAN_LIMIT, - )); - } - - requests::new_raw_batch_scan_request(ranges, each_limit, self.key_only, self.cf.clone()) - .execute(self.rpc.clone()) - .await - } -} - -#[cfg(test)] -mod test { - - use super::MockRawClient; - use grpcio::redirect_log; - - use mock_tikv::start_server; - use simple_logger::SimpleLogger; - use tikv_client_common::{Config, KvPair}; - - #[tokio::test] - async fn test_raw_put_get() { - SimpleLogger::new().init().unwrap(); - redirect_log(); - - let mut server = start_server(); - let mock_client = MockRawClient::new(Config::default()).await.unwrap(); - - // empty; get non-existent key - let res = mock_client.get("k1".to_owned()).await; - assert_eq!(res.unwrap().unwrap(), vec![]); - - // empty; put then batch_get - let _ = mock_client - .put("k1".to_owned(), "v1".to_owned()) - .await - .unwrap(); - let _ = mock_client - .put("k2".to_owned(), "v2".to_owned()) - .await - .unwrap(); - - let res = mock_client - .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()]) - .await - .unwrap(); - assert_eq!(res[0].1, "v1".as_bytes()); - assert_eq!(res[1].1, "v2".as_bytes()); - assert_eq!(res[2].1, "".as_bytes()); - - // k1,k2; batch_put then batch_get - let _ = mock_client - .batch_put(vec![ - KvPair::new("k3".to_owned(), "v3".to_owned()), - KvPair::new("k4".to_owned(), "v4".to_owned()), - ]) - .await - .unwrap(); - - let res = mock_client - .batch_get(vec!["k4".to_owned(), "k3".to_owned()]) - .await - .unwrap(); - assert_eq!(res[0].1, "v4".as_bytes()); - assert_eq!(res[1].1, "v3".as_bytes()); - - // k1,k2,k3,k4; delete then get - let res = mock_client.delete("k3".to_owned()).await; - assert!(res.is_ok()); - - let res = mock_client.delete("key-not-exist".to_owned()).await; - assert!(res.is_err()); - - let res = mock_client.get("k3".to_owned()).await; - assert_eq!(res.unwrap().unwrap(), "".as_bytes()); - - // k1,k2,k4; batch_delete then batch_get - let res = mock_client - .batch_delete(vec![ - "k1".to_owned(), - "k2".to_owned(), - "k3".to_owned(), - "k4".to_owned(), - ]) - .await; - assert!(res.is_err()); - - let res = mock_client - .batch_delete(vec!["k1".to_owned(), "k2".to_owned(), "k4".to_owned()]) - .await; - assert!(res.is_ok()); - - let res = mock_client - .batch_get(vec![ - "k1".to_owned(), - "k2".to_owned(), - "k3".to_owned(), - "k4".to_owned(), - ]) - .await - .unwrap(); - for i in 0..3 { - assert_eq!(res[i].1, "".as_bytes()); - } - - let _ = server.shutdown().await; - } -} diff --git a/src/mock/mock_rpcpd.rs b/src/mock/mock_rpcpd.rs deleted file mode 100644 index 2da8c2c..0000000 --- a/src/mock/mock_rpcpd.rs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - -use crate::{pd::PdClient, Timestamp}; -use futures::future::ready; -use kvproto::metapb; -use std::time::Duration; -use tikv_client_store::{KvRpcClient, Region, Store}; - -// Mocked pd client that always returns a TikvClient -pub struct MockRpcPdClient { - client: KvRpcClient, -} - -impl MockRpcPdClient { - pub fn new(client: KvRpcClient) -> MockRpcPdClient { - MockRpcPdClient { client } - } - - pub fn region() -> Region { - let mut region = Region::default(); - region.region.id = 1; - region.region.set_start_key(vec![0]); - region.region.set_end_key(vec![250, 250]); - - let mut leader = metapb::Peer::default(); - leader.store_id = 41; - region.leader = Some(leader); - - region - } -} - -impl PdClient for MockRpcPdClient { - type KvClient = KvRpcClient; - - fn map_region_to_store( - self: std::sync::Arc, - region: tikv_client_store::Region, - ) -> futures::future::BoxFuture< - 'static, - tikv_client_common::Result>, - > { - Box::pin(ready(Ok(Store::new( - region, - self.client.clone(), - Duration::from_secs(60), - )))) - } - - fn region_for_key( - &self, - _key: &tikv_client_common::Key, - ) -> futures::future::BoxFuture<'static, tikv_client_common::Result> - { - Box::pin(ready(Ok(Self::region()))) - } - - fn region_for_id( - &self, - _id: tikv_client_store::RegionId, - ) -> futures::future::BoxFuture<'static, tikv_client_common::Result> - { - Box::pin(ready(Ok(Self::region()))) - } - - fn get_timestamp( - self: std::sync::Arc, - ) -> futures::future::BoxFuture<'static, tikv_client_common::Result> { - todo!() - } -} diff --git a/src/mock/mod.rs b/src/mock/mod.rs index 2cce33b..bd8b216 100644 --- a/src/mock/mod.rs +++ b/src/mock/mod.rs @@ -7,13 +7,9 @@ mod mock_kv; mod mock_pd; -mod mock_raw; -mod mock_rpcpd; pub use mock_kv::{MockKvClient, MockKvConnect}; pub use mock_pd::{pd_rpc_client, MockPdClient}; -pub use mock_raw::MockRawClient; -pub use mock_rpcpd::MockRpcPdClient; use crate::{request::DispatchHook, Result}; use fail::fail_point; diff --git a/src/proptests/mod.rs b/src/proptests/mod.rs index d67783d..88c60ae 100644 --- a/src/proptests/mod.rs +++ b/src/proptests/mod.rs @@ -3,11 +3,14 @@ // Note: This module exists and includes some integration tests because the `/tests/` // directory tests don't have access to `cfg(tests)` functions and we don't want to force // users to depend on proptest or manually enable features to test. + +/* + * Temporarily disabled + use proptest::strategy::Strategy; use std::env::var; mod raw; - pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS"; pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16; @@ -26,3 +29,4 @@ pub fn pd_addrs() -> Vec { .map(From::from) .collect() } +*/ diff --git a/tests/mock_tikv_tests.rs b/tests/mock_tikv_tests.rs new file mode 100644 index 0000000..04bed3c --- /dev/null +++ b/tests/mock_tikv_tests.rs @@ -0,0 +1,98 @@ +#[cfg(test)] +mod test { + use grpcio::redirect_log; + use log::debug; + use mock_tikv::{start_mock_pd_server, start_mock_tikv_server, MOCK_PD_PORT}; + use simple_logger::SimpleLogger; + use tikv_client::RawClient; + use tikv_client_common::{Config, KvPair}; + + #[tokio::test] + async fn test_raw_put_get() { + SimpleLogger::new().init().unwrap(); + redirect_log(); + + let mut tikv_server = start_mock_tikv_server(); + let _pd_server = start_mock_pd_server(); + + let config = Config::new(vec![format!("localhost:{}", MOCK_PD_PORT)]); + let client = RawClient::new(config).await.unwrap(); + + // empty; get non-existent key + let res = client.get("k1".to_owned()).await; + assert_eq!(res.unwrap().unwrap(), vec![]); + + // empty; put then batch_get + let _ = client.put("k1".to_owned(), "v1".to_owned()).await.unwrap(); + let _ = client.put("k2".to_owned(), "v2".to_owned()).await.unwrap(); + + let res = client + .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()]) + .await + .unwrap(); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + assert_eq!(res[2].1, "".as_bytes()); + + // k1,k2; batch_put then batch_get + let _ = client + .batch_put(vec![ + KvPair::new("k3".to_owned(), "v3".to_owned()), + KvPair::new("k4".to_owned(), "v4".to_owned()), + ]) + .await + .unwrap(); + + let res = client + .batch_get(vec!["k4".to_owned(), "k3".to_owned()]) + .await + .unwrap(); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // k1,k2,k3,k4; delete then get + let res = client.delete("k3".to_owned()).await; + assert!(res.is_ok()); + + let res = client.delete("key-not-exist".to_owned()).await; + assert!(res.is_err()); + + let res = client.get("k3".to_owned()).await; + assert_eq!(res.unwrap().unwrap(), "".as_bytes()); + + // k1,k2,k4; batch_delete then batch_get + let res = client + .batch_delete(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await; + assert!(res.is_err()); + + let res = client + .batch_delete(vec!["k1".to_owned(), "k2".to_owned(), "k4".to_owned()]) + .await; + assert!(res.is_ok()); + + let res = client + .batch_get(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await + .unwrap(); + for i in 0..3 { + assert_eq!(res[i].1, "".as_bytes()); + } + + debug!("Pass all tests"); + + let _ = tikv_server.shutdown().await; + // FIXME: shutdown PD server + // let _ = pd_server.shutdown().await; + } +} diff --git a/tikv-client-pd/src/cluster.rs b/tikv-client-pd/src/cluster.rs index 35f9bed..8f08f4b 100644 --- a/tikv-client-pd/src/cluster.rs +++ b/tikv-client-pd/src/cluster.rs @@ -85,7 +85,6 @@ impl Connection { ) -> Result { let members = self.validate_endpoints(endpoints, timeout).await?; let (client, members) = self.try_connect_leader(&members, timeout).await?; - let id = members.get_header().get_cluster_id(); let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { diff --git a/tikv-client-pd/src/timestamp.rs b/tikv-client-pd/src/timestamp.rs index 95c4525..944c92b 100644 --- a/tikv-client-pd/src/timestamp.rs +++ b/tikv-client-pd/src/timestamp.rs @@ -108,12 +108,14 @@ async fn run_tso( allocate_timestamps(&resp, &mut pending_requests)?; } - Err(Error::internal_error("TSO stream terminated")) + // TODO: distinguish between unexpected stream termination and expected end of test + info!("TSO stream terminated"); + Ok(()) }; let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses); - error!("TSO send error: {:?}", send_res); - error!("TSO receive error: {:?}", recv_res); + info!("TSO send termination: {:?}", send_res); + info!("TSO receive termination: {:?}", recv_res); } struct RequestGroup {