diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index c9ee2d3..feffbd8 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -34,7 +34,7 @@ enum Mutation { } impl Mutation { - fn with_key(self, key: Key) -> kvrpcpb::Mutation { + fn into_proto_with_key(self, key: Key) -> kvrpcpb::Mutation { let mut pb = kvrpcpb::Mutation { key: key.into(), ..Default::default() @@ -141,36 +141,29 @@ impl Transaction { &self, keys: impl IntoIterator>, ) -> Result)>> { - let mut keys_from_snapshot = Vec::new(); - let mut results_in_buffer = keys - .into_iter() - .map(|key| { - let key = key.into(); - let mutation_value = self.get_from_mutations(&key); - if let MutationValue::Undetermined = mutation_value { - keys_from_snapshot.push(key.clone()); - } - (key, mutation_value) - }) - .collect::>() - .into_iter(); - let mut results_from_snapshot = self - .snapshot - .batch_get(keys_from_snapshot) - .await? - .peekable(); - Ok(std::iter::from_fn(move || { - let (key, mutation_value) = results_in_buffer.next()?; - match mutation_value { - MutationValue::Determined(value) => Some((key, value)), - MutationValue::Undetermined => match results_from_snapshot.peek() { - Some((key_from_snapshot, _)) if &key == key_from_snapshot => { - results_from_snapshot.next() - } - _ => Some((key, None)), - }, + let mut undetermined_keys = Vec::new(); + let mut results_in_buffer = Vec::new(); + for key in keys { + let key = key.into(); + let mutation_value = self.get_from_mutations(&key); + // If the value cannot be determined according to the buffered mutations, we need to + // query from the snapshot. + if let MutationValue::Undetermined = mutation_value { + undetermined_keys.push(key.clone()); } - })) + results_in_buffer.push((key, mutation_value)); + } + let mut results_from_snapshot = self.snapshot.batch_get(undetermined_keys).await?; + Ok(results_in_buffer + .into_iter() + .map(move |(key, mutation_value)| match mutation_value { + MutationValue::Determined(value) => (key, value), + // `results_from_snapshot` should contain all undetermined keys. If not, it's a bug + // in `Snapshot::batch_get`. + MutationValue::Undetermined => results_from_snapshot + .next() + .expect("not enough results from snapshot"), + })) } pub fn scan(&self, _range: impl RangeBounds) -> Scanner { @@ -310,7 +303,7 @@ impl Transaction { let _rpc_mutations: Vec<_> = self .mutations .iter() - .map(|(k, v)| v.clone().with_key(k.clone())) + .map(|(k, v)| v.clone().into_proto_with_key(k.clone())) .collect(); unimplemented!() }