mirror of https://github.com/tikv/client-rust.git
Merge branch 'master' into fix-example
This commit is contained in:
commit
e07e973cbf
|
@ -46,4 +46,4 @@ cargo doc --package tikv-client --open
|
|||
|
||||
This crate supports Rust 1.40 and above.
|
||||
|
||||
For development, a nightly Rust compiler is needed to compile the tests.
|
||||
For development, a nightly Rust compiler is needed to compile the tests.
|
||||
|
|
|
@ -32,6 +32,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
|
|||
MockCluster,
|
||||
))
|
||||
},
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use crate::{pd::RetryClient, Config, Key, Region, RegionId};
|
||||
use crate::{pd::RetryClient, tikv_client_common::kv::codec, Config, Key, Region, RegionId};
|
||||
use futures::{
|
||||
future::{ready, BoxFuture, Either},
|
||||
prelude::*,
|
||||
stream::BoxStream,
|
||||
FutureExt, TryFutureExt,
|
||||
};
|
||||
use grpcio::{EnvBuilder, Environment};
|
||||
use std::{
|
||||
|
@ -23,20 +24,42 @@ use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
|
|||
const CQ_COUNT: usize = 1;
|
||||
const CLIENT_PREFIX: &str = "tikv-client";
|
||||
|
||||
// The PdClient handles all the encoding stuff.
|
||||
//
|
||||
// Raw APIs does not require encoding/decoding at all.
|
||||
// All keys in all places (client, PD, TiKV) are in the same encoding (here called "raw format").
|
||||
//
|
||||
// Transactional APIs are a bit complicated.
|
||||
// We need encode and decode keys when we communicate with PD, but not with TiKV.
|
||||
// We encode keys before sending requests to PD, and decode keys in the response from PD.
|
||||
// That's all we need to do with encoding.
|
||||
//
|
||||
// client -encoded-> PD, PD -encoded-> client
|
||||
// client -raw-> TiKV, TiKV -raw-> client
|
||||
//
|
||||
// The reason for the behavior is that in transaction mode, TiKV encode keys for MVCC.
|
||||
// In raw mode, TiKV doesn't encode them.
|
||||
// TiKV tells PD using its internal representation, whatever the encoding is.
|
||||
// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
|
||||
//
|
||||
pub trait PdClient: Send + Sync + 'static {
|
||||
type KvClient: KvClient + Send + Sync + 'static;
|
||||
|
||||
// In transactional API, `region` is decoded (keys in raw format).
|
||||
fn map_region_to_store(
|
||||
self: Arc<Self>,
|
||||
region: Region,
|
||||
) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;
|
||||
|
||||
// In transactional API, the key and returned region are both decoded (keys in raw format).
|
||||
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;
|
||||
|
||||
// In transactional API, the returned region is decoded (keys in raw format)
|
||||
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;
|
||||
|
||||
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
|
||||
|
||||
// In transactional API, `key` is in raw format
|
||||
fn store_for_key(
|
||||
self: Arc<Self>,
|
||||
key: &Key,
|
||||
|
@ -80,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
// Returns a Steam which iterates over the contexts for each region covered by range.
|
||||
// Returns a Stream which iterates over the contexts for each region covered by range.
|
||||
fn stores_for_range(
|
||||
self: Arc<Self>,
|
||||
range: BoundRange,
|
||||
|
@ -92,12 +115,15 @@ pub trait PdClient: Send + Sync + 'static {
|
|||
Some(sk) => sk,
|
||||
};
|
||||
let end_key = end_key.clone();
|
||||
|
||||
let this = self.clone();
|
||||
Either::Left(self.region_for_key(&start_key).and_then(move |region| {
|
||||
let region_end = region.end_key();
|
||||
this.map_region_to_store(region).map_ok(move |store| {
|
||||
if end_key.map(|x| x <= region_end).unwrap_or(false) || region_end.is_empty() {
|
||||
if end_key
|
||||
.map(|x| x <= region_end && !x.is_empty())
|
||||
.unwrap_or(false)
|
||||
|| region_end.is_empty()
|
||||
{
|
||||
return Some((None, store));
|
||||
}
|
||||
Some((Some(region_end), store))
|
||||
|
@ -158,6 +184,14 @@ pub trait PdClient: Send + Sync + 'static {
|
|||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn decode_region(mut region: Region, enable_codec: bool) -> Result<Region> {
|
||||
if enable_codec {
|
||||
codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?;
|
||||
codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?;
|
||||
}
|
||||
Ok(region)
|
||||
}
|
||||
}
|
||||
|
||||
/// This client converts requests for the logical TiKV cluster into requests
|
||||
|
@ -167,6 +201,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
|
|||
kv_connect: KvC,
|
||||
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
|
||||
timeout: Duration,
|
||||
enable_codec: bool,
|
||||
}
|
||||
|
||||
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
||||
|
@ -191,12 +226,24 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
|||
}
|
||||
|
||||
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
|
||||
let key: &[u8] = key.into();
|
||||
self.pd.clone().get_region(key.to_owned()).boxed()
|
||||
let enable_codec = self.enable_codec;
|
||||
let key = if enable_codec {
|
||||
key.to_encoded().into()
|
||||
} else {
|
||||
key.clone().into()
|
||||
};
|
||||
let region = self.pd.clone().get_region(key).boxed();
|
||||
region
|
||||
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
|
||||
self.pd.clone().get_region_by_id(id).boxed()
|
||||
let region = self.pd.clone().get_region_by_id(id).boxed();
|
||||
let enable_codec = self.enable_codec;
|
||||
region
|
||||
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
|
||||
|
@ -205,13 +252,14 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
|||
}
|
||||
|
||||
impl PdRpcClient<TikvConnect, Cluster> {
|
||||
pub async fn connect(config: &Config) -> Result<PdRpcClient> {
|
||||
pub async fn connect(config: &Config, enable_codec: bool) -> Result<PdRpcClient> {
|
||||
PdRpcClient::new(
|
||||
config,
|
||||
|env, security_mgr| TikvConnect::new(env, security_mgr),
|
||||
|env, security_mgr| {
|
||||
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
|
||||
},
|
||||
enable_codec,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -222,6 +270,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
|||
config: &Config,
|
||||
kv_connect: MakeKvC,
|
||||
pd: MakePd,
|
||||
enable_codec: bool,
|
||||
) -> Result<PdRpcClient<KvC, Cl>>
|
||||
where
|
||||
PdFut: Future<Output = Result<RetryClient<Cl>>>,
|
||||
|
@ -251,6 +300,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
|||
kv_client_cache,
|
||||
kv_connect: kv_connect(env, security_mgr),
|
||||
timeout: config.timeout,
|
||||
enable_codec,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -96,6 +96,7 @@ impl RetryClient<Cluster> {
|
|||
}
|
||||
|
||||
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
|
||||
// It does not know about encoding. Caller should take care of it.
|
||||
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
|
||||
retry!(self, "get_region", |cluster| {
|
||||
let key = key.clone();
|
||||
|
|
|
@ -26,7 +26,7 @@ impl Client {
|
|||
/// # });
|
||||
/// ```
|
||||
pub async fn new(config: Config) -> Result<Client> {
|
||||
let rpc = Arc::new(PdRpcClient::connect(&config).await?);
|
||||
let rpc = Arc::new(PdRpcClient::connect(&config, false).await?);
|
||||
Ok(Client {
|
||||
rpc,
|
||||
cf: None,
|
||||
|
@ -104,6 +104,7 @@ impl Client {
|
|||
///
|
||||
/// Once resolved this request will result in the fetching of the values associated with the
|
||||
/// given keys.
|
||||
/// Non-existent entries will be skipped. The order of the keys is not retained.
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use tikv_client::{KvPair, Config, RawClient};
|
||||
|
|
|
@ -86,6 +86,7 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.keys.sort();
|
||||
let keys = mem::take(&mut self.keys);
|
||||
store_stream_for_keys(keys, pd_client)
|
||||
}
|
||||
|
@ -182,6 +183,7 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.pairs.sort_by(|a, b| a.key.cmp(&b.key));
|
||||
let pairs = mem::take(&mut self.pairs);
|
||||
store_stream_for_keys(pairs, pd_client)
|
||||
}
|
||||
|
@ -271,6 +273,7 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.keys.sort();
|
||||
let keys = mem::take(&mut self.keys);
|
||||
store_stream_for_keys(keys, pd_client)
|
||||
}
|
||||
|
|
|
@ -164,6 +164,7 @@ where
|
|||
.boxed()
|
||||
}
|
||||
|
||||
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
|
||||
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
|
||||
key_data: I,
|
||||
pd_client: Arc<PdC>,
|
||||
|
@ -208,6 +209,7 @@ fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
|
|||
let up = match (upper.is_empty(), upper_bound) {
|
||||
(_, None) => upper,
|
||||
(true, Some(ub)) => ub,
|
||||
(_, Some(ub)) if ub.is_empty() => upper,
|
||||
(_, Some(ub)) => min(upper, ub),
|
||||
};
|
||||
(max(lower, lower_bound), up)
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use crate::transaction::{Snapshot, Transaction};
|
||||
use crate::{
|
||||
pd::PdRpcClient,
|
||||
transaction::{Snapshot, Transaction},
|
||||
};
|
||||
|
||||
use crate::pd::{PdClient, PdRpcClient};
|
||||
use crate::pd::PdClient;
|
||||
use futures::executor::ThreadPool;
|
||||
use std::sync::Arc;
|
||||
use tikv_client_common::{Config, Result, Timestamp};
|
||||
|
@ -30,7 +33,7 @@ impl Client {
|
|||
let bg_worker = ThreadPool::new()?;
|
||||
// TODO: PdRpcClient::connect currently uses a blocking implementation.
|
||||
// Make it asynchronous later.
|
||||
let pd = Arc::new(PdRpcClient::connect(&config).await?);
|
||||
let pd = Arc::new(PdRpcClient::connect(&config, true).await?);
|
||||
Ok(Client {
|
||||
pd,
|
||||
bg_worker,
|
||||
|
|
|
@ -88,6 +88,7 @@ impl KvRequest for kvrpcpb::BatchGetRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.keys.sort();
|
||||
let keys = mem::take(&mut self.keys);
|
||||
store_stream_for_keys(keys, pd_client)
|
||||
}
|
||||
|
@ -337,6 +338,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.mutations.sort_by(|a, b| a.key.cmp(&b.key));
|
||||
let mutations = mem::take(&mut self.mutations);
|
||||
store_stream_for_keys(mutations, pd_client)
|
||||
}
|
||||
|
@ -396,6 +398,7 @@ impl KvRequest for kvrpcpb::CommitRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.keys.sort();
|
||||
let keys = mem::take(&mut self.keys);
|
||||
store_stream_for_keys(keys, pd_client)
|
||||
}
|
||||
|
@ -441,6 +444,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
|
|||
&mut self,
|
||||
pd_client: Arc<PdC>,
|
||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||
self.keys.sort();
|
||||
let keys = mem::take(&mut self.keys);
|
||||
store_stream_for_keys(keys, pd_client)
|
||||
}
|
||||
|
|
|
@ -77,6 +77,8 @@ impl Transaction {
|
|||
|
||||
/// Gets the values associated with the given keys.
|
||||
///
|
||||
/// Non-existent entries will be skipped. The order of the keys is not retained.
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use tikv_client::{Key, Value, Config, transaction::Client};
|
||||
/// # use futures::prelude::*;
|
||||
|
@ -304,6 +306,7 @@ impl TwoPhaseCommitter {
|
|||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(commit_version)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,50 +7,31 @@ use serial_test::serial;
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
convert::TryInto,
|
||||
env,
|
||||
env, iter,
|
||||
};
|
||||
use tikv_client::{
|
||||
ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value,
|
||||
};
|
||||
use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value};
|
||||
|
||||
/// The limit of scan in each iteration in `clear_tikv`.
|
||||
const SCAN_BATCH_SIZE: u32 = 1000;
|
||||
|
||||
// Parameters used in test
|
||||
const NUM_PEOPLE: u32 = 100;
|
||||
const NUM_TRNASFER: u32 = 100;
|
||||
|
||||
/// Delete all entris in TiKV to leave a clean space for following tests.
|
||||
/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions.
|
||||
async fn clear_tikv() -> Fallible<()> {
|
||||
delete_all_raw().await?;
|
||||
delete_all_txn().await?;
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
async fn delete_all_raw() -> Fallible<()> {
|
||||
let config = Config::new(pd_addrs());
|
||||
let raw_client = RawClient::new(config).await?.with_key_only(true);
|
||||
raw_client.delete_range(vec![]..).await?;
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
async fn delete_all_txn() -> Fallible<()> {
|
||||
let config = Config::new(pd_addrs());
|
||||
let txn_client = TransactionClient::new(config).await?.with_key_only(true);
|
||||
let mut txn = txn_client.begin().await?;
|
||||
|
||||
loop {
|
||||
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE).await?.peekable();
|
||||
|
||||
if keys.peek().is_none() {
|
||||
break;
|
||||
}
|
||||
|
||||
for kv in keys {
|
||||
txn.delete(kv.into_key()).await?;
|
||||
}
|
||||
let cfs = vec![
|
||||
ColumnFamily::Default,
|
||||
ColumnFamily::Lock,
|
||||
ColumnFamily::Write,
|
||||
];
|
||||
for cf in cfs {
|
||||
let config = Config::new(pd_addrs());
|
||||
let raw_client = RawClient::new(config)
|
||||
.await?
|
||||
.with_key_only(true)
|
||||
.with_cf(cf);
|
||||
raw_client.delete_range(vec![]..).await?;
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
|
@ -73,6 +54,7 @@ async fn get_timestamp() -> Fallible<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// Tests transactional get, put, delete, batch_get
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn crud() -> Fallible<()> {
|
||||
|
@ -200,6 +182,80 @@ async fn raw_bank_transfer() -> Fallible<()> {
|
|||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
/// Tests transactional API when there are multiple regions.
|
||||
/// Write large volumes of data to enforce region splitting.
|
||||
/// In order to test `scan`, data is uniformly inserted.
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn txn_write_million() -> Fallible<()> {
|
||||
const NUM_BITS_TXN: u32 = 7;
|
||||
const NUM_BITS_KEY_PER_TXN: u32 = 3;
|
||||
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
|
||||
|
||||
clear_tikv().await?;
|
||||
let config = Config::new(pd_addrs());
|
||||
let client = TransactionClient::new(config).await?;
|
||||
|
||||
for i in 0..2u32.pow(NUM_BITS_TXN) {
|
||||
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
|
||||
let keys = iter::repeat_with(|| {
|
||||
let v = cur;
|
||||
cur = cur.overflowing_add(interval).0;
|
||||
v
|
||||
})
|
||||
.map(|u| u.to_be_bytes().to_vec())
|
||||
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
|
||||
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
|
||||
let mut txn = client.begin().await?;
|
||||
for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
|
||||
txn.put(k.clone(), v).await?;
|
||||
}
|
||||
txn.commit().await?;
|
||||
|
||||
let mut txn = client.begin().await?;
|
||||
let res = txn.batch_get(keys).await?;
|
||||
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
|
||||
txn.commit().await?;
|
||||
}
|
||||
|
||||
// test scan
|
||||
let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
|
||||
let txn = client.begin().await?;
|
||||
let res = txn.scan(vec![].., limit).await?;
|
||||
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
|
||||
|
||||
// scan by small range and combine them
|
||||
let mut rng = thread_rng();
|
||||
let mut keys = gen_u32_keys(10, &mut rng)
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
keys.sort();
|
||||
|
||||
let mut sum = 0;
|
||||
|
||||
// empty key to key[0]
|
||||
let txn = client.begin().await?;
|
||||
let res = txn.scan(vec![]..keys[0].clone(), limit).await?;
|
||||
sum += res.count();
|
||||
|
||||
// key[i] .. key[i+1]
|
||||
for i in 0..keys.len() - 1 {
|
||||
let res = txn
|
||||
.scan(keys[i].clone()..keys[i + 1].clone(), limit)
|
||||
.await?;
|
||||
sum += res.count();
|
||||
}
|
||||
|
||||
// keys[last] to unbounded
|
||||
let res = txn.scan(keys[keys.len() - 1].clone().., limit).await?;
|
||||
sum += res.count();
|
||||
|
||||
assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
|
||||
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn txn_bank_transfer() -> Fallible<()> {
|
||||
|
@ -252,7 +308,7 @@ async fn txn_bank_transfer() -> Fallible<()> {
|
|||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn raw() -> Fallible<()> {
|
||||
async fn raw_req() -> Fallible<()> {
|
||||
clear_tikv().await?;
|
||||
let config = Config::new(pd_addrs());
|
||||
let client = RawClient::new(config).await?;
|
||||
|
@ -283,8 +339,8 @@ async fn raw() -> Fallible<()> {
|
|||
let res = client
|
||||
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
||||
.await?;
|
||||
assert_eq!(res[0].1, "v4".as_bytes());
|
||||
assert_eq!(res[1].1, "v3".as_bytes());
|
||||
assert_eq!(res[0], KvPair::new("k3".to_owned(), "v3"));
|
||||
assert_eq!(res[1], KvPair::new("k4".to_owned(), "v4"));
|
||||
|
||||
// k1,k2,k3,k4; delete then get
|
||||
let res = client.delete("k3".to_owned()).await;
|
||||
|
|
|
@ -47,8 +47,8 @@ mod test {
|
|||
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res[0].1, "v4".as_bytes());
|
||||
assert_eq!(res[1].1, "v3".as_bytes());
|
||||
assert_eq!(res[0].1, "v3".as_bytes());
|
||||
assert_eq!(res[1].1, "v4".as_bytes());
|
||||
|
||||
// k1,k2,k3,k4; delete then get
|
||||
let res = client.delete("k3".to_owned()).await;
|
||||
|
|
|
@ -0,0 +1,217 @@
|
|||
use crate::{errors::Result, Error};
|
||||
use std::{io::Write, ptr};
|
||||
|
||||
const ENC_GROUP_SIZE: usize = 8;
|
||||
const ENC_MARKER: u8 = 0xff;
|
||||
const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE];
|
||||
const ENC_DESC_PADDING: [u8; ENC_GROUP_SIZE] = [!0; ENC_GROUP_SIZE];
|
||||
|
||||
/// Returns the maximum encoded bytes size.
|
||||
///
|
||||
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||
pub fn max_encoded_bytes_size(n: usize) -> usize {
|
||||
(n / ENC_GROUP_SIZE + 1) * (ENC_GROUP_SIZE + 1)
|
||||
}
|
||||
|
||||
pub trait BytesEncoder: Write {
|
||||
/// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
|
||||
///
|
||||
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||
fn encode_bytes(&mut self, key: &[u8], desc: bool) -> Result<()> {
|
||||
let len = key.len();
|
||||
let mut index = 0;
|
||||
let mut buf = [0; ENC_GROUP_SIZE];
|
||||
while index <= len {
|
||||
let remain = len - index;
|
||||
let mut pad: usize = 0;
|
||||
if remain > ENC_GROUP_SIZE {
|
||||
self.write_all(adjust_bytes_order(
|
||||
&key[index..index + ENC_GROUP_SIZE],
|
||||
desc,
|
||||
&mut buf,
|
||||
))?;
|
||||
} else {
|
||||
pad = ENC_GROUP_SIZE - remain;
|
||||
self.write_all(adjust_bytes_order(&key[index..], desc, &mut buf))?;
|
||||
if desc {
|
||||
self.write_all(&ENC_DESC_PADDING[..pad])?;
|
||||
} else {
|
||||
self.write_all(&ENC_ASC_PADDING[..pad])?;
|
||||
}
|
||||
}
|
||||
self.write_all(adjust_bytes_order(
|
||||
&[ENC_MARKER - (pad as u8)],
|
||||
desc,
|
||||
&mut buf,
|
||||
))?;
|
||||
index += ENC_GROUP_SIZE;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Write> BytesEncoder for T {}
|
||||
|
||||
fn adjust_bytes_order<'a>(bs: &'a [u8], desc: bool, buf: &'a mut [u8]) -> &'a [u8] {
|
||||
if desc {
|
||||
let mut buf_idx = 0;
|
||||
for &b in bs {
|
||||
buf[buf_idx] = !b;
|
||||
buf_idx += 1;
|
||||
}
|
||||
&buf[..buf_idx]
|
||||
} else {
|
||||
bs
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes bytes which are encoded by `encode_bytes` before just in place without malloc.
|
||||
///
|
||||
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||
pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
|
||||
if data.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut write_offset = 0;
|
||||
let mut read_offset = 0;
|
||||
loop {
|
||||
let marker_offset = read_offset + ENC_GROUP_SIZE;
|
||||
if marker_offset >= data.len() {
|
||||
return Err(Error::internal_error(format!(
|
||||
"unexpected EOF, original key = {:?}",
|
||||
data
|
||||
)));
|
||||
};
|
||||
|
||||
unsafe {
|
||||
// it is semantically equivalent to C's memmove()
|
||||
// and the src and dest may overlap
|
||||
// if src == dest do nothing
|
||||
ptr::copy(
|
||||
data.as_ptr().add(read_offset),
|
||||
data.as_mut_ptr().add(write_offset),
|
||||
ENC_GROUP_SIZE,
|
||||
);
|
||||
}
|
||||
write_offset += ENC_GROUP_SIZE;
|
||||
// everytime make ENC_GROUP_SIZE + 1 elements as a decode unit
|
||||
read_offset += ENC_GROUP_SIZE + 1;
|
||||
|
||||
// the last byte in decode unit is for marker which indicates pad size
|
||||
let marker = data[marker_offset];
|
||||
let pad_size = if desc {
|
||||
marker as usize
|
||||
} else {
|
||||
(ENC_MARKER - marker) as usize
|
||||
};
|
||||
|
||||
if pad_size > 0 {
|
||||
if pad_size > ENC_GROUP_SIZE {
|
||||
return Err(Error::internal_error("invalid key padding"));
|
||||
}
|
||||
|
||||
// check the padding pattern whether validate or not
|
||||
let padding_slice = if desc {
|
||||
&ENC_DESC_PADDING[..pad_size]
|
||||
} else {
|
||||
&ENC_ASC_PADDING[..pad_size]
|
||||
};
|
||||
if &data[write_offset - pad_size..write_offset] != padding_slice {
|
||||
return Err(Error::internal_error("invalid key padding"));
|
||||
}
|
||||
unsafe {
|
||||
data.set_len(write_offset - pad_size);
|
||||
}
|
||||
if desc {
|
||||
for k in data {
|
||||
*k = !*k;
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
|
||||
fn encode_bytes(bs: &[u8]) -> Vec<u8> {
|
||||
encode_order_bytes(bs, false)
|
||||
}
|
||||
|
||||
fn encode_bytes_desc(bs: &[u8]) -> Vec<u8> {
|
||||
encode_order_bytes(bs, true)
|
||||
}
|
||||
|
||||
fn encode_order_bytes(bs: &[u8], desc: bool) -> Vec<u8> {
|
||||
let cap = max_encoded_bytes_size(bs.len());
|
||||
let mut encoded = Vec::with_capacity(cap);
|
||||
encoded.encode_bytes(bs, desc).unwrap();
|
||||
encoded
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_enc_dec_bytes() {
|
||||
let pairs = vec![
|
||||
(
|
||||
vec![],
|
||||
vec![0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||
vec![255, 255, 255, 255, 255, 255, 255, 255, 8],
|
||||
),
|
||||
(
|
||||
vec![0],
|
||||
vec![0, 0, 0, 0, 0, 0, 0, 0, 248],
|
||||
vec![255, 255, 255, 255, 255, 255, 255, 255, 7],
|
||||
),
|
||||
(
|
||||
vec![1, 2, 3],
|
||||
vec![1, 2, 3, 0, 0, 0, 0, 0, 250],
|
||||
vec![254, 253, 252, 255, 255, 255, 255, 255, 5],
|
||||
),
|
||||
(
|
||||
vec![1, 2, 3, 0],
|
||||
vec![1, 2, 3, 0, 0, 0, 0, 0, 251],
|
||||
vec![254, 253, 252, 255, 255, 255, 255, 255, 4],
|
||||
),
|
||||
(
|
||||
vec![1, 2, 3, 4, 5, 6, 7],
|
||||
vec![1, 2, 3, 4, 5, 6, 7, 0, 254],
|
||||
vec![254, 253, 252, 251, 250, 249, 248, 255, 1],
|
||||
),
|
||||
(
|
||||
vec![0, 0, 0, 0, 0, 0, 0, 0],
|
||||
vec![0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||
vec![
|
||||
255, 255, 255, 255, 255, 255, 255, 255, 0, 255, 255, 255, 255, 255, 255, 255,
|
||||
255, 8,
|
||||
],
|
||||
),
|
||||
(
|
||||
vec![1, 2, 3, 4, 5, 6, 7, 8],
|
||||
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||
vec![
|
||||
254, 253, 252, 251, 250, 249, 248, 247, 0, 255, 255, 255, 255, 255, 255, 255,
|
||||
255, 8,
|
||||
],
|
||||
),
|
||||
(
|
||||
vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
|
||||
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248],
|
||||
vec![
|
||||
254, 253, 252, 251, 250, 249, 248, 247, 0, 246, 255, 255, 255, 255, 255, 255,
|
||||
255, 7,
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
for (source, mut asc, mut desc) in pairs {
|
||||
assert_eq!(encode_bytes(&source), asc);
|
||||
assert_eq!(encode_bytes_desc(&source), desc);
|
||||
decode_bytes_in_place(&mut asc, false).unwrap();
|
||||
assert_eq!(source, asc);
|
||||
decode_bytes_in_place(&mut desc, true).unwrap();
|
||||
assert_eq!(source, desc);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use super::HexRepr;
|
||||
use crate::kv::codec::{self, BytesEncoder};
|
||||
use kvproto::kvrpcpb;
|
||||
#[allow(unused_imports)]
|
||||
#[cfg(test)]
|
||||
|
@ -104,6 +105,14 @@ impl Key {
|
|||
Bound::Excluded(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn to_encoded(&self) -> Key {
|
||||
let len = codec::max_encoded_bytes_size(self.0.len());
|
||||
let mut encoded = Vec::with_capacity(len);
|
||||
encoded.encode_bytes(&self.0, false).unwrap();
|
||||
Key(encoded)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for Key {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
use std::{fmt, u8};
|
||||
|
||||
mod bound_range;
|
||||
pub mod codec;
|
||||
mod key;
|
||||
mod kvpair;
|
||||
mod value;
|
||||
|
|
Loading…
Reference in New Issue