diff --git a/Cargo.lock b/Cargo.lock index 4ab76f1..5608d99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1670,6 +1670,28 @@ dependencies = [ "url", ] +[[package]] +name = "serial_test" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b15f74add9a9d4a3eb2bf739c9a427d266d3895b53d992c3a7c234fec2ff1f1" +dependencies = [ + "lazy_static", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65f59259be9fc1bf677d06cc1456e97756004a1a5a577480f71430bd7c17ba33" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.7", + "syn 1.0.35", +] + [[package]] name = "shlex" version = "0.1.1" @@ -1844,6 +1866,7 @@ dependencies = [ "regex", "serde", "serde_derive", + "serial_test", "simple_logger", "tempdir", "tikv-client-common", diff --git a/Cargo.toml b/Cargo.toml index 58147cf..0091507 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ proptest = "0.9" proptest-derive = "0.1.0" fail = { version = "0.3", features = [ "failpoints" ] } simple_logger = "1.9.0" +serial_test = "0.5.0" [patch.crates-io] raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c07124b..e6d6511 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2,12 +2,85 @@ use failure::Fallible; use futures::prelude::*; -use std::{collections::HashMap, env}; -use tikv_client::{Config, Key, Result, TransactionClient, Value}; +use rand::{seq::IteratorRandom, thread_rng, Rng}; +use serial_test::serial; +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, + env, +}; +use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value}; + +/// The number of keys that we scan in one iteration. +const SCAN_BATCH_SIZE: u32 = 10000; +const NUM_PEOPLE: u32 = 100; +const NUM_TRNASFER: u32 = 100; + +/// Delete all entris in TiKV to leave a clean space for following tests. +/// TiKV does not provide an elegant way to do this, so it is finished by scanning and deletions. +async fn clear_tikv() -> Fallible<()> { + delete_all_raw().await?; + delete_all_txn().await?; + Fallible::Ok(()) +} + +async fn delete_all_raw() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let raw_client = RawClient::new(config).await?.with_key_only(true); + loop { + println!("in delete raw loop"); + let keys = raw_client + .scan(vec![]..vec![], SCAN_BATCH_SIZE) + .await + .expect("delete_all failed scanning all keys") + .into_iter() + .map(|pair| pair.into_key()) + .collect::>(); + + if keys.is_empty() { + break; + } + + raw_client + .batch_delete(keys) + .await + .expect("delete_all failed batch_delete"); + } + + Fallible::Ok(()) +} + +async fn delete_all_txn() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let txn_client = TransactionClient::new(config).await?; + let mut txn = txn_client.begin().await?; + + loop { + println!("in delete txn loop"); + let mut keys = txn + .scan("".to_owned().."".to_owned(), SCAN_BATCH_SIZE, true) + .await? + .peekable(); + + if keys.peek().is_none() { + break; + } + + // FIXME: scan does not use local buffer? anyone else? + + for kv in keys { + println!("{:?}", kv.key()); + txn.delete(kv.into_key()).await?; + } + } + + txn.commit().await?; + Fallible::Ok(()) +} #[tokio::test] async fn get_timestamp() -> Fallible<()> { - const COUNT: usize = 1 << 16; + const COUNT: usize = 1 << 8; // use a small number to make test fast let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -25,7 +98,9 @@ async fn get_timestamp() -> Fallible<()> { } #[tokio::test] +#[serial] async fn crud() -> Fallible<()> { + clear_tikv().await?; let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -100,6 +175,73 @@ async fn crud() -> Fallible<()> { Fallible::Ok(()) } +/// bank transfer mainly tests raw put and get +#[tokio::test] +#[serial] +async fn raw_bank_transfer() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + let mut rng = thread_rng(); + + let people = gen_people(NUM_PEOPLE, &mut rng); + let mut sum: u32 = 0; + for person in &people { + let init = rng.gen::() as u32; + sum += init as u32; + client + .put(person.clone(), init.to_be_bytes().to_vec()) + .await?; + } + + // transfer + for _ in 0..NUM_TRNASFER { + let chosen_people = people.iter().choose_multiple(&mut rng, 2); + let alice = chosen_people[0]; + let mut alice_balance = get_account(&client, alice.clone()).await?; + let bob = chosen_people[1]; + let mut bob_balance = get_account(&client, bob.clone()).await?; + if alice_balance > bob_balance { + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + client + .put(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + client + .put(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; + } + } + + // check + let mut new_sum = 0; + for person in &people { + new_sum += get_account(&client, person.clone()).await?; + } + assert_eq!(sum, new_sum); + Fallible::Ok(()) +} + +// helper function +async fn get_account(client: &RawClient, person: Vec) -> Fallible { + let x = client.get(person).await?.unwrap(); + let boxed_slice = x.into_boxed_slice(); + let array: Box<[u8; 4]> = boxed_slice + .try_into() + .expect("Value should not exceed u32 (4 * u8)"); + Fallible::Ok(u32::from_be_bytes(*array)) +} + +// helper function +fn gen_people(num: u32, rng: &mut impl Rng) -> HashSet> { + let mut set = HashSet::new(); + for _ in 0..num { + set.insert(rng.gen::().to_be_bytes().to_vec()); + } + set +} + const ENV_PD_ADDRS: &str = "PD_ADDRS"; fn pd_addrs() -> Vec {