Refactor some types in rpc/client

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2019-05-06 19:35:03 +12:00
parent 89aee91bf4
commit 8da60f24ee
1 changed files with 26 additions and 57 deletions

View File

@ -4,9 +4,8 @@
#![allow(dead_code)]
use std::{
collections::HashMap,
collections::hash_map::{self, HashMap},
fmt,
ops::Deref,
sync::{Arc, RwLock},
time::Duration,
};
@ -108,7 +107,7 @@ impl RpcClientInner {
}
fn locate_key(&self, key: &Key) -> impl Future<Output = Result<KeyLocation>> {
self.load_region(key).map_ok(KeyLocation::new)
self.load_region(key)
}
fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
@ -197,7 +196,7 @@ impl RpcClient {
let peer = location.peer().expect("leader must exist");
let store_id = peer.get_store_id();
inner.load_store(store_id).map_ok(|store| RegionContext {
region: location.into_inner(),
region: location,
store,
})
})
@ -257,7 +256,6 @@ impl RpcClient {
let inner = self.inner();
self.group_tasks_by_region(keys)
.and_then(move |task_groups| {
let task_groups = task_groups.into_inner();
let mut tasks = Vec::with_capacity(task_groups.len());
for (region, keys) in task_groups.into_iter() {
let inner = Arc::clone(&inner);
@ -302,7 +300,6 @@ impl RpcClient {
Either::Right(
self.group_tasks_by_region(pairs)
.and_then(move |task_groups| {
let task_groups = task_groups.into_inner();
let mut tasks = Vec::with_capacity(task_groups.len());
for (region, pairs) in task_groups.into_iter() {
let inner = Arc::clone(&inner);
@ -336,7 +333,6 @@ impl RpcClient {
let inner = self.inner();
self.group_tasks_by_region(keys)
.and_then(move |task_groups| {
let task_groups = task_groups.into_inner();
let mut tasks = Vec::with_capacity(task_groups.len());
for (region, keys) in task_groups.into_iter() {
let inner = Arc::clone(&inner);
@ -374,8 +370,8 @@ impl RpcClient {
let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| {
inner.locate_key(scan.start_key()).and_then(|location| {
let region = location.into_inner();
let cf = scan.cf.clone();
let region = location;
let cf = scan.state.cf.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map_ok(|(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
@ -384,11 +380,17 @@ impl RpcClient {
let (start_key, end_key) = scan.range();
context
.client()
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
.raw_scan(
context,
start_key,
end_key,
scan.state.limit,
scan.state.key_only,
)
.map_ok(|pairs| (scan, region_range, pairs))
})
.map_ok(|(mut scan, region_range, mut pairs)| {
let limit = scan.limit;
let limit = scan.state.limit;
scan.result_mut().append(&mut pairs);
if scan.result().len() as u32 >= limit {
Loop::Break(scan.into_inner())
@ -424,8 +426,8 @@ impl RpcClient {
let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| {
inner.locate_key(scan.start_key()).and_then(|location| {
let region = location.into_inner();
let cf = scan.clone();
let region = location;
let cf = scan.state.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map_ok(|(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
@ -523,25 +525,7 @@ impl TxnContext {
}
}
struct KeyLocation(Region);
impl KeyLocation {
fn new(region: Region) -> Self {
KeyLocation(region)
}
fn into_inner(self) -> Region {
self.0
}
}
impl Deref for KeyLocation {
type Target = Region;
fn deref(&self) -> &Self::Target {
&self.0
}
}
type KeyLocation = Region;
trait GroupingTask: Clone + Default + Sized {
fn key(&self) -> &Key;
@ -550,16 +534,14 @@ trait GroupingTask: Clone + Default + Sized {
#[derive(Default)]
struct GroupedTasks<Task: GroupingTask>(HashMap<RegionVerId, Vec<Task>>, RegionVerId);
impl<Task> GroupedTasks<Task>
where
Task: GroupingTask,
{
impl<Task: GroupingTask> GroupedTasks<Task> {
fn new(ver_id: RegionVerId, task: Task) -> Self {
let mut map = HashMap::with_capacity(1);
map.insert(ver_id.clone(), vec![task]);
GroupedTasks(map, ver_id)
}
#[inline]
fn add(&mut self, ver_id: RegionVerId, task: Task) {
self.0
.entry(ver_id)
@ -567,19 +549,18 @@ where
.push(task)
}
fn into_inner(self) -> HashMap<RegionVerId, Vec<Task>> {
self.0
#[inline]
fn len(&self) -> usize {
self.0.len()
}
}
impl<Task> Deref for GroupedTasks<Task>
where
Task: GroupingTask,
{
type Target = HashMap<RegionVerId, Vec<Task>>;
impl<Task: GroupingTask> IntoIterator for GroupedTasks<Task> {
type Item = (RegionVerId, Vec<Task>);
type IntoIter = hash_map::IntoIter<RegionVerId, Vec<Task>>;
fn deref(&self) -> &Self::Target {
&self.0
fn into_iter(self) -> hash_map::IntoIter<RegionVerId, Vec<Task>> {
self.0.into_iter()
}
}
@ -666,15 +647,3 @@ where
&self.result
}
}
impl<Res, State> Deref for ScanRegionsContext<Res, State>
where
Res: Default,
State: Sized,
{
type Target = State;
fn deref(&self) -> &Self::Target {
&self.state
}
}