Address comments

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-15 10:45:44 +08:00
parent 3f9960f308
commit c1751ac4b9
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
1 changed files with 24 additions and 31 deletions

View File

@ -34,7 +34,7 @@ enum Mutation {
}
impl Mutation {
fn with_key(self, key: Key) -> kvrpcpb::Mutation {
fn into_proto_with_key(self, key: Key) -> kvrpcpb::Mutation {
let mut pb = kvrpcpb::Mutation {
key: key.into(),
..Default::default()
@ -141,35 +141,28 @@ impl Transaction {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
let mut keys_from_snapshot = Vec::new();
let mut results_in_buffer = keys
.into_iter()
.map(|key| {
let mut undetermined_keys = Vec::new();
let mut results_in_buffer = Vec::new();
for key in keys {
let key = key.into();
let mutation_value = self.get_from_mutations(&key);
// If the value cannot be determined according to the buffered mutations, we need to
// query from the snapshot.
if let MutationValue::Undetermined = mutation_value {
keys_from_snapshot.push(key.clone());
undetermined_keys.push(key.clone());
}
(key, mutation_value)
})
.collect::<Vec<_>>()
.into_iter();
let mut results_from_snapshot = self
.snapshot
.batch_get(keys_from_snapshot)
.await?
.peekable();
Ok(std::iter::from_fn(move || {
let (key, mutation_value) = results_in_buffer.next()?;
match mutation_value {
MutationValue::Determined(value) => Some((key, value)),
MutationValue::Undetermined => match results_from_snapshot.peek() {
Some((key_from_snapshot, _)) if &key == key_from_snapshot => {
results_from_snapshot.next()
}
_ => Some((key, None)),
},
results_in_buffer.push((key, mutation_value));
}
let mut results_from_snapshot = self.snapshot.batch_get(undetermined_keys).await?;
Ok(results_in_buffer
.into_iter()
.map(move |(key, mutation_value)| match mutation_value {
MutationValue::Determined(value) => (key, value),
// `results_from_snapshot` should contain all undetermined keys. If not, it's a bug
// in `Snapshot::batch_get`.
MutationValue::Undetermined => results_from_snapshot
.next()
.expect("not enough results from snapshot"),
}))
}
@ -310,7 +303,7 @@ impl Transaction {
let _rpc_mutations: Vec<_> = self
.mutations
.iter()
.map(|(k, v)| v.clone().with_key(k.clone()))
.map(|(k, v)| v.clone().into_proto_with_key(k.clone()))
.collect();
unimplemented!()
}