Address review comments

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2021-02-22 13:03:31 +13:00
parent 8cb8dd666a
commit 9a5fd1d8c9
22 changed files with 235 additions and 144 deletions

View File

@ -6,14 +6,14 @@ os:
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355 # - windows # TODO: https://github.com/pingcap/kvproto/issues/355
- osx - osx
rust: rust:
# - stable - stable
- nightly - nightly
env: env:
global: global:
- RUST_BACKTRACE=1 - RUST_BACKTRACE=1
- RUSTFLAGS="-D warnings" # - RUSTFLAGS="-D warnings"
- RUSTDOCFLAGS="-D warnings" # - RUSTDOCFLAGS="-D warnings"
addons: addons:
apt: apt:
@ -43,7 +43,7 @@ script:
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi
- cargo build --all - cargo build --all
- cargo build --examples - cargo build --examples
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --exclude tikv-client-proto --document-private-items; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps; fi
- if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi - if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi
# For now we only run full integration tests on Linux. Here's why: # For now we only run full integration tests on Linux. Here's why:
# * Docker on OS X is not supported by Travis. # * Docker on OS X is not supported by Travis.
@ -53,7 +53,7 @@ script:
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 2; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 60; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi

View File

@ -22,7 +22,7 @@ async-trait = "0.1"
derive-new = "0.5" derive-new = "0.5"
futures = { version = "0.3.5", features = ["async-await", "thread-pool"] } futures = { version = "0.3.5", features = ["async-await", "thread-pool"] }
futures-timer = "3.0" futures-timer = "3.0"
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
lazy_static = "1" lazy_static = "1"
log = "0.4" log = "0.4"
prometheus = { version = "0.11", features = [ "push", "process" ], default-features = false } prometheus = { version = "0.11", features = [ "push", "process" ], default-features = false }

View File

@ -6,6 +6,6 @@ edition = "2018"
[dependencies] [dependencies]
derive-new = "0.5.8" derive-new = "0.5.8"
futures = "0.3.5" futures = "0.3.5"
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
log = "0.4" log = "0.4"
tikv-client-proto = { path = "../tikv-client-proto"} tikv-client-proto = { path = "../tikv-client-proto"}

View File

@ -1,2 +1,2 @@
merge_imports = true imports_granularity="Crate"
format_code_in_doc_comments = true format_code_in_doc_comments = true

View File

