Read from mutations in transactions

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-07 17:00:38 +08:00
parent f79c8c57f0
commit ad49e78c29
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
2 changed files with 63 additions and 8 deletions

View File

@ -23,11 +23,11 @@ mod requests;
#[allow(clippy::module_inception)] #[allow(clippy::module_inception)]
mod transaction; mod transaction;
#[derive(Debug, Clone)]
pub enum Mutation { pub enum Mutation {
Put(Value), Put(Value),
Del, Del,
Lock, Lock,
Rollback,
} }
impl Mutation { impl Mutation {
@ -43,8 +43,16 @@ impl Mutation {
} }
Mutation::Del => pb.set_op(kvrpcpb::Op::Del), Mutation::Del => pb.set_op(kvrpcpb::Op::Del),
Mutation::Lock => pb.set_op(kvrpcpb::Op::Lock), Mutation::Lock => pb.set_op(kvrpcpb::Op::Lock),
Mutation::Rollback => pb.set_op(kvrpcpb::Op::Rollback),
}; };
pb pb
} }
/// Returns a `Some` if the value can be determined by this mutation. Otherwise, returns `None`.
fn get_value(&self) -> Option<Value> {
match self {
Mutation::Put(value) => Some(value.clone()),
Mutation::Del => Some(Value::default()),
Mutation::Lock => None,
}
}
} }

View File

@ -51,8 +51,13 @@ impl Transaction {
/// txn.commit().await.unwrap(); /// txn.commit().await.unwrap();
/// # }); /// # });
/// ``` /// ```
pub async fn get(&self, _key: impl Into<Key>) -> Result<Value> { pub async fn get(&self, key: impl Into<Key>) -> Result<Value> {
unimplemented!() let key = key.into();
if let Some(value) = self.get_from_mutations(&key) {
Ok(value)
} else {
self.snapshot.get(key).await
}
} }
/// Gets the values associated with the given keys. /// Gets the values associated with the given keys.
@ -74,9 +79,38 @@ impl Transaction {
/// ``` /// ```
pub async fn batch_get( pub async fn batch_get(
&self, &self,
_keys: impl IntoIterator<Item = impl Into<Key>>, keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> { ) -> Result<Vec<KvPair>> {
unimplemented!() let mut result = Vec::new();
let mut keys_from_snapshot = Vec::new();
let mut result_indices_from_snapshot = Vec::new();
// Try to fill the result vector from mutations
for key in keys {
let key = key.into();
if let Some(value) = self.get_from_mutations(&key) {
result.push((key, value).into());
} else {
keys_from_snapshot.push(key);
result_indices_from_snapshot.push(result.len());
// Push a placeholder
result.push(KvPair::default());
}
}
// Get others from snapshot
let kv_pairs_from_snapshot = self
.snapshot
.batch_get(keys_from_snapshot.into_iter())
.await?;
for (kv_pair, index) in kv_pairs_from_snapshot
.into_iter()
.zip(result_indices_from_snapshot)
{
result[index] = kv_pair;
}
Ok(result)
} }
pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner { pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner {
@ -145,8 +179,11 @@ impl Transaction {
/// # }); /// # });
/// ``` /// ```
pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) { pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) {
self.mutations for key in keys {
.extend(keys.into_iter().map(|key| (key.into(), Mutation::Lock))); let key = key.into();
// Mutated keys don't need a lock.
self.mutations.entry(key).or_insert(Mutation::Lock);
}
} }
/// Commits the actions of the transaction. /// Commits the actions of the transaction.
@ -209,6 +246,12 @@ impl Transaction {
} }
async fn prewrite(&mut self) -> Result<()> { async fn prewrite(&mut self) -> Result<()> {
// TODO: Too many clones. Consider using bytes::Byte.
let _rpc_mutations: Vec<_> = self
.mutations
.iter()
.map(|(k, v)| v.clone().with_key(k.clone()))
.collect();
unimplemented!() unimplemented!()
} }
@ -219,6 +262,10 @@ impl Transaction {
async fn commit_secondary(&mut self) -> Result<()> { async fn commit_secondary(&mut self) -> Result<()> {
unimplemented!() unimplemented!()
} }
fn get_from_mutations(&self, key: &Key) -> Option<Value> {
self.mutations.get(key).and_then(Mutation::get_value)
}
} }
pub struct TxnInfo { pub struct TxnInfo {