From 93182159d9a49cc869760a26579f9d5f32dd774a Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 16 Sep 2020 14:13:15 +0800 Subject: [PATCH 1/3] raw bank transfer test Signed-off-by: ekexium --- Cargo.lock | 23 ++++++ Cargo.toml | 1 + tests/integration_tests.rs | 148 ++++++++++++++++++++++++++++++++++++- 3 files changed, 169 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ab76f1..5608d99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1670,6 +1670,28 @@ dependencies = [ "url", ] +[[package]] +name = "serial_test" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b15f74add9a9d4a3eb2bf739c9a427d266d3895b53d992c3a7c234fec2ff1f1" +dependencies = [ + "lazy_static", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65f59259be9fc1bf677d06cc1456e97756004a1a5a577480f71430bd7c17ba33" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.7", + "syn 1.0.35", +] + [[package]] name = "shlex" version = "0.1.1" @@ -1844,6 +1866,7 @@ dependencies = [ "regex", "serde", "serde_derive", + "serial_test", "simple_logger", "tempdir", "tikv-client-common", diff --git a/Cargo.toml b/Cargo.toml index 58147cf..0091507 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ proptest = "0.9" proptest-derive = "0.1.0" fail = { version = "0.3", features = [ "failpoints" ] } simple_logger = "1.9.0" +serial_test = "0.5.0" [patch.crates-io] raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c07124b..e6d6511 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2,12 +2,85 @@ use failure::Fallible; use futures::prelude::*; -use std::{collections::HashMap, env}; -use tikv_client::{Config, Key, Result, TransactionClient, Value}; +use rand::{seq::IteratorRandom, thread_rng, Rng}; +use serial_test::serial; +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, + env, +}; +use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value}; + +/// The number of keys that we scan in one iteration. +const SCAN_BATCH_SIZE: u32 = 10000; +const NUM_PEOPLE: u32 = 100; +const NUM_TRNASFER: u32 = 100; + +/// Delete all entris in TiKV to leave a clean space for following tests. +/// TiKV does not provide an elegant way to do this, so it is finished by scanning and deletions. +async fn clear_tikv() -> Fallible<()> { + delete_all_raw().await?; + delete_all_txn().await?; + Fallible::Ok(()) +} + +async fn delete_all_raw() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let raw_client = RawClient::new(config).await?.with_key_only(true); + loop { + println!("in delete raw loop"); + let keys = raw_client + .scan(vec![]..vec![], SCAN_BATCH_SIZE) + .await + .expect("delete_all failed scanning all keys") + .into_iter() + .map(|pair| pair.into_key()) + .collect::>(); + + if keys.is_empty() { + break; + } + + raw_client + .batch_delete(keys) + .await + .expect("delete_all failed batch_delete"); + } + + Fallible::Ok(()) +} + +async fn delete_all_txn() -> Fallible<()> { + let config = Config::new(pd_addrs()); + let txn_client = TransactionClient::new(config).await?; + let mut txn = txn_client.begin().await?; + + loop { + println!("in delete txn loop"); + let mut keys = txn + .scan("".to_owned().."".to_owned(), SCAN_BATCH_SIZE, true) + .await? + .peekable(); + + if keys.peek().is_none() { + break; + } + + // FIXME: scan does not use local buffer? anyone else? + + for kv in keys { + println!("{:?}", kv.key()); + txn.delete(kv.into_key()).await?; + } + } + + txn.commit().await?; + Fallible::Ok(()) +} #[tokio::test] async fn get_timestamp() -> Fallible<()> { - const COUNT: usize = 1 << 16; + const COUNT: usize = 1 << 8; // use a small number to make test fast let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -25,7 +98,9 @@ async fn get_timestamp() -> Fallible<()> { } #[tokio::test] +#[serial] async fn crud() -> Fallible<()> { + clear_tikv().await?; let config = Config::new(pd_addrs()); let client = TransactionClient::new(config).await?; @@ -100,6 +175,73 @@ async fn crud() -> Fallible<()> { Fallible::Ok(()) } +/// bank transfer mainly tests raw put and get +#[tokio::test] +#[serial] +async fn raw_bank_transfer() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + let mut rng = thread_rng(); + + let people = gen_people(NUM_PEOPLE, &mut rng); + let mut sum: u32 = 0; + for person in &people { + let init = rng.gen::() as u32; + sum += init as u32; + client + .put(person.clone(), init.to_be_bytes().to_vec()) + .await?; + } + + // transfer + for _ in 0..NUM_TRNASFER { + let chosen_people = people.iter().choose_multiple(&mut rng, 2); + let alice = chosen_people[0]; + let mut alice_balance = get_account(&client, alice.clone()).await?; + let bob = chosen_people[1]; + let mut bob_balance = get_account(&client, bob.clone()).await?; + if alice_balance > bob_balance { + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + client + .put(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + client + .put(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; + } + } + + // check + let mut new_sum = 0; + for person in &people { + new_sum += get_account(&client, person.clone()).await?; + } + assert_eq!(sum, new_sum); + Fallible::Ok(()) +} + +// helper function +async fn get_account(client: &RawClient, person: Vec) -> Fallible { + let x = client.get(person).await?.unwrap(); + let boxed_slice = x.into_boxed_slice(); + let array: Box<[u8; 4]> = boxed_slice + .try_into() + .expect("Value should not exceed u32 (4 * u8)"); + Fallible::Ok(u32::from_be_bytes(*array)) +} + +// helper function +fn gen_people(num: u32, rng: &mut impl Rng) -> HashSet> { + let mut set = HashSet::new(); + for _ in 0..num { + set.insert(rng.gen::().to_be_bytes().to_vec()); + } + set +} + const ENV_PD_ADDRS: &str = "PD_ADDRS"; fn pd_addrs() -> Vec { From 67b52a998c6d87e0fc42845b725225a2933096db Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 21 Sep 2020 12:05:24 +0800 Subject: [PATCH 2/3] integration tests: add raw client test Signed-off-by: ekexium --- tests/integration_tests.rs | 182 +++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 38 deletions(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index e6d6511..17a3452 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -11,13 +11,15 @@ use std::{ }; use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value}; -/// The number of keys that we scan in one iteration. -const SCAN_BATCH_SIZE: u32 = 10000; +/// The limit of scan in each iteration in `clear_tikv`. +const SCAN_BATCH_SIZE: u32 = 1000; + +// Parameters used in test const NUM_PEOPLE: u32 = 100; const NUM_TRNASFER: u32 = 100; /// Delete all entris in TiKV to leave a clean space for following tests. -/// TiKV does not provide an elegant way to do this, so it is finished by scanning and deletions. +/// TiKV does not provide an elegant way to do this, so it is done by scanning and deletions. async fn clear_tikv() -> Fallible<()> { delete_all_raw().await?; delete_all_txn().await?; @@ -27,26 +29,7 @@ async fn clear_tikv() -> Fallible<()> { async fn delete_all_raw() -> Fallible<()> { let config = Config::new(pd_addrs()); let raw_client = RawClient::new(config).await?.with_key_only(true); - loop { - println!("in delete raw loop"); - let keys = raw_client - .scan(vec![]..vec![], SCAN_BATCH_SIZE) - .await - .expect("delete_all failed scanning all keys") - .into_iter() - .map(|pair| pair.into_key()) - .collect::>(); - - if keys.is_empty() { - break; - } - - raw_client - .batch_delete(keys) - .await - .expect("delete_all failed batch_delete"); - } - + raw_client.delete_range(vec![]..).await?; Fallible::Ok(()) } @@ -56,20 +39,13 @@ async fn delete_all_txn() -> Fallible<()> { let mut txn = txn_client.begin().await?; loop { - println!("in delete txn loop"); - let mut keys = txn - .scan("".to_owned().."".to_owned(), SCAN_BATCH_SIZE, true) - .await? - .peekable(); + let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE, true).await?.peekable(); if keys.peek().is_none() { break; } - // FIXME: scan does not use local buffer? anyone else? - for kv in keys { - println!("{:?}", kv.key()); txn.delete(kv.into_key()).await?; } } @@ -184,7 +160,7 @@ async fn raw_bank_transfer() -> Fallible<()> { let client = RawClient::new(config).await?; let mut rng = thread_rng(); - let people = gen_people(NUM_PEOPLE, &mut rng); + let people = gen_u32_keys(NUM_PEOPLE, &mut rng); let mut sum: u32 = 0; for person in &people { let init = rng.gen::() as u32; @@ -198,9 +174,9 @@ async fn raw_bank_transfer() -> Fallible<()> { for _ in 0..NUM_TRNASFER { let chosen_people = people.iter().choose_multiple(&mut rng, 2); let alice = chosen_people[0]; - let mut alice_balance = get_account(&client, alice.clone()).await?; + let mut alice_balance = get_u32(&client, alice.clone()).await?; let bob = chosen_people[1]; - let mut bob_balance = get_account(&client, bob.clone()).await?; + let mut bob_balance = get_u32(&client, bob.clone()).await?; if alice_balance > bob_balance { let transfer = rng.gen_range(0, alice_balance); alice_balance -= transfer; @@ -217,15 +193,145 @@ async fn raw_bank_transfer() -> Fallible<()> { // check let mut new_sum = 0; for person in &people { - new_sum += get_account(&client, person.clone()).await?; + new_sum += get_u32(&client, person.clone()).await?; } assert_eq!(sum, new_sum); Fallible::Ok(()) } +#[tokio::test] +#[serial] +async fn raw() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + + // empty; get non-existent key + let res = client.get("k1".to_owned()).await; + assert_eq!(res?, None); + + // empty; put then batch_get + client.put("k1".to_owned(), "v1".to_owned()).await?; + client.put("k2".to_owned(), "v2".to_owned()).await?; + + let res = client + .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()]) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + + // k1,k2; batch_put then batch_get + let _ = client + .batch_put(vec![ + ("k3".to_owned(), "v3".to_owned()), + ("k4".to_owned(), "v4".to_owned()), + ]) + .await?; + + let res = client + .batch_get(vec!["k4".to_owned(), "k3".to_owned()]) + .await?; + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // k1,k2,k3,k4; delete then get + let res = client.delete("k3".to_owned()).await; + assert!(res.is_ok()); + + let res = client.get("k3".to_owned()).await?; + assert_eq!(res, None); + + // k1,k2,k4; batch_delete then batch_get + let res = client + .batch_delete(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await; + assert!(res.is_ok()); + + let res = client + .batch_get(vec![ + "k1".to_owned(), + "k2".to_owned(), + "k3".to_owned(), + "k4".to_owned(), + ]) + .await?; + assert_eq!(res.len(), 0); + + // empty; batch_put then scan + let _ = client + .batch_put(vec![ + ("k3".to_owned(), "v3".to_owned()), + ("k5".to_owned(), "v5".to_owned()), + ("k1".to_owned(), "v1".to_owned()), + ("k2".to_owned(), "v2".to_owned()), + ("k4".to_owned(), "v4".to_owned()), + ]) + .await?; + + let res = client.scan("k2".to_owned()..="k5".to_owned(), 5).await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v2".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + assert_eq!(res[2].1, "v4".as_bytes()); + assert_eq!(res[3].1, "v5".as_bytes()); + + let res = client.scan("k2".to_owned().."k5".to_owned(), 2).await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v2".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + let res = client.scan("k1".to_owned().., 20).await?; + assert_eq!(res.len(), 5); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v4".as_bytes()); + assert_eq!(res[4].1, "v5".as_bytes()); + + let res = client + .batch_scan( + vec![ + "".to_owned().."k1".to_owned(), + "k1".to_owned().."k2".to_owned(), + "k2".to_owned().."k3".to_owned(), + "k3".to_owned().."k4".to_owned(), + "k4".to_owned().."k5".to_owned(), + ], + 2, + ) + .await?; + assert_eq!(res.len(), 4); + + let res = client + .batch_scan( + vec![ + "".to_owned()..="k3".to_owned(), + "k2".to_owned()..="k5".to_owned(), + ], + 4, + ) + .await?; + assert_eq!(res.len(), 7); + assert_eq!(res[0].1, "v1".as_bytes()); + assert_eq!(res[1].1, "v2".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + assert_eq!(res[4].1, "v3".as_bytes()); + assert_eq!(res[5].1, "v4".as_bytes()); + assert_eq!(res[6].1, "v5".as_bytes()); + + Fallible::Ok(()) +} + // helper function -async fn get_account(client: &RawClient, person: Vec) -> Fallible { - let x = client.get(person).await?.unwrap(); +async fn get_u32(client: &RawClient, key: Vec) -> Fallible { + let x = client.get(key).await?.unwrap(); let boxed_slice = x.into_boxed_slice(); let array: Box<[u8; 4]> = boxed_slice .try_into() @@ -234,7 +340,7 @@ async fn get_account(client: &RawClient, person: Vec) -> Fallible { } // helper function -fn gen_people(num: u32, rng: &mut impl Rng) -> HashSet> { +fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet> { let mut set = HashSet::new(); for _ in 0..num { set.insert(rng.gen::().to_be_bytes().to_vec()); From fae7ac85f4d39ff0752e14ac409229bf75951bbc Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 23 Sep 2020 10:19:48 +0800 Subject: [PATCH 3/3] add serial txn bank transfer test Signed-off-by: ekexium --- tests/integration_tests.rs | 79 ++++++++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 12 deletions(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 17a3452..b9c178c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -9,7 +9,7 @@ use std::{ convert::TryInto, env, }; -use tikv_client::{Config, Key, RawClient, Result, TransactionClient, Value}; +use tikv_client::{Config, Key, RawClient, Result, Transaction, TransactionClient, Value}; /// The limit of scan in each iteration in `clear_tikv`. const SCAN_BATCH_SIZE: u32 = 1000; @@ -177,17 +177,15 @@ async fn raw_bank_transfer() -> Fallible<()> { let mut alice_balance = get_u32(&client, alice.clone()).await?; let bob = chosen_people[1]; let mut bob_balance = get_u32(&client, bob.clone()).await?; - if alice_balance > bob_balance { - let transfer = rng.gen_range(0, alice_balance); - alice_balance -= transfer; - bob_balance += transfer; - client - .put(alice.clone(), alice_balance.to_be_bytes().to_vec()) - .await?; - client - .put(bob.clone(), bob_balance.to_be_bytes().to_vec()) - .await?; - } + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + client + .put(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + client + .put(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; } // check @@ -199,6 +197,53 @@ async fn raw_bank_transfer() -> Fallible<()> { Fallible::Ok(()) } +#[tokio::test] +#[serial] +async fn txn_bank_transfer() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = TransactionClient::new(config).await?; + let mut rng = thread_rng(); + + let people = gen_u32_keys(NUM_PEOPLE, &mut rng); + let mut txn = client.begin().await?; + let mut sum: u32 = 0; + for person in &people { + let init = rng.gen::() as u32; + sum += init as u32; + txn.set(person.clone(), init.to_be_bytes().to_vec()).await?; + } + txn.commit().await?; + + // transfer + for _ in 0..NUM_TRNASFER { + let mut txn = client.begin().await?; + let chosen_people = people.iter().choose_multiple(&mut rng, 2); + let alice = chosen_people[0]; + let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?; + let bob = chosen_people[1]; + let mut bob_balance = get_txn_u32(&txn, bob.clone()).await?; + let transfer = rng.gen_range(0, alice_balance); + alice_balance -= transfer; + bob_balance += transfer; + txn.set(alice.clone(), alice_balance.to_be_bytes().to_vec()) + .await?; + txn.set(bob.clone(), bob_balance.to_be_bytes().to_vec()) + .await?; + txn.commit().await?; + } + + // check + let mut new_sum = 0; + let mut txn = client.begin().await?; + for person in people.iter() { + new_sum += get_txn_u32(&txn, person.clone()).await?; + } + assert_eq!(sum, new_sum); + txn.commit().await?; + Fallible::Ok(()) +} + #[tokio::test] #[serial] async fn raw() -> Fallible<()> { @@ -339,6 +384,16 @@ async fn get_u32(client: &RawClient, key: Vec) -> Fallible { Fallible::Ok(u32::from_be_bytes(*array)) } +// helper function +async fn get_txn_u32(txn: &Transaction, key: Vec) -> Fallible { + let x = txn.get(key).await?.unwrap(); + let boxed_slice = x.into_boxed_slice(); + let array: Box<[u8; 4]> = boxed_slice + .try_into() + .expect("Value should not exceed u32 (4 * u8)"); + Fallible::Ok(u32::from_be_bytes(*array)) +} + // helper function fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet> { let mut set = HashSet::new();