@ -18,6 +18,7 @@ use std::pin::Pin;
pub struct LoopFn<A, F> { pub struct LoopFn<A, F> {
future: A, future: A,
func: F, func: F,
errored: bool,
} }
pub fn stream_fn<S, T, A, F, E>(initial_state: S, mut func: F) -> LoopFn<A, F> pub fn stream_fn<S, T, A, F, E>(initial_state: S, mut func: F) -> LoopFn<A, F>
@ -28,6 +29,7 @@ where
LoopFn { LoopFn {
future: func(initial_state), future: func(initial_state),
func, func,
errored: false,
} }
} }
@ -38,10 +40,16 @@ where
{ {
type Item = Result<T, E>; type Item = Result<T, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.errored {
return Poll::Ready(None);
}
unsafe { unsafe {
let this = Pin::get_unchecked_mut(self); let this = Pin::get_unchecked_mut(self);
match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) { match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) {
Err(e) => Poll::Ready(Some(Err(e))), Err(e) => {
this.errored = true;
Poll::Ready(Some(Err(e)))
}
Ok(None) => Poll::Ready(None), Ok(None) => Poll::Ready(None),
Ok(Some((s, t))) => { Ok(Some((s, t))) => {
this.future = (this.func)(s); this.future = (this.func)(s);

View File

@ -5,9 +5,6 @@
#![allow(clippy::redundant_closure)] #![allow(clippy::redundant_closure)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#![allow(incomplete_features)] #![allow(incomplete_features)]
#![cfg_attr(test, feature(specialization))]
#![feature(associated_type_bounds)]
#![feature(decl_macro)]
//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a //! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
//! distributed transactional Key-Value database written in Rust. //! distributed transactional Key-Value database written in Rust.
@ -77,6 +74,9 @@
//! //!
//! At this point, you should seek the documentation in the related API modules. //! At this point, you should seek the documentation in the related API modules.
#[macro_use]
mod request;
#[macro_use] #[macro_use]
mod transaction; mod transaction;
@ -87,7 +87,6 @@ mod kv;
mod pd; mod pd;
mod raw; mod raw;
mod region; mod region;
mod request;
mod stats; mod stats;
mod store; mod store;
mod timestamp; mod timestamp;

View File

@ -154,3 +154,10 @@ impl PdClient for MockPdClient {
unimplemented!() unimplemented!()
} }
} }
pub fn mock_store() -> Store {
Store {
region: Region::default(),
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
}
}

View File

@ -117,7 +117,7 @@ impl Client {
.await? .await?
.resolve_lock(OPTIMISTIC_BACKOFF) .resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF) .retry_region(DEFAULT_REGION_BACKOFF)
.post_process() .post_process_default()
.plan(); .plan();
plan.execute().await plan.execute().await
} }
@ -147,8 +147,8 @@ impl Client {
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(OPTIMISTIC_BACKOFF) .resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.multi_region() .multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.merge(Collect) .merge(Collect)
.plan(); .plan();
plan.execute() plan.execute()

View File

@ -3,9 +3,7 @@
use super::RawRpcRequest; use super::RawRpcRequest;
use crate::{ use crate::{
pd::PdClient, pd::PdClient,
request::{ request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
shardable_keys, shardable_range, Collect, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_ranges, Store}, store::{store_stream_for_keys, store_stream_for_ranges, Store},
transaction::HasLocks, transaction::HasLocks,
util::iter::FlatMapOkIterExt, util::iter::FlatMapOkIterExt,
@ -33,10 +31,10 @@ impl SingleKey for kvrpcpb::RawGetRequest {
} }
} }
impl Process for kvrpcpb::RawGetResponse { impl Process<kvrpcpb::RawGetResponse> for DefaultProcessor {
type Out = Option<Value>; type Out = Option<Value>;
fn process(input: Result<Self>) -> Result<Self::Out> { fn process(&self, input: Result<kvrpcpb::RawGetResponse>) -> Result<Self::Out> {
let mut input = input?; let mut input = input?;
Ok(if input.not_found { Ok(if input.not_found {
None None

View File

@ -10,15 +10,16 @@ use tikv_client_store::{HasError, Request};
pub use self::{ pub use self::{
plan::{ plan::{
Collect, CollectError, Dispatch, Merge, MergeResponse, MultiRegionPlan, Plan, Process, Collect, CollectError, DefaultProcessor, Dispatch, Merge, MergeResponse, MultiRegion, Plan,
ProcessResponse, ResolveLockPlan, RetryRegionPlan, Process, ProcessResponse, ResolveLock, RetryRegion,
}, },
plan_builder::{PlanBuilder, SingleKey}, plan_builder::{PlanBuilder, SingleKey},
shard::{shardable_keys, shardable_range, Shardable}, shard::Shardable,
}; };
mod plan; mod plan;
mod plan_builder; mod plan_builder;
#[macro_use]
mod shard; mod shard;
/// Abstracts any request sent to a TiKV server. /// Abstracts any request sent to a TiKV server.

View File

@ -48,14 +48,14 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
} }
} }
pub struct MultiRegionPlan<P: Plan, PdC: PdClient> { pub struct MultiRegion<P: Plan, PdC: PdClient> {
pub(super) inner: P, pub(super) inner: P,
pub pd_client: Arc<PdC>, pub pd_client: Arc<PdC>,
} }
impl<P: Plan, PdC: PdClient> Clone for MultiRegionPlan<P, PdC> { impl<P: Plan, PdC: PdClient> Clone for MultiRegion<P, PdC> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
MultiRegionPlan { MultiRegion {
inner: self.inner.clone(), inner: self.inner.clone(),
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
} }
@ -63,7 +63,10 @@ impl<P: Plan, PdC: PdClient> Clone for MultiRegionPlan<P, PdC> {
} }
#[async_trait] #[async_trait]
impl<P: Plan<Result: HasError> + Shardable, PdC: PdClient> Plan for MultiRegionPlan<P, PdC> { impl<P: Plan + Shardable, PdC: PdClient> Plan for MultiRegion<P, PdC>
where
P::Result: HasError,
{
type Result = Vec<Result<P::Result>>; type Result = Vec<Result<P::Result>>;
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
@ -105,8 +108,7 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
type Result = M::Out; type Result = M::Out;
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
let result = self.inner.execute().await?; self.merge.merge(self.inner.execute().await?)
self.merge.merge(result)
} }
} }
@ -128,36 +130,42 @@ impl<T: Send> Merge<T> for CollectError {
} }
/// Process data into another kind of data. /// Process data into another kind of data.
pub trait Process: Sized + Clone + Send + Sync + 'static { pub trait Process<In>: Sized + Clone + Send + Sync + 'static {
type Out: Send; type Out: Send;
fn process(input: Result<Self>) -> Result<Self::Out>; fn process(&self, input: Result<In>) -> Result<Self::Out>;
} }
#[derive(Clone)] #[derive(Clone)]
pub struct ProcessResponse<P: Plan, Pr: Process> { pub struct ProcessResponse<P: Plan, In, Pr: Process<In>> {
pub inner: P, pub inner: P,
pub phantom: PhantomData<Pr>, pub processor: Pr,
pub phantom: PhantomData<In>,
} }
#[async_trait] #[async_trait]
impl<P: Plan<Result = Pr>, Pr: Process> Plan for ProcessResponse<P, Pr> { impl<In: Clone + Sync + Send + 'static, P: Plan<Result = In>, Pr: Process<In>> Plan
for ProcessResponse<P, In, Pr>
{
type Result = Pr::Out; type Result = Pr::Out;
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
Pr::process(self.inner.execute().await) self.processor.process(self.inner.execute().await)
} }
} }
pub struct RetryRegionPlan<P: Plan, PdC: PdClient> { #[derive(Clone, Copy, Debug)]
pub struct DefaultProcessor;
pub struct RetryRegion<P: Plan, PdC: PdClient> {
pub inner: P, pub inner: P,
pub pd_client: Arc<PdC>, pub pd_client: Arc<PdC>,
pub backoff: Backoff, pub backoff: Backoff,
} }
impl<P: Plan, PdC: PdClient> Clone for RetryRegionPlan<P, PdC> { impl<P: Plan, PdC: PdClient> Clone for RetryRegion<P, PdC> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
RetryRegionPlan { RetryRegion {
inner: self.inner.clone(), inner: self.inner.clone(),
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(), backoff: self.backoff.clone(),
@ -166,7 +174,10 @@ impl<P: Plan, PdC: PdClient> Clone for RetryRegionPlan<P, PdC> {
} }
#[async_trait] #[async_trait]
impl<P: Plan<Result: HasError>, PdC: PdClient> Plan for RetryRegionPlan<P, PdC> { impl<P: Plan, PdC: PdClient> Plan for RetryRegion<P, PdC>
where
P::Result: HasError,
{
type Result = P::Result; type Result = P::Result;
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
@ -186,15 +197,15 @@ impl<P: Plan<Result: HasError>, PdC: PdClient> Plan for RetryRegionPlan<P, PdC>
} }
} }
pub struct ResolveLockPlan<P: Plan, PdC: PdClient> { pub struct ResolveLock<P: Plan, PdC: PdClient> {
pub inner: P, pub inner: P,
pub pd_client: Arc<PdC>, pub pd_client: Arc<PdC>,
pub backoff: Backoff, pub backoff: Backoff,
} }
impl<P: Plan, PdC: PdClient> Clone for ResolveLockPlan<P, PdC> { impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ResolveLockPlan { ResolveLock {
inner: self.inner.clone(), inner: self.inner.clone(),
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(), backoff: self.backoff.clone(),
@ -203,11 +214,15 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLockPlan<P, PdC> {
} }
#[async_trait] #[async_trait]
impl<P: Plan<Result: HasLocks>, PdC: PdClient> Plan for ResolveLockPlan<P, PdC> { impl<P: Plan, PdC: PdClient> Plan for ResolveLock<P, PdC>
where
P::Result: HasLocks,
{
type Result = P::Result; type Result = P::Result;
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
let mut result = self.inner.execute().await?; let mut result = self.inner.execute().await?;
let mut clone = self.clone();
loop { loop {
let locks = result.take_locks(); let locks = result.take_locks();
if locks.is_empty() { if locks.is_empty() {
@ -222,7 +237,6 @@ impl<P: Plan<Result: HasLocks>, PdC: PdClient> Plan for ResolveLockPlan<P, PdC>
if resolve_locks(locks, pd_client.clone()).await? { if resolve_locks(locks, pd_client.clone()).await? {
result = self.inner.execute().await?; result = self.inner.execute().await?;
} else { } else {
let mut clone = self.clone();
match clone.backoff.next_delay_duration() { match clone.backoff.next_delay_duration() {
None => return Err(Error::ResolveLockError), None => return Err(Error::ResolveLockError),
Some(delay_duration) => { Some(delay_duration) => {
@ -234,3 +248,61 @@ impl<P: Plan<Result: HasLocks>, PdC: PdClient> Plan for ResolveLockPlan<P, PdC>
} }
} }
} }
#[cfg(test)]
mod test {
use super::*;
use crate::mock::{mock_store, MockPdClient};
use futures::stream::BoxStream;
use tikv_client_proto::kvrpcpb::BatchGetResponse;
#[derive(Clone)]
struct ErrPlan;
#[async_trait]
impl Plan for ErrPlan {
type Result = BatchGetResponse;
async fn execute(&self) -> Result<Self::Result> {
Err(Error::Unimplemented)
}
}
impl Shardable for ErrPlan {
type Shard = u8;
fn shards(
&self,
_: &Arc<impl crate::pd::PdClient>,
) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> {
Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented)))
.map_ok(|_: u8| (42, mock_store()))
.boxed()
}
fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::Store) -> Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_err() {
let plan = RetryRegion {
inner: MultiRegion {
inner: ResolveLock {
inner: ErrPlan,
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
},
pd_client: Arc::new(MockPdClient::default()),
},
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
};
plan.execute()
.await
.unwrap()
.iter()
.for_each(|r| assert!(r.is_err()));
}
}

View File

@ -4,8 +4,8 @@ use crate::{
backoff::Backoff, backoff::Backoff,
pd::PdClient, pd::PdClient,
request::{ request::{
Dispatch, KvRequest, Merge, MergeResponse, MultiRegionPlan, Plan, Process, ProcessResponse, DefaultProcessor, Dispatch, KvRequest, Merge, MergeResponse, MultiRegion, Plan, Process,
ResolveLockPlan, RetryRegionPlan, Shardable, ProcessResponse, ResolveLock, RetryRegion, Shardable,
}, },
store::Store, store::Store,
transaction::HasLocks, transaction::HasLocks,
@ -52,13 +52,13 @@ impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> { impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a lock error, then resolve the lock and retry the request. /// If there is a lock error, then resolve the lock and retry the request.
pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLockPlan<P, PdC>, Ph> pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
where where
P::Result: HasLocks, P::Result: HasLocks,
{ {
PlanBuilder { PlanBuilder {
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
plan: ResolveLockPlan { plan: ResolveLock {
inner: self.plan, inner: self.plan,
backoff, backoff,
pd_client: self.pd_client, pd_client: self.pd_client,
@ -70,13 +70,13 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a region error, re-shard the request and re-resolve regions, then retry. /// If there is a region error, re-shard the request and re-resolve regions, then retry.
/// ///
/// Note that this plan must wrap a multi-region plan if the request should be re-sharded. /// Note that this plan must wrap a multi-region plan if the request should be re-sharded.
pub fn retry_region(self, backoff: Backoff) -> PlanBuilder<PdC, RetryRegionPlan<P, PdC>, Ph> pub fn retry_region(self, backoff: Backoff) -> PlanBuilder<PdC, RetryRegion<P, PdC>, Ph>
where where
P::Result: HasError, P::Result: HasError,
{ {
PlanBuilder { PlanBuilder {
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
plan: RetryRegionPlan { plan: RetryRegion {
inner: self.plan, inner: self.plan,
backoff, backoff,
pd_client: self.pd_client, pd_client: self.pd_client,
@ -103,17 +103,21 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
} }
} }
/// Apply a processing step to a response (usually only needed if the request is sent to a /// Apply the default processing step to a response (usually only needed if the request is sent
/// single region because post-porcessing can be incorporated in the merge step for multi-region /// to a single region because post-porcessing can be incorporated in the merge step for
/// requests). /// multi-region requests).
pub fn post_process(self) -> PlanBuilder<PdC, ProcessResponse<P, P::Result>, Ph> pub fn post_process_default<In: Clone + Sync + Send + 'static>(
self,
) -> PlanBuilder<PdC, ProcessResponse<P, In, DefaultProcessor>, Ph>
where where
P: Plan<Result: Process>, P: Plan<Result = In>,
DefaultProcessor: Process<In>,
{ {
PlanBuilder { PlanBuilder {
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
plan: ProcessResponse { plan: ProcessResponse {
inner: self.plan, inner: self.plan,
processor: DefaultProcessor,
phantom: PhantomData, phantom: PhantomData,
}, },
phantom: PhantomData, phantom: PhantomData,
@ -126,10 +130,10 @@ where
P::Result: HasError, P::Result: HasError,
{ {
/// Split the request into shards sending a request to the region of each shard. /// Split the request into shards sending a request to the region of each shard.
pub fn multi_region(self) -> PlanBuilder<PdC, MultiRegionPlan<P, PdC>, Targetted> { pub fn multi_region(self) -> PlanBuilder<PdC, MultiRegion<P, PdC>, Targetted> {
PlanBuilder { PlanBuilder {
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
plan: MultiRegionPlan { plan: MultiRegion {
inner: self.plan, inner: self.plan,
pd_client: self.pd_client, pd_client: self.pd_client,
}, },

View File

@ -2,7 +2,7 @@
use crate::{ use crate::{
pd::PdClient, pd::PdClient,
request::{Dispatch, KvRequest, Plan, ResolveLockPlan, RetryRegionPlan}, request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion},
store::Store, store::Store,
Result, Result,
}; };
@ -36,7 +36,7 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
} }
} }
impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLockPlan<P, PdC> { impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
type Shard = P::Shard; type Shard = P::Shard;
fn shards( fn shards(
@ -51,7 +51,7 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLockPlan<P, PdC> {
} }
} }
impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegionPlan<P, PdC> { impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
type Shard = P::Shard; type Shard = P::Shard;
fn shards( fn shards(
@ -66,15 +66,19 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegionPlan<P, PdC> {
} }
} }
pub macro shardable_keys($type_: ty) { #[macro_export]
macro_rules! shardable_keys {
($type_: ty) => {
impl Shardable for $type_ { impl Shardable for $type_ {
type Shard = Vec<Vec<u8>>; type Shard = Vec<Vec<u8>>;
fn shards( fn shards(
&self, &self,
pd_client: &std::sync::Arc<impl crate::pd::PdClient>, pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> ) -> futures::stream::BoxStream<
{ 'static,
crate::Result<(Self::Shard, crate::store::Store)>,
> {
let mut keys = self.keys.clone(); let mut keys = self.keys.clone();
keys.sort(); keys.sort();
crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone()) crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone())
@ -90,9 +94,12 @@ pub macro shardable_keys($type_: ty) {
Ok(()) Ok(())
} }
} }
};
} }
pub macro shardable_range($type_: ty) { #[macro_export]
macro_rules! shardable_range {
($type_: ty) => {
impl Shardable for $type_ { impl Shardable for $type_ {
type Shard = (Vec<u8>, Vec<u8>); type Shard = (Vec<u8>, Vec<u8>);
@ -117,4 +124,5 @@ pub macro shardable_range($type_: ty) {
Ok(()) Ok(())
} }
} }
};
} }

View File

@ -63,7 +63,6 @@ pub fn store_stream_for_range<PdC: PdClient>(
); );
((result_range.0.into(), result_range.1.into()), store) ((result_range.0.into(), result_range.1.into()), store)
}) })
.into_stream()
.boxed() .boxed()
} }
@ -83,7 +82,6 @@ pub fn store_stream_for_range_by_start_key<PdC: PdClient>(
store, store,
) )
}) })
.into_stream()
.boxed() .boxed()
} }
@ -113,6 +111,5 @@ pub fn store_stream_for_ranges<PdC: PdClient>(
.store_for_id(region_id) .store_for_id(region_id)
.map_ok(move |store| (range, store)) .map_ok(move |store| (range, store))
}) })
.into_stream()
.boxed() .boxed()
} }

