diff --git a/src/raw/requests.rs b/src/raw/requests.rs index bc91d0c..75d5fe4 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -3,11 +3,11 @@ use super::RawRpcRequest; use crate::{ pd::PdClient, - request::{ + request::KvRequest, + store::{ store_stream_for_key, store_stream_for_keys, store_stream_for_range, - store_stream_for_ranges, KvRequest, + store_stream_for_ranges, Store, }, - store::Store, transaction::HasLocks, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; diff --git a/src/request.rs b/src/request.rs index 299f5a6..4b183de 100644 --- a/src/request.rs +++ b/src/request.rs @@ -6,15 +6,12 @@ use crate::{ stats::tikv_stats, store::Store, transaction::{resolve_locks, HasLocks}, - BoundRange, Error, Key, Result, + Error, Result, }; use async_trait::async_trait; use derive_new::new; use futures::{prelude::*, stream::BoxStream}; -use std::{ - cmp::{max, min}, - sync::Arc, -}; +use std::sync::Arc; use tikv_client_store::{HasError, HasRegionError, Request}; const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); @@ -179,93 +176,14 @@ impl RetryOptions { } } -pub fn store_stream_for_key( - key_data: KeyData, - pd_client: Arc, -) -> BoxStream<'static, Result<(KeyData, Store)>> -where - KeyData: AsRef + Send + 'static, - PdC: PdClient, -{ - pd_client - .store_for_key(key_data.as_ref().clone()) - .map_ok(move |store| (key_data, store)) - .into_stream() - .boxed() -} - -/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order -pub fn store_stream_for_keys( - key_data: I, - pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, Store)>> -where - KeyData: AsRef + Send + Sync + 'static, - IntoKey: Into + 'static, - I: IntoIterator, - I::IntoIter: Send + Sync + 'static, - PdC: PdClient, -{ - pd_client - .clone() - .group_keys_by_region(key_data.into_iter().map(Into::into)) - .and_then(move |(region_id, key)| { - pd_client - .clone() - .store_for_id(region_id) - .map_ok(move |store| (key, store)) - }) - .boxed() -} - -pub fn store_stream_for_range( - range: BoundRange, - pd_client: Arc, -) -> BoxStream<'static, Result<((Key, Key), Store)>> { - pd_client - .stores_for_range(range.clone()) - .map_ok(move |store| { - let region_range = store.region.range(); - (bound_range(region_range, range.clone()), store) - }) - .into_stream() - .boxed() -} - -/// The range used for request should be the intersection of `region_range` and `range`. -fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) { - let (lower, upper) = region_range; - let (lower_bound, upper_bound) = range.into_keys(); - let up = match (upper.is_empty(), upper_bound) { - (_, None) => upper, - (true, Some(ub)) => ub, - (_, Some(ub)) if ub.is_empty() => upper, - (_, Some(ub)) => min(upper, ub), - }; - (max(lower, lower_bound), up) -} - -pub fn store_stream_for_ranges( - ranges: Vec, - pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, Store)>> { - pd_client - .clone() - .group_ranges_by_region(ranges) - .and_then(move |(region_id, range)| { - pd_client - .clone() - .store_for_id(region_id) - .map_ok(move |store| (range, store)) - }) - .into_stream() - .boxed() -} - #[cfg(test)] mod test { use super::*; - use crate::mock::{MockKvClient, MockPdClient}; + use crate::{ + mock::{MockKvClient, MockPdClient}, + store::store_stream_for_key, + Key, + }; use futures::executor; use grpcio::CallOption; use std::{any::Any, sync::Mutex}; diff --git a/src/store.rs b/src/store.rs index d6d395e..5b96f6f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,6 +1,13 @@ -use crate::{Region, Result}; +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::{pd::PdClient, BoundRange, Key, Region, Result}; use derive_new::new; -use std::any::Any; +use futures::{prelude::*, stream::BoxStream}; +use std::{ + any::Any, + cmp::{max, min}, + sync::Arc, +}; use tikv_client_store::{KvClient, KvConnect, Request, TikvConnect}; #[derive(new)] @@ -29,3 +36,86 @@ pub trait KvConnectStore: KvConnect { } impl KvConnectStore for TikvConnect {} + +pub fn store_stream_for_key( + key_data: KeyData, + pd_client: Arc, +) -> BoxStream<'static, Result<(KeyData, Store)>> +where + KeyData: AsRef + Send + 'static, + PdC: PdClient, +{ + pd_client + .store_for_key(key_data.as_ref().clone()) + .map_ok(move |store| (key_data, store)) + .into_stream() + .boxed() +} + +/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order +pub fn store_stream_for_keys( + key_data: I, + pd_client: Arc, +) -> BoxStream<'static, Result<(Vec, Store)>> +where + KeyData: AsRef + Send + Sync + 'static, + IntoKey: Into + 'static, + I: IntoIterator, + I::IntoIter: Send + Sync + 'static, + PdC: PdClient, +{ + pd_client + .clone() + .group_keys_by_region(key_data.into_iter().map(Into::into)) + .and_then(move |(region_id, key)| { + pd_client + .clone() + .store_for_id(region_id) + .map_ok(move |store| (key, store)) + }) + .boxed() +} + +pub fn store_stream_for_range( + range: BoundRange, + pd_client: Arc, +) -> BoxStream<'static, Result<((Key, Key), Store)>> { + pd_client + .stores_for_range(range.clone()) + .map_ok(move |store| { + let region_range = store.region.range(); + (bound_range(region_range, range.clone()), store) + }) + .into_stream() + .boxed() +} + +/// The range used for request should be the intersection of `region_range` and `range`. +fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) { + let (lower, upper) = region_range; + let (lower_bound, upper_bound) = range.into_keys(); + let up = match (upper.is_empty(), upper_bound) { + (_, None) => upper, + (true, Some(ub)) => ub, + (_, Some(ub)) if ub.is_empty() => upper, + (_, Some(ub)) => min(upper, ub), + }; + (max(lower, lower_bound), up) +} + +pub fn store_stream_for_ranges( + ranges: Vec, + pd_client: Arc, +) -> BoxStream<'static, Result<(Vec, Store)>> { + pd_client + .clone() + .group_ranges_by_region(ranges) + .and_then(move |(region_id, range)| { + pd_client + .clone() + .store_for_id(region_id) + .map_ok(move |store| (range, store)) + }) + .into_stream() + .boxed() +} diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 9f89e58..fb4672e 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -2,11 +2,8 @@ use crate::{ pd::PdClient, - request::{ - store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest, - RetryOptions, - }, - store::Store, + request::{KvRequest, RetryOptions}, + store::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, Store}, timestamp::TimestampExt, transaction::HasLocks, BoundRange, Error, Key, KvPair, Result, Value,