batch_get returns an iterator

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-14 13:14:29 +08:00
parent a74e1abd1a
commit a601c39c1f
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
1 changed files with 45 additions and 28 deletions

View File

@ -15,10 +15,7 @@ pub use self::requests::Scanner;
use crate::{Key, Result, Value};
use derive_new::new;
use kvproto::kvrpcpb;
use std::{
collections::{BTreeMap, HashMap},
ops::RangeBounds,
};
use std::{collections::BTreeMap, ops::RangeBounds};
mod client;
pub(crate) mod requests;
@ -119,8 +116,8 @@ impl Transaction {
}
}
/// Gets the values associated with the given keys. Non-existent keys are not included in the
/// result.
/// Gets the values associated with the given keys. The returned iterator is in the same order
/// as the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -132,7 +129,11 @@ impl Transaction {
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = txn.batch_get(keys).await.unwrap();
/// let result: HashMap<Key, Value> = txn
/// .batch_get(keys)
/// .await
/// .unwrap()
/// .filter_map(|(k, v)| v.map(move |v| (k, v))).collect();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
@ -140,25 +141,37 @@ impl Transaction {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<HashMap<Key, Value>> {
let mut result = HashMap::new();
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
let mut keys_from_snapshot = Vec::new();
// Try to get the result from buffered mutations first
for key in keys {
let key = key.into();
match self.get_from_mutations(&key) {
MutationValue::Determined(Some(value)) => {
result.insert(key, value);
let mut results_in_buffer = keys
.into_iter()
.map(|key| {
let key = key.into();
let mutation_value = self.get_from_mutations(&key);
if let MutationValue::Undetermined = mutation_value {
keys_from_snapshot.push(key.clone());
}
MutationValue::Determined(None) => {}
MutationValue::Undetermined => keys_from_snapshot.push(key),
(key, mutation_value)
})
.collect::<Vec<_>>()
.into_iter();
let mut results_from_snapshot = self
.snapshot
.batch_get(keys_from_snapshot)
.await?
.peekable();
Ok(std::iter::from_fn(move || {
let (key, mutation_value) = results_in_buffer.next()?;
match mutation_value {
MutationValue::Determined(value) => Some((key, value)),
MutationValue::Undetermined => match results_from_snapshot.peek() {
Some((key_from_snapshot, _)) if &key == key_from_snapshot => {
results_from_snapshot.next()
}
_ => Some((key, None)),
},
}
}
// Get others from snapshot
result.extend(self.snapshot.batch_get(keys_from_snapshot).await?);
Ok(result)
}))
}
pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner {
@ -349,8 +362,8 @@ impl Snapshot {
unimplemented!()
}
/// Gets the values associated with the given keys. Non-existent keys are not included in the
/// result.
/// Gets the values associated with the given keys. The returned iterator is in the same order
/// as the given keys.
///
/// ```rust,no_run
/// # #![feature(async_await)]
@ -362,14 +375,18 @@ impl Snapshot {
/// # let connected_client = connecting_client.await.unwrap();
/// let snapshot = connected_client.snapshot().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = snapshot.batch_get(keys).await.unwrap();
/// let result: HashMap<Key, Value> = snapshot
/// .batch_get(keys)
/// .await
/// .unwrap()
/// .filter_map(|(k, v)| v.map(move |v| (k, v))).collect();
/// # });
/// ```
pub async fn batch_get(
&self,
_keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<HashMap<Key, Value>> {
unimplemented!()
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
Ok(std::iter::repeat_with(|| unimplemented!()))
}
pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {