From 316a194002fd7dbf85b90cb41671a3cf21d6660d Mon Sep 17 00:00:00 2001
From: ekexium
Date: Mon, 21 Sep 2020 15:51:10 +0800
Subject: [PATCH 01/10] update readme: dependency, limit and code snippet

Signed-off-by: ekexium
---
 README.md | 41 ++++++++++++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index e7d77a3..62281cb 100644
--- a/README.md
+++ b/README.md
@@ -8,33 +8,56 @@
 
 This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a distributed transactional Key-Value database written in Rust.
 
-With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains.
+With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains. It uses async/await internally and exposes some `async fn` APIs as well.
 
 This is an open source (Apache 2) project hosted by the Cloud Native Computing Foundation (CNCF) and maintained by the TiKV Authors. *We'd love it if you joined us in improving this project.*
 
-## Using the client
+## Getting started
 
-The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.
-
-To use this crate in your project, add it as a dependency in your `Cargo.toml`:
+The TiKV client is a Rust library (crate). To use this crate in your project, add the following dependencies in your `Cargo.toml`:
 
 ```toml
 [dependencies]
-# ...Your other dependencies...
 tikv-client = { git = "https://github.com/tikv/client-rust.git" }
+
+[patch.crates-io]
+raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" }
 ```
 
 The client requires a Git dependency until we can [publish it](https://github.com/tikv/client-rust/issues/32).
 
-There are [examples](examples) which show how to use the client in a Rust program.
+The client provides two modes to interact with TiKV: raw and transactional.
+In the current version (0.0.0), the transactional API only supports optimistic transactions.
+
+Important note: It is **not recommended or supported** to use both the raw and transactional APIs on the same database.
+
+Raw mode:
+
+```rust
+let config = Config::new(vec!["127.0.0.1:2379"]);
+let client = RawClient::new(config).await?;
+client.put("key".to_owned(), "value".to_owned()).await;
+let value = client.get("key".to_owned()).await;
+```
+
+Transactional mode:
+
+```rust
+let config = Config::new(vec!["127.0.0.1:2379"]);
+let txn_client = TransactionClient::new(config).await?;
+let mut txn = txn_client.begin().await?;
+txn.set("key".to_owned(), "value".to_owned()).await?;
+let value = txn.get("key".to_owned()).await;
+txn.commit().await?;
+```
+
+There are some [examples](examples) which show how to use the client in a Rust program.
 
 ## Access the documentation
 
 We recommend using the cargo-generated documentation to browse and understand the API. We've done
 our best to include ample, tested, and understandable examples.
 
-You can visit [docs.rs/tikv-client](https://docs.rs/tikv-client/), or build the documentation yourself.
-
 You can access the documentation on your machine by running the following in any project that depends on `tikv-client`.
 
 ```bash

From 914ed7238933e9930c50636855417462a9ae7eee Mon Sep 17 00:00:00 2001
From: ekexium
Date: Wed, 23 Sep 2020 15:07:35 +0800
Subject: [PATCH 02/10] readme: add API list and intro to types

Signed-off-by: ekexium
---
 README.md         | 42 +++++++++++++++++++++++++++++++++++++++++-
 src/raw/client.rs |  3 +++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 62281cb..2d77684 100644
--- a/README.md
+++ b/README.md
@@ -31,6 +31,8 @@ In the current version (0.0.0), the transactional API only supports optimistic t
 
 Important note: It is **not recommended or supported** to use both the raw and transactional APIs on the same database.
 
+### Code examples
+
 Raw mode:
 
 ```rust
@@ -46,13 +48,51 @@ Transactional mode:
 let config = Config::new(vec!["127.0.0.1:2379"]);
 let txn_client = TransactionClient::new(config).await?;
 let mut txn = txn_client.begin().await?;
-txn.set("key".to_owned(), "value".to_owned()).await?;
+txn.put("key".to_owned(), "value".to_owned()).await?;
 let value = txn.get("key".to_owned()).await;
 txn.commit().await?;
 ```
 
 There are some [examples](examples) which show how to use the client in a Rust program.
 
+### API
+
+| Mode | Request        | Main parameter type | Successful result type |
+| ---- | -------------- | ------------------- | ---------------------- |
+| raw  | `put`          | `KvPair`            | `()`                   |
+| raw  | `get`          | `Key`               | `Option<Value>`        |
+| raw  | `delete`       | `Key`               | `()`                   |
+| raw  | `scan`         | `BoundRange`        | `Vec<KvPair>`          |
+| raw  | `batch_put`    | `Iter<KvPair>`      | `()`                   |
+| raw  | `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          |
+| raw  | `batch_delete` | `Iter<Key>`         | `()`                   |
+| raw  | `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          |
+| raw  | `delete_range` | `BoundRange`        | `()`                   |
+| txn  | `put`          | `KvPair`            | `()`                   |
+| txn  | `get`          | `Key`               | `Option<Value>`        |
+| txn  | `delete`       | `Key`               | `()`                   |
+| txn  | `scan`         | `BoundRange`        | `Iter<KvPair>`         |
+| txn  | `batch_get`    | `Iter<Key>`         | `Iter<KvPair>`         |
+| txn  | `lock_keys`    | `KvPair`            | `()`                   |
+
+For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).
+
+### Useful types
+
+To use the client, there are 4 types you will need.
+
+`Key` is simply a vector of bytes (`Vec<u8>`). `String` and `Vec<u8>` implement `Into<Key>`, so you can directly pass them to clients.
+
+`Value` is just an alias of `Vec<u8>`.
+
+`KvPair` is a tuple consisting of a `Key` and a `Value`. It also provides some convenience methods for conversion to and from other types.
+
+`BoundRange` is used for range related requests like `scan`. It implements `From` for usual ranges so you can just create a range and pass it to the request. For instance, `client.scan("k2".to_owned()..="k5".to_owned(), 5)` or `client.delete_range(vec![]..)`.
+
+
+
+
+
 ## Access the documentation
 
 We recommend using the cargo-generated documentation to browse and understand the API. We've done
diff --git a/src/raw/client.rs b/src/raw/client.rs
index a9917b2..0595fc5 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -104,6 +104,7 @@ impl Client {
     ///
     /// Once resolved this request will result in the fetching of the values associated with the
     /// given keys.
+    /// It only returns the entries that exist.
     ///
     /// ```rust,no_run
     /// # use tikv_client::{KvPair, Config, RawClient};
@@ -173,6 +174,7 @@ impl Client {
     /// Create a new 'delete' request.
     ///
     /// Once resolved this request will result in the deletion of the given key.
+    /// It does not return an error if the key does not exist.
     ///
     /// ```rust,no_run
     /// # use tikv_client::{Key, Config, RawClient};
@@ -193,6 +195,7 @@ impl Client {
     /// Create a new 'batch delete' request.
     ///
     /// Once resolved this request will result in the deletion of the given keys.
+    /// It does not return an error if some of the keys do not exist and will delete the others.
     ///
     /// ```rust,no_run
     /// # use tikv_client::{Config, RawClient};

From 398a673a5fa925820a3acf317338f7e75a6c1a1a Mon Sep 17 00:00:00 2001
From: ekexium
Date: Wed, 23 Sep 2020 16:39:10 +0800
Subject: [PATCH 03/10] readme: separate raw and txn API table

Signed-off-by: ekexium
---
 README.md | 41 ++++++++++++++++++++++++-----------------
 1 file changed, 24 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 2d77684..be7fe17 100644
--- a/README.md
+++ b/README.md
@@ -57,23 +57,30 @@ There are some [examples](examples) which show how to use the client in a Rust p
 
 ### API
 
-| Mode | Request        | Main parameter type | Successful result type |
-| ---- | -------------- | ------------------- | ---------------------- |
-| raw  | `put`          | `KvPair`            | `()`                   |
-| raw  | `get`          | `Key`               | `Option<Value>`        |
-| raw  | `delete`       | `Key`               | `()`                   |
-| raw  | `scan`         | `BoundRange`        | `Vec<KvPair>`          |
-| raw  | `batch_put`    | `Iter<KvPair>`      | `()`                   |
-| raw  | `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          |
-| raw  | `batch_delete` | `Iter<Key>`         | `()`                   |
-| raw  | `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          |
-| raw  | `delete_range` | `BoundRange`        | `()`                   |
-| txn  | `put`          | `KvPair`            | `()`                   |
-| txn  | `get`          | `Key`               | `Option<Value>`        |
-| txn  | `delete`       | `Key`               | `()`                   |
-| txn  | `scan`         | `BoundRange`        | `Iter<KvPair>`         |
-| txn  | `batch_get`    | `Iter<Key>`         | `Iter<KvPair>`         |
-| txn  | `lock_keys`    | `KvPair`            | `()`                   |
+#### Raw requests
+
+| Request        | Main parameter type | Successful result type |
+| -------------- | ------------------- | ---------------------- |
+| `put`          | `KvPair`            | `()`                   |
+| `get`          | `Key`               | `Option<Value>`        |
+| `delete`       | `Key`               | `()`                   |
+| `scan`         | `BoundRange`        | `Vec<KvPair>`          |
+| `batch_put`    | `Iter<KvPair>`      | `()`                   |
+| `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          |
+| `batch_delete` | `Iter<Key>`         | `()`                   |
+| `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          |
+| `delete_range` | `BoundRange`        | `()`                   |
+
+#### Transactional requests
+
+| Request        | Main parameter type | Successful result type |
+| -------------- | ------------------- | ---------------------- |
+| `put`          | `KvPair`            | `()`                   |
+| `get`          | `Key`               | `Option<Value>`        |
+| 
`delete`       | `Key`               | `()`                   |
+| `scan`         | `BoundRange`        | `Iter<KvPair>`         |
+| `batch_get`    | `Iter<Key>`         | `Iter<KvPair>`         |
+| `lock_keys`    | `KvPair`            | `()`                   |
 
 For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).
 

From 344f7cce759f810a99c83498272a3a1d581e7607 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Mon, 28 Sep 2020 10:45:33 +0800
Subject: [PATCH 04/10] use try_for_each_concurrent in reduce()

Signed-off-by: ekexium
---
 src/raw/requests.rs         |  4 +++-
 src/transaction/requests.rs | 12 +++++++++---
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index 681fca7..fa710fe 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -330,7 +330,9 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
     fn reduce(
         results: BoxStream<'static, Result>,
     ) -> BoxFuture<'static, Result> {
-        results.try_for_each(|_| future::ready(Ok(()))).boxed()
+        results
+            .try_for_each_concurrent(None, |_| future::ready(Ok(())))
+            .boxed()
     }
 }
 
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 08d0663..78e3007 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -346,7 +346,9 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
     fn reduce(
         results: BoxStream<'static, Result>,
     ) -> BoxFuture<'static, Result> {
-        results.try_for_each(|_| future::ready(Ok(()))).boxed()
+        results
+            .try_for_each_concurrent(None, |_| future::ready(Ok(())))
+            .boxed()
     }
 }
 
@@ -405,7 +407,9 @@ impl KvRequest for kvrpcpb::CommitRequest {
     fn reduce(
         results: BoxStream<'static, Result>,
     ) -> BoxFuture<'static, Result> {
-        results.try_for_each(|_| future::ready(Ok(()))).boxed()
+        results
+            .try_for_each_concurrent(None, |_| future::ready(Ok(())))
+            .boxed()
     }
 }
 
@@ -450,7 +454,9 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
     fn reduce(
         results: BoxStream<'static, Result>,
     ) -> BoxFuture<'static, Result> {
-        results.try_for_each(|_| future::ready(Ok(()))).boxed()
+        results
+            .try_for_each_concurrent(None, 
|_| future::ready(Ok(())))
+            .boxed()
     }
 }
 

From 3f8c3a7200a2479d860cc5ab122bab95f3299c27 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Mon, 28 Sep 2020 11:19:36 +0800
Subject: [PATCH 05/10] fix a bug in examples/raw

Signed-off-by: ekexium
---
 examples/raw.rs | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/examples/raw.rs b/examples/raw.rs
index 2fb3492..da10551 100644
--- a/examples/raw.rs
+++ b/examples/raw.rs
@@ -94,11 +94,7 @@ async fn main() -> Result<()> {
     let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
     assert_eq!(
         &keys,
-        &[
-            Key::from("k1".to_owned()),
-            Key::from("k2".to_owned()),
-            Key::from("k3".to_owned())
-        ]
+        &[Key::from("k1".to_owned()), Key::from("k2".to_owned()),]
     );
     println!("Scaning from {:?} to {:?} gives: {:?}", start, end, keys);
 

From 3ffcb6f0ed5c24b4c05502595b01f1681d1de559 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Mon, 28 Sep 2020 17:34:03 +0800
Subject: [PATCH 06/10] add some descriptions on noteworthy behavior of requests

Signed-off-by: ekexium
---
 README.md | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index be7fe17..4d26400 100644
--- a/README.md
+++ b/README.md
@@ -59,28 +59,28 @@ There are some [examples](examples) which show how to use the client in a Rust p
 
 #### Raw requests
 
-| Request        | Main parameter type | Successful result type |
-| -------------- | ------------------- | ---------------------- |
-| `put`          | `KvPair`            | `()`                   |
-| `get`          | `Key`               | `Option<Value>`        |
-| `delete`       | `Key`               | `()`                   |
-| `scan`         | `BoundRange`        | `Vec<KvPair>`          |
-| `batch_put`    | `Iter<KvPair>`      | `()`                   |
-| `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          |
-| `batch_delete` | `Iter<Key>`         | `()`                   |
-| `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          |
-| `delete_range` | `BoundRange`        | `()`                   |
+| Request        | Main parameter type | Successful result type | Noteworthy Behavior                           |
+| -------------- | ------------------- | ---------------------- | --------------------------------------------- |
+| `put`          | 
`KvPair`            | `()`                   |                                               |
+| `get`          | `Key`               | `Option<Value>`        |                                               |
+| `delete`       | `Key`               | `()`                   |                                               |
+| `scan`         | `BoundRange`        | `Vec<KvPair>`          |                                               |
+| `batch_put`    | `Iter<KvPair>`      | `()`                   |                                               |
+| `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          | Skip non-existent keys; Does not retain order |
+| `batch_delete` | `Iter<Key>`         | `()`                   |                                               |
+| `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          | Results are flattened; Retain order of ranges |
+| `delete_range` | `BoundRange`        | `()`                   |                                               |
 
 #### Transactional requests
 
-| Request        | Main parameter type | Successful result type |
-| -------------- | ------------------- | ---------------------- |
-| `put`          | `KvPair`            | `()`                   |
-| `get`          | `Key`               | `Option<Value>`        |
-| `delete`       | `Key`               | `()`                   |
-| `scan`         | `BoundRange`        | `Iter<KvPair>`         |
-| `batch_get`    | `Iter<Key>`         | `Iter<KvPair>`         |
-| `lock_keys`    | `KvPair`            | `()`                   |
+| Request     | Main parameter type | Successful result type | Noteworthy Behavior                           |
+| ----------- | ------------------- | ---------------------- | --------------------------------------------- |
+| `put`       | `KvPair`            | `()`                   |                                               |
+| `get`       | `Key`               | `Option<Value>`        |                                               |
+| `delete`    | `Key`               | `()`                   |                                               |
+| `scan`      | `BoundRange`        | `Iter<KvPair>`         |                                               |
+| `batch_get` | `Iter<Key>`         | `Iter<KvPair>`         | Skip non-existent keys; Does not retain order |
+| `lock_keys` | `KvPair`            | `()`                   |                                               |
 
 For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).
 

From ad8ef075af4ca27c231203162e21f08c2e639ea6 Mon Sep 17 00:00:00 2001
From: George Teo
Date: Mon, 28 Sep 2020 23:05:39 -0700
Subject: [PATCH 07/10] Add codec for encoding region for transaction client (#162)

Add codec for encoding region for transaction client.
Fix some other bugs.
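
Reader's note on the key encoding this patch adds in `tikv-client-common/src/kv/codec.rs`: the sketch below illustrates the memcomparable format for the ascending case only. It is not the code from this patch. `encode_bytes` and `decode_bytes` here are simplified, hypothetical free functions (the patch uses a `BytesEncoder` trait and an in-place decoder), and errors are plain strings for brevity.

```rust
// Hypothetical, ascending-order-only sketch of the memcomparable encoding.
const GROUP: usize = 8; // payload bytes per encoded group
const MARKER: u8 = 0xff; // marker byte for a group with no padding

/// Splits `key` into groups of 8 bytes. Each group is zero-padded to 8 bytes
/// and followed by one marker byte equal to 0xff minus the padding length.
/// A key whose length is a multiple of 8 gets a final all-padding group.
fn encode_bytes(key: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity((key.len() / GROUP + 1) * (GROUP + 1));
    let mut index = 0;
    while index <= key.len() {
        let end = usize::min(index + GROUP, key.len());
        let chunk = &key[index..end];
        let pad = GROUP - chunk.len();
        out.extend_from_slice(chunk);
        out.extend(std::iter::repeat(0u8).take(pad));
        out.push(MARKER - pad as u8);
        index += GROUP;
    }
    out
}

/// Reverses `encode_bytes`. Decoding stops at the first group whose marker
/// signals padding; empty input decodes to an empty key.
fn decode_bytes(encoded: &[u8]) -> Result<Vec<u8>, String> {
    if encoded.is_empty() {
        return Ok(Vec::new());
    }
    let mut out = Vec::with_capacity(encoded.len());
    for unit in encoded.chunks(GROUP + 1) {
        if unit.len() != GROUP + 1 {
            return Err("unexpected EOF".to_string());
        }
        let pad = (MARKER - unit[GROUP]) as usize;
        if pad > GROUP {
            return Err("invalid key padding".to_string());
        }
        let (data, padding) = unit[..GROUP].split_at(GROUP - pad);
        if padding.iter().any(|&b| b != 0) {
            return Err("invalid key padding".to_string());
        }
        out.extend_from_slice(data);
        if pad > 0 {
            // A padded group is always the last one.
            return Ok(out);
        }
    }
    Err("unexpected EOF".to_string())
}
```

For example, `encode_bytes(&[1, 2, 3])` gives `[1, 2, 3, 0, 0, 0, 0, 0, 250]`: three payload bytes, five padding zeros, and the marker `0xff - 5`. Encoded keys compare bytewise in the same order as the original keys, which is why region boundaries can be exchanged with PD in encoded form.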
Signed-off-by: George Teo
Co-authored-by: ekexium
---
 README.md                          |   2 +-
 src/mock.rs                        |   1 +
 src/pd/client.rs                   |  66 +++++++--
 src/pd/retry.rs                    |   1 +
 src/raw/client.rs                  |   3 +-
 src/raw/requests.rs                |   3 +
 src/request.rs                     |   2 +
 src/transaction/client.rs          |   9 +-
 src/transaction/requests.rs        |   4 +
 src/transaction/transaction.rs     |   3 +
 tests/integration_tests.rs         | 132 +++++++++++++-----
 tests/mock_tikv_tests.rs           |   4 +-
 tikv-client-common/src/kv/codec.rs | 217 +++++++++++++++++++++++++++++
 tikv-client-common/src/kv/key.rs   |   9 ++
 tikv-client-common/src/kv/mod.rs   |   1 +
 15 files changed, 404 insertions(+), 53 deletions(-)
 create mode 100644 tikv-client-common/src/kv/codec.rs

diff --git a/README.md b/README.md
index e7d77a3..b336ec7 100644
--- a/README.md
+++ b/README.md
@@ -46,4 +46,4 @@ cargo doc --package tikv-client --open
 
 This crate supports Rust 1.40 and above.
 
-For development, a nightly Rust compiler is needed to compile the tests.
\ No newline at end of file
+For development, a nightly Rust compiler is needed to compile the tests.
diff --git a/src/mock.rs b/src/mock.rs
index c33bd14..257c8e8 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -32,6 +32,7 @@ pub async fn pd_rpc_client() -> PdRpcClient {
                 MockCluster,
             ))
         },
+        false,
     )
     .await
     .unwrap()
diff --git a/src/pd/client.rs b/src/pd/client.rs
index ec9c060..b03dbfa 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -1,10 +1,11 @@
 // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
 
-use crate::{pd::RetryClient, Config, Key, Region, RegionId};
+use crate::{pd::RetryClient, tikv_client_common::kv::codec, Config, Key, Region, RegionId};
 use futures::{
     future::{ready, BoxFuture, Either},
     prelude::*,
     stream::BoxStream,
+    FutureExt, TryFutureExt,
 };
 use grpcio::{EnvBuilder, Environment};
 use std::{
@@ -23,20 +24,42 @@ use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
 const CQ_COUNT: usize = 1;
 const CLIENT_PREFIX: &str = "tikv-client";
 
+// The PdClient handles all the encoding stuff.
+//
+// Raw APIs do not require encoding/decoding at all.
+// All keys in all places (client, PD, TiKV) are in the same encoding (here called "raw format").
+//
+// Transactional APIs are a bit more complicated.
+// We need to encode and decode keys when we communicate with PD, but not with TiKV.
+// We encode keys before sending requests to PD, and decode keys in the response from PD.
+// That's all we need to do with encoding.
+//
+// client -encoded-> PD, PD -encoded-> client
+// client -raw-> TiKV, TiKV -raw-> client
+//
+// The reason for this behavior is that in transaction mode, TiKV encodes keys for MVCC.
+// In raw mode, TiKV doesn't encode them.
+// TiKV tells PD using its internal representation, whatever the encoding is.
+// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
+//
 pub trait PdClient: Send + Sync + 'static {
     type KvClient: KvClient + Send + Sync + 'static;
 
+    // In transactional API, `region` is decoded (keys in raw format).
     fn map_region_to_store(
         self: Arc,
         region: Region,
     ) -> BoxFuture<'static, Result>>;
 
+    // In transactional API, the key and returned region are both decoded (keys in raw format).
     fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result>;
 
+    // In transactional API, the returned region is decoded (keys in raw format).
     fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result>;
 
     fn get_timestamp(self: Arc) -> BoxFuture<'static, Result>;
 
+    // In transactional API, `key` is in raw format.
     fn store_for_key(
         self: Arc,
         key: &Key,
@@ -80,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
             .boxed()
     }
 
-    // Returns a Steam which iterates over the contexts for each region covered by range.
+    // Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range( self: Arc, range: BoundRange, @@ -92,12 +115,15 @@ pub trait PdClient: Send + Sync + 'static { Some(sk) => sk, }; let end_key = end_key.clone(); - let this = self.clone(); Either::Left(self.region_for_key(&start_key).and_then(move |region| { let region_end = region.end_key(); this.map_region_to_store(region).map_ok(move |store| { - if end_key.map(|x| x <= region_end).unwrap_or(false) || region_end.is_empty() { + if end_key + .map(|x| x <= region_end && !x.is_empty()) + .unwrap_or(false) + || region_end.is_empty() + { return Some((None, store)); } Some((Some(region_end), store)) @@ -158,6 +184,14 @@ pub trait PdClient: Send + Sync + 'static { }) .boxed() } + + fn decode_region(mut region: Region, enable_codec: bool) -> Result { + if enable_codec { + codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?; + codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?; + } + Ok(region) + } } /// This client converts requests for the logical TiKV cluster into requests @@ -167,6 +201,7 @@ pub struct PdRpcClient>>, timeout: Duration, + enable_codec: bool, } impl PdClient for PdRpcClient { @@ -191,12 +226,24 @@ impl PdClient for PdRpcClient { } fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result> { - let key: &[u8] = key.into(); - self.pd.clone().get_region(key.to_owned()).boxed() + let enable_codec = self.enable_codec; + let key = if enable_codec { + key.to_encoded().into() + } else { + key.clone().into() + }; + let region = self.pd.clone().get_region(key).boxed(); + region + .ok_and_then(move |region| Self::decode_region(region, enable_codec)) + .boxed() } fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result> { - self.pd.clone().get_region_by_id(id).boxed() + let region = self.pd.clone().get_region_by_id(id).boxed(); + let enable_codec = self.enable_codec; + region + .ok_and_then(move |region| Self::decode_region(region, enable_codec)) + .boxed() } fn get_timestamp(self: Arc) -> 
BoxFuture<'static, Result> { @@ -205,13 +252,14 @@ impl PdClient for PdRpcClient { } impl PdRpcClient { - pub async fn connect(config: &Config) -> Result { + pub async fn connect(config: &Config, enable_codec: bool) -> Result { PdRpcClient::new( config, |env, security_mgr| TikvConnect::new(env, security_mgr), |env, security_mgr| { RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout) }, + enable_codec, ) .await } @@ -222,6 +270,7 @@ impl PdRpcClient { config: &Config, kv_connect: MakeKvC, pd: MakePd, + enable_codec: bool, ) -> Result> where PdFut: Future>>, @@ -251,6 +300,7 @@ impl PdRpcClient { kv_client_cache, kv_connect: kv_connect(env, security_mgr), timeout: config.timeout, + enable_codec, }) } diff --git a/src/pd/retry.rs b/src/pd/retry.rs index f3cb383..e8baa5e 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -96,6 +96,7 @@ impl RetryClient { } // These get_* functions will try multiple times to make a request, reconnecting as necessary. + // It does not know about encoding. Caller should take care of it. pub async fn get_region(self: Arc, key: Vec) -> Result { retry!(self, "get_region", |cluster| { let key = key.clone(); diff --git a/src/raw/client.rs b/src/raw/client.rs index a9917b2..a425b42 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -26,7 +26,7 @@ impl Client { /// # }); /// ``` pub async fn new(config: Config) -> Result { - let rpc = Arc::new(PdRpcClient::connect(&config).await?); + let rpc = Arc::new(PdRpcClient::connect(&config, false).await?); Ok(Client { rpc, cf: None, @@ -104,6 +104,7 @@ impl Client { /// /// Once resolved this request will result in the fetching of the values associated with the /// given keys. + /// Non-existent entries will be skipped. The order of the keys is not retained. 
/// /// ```rust,no_run /// # use tikv_client::{KvPair, Config, RawClient}; diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 681fca7..25700cf 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -86,6 +86,7 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.keys.sort(); let keys = mem::take(&mut self.keys); store_stream_for_keys(keys, pd_client) } @@ -182,6 +183,7 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.pairs.sort_by(|a, b| a.key.cmp(&b.key)); let pairs = mem::take(&mut self.pairs); store_stream_for_keys(pairs, pd_client) } @@ -271,6 +273,7 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.keys.sort(); let keys = mem::take(&mut self.keys); store_stream_for_keys(keys, pd_client) } diff --git a/src/request.rs b/src/request.rs index f1db65e..d69ac56 100644 --- a/src/request.rs +++ b/src/request.rs @@ -164,6 +164,7 @@ where .boxed() } +/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order pub fn store_stream_for_keys( key_data: I, pd_client: Arc, @@ -208,6 +209,7 @@ fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) { let up = match (upper.is_empty(), upper_bound) { (_, None) => upper, (true, Some(ub)) => ub, + (_, Some(ub)) if ub.is_empty() => upper, (_, Some(ub)) => min(upper, ub), }; (max(lower, lower_bound), up) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index fe1c8dd..d6bc6ab 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,8 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
-use crate::transaction::{Snapshot, Transaction}; +use crate::{ + pd::PdRpcClient, + transaction::{Snapshot, Transaction}, +}; -use crate::pd::{PdClient, PdRpcClient}; +use crate::pd::PdClient; use futures::executor::ThreadPool; use std::sync::Arc; use tikv_client_common::{Config, Result, Timestamp}; @@ -30,7 +33,7 @@ impl Client { let bg_worker = ThreadPool::new()?; // TODO: PdRpcClient::connect currently uses a blocking implementation. // Make it asynchronous later. - let pd = Arc::new(PdRpcClient::connect(&config).await?); + let pd = Arc::new(PdRpcClient::connect(&config, true).await?); Ok(Client { pd, bg_worker, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 08d0663..e5f956d 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -88,6 +88,7 @@ impl KvRequest for kvrpcpb::BatchGetRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.keys.sort(); let keys = mem::take(&mut self.keys); store_stream_for_keys(keys, pd_client) } @@ -337,6 +338,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.mutations.sort_by(|a, b| a.key.cmp(&b.key)); let mutations = mem::take(&mut self.mutations); store_stream_for_keys(mutations, pd_client) } @@ -396,6 +398,7 @@ impl KvRequest for kvrpcpb::CommitRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.keys.sort(); let keys = mem::take(&mut self.keys); store_stream_for_keys(keys, pd_client) } @@ -441,6 +444,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest { &mut self, pd_client: Arc, ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + self.keys.sort(); let keys = mem::take(&mut self.keys); store_stream_for_keys(keys, pd_client) } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index a24e5a1..af0420c 100644 --- a/src/transaction/transaction.rs +++ 
b/src/transaction/transaction.rs @@ -77,6 +77,8 @@ impl Transaction { /// Gets the values associated with the given keys. /// + /// Non-existent entries will be skipped. The order of the keys is not retained. + /// /// ```rust,no_run /// # use tikv_client::{Key, Value, Config, transaction::Client}; /// # use futures::prelude::*; @@ -304,6 +306,7 @@ impl TwoPhaseCommitter { } }) .await?; + Ok(commit_version) } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index f5e2669..ac930ed 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -7,50 +7,31 @@ use serial_test::serial; use std::{ collections::{HashMap, HashSet}, convert::TryInto, - env, + env, iter, +}; +use tikv_client::{ + ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value, }; -use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value}; - -/// The limit of scan in each iteration in `clear_tikv`. -const SCAN_BATCH_SIZE: u32 = 1000; // Parameters used in test const NUM_PEOPLE: u32 = 100; const NUM_TRNASFER: u32 = 100; /// Delete all entris in TiKV to leave a clean space for following tests. -/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions. 
async fn clear_tikv() -> Fallible<()> { - delete_all_raw().await?; - delete_all_txn().await?; - Fallible::Ok(()) -} - -async fn delete_all_raw() -> Fallible<()> { - let config = Config::new(pd_addrs()); - let raw_client = RawClient::new(config).await?.with_key_only(true); - raw_client.delete_range(vec![]..).await?; - Fallible::Ok(()) -} - -async fn delete_all_txn() -> Fallible<()> { - let config = Config::new(pd_addrs()); - let txn_client = TransactionClient::new(config).await?.with_key_only(true); - let mut txn = txn_client.begin().await?; - - loop { - let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE).await?.peekable(); - - if keys.peek().is_none() { - break; - } - - for kv in keys { - txn.delete(kv.into_key()).await?; - } + let cfs = vec![ + ColumnFamily::Default, + ColumnFamily::Lock, + ColumnFamily::Write, + ]; + for cf in cfs { + let config = Config::new(pd_addrs()); + let raw_client = RawClient::new(config) + .await? + .with_key_only(true) + .with_cf(cf); + raw_client.delete_range(vec![]..).await?; } - - txn.commit().await?; Fallible::Ok(()) } @@ -73,6 +54,7 @@ async fn get_timestamp() -> Fallible<()> { Ok(()) } +// Tests transactional get, put, delete, batch_get #[tokio::test] #[serial] async fn crud() -> Fallible<()> { @@ -200,6 +182,80 @@ async fn raw_bank_transfer() -> Fallible<()> { Fallible::Ok(()) } +/// Tests transactional API when there are multiple regions. +/// Write large volumes of data to enforce region splitting. +/// In order to test `scan`, data is uniformly inserted. 
+#[tokio::test]
+#[serial]
+async fn txn_write_million() -> Fallible<()> {
+    const NUM_BITS_TXN: u32 = 7;
+    const NUM_BITS_KEY_PER_TXN: u32 = 3;
+    let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
+
+    clear_tikv().await?;
+    let config = Config::new(pd_addrs());
+    let client = TransactionClient::new(config).await?;
+
+    for i in 0..2u32.pow(NUM_BITS_TXN) {
+        let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
+        let keys = iter::repeat_with(|| {
+            let v = cur;
+            cur = cur.overflowing_add(interval).0;
+            v
+        })
+        .map(|u| u.to_be_bytes().to_vec())
+        .take(2usize.pow(NUM_BITS_KEY_PER_TXN))
+        .collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
+        let mut txn = client.begin().await?;
+        for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
+            txn.put(k.clone(), v).await?;
+        }
+        txn.commit().await?;
+
+        let mut txn = client.begin().await?;
+        let res = txn.batch_get(keys).await?;
+        assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
+        txn.commit().await?;
+    }
+
+    // test scan
+    let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
+    let txn = client.begin().await?;
+    let res = txn.scan(vec![].., limit).await?;
+    assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
+
+    // scan by small range and combine them
+    let mut rng = thread_rng();
+    let mut keys = gen_u32_keys(10, &mut rng)
+        .iter()
+        .cloned()
+        .collect::<Vec<_>>();
+    keys.sort();
+
+    let mut sum = 0;
+
+    // empty key to key[0]
+    let txn = client.begin().await?;
+    let res = txn.scan(vec![]..keys[0].clone(), limit).await?;
+    sum += res.count();
+
+    // key[i] .. key[i+1]
+    for i in 0..keys.len() - 1 {
+        let res = txn
+            .scan(keys[i].clone()..keys[i + 1].clone(), limit)
+            .await?;
+        sum += res.count();
+    }
+
+    // keys[last] to unbounded
+    let res = txn.scan(keys[keys.len() - 1].clone().., limit).await?;
+    sum += res.count();
+
+    assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
+
+    Fallible::Ok(())
+}
+
 #[tokio::test]
 #[serial]
 async fn txn_bank_transfer() -> Fallible<()> {
@@ -252,7 +308,7 @@ async fn txn_bank_transfer() -> Fallible<()> {
 
 #[tokio::test]
 #[serial]
-async fn raw() -> Fallible<()> {
+async fn raw_req() -> Fallible<()> {
     clear_tikv().await?;
     let config = Config::new(pd_addrs());
     let client = RawClient::new(config).await?;
@@ -283,8 +339,8 @@ async fn raw() -> Fallible<()> {
     let res = client
         .batch_get(vec!["k4".to_owned(), "k3".to_owned()])
         .await?;
-    assert_eq!(res[0].1, "v4".as_bytes());
-    assert_eq!(res[1].1, "v3".as_bytes());
+    assert_eq!(res[0], KvPair::new("k3".to_owned(), "v3"));
+    assert_eq!(res[1], KvPair::new("k4".to_owned(), "v4"));
 
     // k1,k2,k3,k4; delete then get
     let res = client.delete("k3".to_owned()).await;
diff --git a/tests/mock_tikv_tests.rs b/tests/mock_tikv_tests.rs
index 8bf6b50..b087da7 100644
--- a/tests/mock_tikv_tests.rs
+++ b/tests/mock_tikv_tests.rs
@@ -47,8 +47,8 @@ mod test {
             .batch_get(vec!["k4".to_owned(), "k3".to_owned()])
             .await
             .unwrap();
-        assert_eq!(res[0].1, "v4".as_bytes());
-        assert_eq!(res[1].1, "v3".as_bytes());
+        assert_eq!(res[0].1, "v3".as_bytes());
+        assert_eq!(res[1].1, "v4".as_bytes());
 
         // k1,k2,k3,k4; delete then get
         let res = client.delete("k3".to_owned()).await;
diff --git a/tikv-client-common/src/kv/codec.rs b/tikv-client-common/src/kv/codec.rs
new file mode 100644
index 0000000..dd9b9b4
--- /dev/null
+++ b/tikv-client-common/src/kv/codec.rs
@@ -0,0 +1,217 @@
+use crate::{errors::Result, Error};
+use std::{io::Write, ptr};
+
+const ENC_GROUP_SIZE: usize = 8;
+const ENC_MARKER: u8 = 0xff;
+const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE];
+const ENC_DESC_PADDING: [u8; ENC_GROUP_SIZE] = [!0; ENC_GROUP_SIZE];
+
+/// Returns the maximum encoded bytes size.
+///
+/// Duplicate from components/tikv_util/src/codec/bytes.rs.
+pub fn max_encoded_bytes_size(n: usize) -> usize {
+    (n / ENC_GROUP_SIZE + 1) * (ENC_GROUP_SIZE + 1)
+}
+
+pub trait BytesEncoder: Write {
+    /// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
+    ///
+    /// Duplicate from components/tikv_util/src/codec/bytes.rs.
+    fn encode_bytes(&mut self, key: &[u8], desc: bool) -> Result<()> {
+        let len = key.len();
+        let mut index = 0;
+        let mut buf = [0; ENC_GROUP_SIZE];
+        while index <= len {
+            let remain = len - index;
+            let mut pad: usize = 0;
+            if remain > ENC_GROUP_SIZE {
+                self.write_all(adjust_bytes_order(
+                    &key[index..index + ENC_GROUP_SIZE],
+                    desc,
+                    &mut buf,
+                ))?;
+            } else {
+                pad = ENC_GROUP_SIZE - remain;
+                self.write_all(adjust_bytes_order(&key[index..], desc, &mut buf))?;
+                if desc {
+                    self.write_all(&ENC_DESC_PADDING[..pad])?;
+                } else {
+                    self.write_all(&ENC_ASC_PADDING[..pad])?;
+                }
+            }
+            self.write_all(adjust_bytes_order(
+                &[ENC_MARKER - (pad as u8)],
+                desc,
+                &mut buf,
+            ))?;
+            index += ENC_GROUP_SIZE;
+        }
+        Ok(())
+    }
+}
+
+impl<T: Write> BytesEncoder for T {}
+
+fn adjust_bytes_order<'a>(bs: &'a [u8], desc: bool, buf: &'a mut [u8]) -> &'a [u8] {
+    if desc {
+        let mut buf_idx = 0;
+        for &b in bs {
+            buf[buf_idx] = !b;
+            buf_idx += 1;
+        }
+        &buf[..buf_idx]
+    } else {
+        bs
+    }
+}
+
+/// Decodes bytes which are encoded by `encode_bytes` before just in place without malloc.
+///
+/// Duplicate from components/tikv_util/src/codec/bytes.rs.
+pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
+    if data.is_empty() {
+        return Ok(());
+    }
+    let mut write_offset = 0;
+    let mut read_offset = 0;
+    loop {
+        let marker_offset = read_offset + ENC_GROUP_SIZE;
+        if marker_offset >= data.len() {
+            return Err(Error::internal_error(format!(
+                "unexpected EOF, original key = {:?}",
+                data
+            )));
+        };
+
+        unsafe {
+            // it is semantically equivalent to C's memmove()
+            // and the src and dest may overlap
+            // if src == dest do nothing
+            ptr::copy(
+                data.as_ptr().add(read_offset),
+                data.as_mut_ptr().add(write_offset),
+                ENC_GROUP_SIZE,
+            );
+        }
+        write_offset += ENC_GROUP_SIZE;
+        // everytime make ENC_GROUP_SIZE + 1 elements as a decode unit
+        read_offset += ENC_GROUP_SIZE + 1;
+
+        // the last byte in decode unit is for marker which indicates pad size
+        let marker = data[marker_offset];
+        let pad_size = if desc {
+            marker as usize
+        } else {
+            (ENC_MARKER - marker) as usize
+        };
+
+        if pad_size > 0 {
+            if pad_size > ENC_GROUP_SIZE {
+                return Err(Error::internal_error("invalid key padding"));
+            }
+
+            // check the padding pattern whether validate or not
+            let padding_slice = if desc {
+                &ENC_DESC_PADDING[..pad_size]
+            } else {
+                &ENC_ASC_PADDING[..pad_size]
+            };
+            if &data[write_offset - pad_size..write_offset] != padding_slice {
+                return Err(Error::internal_error("invalid key padding"));
+            }
+            unsafe {
+                data.set_len(write_offset - pad_size);
+            }
+            if desc {
+                for k in data {
+                    *k = !*k;
+                }
+            }
+            return Ok(());
+        }
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use super::*;
+
+    fn encode_bytes(bs: &[u8]) -> Vec<u8> {
+        encode_order_bytes(bs, false)
+    }
+
+    fn encode_bytes_desc(bs: &[u8]) -> Vec<u8> {
+        encode_order_bytes(bs, true)
+    }
+
+    fn encode_order_bytes(bs: &[u8], desc: bool) -> Vec<u8> {
+        let cap = max_encoded_bytes_size(bs.len());
+        let mut encoded = Vec::with_capacity(cap);
+        encoded.encode_bytes(bs, desc).unwrap();
+        encoded
+    }
+
+    #[test]
+    fn test_enc_dec_bytes() {
+        let pairs = vec![
+            (
+                vec![],
+                vec![0, 0, 0, 0, 0, 0, 0, 0, 247],
+                vec![255, 255, 255, 255, 255, 255, 255, 255, 8],
+            ),
+            (
+                vec![0],
+                vec![0, 0, 0, 0, 0, 0, 0, 0, 248],
+                vec![255, 255, 255, 255, 255, 255, 255, 255, 7],
+            ),
+            (
+                vec![1, 2, 3],
+                vec![1, 2, 3, 0, 0, 0, 0, 0, 250],
+                vec![254, 253, 252, 255, 255, 255, 255, 255, 5],
+            ),
+            (
+                vec![1, 2, 3, 0],
+                vec![1, 2, 3, 0, 0, 0, 0, 0, 251],
+                vec![254, 253, 252, 255, 255, 255, 255, 255, 4],
+            ),
+            (
+                vec![1, 2, 3, 4, 5, 6, 7],
+                vec![1, 2, 3, 4, 5, 6, 7, 0, 254],
+                vec![254, 253, 252, 251, 250, 249, 248, 255, 1],
+            ),
+            (
+                vec![0, 0, 0, 0, 0, 0, 0, 0],
+                vec![0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
+                vec![
+                    255, 255, 255, 255, 255, 255, 255, 255, 0, 255, 255, 255, 255, 255, 255, 255,
+                    255, 8,
+                ],
+            ),
+            (
+                vec![1, 2, 3, 4, 5, 6, 7, 8],
+                vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
+                vec![
+                    254, 253, 252, 251, 250, 249, 248, 247, 0, 255, 255, 255, 255, 255, 255, 255,
+                    255, 8,
+                ],
+            ),
+            (
+                vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
+                vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248],
+                vec![
+                    254, 253, 252, 251, 250, 249, 248, 247, 0, 246, 255, 255, 255, 255, 255, 255,
+                    255, 7,
+                ],
+            ),
+        ];
+
+        for (source, mut asc, mut desc) in pairs {
+            assert_eq!(encode_bytes(&source), asc);
+            assert_eq!(encode_bytes_desc(&source), desc);
+            decode_bytes_in_place(&mut asc, false).unwrap();
+            assert_eq!(source, asc);
+            decode_bytes_in_place(&mut desc, true).unwrap();
+            assert_eq!(source, desc);
+        }
+    }
+}
diff --git a/tikv-client-common/src/kv/key.rs b/tikv-client-common/src/kv/key.rs
index 8b0f606..5664cff 100644
--- a/tikv-client-common/src/kv/key.rs
+++ b/tikv-client-common/src/kv/key.rs
@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use super::HexRepr;
+use crate::kv::codec::{self, BytesEncoder};
 use kvproto::kvrpcpb;
 #[allow(unused_imports)]
 #[cfg(test)]
@@ -104,6 +105,14 @@ impl Key {
             Bound::Excluded(self)
         }
     }
+
+    #[inline]
+    pub fn to_encoded(&self) -> Key {
+        let len = codec::max_encoded_bytes_size(self.0.len());
+        let mut encoded = Vec::with_capacity(len);
+        encoded.encode_bytes(&self.0, false).unwrap();
+        Key(encoded)
+    }
 }
 
 impl From<Vec<u8>> for Key {
diff --git a/tikv-client-common/src/kv/mod.rs b/tikv-client-common/src/kv/mod.rs
index 02ebd43..bb24401 100644
--- a/tikv-client-common/src/kv/mod.rs
+++ b/tikv-client-common/src/kv/mod.rs
@@ -2,6 +2,7 @@
 use std::{fmt, u8};
 
 mod bound_range;
+pub mod codec;
 mod key;
 mod kvpair;
 mod value;

From 46d20a6c42ab388a1154130d9733e8414b35ca72 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Tue, 29 Sep 2020 16:14:40 +0800
Subject: [PATCH 08/10] move raw_batch_scan to experimental

Signed-off-by: ekexium
---
 README.md | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index bb30386..1e33023 100644
--- a/README.md
+++ b/README.md
@@ -68,7 +68,6 @@ There are some [examples](examples) which show how to use the client in a Rust p
 | `batch_put`    | `Iter<KvPair>`      | `()`                   |                                               |
 | `batch_get`    | `Iter<Key>`         | `Vec<KvPair>`          | Skip non-existent keys; Does not retain order |
 | `batch_delete` | `Iter<Key>`         | `()`                   |                                               |
-| `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          | Results are flattened; Retain order of ranges |
 | `delete_range` | `BoundRange`        | `()`                   |                                               |
 
 #### Transactional requests
@@ -84,6 +83,18 @@ There are some [examples](examples) which show how to use the client in a Rust p
 
 For detailed behavior of each reqeust, please refer to the [doc](#Access-the-documentation).
 
+#### Experimental raw requests
+
+You must be careful if you want to use the following request(s). Read the description for reasons.
+
+| Request        | Main parameter type | Successful result type |
+| -------------- | ------------------- | ---------------------- |
+| `batch_scan`   | `Iter<BoundRange>`  | `Vec<KvPair>`          |
+
+The `each_limit` parameter does not work as expected. It does not limit the number of results returned of each range, instead it limits the number of results in each region of each range. As a result, you may get **more than** `each_limit` key-value pairs for each range. But you should not miss any entries.
+
+The results of `batch_scan` are flattened. The order of ranges is retained.
+
 ### Useful types
 
 To use the client, there are 4 types you will need.

From f4f86b18f99bf0568b0f1882d76098f803717876 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Wed, 30 Sep 2020 15:10:21 +0800
Subject: [PATCH 09/10] add a workaround of GC

Signed-off-by: ekexium
---
 README.md | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 1e33023..60932e5 100644
--- a/README.md
+++ b/README.md
@@ -107,10 +107,6 @@ To use the client, there are 4 types you will need.
 
 `BoundRange` is used for range related requests like `scan`. It implements `From` for usual ranges so you can just create a range and pass them to the request.For instance, `client.scan("k2".to_owned()..="k5".to_owned(), 5)` or `client.delete_range(vec![]..)`.
 
-
-
-
-
 ## Access the documentation
 
 We recommend using the cargo-generated documentation to browse and understand the API. We've done
@@ -123,7 +119,12 @@ cargo doc --package tikv-client --open
 # If it didn't work, browse file URL it tried to open with your browser.
 ```
 
-## Minimal Rust Version
+## Known issues
+
+If you use transactional APIs, you'll need to perform GC in TiKV to save storage.
+However, current implementation does not provide a direct way to do this. A workaround is described in [#180](https://github.com/tikv/client-rust/issues/180).
+
+## Minimal Rust version
 
 This crate supports Rust 1.40 and above.
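
Note on the key encoding used by `Key::to_encoded` earlier in this series: `encode_bytes` in `tikv-client-common/src/kv/codec.rs` writes the key in 8-byte groups, pads the last group with zeros, and follows each group with a marker byte equal to `0xff - pad`. The sketch below restates only the ascending case as a standalone function, assuming nothing beyond the standard library. The names `encode_asc`, `GROUP`, and `MARKER` are illustrative and are not part of the crate.

```rust
// Standalone sketch of the ascending memcomparable encoding.
// `GROUP`, `MARKER`, and `encode_asc` are hypothetical names for illustration only.
const GROUP: usize = 8;
const MARKER: u8 = 0xff;

/// Encodes `key` in 8-byte groups. Each group is followed by a marker byte
/// equal to `0xff - pad`, where `pad` is the number of zero bytes appended.
fn encode_asc(key: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity((key.len() / GROUP + 1) * (GROUP + 1));
    let mut index = 0;
    // `<=` forces a final padded group even when the length is a multiple of 8,
    // so the decoder always finds a marker below 0xff that ends the key.
    while index <= key.len() {
        let remain = key.len() - index;
        let take = remain.min(GROUP);
        let pad = GROUP - take;
        out.extend_from_slice(&key[index..index + take]);
        out.extend(std::iter::repeat(0u8).take(pad));
        out.push(MARKER - pad as u8);
        index += GROUP;
    }
    out
}
```

Under these assumptions, `encode_asc(&[1, 2, 3])` yields `[1, 2, 3, 0, 0, 0, 0, 0, 250]`, which matches the corresponding vector in `test_enc_dec_bytes`. The marker byte is what keeps ordering intact for keys that differ only by trailing zeros: `[1]` and `[1, 0]` share the same padded group but end in `248` and `249` respectively, so the encoded forms still compare in the same order as the raw keys.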

From 06283c0ad3772b1802164ad67800652d2b62b034 Mon Sep 17 00:00:00 2001
From: ekexium
Date: Mon, 12 Oct 2020 10:12:05 +0800
Subject: [PATCH 10/10] fix typo

Signed-off-by: ekexium
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 60932e5..4ad9704 100644
--- a/README.md
+++ b/README.md
@@ -81,7 +81,7 @@ There are some [examples](examples) which show how to use the client in a Rust p
 | `batch_get`    | `Iter<Key>`         | `Iter<KvPair>`         | Skip non-existent keys; Does not retain order |
 | `lock_keys`    | `KvPair`            | `()`                   |                                               |
 
-For detailed behavior of each reqeust, please refer to the [doc](#Access-the-documentation).
+For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).
 
 #### Experimental raw requests