diff --git a/Cargo.lock b/Cargo.lock index 4ab76f1..5608d99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1670,6 +1670,28 @@ dependencies = [ "url", ] +[[package]] +name = "serial_test" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b15f74add9a9d4a3eb2bf739c9a427d266d3895b53d992c3a7c234fec2ff1f1" +dependencies = [ + "lazy_static", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65f59259be9fc1bf677d06cc1456e97756004a1a5a577480f71430bd7c17ba33" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.7", + "syn 1.0.35", +] + [[package]] name = "shlex" version = "0.1.1" @@ -1844,6 +1866,7 @@ dependencies = [ "regex", "serde", "serde_derive", + "serial_test", "simple_logger", "tempdir", "tikv-client-common", diff --git a/Cargo.toml b/Cargo.toml index 58147cf..0091507 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ proptest = "0.9" proptest-derive = "0.1.0" fail = { version = "0.3", features = [ "failpoints" ] } simple_logger = "1.9.0" +serial_test = "0.5.0" [patch.crates-io] raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 05f5f7b..9c7b412 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2,12 +2,61 @@ use failure::Fallible; use futures::prelude::*; -use std::{collections::HashMap, env}; -use tikv_client::{Config, Key, Result, TransactionClient, Value}; +use rand::{seq::IteratorRandom, thread_rng, Rng}; +use serial_test::serial; +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, + env, +}; +use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value}; + +/// The limit of scan in each iteration in `clear_tikv`. +const SCAN_BATCH_SIZE: u32 = 1000; + +// Parameters used in test +const NUM_PEOPLE: u32 = 100; +const NUM_TRNASFER: u32 = 100; + +/// Delete all entris in TiKV to leave a clean space for following tests. +/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions. +async fn clear_tikv() -> Fallible<()> { + delete_all_raw().await?; + delete_all_txn().await?; + Fallible::Ok(()) +} + +async fn delete_all_raw() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let raw_client = RawClient::new(config).await?.with_key_only(true); + raw_client.delete_range(vec![]..).await?; + Fallible::Ok(()) +} + +async fn delete_all_txn() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let txn_client = TransactionClient::new(config).await?; + let mut txn = txn_client.begin().await?; + + loop { + let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE, true).await?.peekable(); + + if keys.peek().is_none() { + break; + } + + for kv in keys { + txn.delete(kv.into_key()).await?; + } + } + + txn.commit().await?; + Fallible::Ok(()) +} #[tokio::test] async fn get_timestamp() -> Fallible<()> { - const COUNT: usize = 1 << 16; + const COUNT: usize = 1 << 8; // use a small number to make test fast let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -25,7 +74,9 @@ async fn get_timestamp() -> Fallible<()> { } #[tokio::test] +#[serial] async fn crud() -> Fallible<()> { + clear_tikv().await?; let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -100,6 +151,258 @@ async fn crud() -> Fallible<()> { Fallible::Ok(()) } +/// bank transfer mainly tests raw put and get +#[tokio::test] +#[serial] +async fn raw_bank_transfer() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + let mut rng = thread_rng(); + + let people = gen_u32_keys(NUM_PEOPLE, &mut rng); + let mut sum: u32 = 0; + for person in &people { + let init = rng.gen::() as u32; + sum += init as u32; + client + .put(person.clone(), init.to_be_bytes().to_vec()) + .await?; + } + + // transfer + for _ in 0..NUM_TRNASFER { + let chosen_people = people.iter().choose_multiple(&mut rng, 2); + let alice = chosen_people[0]; + let mut alice_balance = get_u32(&client, alice.clone()).await?; + let bob = chosen_people[1]; + let mut bob_balance = get_u32(&client, bob.clone()).await?; + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + client + .put(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + client + .put(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; + } + + // check + let mut new_sum = 0; + for person in &people { + new_sum += get_u32(&client, person.clone()).await?; + } + assert_eq!(sum, new_sum); + Fallible::Ok(()) +} + +#[tokio::test] +#[serial] +async fn txn_bank_transfer() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = TransactionClient::new(config).await?; + let mut rng = thread_rng(); + + let people = gen_u32_keys(NUM_PEOPLE, &mut rng); + let mut txn = client.begin().await?; + let mut sum: u32 = 0; + for person in &people { + let init = rng.gen::() as u32; + sum += init as u32; + txn.set(person.clone(), init.to_be_bytes().to_vec()).await?; + } + txn.commit().await?; + + // transfer + for _ in 0..NUM_TRNASFER { + let mut txn = client.begin().await?; + let chosen_people = people.iter().choose_multiple(&mut rng, 2); + let alice = chosen_people[0]; + let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?; + let bob = chosen_people[1]; + let mut bob_balance = get_txn_u32(&txn, bob.clone()).await?; + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + txn.set(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + txn.set(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; + txn.commit().await?; + } + + // check + let mut new_sum = 0; + let mut txn = client.begin().await?; + for person in people.iter() { + new_sum += get_txn_u32(&txn, person.clone()).await?; + } + assert_eq!(sum, new_sum); + txn.commit().await?; + Fallible::Ok(()) +} + +#[tokio::test] +#[serial] +async fn raw() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + + // empty; get non-existent key + let res = client.get("k1".to_owned()).await; + assert_eq!(res?, None); + + // empty; put then batch_get + client.put("k1".to_owned(), "v1".to_owned()).await?; + client.put("k2".to_owned(), "v2".to_owned()).await?; + + let res = client + .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()]) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + + // k1,k2; batch_put then batch_get + let _ = client + .batch_put(vec![ + ("k3".to_owned(), "v3".to_owned()), + ("k4".to_owned(), "v4".to_owned()), + ]) + .await?; + + let res = client + .batch_get(vec!["k4".to_owned(), "k3".to_owned()]) + .await?; + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // k1,k2,k3,k4; delete then get + let res = client.delete("k3".to_owned()).await; + assert!(res.is_ok()); + + let res = client.get("k3".to_owned()).await?; + assert_eq!(res, None); + + // k1,k2,k4; batch_delete then batch_get + let res = client + .batch_delete(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await; + assert!(res.is_ok()); + + let res = client + .batch_get(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await?; + assert_eq!(res.len(), 0); + + // empty; batch_put then scan + let _ = client + .batch_put(vec![ + ("k3".to_owned(), "v3".to_owned()), + ("k5".to_owned(), "v5".to_owned()), + ("k1".to_owned(), "v1".to_owned()), + ("k2".to_owned(), "v2".to_owned()), + ("k4".to_owned(), "v4".to_owned()), + ]) + .await?; + + let res = client.scan("k2".to_owned()..="k5".to_owned(), 5).await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v2".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + assert_eq!(res[2].1, "v4".as_bytes()); + assert_eq!(res[3].1, "v5".as_bytes()); + + let res = client.scan("k2".to_owned().."k5".to_owned(), 2).await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v2".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + let res = client.scan("k1".to_owned().., 20).await?; + assert_eq!(res.len(), 5); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v4".as_bytes()); + assert_eq!(res[4].1, "v5".as_bytes()); + + let res = client + .batch_scan( + vec![ + "".to_owned().."k1".to_owned(), + "k1".to_owned().."k2".to_owned(), + "k2".to_owned().."k3".to_owned(), + "k3".to_owned().."k4".to_owned(), + "k4".to_owned().."k5".to_owned(), + ], + 2, + ) + .await?; + assert_eq!(res.len(), 4); + + let res = client + .batch_scan( + vec![ + "".to_owned()..="k3".to_owned(), + "k2".to_owned()..="k5".to_owned(), + ], + 4, + ) + .await?; + assert_eq!(res.len(), 7); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + assert_eq!(res[4].1, "v3".as_bytes()); + assert_eq!(res[5].1, "v4".as_bytes()); + assert_eq!(res[6].1, "v5".as_bytes()); + + Fallible::Ok(()) +} + +// helper function +async fn get_u32(client: &RawClient, key: Vec) -> Fallible { + let x = client.get(key).await?.unwrap(); + let boxed_slice = x.into_boxed_slice(); + let array: Box<[u8; 4]> = boxed_slice + .try_into() + .expect("Value should not exceed u32 (4 * u8)"); + Fallible::Ok(u32::from_be_bytes(*array)) +} + +// helper function +async fn get_txn_u32(txn: &Transaction, key: Vec) -> Fallible { + let x = txn.get(key).await?.unwrap(); + let boxed_slice = x.into_boxed_slice(); + let array: Box<[u8; 4]> = boxed_slice + .try_into() + .expect("Value should not exceed u32 (4 * u8)"); + Fallible::Ok(u32::from_be_bytes(*array)) +} + +// helper function +fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet> { + let mut set = HashSet::new(); + for _ in 0..num { + set.insert(rng.gen::().to_be_bytes().to_vec()); + } + set +} + const ENV_PD_ADDRS: &str = "PD_ADDRS"; fn pd_addrs() -> Vec {