Merge pull request #197 from ekexium/async-mutex

Use async aware Mutex instead of std::mutex in async functions
This commit is contained in:
Nick Cameron 2020-11-09 19:47:25 +13:00 committed by GitHub
commit 1c72722ed5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 26 deletions

View File

@ -4,9 +4,9 @@ use crate::{BoundRange, Key, KvPair, Result, Value};
use std::{
collections::{BTreeMap, HashMap},
future::Future,
sync::Mutex,
};
use tikv_client_proto::kvrpcpb;
use tokio::sync::Mutex;
/// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
@ -22,11 +22,11 @@ impl Buffer {
F: FnOnce(Key) -> Fut,
Fut: Future<Output = Result<Option<Value>>>,
{
match self.get_from_mutations(&key) {
match self.get_from_mutations(&key).await {
MutationValue::Determined(value) => Ok(value),
MutationValue::Undetermined => {
let value = f(key.clone()).await?;
let mut mutations = self.mutations.lock().unwrap();
let mut mutations = self.mutations.lock().await;
mutations.insert(key, Mutation::Cached(value.clone()));
Ok(value)
}
@ -45,7 +45,7 @@ impl Buffer {
Fut: Future<Output = Result<Vec<KvPair>>>,
{
let (cached_results, undetermined_keys) = {
let mutations = self.mutations.lock().unwrap();
let mutations = self.mutations.lock().await;
// Partition the keys into those we have buffered and those we have to
// get from the store.
let (undetermined_keys, cached_results): (
@ -70,7 +70,7 @@ impl Buffer {
};
let fetched_results = f(undetermined_keys).await?;
let mut mutations = self.mutations.lock().unwrap();
let mut mutations = self.mutations.lock().await;
for pair in &fetched_results {
mutations.insert(pair.0.clone(), Mutation::Cached(Some(pair.1.clone())));
}
@ -91,7 +91,7 @@ impl Buffer {
Fut: Future<Output = Result<Vec<KvPair>>>,
{
// read from local buffer
let mut mutations = self.mutations.lock().unwrap();
let mut mutations = self.mutations.lock().await;
let mutation_range = mutations.range(range.clone());
// fetch from TiKV
@ -136,8 +136,8 @@ impl Buffer {
}
/// Lock the given key if necessary.
pub fn lock(&self, key: Key) {
let mut mutations = self.mutations.lock().unwrap();
pub async fn lock(&self, key: Key) {
let mut mutations = self.mutations.lock().await;
let value = mutations
.entry(key)
// Mutated keys don't need a lock.
@ -149,32 +149,32 @@ impl Buffer {
}
/// Insert a value into the buffer (does not write through).
pub fn put(&self, key: Key, value: Value) {
pub async fn put(&self, key: Key, value: Value) {
self.mutations
.lock()
.unwrap()
.await
.insert(key, Mutation::Put(value));
}
/// Mark a value as deleted.
pub fn delete(&self, key: Key) {
self.mutations.lock().unwrap().insert(key, Mutation::Del);
pub async fn delete(&self, key: Key) {
self.mutations.lock().await.insert(key, Mutation::Del);
}
/// Converts the buffered mutations to the proto buffer version
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
self.mutations
.lock()
.unwrap()
.await
.iter()
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
.collect()
}
fn get_from_mutations(&self, key: &Key) -> MutationValue {
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
self.mutations
.lock()
.unwrap()
.await
.get(&key)
.map(Mutation::get_value)
.unwrap_or(MutationValue::Undetermined)
@ -243,12 +243,16 @@ mod tests {
use super::*;
use futures::{executor::block_on, future::ready};
#[test]
#[tokio::test]
#[allow(unreachable_code)]
fn set_and_get_from_buffer() {
async fn set_and_get_from_buffer() {
let buffer = Buffer::default();
buffer.put(b"key1".to_vec().into(), b"value1".to_vec().into());
buffer.put(b"key2".to_vec().into(), b"value2".to_vec().into());
buffer
.put(b"key1".to_vec().into(), b"value1".to_vec().into())
.await;
buffer
.put(b"key2".to_vec().into(), b"value2".to_vec().into())
.await;
assert_eq!(
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
.unwrap()
@ -256,8 +260,10 @@ mod tests {
b"value1".to_vec()
);
buffer.delete(b"key2".to_vec().into());
buffer.put(b"key1".to_vec().into(), b"value".to_vec().into());
buffer.delete(b"key2".to_vec().into()).await;
buffer
.put(b"key1".to_vec().into(), b"value".to_vec().into())
.await;
assert_eq!(
block_on(buffer.batch_get_or_else(
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),

View File

@ -244,7 +244,7 @@ impl Transaction {
if self.is_pessimistic {
self.pessimistic_lock(iter::once(key.clone())).await?;
}
self.buffer.put(key, value.into());
self.buffer.put(key, value.into()).await;
Ok(())
}
@ -267,7 +267,7 @@ impl Transaction {
if self.is_pessimistic {
self.pessimistic_lock(iter::once(key.clone())).await?;
}
self.buffer.delete(key);
self.buffer.delete(key).await;
Ok(())
}
@ -286,7 +286,7 @@ impl Transaction {
/// ```
pub async fn lock_keys(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
for key in keys {
self.buffer.lock(key.into());
self.buffer.lock(key.into()).await;
}
Ok(())
}
@ -306,7 +306,7 @@ impl Transaction {
/// ```
pub async fn commit(&mut self) -> Result<()> {
TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.buffer.to_proto_mutations().await,
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),