add a mock PD

Signed-off-by: ekexium <ekexium@gmail.com>
ekexium committed 2020-09-09 15:21:15 +08:00
parent 44cc11dc9c
commit 26e1a566e3
13 changed files with 454 additions and 348 deletions

Cargo.lock (generated)

@@ -899,6 +899,7 @@ dependencies = [
"futures 0.3.5",
"grpcio",
"kvproto",
"log",
"tikv-client-common",
]


@@ -8,4 +8,5 @@ futures = "0.3"
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "1e28226154c374788f38d3a542fc505cd74720f3", features = [ "prost-codec" ], default-features = false }
derive-new = "0.5.8"
tikv-client-common = { path = "../tikv-client-common"}
tikv-client-common = { path = "../tikv-client-common"}
log = "0.4"


@@ -1,6 +1,19 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
mod pd;
mod server;
mod store;
pub use server::{start_server, MockTikv, PORT};
pub use pd::{start_mock_pd_server, MockPd, MOCK_PD_PORT};
pub use server::{start_mock_tikv_server, MockTikv, MOCK_TIKV_PORT};
pub use store::KvStore;
#[macro_export]
macro_rules! spawn_unary_success {
($ctx:ident, $req:ident, $resp:ident, $sink:ident) => {
let f = $sink
.success($resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", $req, e))
.map(|_| ());
$ctx.spawn(f);
};
}
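For reference, an invocation such as spawn_unary_success!(ctx, req, resp, sink) expands to exactly the reply boilerplate it replaces in the MockTikv handlers below; futures::FutureExt and futures::TryFutureExt must be in scope at the call site for map and map_err to resolve:

    // What the macro call expands to:
    let f = sink
        .success(resp)
        .map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
        .map(|_| ());
    ctx.spawn(f);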

mock-tikv/src/pd.rs (new file)

@@ -0,0 +1,313 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{spawn_unary_success, MOCK_TIKV_PORT};
use futures::{FutureExt, StreamExt, TryFutureExt};
use grpcio::{Environment, Server, ServerBuilder, WriteFlags};
use kvproto::pdpb::*;
use std::sync::Arc;
pub const MOCK_PD_PORT: u16 = 50021;
/// This is a mock PD server, used together with the mock TiKV server.
#[derive(Debug, Clone)]
pub struct MockPd {
ts: i64,
}
impl MockPd {
fn new() -> MockPd {
MockPd { ts: 0 }
}
fn region() -> kvproto::metapb::Region {
let mut meta_region = kvproto::metapb::Region::default();
meta_region.set_end_key(vec![0xff; 20]);
meta_region.set_start_key(vec![0x00]);
meta_region.set_id(0);
meta_region.set_peers(vec![Self::leader()]);
meta_region
}
fn leader() -> kvproto::metapb::Peer {
kvproto::metapb::Peer::default()
}
fn store() -> kvproto::metapb::Store {
let mut store = kvproto::metapb::Store::default();
store.set_address(format!("localhost:{}", MOCK_TIKV_PORT));
// TODO: start_timestamp?
store
}
}
pub fn start_mock_pd_server() -> Server {
let env = Arc::new(Environment::new(1));
let mut server = ServerBuilder::new(env)
.register_service(create_pd(MockPd::new()))
.bind("localhost", MOCK_PD_PORT)
.build()
.unwrap();
server.start();
server
}
impl Pd for MockPd {
fn get_members(
&mut self,
ctx: ::grpcio::RpcContext,
req: GetMembersRequest,
sink: ::grpcio::UnarySink<GetMembersResponse>,
) {
let mut resp = GetMembersResponse::default();
resp.set_header(ResponseHeader::default());
let mut member = Member::default();
member.set_name("mock tikv".to_owned());
member.set_member_id(0);
member.set_client_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]);
// member.set_peer_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]);
resp.set_members(vec![member.clone()]);
resp.set_leader(member);
spawn_unary_success!(ctx, req, resp, sink);
}
fn tso(
&mut self,
ctx: ::grpcio::RpcContext,
stream: ::grpcio::RequestStream<TsoRequest>,
sink: ::grpcio::DuplexSink<TsoResponse>,
) {
let f = stream
.map(|_| {
let mut resp = TsoResponse::default();
// TODO: make ts monotonic
resp.set_timestamp(Timestamp::default());
Ok((resp, WriteFlags::default()))
})
.forward(sink)
.map(|_| ());
ctx.spawn(f);
}
fn bootstrap(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: BootstrapRequest,
_sink: ::grpcio::UnarySink<BootstrapResponse>,
) {
todo!()
}
fn is_bootstrapped(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: IsBootstrappedRequest,
_sink: ::grpcio::UnarySink<IsBootstrappedResponse>,
) {
todo!()
}
fn alloc_id(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: AllocIdRequest,
_sink: ::grpcio::UnarySink<AllocIdResponse>,
) {
todo!()
}
fn get_store(
&mut self,
ctx: ::grpcio::RpcContext,
req: GetStoreRequest,
sink: ::grpcio::UnarySink<GetStoreResponse>,
) {
let mut resp = GetStoreResponse::default();
resp.set_store(Self::store());
spawn_unary_success!(ctx, req, resp, sink);
}
fn put_store(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: PutStoreRequest,
_sink: ::grpcio::UnarySink<PutStoreResponse>,
) {
todo!()
}
fn get_all_stores(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: GetAllStoresRequest,
_sink: ::grpcio::UnarySink<GetAllStoresResponse>,
) {
todo!()
}
fn store_heartbeat(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: StoreHeartbeatRequest,
_sink: ::grpcio::UnarySink<StoreHeartbeatResponse>,
) {
todo!()
}
fn region_heartbeat(
&mut self,
_ctx: ::grpcio::RpcContext,
_stream: ::grpcio::RequestStream<RegionHeartbeatRequest>,
_sink: ::grpcio::DuplexSink<RegionHeartbeatResponse>,
) {
todo!()
}
fn get_region(
&mut self,
ctx: ::grpcio::RpcContext,
req: GetRegionRequest,
sink: ::grpcio::UnarySink<GetRegionResponse>,
) {
let mut resp = GetRegionResponse::default();
resp.set_region(Self::region());
resp.set_leader(Self::leader());
spawn_unary_success!(ctx, req, resp, sink);
}
fn get_prev_region(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: GetRegionRequest,
_sink: ::grpcio::UnarySink<GetRegionResponse>,
) {
todo!()
}
fn get_region_by_id(
&mut self,
ctx: ::grpcio::RpcContext,
req: GetRegionByIdRequest,
sink: ::grpcio::UnarySink<GetRegionResponse>,
) {
let mut resp = GetRegionResponse::default();
resp.set_region(Self::region());
resp.set_leader(Self::leader());
spawn_unary_success!(ctx, req, resp, sink);
}
fn scan_regions(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: ScanRegionsRequest,
_sink: ::grpcio::UnarySink<ScanRegionsResponse>,
) {
todo!()
}
fn ask_split(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: AskSplitRequest,
_sink: ::grpcio::UnarySink<AskSplitResponse>,
) {
todo!()
}
fn report_split(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: ReportSplitRequest,
_sink: ::grpcio::UnarySink<ReportSplitResponse>,
) {
todo!()
}
fn ask_batch_split(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: AskBatchSplitRequest,
_sink: ::grpcio::UnarySink<AskBatchSplitResponse>,
) {
todo!()
}
fn report_batch_split(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: ReportBatchSplitRequest,
_sink: ::grpcio::UnarySink<ReportBatchSplitResponse>,
) {
todo!()
}
fn get_cluster_config(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: GetClusterConfigRequest,
_sink: ::grpcio::UnarySink<GetClusterConfigResponse>,
) {
todo!()
}
fn put_cluster_config(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: PutClusterConfigRequest,
_sink: ::grpcio::UnarySink<PutClusterConfigResponse>,
) {
todo!()
}
fn scatter_region(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: ScatterRegionRequest,
_sink: ::grpcio::UnarySink<ScatterRegionResponse>,
) {
todo!()
}
fn get_gc_safe_point(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: GetGcSafePointRequest,
_sink: ::grpcio::UnarySink<GetGcSafePointResponse>,
) {
todo!()
}
fn update_gc_safe_point(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: UpdateGcSafePointRequest,
_sink: ::grpcio::UnarySink<UpdateGcSafePointResponse>,
) {
todo!()
}
fn update_service_gc_safe_point(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: UpdateServiceGcSafePointRequest,
_sink: ::grpcio::UnarySink<UpdateServiceGcSafePointResponse>,
) {
todo!()
}
fn sync_regions(
&mut self,
_ctx: ::grpcio::RpcContext,
_stream: ::grpcio::RequestStream<SyncRegionRequest>,
_sink: ::grpcio::DuplexSink<SyncRegionResponse>,
) {
todo!()
}
fn get_operator(
&mut self,
_ctx: ::grpcio::RpcContext,
_req: GetOperatorRequest,
_sink: ::grpcio::UnarySink<GetOperatorResponse>,
) {
todo!()
}
}
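A quick way to verify the mock PD end to end is to call it with kvproto's generated grpcio stub. The sketch below is not part of this commit and assumes the generated kvproto::pdpb::PdClient client; note that any of the todo!() methods above will panic if invoked.

    // Smoke test for the mock PD server (a sketch; assumes kvproto's
    // generated grpcio client kvproto::pdpb::PdClient).
    use std::sync::Arc;
    use grpcio::{ChannelBuilder, EnvBuilder};
    use kvproto::pdpb::{GetMembersRequest, PdClient};
    use mock_tikv::{start_mock_pd_server, MOCK_PD_PORT};

    #[test]
    fn mock_pd_smoke() {
        let mut server = start_mock_pd_server();
        let env = Arc::new(EnvBuilder::new().build());
        let channel = ChannelBuilder::new(env).connect(&format!("localhost:{}", MOCK_PD_PORT));
        let client = PdClient::new(channel);
        let resp = client.get_members(&GetMembersRequest::default()).unwrap();
        // The mock reports a single member, named "mock tikv", as both member and leader.
        assert_eq!(resp.get_leader().get_name(), "mock tikv");
        let _ = futures::executor::block_on(server.shutdown());
    }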


@@ -1,19 +1,19 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::KvStore;
use crate::{spawn_unary_success, KvStore};
use derive_new::new;
use futures::{FutureExt, TryFutureExt};
use grpcio::{Environment, Server, ServerBuilder};
use kvproto::{kvrpcpb::*, tikvpb::*};
use std::sync::Arc;
pub const PORT: u16 = 50019;
pub const MOCK_TIKV_PORT: u16 = 50019;
pub fn start_server() -> Server {
pub fn start_mock_tikv_server() -> Server {
let env = Arc::new(Environment::new(1));
let mut server = ServerBuilder::new(env)
.register_service(create_tikv(MockTikv::new(KvStore::new())))
.bind("localhost", PORT)
.bind("localhost", MOCK_TIKV_PORT)
.build()
.unwrap();
server.start();
@@ -178,11 +178,7 @@ impl Tikv for MockTikv {
) {
let mut resp = RawGetResponse::default();
resp.set_value(self.inner.raw_get(req.get_key()));
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_batch_get(
@@ -193,11 +189,7 @@ impl Tikv for MockTikv {
) {
let mut resp = kvproto::kvrpcpb::RawBatchGetResponse::default();
resp.set_pairs(self.inner.raw_batch_get(req.get_keys()));
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_put(
@@ -208,11 +200,7 @@ impl Tikv for MockTikv {
) {
self.inner.raw_put(req.get_key(), req.get_value());
let resp = RawPutResponse::default();
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_batch_put(
@@ -224,11 +212,7 @@ impl Tikv for MockTikv {
let pairs = req.get_pairs();
self.inner.raw_batch_put(pairs);
let resp = RawBatchPutResponse::default();
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_delete(
@@ -243,11 +227,7 @@ impl Tikv for MockTikv {
if res.is_err() {
resp.set_error("Key not exist".to_owned());
}
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_batch_delete(
@@ -265,11 +245,7 @@ impl Tikv for MockTikv {
res.err().unwrap().join(", ")
));
}
let f = sink
.success(resp)
.map_err(move |e| panic!("failed to reply {:?}: {:?}", req, e))
.map(|_| ());
ctx.spawn(f)
spawn_unary_success!(ctx, req, resp, sink);
}
fn raw_scan(

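The refactored handlers can also be exercised from the client side with kvproto's generated TikvClient stub. The following is a sketch under that assumption, not part of this commit; the handlers above back raw_put and raw_get with the in-memory KvStore.

    // Driving the mock TiKV server directly over gRPC (a sketch; assumes
    // kvproto's generated grpcio client kvproto::tikvpb::TikvClient).
    use std::sync::Arc;
    use grpcio::{ChannelBuilder, EnvBuilder};
    use kvproto::kvrpcpb::{RawGetRequest, RawPutRequest};
    use kvproto::tikvpb::TikvClient;
    use mock_tikv::{start_mock_tikv_server, MOCK_TIKV_PORT};

    #[test]
    fn mock_tikv_smoke() {
        let mut server = start_mock_tikv_server();
        let env = Arc::new(EnvBuilder::new().build());
        let channel = ChannelBuilder::new(env).connect(&format!("localhost:{}", MOCK_TIKV_PORT));
        let client = TikvClient::new(channel);

        let mut put = RawPutRequest::default();
        put.set_key(b"k1".to_vec());
        put.set_value(b"v1".to_vec());
        client.raw_put(&put).unwrap();

        let mut get = RawGetRequest::default();
        get.set_key(b"k1".to_vec());
        assert_eq!(client.raw_get(&get).unwrap().get_value(), b"v1");
        let _ = futures::executor::block_on(server.shutdown());
    }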

@@ -11,6 +11,12 @@ pub struct KvStore {
data: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
}
impl Default for KvStore {
fn default() -> Self {
Self::new()
}
}
impl KvStore {
pub fn new() -> KvStore {
KvStore {
@@ -18,10 +24,6 @@ impl KvStore {
}
}
pub fn default() -> KvStore {
Self::new()
}
pub fn raw_get(&self, key: &[u8]) -> Vec<u8> {
let data = self.data.read().unwrap();
data.get(key).unwrap_or(&vec![]).to_vec()
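Note the semantics raw_get gives the mock: a missing key reads back as an empty value rather than a not-found error, which is why the integration tests assert "" for absent keys. A small sketch (not part of the commit), assuming raw_put takes key and value as byte slices, as the server handlers use it:

    let store = mock_tikv::KvStore::new();
    assert_eq!(store.raw_get(b"no-such-key"), Vec::<u8>::new()); // absent => empty
    store.raw_put(b"k", b"v");
    assert_eq!(store.raw_get(b"k"), b"v".to_vec());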


@@ -1,228 +0,0 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::MockRpcPdClient;
use crate::{raw::requests, request::KvRequest, ColumnFamily};
use grpcio::Environment;
pub use mock_tikv::{start_server, MockTikv, PORT};
use std::{sync::Arc, u32};
use tikv_client_common::{
security::SecurityManager, BoundRange, Config, Error, Key, KvPair, Result, Value,
};
use tikv_client_store::{KvConnect, TikvConnect};
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
// TODO: remove this struct later. This is only used to verify mock tikv server.
#[derive(Clone)]
pub struct MockRawClient {
rpc: Arc<MockRpcPdClient>,
cf: Option<ColumnFamily>,
key_only: bool,
}
impl MockRawClient {
pub async fn new(_config: Config) -> Result<MockRawClient> {
let rpc_client = TikvConnect::new(
Arc::new(Environment::new(1)),
Arc::new(SecurityManager::default()),
)
.connect(format!("localhost:{}", PORT).as_str())
.unwrap();
let rpc = Arc::new(MockRpcPdClient::new(rpc_client));
Ok(MockRawClient {
rpc,
cf: None,
key_only: false,
})
}
pub fn with_cf(&self, cf: ColumnFamily) -> MockRawClient {
MockRawClient {
rpc: self.rpc.clone(),
cf: Some(cf),
key_only: self.key_only,
}
}
pub fn with_key_only(&self, key_only: bool) -> MockRawClient {
MockRawClient {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
key_only,
}
}
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
requests::new_raw_get_request(key, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
requests::new_raw_batch_get_request(keys, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
requests::new_raw_put_request(key, value, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
requests::new_raw_batch_put_request(pairs, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
requests::new_raw_delete_request(key, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
requests::new_raw_batch_delete_request(keys, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
requests::new_raw_delete_range_request(range, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
}
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
.execute(self.rpc.clone())
.await
}
pub async fn batch_scan(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::max_scan_limit_exceeded(
each_limit,
MAX_RAW_KV_SCAN_LIMIT,
));
}
requests::new_raw_batch_scan_request(ranges, each_limit, self.key_only, self.cf.clone())
.execute(self.rpc.clone())
.await
}
}
#[cfg(test)]
mod test {
use super::MockRawClient;
use grpcio::redirect_log;
use mock_tikv::start_server;
use simple_logger::SimpleLogger;
use tikv_client_common::{Config, KvPair};
#[tokio::test]
async fn test_raw_put_get() {
SimpleLogger::new().init().unwrap();
redirect_log();
let mut server = start_server();
let mock_client = MockRawClient::new(Config::default()).await.unwrap();
// empty; get non-existent key
let res = mock_client.get("k1".to_owned()).await;
assert_eq!(res.unwrap().unwrap(), vec![]);
// empty; put then batch_get
let _ = mock_client
.put("k1".to_owned(), "v1".to_owned())
.await
.unwrap();
let _ = mock_client
.put("k2".to_owned(), "v2".to_owned())
.await
.unwrap();
let res = mock_client
.batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()])
.await
.unwrap();
assert_eq!(res[0].1, "v1".as_bytes());
assert_eq!(res[1].1, "v2".as_bytes());
assert_eq!(res[2].1, "".as_bytes());
// k1,k2; batch_put then batch_get
let _ = mock_client
.batch_put(vec![
KvPair::new("k3".to_owned(), "v3".to_owned()),
KvPair::new("k4".to_owned(), "v4".to_owned()),
])
.await
.unwrap();
let res = mock_client
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
.await
.unwrap();
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
// k1,k2,k3,k4; delete then get
let res = mock_client.delete("k3".to_owned()).await;
assert!(res.is_ok());
let res = mock_client.delete("key-not-exist".to_owned()).await;
assert!(res.is_err());
let res = mock_client.get("k3".to_owned()).await;
assert_eq!(res.unwrap().unwrap(), "".as_bytes());
// k1,k2,k4; batch_delete then batch_get
let res = mock_client
.batch_delete(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await;
assert!(res.is_err());
let res = mock_client
.batch_delete(vec!["k1".to_owned(), "k2".to_owned(), "k4".to_owned()])
.await;
assert!(res.is_ok());
let res = mock_client
.batch_get(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await
.unwrap();
for i in 0..3 {
assert_eq!(res[i].1, "".as_bytes());
}
let _ = server.shutdown().await;
}
}


@@ -1,71 +0,0 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::PdClient, Timestamp};
use futures::future::ready;
use kvproto::metapb;
use std::time::Duration;
use tikv_client_store::{KvRpcClient, Region, Store};
// Mocked pd client that always returns a TikvClient
pub struct MockRpcPdClient {
client: KvRpcClient,
}
impl MockRpcPdClient {
pub fn new(client: KvRpcClient) -> MockRpcPdClient {
MockRpcPdClient { client }
}
pub fn region() -> Region {
let mut region = Region::default();
region.region.id = 1;
region.region.set_start_key(vec![0]);
region.region.set_end_key(vec![250, 250]);
let mut leader = metapb::Peer::default();
leader.store_id = 41;
region.leader = Some(leader);
region
}
}
impl PdClient for MockRpcPdClient {
type KvClient = KvRpcClient;
fn map_region_to_store(
self: std::sync::Arc<Self>,
region: tikv_client_store::Region,
) -> futures::future::BoxFuture<
'static,
tikv_client_common::Result<tikv_client_store::Store<Self::KvClient>>,
> {
Box::pin(ready(Ok(Store::new(
region,
self.client.clone(),
Duration::from_secs(60),
))))
}
fn region_for_key(
&self,
_key: &tikv_client_common::Key,
) -> futures::future::BoxFuture<'static, tikv_client_common::Result<tikv_client_store::Region>>
{
Box::pin(ready(Ok(Self::region())))
}
fn region_for_id(
&self,
_id: tikv_client_store::RegionId,
) -> futures::future::BoxFuture<'static, tikv_client_common::Result<tikv_client_store::Region>>
{
Box::pin(ready(Ok(Self::region())))
}
fn get_timestamp(
self: std::sync::Arc<Self>,
) -> futures::future::BoxFuture<'static, tikv_client_common::Result<Timestamp>> {
todo!()
}
}


@@ -7,13 +7,9 @@
mod mock_kv;
mod mock_pd;
mod mock_raw;
mod mock_rpcpd;
pub use mock_kv::{MockKvClient, MockKvConnect};
pub use mock_pd::{pd_rpc_client, MockPdClient};
pub use mock_raw::MockRawClient;
pub use mock_rpcpd::MockRpcPdClient;
use crate::{request::DispatchHook, Result};
use fail::fail_point;


@@ -3,11 +3,14 @@
// Note: This module exists and includes some integration tests because the `/tests/`
// directory tests don't have access to `cfg(tests)` functions and we don't want to force
// users to depend on proptest or manually enable features to test.
/*
* Temporarily disabled
use proptest::strategy::Strategy;
use std::env::var;
mod raw;
pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
@@ -26,3 +29,4 @@ pub fn pd_addrs() -> Vec<String> {
.map(From::from)
.collect()
}
*/

tests/mock_tikv_tests.rs (new file)

@@ -0,0 +1,98 @@
#[cfg(test)]
mod test {
use grpcio::redirect_log;
use log::debug;
use mock_tikv::{start_mock_pd_server, start_mock_tikv_server, MOCK_PD_PORT};
use simple_logger::SimpleLogger;
use tikv_client::RawClient;
use tikv_client_common::{Config, KvPair};
#[tokio::test]
async fn test_raw_put_get() {
SimpleLogger::new().init().unwrap();
redirect_log();
let mut tikv_server = start_mock_tikv_server();
let _pd_server = start_mock_pd_server();
let config = Config::new(vec![format!("localhost:{}", MOCK_PD_PORT)]);
let client = RawClient::new(config).await.unwrap();
// empty; get non-existent key
let res = client.get("k1".to_owned()).await;
assert_eq!(res.unwrap().unwrap(), vec![]);
// empty; put then batch_get
let _ = client.put("k1".to_owned(), "v1".to_owned()).await.unwrap();
let _ = client.put("k2".to_owned(), "v2".to_owned()).await.unwrap();
let res = client
.batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()])
.await
.unwrap();
assert_eq!(res[0].1, "v1".as_bytes());
assert_eq!(res[1].1, "v2".as_bytes());
assert_eq!(res[2].1, "".as_bytes());
// k1,k2; batch_put then batch_get
let _ = client
.batch_put(vec![
KvPair::new("k3".to_owned(), "v3".to_owned()),
KvPair::new("k4".to_owned(), "v4".to_owned()),
])
.await
.unwrap();
let res = client
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
.await
.unwrap();
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
// k1,k2,k3,k4; delete then get
let res = client.delete("k3".to_owned()).await;
assert!(res.is_ok());
let res = client.delete("key-not-exist".to_owned()).await;
assert!(res.is_err());
let res = client.get("k3".to_owned()).await;
assert_eq!(res.unwrap().unwrap(), "".as_bytes());
// k1,k2,k4; batch_delete then batch_get
let res = client
.batch_delete(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await;
assert!(res.is_err());
let res = client
.batch_delete(vec!["k1".to_owned(), "k2".to_owned(), "k4".to_owned()])
.await;
assert!(res.is_ok());
let res = client
.batch_get(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await
.unwrap();
for i in 0..3 {
assert_eq!(res[i].1, "".as_bytes());
}
debug!("Pass all tests");
let _ = tikv_server.shutdown().await;
// FIXME: shutdown PD server
// let _ = pd_server.shutdown().await;
}
}


@@ -85,7 +85,6 @@ impl Connection {
) -> Result<Cluster> {
let members = self.validate_endpoints(endpoints, timeout).await?;
let (client, members) = self.try_connect_leader(&members, timeout).await?;
let id = members.get_header().get_cluster_id();
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {


@@ -108,12 +108,14 @@ async fn run_tso(
allocate_timestamps(&resp, &mut pending_requests)?;
}
Err(Error::internal_error("TSO stream terminated"))
// TODO: distinguish between unexpected stream termination and expected end of test
info!("TSO stream terminated");
Ok(())
};
let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses);
error!("TSO send error: {:?}", send_res);
error!("TSO receive error: {:?}", recv_res);
info!("TSO send termination: {:?}", send_res);
info!("TSO receive termination: {:?}", recv_res);
}
struct RequestGroup {