diff --git a/Cargo.lock b/Cargo.lock index 6e3d775..d28b699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -713,6 +713,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.6" @@ -898,6 +907,7 @@ dependencies = [ "derive-new", "futures 0.3.5", "grpcio", + "itertools 0.9.0", "kvproto", "log", "tikv-client-common", @@ -1222,7 +1232,7 @@ checksum = "02b10678c913ecbd69350e8535c3aef91a8676c0773fc1d7b95cdd196d7f2f26" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.8.2", "log", "multimap", "petgraph", @@ -1239,7 +1249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "537aa19b95acde10a12fec4301466386f757403de4cd4e5b4fa78fb5ecb18f72" dependencies = [ "anyhow", - "itertools", + "itertools 0.8.2", "proc-macro2 1.0.19", "quote 1.0.7", "syn 1.0.35", diff --git a/mock-tikv/Cargo.toml b/mock-tikv/Cargo.toml index fa81f51..92b024c 100644 --- a/mock-tikv/Cargo.toml +++ b/mock-tikv/Cargo.toml @@ -10,3 +10,4 @@ kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "1e28226154c37 derive-new = "0.5.8" tikv-client-common = { path = "../tikv-client-common"} log = "0.4" +itertools = "0.9.0" diff --git a/mock-tikv/src/lib.rs b/mock-tikv/src/lib.rs index cf43679..8eee772 100644 --- a/mock-tikv/src/lib.rs +++ b/mock-tikv/src/lib.rs @@ -7,6 +7,8 @@ pub use pd::{start_mock_pd_server, MockPd, MOCK_PD_PORT}; pub use server::{start_mock_tikv_server, MockTikv, MOCK_TIKV_PORT}; pub use store::KvStore; +/// A common pattern for implementing an unary RPC call. +/// Successfully returns the result. #[macro_export] macro_rules! spawn_unary_success { ($ctx:ident, $req:ident, $resp:ident, $sink:ident) => { diff --git a/mock-tikv/src/pd.rs b/mock-tikv/src/pd.rs index d8b88d0..82dbcce 100644 --- a/mock-tikv/src/pd.rs +++ b/mock-tikv/src/pd.rs @@ -19,12 +19,12 @@ impl MockPd { } fn region() -> kvproto::metapb::Region { - let mut meta_region = kvproto::metapb::Region::default(); - meta_region.set_end_key(vec![0xff; 20]); - meta_region.set_start_key(vec![0x00]); - meta_region.set_id(0); - meta_region.set_peers(vec![Self::leader()]); - meta_region + kvproto::metapb::Region { + start_key: vec![], + end_key: vec![], + peers: vec![Self::leader()], + ..Default::default() + } } fn leader() -> kvproto::metapb::Peer { @@ -32,10 +32,11 @@ impl MockPd { } fn store() -> kvproto::metapb::Store { - let mut store = kvproto::metapb::Store::default(); - store.set_address(format!("localhost:{}", MOCK_TIKV_PORT)); // TODO: start_timestamp? - store + kvproto::metapb::Store { + address: format!("localhost:{}", MOCK_TIKV_PORT), + ..Default::default() + } } } @@ -57,15 +58,16 @@ impl Pd for MockPd { req: GetMembersRequest, sink: ::grpcio::UnarySink, ) { - let mut resp = GetMembersResponse::default(); - resp.set_header(ResponseHeader::default()); - let mut member = Member::default(); - member.set_name("mock tikv".to_owned()); - member.set_member_id(0); - member.set_client_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]); - // member.set_peer_urls(vec![format!("localhost:{}", MOCK_PD_PORT)]); - resp.set_members(vec![member.clone()]); - resp.set_leader(member); + let member = Member { + name: "mock tikv".to_owned(), + client_urls: vec![format!("localhost:{}", MOCK_PD_PORT)], + ..Default::default() + }; + let resp = GetMembersResponse { + members: vec![member.clone()], + leader: Some(member), + ..Default::default() + }; spawn_unary_success!(ctx, req, resp, sink); } @@ -77,9 +79,8 @@ impl Pd for MockPd { ) { let f = stream .map(|_| { - let mut resp = TsoResponse::default(); + let resp = TsoResponse::default(); // TODO: make ts monotonic - resp.set_timestamp(Timestamp::default()); Ok((resp, WriteFlags::default())) }) .forward(sink) @@ -120,8 +121,10 @@ impl Pd for MockPd { req: GetStoreRequest, sink: ::grpcio::UnarySink, ) { - let mut resp = GetStoreResponse::default(); - resp.set_store(Self::store()); + let resp = GetStoreResponse { + store: Some(Self::store()), + ..Default::default() + }; spawn_unary_success!(ctx, req, resp, sink); } @@ -167,9 +170,11 @@ impl Pd for MockPd { req: GetRegionRequest, sink: ::grpcio::UnarySink, ) { - let mut resp = GetRegionResponse::default(); - resp.set_region(Self::region()); - resp.set_leader(Self::leader()); + let resp = GetRegionResponse { + region: Some(Self::region()), + leader: Some(Self::leader()), + ..Default::default() + }; spawn_unary_success!(ctx, req, resp, sink); } @@ -188,9 +193,11 @@ impl Pd for MockPd { req: GetRegionByIdRequest, sink: ::grpcio::UnarySink, ) { - let mut resp = GetRegionResponse::default(); - resp.set_region(Self::region()); - resp.set_leader(Self::leader()); + let resp = GetRegionResponse { + region: Some(Self::region()), + leader: Some(Self::leader()), + ..Default::default() + }; spawn_unary_success!(ctx, req, resp, sink); } diff --git a/mock-tikv/src/server.rs b/mock-tikv/src/server.rs index 268646d..abacbf5 100644 --- a/mock-tikv/src/server.rs +++ b/mock-tikv/src/server.rs @@ -4,6 +4,7 @@ use crate::{spawn_unary_success, KvStore}; use derive_new::new; use futures::{FutureExt, TryFutureExt}; use grpcio::{Environment, Server, ServerBuilder}; +use itertools::Itertools; use kvproto::{kvrpcpb::*, tikvpb::*}; use std::sync::Arc; @@ -198,7 +199,8 @@ impl Tikv for MockTikv { req: kvproto::kvrpcpb::RawPutRequest, sink: grpcio::UnarySink, ) { - self.inner.raw_put(req.get_key(), req.get_value()); + self.inner + .raw_put(req.get_key().to_vec(), req.get_value().to_vec()); let resp = RawPutResponse::default(); spawn_unary_success!(ctx, req, resp, sink); } @@ -224,7 +226,7 @@ impl Tikv for MockTikv { let key = req.get_key(); let res = self.inner.raw_delete(key); let mut resp = RawDeleteResponse::default(); - if res.is_err() { + if res.is_none() { resp.set_error("Key not exist".to_owned()); } spawn_unary_success!(ctx, req, resp, sink); @@ -242,7 +244,11 @@ impl Tikv for MockTikv { if res.is_err() { resp.set_error(format!( "Non-existent keys:[{}]", - res.err().unwrap().join(", ") + res.err() + .unwrap() + .iter() + .map(|k| std::str::from_utf8(k).unwrap()) + .join(", ") )); } spawn_unary_success!(ctx, req, resp, sink); diff --git a/mock-tikv/src/store.rs b/mock-tikv/src/store.rs index a7ffbed..5226d2c 100644 --- a/mock-tikv/src/store.rs +++ b/mock-tikv/src/store.rs @@ -26,7 +26,7 @@ impl KvStore { pub fn raw_get(&self, key: &[u8]) -> Vec { let data = self.data.read().unwrap(); - data.get(key).unwrap_or(&vec![]).to_vec() + data.get(key).map(|v| v.to_vec()).unwrap_or_else(Vec::new) } pub fn raw_batch_get(&self, keys: &[Vec]) -> Vec { @@ -34,38 +34,37 @@ impl KvStore { keys.iter() .map(|key| { let mut pair = KvPair::default(); - pair.set_value(data.get(key).unwrap_or(&vec![]).to_vec()); + pair.set_value(data.get(key).map(|v| v.to_vec()).unwrap_or_else(Vec::new)); pair.set_key(key.to_vec()); pair }) - .collect::>() + .collect() } - pub fn raw_put(&self, key: &[u8], value: &[u8]) { + pub fn raw_put(&self, key: Vec, value: Vec) { let mut data = self.data.write().unwrap(); - *data.entry(key.to_vec()).or_default() = value.to_vec(); + data.insert(key, value); } pub fn raw_batch_put(&self, pairs: &[KvPair]) { let mut data = self.data.write().unwrap(); for pair in pairs { - *data.entry(pair.get_key().to_vec()).or_default() = pair.get_value().to_vec(); + data.insert(pair.get_key().to_vec(), pair.get_value().to_vec()); } } // if success, return the key deleted - pub fn raw_delete(&self, key: &[u8]) -> Result, ()> { + pub fn raw_delete(&self, key: &[u8]) -> Option> { let mut data = self.data.write().unwrap(); - data.remove(key).ok_or(()) + data.remove(key) } // if any of the key does not exist, return non-existent keys - pub fn raw_batch_delete<'a>(&self, keys: &'a [Vec]) -> Result<(), Vec<&'a str>> { + pub fn raw_batch_delete<'a>(&self, keys: &'a [Vec]) -> Result<(), Vec<&'a Vec>> { let mut data = self.data.write().unwrap(); let non_exist_keys = keys .iter() .filter(|&key| !data.contains_key(key)) - .map(|key| std::str::from_utf8(key).unwrap()) .collect::>(); if !non_exist_keys.is_empty() { Err(non_exist_keys) diff --git a/src/raw/mod.rs b/src/raw/mod.rs index b1fb530..7aea986 100644 --- a/src/raw/mod.rs +++ b/src/raw/mod.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use tikv_client_common::errors::ErrorKind::ColumnFamilyError; mod client; -pub(crate) mod requests; +mod requests; /// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests. ///