diff --git a/src/kv/key.rs b/src/kv/key.rs index 1bbe11e..b46510b 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -128,10 +128,6 @@ impl Key { } } -pub trait HasKeys { - fn get_keys(&self) -> Vec; -} - impl From> for Key { fn from(v: Vec) -> Self { Key(v) diff --git a/src/kv/mod.rs b/src/kv/mod.rs index 7831bea..e394216 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -8,7 +8,7 @@ mod kvpair; mod value; pub use bound_range::{BoundRange, IntoOwnedRange}; -pub use key::{HasKeys, Key}; +pub use key::Key; pub use kvpair::KvPair; pub use value::Value; diff --git a/src/request/mod.rs b/src/request/mod.rs index 9252280..0f21efb 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -10,8 +10,9 @@ use tikv_client_store::{HasError, Request}; pub use self::{ plan::{ - Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, - MergeResponse, MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, + Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError, + HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse, + ResolveLock, RetryRegion, }, plan_builder::{PlanBuilder, SingleKey}, shard::Shardable, diff --git a/src/request/plan.rs b/src/request/plan.rs index d9b3059..0f3bb51 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -2,7 +2,6 @@ use crate::{ backoff::Backoff, - kv::HasKeys, pd::PdClient, request::{KvRequest, Shardable}, stats::tikv_stats, @@ -340,11 +339,71 @@ where async fn execute(&self) -> Result { let keys = self.inner.get_keys(); let res = self.inner.execute().await?; - // TODO: should we check they have the same length? Ok(ResponseAndKeys(res, keys)) } } +pub trait HasKeys { + fn get_keys(&self) -> Vec; +} + +// contains a response and the corresponding keys +// currently only used for matching keys and values in pessimistic lock requests +#[derive(Debug, Clone)] +pub struct ResponseAndKeys(Resp, Vec); + +impl HasError for ResponseAndKeys { + fn error(&mut self) -> Option { + self.0.error() + } +} + +impl HasLocks for ResponseAndKeys { + fn take_locks(&mut self) -> Vec { + self.0.take_locks() + } +} + +impl HasRegionError for ResponseAndKeys { + fn region_error(&mut self) -> Option { + self.0.region_error() + } +} + +impl Merge> for CollectAndMatchKey { + type Out = Vec; + + fn merge( + &self, + input: Vec>>, + ) -> Result { + input + .into_iter() + .flat_map_ok(|ResponseAndKeys(mut resp, keys)| { + let values = resp.take_values(); + let not_founds = resp.take_not_founds(); + let v: Vec<_> = if not_founds.is_empty() { + // Legacy TiKV does not distiguish not existing key and existing key + // that with empty value. We assume that key does not exist if value + // is empty. + let values: Vec = values.into_iter().filter(|v| v.is_empty()).collect(); + keys.into_iter().zip(values).map(From::from).collect() + } else { + assert_eq!(values.len(), not_founds.len()); + let values: Vec = values + .into_iter() + .zip(not_founds.into_iter()) + .filter_map(|(v, not_found)| if not_found { None } else { Some(v) }) + .collect(); + keys.into_iter().zip(values).map(From::from).collect() + }; + // FIXME sucks to collect and re-iterate, but the iterators have different types + v.into_iter() + }) + .collect() + } +} + #[cfg(test)] mod test { use super::*; @@ -402,60 +461,3 @@ mod test { .for_each(|r| assert!(r.is_err())); } } - -// contains a response and the corresponding keys -// currently only used for matching keys and values in pessimistic lock requests -#[derive(Debug, Clone)] -pub struct ResponseAndKeys(Resp, Vec); - -impl HasError for ResponseAndKeys { - fn error(&mut self) -> Option { - self.0.error() - } -} - -impl HasLocks for ResponseAndKeys { - fn take_locks(&mut self) -> Vec { - self.0.take_locks() - } -} - -impl HasRegionError for ResponseAndKeys { - fn region_error(&mut self) -> Option { - self.0.region_error() - } -} - -impl Merge> for CollectAndMatchKey { - type Out = Vec; - - fn merge( - &self, - input: Vec>>, - ) -> Result { - input - .into_iter() - .flat_map_ok(|ResponseAndKeys(mut resp, keys)| { - let values = resp.take_values(); - let not_founds = resp.take_not_founds(); - let v: Vec<_> = if not_founds.is_empty() { - // Legacy TiKV does not distiguish not existing key and existing key - // that with empty value. We assume that key does not exist if value - // is empty. - let values: Vec = values.into_iter().filter(|v| v.is_empty()).collect(); - keys.into_iter().zip(values).map(From::from).collect() - } else { - assert_eq!(values.len(), not_founds.len()); - let values: Vec = values - .into_iter() - .zip(not_founds.into_iter()) - .filter_map(|(v, not_found)| if not_found { None } else { Some(v) }) - .collect(); - keys.into_iter().zip(values).map(From::from).collect() - }; - // FIXME sucks to collect and re-iterate, but the iterators have different types - v.into_iter() - }) - .collect() - } -} diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 7ee5a87..5df26c9 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -1,12 +1,12 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use super::PreserveKey; use crate::{ backoff::Backoff, - kv::HasKeys, pd::PdClient, request::{ - DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion, - Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable, + DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse, + MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable, }, store::Store, transaction::HasLocks, @@ -15,8 +15,6 @@ use crate::{ use std::{marker::PhantomData, sync::Arc}; use tikv_client_store::HasError; -use super::plan::PreserveKey; - /// Builder type for plans (see that module for more). pub struct PlanBuilder { pd_client: Arc, diff --git a/src/request/shard.rs b/src/request/shard.rs index 4a565f5..e1300c5 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -1,16 +1,30 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - kv::HasKeys, pd::PdClient, - request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion}, + request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion}, store::Store, Result, }; use futures::stream::BoxStream; use std::sync::Arc; -use super::plan::PreserveKey; +macro_rules! impl_inner_shardable { + () => { + type Shard = P::Shard; + + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + self.inner.shards(pd_client) + } + + fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { + self.inner.apply_shard(shard, store) + } + }; +} pub trait Shardable { type Shard: Send; @@ -40,48 +54,15 @@ impl Shardable for Dispatch { } impl Shardable for ResolveLock { - type Shard = P::Shard; - - fn shards( - &self, - pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { - self.inner.shards(pd_client) - } - - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.inner.apply_shard(shard, store) - } + impl_inner_shardable!(); } impl Shardable for PreserveKey

{ - type Shard = P::Shard; - - fn shards( - &self, - pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { - self.inner.shards(pd_client) - } - - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.inner.apply_shard(shard, store) - } + impl_inner_shardable!(); } impl Shardable for RetryRegion { - type Shard = P::Shard; - - fn shards( - &self, - pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { - self.inner.shards(pd_client) - } - - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.inner.apply_shard(shard, store) - } + impl_inner_shardable!(); } #[macro_export] diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6b48c21..07667bb 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,9 +1,10 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - kv::HasKeys, pd::PdClient, - request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey}, + request::{ + Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey, + }, store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store}, timestamp::TimestampExt, transaction::HasLocks, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index d3bb5f3..27f2ac2 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -290,7 +290,7 @@ async fn txn_write_million() -> Result<()> { assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); // test batch_get and batch_get_for_update - const SKIP_BITS: u32 = 6; // do not retrive all because there's a limit of message size + const SKIP_BITS: u32 = 7; // do not retrive all because there's a limit of message size let mut cur = 0u32; let keys = iter::repeat_with(|| { let v = cur;