Merge branch 'master' into fix-txn-batch-scan

ekexium 2020-10-12 11:55:21 +08:00 committed by GitHub
commit 7c584fbb80
16 changed files with 513 additions and 74 deletions

README.md

@ -8,33 +8,110 @@
This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
distributed transactional Key-Value database written in Rust.
With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains.
With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains. It uses async/await internally and exposes some `async fn` APIs as well.
This is an open source (Apache 2) project hosted by the Cloud Native Computing Foundation (CNCF) and maintained by the TiKV Authors. *We'd love it if you joined us in improving this project.*
## Using the client
## Getting started
The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.
To use this crate in your project, add it as a dependency in your `Cargo.toml`:
The TiKV client is a Rust library (crate). To use this crate in your project, add the following dependencies in your `Cargo.toml`:
```toml
[dependencies]
# ...Your other dependencies...
tikv-client = { git = "https://github.com/tikv/client-rust.git" }
[patch.crates-io]
raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" }
```
The client requires a Git dependency until we can [publish it](https://github.com/tikv/client-rust/issues/32).
There are [examples](examples) which show how to use the client in a Rust program.
The client provides two modes to interact with TiKV: raw and transactional.
In the current version (0.0.0), the transactional API only supports optimistic transactions.
Important note: It is **not recommended or supported** to use both the raw and transactional APIs on the same database.
### Code examples
Raw mode:
```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let client = RawClient::new(config).await?;
client.put("key".to_owned(), "value".to_owned()).await?;
let value = client.get("key".to_owned()).await?;
```
Transactional mode:
```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let txn_client = TransactionClient::new(config).await?;
let mut txn = txn_client.begin().await?;
txn.put("key".to_owned(), "value".to_owned()).await?;
let value = txn.get("key".to_owned()).await?;
txn.commit().await?;
```
There are some [examples](examples) which show how to use the client in a Rust program.
### API
#### Raw requests
| Request | Main parameter type | Successful result type | Noteworthy Behavior |
| -------------- | ------------------- | ---------------------- | --------------------------------------------- |
| `put` | `KvPair` | `()` | |
| `get` | `Key` | `Option<Value>` | |
| `delete` | `Key` | `()` | |
| `scan` | `BoundRange` | `Vec<KvPair>` | |
| `batch_put` | `Iter<KvPair>` | `()` | |
| `batch_get` | `Iter<Key>` | `Vec<KvPair>` | Skip non-existent keys; Does not retain order |
| `batch_delete` | `Iter<Key>` | `()` | |
| `delete_range` | `BoundRange` | `()` | |
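To make the table concrete, here is a hedged sketch chaining several raw requests together. Like the code examples above, it assumes an async context in which `?` works, and it uses only the methods listed in the table; consult the generated documentation for the exact signatures in your version.
```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let client = RawClient::new(config).await?;

// Single-key requests.
client.put("k1".to_owned(), "v1".to_owned()).await?;
let _value = client.get("k1".to_owned()).await?; // Option<Value>

// Batch requests. `batch_get` skips non-existent keys and does not retain order.
client
    .batch_put(vec![
        KvPair::new("k2".to_owned(), "v2"),
        KvPair::new("k3".to_owned(), "v3"),
    ])
    .await?;
let _pairs = client
    .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "no-such-key".to_owned()])
    .await?; // Vec<KvPair> containing only k1 and k2

// Range requests accept anything that converts into a `BoundRange`.
let _scanned = client.scan("k1".to_owned().."k4".to_owned(), 10).await?;
client.delete_range("k1".to_owned()..="k3".to_owned()).await?;
```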
#### Transactional requests
| Request | Main parameter type | Successful result type | Noteworthy Behavior |
| ----------- | ------------------- | ---------------------- | --------------------------------------------- |
| `put` | `KvPair` | `()` | |
| `get`       | `Key`               | `Option<Value>`        |                                               |
| `delete` | `Key` | `()` | |
| `scan` | `BoundRange` | `Iter<KvPair>` | |
| `batch_get` | `Iter<Key>` | `Iter<KvPair>` | Skip non-existent keys; Does not retain order |
| `lock_keys` | `KvPair` | `()` | |
For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).
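As a quick, hedged illustration of the transactional requests above (again assuming an async context in which `?` works, and the `scan(range, limit)` shape used in the code example earlier):
```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let txn_client = TransactionClient::new(config).await?;
let mut txn = txn_client.begin().await?;

txn.put("k1".to_owned(), "v1".to_owned()).await?;
txn.put("k2".to_owned(), "v2".to_owned()).await?;

// `batch_get` yields an iterator of pairs, skipping non-existent keys
// and not retaining the order of the input keys.
let pairs: Vec<_> = txn
    .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "no-such-key".to_owned()])
    .await?
    .collect();
assert_eq!(pairs.len(), 2);

// `scan` yields an iterator of pairs within the range, up to the given limit.
let _in_range: Vec<_> = txn.scan("k1".to_owned().."k9".to_owned(), 10).await?.collect();

// Nothing is visible to other transactions until the commit succeeds.
txn.commit().await?;
```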
#### Experimental raw requests
Use the following request(s) with care, and read the description below to understand why.
| Request | Main parameter type | Successful result type |
| -------------- | ------------------- | ---------------------- |
| `batch_scan` | `Iter<BoundRange>` | `Vec<KvPair>` |
The `each_limit` parameter does not work as expected. It does not limit the number of results returned for each range; instead, it limits the number of results in each region of each range. As a result, you may get **more than** `each_limit` key-value pairs for each range, but no entries will be missed.
The results of `batch_scan` are flattened. The order of ranges is retained.
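For example (a hedged sketch, reusing the raw `client` from the earlier example and assuming the `batch_scan(ranges, each_limit)` shape listed in the table):
```rust
// The limit is applied per region within each range, so a range spanning several
// regions may return more than `each_limit` pairs; no entries are missed, though.
let ranges = vec![
    "a".to_owned().."d".to_owned(),
    "m".to_owned().."p".to_owned(),
];
let _pairs = client.batch_scan(ranges, 10).await?;
// The results of all ranges are flattened into one Vec<KvPair>, but range order is
// retained: every pair from the first range precedes every pair from the second.
```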
### Useful types
To use the client, there are 4 types you will need.
`Key` is simply a vector of bytes (`Vec<u8>`). `String` and `Vec<u8>` implement `Into<Key>`, so you can pass them directly to clients.
`Value` is just an alias of `Vec<u8>`.
`KvPair` is a tuple consisting of a `Key` and a `Value`. It also provides some convenience methods for conversion to and from other types.
`BoundRange` is used for range-related requests like `scan`. It implements `From` for common range types, so you can simply create a range and pass it to the request. For instance, `client.scan("k2".to_owned()..="k5".to_owned(), 5)` or `client.delete_range(vec![]..)`.
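The short sketch below shows these conversions in action. It reuses the raw `client` from the earlier example and is an illustrative fragment rather than a complete program.
```rust
// Keys and values are just bytes.
let key: Key = "k1".to_owned().into();       // String -> Key
let same_key: Key = vec![0x6b, 0x31].into(); // Vec<u8> -> Key (the bytes of "k1")
assert_eq!(key, same_key);
let value: Value = b"v1".to_vec();           // Value is an alias for Vec<u8>
let _pair = KvPair::new("k1".to_owned(), value);

// Ranges convert into `BoundRange`, so they can be passed straight to requests.
let _scanned = client.scan("k2".to_owned()..="k5".to_owned(), 5).await?;
client.delete_range(vec![]..).await?;
```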
## Access the documentation
We recommend using the cargo-generated documentation to browse and understand the API. We've done
our best to include ample, tested, and understandable examples.
You can visit [docs.rs/tikv-client](https://docs.rs/tikv-client/), or build the documentation yourself.
You can access the documentation on your machine by running the following in any project that depends on `tikv-client`.
```bash
@ -42,8 +119,13 @@ cargo doc --package tikv-client --open
# If it didn't work, browse the file URL it tried to open with your browser.
```
## Minimal Rust Version
## Known issues
If you use the transactional APIs, you'll need to perform GC in TiKV to reclaim storage space.
However, the current implementation does not provide a direct way to do this. A workaround is described in [#180](https://github.com/tikv/client-rust/issues/180).
## Minimal Rust version
This crate supports Rust 1.40 and above.
For development, a nightly Rust compiler is needed to compile the tests.


@ -94,11 +94,7 @@ async fn main() -> Result<()> {
let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!(
&keys,
&[
Key::from("k1".to_owned()),
Key::from("k2".to_owned()),
Key::from("k3".to_owned())
]
&[Key::from("k1".to_owned()), Key::from("k2".to_owned()),]
);
println!("Scaning from {:?} to {:?} gives: {:?}", start, end, keys);


@ -32,6 +32,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
MockCluster,
))
},
false,
)
.await
.unwrap()


@ -1,10 +1,11 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::RetryClient, Config, Key, Region, RegionId};
use crate::{pd::RetryClient, tikv_client_common::kv::codec, Config, Key, Region, RegionId};
use futures::{
future::{ready, BoxFuture, Either},
prelude::*,
stream::BoxStream,
FutureExt, TryFutureExt,
};
use grpcio::{EnvBuilder, Environment};
use std::{
@ -23,20 +24,42 @@ use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
// The PdClient handles all the encoding stuff.
//
// Raw APIs do not require encoding/decoding at all.
// All keys in all places (client, PD, TiKV) are in the same encoding (here called "raw format").
//
// Transactional APIs are a bit complicated.
// We need to encode and decode keys when we communicate with PD, but not with TiKV.
// We encode keys before sending requests to PD, and decode keys in the response from PD.
// That's all we need to do with encoding.
//
// client -encoded-> PD, PD -encoded-> client
// client -raw-> TiKV, TiKV -raw-> client
//
// The reason for this behavior is that in transactional mode, TiKV encodes keys for MVCC,
// while in raw mode it does not encode them.
// TiKV reports keys to PD in its internal representation, whatever the encoding is.
// So when the transactional API is used, the keys held by PD are encoded, and PD itself knows nothing about the encoding.
//
pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;
// In transactional API, `region` is decoded (keys in raw format).
fn map_region_to_store(
self: Arc<Self>,
region: Region,
) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;
// In transactional API, the key and returned region are both decoded (keys in raw format).
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;
// In transactional API, the returned region is decoded (keys in raw format)
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
// In transactional API, `key` is in raw format
fn store_for_key(
self: Arc<Self>,
key: &Key,
@ -80,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}
// Returns a Steam which iterates over the contexts for each region covered by range.
// Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range(
self: Arc<Self>,
range: BoundRange,
@ -92,12 +115,15 @@ pub trait PdClient: Send + Sync + 'static {
Some(sk) => sk,
};
let end_key = end_key.clone();
let this = self.clone();
Either::Left(self.region_for_key(&start_key).and_then(move |region| {
let region_end = region.end_key();
this.map_region_to_store(region).map_ok(move |store| {
if end_key.map(|x| x <= region_end).unwrap_or(false) || region_end.is_empty() {
if end_key
.map(|x| x <= region_end && !x.is_empty())
.unwrap_or(false)
|| region_end.is_empty()
{
return Some((None, store));
}
Some((Some(region_end), store))
@ -158,6 +184,14 @@ pub trait PdClient: Send + Sync + 'static {
})
.boxed()
}
fn decode_region(mut region: Region, enable_codec: bool) -> Result<Region> {
if enable_codec {
codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?;
codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?;
}
Ok(region)
}
}
/// This client converts requests for the logical TiKV cluster into requests
@ -167,6 +201,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
timeout: Duration,
enable_codec: bool,
}
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
@ -191,12 +226,24 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
}
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
let key: &[u8] = key.into();
self.pd.clone().get_region(key.to_owned()).boxed()
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded().into()
} else {
key.clone().into()
};
let region = self.pd.clone().get_region(key).boxed();
region
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
.boxed()
}
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
self.pd.clone().get_region_by_id(id).boxed()
let region = self.pd.clone().get_region_by_id(id).boxed();
let enable_codec = self.enable_codec;
region
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
.boxed()
}
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
@ -205,13 +252,14 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
}
impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(config: &Config) -> Result<PdRpcClient> {
pub async fn connect(config: &Config, enable_codec: bool) -> Result<PdRpcClient> {
PdRpcClient::new(
config,
|env, security_mgr| TikvConnect::new(env, security_mgr),
|env, security_mgr| {
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
},
enable_codec,
)
.await
}
@ -222,6 +270,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
config: &Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
@ -251,6 +300,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
timeout: config.timeout,
enable_codec,
})
}


@ -96,6 +96,7 @@ impl RetryClient<Cluster> {
}
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// They do not know about key encoding; the caller should take care of it.
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
retry!(self, "get_region", |cluster| {
let key = key.clone();


@ -26,7 +26,7 @@ impl Client {
/// # });
/// ```
pub async fn new(config: Config) -> Result<Client> {
let rpc = Arc::new(PdRpcClient::connect(&config).await?);
let rpc = Arc::new(PdRpcClient::connect(&config, false).await?);
Ok(Client {
rpc,
cf: None,
@ -103,7 +103,8 @@ impl Client {
/// Create a new 'batch get' request.
///
/// Once resolved this request will result in the fetching of the values associated with the
/// given keys, skipping non-existent entries.
/// given keys.
/// Non-existent entries will be skipped. The order of the keys is not retained.
///
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient};
@ -173,6 +174,7 @@ impl Client {
/// Create a new 'delete' request.
///
/// Once resolved this request will result in the deletion of the given key.
/// It does not return an error if the key does not exist.
///
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient};
@ -193,6 +195,7 @@ impl Client {
/// Create a new 'batch delete' request.
///
/// Once resolved this request will result in the deletion of the given keys.
/// It does not return an error if some of the keys do not exist and will delete the others.
///
/// ```rust,no_run
/// # use tikv_client::{Config, RawClient};


@ -86,6 +86,7 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
@ -182,6 +183,7 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.pairs.sort_by(|a, b| a.key.cmp(&b.key));
let pairs = mem::take(&mut self.pairs);
store_stream_for_keys(pairs, pd_client)
}
@ -271,6 +273,7 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
@ -330,7 +333,9 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}


@ -164,6 +164,7 @@ where
.boxed()
}
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
key_data: I,
pd_client: Arc<PdC>,
@ -208,6 +209,7 @@ fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
let up = match (upper.is_empty(), upper_bound) {
(_, None) => upper,
(true, Some(ub)) => ub,
(_, Some(ub)) if ub.is_empty() => upper,
(_, Some(ub)) => min(upper, ub),
};
(max(lower, lower_bound), up)


@ -1,8 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::transaction::{Snapshot, Transaction};
use crate::{
pd::PdRpcClient,
transaction::{Snapshot, Transaction},
};
use crate::pd::{PdClient, PdRpcClient};
use crate::pd::PdClient;
use futures::executor::ThreadPool;
use std::sync::Arc;
use tikv_client_common::{Config, Result, Timestamp};
@ -30,7 +33,7 @@ impl Client {
let bg_worker = ThreadPool::new()?;
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(&config).await?);
let pd = Arc::new(PdRpcClient::connect(&config, true).await?);
Ok(Client {
pd,
bg_worker,


@ -88,6 +88,7 @@ impl KvRequest for kvrpcpb::BatchGetRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
@ -337,6 +338,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.mutations.sort_by(|a, b| a.key.cmp(&b.key));
let mutations = mem::take(&mut self.mutations);
store_stream_for_keys(mutations, pd_client)
}
@ -346,7 +348,9 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}
@ -396,6 +400,7 @@ impl KvRequest for kvrpcpb::CommitRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
@ -405,7 +410,9 @@ impl KvRequest for kvrpcpb::CommitRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}
@ -441,6 +448,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
@ -450,7 +458,9 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}


@ -77,6 +77,8 @@ impl Transaction {
/// Gets the values associated with the given keys, skipping non-existent entries.
///
/// Non-existent entries will be skipped. The order of the keys is not retained.
///
/// ```rust,no_run
/// # use tikv_client::{Key, Value, Config, transaction::Client};
/// # use futures::prelude::*;
@ -305,6 +307,7 @@ impl TwoPhaseCommitter {
}
})
.await?;
Ok(commit_version)
}


@ -7,50 +7,31 @@ use serial_test::serial;
use std::{
collections::{HashMap, HashSet},
convert::TryInto,
env,
env, iter,
};
use tikv_client::{
ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value,
};
use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value};
/// The limit of scan in each iteration in `clear_tikv`.
const SCAN_BATCH_SIZE: u32 = 1000;
// Parameters used in test
const NUM_PEOPLE: u32 = 100;
const NUM_TRNASFER: u32 = 100;
/// Delete all entries in TiKV to leave a clean space for following tests.
/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions.
/// Delete all entries in TiKV to leave a clean space for the following tests.
async fn clear_tikv() -> Fallible<()> {
delete_all_raw().await?;
delete_all_txn().await?;
Fallible::Ok(())
}
async fn delete_all_raw() -> Fallible<()> {
let config = Config::new(pd_addrs());
let raw_client = RawClient::new(config).await?.with_key_only(true);
raw_client.delete_range(vec![]..).await?;
Fallible::Ok(())
}
async fn delete_all_txn() -> Fallible<()> {
let config = Config::new(pd_addrs());
let txn_client = TransactionClient::new(config).await?.with_key_only(true);
let mut txn = txn_client.begin().await?;
loop {
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE).await?.peekable();
if keys.peek().is_none() {
break;
}
for kv in keys {
txn.delete(kv.into_key()).await?;
}
let cfs = vec![
ColumnFamily::Default,
ColumnFamily::Lock,
ColumnFamily::Write,
];
for cf in cfs {
let config = Config::new(pd_addrs());
let raw_client = RawClient::new(config)
.await?
.with_key_only(true)
.with_cf(cf);
raw_client.delete_range(vec![]..).await?;
}
txn.commit().await?;
Fallible::Ok(())
}
@ -73,6 +54,7 @@ async fn get_timestamp() -> Fallible<()> {
Ok(())
}
// Tests transactional get, put, delete, batch_get
#[tokio::test]
#[serial]
async fn crud() -> Fallible<()> {
@ -202,6 +184,80 @@ async fn raw_bank_transfer() -> Fallible<()> {
Fallible::Ok(())
}
/// Tests transactional API when there are multiple regions.
/// Write large volumes of data to enforce region splitting.
/// In order to test `scan`, data is uniformly inserted.
#[tokio::test]
#[serial]
async fn txn_write_million() -> Fallible<()> {
const NUM_BITS_TXN: u32 = 7;
const NUM_BITS_KEY_PER_TXN: u32 = 3;
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
clear_tikv().await?;
let config = Config::new(pd_addrs());
let client = TransactionClient::new(config).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) {
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
let keys = iter::repeat_with(|| {
let v = cur;
cur = cur.overflowing_add(interval).0;
v
})
.map(|u| u.to_be_bytes().to_vec())
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
let mut txn = client.begin().await?;
for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
txn.put(k.clone(), v).await?;
}
txn.commit().await?;
let mut txn = client.begin().await?;
let res = txn.batch_get(keys).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
txn.commit().await?;
}
// test scan
let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
let txn = client.begin().await?;
let res = txn.scan(vec![].., limit).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
// scan by small ranges and combine the results
let mut rng = thread_rng();
let mut keys = gen_u32_keys(10, &mut rng)
.iter()
.cloned()
.collect::<Vec<_>>();
keys.sort();
let mut sum = 0;
// empty key to key[0]
let txn = client.begin().await?;
let res = txn.scan(vec![]..keys[0].clone(), limit).await?;
sum += res.count();
// key[i] .. key[i+1]
for i in 0..keys.len() - 1 {
let res = txn
.scan(keys[i].clone()..keys[i + 1].clone(), limit)
.await?;
sum += res.count();
}
// keys[last] to unbounded
let res = txn.scan(keys[keys.len() - 1].clone().., limit).await?;
sum += res.count();
assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
Fallible::Ok(())
}
#[tokio::test]
#[serial]
async fn txn_bank_transfer() -> Fallible<()> {
@ -254,7 +310,7 @@ async fn txn_bank_transfer() -> Fallible<()> {
#[tokio::test]
#[serial]
async fn raw() -> Fallible<()> {
async fn raw_req() -> Fallible<()> {
clear_tikv().await?;
let config = Config::new(pd_addrs());
let client = RawClient::new(config).await?;
@ -285,8 +341,8 @@ async fn raw() -> Fallible<()> {
let res = client
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
.await?;
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[0], KvPair::new("k3".to_owned(), "v3"));
assert_eq!(res[1], KvPair::new("k4".to_owned(), "v4"));
// k1,k2,k3,k4; delete then get
let res = client.delete("k3".to_owned()).await;


@ -47,8 +47,8 @@ mod test {
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
.await
.unwrap();
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[0].1, "v3".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
// k1,k2,k3,k4; delete then get
let res = client.delete("k3".to_owned()).await;


@ -0,0 +1,217 @@
use crate::{errors::Result, Error};
use std::{io::Write, ptr};
const ENC_GROUP_SIZE: usize = 8;
const ENC_MARKER: u8 = 0xff;
const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE];
const ENC_DESC_PADDING: [u8; ENC_GROUP_SIZE] = [!0; ENC_GROUP_SIZE];
/// Returns the maximum encoded bytes size.
///
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
pub fn max_encoded_bytes_size(n: usize) -> usize {
(n / ENC_GROUP_SIZE + 1) * (ENC_GROUP_SIZE + 1)
}
pub trait BytesEncoder: Write {
/// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
///
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
fn encode_bytes(&mut self, key: &[u8], desc: bool) -> Result<()> {
let len = key.len();
let mut index = 0;
let mut buf = [0; ENC_GROUP_SIZE];
while index <= len {
let remain = len - index;
let mut pad: usize = 0;
if remain > ENC_GROUP_SIZE {
self.write_all(adjust_bytes_order(
&key[index..index + ENC_GROUP_SIZE],
desc,
&mut buf,
))?;
} else {
pad = ENC_GROUP_SIZE - remain;
self.write_all(adjust_bytes_order(&key[index..], desc, &mut buf))?;
if desc {
self.write_all(&ENC_DESC_PADDING[..pad])?;
} else {
self.write_all(&ENC_ASC_PADDING[..pad])?;
}
}
self.write_all(adjust_bytes_order(
&[ENC_MARKER - (pad as u8)],
desc,
&mut buf,
))?;
index += ENC_GROUP_SIZE;
}
Ok(())
}
}
impl<T: Write> BytesEncoder for T {}
fn adjust_bytes_order<'a>(bs: &'a [u8], desc: bool, buf: &'a mut [u8]) -> &'a [u8] {
if desc {
let mut buf_idx = 0;
for &b in bs {
buf[buf_idx] = !b;
buf_idx += 1;
}
&buf[..buf_idx]
} else {
bs
}
}
/// Decodes bytes which were encoded by `encode_bytes`, in place and without allocating.
///
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
if data.is_empty() {
return Ok(());
}
let mut write_offset = 0;
let mut read_offset = 0;
loop {
let marker_offset = read_offset + ENC_GROUP_SIZE;
if marker_offset >= data.len() {
return Err(Error::internal_error(format!(
"unexpected EOF, original key = {:?}",
data
)));
};
unsafe {
// it is semantically equivalent to C's memmove()
// and the src and dest may overlap
// if src == dest do nothing
ptr::copy(
data.as_ptr().add(read_offset),
data.as_mut_ptr().add(write_offset),
ENC_GROUP_SIZE,
);
}
write_offset += ENC_GROUP_SIZE;
// each iteration treats ENC_GROUP_SIZE + 1 bytes as one decode unit
read_offset += ENC_GROUP_SIZE + 1;
// the last byte in the decode unit is the marker, which indicates the pad size
let marker = data[marker_offset];
let pad_size = if desc {
marker as usize
} else {
(ENC_MARKER - marker) as usize
};
if pad_size > 0 {
if pad_size > ENC_GROUP_SIZE {
return Err(Error::internal_error("invalid key padding"));
}
// check whether the padding pattern is valid
let padding_slice = if desc {
&ENC_DESC_PADDING[..pad_size]
} else {
&ENC_ASC_PADDING[..pad_size]
};
if &data[write_offset - pad_size..write_offset] != padding_slice {
return Err(Error::internal_error("invalid key padding"));
}
unsafe {
data.set_len(write_offset - pad_size);
}
if desc {
for k in data {
*k = !*k;
}
}
return Ok(());
}
}
}
#[cfg(test)]
pub mod test {
use super::*;
fn encode_bytes(bs: &[u8]) -> Vec<u8> {
encode_order_bytes(bs, false)
}
fn encode_bytes_desc(bs: &[u8]) -> Vec<u8> {
encode_order_bytes(bs, true)
}
fn encode_order_bytes(bs: &[u8], desc: bool) -> Vec<u8> {
let cap = max_encoded_bytes_size(bs.len());
let mut encoded = Vec::with_capacity(cap);
encoded.encode_bytes(bs, desc).unwrap();
encoded
}
#[test]
fn test_enc_dec_bytes() {
let pairs = vec![
(
vec![],
vec![0, 0, 0, 0, 0, 0, 0, 0, 247],
vec![255, 255, 255, 255, 255, 255, 255, 255, 8],
),
(
vec![0],
vec![0, 0, 0, 0, 0, 0, 0, 0, 248],
vec![255, 255, 255, 255, 255, 255, 255, 255, 7],
),
(
vec![1, 2, 3],
vec![1, 2, 3, 0, 0, 0, 0, 0, 250],
vec![254, 253, 252, 255, 255, 255, 255, 255, 5],
),
(
vec![1, 2, 3, 0],
vec![1, 2, 3, 0, 0, 0, 0, 0, 251],
vec![254, 253, 252, 255, 255, 255, 255, 255, 4],
),
(
vec![1, 2, 3, 4, 5, 6, 7],
vec![1, 2, 3, 4, 5, 6, 7, 0, 254],
vec![254, 253, 252, 251, 250, 249, 248, 255, 1],
),
(
vec![0, 0, 0, 0, 0, 0, 0, 0],
vec![0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
vec![
255, 255, 255, 255, 255, 255, 255, 255, 0, 255, 255, 255, 255, 255, 255, 255,
255, 8,
],
),
(
vec![1, 2, 3, 4, 5, 6, 7, 8],
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
vec![
254, 253, 252, 251, 250, 249, 248, 247, 0, 255, 255, 255, 255, 255, 255, 255,
255, 8,
],
),
(
vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248],
vec![
254, 253, 252, 251, 250, 249, 248, 247, 0, 246, 255, 255, 255, 255, 255, 255,
255, 7,
],
),
];
for (source, mut asc, mut desc) in pairs {
assert_eq!(encode_bytes(&source), asc);
assert_eq!(encode_bytes_desc(&source), desc);
decode_bytes_in_place(&mut asc, false).unwrap();
assert_eq!(source, asc);
decode_bytes_in_place(&mut desc, true).unwrap();
assert_eq!(source, desc);
}
}
}


@ -1,6 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::HexRepr;
use crate::kv::codec::{self, BytesEncoder};
use kvproto::kvrpcpb;
#[allow(unused_imports)]
#[cfg(test)]
@ -104,6 +105,14 @@ impl Key {
Bound::Excluded(self)
}
}
#[inline]
pub fn to_encoded(&self) -> Key {
let len = codec::max_encoded_bytes_size(self.0.len());
let mut encoded = Vec::with_capacity(len);
encoded.encode_bytes(&self.0, false).unwrap();
Key(encoded)
}
}
impl From<Vec<u8>> for Key {


@ -2,6 +2,7 @@
use std::{fmt, u8};
mod bound_range;
pub mod codec;
mod key;
mod kvpair;
mod value;