fix txn.batch_get() signature; now it returns Iter<KvPair>, and skips non-existent entries

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2020-09-23 16:02:53 +08:00
parent 6282c65c79
commit 1c383ae2e1
5 changed files with 32 additions and 24 deletions

View File

@ -103,7 +103,7 @@ impl Client {
/// Create a new 'batch get' request.
///
/// Once resolved this request will result in the fetching of the values associated with the
/// given keys.
/// given keys, skipping non-existent entris.
///
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient};

View File

@ -39,7 +39,7 @@ impl Buffer {
&self,
keys: impl Iterator<Item = Key>,
f: F,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>>
) -> Result<impl Iterator<Item = KvPair>>
where
F: FnOnce(Vec<Key>) -> Fut,
Fut: Future<Output = Result<Vec<KvPair>>>,
@ -48,7 +48,10 @@ impl Buffer {
let mutations = self.mutations.lock().unwrap();
// Partition the keys into those we have buffered and those we have to
// get from the store.
let (undetermined_keys, cached_results): (Vec<(Key, MutationValue)>, _) = keys
let (undetermined_keys, cached_results): (
Vec<(Key, MutationValue)>,
Vec<(Key, MutationValue)>,
) = keys
.map(|key| {
let value = mutations
.get(&key)
@ -58,7 +61,12 @@ impl Buffer {
})
.partition(|(_, v)| *v == MutationValue::Undetermined);
let cached_results = cached_results.into_iter().map(|(k, v)| (k, v.unwrap()));
let cached_results = cached_results
.into_iter()
.filter_map(|(k, v)| match v.unwrap() {
Some(v) => Some(KvPair(k, v)),
None => None,
});
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k).collect();
(cached_results, undetermined_keys)
@ -70,7 +78,7 @@ impl Buffer {
mutations.insert(pair.0.clone(), Mutation::Cached(Some(pair.1.clone())));
}
let results = cached_results.chain(fetched_results.into_iter().map(|p| (p.0, Some(p.1))));
let results = cached_results.chain(fetched_results.into_iter());
Ok(results)
}
@ -260,13 +268,10 @@ mod tests {
))
.unwrap()
.collect::<Vec<_>>(),
vec![
(Key::from(b"key2".to_vec()), None),
(
Key::from(b"key1".to_vec()),
Some(Value::from(b"value".to_vec()))
),
]
vec![KvPair(
Key::from(b"key1".to_vec()),
Value::from(b"value".to_vec())
),]
);
}
@ -304,16 +309,16 @@ mod tests {
assert_eq!(
r1.unwrap().collect::<Vec<_>>(),
vec![
(k1.clone(), Some(v1.clone())),
(k2.clone(), Some(v2.clone()))
KvPair(k1.clone(), v1.clone()),
KvPair(k2.clone(), v2.clone())
]
);
assert_eq!(r2.unwrap().unwrap(), v2.clone());
assert_eq!(
r3.unwrap().collect::<Vec<_>>(),
vec![
(k1.clone(), Some(v1.clone())),
(k2.clone(), Some(v2.clone()))
KvPair(k1.clone(), v1.clone()),
KvPair(k2.clone(), v2.clone())
]
);
}

View File

@ -24,7 +24,7 @@ impl Snapshot {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
) -> Result<impl Iterator<Item = KvPair>> {
self.transaction.batch_get(keys).await
}

View File

@ -72,7 +72,7 @@ impl Transaction {
.await
}
/// Gets the values associated with the given keys.
/// Gets the values associated with the given keys, skipping non-existent entris.
///
/// ```rust,no_run
/// # use tikv_client::{Key, Value, Config, transaction::Client};
@ -86,7 +86,8 @@ impl Transaction {
/// .batch_get(keys)
/// .await
/// .unwrap()
/// .filter_map(|(k, v)| v.map(move |v| (k, v))).collect();
/// .map(|pair| (pair.0, pair.1))
/// .collect();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
@ -94,7 +95,7 @@ impl Transaction {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
) -> Result<impl Iterator<Item = KvPair>> {
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
self.buffer

View File

@ -81,12 +81,14 @@ async fn crud() -> Fallible<()> {
let client = TransactionClient::new(config).await?;
let mut txn = client.begin().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
// batch_get do not return non-existent entries
assert_eq!(
txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter(|(_, v)| v.is_some())
.count(),
0
);
@ -101,7 +103,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
@ -122,7 +124,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
@ -141,7 +143,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),