Change raw::Client::get to return Option<Value> (#24)

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
This commit is contained in:
Xiaoguang Sun 2019-01-21 13:49:50 +08:00 committed by Ana Hobden
parent c33bc136b4
commit c47b92bab4
6 changed files with 73 additions and 84 deletions

View File

@ -58,7 +58,7 @@ fn main() -> Result<()> {
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other // It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end. // types are supported as well, but it all ends up as `Vec<u8>` in the end.
let key: String = String::from(KEY); let key: String = String::from(KEY);
let value: Value = client.get(key.clone()).wait()?; let value: Value = client.get(key.clone()).wait()?.expect("value must exist");
assert_eq!(value.as_ref(), VALUE.as_bytes()); assert_eq!(value.as_ref(), VALUE.as_bytes());
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY); println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);

View File

@ -66,7 +66,7 @@ impl Client {
/// # let connected_client = connecting_client.wait().unwrap(); /// # let connected_client = connecting_client.wait().unwrap();
/// let key = "TiKV"; /// let key = "TiKV";
/// let req = connected_client.get(key); /// let req = connected_client.get(key);
/// let result: Value = req.wait().unwrap(); /// let result: Option<Value> = req.wait().unwrap();
/// ``` /// ```
pub fn get(&self, key: impl Into<Key>) -> Get { pub fn get(&self, key: impl Into<Key>) -> Get {
Get::new(self.rpc(), GetInner::new(key.into())) Get::new(self.rpc(), GetInner::new(key.into()))
@ -388,7 +388,7 @@ impl Get {
} }
impl Future for Get { impl Future for Get {
type Item = Value; type Item = Option<Value>;
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -407,7 +407,7 @@ impl GetInner {
} }
impl RequestInner for GetInner { impl RequestInner for GetInner {
type Resp = Value; type Resp = Option<Value>;
fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> { fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> {
Box::new(client.raw_get(self.key, cf)) Box::new(client.raw_get(self.key, cf))

View File

@ -231,12 +231,11 @@ impl RpcClient {
key: &Key, key: &Key,
cf: Option<ColumnFamily>, cf: Option<ColumnFamily>,
) -> impl Future<Item = RawContext, Error = Error> { ) -> impl Future<Item = RawContext, Error = Error> {
Self::region_context(inner, key) Self::region_context(inner, key).map(|(region, client)| RawContext::new(region, client, cf))
.map(move |(region, client)| RawContext::new(region, client, cf))
} }
fn txn(inner: Arc<RpcClientInner>, key: &Key) -> impl Future<Item = TxnContext, Error = Error> { fn txn(inner: Arc<RpcClientInner>, key: &Key) -> impl Future<Item = TxnContext, Error = Error> {
Self::region_context(inner, key).map(move |(region, _client)| TxnContext::new(region)) Self::region_context(inner, key).map(|(region, _client)| TxnContext::new(region))
} }
#[inline] #[inline]
@ -248,16 +247,10 @@ impl RpcClient {
&self, &self,
key: Key, key: Key,
cf: Option<ColumnFamily>, cf: Option<ColumnFamily>,
) -> impl Future<Item = Value, Error = Error> { ) -> impl Future<Item = Option<Value>, Error = Error> {
Self::raw(self.inner(), &key, cf) Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_get(context, key)) .and_then(|context| context.client().raw_get(context, key))
.and_then(move |value| { .map(|value| if value.is_empty() { None } else { Some(value) })
if value.is_empty() {
Err(Error::NoSuchKey)
} else {
Ok(value)
}
})
} }
pub fn raw_batch_get( pub fn raw_batch_get(
@ -274,8 +267,8 @@ impl RpcClient {
let inner = Arc::clone(&inner); let inner = Arc::clone(&inner);
let cf = cf.clone(); let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id) let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf)) .map(|(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| { .and_then(|context| {
context.client().raw_batch_get(context, keys.into_iter()) context.client().raw_batch_get(context, keys.into_iter())
}); });
tasks.push(task); tasks.push(task);
@ -296,7 +289,7 @@ impl RpcClient {
} else { } else {
Either::B( Either::B(
Self::raw(self.inner(), &key, cf) Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_put(context, key, value)), .and_then(|context| context.client().raw_put(context, key, value)),
) )
} }
} }
@ -319,10 +312,8 @@ impl RpcClient {
let inner = Arc::clone(&inner); let inner = Arc::clone(&inner);
let cf = cf.clone(); let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id) let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf)) .map(|(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| { .and_then(|context| context.client().raw_batch_put(context, pairs));
context.client().raw_batch_put(context, pairs)
});
tasks.push(task); tasks.push(task);
} }
future::join_all(tasks) future::join_all(tasks)
@ -338,7 +329,7 @@ impl RpcClient {
cf: Option<ColumnFamily>, cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> { ) -> impl Future<Item = (), Error = Error> {
Self::raw(self.inner(), &key, cf) Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_delete(context, key)) .and_then(|context| context.client().raw_delete(context, key))
} }
pub fn raw_batch_delete( pub fn raw_batch_delete(
@ -355,8 +346,8 @@ impl RpcClient {
let inner = Arc::clone(&inner); let inner = Arc::clone(&inner);
let cf = cf.clone(); let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id) let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf)) .map(|(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| context.client().raw_batch_delete(context, keys)); .and_then(|context| context.client().raw_batch_delete(context, keys));
tasks.push(task); tasks.push(task);
} }
future::join_all(tasks) future::join_all(tasks)
@ -386,35 +377,33 @@ impl RpcClient {
); );
let inner = Arc::clone(&self.inner); let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| { loop_fn((inner, scan), |(inner, scan)| {
inner inner.locate_key(scan.start_key()).and_then(|location| {
.locate_key(scan.start_key()) let region = location.into_inner();
.and_then(move |location| { let cf = scan.cf.clone();
let region = location.into_inner(); Self::region_context_by_id(Arc::clone(&inner), region.id)
let cf = scan.cf.clone(); .map(|(region, client)| {
Self::region_context_by_id(Arc::clone(&inner), region.id) (scan, region.range(), RawContext::new(region, client, cf))
.map(move |(region, client)| { })
(scan, region.range(), RawContext::new(region, client, cf)) .and_then(|(mut scan, region_range, context)| {
}) let (start_key, end_key) = scan.range();
.and_then(move |(mut scan, region_range, context)| { context
let (start_key, end_key) = scan.range(); .client()
context .raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
.client() .map(|pairs| (scan, region_range, pairs))
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only) })
.map(move |pairs| (scan, region_range, pairs)) .map(|(mut scan, region_range, mut pairs)| {
}) let limit = scan.limit;
.map(move |(mut scan, region_range, mut pairs)| { scan.result_mut().append(&mut pairs);
let limit = scan.limit; if scan.result().len() as u32 >= limit {
scan.result_mut().append(&mut pairs); Loop::Break(scan.into_inner())
if scan.result().len() as u32 >= limit { } else {
Loop::Break(scan.into_inner()) match scan.next(region_range) {
} else { ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
match scan.next(region_range) { ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
}
} }
}) }
}) })
})
}) })
} }
@ -438,31 +427,27 @@ impl RpcClient {
let scan: ScanRegionsContext<(), Option<ColumnFamily>> = ScanRegionsContext::new(range, cf); let scan: ScanRegionsContext<(), Option<ColumnFamily>> = ScanRegionsContext::new(range, cf);
let inner = Arc::clone(&self.inner); let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| { loop_fn((inner, scan), |(inner, scan)| {
inner inner.locate_key(scan.start_key()).and_then(|location| {
.locate_key(scan.start_key()) let region = location.into_inner();
.and_then(move |location| { let cf = scan.clone();
let region = location.into_inner(); Self::region_context_by_id(Arc::clone(&inner), region.id)
let cf = scan.clone(); .map(|(region, client)| {
Self::region_context_by_id(Arc::clone(&inner), region.id) (scan, region.range(), RawContext::new(region, client, cf))
.map(move |(region, client)| { })
(scan, region.range(), RawContext::new(region, client, cf)) .and_then(|(mut scan, region_range, context)| {
}) let (start_key, end_key) = scan.range();
.and_then(move |(mut scan, region_range, context)| { let start_key = start_key.expect("start key must be specified");
let (start_key, end_key) = scan.range(); let end_key = end_key.expect("end key must be specified");
let start_key = start_key.expect("start key must be specified"); context
let end_key = end_key.expect("end key must be specified"); .client()
context .raw_delete_range(context, start_key, end_key)
.client() .map(|_| (scan, region_range))
.raw_delete_range(context, start_key, end_key) })
.map(move |_| (scan, region_range)) .map(|(mut scan, region_range)| match scan.next(region_range) {
}) ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
.map( ScanRegionsStatus::Break => Loop::Break(()),
move |(mut scan, region_range)| match scan.next(region_range) { })
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)), })
ScanRegionsStatus::Break => Loop::Break(()),
},
)
})
}) })
} }
} }

View File

@ -401,7 +401,7 @@ pub fn try_connect_leader(
let mut resp = None; let mut resp = None;
// Try to connect to other members, then the previous leader. // Try to connect to other members, then the previous leader.
'outer: for m in members 'outer: for m in members
.into_iter() .iter()
.filter(|m| *m != previous_leader) .filter(|m| *m != previous_leader)
.chain(&[previous_leader.clone()]) .chain(&[previous_leader.clone()])
{ {

View File

@ -115,7 +115,7 @@ mod tests {
let example_cert = temp.path().join("cert"); let example_cert = temp.path().join("cert");
let example_pem = temp.path().join("key"); let example_pem = temp.path().join("key");
for (id, f) in (&[&example_ca, &example_cert, &example_pem]) for (id, f) in (&[&example_ca, &example_cert, &example_pem])
.into_iter() .iter()
.enumerate() .enumerate()
{ {
File::create(f).unwrap().write_all(&[id as u8]).unwrap(); File::create(f).unwrap().write_all(&[id as u8]).unwrap();

View File

@ -59,14 +59,18 @@ fn test_existence(client: &Client, existing_pairs: &[KvPair], not_existing_keys:
for pair in existing_pairs.iter().map(Clone::clone) { for pair in existing_pairs.iter().map(Clone::clone) {
let (key, value) = pair.into_inner(); let (key, value) = pair.into_inner();
assert_eq!( assert_eq!(
client.get(key).wait().expect("Could not get value"), client
.get(key)
.wait()
.expect("Could not get value")
.expect("key doesn't exist"),
value.clone(), value.clone(),
); );
} }
for key in not_existing_keys.clone().into_iter() { for key in not_existing_keys.clone().into_iter() {
let r = client.get(key).wait(); let r = client.get(key).wait().expect("Cound not get value");
assert!(r.is_err()); assert!(r.is_none());
} }
let mut existing_keys = Vec::with_capacity(existing_pairs.len()); let mut existing_keys = Vec::with_capacity(existing_pairs.len());