Merge branch 'master' into update-readme

This commit is contained in:
ekexium 2020-10-13 11:15:47 +08:00 committed by GitHub
commit 8f40ed061a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 115 additions and 39 deletions

View File

@ -46,8 +46,8 @@ script:
# For now we only run full integration tests on Linux. Here's why:
# * Docker on OS X is not supported by Travis.
# * Docker on Windows seems to not have the correct binary at `"/c/Program Files/Docker/Docker/DockerCli.exe" to switch it to Linux containers.
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd:nightly --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv:nightly --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd:latest --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv:latest --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi

View File

@ -94,11 +94,7 @@ async fn main() -> Result<()> {
let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!(
&keys,
&[
Key::from("k1".to_owned()),
Key::from("k2".to_owned()),
Key::from("k3".to_owned())
]
&[Key::from("k1".to_owned()), Key::from("k2".to_owned()),]
);
println!("Scaning from {:?} to {:?} gives: {:?}", start, end, keys);

View File

@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}
// Returns a Steam which iterates over the contexts for ranges in the same region.
// Returns a Stream which iterates over the contexts for ranges in the same region.
fn group_ranges_by_region(
self: Arc<Self>,
mut ranges: Vec<BoundRange>,
@ -153,7 +153,10 @@ pub trait PdClient: Send + Sync + 'static {
let region_end = region.end_key();
let mut grouped = vec![];
if !region_end.is_empty()
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
&& end_key
.clone()
.map(|x| x > region_end || x.is_empty())
.unwrap_or(true)
{
grouped.push((start_key, region_end.clone()).into());
ranges.push((region_end, end_key).into());
@ -168,7 +171,10 @@ pub trait PdClient: Send + Sync + 'static {
break;
}
if !region_end.is_empty()
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
&& end_key
.clone()
.map(|x| x > region_end || x.is_empty())
.unwrap_or(true)
{
grouped.push((start_key, region_end.clone()).into());
ranges.push((region_end, end_key).into());

View File

@ -252,9 +252,13 @@ impl Client {
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
}
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
.execute(self.rpc.clone())
.await
.await;
res.map(|mut s| {
s.truncate(limit as usize);
s
})
}
/// Create a new 'batch scan' request.

View File

@ -333,7 +333,9 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}

View File

@ -39,7 +39,7 @@ impl Buffer {
&self,
keys: impl Iterator<Item = Key>,
f: F,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>>
) -> Result<impl Iterator<Item = KvPair>>
where
F: FnOnce(Vec<Key>) -> Fut,
Fut: Future<Output = Result<Vec<KvPair>>>,
@ -48,7 +48,10 @@ impl Buffer {
let mutations = self.mutations.lock().unwrap();
// Partition the keys into those we have buffered and those we have to
// get from the store.
let (undetermined_keys, cached_results): (Vec<(Key, MutationValue)>, _) = keys
let (undetermined_keys, cached_results): (
Vec<(Key, MutationValue)>,
Vec<(Key, MutationValue)>,
) = keys
.map(|key| {
let value = mutations
.get(&key)
@ -58,7 +61,9 @@ impl Buffer {
})
.partition(|(_, v)| *v == MutationValue::Undetermined);
let cached_results = cached_results.into_iter().map(|(k, v)| (k, v.unwrap()));
let cached_results = cached_results
.into_iter()
.filter_map(|(k, v)| v.unwrap().map(|v| KvPair(k, v)));
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k).collect();
(cached_results, undetermined_keys)
@ -70,7 +75,7 @@ impl Buffer {
mutations.insert(pair.0.clone(), Mutation::Cached(Some(pair.1.clone())));
}
let results = cached_results.chain(fetched_results.into_iter().map(|p| (p.0, Some(p.1))));
let results = cached_results.chain(fetched_results.into_iter());
Ok(results)
}
@ -260,13 +265,10 @@ mod tests {
))
.unwrap()
.collect::<Vec<_>>(),
vec![
(Key::from(b"key2".to_vec()), None),
(
vec![KvPair(
Key::from(b"key1".to_vec()),
Some(Value::from(b"value".to_vec()))
),
]
Value::from(b"value".to_vec())
),]
);
}
@ -304,16 +306,16 @@ mod tests {
assert_eq!(
r1.unwrap().collect::<Vec<_>>(),
vec![
(k1.clone(), Some(v1.clone())),
(k2.clone(), Some(v2.clone()))
KvPair(k1.clone(), v1.clone()),
KvPair(k2.clone(), v2.clone())
]
);
assert_eq!(r2.unwrap().unwrap(), v2.clone());
assert_eq!(
r3.unwrap().collect::<Vec<_>>(),
vec![
(k1.clone(), Some(v1.clone())),
(k2.clone(), Some(v2.clone()))
KvPair(k1.clone(), v1.clone()),
KvPair(k2.clone(), v2.clone())
]
);
}

