Merge branch 'handle-mutations' into request-based-kv

This commit is contained in:
Yilin Chen 2019-08-15 10:47:12 +08:00
commit 236c510551
1 changed files with 24 additions and 32 deletions

View File

@ -34,7 +34,7 @@ enum Mutation {
}
impl Mutation {
fn with_key(self, key: Key) -> kvrpcpb::Mutation {
fn into_proto_with_key(self, key: Key) -> kvrpcpb::Mutation {
let mut pb = kvrpcpb::Mutation {
key: key.into(),
..Default::default()
@ -50,7 +50,6 @@ impl Mutation {
pb
}
/// Returns a `Some` if the value can be determined by this mutation. Otherwise, returns `None`.
fn get_value(&self) -> MutationValue {
match self {
Mutation::Put(value) => MutationValue::Determined(Some(value.clone())),
@ -142,36 +141,29 @@ impl Transaction {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
let mut keys_from_snapshot = Vec::new();
let mut results_in_buffer = keys
.into_iter()
.map(|key| {
let key = key.into();
let mutation_value = self.get_from_mutations(&key);
if let MutationValue::Undetermined = mutation_value {
keys_from_snapshot.push(key.clone());
}
(key, mutation_value)
})
.collect::<Vec<_>>()
.into_iter();
let mut results_from_snapshot = self
.snapshot
.batch_get(keys_from_snapshot)
.await?
.peekable();
Ok(std::iter::from_fn(move || {
let (key, mutation_value) = results_in_buffer.next()?;
match mutation_value {
MutationValue::Determined(value) => Some((key, value)),
MutationValue::Undetermined => match results_from_snapshot.peek() {
Some((key_from_snapshot, _)) if &key == key_from_snapshot => {
results_from_snapshot.next()
}
_ => Some((key, None)),
},
let mut undetermined_keys = Vec::new();
let mut results_in_buffer = Vec::new();
for key in keys {
let key = key.into();
let mutation_value = self.get_from_mutations(&key);
// If the value cannot be determined according to the buffered mutations, we need to
// query from the snapshot.
if let MutationValue::Undetermined = mutation_value {
undetermined_keys.push(key.clone());
}
}))
results_in_buffer.push((key, mutation_value));
}
let mut results_from_snapshot = self.snapshot.batch_get(undetermined_keys).await?;
Ok(results_in_buffer
.into_iter()
.map(move |(key, mutation_value)| match mutation_value {
MutationValue::Determined(value) => (key, value),
// `results_from_snapshot` should contain all undetermined keys. If not, it's a bug
// in `Snapshot::batch_get`.
MutationValue::Undetermined => results_from_snapshot
.next()
.expect("not enough results from snapshot"),
}))
}
pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner {
@ -311,7 +303,7 @@ impl Transaction {
let _rpc_mutations: Vec<_> = self
.mutations
.iter()
.map(|(k, v)| v.clone().with_key(k.clone()))
.map(|(k, v)| v.clone().into_proto_with_key(k.clone()))
.collect();
unimplemented!()
}