diff --git a/examples/raw.rs b/examples/raw.rs index 2fb2981..eebe624 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -58,7 +58,7 @@ fn main() -> Result<()> { // It is best to pass a `Vec` in terms of explictness and speed. `String`s and a few other // types are supported as well, but it all ends up as `Vec` in the end. let key: String = String::from(KEY); - let value: Value = client.get(key.clone()).wait()?; + let value: Value = client.get(key.clone()).wait()?.expect("value must exist"); assert_eq!(value.as_ref(), VALUE.as_bytes()); println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY); diff --git a/src/raw.rs b/src/raw.rs index 3e3a432..ce55204 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -66,7 +66,7 @@ impl Client { /// # let connected_client = connecting_client.wait().unwrap(); /// let key = "TiKV"; /// let req = connected_client.get(key); - /// let result: Value = req.wait().unwrap(); + /// let result: Option = req.wait().unwrap(); /// ``` pub fn get(&self, key: impl Into) -> Get { Get::new(self.rpc(), GetInner::new(key.into())) @@ -388,7 +388,7 @@ impl Get { } impl Future for Get { - type Item = Value; + type Item = Option; type Error = Error; fn poll(&mut self) -> Poll { @@ -407,7 +407,7 @@ impl GetInner { } impl RequestInner for GetInner { - type Resp = Value; + type Resp = Option; fn execute(self, client: Arc, cf: Option) -> KvFuture { Box::new(client.raw_get(self.key, cf)) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index c6e1508..89a19a1 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -231,12 +231,11 @@ impl RpcClient { key: &Key, cf: Option, ) -> impl Future { - Self::region_context(inner, key) - .map(move |(region, client)| RawContext::new(region, client, cf)) + Self::region_context(inner, key).map(|(region, client)| RawContext::new(region, client, cf)) } fn txn(inner: Arc, key: &Key) -> impl Future { - Self::region_context(inner, key).map(move |(region, _client)| TxnContext::new(region)) + Self::region_context(inner, key).map(|(region, _client)| TxnContext::new(region)) } #[inline] @@ -248,16 +247,10 @@ impl RpcClient { &self, key: Key, cf: Option, - ) -> impl Future { + ) -> impl Future, Error = Error> { Self::raw(self.inner(), &key, cf) - .and_then(move |context| context.client().raw_get(context, key)) - .and_then(move |value| { - if value.is_empty() { - Err(Error::NoSuchKey) - } else { - Ok(value) - } - }) + .and_then(|context| context.client().raw_get(context, key)) + .map(|value| if value.is_empty() { None } else { Some(value) }) } pub fn raw_batch_get( @@ -274,8 +267,8 @@ impl RpcClient { let inner = Arc::clone(&inner); let cf = cf.clone(); let task = Self::region_context_by_id(inner, region.id) - .map(move |(region, client)| RawContext::new(region, client, cf)) - .and_then(move |context| { + .map(|(region, client)| RawContext::new(region, client, cf)) + .and_then(|context| { context.client().raw_batch_get(context, keys.into_iter()) }); tasks.push(task); @@ -296,7 +289,7 @@ impl RpcClient { } else { Either::B( Self::raw(self.inner(), &key, cf) - .and_then(move |context| context.client().raw_put(context, key, value)), + .and_then(|context| context.client().raw_put(context, key, value)), ) } } @@ -319,10 +312,8 @@ impl RpcClient { let inner = Arc::clone(&inner); let cf = cf.clone(); let task = Self::region_context_by_id(inner, region.id) - .map(move |(region, client)| RawContext::new(region, client, cf)) - .and_then(move |context| { - context.client().raw_batch_put(context, pairs) - }); + .map(|(region, client)| RawContext::new(region, client, cf)) + .and_then(|context| context.client().raw_batch_put(context, pairs)); tasks.push(task); } future::join_all(tasks) @@ -338,7 +329,7 @@ impl RpcClient { cf: Option, ) -> impl Future { Self::raw(self.inner(), &key, cf) - .and_then(move |context| context.client().raw_delete(context, key)) + .and_then(|context| context.client().raw_delete(context, key)) } pub fn raw_batch_delete( @@ -355,8 +346,8 @@ impl RpcClient { let inner = Arc::clone(&inner); let cf = cf.clone(); let task = Self::region_context_by_id(inner, region.id) - .map(move |(region, client)| RawContext::new(region, client, cf)) - .and_then(move |context| context.client().raw_batch_delete(context, keys)); + .map(|(region, client)| RawContext::new(region, client, cf)) + .and_then(|context| context.client().raw_batch_delete(context, keys)); tasks.push(task); } future::join_all(tasks) @@ -386,35 +377,33 @@ impl RpcClient { ); let inner = Arc::clone(&self.inner); loop_fn((inner, scan), |(inner, scan)| { - inner - .locate_key(scan.start_key()) - .and_then(move |location| { - let region = location.into_inner(); - let cf = scan.cf.clone(); - Self::region_context_by_id(Arc::clone(&inner), region.id) - .map(move |(region, client)| { - (scan, region.range(), RawContext::new(region, client, cf)) - }) - .and_then(move |(mut scan, region_range, context)| { - let (start_key, end_key) = scan.range(); - context - .client() - .raw_scan(context, start_key, end_key, scan.limit, scan.key_only) - .map(move |pairs| (scan, region_range, pairs)) - }) - .map(move |(mut scan, region_range, mut pairs)| { - let limit = scan.limit; - scan.result_mut().append(&mut pairs); - if scan.result().len() as u32 >= limit { - Loop::Break(scan.into_inner()) - } else { - match scan.next(region_range) { - ScanRegionsStatus::Continue => Loop::Continue((inner, scan)), - ScanRegionsStatus::Break => Loop::Break(scan.into_inner()), - } + inner.locate_key(scan.start_key()).and_then(|location| { + let region = location.into_inner(); + let cf = scan.cf.clone(); + Self::region_context_by_id(Arc::clone(&inner), region.id) + .map(|(region, client)| { + (scan, region.range(), RawContext::new(region, client, cf)) + }) + .and_then(|(mut scan, region_range, context)| { + let (start_key, end_key) = scan.range(); + context + .client() + .raw_scan(context, start_key, end_key, scan.limit, scan.key_only) + .map(|pairs| (scan, region_range, pairs)) + }) + .map(|(mut scan, region_range, mut pairs)| { + let limit = scan.limit; + scan.result_mut().append(&mut pairs); + if scan.result().len() as u32 >= limit { + Loop::Break(scan.into_inner()) + } else { + match scan.next(region_range) { + ScanRegionsStatus::Continue => Loop::Continue((inner, scan)), + ScanRegionsStatus::Break => Loop::Break(scan.into_inner()), } - }) - }) + } + }) + }) }) } @@ -438,31 +427,27 @@ impl RpcClient { let scan: ScanRegionsContext<(), Option> = ScanRegionsContext::new(range, cf); let inner = Arc::clone(&self.inner); loop_fn((inner, scan), |(inner, scan)| { - inner - .locate_key(scan.start_key()) - .and_then(move |location| { - let region = location.into_inner(); - let cf = scan.clone(); - Self::region_context_by_id(Arc::clone(&inner), region.id) - .map(move |(region, client)| { - (scan, region.range(), RawContext::new(region, client, cf)) - }) - .and_then(move |(mut scan, region_range, context)| { - let (start_key, end_key) = scan.range(); - let start_key = start_key.expect("start key must be specified"); - let end_key = end_key.expect("end key must be specified"); - context - .client() - .raw_delete_range(context, start_key, end_key) - .map(move |_| (scan, region_range)) - }) - .map( - move |(mut scan, region_range)| match scan.next(region_range) { - ScanRegionsStatus::Continue => Loop::Continue((inner, scan)), - ScanRegionsStatus::Break => Loop::Break(()), - }, - ) - }) + inner.locate_key(scan.start_key()).and_then(|location| { + let region = location.into_inner(); + let cf = scan.clone(); + Self::region_context_by_id(Arc::clone(&inner), region.id) + .map(|(region, client)| { + (scan, region.range(), RawContext::new(region, client, cf)) + }) + .and_then(|(mut scan, region_range, context)| { + let (start_key, end_key) = scan.range(); + let start_key = start_key.expect("start key must be specified"); + let end_key = end_key.expect("end key must be specified"); + context + .client() + .raw_delete_range(context, start_key, end_key) + .map(|_| (scan, region_range)) + }) + .map(|(mut scan, region_range)| match scan.next(region_range) { + ScanRegionsStatus::Continue => Loop::Continue((inner, scan)), + ScanRegionsStatus::Break => Loop::Break(()), + }) + }) }) } } diff --git a/src/rpc/pd/leader.rs b/src/rpc/pd/leader.rs index 5a1a9a3..0261d59 100644 --- a/src/rpc/pd/leader.rs +++ b/src/rpc/pd/leader.rs @@ -401,7 +401,7 @@ pub fn try_connect_leader( let mut resp = None; // Try to connect to other members, then the previous leader. 'outer: for m in members - .into_iter() + .iter() .filter(|m| *m != previous_leader) .chain(&[previous_leader.clone()]) { diff --git a/src/rpc/security.rs b/src/rpc/security.rs index 7d84953..ed12be3 100644 --- a/src/rpc/security.rs +++ b/src/rpc/security.rs @@ -115,7 +115,7 @@ mod tests { let example_cert = temp.path().join("cert"); let example_pem = temp.path().join("key"); for (id, f) in (&[&example_ca, &example_cert, &example_pem]) - .into_iter() + .iter() .enumerate() { File::create(f).unwrap().write_all(&[id as u8]).unwrap(); diff --git a/tests/raw.rs b/tests/raw.rs index 12b71a1..7eb6420 100644 --- a/tests/raw.rs +++ b/tests/raw.rs @@ -59,14 +59,18 @@ fn test_existence(client: &Client, existing_pairs: &[KvPair], not_existing_keys: for pair in existing_pairs.iter().map(Clone::clone) { let (key, value) = pair.into_inner(); assert_eq!( - client.get(key).wait().expect("Could not get value"), + client + .get(key) + .wait() + .expect("Could not get value") + .expect("key doesn't exist"), value.clone(), ); } for key in not_existing_keys.clone().into_iter() { - let r = client.get(key).wait(); - assert!(r.is_err()); + let r = client.get(key).wait().expect("Cound not get value"); + assert!(r.is_none()); } let mut existing_keys = Vec::with_capacity(existing_pairs.len());