View File

@ -348,7 +348,9 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}
@ -408,7 +410,9 @@ impl KvRequest for kvrpcpb::CommitRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}
@ -454,7 +458,9 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
fn reduce(
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>> {
results.try_for_each(|_| future::ready(Ok(()))).boxed()
results
.try_for_each_concurrent(None, |_| future::ready(Ok(())))
.boxed()
}
}

View File

@ -24,7 +24,7 @@ impl Snapshot {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
) -> Result<impl Iterator<Item = KvPair>> {
self.transaction.batch_get(keys).await
}

View File

@ -75,7 +75,7 @@ impl Transaction {
.await
}
/// Gets the values associated with the given keys.
/// Gets the values associated with the given keys, skipping non-existent entries.
///
/// Non-existent entries will be skipped. The order of the keys is not retained.
///
@ -91,7 +91,8 @@ impl Transaction {
/// .batch_get(keys)
/// .await
/// .unwrap()
/// .filter_map(|(k, v)| v.map(move |v| (k, v))).collect();
/// .map(|pair| (pair.0, pair.1))
/// .collect();
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
@ -99,7 +100,7 @@ impl Transaction {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
) -> Result<impl Iterator<Item = KvPair>> {
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
self.buffer

View File

@ -63,12 +63,14 @@ async fn crud() -> Fallible<()> {
let client = TransactionClient::new(config).await?;
let mut txn = client.begin().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
// batch_get do not return non-existent entries
assert_eq!(
txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter(|(_, v)| v.is_some())
.count(),
0
);
@ -83,7 +85,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
@ -104,7 +106,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
@ -123,7 +125,7 @@ async fn crud() -> Fallible<()> {
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.map(|pair| (pair.0, pair.1))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
@ -436,6 +438,63 @@ async fn raw_req() -> Fallible<()> {
Fallible::Ok(())
}
/// Tests raw API when there are multiple regions.
/// Write large volumes of data to enforce region splitting.
/// In order to test `scan`, data is uniformly inserted.
///
/// Ignoring this because we don't want to mess up transactional tests.
#[tokio::test]
#[serial]
#[ignore]
async fn raw_write_million() -> Fallible<()> {
const NUM_BITS_TXN: u32 = 9;
const NUM_BITS_KEY_PER_TXN: u32 = 10;
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
clear_tikv().await?;
let config = Config::new(pd_addrs());
let client = RawClient::new(config).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) {
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
let keys = iter::repeat_with(|| {
let v = cur;
cur = cur.overflowing_add(interval).0;
v
})
.map(|u| u.to_be_bytes().to_vec())
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
client
.batch_put(
keys.iter()
.cloned()
.zip(iter::repeat(1u32.to_be_bytes().to_vec())),
)
.await?;
let res = client.batch_get(keys).await?;
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
}
// test scan
let limit = 10;
let res = client.scan(vec![].., limit).await?;
assert_eq!(res.len(), limit as usize);
// test batch_scan
for batch_num in 1..4 {
let _ = client
.batch_scan(iter::repeat(vec![]..).take(batch_num), limit)
.await?;
// FIXME: `each_limit` parameter does no work as expected.
// It limits the entries on each region of each rangqe, instead of each range.
// assert_eq!(res.len(), limit as usize * batch_num);
}
Fallible::Ok(())
}
// helper function
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> {
let x = client.get(key).await?.unwrap();