Merge branch 'master' into integration-tests

This commit is contained in:
ekexium 2020-09-21 10:19:03 +08:00
commit 4c887a4415
4 changed files with 119 additions and 12 deletions

View File

@ -8,7 +8,11 @@ use crate::{
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
use grpcio::CallOption;
use kvproto::kvrpcpb;
use std::{sync::Arc, time::Duration};
use std::{
cmp::{max, min},
sync::Arc,
time::Duration,
};
use tikv_client_common::{BoundRange, Error, Key, Result};
use tikv_client_store::{HasError, HasRegionError, KvClient, RpcFnType, Store};
@ -188,16 +192,27 @@ pub fn store_stream_for_range<PdC: PdClient>(
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Key, Key), Store<PdC::KvClient>)>> {
pd_client
.stores_for_range(range)
.stores_for_range(range.clone())
.map_ok(move |store| {
// FIXME should be bounded by self.range
let range = store.region.range();
(range, store)
let region_range = store.region.range();
(bound_range(region_range, range.clone()), store)
})
.into_stream()
.boxed()
}
/// The range used for request should be the intersection of `region_range` and `range`.
fn bound_range(region_range: (Key, Key), range: BoundRange) -> (Key, Key) {
let (lower, upper) = region_range;
let (lower_bound, upper_bound) = range.into_keys();
let up = match (upper.is_empty(), upper_bound) {
(_, None) => upper,
(true, Some(ub)) => ub,
(_, Some(ub)) => min(upper, ub),
};
(max(lower, lower_bound), up)
}
pub fn store_stream_for_ranges<PdC: PdClient>(
ranges: Vec<BoundRange>,
pd_client: Arc<PdC>,

View File

@ -1,8 +1,12 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use kvproto::kvrpcpb;
use std::{collections::BTreeMap, future::Future, sync::Mutex};
use tikv_client_common::{Key, KvPair, Result, Value};
use std::{
collections::{BTreeMap, HashMap},
future::Future,
sync::Mutex,
};
use tikv_client_common::{BoundRange, Key, KvPair, Result, Value};
/// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
@ -70,6 +74,62 @@ impl Buffer {
Ok(results)
}
/// Run `f` to fetch entries in `range` from TiKV. Combine them with mutations in local buffer. Returns the results.
pub async fn scan_and_fetch<F, Fut>(
&self,
range: BoundRange,
limit: u32,
f: F,
) -> Result<impl Iterator<Item = KvPair>>
where
F: FnOnce(BoundRange, u32) -> Fut,
Fut: Future<Output = Result<Vec<KvPair>>>,
{
// read from local buffer
let mut mutations = self.mutations.lock().unwrap();
let mutation_range = mutations.range(range.clone());
// fetch from TiKV
// fetch more entries because some of them may be deleted.
let redundant_limit = limit
+ mutation_range
.clone()
.filter(|(_, m)| matches!(m, Mutation::Del))
.count() as u32;
let mut results = f(range, redundant_limit)
.await?
.into_iter()
.map(|pair| pair.into())
.collect::<HashMap<Key, Value>>();
// override using local data
for (k, m) in mutation_range {
match m {
Mutation::Put(v) => {
results.insert(k.clone(), v.clone());
}
Mutation::Del => {
results.remove(k);
}
_ => {}
}
}
// update local buffer
for (k, v) in &results {
mutations.insert(k.clone(), Mutation::Cached(Some(v.clone())));
}
let mut res = results
.into_iter()
.map(|(k, v)| KvPair::new(k, v))
.collect::<Vec<_>>();
res.sort_by_cached_key(|x| x.key().clone());
Ok(res.into_iter().take(limit as usize))
}
/// Lock the given key if necessary.
pub fn lock(&self, key: Key) {
let mut mutations = self.mutations.lock().unwrap();

View File

@ -132,10 +132,12 @@ impl Transaction {
) -> Result<impl Iterator<Item = KvPair>> {
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let pairs = new_mvcc_scan_request(range, timestamp, limit, key_only)
.execute(rpc)
.await?;
Ok(pairs.into_iter())
self.buffer
.scan_and_fetch(range.into(), limit, move |new_range, new_limit| {
new_mvcc_scan_request(new_range, timestamp, new_limit, key_only).execute(rpc)
})
.await
}
pub fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {

View File

@ -9,7 +9,7 @@ use std::{
borrow::Borrow,
cmp::{Eq, PartialEq},
convert::TryFrom,
ops::{Bound, Range, RangeFrom, RangeInclusive},
ops::{Bound, Range, RangeBounds, RangeFrom, RangeInclusive},
};
/// A struct for expressing ranges. This type is semi-opaque and is not really meant for users to
@ -119,6 +119,36 @@ impl BoundRange {
}
}
impl RangeBounds<Key> for BoundRange {
fn start_bound(&self) -> Bound<&Key> {
match &self.from {
Bound::Included(f) => Bound::Included(f),
Bound::Excluded(f) => Bound::Excluded(f),
Bound::Unbounded => Bound::Unbounded,
}
}
fn end_bound(&self) -> Bound<&Key> {
match &self.to {
Bound::Included(t) => {
if t.is_empty() {
Bound::Unbounded
} else {
Bound::Included(t)
}
}
Bound::Excluded(t) => {
if t.is_empty() {
Bound::Unbounded
} else {
Bound::Excluded(t)
}
}
Bound::Unbounded => Bound::Unbounded,
}
}
}
// FIXME `==` should not `clone`
impl<T: Into<Key> + Clone> PartialEq<(Bound<T>, Bound<T>)> for BoundRange {
fn eq(&self, other: &(Bound<T>, Bound<T>)) -> bool {