From 3f9960f308f8d0dd61b3717d9f2b31e7c2c2e662 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 14 Aug 2019 16:00:16 +0800 Subject: [PATCH 1/2] Remove outdated comment Signed-off-by: Yilin Chen --- src/transaction/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 55c0a12..c9ee2d3 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -50,7 +50,6 @@ impl Mutation { pb } - /// Returns a `Some` if the value can be determined by this mutation. Otherwise, returns `None`. fn get_value(&self) -> MutationValue { match self { Mutation::Put(value) => MutationValue::Determined(Some(value.clone())), From c1751ac4b9741f17929171a490f7a8068fc34057 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 15 Aug 2019 10:45:44 +0800 Subject: [PATCH 2/2] Address comments Signed-off-by: Yilin Chen --- src/transaction/mod.rs | 55 ++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index c9ee2d3..feffbd8 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -34,7 +34,7 @@ enum Mutation { } impl Mutation { - fn with_key(self, key: Key) -> kvrpcpb::Mutation { + fn into_proto_with_key(self, key: Key) -> kvrpcpb::Mutation { let mut pb = kvrpcpb::Mutation { key: key.into(), ..Default::default() @@ -141,36 +141,29 @@ impl Transaction { &self, keys: impl IntoIterator>, ) -> Result)>> { - let mut keys_from_snapshot = Vec::new(); - let mut results_in_buffer = keys - .into_iter() - .map(|key| { - let key = key.into(); - let mutation_value = self.get_from_mutations(&key); - if let MutationValue::Undetermined = mutation_value { - keys_from_snapshot.push(key.clone()); - } - (key, mutation_value) - }) - .collect::>() - .into_iter(); - let mut results_from_snapshot = self - .snapshot - .batch_get(keys_from_snapshot) - .await? - .peekable(); - Ok(std::iter::from_fn(move || { - let (key, mutation_value) = results_in_buffer.next()?; - match mutation_value { - MutationValue::Determined(value) => Some((key, value)), - MutationValue::Undetermined => match results_from_snapshot.peek() { - Some((key_from_snapshot, _)) if &key == key_from_snapshot => { - results_from_snapshot.next() - } - _ => Some((key, None)), - }, + let mut undetermined_keys = Vec::new(); + let mut results_in_buffer = Vec::new(); + for key in keys { + let key = key.into(); + let mutation_value = self.get_from_mutations(&key); + // If the value cannot be determined according to the buffered mutations, we need to + // query from the snapshot. + if let MutationValue::Undetermined = mutation_value { + undetermined_keys.push(key.clone()); } - })) + results_in_buffer.push((key, mutation_value)); + } + let mut results_from_snapshot = self.snapshot.batch_get(undetermined_keys).await?; + Ok(results_in_buffer + .into_iter() + .map(move |(key, mutation_value)| match mutation_value { + MutationValue::Determined(value) => (key, value), + // `results_from_snapshot` should contain all undetermined keys. If not, it's a bug + // in `Snapshot::batch_get`. + MutationValue::Undetermined => results_from_snapshot + .next() + .expect("not enough results from snapshot"), + })) } pub fn scan(&self, _range: impl RangeBounds) -> Scanner { @@ -310,7 +303,7 @@ impl Transaction { let _rpc_mutations: Vec<_> = self .mutations .iter() - .map(|(k, v)| v.clone().with_key(k.clone())) + .map(|(k, v)| v.clone().into_proto_with_key(k.clone())) .collect(); unimplemented!() }