style: resolve comments

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2021-03-18 14:28:43 +08:00
parent d789cc650b
commit 4120b76455
8 changed files with 92 additions and 113 deletions

View File

@ -128,10 +128,6 @@ impl Key {
} }
} }
pub trait HasKeys {
fn get_keys(&self) -> Vec<Key>;
}
impl From<Vec<u8>> for Key { impl From<Vec<u8>> for Key {
fn from(v: Vec<u8>) -> Self { fn from(v: Vec<u8>) -> Self {
Key(v) Key(v)

View File

@ -8,7 +8,7 @@ mod kvpair;
mod value; mod value;
pub use bound_range::{BoundRange, IntoOwnedRange}; pub use bound_range::{BoundRange, IntoOwnedRange};
pub use key::{HasKeys, Key}; pub use key::Key;
pub use kvpair::KvPair; pub use kvpair::KvPair;
pub use value::Value; pub use value::Value;

View File

@ -10,8 +10,9 @@ use tikv_client_store::{HasError, Request};
pub use self::{ pub use self::{
plan::{ plan::{
Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError,
MergeResponse, MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse,
ResolveLock, RetryRegion,
}, },
plan_builder::{PlanBuilder, SingleKey}, plan_builder::{PlanBuilder, SingleKey},
shard::Shardable, shard::Shardable,

View File

@ -2,7 +2,6 @@
use crate::{ use crate::{
backoff::Backoff, backoff::Backoff,
kv::HasKeys,
pd::PdClient, pd::PdClient,
request::{KvRequest, Shardable}, request::{KvRequest, Shardable},
stats::tikv_stats, stats::tikv_stats,
@ -340,11 +339,71 @@ where
async fn execute(&self) -> Result<Self::Result> { async fn execute(&self) -> Result<Self::Result> {
let keys = self.inner.get_keys(); let keys = self.inner.get_keys();
let res = self.inner.execute().await?; let res = self.inner.execute().await?;
// TODO: should we check they have the same length?
Ok(ResponseAndKeys(res, keys)) Ok(ResponseAndKeys(res, keys))
} }
} }
pub trait HasKeys {
fn get_keys(&self) -> Vec<Key>;
}
// contains a response and the corresponding keys
// currently only used for matching keys and values in pessimistic lock requests
#[derive(Debug, Clone)]
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);
impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
fn error(&mut self) -> Option<Error> {
self.0.error()
}
}
impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
fn take_locks(&mut self) -> Vec<tikv_client_proto::kvrpcpb::LockInfo> {
self.0.take_locks()
}
}
impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
fn region_error(&mut self) -> Option<Error> {
self.0.region_error()
}
}
impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatchKey {
type Out = Vec<KvPair>;
fn merge(
&self,
input: Vec<Result<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>>>,
) -> Result<Self::Out> {
input
.into_iter()
.flat_map_ok(|ResponseAndKeys(mut resp, keys)| {
let values = resp.take_values();
let not_founds = resp.take_not_founds();
let v: Vec<_> = if not_founds.is_empty() {
// Legacy TiKV does not distiguish not existing key and existing key
// that with empty value. We assume that key does not exist if value
// is empty.
let values: Vec<Value> = values.into_iter().filter(|v| v.is_empty()).collect();
keys.into_iter().zip(values).map(From::from).collect()
} else {
assert_eq!(values.len(), not_founds.len());
let values: Vec<Value> = values
.into_iter()
.zip(not_founds.into_iter())
.filter_map(|(v, not_found)| if not_found { None } else { Some(v) })
.collect();
keys.into_iter().zip(values).map(From::from).collect()
};
// FIXME sucks to collect and re-iterate, but the iterators have different types
v.into_iter()
})
.collect()
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -402,60 +461,3 @@ mod test {
.for_each(|r| assert!(r.is_err())); .for_each(|r| assert!(r.is_err()));
} }
} }
// contains a response and the corresponding keys
// currently only used for matching keys and values in pessimistic lock requests
#[derive(Debug, Clone)]
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);
impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
fn error(&mut self) -> Option<Error> {
self.0.error()
}
}
impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
fn take_locks(&mut self) -> Vec<tikv_client_proto::kvrpcpb::LockInfo> {
self.0.take_locks()
}
}
impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
fn region_error(&mut self) -> Option<Error> {
self.0.region_error()
}
}
impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatchKey {
type Out = Vec<KvPair>;
fn merge(
&self,
input: Vec<Result<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>>>,
) -> Result<Self::Out> {
input
.into_iter()
.flat_map_ok(|ResponseAndKeys(mut resp, keys)| {
let values = resp.take_values();
let not_founds = resp.take_not_founds();
let v: Vec<_> = if not_founds.is_empty() {
// Legacy TiKV does not distiguish not existing key and existing key
// that with empty value. We assume that key does not exist if value
// is empty.
let values: Vec<Value> = values.into_iter().filter(|v| v.is_empty()).collect();
keys.into_iter().zip(values).map(From::from).collect()
} else {
assert_eq!(values.len(), not_founds.len());
let values: Vec<Value> = values
.into_iter()
.zip(not_founds.into_iter())
.filter_map(|(v, not_found)| if not_found { None } else { Some(v) })
.collect();
keys.into_iter().zip(values).map(From::from).collect()
};
// FIXME sucks to collect and re-iterate, but the iterators have different types
v.into_iter()
})
.collect()
}
}