View File

@ -64,7 +64,7 @@ pub async fn resolve_locks(
.await? .await?
.resolve_lock(OPTIMISTIC_BACKOFF) .resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF) .retry_region(DEFAULT_REGION_BACKOFF)
.post_process() .post_process_default()
.plan(); .plan();
let commit_version = plan.execute().await?; let commit_version = plan.execute().await?;
commit_versions.insert(lock.lock_version, commit_version); commit_versions.insert(lock.lock_version, commit_version);

View File

@ -2,9 +2,7 @@
use crate::{ use crate::{
pd::PdClient, pd::PdClient,
request::{ request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
shardable_keys, shardable_range, Collect, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store}, store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
timestamp::TimestampExt, timestamp::TimestampExt,
transaction::HasLocks, transaction::HasLocks,
@ -32,10 +30,10 @@ impl SingleKey for kvrpcpb::GetRequest {
} }
} }
impl Process for kvrpcpb::GetResponse { impl Process<kvrpcpb::GetResponse> for DefaultProcessor {
type Out = Option<Value>; type Out = Option<Value>;
fn process(input: Result<Self>) -> Result<Self::Out> { fn process(&self, input: Result<kvrpcpb::GetResponse>) -> Result<Self::Out> {
let mut input = input?; let mut input = input?;
Ok(if input.not_found { Ok(if input.not_found {
None None
@ -163,10 +161,10 @@ impl SingleKey for kvrpcpb::CleanupRequest {
} }
} }
impl Process for kvrpcpb::CleanupResponse { impl Process<kvrpcpb::CleanupResponse> for DefaultProcessor {
type Out = u64; type Out = u64;
fn process(input: Result<Self>) -> Result<Self::Out> { fn process(&self, input: Result<kvrpcpb::CleanupResponse>) -> Result<Self::Out> {
Ok(input?.commit_version) Ok(input?.commit_version)
} }
} }
@ -343,7 +341,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.set_context(store.region.context()?); self.set_context(store.region.context()?);
self.set_mutations(shard.into_iter().map(Into::into).collect()); self.set_mutations(shard);
Ok(()) Ok(())
} }
} }
@ -450,10 +448,10 @@ impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
} }
} }
impl Process for kvrpcpb::TxnHeartBeatResponse { impl Process<kvrpcpb::TxnHeartBeatResponse> for DefaultProcessor {
type Out = u64; type Out = u64;
fn process(input: Result<Self>) -> Result<Self::Out> { fn process(&self, input: Result<kvrpcpb::TxnHeartBeatResponse>) -> Result<Self::Out> {
Ok(input?.lock_ttl) Ok(input?.lock_ttl)
} }
} }
@ -468,10 +466,10 @@ impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
} }
} }
impl Process for kvrpcpb::CheckTxnStatusResponse { impl Process<kvrpcpb::CheckTxnStatusResponse> for DefaultProcessor {
type Out = TransactionStatus; type Out = TransactionStatus;
fn process(input: Result<Self>) -> Result<Self::Out> { fn process(&self, input: Result<kvrpcpb::CheckTxnStatusResponse>) -> Result<Self::Out> {
Ok(input?.into()) Ok(input?.into())
} }
} }

