mirror of https://github.com/tikv/client-rust.git
raw bank transfer test
Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
parent
e29c1204da
commit
93182159d9
|
@ -1670,6 +1670,28 @@ dependencies = [
|
|||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b15f74add9a9d4a3eb2bf739c9a427d266d3895b53d992c3a7c234fec2ff1f1"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"parking_lot",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "65f59259be9fc1bf677d06cc1456e97756004a1a5a577480f71430bd7c17ba33"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.19",
|
||||
"quote 1.0.7",
|
||||
"syn 1.0.35",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "0.1.1"
|
||||
|
@ -1844,6 +1866,7 @@ dependencies = [
|
|||
"regex",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serial_test",
|
||||
"simple_logger",
|
||||
"tempdir",
|
||||
"tikv-client-common",
|
||||
|
|
|
@ -51,6 +51,7 @@ proptest = "0.9"
|
|||
proptest-derive = "0.1.0"
|
||||
fail = { version = "0.3", features = [ "failpoints" ] }
|
||||
simple_logger = "1.9.0"
|
||||
serial_test = "0.5.0"
|
||||
|
||||
[patch.crates-io]
|
||||
raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" }
|
||||
|
|
|
@ -2,12 +2,85 @@
|
|||
|
||||
use failure::Fallible;
|
||||
use futures::prelude::*;
|
||||
use std::{collections::HashMap, env};
|
||||
use tikv_client::{Config, Key, Result, TransactionClient, Value};
|
||||
use rand::{seq::IteratorRandom, thread_rng, Rng};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
convert::TryInto,
|
||||
env,
|
||||
};
|
||||
use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value};
|
||||
|
||||
/// The number of keys that we scan in one iteration.
|
||||
const SCAN_BATCH_SIZE: u32 = 10000;
|
||||
const NUM_PEOPLE: u32 = 100;
|
||||
const NUM_TRNASFER: u32 = 100;
|
||||
|
||||
/// Delete all entris in TiKV to leave a clean space for following tests.
|
||||
/// TiKV does not provide an elegant way to do this, so it is finished by scanning and deletions.
|
||||
async fn clear_tikv() -> Fallible<()> {
|
||||
delete_all_raw().await?;
|
||||
delete_all_txn().await?;
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
async fn delete_all_raw() -> Fallible<()> {
|
||||
let config = Config::new(pd_addrs());
|
||||
let raw_client = RawClient::new(config).await?.with_key_only(true);
|
||||
loop {
|
||||
println!("in delete raw loop");
|
||||
let keys = raw_client
|
||||
.scan(vec![]..vec![], SCAN_BATCH_SIZE)
|
||||
.await
|
||||
.expect("delete_all failed scanning all keys")
|
||||
.into_iter()
|
||||
.map(|pair| pair.into_key())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if keys.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
raw_client
|
||||
.batch_delete(keys)
|
||||
.await
|
||||
.expect("delete_all failed batch_delete");
|
||||
}
|
||||
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
async fn delete_all_txn() -> Fallible<()> {
|
||||
let config = Config::new(pd_addrs());
|
||||
let txn_client = TransactionClient::new(config).await?;
|
||||
let mut txn = txn_client.begin().await?;
|
||||
|
||||
loop {
|
||||
println!("in delete txn loop");
|
||||
let mut keys = txn
|
||||
.scan("".to_owned().."".to_owned(), SCAN_BATCH_SIZE, true)
|
||||
.await?
|
||||
.peekable();
|
||||
|
||||
if keys.peek().is_none() {
|
||||
break;
|
||||
}
|
||||
|
||||
// FIXME: scan does not use local buffer? anyone else?
|
||||
|
||||
for kv in keys {
|
||||
println!("{:?}", kv.key());
|
||||
txn.delete(kv.into_key()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_timestamp() -> Fallible<()> {
|
||||
const COUNT: usize = 1 << 16;
|
||||
const COUNT: usize = 1 << 8; // use a small number to make test fast
|
||||
let config = Config::new(pd_addrs());
|
||||
let client = TransactionClient::new(config).await?;
|
||||
|
||||
|
@ -25,7 +98,9 @@ async fn get_timestamp() -> Fallible<()> {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn crud() -> Fallible<()> {
|
||||
clear_tikv().await?;
|
||||
let config = Config::new(pd_addrs());
|
||||
|
||||
let client = TransactionClient::new(config).await?;
|
||||
|
@ -100,6 +175,73 @@ async fn crud() -> Fallible<()> {
|
|||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
/// bank transfer mainly tests raw put and get
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn raw_bank_transfer() -> Fallible<()> {
|
||||
clear_tikv().await?;
|
||||
let config = Config::new(pd_addrs());
|
||||
let client = RawClient::new(config).await?;
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let people = gen_people(NUM_PEOPLE, &mut rng);
|
||||
let mut sum: u32 = 0;
|
||||
for person in &people {
|
||||
let init = rng.gen::<u8>() as u32;
|
||||
sum += init as u32;
|
||||
client
|
||||
.put(person.clone(), init.to_be_bytes().to_vec())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// transfer
|
||||
for _ in 0..NUM_TRNASFER {
|
||||
let chosen_people = people.iter().choose_multiple(&mut rng, 2);
|
||||
let alice = chosen_people[0];
|
||||
let mut alice_balance = get_account(&client, alice.clone()).await?;
|
||||
let bob = chosen_people[1];
|
||||
let mut bob_balance = get_account(&client, bob.clone()).await?;
|
||||
if alice_balance > bob_balance {
|
||||
let transfer = rng.gen_range(0, alice_balance);
|
||||
alice_balance -= transfer;
|
||||
bob_balance += transfer;
|
||||
client
|
||||
.put(alice.clone(), alice_balance.to_be_bytes().to_vec())
|
||||
.await?;
|
||||
client
|
||||
.put(bob.clone(), bob_balance.to_be_bytes().to_vec())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// check
|
||||
let mut new_sum = 0;
|
||||
for person in &people {
|
||||
new_sum += get_account(&client, person.clone()).await?;
|
||||
}
|
||||
assert_eq!(sum, new_sum);
|
||||
Fallible::Ok(())
|
||||
}
|
||||
|
||||
// helper function
|
||||
async fn get_account(client: &RawClient, person: Vec<u8>) -> Fallible<u32> {
|
||||
let x = client.get(person).await?.unwrap();
|
||||
let boxed_slice = x.into_boxed_slice();
|
||||
let array: Box<[u8; 4]> = boxed_slice
|
||||
.try_into()
|
||||
.expect("Value should not exceed u32 (4 * u8)");
|
||||
Fallible::Ok(u32::from_be_bytes(*array))
|
||||
}
|
||||
|
||||
// helper function
|
||||
fn gen_people(num: u32, rng: &mut impl Rng) -> HashSet<Vec<u8>> {
|
||||
let mut set = HashSet::new();
|
||||
for _ in 0..num {
|
||||
set.insert(rng.gen::<u32>().to_be_bytes().to_vec());
|
||||
}
|
||||
set
|
||||
}
|
||||
|
||||
const ENV_PD_ADDRS: &str = "PD_ADDRS";
|
||||
|
||||
fn pd_addrs() -> Vec<String> {
|
||||
|
|
Loading…
Reference in New Issue