View File

@ -1,12 +1,12 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use super::PreserveKey;
use crate::{ use crate::{
backoff::Backoff, backoff::Backoff,
kv::HasKeys,
pd::PdClient, pd::PdClient,
request::{ request::{
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion, DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse,
Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable, MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
}, },
store::Store, store::Store,
transaction::HasLocks, transaction::HasLocks,
@ -15,8 +15,6 @@ use crate::{
use std::{marker::PhantomData, sync::Arc}; use std::{marker::PhantomData, sync::Arc};
use tikv_client_store::HasError; use tikv_client_store::HasError;
use super::plan::PreserveKey;
/// Builder type for plans (see that module for more). /// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> { pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
pd_client: Arc<PdC>, pd_client: Arc<PdC>,

View File

@ -1,16 +1,30 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use crate::{
kv::HasKeys,
pd::PdClient, pd::PdClient,
request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion}, request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion},
store::Store, store::Store,
Result, Result,
}; };
use futures::stream::BoxStream; use futures::stream::BoxStream;
use std::sync::Arc; use std::sync::Arc;
use super::plan::PreserveKey; macro_rules! impl_inner_shardable {
() => {
type Shard = P::Shard;
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
};
}
pub trait Shardable { pub trait Shardable {
type Shard: Send; type Shard: Send;
@ -40,48 +54,15 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
} }
impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> { impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
type Shard = P::Shard; impl_inner_shardable!();
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
} }
impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> { impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> {
type Shard = P::Shard; impl_inner_shardable!();
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
} }
impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> { impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
type Shard = P::Shard; impl_inner_shardable!();
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
} }
#[macro_export] #[macro_export]

View File

@ -1,9 +1,10 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use crate::{
kv::HasKeys,
pd::PdClient, pd::PdClient,
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey}, request::{
Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store}, store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
timestamp::TimestampExt, timestamp::TimestampExt,
transaction::HasLocks, transaction::HasLocks,

View File

@ -290,7 +290,7 @@ async fn txn_write_million() -> Result<()> {
assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
// test batch_get and batch_get_for_update // test batch_get and batch_get_for_update
const SKIP_BITS: u32 = 6; // do not retrive all because there's a limit of message size const SKIP_BITS: u32 = 7; // do not retrive all because there's a limit of message size
let mut cur = 0u32; let mut cur = 0u32;
let keys = iter::repeat_with(|| { let keys = iter::repeat_with(|| {
let v = cur; let v = cur;