test group_tasks_by_region (#86)

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2019-07-26 06:22:01 +12:00 committed by Ana Hobden
parent abc39fe10f
commit 010fd4bea0
2 changed files with 48 additions and 8 deletions

View File

@ -263,16 +263,17 @@ impl<PdC: PdClient> RpcClient<PdC> {
if tasks.is_empty() {
Either::Right(ready(Ok(None)))
} else {
Either::Left(self.get_region(tasks[0].key()).map_ok(move |location| {
let ver_id = location.ver_id();
Either::Left(self.get_region(tasks[0].key()).map_ok(move |region| {
let id = region.id();
let mut grouped = Vec::new();
while let Some(task) = tasks.pop_front() {
if !location.contains(task.key()) {
if !region.contains(task.key()) {
tasks.push_front(task);
break;
}
grouped.push(task);
}
Some((tasks, (ver_id.id, grouped)))
Some((tasks, (id, grouped)))
}))
}
})
@ -384,7 +385,8 @@ impl GroupingTask for (Key, Option<Key>) {
mod test {
use super::*;
use crate::rpc::pd::Timestamp;
use futures::future::BoxFuture;
use futures::executor;
use futures::future::{ready, BoxFuture};
struct MockPdClient {}
@ -398,8 +400,20 @@ mod test {
Ok(MockPdClient {})
}
fn get_region(self: Arc<Self>, _key: &[u8]) -> BoxFuture<'static, Result<Region>> {
unimplemented!();
fn get_region(self: Arc<Self>, key: &[u8]) -> BoxFuture<'static, Result<Region>> {
let region = if key.len() <= 1 {
let mut region = Region::default();
region.region.set_start_key(vec![0]);
region.region.set_end_key(vec![4]);
region
} else {
let mut region = Region::default();
region.region.set_start_key(vec![4]);
region.region.set_end_key(vec![8]);
region
};
Box::pin(ready(Ok(region)))
}
fn get_region_by_id(self: Arc<Self>, _id: RegionId) -> BoxFuture<'static, Result<Region>> {
@ -448,4 +462,30 @@ mod test {
assert!(&*kv1 as *const _ != &*kv2 as *const _);
assert_eq!(&*kv2 as *const _, &*kv3 as *const _);
}
#[test]
fn test_group_tasks_by_region() {
let client = mock_rpc_client();
let tasks: Vec<Key> = vec![
vec![1].into(),
vec![2].into(),
vec![3].into(),
vec![5, 1].into(),
vec![5, 2].into(),
];
let stream = Arc::new(client).group_tasks_by_region(tasks);
let mut stream = executor::block_on_stream(stream);
assert_eq!(
stream.next().unwrap().unwrap().1,
vec![vec![1].into(), vec![2].into(), vec![3].into()]
);
assert_eq!(
stream.next().unwrap().unwrap().1,
vec![vec![5, 1].into(), vec![5, 2].into()]
);
assert!(stream.next().is_none());
}
}

View File

@ -40,7 +40,7 @@ impl Region {
let key: &[u8] = key.into();
let start_key = self.region.get_start_key();
let end_key = self.region.get_end_key();
start_key <= key && (end_key > key || end_key.is_empty())
key >= start_key && (key < end_key || end_key.is_empty())
}
pub fn context(&self) -> Result<kvrpcpb::Context> {