View File

@ -108,7 +108,7 @@ impl Transaction {
.await? .await?
.resolve_lock(retry_options.lock_backoff) .resolve_lock(retry_options.lock_backoff)
.retry_region(retry_options.region_backoff) .retry_region(retry_options.region_backoff)
.post_process() .post_process_default()
.plan(); .plan();
plan.execute().await plan.execute().await
}) })
@ -572,7 +572,7 @@ impl Transaction {
.await? .await?
.resolve_lock(self.options.retry_options.lock_backoff.clone()) .resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_region(self.options.retry_options.region_backoff.clone()) .retry_region(self.options.retry_options.region_backoff.clone())
.post_process() .post_process_default()
.plan(); .plan();
plan.execute().await plan.execute().await
} }

View File

@ -18,17 +18,16 @@ const NUM_PEOPLE: u32 = 100;
const NUM_TRNASFER: u32 = 100; const NUM_TRNASFER: u32 = 100;
/// Delete all entris in TiKV to leave a clean space for following tests. /// Delete all entris in TiKV to leave a clean space for following tests.
async fn clear_tikv() -> Result<()> { async fn clear_tikv() {
let cfs = vec![ let cfs = vec![
ColumnFamily::Default, ColumnFamily::Default,
ColumnFamily::Lock, ColumnFamily::Lock,
ColumnFamily::Write, ColumnFamily::Write,
]; ];
for cf in cfs { for cf in cfs {
let raw_client = RawClient::new(pd_addrs()).await?.with_cf(cf); let raw_client = RawClient::new(pd_addrs()).await.unwrap().with_cf(cf);
raw_client.delete_range(vec![]..).await?; raw_client.delete_range(vec![]..).await.unwrap();
} }
Ok(())
} }
#[tokio::test] #[tokio::test]
@ -53,7 +52,7 @@ async fn get_timestamp() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn crud() -> Result<()> { async fn crud() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_optimistic().await?; let mut txn = client.begin_optimistic().await?;
@ -137,7 +136,7 @@ async fn crud() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn insert_duplicate_keys() -> Result<()> { async fn insert_duplicate_keys() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
// Initialize TiKV store with {foo => bar} // Initialize TiKV store with {foo => bar}
@ -161,7 +160,7 @@ async fn insert_duplicate_keys() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn pessimistic() -> Result<()> { async fn pessimistic() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_pessimistic().await?; let mut txn = client.begin_pessimistic().await?;
@ -179,7 +178,7 @@ async fn pessimistic() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn raw_bank_transfer() -> Result<()> { async fn raw_bank_transfer() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = RawClient::new(pd_addrs()).await?; let client = RawClient::new(pd_addrs()).await?;
let mut rng = thread_rng(); let mut rng = thread_rng();
@ -233,7 +232,7 @@ async fn txn_write_million() -> Result<()> {
const NUM_BITS_KEY_PER_TXN: u32 = 3; const NUM_BITS_KEY_PER_TXN: u32 = 3;
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) { for i in 0..2u32.pow(NUM_BITS_TXN) {
@ -305,7 +304,7 @@ async fn txn_write_million() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn txn_bank_transfer() -> Result<()> { async fn txn_bank_transfer() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
let mut rng = thread_rng(); let mut rng = thread_rng();
@ -359,7 +358,7 @@ async fn txn_bank_transfer() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn raw_req() -> Result<()> { async fn raw_req() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = RawClient::new(pd_addrs()).await?; let client = RawClient::new(pd_addrs()).await?;
// empty; get non-existent key // empty; get non-existent key
@ -489,7 +488,7 @@ async fn raw_req() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn test_update_safepoint() -> Result<()> { async fn test_update_safepoint() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = TransactionClient::new(pd_addrs()).await?; let client = TransactionClient::new(pd_addrs()).await?;
let res = client.gc(client.current_timestamp().await?).await?; let res = client.gc(client.current_timestamp().await?).await?;
assert!(res); assert!(res);
@ -509,7 +508,7 @@ async fn raw_write_million() -> Result<()> {
const NUM_BITS_KEY_PER_TXN: u32 = 10; const NUM_BITS_KEY_PER_TXN: u32 = 10;
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
clear_tikv().await?; clear_tikv().await;
let client = RawClient::new(pd_addrs()).await?; let client = RawClient::new(pd_addrs()).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) { for i in 0..2u32.pow(NUM_BITS_TXN) {
@ -555,7 +554,7 @@ async fn raw_write_million() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn pessimistic_rollback() -> Result<()> { async fn pessimistic_rollback() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = let client =
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;
let mut preload_txn = client.begin_optimistic().await?; let mut preload_txn = client.begin_optimistic().await?;
@ -587,7 +586,7 @@ async fn pessimistic_rollback() -> Result<()> {
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn lock_keys() -> Result<()> { async fn lock_keys() -> Result<()> {
clear_tikv().await?; clear_tikv().await;
let client = let client =
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
thiserror = "1" thiserror = "1"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
lazy_static = "1" lazy_static = "1"
log = "0.4" log = "0.4"
regex = "1" regex = "1"

View File

@ -6,7 +6,7 @@ edition = "2018"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
log = "0.4" log = "0.4"
tikv-client-common = { path = "../tikv-client-common" } tikv-client-common = { path = "../tikv-client-common" }
tikv-client-proto = { path = "../tikv-client-proto" } tikv-client-proto = { path = "../tikv-client-proto" }

View File

@ -5,12 +5,12 @@ edition = "2018"
build = "build.rs" build = "build.rs"
[build-dependencies] [build-dependencies]
protobuf-build = { version = "0.11", default-features = false, features = ["grpcio-prost-codec"] } protobuf-build = { version = "0.12", default-features = false, features = ["grpcio-prost-codec"] }
[dependencies] [dependencies]
protobuf = "=2.8.0" protobuf = "=2.8.0"
prost = { version = "0.6" } prost = { version = "0.7" }
prost-derive = { version = "0.6" } prost-derive = { version = "0.7" }
futures = "0.3.5" futures = "0.3.5"
grpcio = { version = "0.7", default-features = false, features = ["secure", "prost-codec"] } grpcio = { version = "0.8", default-features = false, features = ["secure", "prost-codec", "use-bindgen"] }
lazy_static = { version = "1.3" } lazy_static = { version = "1.3" }

View File

@ -7,7 +7,7 @@ edition = "2018"
async-trait = "0.1" async-trait = "0.1"
derive-new = "0.5" derive-new = "0.5"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.7", features = ["secure", "prost-codec"], default-features = false } grpcio = { version = "0.8", features = ["secure", "prost-codec", "use-bindgen"], default-features = false }
log = "0.4" log = "0.4"
tikv-client-common = { path = "../tikv-client-common" } tikv-client-common = { path = "../tikv-client-common" }
tikv-client-proto = { path = "../tikv-client-proto" } tikv-client-proto = { path = "../tikv-client-proto" }