From 010fd4bea00527e85eb5a510ce2e4b88b766cc23 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Fri, 26 Jul 2019 06:22:01 +1200 Subject: [PATCH] test group_tasks_by_region (#86) Signed-off-by: Nick Cameron --- src/rpc/client.rs | 54 +++++++++++++++++++++++++++++++++++++++++------ src/rpc/pd/mod.rs | 2 +- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 30f31f4..b52d883 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -263,16 +263,17 @@ impl RpcClient { if tasks.is_empty() { Either::Right(ready(Ok(None))) } else { - Either::Left(self.get_region(tasks[0].key()).map_ok(move |location| { - let ver_id = location.ver_id(); + Either::Left(self.get_region(tasks[0].key()).map_ok(move |region| { + let id = region.id(); let mut grouped = Vec::new(); while let Some(task) = tasks.pop_front() { - if !location.contains(task.key()) { + if !region.contains(task.key()) { + tasks.push_front(task); break; } grouped.push(task); } - Some((tasks, (ver_id.id, grouped))) + Some((tasks, (id, grouped))) })) } }) @@ -384,7 +385,8 @@ impl GroupingTask for (Key, Option) { mod test { use super::*; use crate::rpc::pd::Timestamp; - use futures::future::BoxFuture; + use futures::executor; + use futures::future::{ready, BoxFuture}; struct MockPdClient {} @@ -398,8 +400,20 @@ mod test { Ok(MockPdClient {}) } - fn get_region(self: Arc, _key: &[u8]) -> BoxFuture<'static, Result> { - unimplemented!(); + fn get_region(self: Arc, key: &[u8]) -> BoxFuture<'static, Result> { + let region = if key.len() <= 1 { + let mut region = Region::default(); + region.region.set_start_key(vec![0]); + region.region.set_end_key(vec![4]); + region + } else { + let mut region = Region::default(); + region.region.set_start_key(vec![4]); + region.region.set_end_key(vec![8]); + region + }; + + Box::pin(ready(Ok(region))) } fn get_region_by_id(self: Arc, _id: RegionId) -> BoxFuture<'static, Result> { @@ -448,4 +462,30 @@ mod test { assert!(&*kv1 as *const _ != &*kv2 as *const _); assert_eq!(&*kv2 as *const _, &*kv3 as *const _); } + + #[test] + fn test_group_tasks_by_region() { + let client = mock_rpc_client(); + + let tasks: Vec = vec![ + vec![1].into(), + vec![2].into(), + vec![3].into(), + vec![5, 1].into(), + vec![5, 2].into(), + ]; + + let stream = Arc::new(client).group_tasks_by_region(tasks); + let mut stream = executor::block_on_stream(stream); + + assert_eq!( + stream.next().unwrap().unwrap().1, + vec![vec![1].into(), vec![2].into(), vec![3].into()] + ); + assert_eq!( + stream.next().unwrap().unwrap().1, + vec![vec![5, 1].into(), vec![5, 2].into()] + ); + assert!(stream.next().is_none()); + } } diff --git a/src/rpc/pd/mod.rs b/src/rpc/pd/mod.rs index 2715701..79d2cbf 100644 --- a/src/rpc/pd/mod.rs +++ b/src/rpc/pd/mod.rs @@ -40,7 +40,7 @@ impl Region { let key: &[u8] = key.into(); let start_key = self.region.get_start_key(); let end_key = self.region.get_end_key(); - start_key <= key && (end_key > key || end_key.is_empty()) + key >= start_key && (key < end_key || end_key.is_empty()) } pub fn context(&self) -> Result {