mirror of https://github.com/tikv/client-rust.git
Merge branch 'master' into update-readme
This commit is contained in:
commit
59d8307d08
|
@ -32,6 +32,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
|
||||||
MockCluster,
|
MockCluster,
|
||||||
))
|
))
|
||||||
},
|
},
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
|
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
|
||||||
|
|
||||||
use crate::{pd::RetryClient, Config, Key, Region, RegionId};
|
use crate::{pd::RetryClient, tikv_client_common::kv::codec, Config, Key, Region, RegionId};
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{ready, BoxFuture, Either},
|
future::{ready, BoxFuture, Either},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
stream::BoxStream,
|
stream::BoxStream,
|
||||||
|
FutureExt, TryFutureExt,
|
||||||
};
|
};
|
||||||
use grpcio::{EnvBuilder, Environment};
|
use grpcio::{EnvBuilder, Environment};
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -23,20 +24,42 @@ use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
|
||||||
const CQ_COUNT: usize = 1;
|
const CQ_COUNT: usize = 1;
|
||||||
const CLIENT_PREFIX: &str = "tikv-client";
|
const CLIENT_PREFIX: &str = "tikv-client";
|
||||||
|
|
||||||
|
// The PdClient handles all the encoding stuff.
|
||||||
|
//
|
||||||
|
// Raw APIs does not require encoding/decoding at all.
|
||||||
|
// All keys in all places (client, PD, TiKV) are in the same encoding (here called "raw format").
|
||||||
|
//
|
||||||
|
// Transactional APIs are a bit complicated.
|
||||||
|
// We need encode and decode keys when we communicate with PD, but not with TiKV.
|
||||||
|
// We encode keys before sending requests to PD, and decode keys in the response from PD.
|
||||||
|
// That's all we need to do with encoding.
|
||||||
|
//
|
||||||
|
// client -encoded-> PD, PD -encoded-> client
|
||||||
|
// client -raw-> TiKV, TiKV -raw-> client
|
||||||
|
//
|
||||||
|
// The reason for the behavior is that in transaction mode, TiKV encode keys for MVCC.
|
||||||
|
// In raw mode, TiKV doesn't encode them.
|
||||||
|
// TiKV tells PD using its internal representation, whatever the encoding is.
|
||||||
|
// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
|
||||||
|
//
|
||||||
pub trait PdClient: Send + Sync + 'static {
|
pub trait PdClient: Send + Sync + 'static {
|
||||||
type KvClient: KvClient + Send + Sync + 'static;
|
type KvClient: KvClient + Send + Sync + 'static;
|
||||||
|
|
||||||
|
// In transactional API, `region` is decoded (keys in raw format).
|
||||||
fn map_region_to_store(
|
fn map_region_to_store(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
region: Region,
|
region: Region,
|
||||||
) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;
|
) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;
|
||||||
|
|
||||||
|
// In transactional API, the key and returned region are both decoded (keys in raw format).
|
||||||
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;
|
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;
|
||||||
|
|
||||||
|
// In transactional API, the returned region is decoded (keys in raw format)
|
||||||
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;
|
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;
|
||||||
|
|
||||||
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
|
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
|
||||||
|
|
||||||
|
// In transactional API, `key` is in raw format
|
||||||
fn store_for_key(
|
fn store_for_key(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
key: &Key,
|
key: &Key,
|
||||||
|
@ -80,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a Steam which iterates over the contexts for each region covered by range.
|
// Returns a Stream which iterates over the contexts for each region covered by range.
|
||||||
fn stores_for_range(
|
fn stores_for_range(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
range: BoundRange,
|
range: BoundRange,
|
||||||
|
@ -92,12 +115,15 @@ pub trait PdClient: Send + Sync + 'static {
|
||||||
Some(sk) => sk,
|
Some(sk) => sk,
|
||||||
};
|
};
|
||||||
let end_key = end_key.clone();
|
let end_key = end_key.clone();
|
||||||
|
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
Either::Left(self.region_for_key(&start_key).and_then(move |region| {
|
Either::Left(self.region_for_key(&start_key).and_then(move |region| {
|
||||||
let region_end = region.end_key();
|
let region_end = region.end_key();
|
||||||
this.map_region_to_store(region).map_ok(move |store| {
|
this.map_region_to_store(region).map_ok(move |store| {
|
||||||
if end_key.map(|x| x <= region_end).unwrap_or(false) || region_end.is_empty() {
|
if end_key
|
||||||
|
.map(|x| x <= region_end && !x.is_empty())
|
||||||
|
.unwrap_or(false)
|
||||||
|
|| region_end.is_empty()
|
||||||
|
{
|
||||||
return Some((None, store));
|
return Some((None, store));
|
||||||
}
|
}
|
||||||
Some((Some(region_end), store))
|
Some((Some(region_end), store))
|
||||||
|
@ -158,6 +184,14 @@ pub trait PdClient: Send + Sync + 'static {
|
||||||
})
|
})
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn decode_region(mut region: Region, enable_codec: bool) -> Result<Region> {
|
||||||
|
if enable_codec {
|
||||||
|
codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?;
|
||||||
|
codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?;
|
||||||
|
}
|
||||||
|
Ok(region)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This client converts requests for the logical TiKV cluster into requests
|
/// This client converts requests for the logical TiKV cluster into requests
|
||||||
|
@ -167,6 +201,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
|
||||||
kv_connect: KvC,
|
kv_connect: KvC,
|
||||||
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
|
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
|
enable_codec: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
||||||
|
@ -191,12 +226,24 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
|
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
|
||||||
let key: &[u8] = key.into();
|
let enable_codec = self.enable_codec;
|
||||||
self.pd.clone().get_region(key.to_owned()).boxed()
|
let key = if enable_codec {
|
||||||
|
key.to_encoded().into()
|
||||||
|
} else {
|
||||||
|
key.clone().into()
|
||||||
|
};
|
||||||
|
let region = self.pd.clone().get_region(key).boxed();
|
||||||
|
region
|
||||||
|
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
|
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
|
||||||
self.pd.clone().get_region_by_id(id).boxed()
|
let region = self.pd.clone().get_region_by_id(id).boxed();
|
||||||
|
let enable_codec = self.enable_codec;
|
||||||
|
region
|
||||||
|
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
|
fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
|
||||||
|
@ -205,13 +252,14 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PdRpcClient<TikvConnect, Cluster> {
|
impl PdRpcClient<TikvConnect, Cluster> {
|
||||||
pub async fn connect(config: &Config) -> Result<PdRpcClient> {
|
pub async fn connect(config: &Config, enable_codec: bool) -> Result<PdRpcClient> {
|
||||||
PdRpcClient::new(
|
PdRpcClient::new(
|
||||||
config,
|
config,
|
||||||
|env, security_mgr| TikvConnect::new(env, security_mgr),
|
|env, security_mgr| TikvConnect::new(env, security_mgr),
|
||||||
|env, security_mgr| {
|
|env, security_mgr| {
|
||||||
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
|
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
|
||||||
},
|
},
|
||||||
|
enable_codec,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -222,6 +270,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
||||||
config: &Config,
|
config: &Config,
|
||||||
kv_connect: MakeKvC,
|
kv_connect: MakeKvC,
|
||||||
pd: MakePd,
|
pd: MakePd,
|
||||||
|
enable_codec: bool,
|
||||||
) -> Result<PdRpcClient<KvC, Cl>>
|
) -> Result<PdRpcClient<KvC, Cl>>
|
||||||
where
|
where
|
||||||
PdFut: Future<Output = Result<RetryClient<Cl>>>,
|
PdFut: Future<Output = Result<RetryClient<Cl>>>,
|
||||||
|
@ -251,6 +300,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
||||||
kv_client_cache,
|
kv_client_cache,
|
||||||
kv_connect: kv_connect(env, security_mgr),
|
kv_connect: kv_connect(env, security_mgr),
|
||||||
timeout: config.timeout,
|
timeout: config.timeout,
|
||||||
|
enable_codec,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,6 +96,7 @@ impl RetryClient<Cluster> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
|
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
|
||||||
|
// It does not know about encoding. Caller should take care of it.
|
||||||
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
|
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
|
||||||
retry!(self, "get_region", |cluster| {
|
retry!(self, "get_region", |cluster| {
|
||||||
let key = key.clone();
|
let key = key.clone();
|
||||||
|
|
|
@ -26,7 +26,7 @@ impl Client {
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn new(config: Config) -> Result<Client> {
|
pub async fn new(config: Config) -> Result<Client> {
|
||||||
let rpc = Arc::new(PdRpcClient::connect(&config).await?);
|
let rpc = Arc::new(PdRpcClient::connect(&config, false).await?);
|
||||||
Ok(Client {
|
Ok(Client {
|
||||||
rpc,
|
rpc,
|
||||||
cf: None,
|
cf: None,
|
||||||
|
@ -103,8 +103,8 @@ impl Client {
|
||||||
/// Create a new 'batch get' request.
|
/// Create a new 'batch get' request.
|
||||||
///
|
///
|
||||||
/// Once resolved this request will result in the fetching of the values associated with the
|
/// Once resolved this request will result in the fetching of the values associated with the
|
||||||
/// given keys.
|
/// given keys
|
||||||
/// It only returns the entries that exist.
|
/// Non-existent entries will be skipped. The order of the keys is not retained.
|
||||||
///
|
///
|
||||||
/// ```rust,no_run
|
/// ```rust,no_run
|
||||||
/// # use tikv_client::{KvPair, Config, RawClient};
|
/// # use tikv_client::{KvPair, Config, RawClient};
|
||||||
|
|
|
@ -10,11 +10,10 @@ use crate::{
|
||||||
transaction::HasLocks,
|
transaction::HasLocks,
|
||||||
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
|
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
|
||||||
};
|
};
|
||||||
use tikv_client_store::{KvClient, RpcFnType, Store};
|
|
||||||
|
|
||||||
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
|
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
|
||||||
use kvproto::{kvrpcpb, tikvpb::TikvClient};
|
use kvproto::{kvrpcpb, tikvpb::TikvClient};
|
||||||
use std::{mem, sync::Arc};
|
use std::{mem, sync::Arc};
|
||||||
|
use tikv_client_store::{KvClient, RpcFnType, Store};
|
||||||
|
|
||||||
impl KvRequest for kvrpcpb::RawGetRequest {
|
impl KvRequest for kvrpcpb::RawGetRequest {
|
||||||
type Result = Option<Value>;
|
type Result = Option<Value>;
|
||||||
|
@ -87,6 +86,7 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.keys.sort();
|
||||||
let keys = mem::take(&mut self.keys);
|
let keys = mem::take(&mut self.keys);
|
||||||
store_stream_for_keys(keys, pd_client)
|
store_stream_for_keys(keys, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -183,6 +183,7 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.pairs.sort_by(|a, b| a.key.cmp(&b.key));
|
||||||
let pairs = mem::take(&mut self.pairs);
|
let pairs = mem::take(&mut self.pairs);
|
||||||
store_stream_for_keys(pairs, pd_client)
|
store_stream_for_keys(pairs, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -272,6 +273,7 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.keys.sort();
|
||||||
let keys = mem::take(&mut self.keys);
|
let keys = mem::take(&mut self.keys);
|
||||||
store_stream_for_keys(keys, pd_client)
|
store_stream_for_keys(keys, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -331,10 +333,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
|
||||||
fn reduce(
|
fn reduce(
|
||||||
results: BoxStream<'static, Result<Self::Result>>,
|
results: BoxStream<'static, Result<Self::Result>>,
|
||||||
) -> BoxFuture<'static, Result<Self::Result>> {
|
) -> BoxFuture<'static, Result<Self::Result>> {
|
||||||
results
|
results.try_for_each(|_| future::ready(Ok(()))).boxed()
|
||||||
.into_future()
|
|
||||||
.map(|(f, _)| f.expect("no results should be impossible"))
|
|
||||||
.boxed()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -164,6 +164,7 @@ where
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
|
||||||
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
|
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
|
||||||
key_data: I,
|
key_data: I,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
|
@ -208,6 +209,7 @@ fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
|
||||||
let up = match (upper.is_empty(), upper_bound) {
|
let up = match (upper.is_empty(), upper_bound) {
|
||||||
(_, None) => upper,
|
(_, None) => upper,
|
||||||
(true, Some(ub)) => ub,
|
(true, Some(ub)) => ub,
|
||||||
|
(_, Some(ub)) if ub.is_empty() => upper,
|
||||||
(_, Some(ub)) => min(upper, ub),
|
(_, Some(ub)) => min(upper, ub),
|
||||||
};
|
};
|
||||||
(max(lower, lower_bound), up)
|
(max(lower, lower_bound), up)
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||||
|
|
||||||
use crate::transaction::{Snapshot, Transaction};
|
use crate::{
|
||||||
|
pd::PdRpcClient,
|
||||||
|
transaction::{Snapshot, Transaction},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::pd::{PdClient, PdRpcClient};
|
use crate::pd::PdClient;
|
||||||
use futures::executor::ThreadPool;
|
use futures::executor::ThreadPool;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tikv_client_common::{Config, Result, Timestamp};
|
use tikv_client_common::{Config, Result, Timestamp};
|
||||||
|
@ -30,7 +33,7 @@ impl Client {
|
||||||
let bg_worker = ThreadPool::new()?;
|
let bg_worker = ThreadPool::new()?;
|
||||||
// TODO: PdRpcClient::connect currently uses a blocking implementation.
|
// TODO: PdRpcClient::connect currently uses a blocking implementation.
|
||||||
// Make it asynchronous later.
|
// Make it asynchronous later.
|
||||||
let pd = Arc::new(PdRpcClient::connect(&config).await?);
|
let pd = Arc::new(PdRpcClient::connect(&config, true).await?);
|
||||||
Ok(Client {
|
Ok(Client {
|
||||||
pd,
|
pd,
|
||||||
bg_worker,
|
bg_worker,
|
||||||
|
|
|
@ -88,6 +88,7 @@ impl KvRequest for kvrpcpb::BatchGetRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.keys.sort();
|
||||||
let keys = mem::take(&mut self.keys);
|
let keys = mem::take(&mut self.keys);
|
||||||
store_stream_for_keys(keys, pd_client)
|
store_stream_for_keys(keys, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -337,6 +338,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.mutations.sort_by(|a, b| a.key.cmp(&b.key));
|
||||||
let mutations = mem::take(&mut self.mutations);
|
let mutations = mem::take(&mut self.mutations);
|
||||||
store_stream_for_keys(mutations, pd_client)
|
store_stream_for_keys(mutations, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -346,10 +348,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
|
||||||
fn reduce(
|
fn reduce(
|
||||||
results: BoxStream<'static, Result<Self::Result>>,
|
results: BoxStream<'static, Result<Self::Result>>,
|
||||||
) -> BoxFuture<'static, Result<Self::Result>> {
|
) -> BoxFuture<'static, Result<Self::Result>> {
|
||||||
results
|
results.try_for_each(|_| future::ready(Ok(()))).boxed()
|
||||||
.into_future()
|
|
||||||
.map(|(f, _)| f.expect("no results should be impossible"))
|
|
||||||
.boxed()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,6 +398,7 @@ impl KvRequest for kvrpcpb::CommitRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.keys.sort();
|
||||||
let keys = mem::take(&mut self.keys);
|
let keys = mem::take(&mut self.keys);
|
||||||
store_stream_for_keys(keys, pd_client)
|
store_stream_for_keys(keys, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -408,10 +408,7 @@ impl KvRequest for kvrpcpb::CommitRequest {
|
||||||
fn reduce(
|
fn reduce(
|
||||||
results: BoxStream<'static, Result<Self::Result>>,
|
results: BoxStream<'static, Result<Self::Result>>,
|
||||||
) -> BoxFuture<'static, Result<Self::Result>> {
|
) -> BoxFuture<'static, Result<Self::Result>> {
|
||||||
results
|
results.try_for_each(|_| future::ready(Ok(()))).boxed()
|
||||||
.into_future()
|
|
||||||
.map(|(f, _)| f.expect("no results should be impossible"))
|
|
||||||
.boxed()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,6 +444,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
|
||||||
&mut self,
|
&mut self,
|
||||||
pd_client: Arc<PdC>,
|
pd_client: Arc<PdC>,
|
||||||
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
|
||||||
|
self.keys.sort();
|
||||||
let keys = mem::take(&mut self.keys);
|
let keys = mem::take(&mut self.keys);
|
||||||
store_stream_for_keys(keys, pd_client)
|
store_stream_for_keys(keys, pd_client)
|
||||||
}
|
}
|
||||||
|
@ -456,10 +454,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
|
||||||
fn reduce(
|
fn reduce(
|
||||||
results: BoxStream<'static, Result<Self::Result>>,
|
results: BoxStream<'static, Result<Self::Result>>,
|
||||||
) -> BoxFuture<'static, Result<Self::Result>> {
|
) -> BoxFuture<'static, Result<Self::Result>> {
|
||||||
results
|
results.try_for_each(|_| future::ready(Ok(()))).boxed()
|
||||||
.into_future()
|
|
||||||
.map(|(f, _)| f.expect("no results should be impossible"))
|
|
||||||
.boxed()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,8 @@ impl Transaction {
|
||||||
|
|
||||||
/// Gets the values associated with the given keys.
|
/// Gets the values associated with the given keys.
|
||||||
///
|
///
|
||||||
|
/// Non-existent entries will be skipped. The order of the keys is not retained.
|
||||||
|
///
|
||||||
/// ```rust,no_run
|
/// ```rust,no_run
|
||||||
/// # use tikv_client::{Key, Value, Config, transaction::Client};
|
/// # use tikv_client::{Key, Value, Config, transaction::Client};
|
||||||
/// # use futures::prelude::*;
|
/// # use futures::prelude::*;
|
||||||
|
@ -304,6 +306,7 @@ impl TwoPhaseCommitter {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(commit_version)
|
Ok(commit_version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,50 +7,31 @@ use serial_test::serial;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
convert::TryInto,
|
convert::TryInto,
|
||||||
env,
|
env, iter,
|
||||||
|
};
|
||||||
|
use tikv_client::{
|
||||||
|
ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value,
|
||||||
};
|
};
|
||||||
use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value};
|
|
||||||
|
|
||||||
/// The limit of scan in each iteration in `clear_tikv`.
|
|
||||||
const SCAN_BATCH_SIZE: u32 = 1000;
|
|
||||||
|
|
||||||
// Parameters used in test
|
// Parameters used in test
|
||||||
const NUM_PEOPLE: u32 = 100;
|
const NUM_PEOPLE: u32 = 100;
|
||||||
const NUM_TRNASFER: u32 = 100;
|
const NUM_TRNASFER: u32 = 100;
|
||||||
|
|
||||||
/// Delete all entris in TiKV to leave a clean space for following tests.
|
/// Delete all entris in TiKV to leave a clean space for following tests.
|
||||||
/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions.
|
|
||||||
async fn clear_tikv() -> Fallible<()> {
|
async fn clear_tikv() -> Fallible<()> {
|
||||||
delete_all_raw().await?;
|
let cfs = vec![
|
||||||
delete_all_txn().await?;
|
ColumnFamily::Default,
|
||||||
Fallible::Ok(())
|
ColumnFamily::Lock,
|
||||||
}
|
ColumnFamily::Write,
|
||||||
|
];
|
||||||
async fn delete_all_raw() -> Fallible<()> {
|
for cf in cfs {
|
||||||
let config = Config::new(pd_addrs());
|
let config = Config::new(pd_addrs());
|
||||||
let raw_client = RawClient::new(config).await?.with_key_only(true);
|
let raw_client = RawClient::new(config)
|
||||||
|
.await?
|
||||||
|
.with_key_only(true)
|
||||||
|
.with_cf(cf);
|
||||||
raw_client.delete_range(vec![]..).await?;
|
raw_client.delete_range(vec![]..).await?;
|
||||||
Fallible::Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_all_txn() -> Fallible<()> {
|
|
||||||
let config = Config::new(pd_addrs());
|
|
||||||
let txn_client = TransactionClient::new(config).await?.with_key_only(true);
|
|
||||||
let mut txn = txn_client.begin().await?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE).await?.peekable();
|
|
||||||
|
|
||||||
if keys.peek().is_none() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for kv in keys {
|
|
||||||
txn.delete(kv.into_key()).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
txn.commit().await?;
|
|
||||||
Fallible::Ok(())
|
Fallible::Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +54,7 @@ async fn get_timestamp() -> Fallible<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests transactional get, put, delete, batch_get
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn crud() -> Fallible<()> {
|
async fn crud() -> Fallible<()> {
|
||||||
|
@ -200,6 +182,80 @@ async fn raw_bank_transfer() -> Fallible<()> {
|
||||||
Fallible::Ok(())
|
Fallible::Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tests transactional API when there are multiple regions.
|
||||||
|
/// Write large volumes of data to enforce region splitting.
|
||||||
|
/// In order to test `scan`, data is uniformly inserted.
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial]
|
||||||
|
async fn txn_write_million() -> Fallible<()> {
|
||||||
|
const NUM_BITS_TXN: u32 = 7;
|
||||||
|
const NUM_BITS_KEY_PER_TXN: u32 = 3;
|
||||||
|
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
|
||||||
|
|
||||||
|
clear_tikv().await?;
|
||||||
|
let config = Config::new(pd_addrs());
|
||||||
|
let client = TransactionClient::new(config).await?;
|
||||||
|
|
||||||
|
for i in 0..2u32.pow(NUM_BITS_TXN) {
|
||||||
|
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
|
||||||
|
let keys = iter::repeat_with(|| {
|
||||||
|
let v = cur;
|
||||||
|
cur = cur.overflowing_add(interval).0;
|
||||||
|
v
|
||||||
|
})
|
||||||
|
.map(|u| u.to_be_bytes().to_vec())
|
||||||
|
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
|
||||||
|
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
|
||||||
|
let mut txn = client.begin().await?;
|
||||||
|
for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
|
||||||
|
txn.put(k.clone(), v).await?;
|
||||||
|
}
|
||||||
|
txn.commit().await?;
|
||||||
|
|
||||||
|
let mut txn = client.begin().await?;
|
||||||
|
let res = txn.batch_get(keys).await?;
|
||||||
|
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
|
||||||
|
txn.commit().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// test scan
|
||||||
|
let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
|
||||||
|
let txn = client.begin().await?;
|
||||||
|
let res = txn.scan(vec![].., limit).await?;
|
||||||
|
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
|
||||||
|
|
||||||
|
// scan by small range and combine them
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
let mut keys = gen_u32_keys(10, &mut rng)
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
keys.sort();
|
||||||
|
|
||||||
|
let mut sum = 0;
|
||||||
|
|
||||||
|
// empty key to key[0]
|
||||||
|
let txn = client.begin().await?;
|
||||||
|
let res = txn.scan(vec![]..keys[0].clone(), limit).await?;
|
||||||
|
sum += res.count();
|
||||||
|
|
||||||
|
// key[i] .. key[i+1]
|
||||||
|
for i in 0..keys.len() - 1 {
|
||||||
|
let res = txn
|
||||||
|
.scan(keys[i].clone()..keys[i + 1].clone(), limit)
|
||||||
|
.await?;
|
||||||
|
sum += res.count();
|
||||||
|
}
|
||||||
|
|
||||||
|
// keys[last] to unbounded
|
||||||
|
let res = txn.scan(keys[keys.len() - 1].clone().., limit).await?;
|
||||||
|
sum += res.count();
|
||||||
|
|
||||||
|
assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
|
||||||
|
|
||||||
|
Fallible::Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn txn_bank_transfer() -> Fallible<()> {
|
async fn txn_bank_transfer() -> Fallible<()> {
|
||||||
|
@ -252,7 +308,7 @@ async fn txn_bank_transfer() -> Fallible<()> {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn raw() -> Fallible<()> {
|
async fn raw_req() -> Fallible<()> {
|
||||||
clear_tikv().await?;
|
clear_tikv().await?;
|
||||||
let config = Config::new(pd_addrs());
|
let config = Config::new(pd_addrs());
|
||||||
let client = RawClient::new(config).await?;
|
let client = RawClient::new(config).await?;
|
||||||
|
@ -283,8 +339,8 @@ async fn raw() -> Fallible<()> {
|
||||||
let res = client
|
let res = client
|
||||||
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
||||||
.await?;
|
.await?;
|
||||||
assert_eq!(res[0].1, "v4".as_bytes());
|
assert_eq!(res[0], KvPair::new("k3".to_owned(), "v3"));
|
||||||
assert_eq!(res[1].1, "v3".as_bytes());
|
assert_eq!(res[1], KvPair::new("k4".to_owned(), "v4"));
|
||||||
|
|
||||||
// k1,k2,k3,k4; delete then get
|
// k1,k2,k3,k4; delete then get
|
||||||
let res = client.delete("k3".to_owned()).await;
|
let res = client.delete("k3".to_owned()).await;
|
||||||
|
|
|
@ -47,8 +47,8 @@ mod test {
|
||||||
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(res[0].1, "v4".as_bytes());
|
assert_eq!(res[0].1, "v3".as_bytes());
|
||||||
assert_eq!(res[1].1, "v3".as_bytes());
|
assert_eq!(res[1].1, "v4".as_bytes());
|
||||||
|
|
||||||
// k1,k2,k3,k4; delete then get
|
// k1,k2,k3,k4; delete then get
|
||||||
let res = client.delete("k3".to_owned()).await;
|
let res = client.delete("k3".to_owned()).await;
|
||||||
|
|
|
@ -0,0 +1,217 @@
|
||||||
|
use crate::{errors::Result, Error};
|
||||||
|
use std::{io::Write, ptr};
|
||||||
|
|
||||||
|
const ENC_GROUP_SIZE: usize = 8;
|
||||||
|
const ENC_MARKER: u8 = 0xff;
|
||||||
|
const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE];
|
||||||
|
const ENC_DESC_PADDING: [u8; ENC_GROUP_SIZE] = [!0; ENC_GROUP_SIZE];
|
||||||
|
|
||||||
|
/// Returns the maximum encoded bytes size.
|
||||||
|
///
|
||||||
|
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||||
|
pub fn max_encoded_bytes_size(n: usize) -> usize {
|
||||||
|
(n / ENC_GROUP_SIZE + 1) * (ENC_GROUP_SIZE + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait BytesEncoder: Write {
|
||||||
|
/// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
|
||||||
|
///
|
||||||
|
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||||
|
fn encode_bytes(&mut self, key: &[u8], desc: bool) -> Result<()> {
|
||||||
|
let len = key.len();
|
||||||
|
let mut index = 0;
|
||||||
|
let mut buf = [0; ENC_GROUP_SIZE];
|
||||||
|
while index <= len {
|
||||||
|
let remain = len - index;
|
||||||
|
let mut pad: usize = 0;
|
||||||
|
if remain > ENC_GROUP_SIZE {
|
||||||
|
self.write_all(adjust_bytes_order(
|
||||||
|
&key[index..index + ENC_GROUP_SIZE],
|
||||||
|
desc,
|
||||||
|
&mut buf,
|
||||||
|
))?;
|
||||||
|
} else {
|
||||||
|
pad = ENC_GROUP_SIZE - remain;
|
||||||
|
self.write_all(adjust_bytes_order(&key[index..], desc, &mut buf))?;
|
||||||
|
if desc {
|
||||||
|
self.write_all(&ENC_DESC_PADDING[..pad])?;
|
||||||
|
} else {
|
||||||
|
self.write_all(&ENC_ASC_PADDING[..pad])?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.write_all(adjust_bytes_order(
|
||||||
|
&[ENC_MARKER - (pad as u8)],
|
||||||
|
desc,
|
||||||
|
&mut buf,
|
||||||
|
))?;
|
||||||
|
index += ENC_GROUP_SIZE;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Write> BytesEncoder for T {}
|
||||||
|
|
||||||
|
fn adjust_bytes_order<'a>(bs: &'a [u8], desc: bool, buf: &'a mut [u8]) -> &'a [u8] {
|
||||||
|
if desc {
|
||||||
|
let mut buf_idx = 0;
|
||||||
|
for &b in bs {
|
||||||
|
buf[buf_idx] = !b;
|
||||||
|
buf_idx += 1;
|
||||||
|
}
|
||||||
|
&buf[..buf_idx]
|
||||||
|
} else {
|
||||||
|
bs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes bytes which are encoded by `encode_bytes` before just in place without malloc.
|
||||||
|
///
|
||||||
|
/// Duplicate from components/tikv_util/src/codec/bytes.rs.
|
||||||
|
pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
|
||||||
|
if data.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let mut write_offset = 0;
|
||||||
|
let mut read_offset = 0;
|
||||||
|
loop {
|
||||||
|
let marker_offset = read_offset + ENC_GROUP_SIZE;
|
||||||
|
if marker_offset >= data.len() {
|
||||||
|
return Err(Error::internal_error(format!(
|
||||||
|
"unexpected EOF, original key = {:?}",
|
||||||
|
data
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
// it is semantically equivalent to C's memmove()
|
||||||
|
// and the src and dest may overlap
|
||||||
|
// if src == dest do nothing
|
||||||
|
ptr::copy(
|
||||||
|
data.as_ptr().add(read_offset),
|
||||||
|
data.as_mut_ptr().add(write_offset),
|
||||||
|
ENC_GROUP_SIZE,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
write_offset += ENC_GROUP_SIZE;
|
||||||
|
// everytime make ENC_GROUP_SIZE + 1 elements as a decode unit
|
||||||
|
read_offset += ENC_GROUP_SIZE + 1;
|
||||||
|
|
||||||
|
// the last byte in decode unit is for marker which indicates pad size
|
||||||
|
let marker = data[marker_offset];
|
||||||
|
let pad_size = if desc {
|
||||||
|
marker as usize
|
||||||
|
} else {
|
||||||
|
(ENC_MARKER - marker) as usize
|
||||||
|
};
|
||||||
|
|
||||||
|
if pad_size > 0 {
|
||||||
|
if pad_size > ENC_GROUP_SIZE {
|
||||||
|
return Err(Error::internal_error("invalid key padding"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the padding pattern whether validate or not
|
||||||
|
let padding_slice = if desc {
|
||||||
|
&ENC_DESC_PADDING[..pad_size]
|
||||||
|
} else {
|
||||||
|
&ENC_ASC_PADDING[..pad_size]
|
||||||
|
};
|
||||||
|
if &data[write_offset - pad_size..write_offset] != padding_slice {
|
||||||
|
return Err(Error::internal_error("invalid key padding"));
|
||||||
|
}
|
||||||
|
unsafe {
|
||||||
|
data.set_len(write_offset - pad_size);
|
||||||
|
}
|
||||||
|
if desc {
|
||||||
|
for k in data {
|
||||||
|
*k = !*k;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn encode_bytes(bs: &[u8]) -> Vec<u8> {
|
||||||
|
encode_order_bytes(bs, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_bytes_desc(bs: &[u8]) -> Vec<u8> {
|
||||||
|
encode_order_bytes(bs, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_order_bytes(bs: &[u8], desc: bool) -> Vec<u8> {
|
||||||
|
let cap = max_encoded_bytes_size(bs.len());
|
||||||
|
let mut encoded = Vec::with_capacity(cap);
|
||||||
|
encoded.encode_bytes(bs, desc).unwrap();
|
||||||
|
encoded
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_enc_dec_bytes() {
|
||||||
|
let pairs = vec![
|
||||||
|
(
|
||||||
|
vec![],
|
||||||
|
vec![0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||||
|
vec![255, 255, 255, 255, 255, 255, 255, 255, 8],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![0],
|
||||||
|
vec![0, 0, 0, 0, 0, 0, 0, 0, 248],
|
||||||
|
vec![255, 255, 255, 255, 255, 255, 255, 255, 7],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![1, 2, 3],
|
||||||
|
vec![1, 2, 3, 0, 0, 0, 0, 0, 250],
|
||||||
|
vec![254, 253, 252, 255, 255, 255, 255, 255, 5],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![1, 2, 3, 0],
|
||||||
|
vec![1, 2, 3, 0, 0, 0, 0, 0, 251],
|
||||||
|
vec![254, 253, 252, 255, 255, 255, 255, 255, 4],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7],
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7, 0, 254],
|
||||||
|
vec![254, 253, 252, 251, 250, 249, 248, 255, 1],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![0, 0, 0, 0, 0, 0, 0, 0],
|
||||||
|
vec![0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||||
|
vec![
|
||||||
|
255, 255, 255, 255, 255, 255, 255, 255, 0, 255, 255, 255, 255, 255, 255, 255,
|
||||||
|
255, 8,
|
||||||
|
],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7, 8],
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247],
|
||||||
|
vec![
|
||||||
|
254, 253, 252, 251, 250, 249, 248, 247, 0, 255, 255, 255, 255, 255, 255, 255,
|
||||||
|
255, 8,
|
||||||
|
],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
|
||||||
|
vec![1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248],
|
||||||
|
vec![
|
||||||
|
254, 253, 252, 251, 250, 249, 248, 247, 0, 246, 255, 255, 255, 255, 255, 255,
|
||||||
|
255, 7,
|
||||||
|
],
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (source, mut asc, mut desc) in pairs {
|
||||||
|
assert_eq!(encode_bytes(&source), asc);
|
||||||
|
assert_eq!(encode_bytes_desc(&source), desc);
|
||||||
|
decode_bytes_in_place(&mut asc, false).unwrap();
|
||||||
|
assert_eq!(source, asc);
|
||||||
|
decode_bytes_in_place(&mut desc, true).unwrap();
|
||||||
|
assert_eq!(source, desc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||||
|
|
||||||
use super::HexRepr;
|
use super::HexRepr;
|
||||||
|
use crate::kv::codec::{self, BytesEncoder};
|
||||||
use kvproto::kvrpcpb;
|
use kvproto::kvrpcpb;
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -104,6 +105,14 @@ impl Key {
|
||||||
Bound::Excluded(self)
|
Bound::Excluded(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn to_encoded(&self) -> Key {
|
||||||
|
let len = codec::max_encoded_bytes_size(self.0.len());
|
||||||
|
let mut encoded = Vec::with_capacity(len);
|
||||||
|
encoded.encode_bytes(&self.0, false).unwrap();
|
||||||
|
Key(encoded)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Vec<u8>> for Key {
|
impl From<Vec<u8>> for Key {
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
use std::{fmt, u8};
|
use std::{fmt, u8};
|
||||||
|
|
||||||
mod bound_range;
|
mod bound_range;
|
||||||
|
pub mod codec;
|
||||||
mod key;
|
mod key;
|
||||||
mod kvpair;
|
mod kvpair;
|
||||||
mod value;
|
mod value;
|
||||||
|
|
Loading…
Reference in New Issue