integration tests: add raw client test

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2020-09-21 12:05:24 +08:00
parent 4c887a4415
commit 67b52a998c
1 changed files with 144 additions and 38 deletions

View File

@ -11,13 +11,15 @@ use std::{
};
use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value};
/// The number of keys that we scan in one iteration.
const SCAN_BATCH_SIZE: u32 = 10000;
/// The limit of scan in each iteration in `clear_tikv`.
const SCAN_BATCH_SIZE: u32 = 1000;
// Parameters used in test
const NUM_PEOPLE: u32 = 100;
const NUM_TRNASFER: u32 = 100;
/// Delete all entris in TiKV to leave a clean space for following tests.
/// TiKV does not provide an elegant way to do this, so it is finished by scanning and deletions.
/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions.
async fn clear_tikv() -> Fallible<()> {
delete_all_raw().await?;
delete_all_txn().await?;
@ -27,26 +29,7 @@ async fn clear_tikv() -> Fallible<()> {
async fn delete_all_raw() -> Fallible<()> {
let config = Config::new(pd_addrs());
let raw_client = RawClient::new(config).await?.with_key_only(true);
loop {
println!("in delete raw loop");
let keys = raw_client
.scan(vec![]..vec![], SCAN_BATCH_SIZE)
.await
.expect("delete_all failed scanning all keys")
.into_iter()
.map(|pair| pair.into_key())
.collect::<Vec<_>>();
if keys.is_empty() {
break;
}
raw_client
.batch_delete(keys)
.await
.expect("delete_all failed batch_delete");
}
raw_client.delete_range(vec![]..).await?;
Fallible::Ok(())
}
@ -56,20 +39,13 @@ async fn delete_all_txn() -> Fallible<()> {
let mut txn = txn_client.begin().await?;
loop {
println!("in delete txn loop");
let mut keys = txn
.scan("".to_owned().."".to_owned(), SCAN_BATCH_SIZE, true)
.await?
.peekable();
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE, true).await?.peekable();
if keys.peek().is_none() {
break;
}
// FIXME: scan does not use local buffer? anyone else?
for kv in keys {
println!("{:?}", kv.key());
txn.delete(kv.into_key()).await?;
}
}
@ -184,7 +160,7 @@ async fn raw_bank_transfer() -> Fallible<()> {
let client = RawClient::new(config).await?;
let mut rng = thread_rng();
let people = gen_people(NUM_PEOPLE, &mut rng);
let people = gen_u32_keys(NUM_PEOPLE, &mut rng);
let mut sum: u32 = 0;
for person in &people {
let init = rng.gen::<u8>() as u32;
@ -198,9 +174,9 @@ async fn raw_bank_transfer() -> Fallible<()> {
for _ in 0..NUM_TRNASFER {
let chosen_people = people.iter().choose_multiple(&mut rng, 2);
let alice = chosen_people[0];
let mut alice_balance = get_account(&client, alice.clone()).await?;
let mut alice_balance = get_u32(&client, alice.clone()).await?;
let bob = chosen_people[1];
let mut bob_balance = get_account(&client, bob.clone()).await?;
let mut bob_balance = get_u32(&client, bob.clone()).await?;
if alice_balance > bob_balance {
let transfer = rng.gen_range(0, alice_balance);
alice_balance -= transfer;
@ -217,15 +193,145 @@ async fn raw_bank_transfer() -> Fallible<()> {
// check
let mut new_sum = 0;
for person in &people {
new_sum += get_account(&client, person.clone()).await?;
new_sum += get_u32(&client, person.clone()).await?;
}
assert_eq!(sum, new_sum);
Fallible::Ok(())
}
#[tokio::test]
#[serial]
async fn raw() -> Fallible<()> {
clear_tikv().await?;
let config = Config::new(pd_addrs());
let client = RawClient::new(config).await?;
// empty; get non-existent key
let res = client.get("k1".to_owned()).await;
assert_eq!(res?, None);
// empty; put then batch_get
client.put("k1".to_owned(), "v1".to_owned()).await?;
client.put("k2".to_owned(), "v2".to_owned()).await?;
let res = client
.batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()])
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v1".as_bytes());
assert_eq!(res[1].1, "v2".as_bytes());
// k1,k2; batch_put then batch_get
let _ = client
.batch_put(vec![
("k3".to_owned(), "v3".to_owned()),
("k4".to_owned(), "v4".to_owned()),
])
.await?;
let res = client
.batch_get(vec!["k4".to_owned(), "k3".to_owned()])
.await?;
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
// k1,k2,k3,k4; delete then get
let res = client.delete("k3".to_owned()).await;
assert!(res.is_ok());
let res = client.get("k3".to_owned()).await?;
assert_eq!(res, None);
// k1,k2,k4; batch_delete then batch_get
let res = client
.batch_delete(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await;
assert!(res.is_ok());
let res = client
.batch_get(vec![
"k1".to_owned(),
"k2".to_owned(),
"k3".to_owned(),
"k4".to_owned(),
])
.await?;
assert_eq!(res.len(), 0);
// empty; batch_put then scan
let _ = client
.batch_put(vec![
("k3".to_owned(), "v3".to_owned()),
("k5".to_owned(), "v5".to_owned()),
("k1".to_owned(), "v1".to_owned()),
("k2".to_owned(), "v2".to_owned()),
("k4".to_owned(), "v4".to_owned()),
])
.await?;
let res = client.scan("k2".to_owned()..="k5".to_owned(), 5).await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v2".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[2].1, "v4".as_bytes());
assert_eq!(res[3].1, "v5".as_bytes());
let res = client.scan("k2".to_owned().."k5".to_owned(), 2).await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v2".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
let res = client.scan("k1".to_owned().., 20).await?;
assert_eq!(res.len(), 5);
assert_eq!(res[0].1, "v1".as_bytes());
assert_eq!(res[1].1, "v2".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v4".as_bytes());
assert_eq!(res[4].1, "v5".as_bytes());
let res = client
.batch_scan(
vec![
"".to_owned().."k1".to_owned(),
"k1".to_owned().."k2".to_owned(),
"k2".to_owned().."k3".to_owned(),
"k3".to_owned().."k4".to_owned(),
"k4".to_owned().."k5".to_owned(),
],
2,
)
.await?;
assert_eq!(res.len(), 4);
let res = client
.batch_scan(
vec![
"".to_owned()..="k3".to_owned(),
"k2".to_owned()..="k5".to_owned(),
],
4,
)
.await?;
assert_eq!(res.len(), 7);
assert_eq!(res[0].1, "v1".as_bytes());
assert_eq!(res[1].1, "v2".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());
assert_eq!(res[4].1, "v3".as_bytes());
assert_eq!(res[5].1, "v4".as_bytes());
assert_eq!(res[6].1, "v5".as_bytes());
Fallible::Ok(())
}
// helper function
async fn get_account(client: &RawClient, person: Vec<u8>) -> Fallible<u32> {
let x = client.get(person).await?.unwrap();
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> {
let x = client.get(key).await?.unwrap();
let boxed_slice = x.into_boxed_slice();
let array: Box<[u8; 4]> = boxed_slice
.try_into()
@ -234,7 +340,7 @@ async fn get_account(client: &RawClient, person: Vec<u8>) -> Fallible<u32> {
}
// helper function
fn gen_people(num: u32, rng: &mut impl Rng) -> HashSet<Vec<u8>> {
fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet<Vec<u8>> {
let mut set = HashSet::new();
for _ in 0..num {
set.insert(rng.gen::<u32>().to_be_bytes().to_vec());