From 9a5fd1d8c9b73d0c7381f526d14ac34b0cb89b51 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Mon, 22 Feb 2021 13:03:31 +1300 Subject: [PATCH] Address review comments Signed-off-by: Nick Cameron --- .travis.yml | 10 +-- Cargo.toml | 2 +- mock-tikv/Cargo.toml | 2 +- rustfmt.toml | 2 +- src/compat.rs | 10 ++- src/lib.rs | 7 +- src/mock.rs | 7 ++ src/raw/client.rs | 4 +- src/raw/requests.rs | 8 +-- src/request/mod.rs | 7 +- src/request/plan.rs | 114 +++++++++++++++++++++++++++------ src/request/plan_builder.rs | 30 +++++---- src/request/shard.rs | 102 +++++++++++++++-------------- src/store.rs | 3 - src/transaction/lock.rs | 2 +- src/transaction/requests.rs | 22 +++---- src/transaction/transaction.rs | 4 +- tests/integration_tests.rs | 29 ++++----- tikv-client-common/Cargo.toml | 2 +- tikv-client-pd/Cargo.toml | 2 +- tikv-client-proto/Cargo.toml | 8 +-- tikv-client-store/Cargo.toml | 2 +- 22 files changed, 235 insertions(+), 144 deletions(-) diff --git a/.travis.yml b/.travis.yml index 144ed57..e725227 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,14 +6,14 @@ os: # - windows # TODO: https://github.com/pingcap/kvproto/issues/355 - osx rust: - # - stable + - stable - nightly env: global: - RUST_BACKTRACE=1 - - RUSTFLAGS="-D warnings" - - RUSTDOCFLAGS="-D warnings" +# - RUSTFLAGS="-D warnings" +# - RUSTDOCFLAGS="-D warnings" addons: apt: @@ -43,7 +43,7 @@ script: - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi - cargo build --all - cargo build --examples - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --exclude tikv-client-proto --document-private-items; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps; fi - if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi # For now we only run full integration tests on Linux. Here's why: # * Docker on OS X is not supported by Travis. @@ -53,7 +53,7 @@ script: - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 2; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 60; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi diff --git a/Cargo.toml b/Cargo.toml index bbf0b18..4b4b6ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ async-trait = "0.1" derive-new = "0.5" futures = { version = "0.3.5", features = ["async-await", "thread-pool"] } futures-timer = "3.0" -grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } +grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false } lazy_static = "1" log = "0.4" prometheus = { version = "0.11", features = [ "push", "process" ], default-features = false } diff --git a/mock-tikv/Cargo.toml b/mock-tikv/Cargo.toml index ff7d083..047794e 100644 --- a/mock-tikv/Cargo.toml +++ b/mock-tikv/Cargo.toml @@ -6,6 +6,6 @@ edition = "2018" [dependencies] derive-new = "0.5.8" futures = "0.3.5" -grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } +grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false } log = "0.4" tikv-client-proto = { path = "../tikv-client-proto"} diff --git a/rustfmt.toml b/rustfmt.toml index 179084b..2468391 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,2 +1,2 @@ -merge_imports = true +imports_granularity="Crate" format_code_in_doc_comments = true diff --git a/src/compat.rs b/src/compat.rs index a6b049d..0f67c63 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -18,6 +18,7 @@ use std::pin::Pin; pub struct LoopFn { future: A, func: F, + errored: bool, } pub fn stream_fn(initial_state: S, mut func: F) -> LoopFn @@ -28,6 +29,7 @@ where LoopFn { future: func(initial_state), func, + errored: false, } } @@ -38,10 +40,16 @@ where { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if self.errored { + return Poll::Ready(None); + } unsafe { let this = Pin::get_unchecked_mut(self); match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) { - Err(e) => Poll::Ready(Some(Err(e))), + Err(e) => { + this.errored = true; + Poll::Ready(Some(Err(e))) + } Ok(None) => Poll::Ready(None), Ok(Some((s, t))) => { this.future = (this.func)(s); diff --git a/src/lib.rs b/src/lib.rs index 78474ca..a90cb82 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,6 @@ #![allow(clippy::redundant_closure)] #![allow(clippy::type_complexity)] #![allow(incomplete_features)] -#![cfg_attr(test, feature(specialization))] -#![feature(associated_type_bounds)] -#![feature(decl_macro)] //! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a //! distributed transactional Key-Value database written in Rust. @@ -77,6 +74,9 @@ //! //! At this point, you should seek the documentation in the related API modules. +#[macro_use] +mod request; + #[macro_use] mod transaction; @@ -87,7 +87,6 @@ mod kv; mod pd; mod raw; mod region; -mod request; mod stats; mod store; mod timestamp; diff --git a/src/mock.rs b/src/mock.rs index b683b1f..51c2d90 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -154,3 +154,10 @@ impl PdClient for MockPdClient { unimplemented!() } } + +pub fn mock_store() -> Store { + Store { + region: Region::default(), + client: Arc::new(MockKvClient::new("foo".to_owned(), None)), + } +} diff --git a/src/raw/client.rs b/src/raw/client.rs index 35922ef..4ba15fb 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -117,7 +117,7 @@ impl Client { .await? .resolve_lock(OPTIMISTIC_BACKOFF) .retry_region(DEFAULT_REGION_BACKOFF) - .post_process() + .post_process_default() .plan(); plan.execute().await } @@ -147,8 +147,8 @@ impl Client { let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(OPTIMISTIC_BACKOFF) - .retry_region(DEFAULT_REGION_BACKOFF) .multi_region() + .retry_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); plan.execute() diff --git a/src/raw/requests.rs b/src/raw/requests.rs index b650e00..f65eec0 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -3,9 +3,7 @@ use super::RawRpcRequest; use crate::{ pd::PdClient, - request::{ - shardable_keys, shardable_range, Collect, KvRequest, Merge, Process, Shardable, SingleKey, - }, + request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey}, store::{store_stream_for_keys, store_stream_for_ranges, Store}, transaction::HasLocks, util::iter::FlatMapOkIterExt, @@ -33,10 +31,10 @@ impl SingleKey for kvrpcpb::RawGetRequest { } } -impl Process for kvrpcpb::RawGetResponse { +impl Process for DefaultProcessor { type Out = Option; - fn process(input: Result) -> Result { + fn process(&self, input: Result) -> Result { let mut input = input?; Ok(if input.not_found { None diff --git a/src/request/mod.rs b/src/request/mod.rs index 79ca718..49cb94f 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -10,15 +10,16 @@ use tikv_client_store::{HasError, Request}; pub use self::{ plan::{ - Collect, CollectError, Dispatch, Merge, MergeResponse, MultiRegionPlan, Plan, Process, - ProcessResponse, ResolveLockPlan, RetryRegionPlan, + Collect, CollectError, DefaultProcessor, Dispatch, Merge, MergeResponse, MultiRegion, Plan, + Process, ProcessResponse, ResolveLock, RetryRegion, }, plan_builder::{PlanBuilder, SingleKey}, - shard::{shardable_keys, shardable_range, Shardable}, + shard::Shardable, }; mod plan; mod plan_builder; +#[macro_use] mod shard; /// Abstracts any request sent to a TiKV server. diff --git a/src/request/plan.rs b/src/request/plan.rs index fdd418f..3ecc7f5 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -48,14 +48,14 @@ impl Plan for Dispatch { } } -pub struct MultiRegionPlan { +pub struct MultiRegion { pub(super) inner: P, pub pd_client: Arc, } -impl Clone for MultiRegionPlan { +impl Clone for MultiRegion { fn clone(&self) -> Self { - MultiRegionPlan { + MultiRegion { inner: self.inner.clone(), pd_client: self.pd_client.clone(), } @@ -63,7 +63,10 @@ impl Clone for MultiRegionPlan { } #[async_trait] -impl + Shardable, PdC: PdClient> Plan for MultiRegionPlan { +impl Plan for MultiRegion +where + P::Result: HasError, +{ type Result = Vec>; async fn execute(&self) -> Result { @@ -105,8 +108,7 @@ impl>>, M: Me type Result = M::Out; async fn execute(&self) -> Result { - let result = self.inner.execute().await?; - self.merge.merge(result) + self.merge.merge(self.inner.execute().await?) } } @@ -128,36 +130,42 @@ impl Merge for CollectError { } /// Process data into another kind of data. -pub trait Process: Sized + Clone + Send + Sync + 'static { +pub trait Process: Sized + Clone + Send + Sync + 'static { type Out: Send; - fn process(input: Result) -> Result; + fn process(&self, input: Result) -> Result; } #[derive(Clone)] -pub struct ProcessResponse { +pub struct ProcessResponse> { pub inner: P, - pub phantom: PhantomData, + pub processor: Pr, + pub phantom: PhantomData, } #[async_trait] -impl, Pr: Process> Plan for ProcessResponse { +impl, Pr: Process> Plan + for ProcessResponse +{ type Result = Pr::Out; async fn execute(&self) -> Result { - Pr::process(self.inner.execute().await) + self.processor.process(self.inner.execute().await) } } -pub struct RetryRegionPlan { +#[derive(Clone, Copy, Debug)] +pub struct DefaultProcessor; + +pub struct RetryRegion { pub inner: P, pub pd_client: Arc, pub backoff: Backoff, } -impl Clone for RetryRegionPlan { +impl Clone for RetryRegion { fn clone(&self) -> Self { - RetryRegionPlan { + RetryRegion { inner: self.inner.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), @@ -166,7 +174,10 @@ impl Clone for RetryRegionPlan { } #[async_trait] -impl, PdC: PdClient> Plan for RetryRegionPlan { +impl Plan for RetryRegion +where + P::Result: HasError, +{ type Result = P::Result; async fn execute(&self) -> Result { @@ -186,15 +197,15 @@ impl, PdC: PdClient> Plan for RetryRegionPlan } } -pub struct ResolveLockPlan { +pub struct ResolveLock { pub inner: P, pub pd_client: Arc, pub backoff: Backoff, } -impl Clone for ResolveLockPlan { +impl Clone for ResolveLock { fn clone(&self) -> Self { - ResolveLockPlan { + ResolveLock { inner: self.inner.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), @@ -203,11 +214,15 @@ impl Clone for ResolveLockPlan { } #[async_trait] -impl, PdC: PdClient> Plan for ResolveLockPlan { +impl Plan for ResolveLock +where + P::Result: HasLocks, +{ type Result = P::Result; async fn execute(&self) -> Result { let mut result = self.inner.execute().await?; + let mut clone = self.clone(); loop { let locks = result.take_locks(); if locks.is_empty() { @@ -222,7 +237,6 @@ impl, PdC: PdClient> Plan for ResolveLockPlan if resolve_locks(locks, pd_client.clone()).await? { result = self.inner.execute().await?; } else { - let mut clone = self.clone(); match clone.backoff.next_delay_duration() { None => return Err(Error::ResolveLockError), Some(delay_duration) => { @@ -234,3 +248,61 @@ impl, PdC: PdClient> Plan for ResolveLockPlan } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::mock::{mock_store, MockPdClient}; + use futures::stream::BoxStream; + use tikv_client_proto::kvrpcpb::BatchGetResponse; + + #[derive(Clone)] + struct ErrPlan; + + #[async_trait] + impl Plan for ErrPlan { + type Result = BatchGetResponse; + + async fn execute(&self) -> Result { + Err(Error::Unimplemented) + } + } + + impl Shardable for ErrPlan { + type Shard = u8; + + fn shards( + &self, + _: &Arc, + ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> { + Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))) + .map_ok(|_: u8| (42, mock_store())) + .boxed() + } + + fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::Store) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_err() { + let plan = RetryRegion { + inner: MultiRegion { + inner: ResolveLock { + inner: ErrPlan, + backoff: Backoff::no_backoff(), + pd_client: Arc::new(MockPdClient::default()), + }, + pd_client: Arc::new(MockPdClient::default()), + }, + backoff: Backoff::no_backoff(), + pd_client: Arc::new(MockPdClient::default()), + }; + plan.execute() + .await + .unwrap() + .iter() + .for_each(|r| assert!(r.is_err())); + } +} diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 5803939..ab626cc 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -4,8 +4,8 @@ use crate::{ backoff::Backoff, pd::PdClient, request::{ - Dispatch, KvRequest, Merge, MergeResponse, MultiRegionPlan, Plan, Process, ProcessResponse, - ResolveLockPlan, RetryRegionPlan, Shardable, + DefaultProcessor, Dispatch, KvRequest, Merge, MergeResponse, MultiRegion, Plan, Process, + ProcessResponse, ResolveLock, RetryRegion, Shardable, }, store::Store, transaction::HasLocks, @@ -52,13 +52,13 @@ impl PlanBuilder { impl PlanBuilder { /// If there is a lock error, then resolve the lock and retry the request. - pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder, Ph> + pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder, Ph> where P::Result: HasLocks, { PlanBuilder { pd_client: self.pd_client.clone(), - plan: ResolveLockPlan { + plan: ResolveLock { inner: self.plan, backoff, pd_client: self.pd_client, @@ -70,13 +70,13 @@ impl PlanBuilder { /// If there is a region error, re-shard the request and re-resolve regions, then retry. /// /// Note that this plan must wrap a multi-region plan if the request should be re-sharded. - pub fn retry_region(self, backoff: Backoff) -> PlanBuilder, Ph> + pub fn retry_region(self, backoff: Backoff) -> PlanBuilder, Ph> where P::Result: HasError, { PlanBuilder { pd_client: self.pd_client.clone(), - plan: RetryRegionPlan { + plan: RetryRegion { inner: self.plan, backoff, pd_client: self.pd_client, @@ -103,17 +103,21 @@ impl PlanBuilder { } } - /// Apply a processing step to a response (usually only needed if the request is sent to a - /// single region because post-porcessing can be incorporated in the merge step for multi-region - /// requests). - pub fn post_process(self) -> PlanBuilder, Ph> + /// Apply the default processing step to a response (usually only needed if the request is sent + /// to a single region because post-porcessing can be incorporated in the merge step for + /// multi-region requests). + pub fn post_process_default( + self, + ) -> PlanBuilder, Ph> where - P: Plan, + P: Plan, + DefaultProcessor: Process, { PlanBuilder { pd_client: self.pd_client.clone(), plan: ProcessResponse { inner: self.plan, + processor: DefaultProcessor, phantom: PhantomData, }, phantom: PhantomData, @@ -126,10 +130,10 @@ where P::Result: HasError, { /// Split the request into shards sending a request to the region of each shard. - pub fn multi_region(self) -> PlanBuilder, Targetted> { + pub fn multi_region(self) -> PlanBuilder, Targetted> { PlanBuilder { pd_client: self.pd_client.clone(), - plan: MultiRegionPlan { + plan: MultiRegion { inner: self.plan, pd_client: self.pd_client, }, diff --git a/src/request/shard.rs b/src/request/shard.rs index 3d46fa1..a632783 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -2,7 +2,7 @@ use crate::{ pd::PdClient, - request::{Dispatch, KvRequest, Plan, ResolveLockPlan, RetryRegionPlan}, + request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion}, store::Store, Result, }; @@ -36,7 +36,7 @@ impl Shardable for Dispatch { } } -impl Shardable for ResolveLockPlan { +impl Shardable for ResolveLock { type Shard = P::Shard; fn shards( @@ -51,7 +51,7 @@ impl Shardable for ResolveLockPlan { } } -impl Shardable for RetryRegionPlan { +impl Shardable for RetryRegion { type Shard = P::Shard; fn shards( @@ -66,55 +66,63 @@ impl Shardable for RetryRegionPlan { } } -pub macro shardable_keys($type_: ty) { - impl Shardable for $type_ { - type Shard = Vec>; +#[macro_export] +macro_rules! shardable_keys { + ($type_: ty) => { + impl Shardable for $type_ { + type Shard = Vec>; - fn shards( - &self, - pd_client: &std::sync::Arc, - ) -> futures::stream::BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> - { - let mut keys = self.keys.clone(); - keys.sort(); - crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone()) - } + fn shards( + &self, + pd_client: &std::sync::Arc, + ) -> futures::stream::BoxStream< + 'static, + crate::Result<(Self::Shard, crate::store::Store)>, + > { + let mut keys = self.keys.clone(); + keys.sort(); + crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone()) + } - fn apply_shard( - &mut self, - shard: Self::Shard, - store: &crate::store::Store, - ) -> crate::Result<()> { - self.set_context(store.region.context()?); - self.set_keys(shard.into_iter().map(Into::into).collect()); - Ok(()) + fn apply_shard( + &mut self, + shard: Self::Shard, + store: &crate::store::Store, + ) -> crate::Result<()> { + self.set_context(store.region.context()?); + self.set_keys(shard.into_iter().map(Into::into).collect()); + Ok(()) + } } - } + }; } -pub macro shardable_range($type_: ty) { - impl Shardable for $type_ { - type Shard = (Vec, Vec); +#[macro_export] +macro_rules! shardable_range { + ($type_: ty) => { + impl Shardable for $type_ { + type Shard = (Vec, Vec); - fn shards( - &self, - pd_client: &Arc, - ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> { - let start_key = self.start_key.clone().into(); - let end_key = self.end_key.clone().into(); - crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> { + let start_key = self.start_key.clone().into(); + let end_key = self.end_key.clone().into(); + crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) + } + + fn apply_shard( + &mut self, + shard: Self::Shard, + store: &crate::store::Store, + ) -> crate::Result<()> { + self.set_context(store.region.context()?); + + self.set_start_key(shard.0.into()); + self.set_end_key(shard.1.into()); + Ok(()) + } } - - fn apply_shard( - &mut self, - shard: Self::Shard, - store: &crate::store::Store, - ) -> crate::Result<()> { - self.set_context(store.region.context()?); - - self.set_start_key(shard.0.into()); - self.set_end_key(shard.1.into()); - Ok(()) - } - } + }; } diff --git a/src/store.rs b/src/store.rs index d8252bb..c13ec19 100644 --- a/src/store.rs +++ b/src/store.rs @@ -63,7 +63,6 @@ pub fn store_stream_for_range( ); ((result_range.0.into(), result_range.1.into()), store) }) - .into_stream() .boxed() } @@ -83,7 +82,6 @@ pub fn store_stream_for_range_by_start_key( store, ) }) - .into_stream() .boxed() } @@ -113,6 +111,5 @@ pub fn store_stream_for_ranges( .store_for_id(region_id) .map_ok(move |store| (range, store)) }) - .into_stream() .boxed() } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index c57cfea..29ccc19 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -64,7 +64,7 @@ pub async fn resolve_locks( .await? .resolve_lock(OPTIMISTIC_BACKOFF) .retry_region(DEFAULT_REGION_BACKOFF) - .post_process() + .post_process_default() .plan(); let commit_version = plan.execute().await?; commit_versions.insert(lock.lock_version, commit_version); diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 24e12fc..5d3e69b 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -2,9 +2,7 @@ use crate::{ pd::PdClient, - request::{ - shardable_keys, shardable_range, Collect, KvRequest, Merge, Process, Shardable, SingleKey, - }, + request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey}, store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store}, timestamp::TimestampExt, transaction::HasLocks, @@ -32,10 +30,10 @@ impl SingleKey for kvrpcpb::GetRequest { } } -impl Process for kvrpcpb::GetResponse { +impl Process for DefaultProcessor { type Out = Option; - fn process(input: Result) -> Result { + fn process(&self, input: Result) -> Result { let mut input = input?; Ok(if input.not_found { None @@ -163,10 +161,10 @@ impl SingleKey for kvrpcpb::CleanupRequest { } } -impl Process for kvrpcpb::CleanupResponse { +impl Process for DefaultProcessor { type Out = u64; - fn process(input: Result) -> Result { + fn process(&self, input: Result) -> Result { Ok(input?.commit_version) } } @@ -343,7 +341,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { self.set_context(store.region.context()?); - self.set_mutations(shard.into_iter().map(Into::into).collect()); + self.set_mutations(shard); Ok(()) } } @@ -450,10 +448,10 @@ impl SingleKey for kvrpcpb::TxnHeartBeatRequest { } } -impl Process for kvrpcpb::TxnHeartBeatResponse { +impl Process for DefaultProcessor { type Out = u64; - fn process(input: Result) -> Result { + fn process(&self, input: Result) -> Result { Ok(input?.lock_ttl) } } @@ -468,10 +466,10 @@ impl SingleKey for kvrpcpb::CheckTxnStatusRequest { } } -impl Process for kvrpcpb::CheckTxnStatusResponse { +impl Process for DefaultProcessor { type Out = TransactionStatus; - fn process(input: Result) -> Result { + fn process(&self, input: Result) -> Result { Ok(input?.into()) } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index fc7ab74..896d6d1 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -108,7 +108,7 @@ impl Transaction { .await? .resolve_lock(retry_options.lock_backoff) .retry_region(retry_options.region_backoff) - .post_process() + .post_process_default() .plan(); plan.execute().await }) @@ -572,7 +572,7 @@ impl Transaction { .await? .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_region(self.options.retry_options.region_backoff.clone()) - .post_process() + .post_process_default() .plan(); plan.execute().await } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c3ef3dc..8160314 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -18,17 +18,16 @@ const NUM_PEOPLE: u32 = 100; const NUM_TRNASFER: u32 = 100; /// Delete all entris in TiKV to leave a clean space for following tests. -async fn clear_tikv() -> Result<()> { +async fn clear_tikv() { let cfs = vec![ ColumnFamily::Default, ColumnFamily::Lock, ColumnFamily::Write, ]; for cf in cfs { - let raw_client = RawClient::new(pd_addrs()).await?.with_cf(cf); - raw_client.delete_range(vec![]..).await?; + let raw_client = RawClient::new(pd_addrs()).await.unwrap().with_cf(cf); + raw_client.delete_range(vec![]..).await.unwrap(); } - Ok(()) } #[tokio::test] @@ -53,7 +52,7 @@ async fn get_timestamp() -> Result<()> { #[tokio::test] #[serial] async fn crud() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; let mut txn = client.begin_optimistic().await?; @@ -137,7 +136,7 @@ async fn crud() -> Result<()> { #[tokio::test] #[serial] async fn insert_duplicate_keys() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; // Initialize TiKV store with {foo => bar} @@ -161,7 +160,7 @@ async fn insert_duplicate_keys() -> Result<()> { #[tokio::test] #[serial] async fn pessimistic() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; let mut txn = client.begin_pessimistic().await?; @@ -179,7 +178,7 @@ async fn pessimistic() -> Result<()> { #[tokio::test] #[serial] async fn raw_bank_transfer() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = RawClient::new(pd_addrs()).await?; let mut rng = thread_rng(); @@ -233,7 +232,7 @@ async fn txn_write_million() -> Result<()> { const NUM_BITS_KEY_PER_TXN: u32 = 3; let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; for i in 0..2u32.pow(NUM_BITS_TXN) { @@ -305,7 +304,7 @@ async fn txn_write_million() -> Result<()> { #[tokio::test] #[serial] async fn txn_bank_transfer() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; let mut rng = thread_rng(); @@ -359,7 +358,7 @@ async fn txn_bank_transfer() -> Result<()> { #[tokio::test] #[serial] async fn raw_req() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = RawClient::new(pd_addrs()).await?; // empty; get non-existent key @@ -489,7 +488,7 @@ async fn raw_req() -> Result<()> { #[tokio::test] #[serial] async fn test_update_safepoint() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new(pd_addrs()).await?; let res = client.gc(client.current_timestamp().await?).await?; assert!(res); @@ -509,7 +508,7 @@ async fn raw_write_million() -> Result<()> { const NUM_BITS_KEY_PER_TXN: u32 = 10; let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); - clear_tikv().await?; + clear_tikv().await; let client = RawClient::new(pd_addrs()).await?; for i in 0..2u32.pow(NUM_BITS_TXN) { @@ -555,7 +554,7 @@ async fn raw_write_million() -> Result<()> { #[tokio::test] #[serial] async fn pessimistic_rollback() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; let mut preload_txn = client.begin_optimistic().await?; @@ -587,7 +586,7 @@ async fn pessimistic_rollback() -> Result<()> { #[tokio::test] #[serial] async fn lock_keys() -> Result<()> { - clear_tikv().await?; + clear_tikv().await; let client = TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; diff --git a/tikv-client-common/Cargo.toml b/tikv-client-common/Cargo.toml index cae3eab..2eedd2a 100644 --- a/tikv-client-common/Cargo.toml +++ b/tikv-client-common/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] thiserror = "1" futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } +grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false } lazy_static = "1" log = "0.4" regex = "1" diff --git a/tikv-client-pd/Cargo.toml b/tikv-client-pd/Cargo.toml index 7eca62f..17b3798 100644 --- a/tikv-client-pd/Cargo.toml +++ b/tikv-client-pd/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] async-trait = "0.1" futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false } +grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false } log = "0.4" tikv-client-common = { path = "../tikv-client-common" } tikv-client-proto = { path = "../tikv-client-proto" } diff --git a/tikv-client-proto/Cargo.toml b/tikv-client-proto/Cargo.toml index be9db2d..070db05 100644 --- a/tikv-client-proto/Cargo.toml +++ b/tikv-client-proto/Cargo.toml @@ -5,12 +5,12 @@ edition = "2018" build = "build.rs" [build-dependencies] -protobuf-build = { version = "0.11", default-features = false, features = ["grpcio-prost-codec"] } +protobuf-build = { version = "0.12", default-features = false, features = ["grpcio-prost-codec"] } [dependencies] protobuf = "=2.8.0" -prost = { version = "0.6" } -prost-derive = { version = "0.6" } +prost = { version = "0.7" } +prost-derive = { version = "0.7" } futures = "0.3.5" -grpcio = { version = "0.7", default-features = false, features = ["secure", "prost-codec"] } +grpcio = { version = "0.8", default-features = false, features = ["secure", "prost-codec", "use-bindgen"] } lazy_static = { version = "1.3" } diff --git a/tikv-client-store/Cargo.toml b/tikv-client-store/Cargo.toml index 67e4b13..db26090 100644 --- a/tikv-client-store/Cargo.toml +++ b/tikv-client-store/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" async-trait = "0.1" derive-new = "0.5" futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.7", features = ["secure", "prost-codec"], default-features = false } +grpcio = { version = "0.8", features = ["secure", "prost-codec", "use-bindgen"], default-features = false } log = "0.4" tikv-client-common = { path = "../tikv-client-common" } tikv-client-proto = { path = "../tikv-client-proto" }