Move store-related functions from request to store

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2021-01-21 11:23:56 +13:00
parent ecab156fec
commit 304d8d1243
4 changed files with 104 additions and 99 deletions

View File

@ -3,11 +3,11 @@
use super::RawRpcRequest;
use crate::{
pd::PdClient,
request::{
request::KvRequest,
store::{
store_stream_for_key, store_stream_for_keys, store_stream_for_range,
store_stream_for_ranges, KvRequest,
store_stream_for_ranges, Store,
},
store::Store,
transaction::HasLocks,
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};

View File

@ -6,15 +6,12 @@ use crate::{
stats::tikv_stats,
store::Store,
transaction::{resolve_locks, HasLocks},
BoundRange, Error, Key, Result,
Error, Result,
};
use async_trait::async_trait;
use derive_new::new;
use futures::{prelude::*, stream::BoxStream};
use std::{
cmp::{max, min},
sync::Arc,
};
use std::sync::Arc;
use tikv_client_store::{HasError, HasRegionError, Request};
const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
@ -179,93 +176,14 @@ impl RetryOptions {
}
}
pub fn store_stream_for_key<KeyData, PdC>(
key_data: KeyData,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(KeyData, Store)>>
where
KeyData: AsRef<Key> + Send + 'static,
PdC: PdClient,
{
pd_client
.store_for_key(key_data.as_ref().clone())
.map_ok(move |store| (key_data, store))
.into_stream()
.boxed()
}
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
key_data: I,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<KeyData>, Store)>>
where
KeyData: AsRef<Key> + Send + Sync + 'static,
IntoKey: Into<KeyData> + 'static,
I: IntoIterator<Item = IntoKey>,
I::IntoIter: Send + Sync + 'static,
PdC: PdClient,
{
pd_client
.clone()
.group_keys_by_region(key_data.into_iter().map(Into::into))
.and_then(move |(region_id, key)| {
pd_client
.clone()
.store_for_id(region_id)
.map_ok(move |store| (key, store))
})
.boxed()
}
pub fn store_stream_for_range<PdC: PdClient>(
range: BoundRange,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Key, Key), Store)>> {
pd_client
.stores_for_range(range.clone())
.map_ok(move |store| {
let region_range = store.region.range();
(bound_range(region_range, range.clone()), store)
})
.into_stream()
.boxed()
}
/// The range used for request should be the intersection of `region_range` and `range`.
fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
let (lower, upper) = region_range;
let (lower_bound, upper_bound) = range.into_keys();
let up = match (upper.is_empty(), upper_bound) {
(_, None) => upper,
(true, Some(ub)) => ub,
(_, Some(ub)) if ub.is_empty() => upper,
(_, Some(ub)) => min(upper, ub),
};
(max(lower, lower_bound), up)
}
pub fn store_stream_for_ranges<PdC: PdClient>(
ranges: Vec<BoundRange>,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<BoundRange>, Store)>> {
pd_client
.clone()
.group_ranges_by_region(ranges)
.and_then(move |(region_id, range)| {
pd_client
.clone()
.store_for_id(region_id)
.map_ok(move |store| (range, store))
})
.into_stream()
.boxed()
}
#[cfg(test)]
mod test {
use super::*;
use crate::mock::{MockKvClient, MockPdClient};
use crate::{
mock::{MockKvClient, MockPdClient},
store::store_stream_for_key,
Key,
};
use futures::executor;
use grpcio::CallOption;
use std::{any::Any, sync::Mutex};

View File

@ -1,6 +1,13 @@
use crate::{Region, Result};
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::PdClient, BoundRange, Key, Region, Result};
use derive_new::new;
use std::any::Any;
use futures::{prelude::*, stream::BoxStream};
use std::{
any::Any,
cmp::{max, min},
sync::Arc,
};
use tikv_client_store::{KvClient, KvConnect, Request, TikvConnect};
#[derive(new)]
@ -29,3 +36,86 @@ pub trait KvConnectStore: KvConnect {
}
impl KvConnectStore for TikvConnect {}
pub fn store_stream_for_key<KeyData, PdC>(
key_data: KeyData,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(KeyData, Store)>>
where
KeyData: AsRef<Key> + Send + 'static,
PdC: PdClient,
{
pd_client
.store_for_key(key_data.as_ref().clone())
.map_ok(move |store| (key_data, store))
.into_stream()
.boxed()
}
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
pub fn store_stream_for_keys<KeyData, IntoKey, I, PdC>(
key_data: I,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<KeyData>, Store)>>
where
KeyData: AsRef<Key> + Send + Sync + 'static,
IntoKey: Into<KeyData> + 'static,
I: IntoIterator<Item = IntoKey>,
I::IntoIter: Send + Sync + 'static,
PdC: PdClient,
{
pd_client
.clone()
.group_keys_by_region(key_data.into_iter().map(Into::into))
.and_then(move |(region_id, key)| {
pd_client
.clone()
.store_for_id(region_id)
.map_ok(move |store| (key, store))
})
.boxed()
}
pub fn store_stream_for_range<PdC: PdClient>(
range: BoundRange,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Key, Key), Store)>> {
pd_client
.stores_for_range(range.clone())
.map_ok(move |store| {
let region_range = store.region.range();
(bound_range(region_range, range.clone()), store)
})
.into_stream()
.boxed()
}
/// The range used for request should be the intersection of `region_range` and `range`.
fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
let (lower, upper) = region_range;
let (lower_bound, upper_bound) = range.into_keys();
let up = match (upper.is_empty(), upper_bound) {
(_, None) => upper,
(true, Some(ub)) => ub,
(_, Some(ub)) if ub.is_empty() => upper,
(_, Some(ub)) => min(upper, ub),
};
(max(lower, lower_bound), up)
}
pub fn store_stream_for_ranges<PdC: PdClient>(
ranges: Vec<BoundRange>,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<BoundRange>, Store)>> {
pd_client
.clone()
.group_ranges_by_region(ranges)
.and_then(move |(region_id, range)| {
pd_client
.clone()
.store_for_id(region_id)
.map_ok(move |store| (range, store))
})
.into_stream()
.boxed()
}

View File

@ -2,11 +2,8 @@
use crate::{
pd::PdClient,
request::{
store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest,
RetryOptions,
},
store::Store,
request::{KvRequest, RetryOptions},
store::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, Store},
timestamp::TimestampExt,
transaction::HasLocks,
BoundRange, Error, Key, KvPair, Result, Value,