diff --git a/.travis.yml b/.travis.yml index 2047f62..e0cbb7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,18 +43,18 @@ script: - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi - cargo build --all - cargo build --examples - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then make doc; fi - if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi # For now we only run full integration tests on Linux. Here's why: # * Docker on OS X is not supported by Travis. # * Docker on Windows seems to not have the correct binary at `"/c/Program Files/Docker/Docker/DockerCli.exe" to switch it to Linux containers. - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then make docker-pd; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then make docker-kv; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 60; fi - - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi + - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then MULTI_REGION=1 make integration-test; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi - if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example pessimistic -- --pd="127.0.0.1:2379"; fi diff --git a/Cargo.toml b/Cargo.toml index f3f9c34..c9904fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ derive-new = "0.5" fail = "0.4" futures = { version = "0.3", features = ["async-await", "thread-pool"] } futures-timer = "3.0" -grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false } +grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen", "openssl-vendored" ], default-features = false } lazy_static = "1" log = "0.4" prometheus = { version = "0.12", features = [ "push", "process" ], default-features = false } @@ -49,6 +49,8 @@ proptest-derive = "0.3" serial_test = "0.5.0" simple_logger = "1" tokio = { version = "1.0", features = [ "sync", "rt-multi-thread", "macros" ] } +reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]} +serde_json = "1" [workspace] members = [ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..beef4e3 --- /dev/null +++ b/Makefile @@ -0,0 +1,32 @@ +.PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all + +default: check + +check: + cargo check --all + cargo fmt -- --check + cargo clippy -- -D clippy::all + +unit-test: + cargo test --all + +integration-test: +# MULTI_REGION shall be set manually if needed + PD_ADDRS="127.0.0.1:2379" cargo test txn_ --all --features integration-tests -- --nocapture + PD_ADDRS="127.0.0.1:2379" cargo test raw_ --all --features integration-tests -- --nocapture + PD_ADDRS="127.0.0.1:2379" cargo test misc_ --all --features integration-tests -- --nocapture + +test: unit-test integration-test + +doc: + cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps + +docker-pd: + docker run -d -v $(shell pwd)/config:/config --net=host --name pd --rm pingcap/pd:latest --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379" --config /config/pd.toml + +docker-kv: + docker run -d -v $(shell pwd)/config:/config --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv:latest --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv" --config /config/tikv.toml + +docker: docker-pd docker-kv + +all: check doc test diff --git a/config/pd.toml b/config/pd.toml new file mode 100644 index 0000000..b6d78e9 --- /dev/null +++ b/config/pd.toml @@ -0,0 +1,3 @@ +[schedule] +max-merge-region-size = 1 +max-merge-region-keys = 3 diff --git a/config/tikv.toml b/config/tikv.toml new file mode 100644 index 0000000..bf00961 --- /dev/null +++ b/config/tikv.toml @@ -0,0 +1,10 @@ +[coprocessor] +region-max-keys = 5 +region-split-keys = 3 +batch-split-limit = 100 + +[raftstore] +region-split-check-diff = "0KB" +pd-heartbeat-tick-interval = "2s" +pd-store-heartbeat-tick-interval = "5s" +split-region-check-tick-interval = "1s" \ No newline at end of file diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 9dfc07b..ba42f60 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -25,7 +25,7 @@ use tokio::sync::RwLock; // FIXME: these numbers and how they are used are all just cargo-culted in, there // may be more optimal values. const RECONNECT_INTERVAL_SEC: u64 = 1; -const MAX_REQUEST_COUNT: usize = 3; +const MAX_REQUEST_COUNT: usize = 5; const LEADER_CHANGE_RETRY: usize = 10; /// Client for communication with a PD cluster. Has the facility to reconnect to the cluster. diff --git a/src/util/iter.rs b/src/util/iter.rs index d251e60..3c84e3b 100644 --- a/src/util/iter.rs +++ b/src/util/iter.rs @@ -74,7 +74,7 @@ mod test { .into_iter() .flat_map_ok(|i| Some(i).into_iter()) .collect(); - assert_eq!(result.unwrap(), vec![]); + assert_eq!(result.unwrap(), Vec::::new()); let result: Result, ()> = vec![Result::::Ok(0), Ok(1), Ok(2)] .into_iter() diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs new file mode 100644 index 0000000..01046c6 --- /dev/null +++ b/tests/common/ctl.rs @@ -0,0 +1,28 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +//! The module provides some utility functions to control and get information +//! from PD, using its HTTP API. + +use super::pd_addrs; +use tikv_client_common::{Error, Result}; + +pub async fn get_region_count() -> Result { + let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) + .await + .map_err(|e| Error::StringError(e.to_string()))?; + + let body = res + .text() + .await + .map_err(|e| Error::StringError(e.to_string()))?; + let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap(); + value["count"] + .as_u64() + .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) +} + +#[tokio::test] +#[ignore] +async fn test_get_region_count() -> Result<()> { + println!("{}", get_region_count().await?); + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index e3a4730..1fb3f18 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,5 +1,13 @@ -use std::env; -use tikv_client::{ColumnFamily, RawClient}; +mod ctl; + +use futures_timer::Delay; +use log::{info, warn}; +use std::{env, time::Duration}; +use tikv_client::{ColumnFamily, Key, RawClient, Result, TransactionClient}; + +const ENV_PD_ADDRS: &str = "PD_ADDRS"; +const ENV_ENABLE_MULIT_REGION: &str = "MULTI_REGION"; +const REGION_SPLIT_TIME_LIMIT: Duration = Duration::from_secs(15); // Delete all entries in TiKV to leave a clean space for following tests. pub async fn clear_tikv() { @@ -14,7 +22,69 @@ pub async fn clear_tikv() { } } -const ENV_PD_ADDRS: &str = "PD_ADDRS"; +// To test with multiple regions, prewrite some data. Tests that hope to test +// with multiple regions should use keys in the corresponding ranges. +pub async fn init() -> Result<()> { + // ignore SetLoggerError + let _ = simple_logger::SimpleLogger::new() + .with_level(log::LevelFilter::Warn) + .init(); + + if env::var(ENV_ENABLE_MULIT_REGION).is_ok() { + // 1000 keys: 0..1000 + let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1)) + .take(1000) + .map(|x| x.to_be_bytes().to_vec()); + // 1024 keys: 0..u32::MAX + let count = 1024; + let step = u32::MAX / count; + let keys_2 = std::iter::successors(Some(0u32), |x| Some(x + step)) + .take(count as usize - 1) + .map(|x| x.to_be_bytes().to_vec()); + + ensure_region_split(keys_1.chain(keys_2), 100).await?; + } + + clear_tikv().await; + Ok(()) +} + +async fn ensure_region_split( + keys: impl IntoIterator>, + region_count: u32, +) -> Result<()> { + if ctl::get_region_count().await? as u32 >= region_count { + return Ok(()); + } + + // 1. write plenty transactional keys + // 2. wait until regions split + + let client = TransactionClient::new(pd_addrs()).await?; + let mut txn = client.begin_optimistic().await?; + for key in keys.into_iter() { + txn.put(key.into(), vec![0, 0, 0, 0]).await?; + } + txn.commit().await?; + let mut txn = client.begin_optimistic().await?; + let _ = txn.scan(vec![].., 2048).await?; + txn.commit().await?; + + info!("splitting regions..."); + let start_time = std::time::Instant::now(); + loop { + if ctl::get_region_count().await? as u32 >= region_count { + break; + } + if start_time.elapsed() > REGION_SPLIT_TIME_LIMIT { + warn!("Stop splitting regions: time limit exceeded"); + break; + } + Delay::new(Duration::from_millis(200)).await; + } + + Ok(()) +} pub fn pd_addrs() -> Vec { env::var(ENV_PD_ADDRS) diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index 2e90613..90393ff 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -1,15 +1,15 @@ #![cfg(feature = "integration-tests")] mod common; -use common::{clear_tikv, pd_addrs}; +use common::{init, pd_addrs}; use fail::FailScenario; use serial_test::serial; use tikv_client::{Result, TransactionClient, TransactionOptions}; #[tokio::test] #[serial] -async fn optimistic_heartbeat() -> Result<()> { - clear_tikv().await; +async fn txn_optimistic_heartbeat() -> Result<()> { + init().await?; let scenario = FailScenario::setup(); fail::cfg("after-prewrite", "sleep(10000)").unwrap(); diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 0b1d757..4a6cfd7 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,7 +1,18 @@ #![cfg(feature = "integration-tests")] +//! # Naming convention +//! +//! Test names should begin with one of the following: +//! 1. txn_ +//! 2. raw_ +//! 3. misc_ +//! +//! We make use of the convention to control the order of tests in CI, to allow +//! transactional and raw tests to coexist, since transactional requests have +//! requirements on the region boundaries. + mod common; -use common::{clear_tikv, pd_addrs}; +use common::{init, pd_addrs}; use futures::prelude::*; use rand::{seq::IteratorRandom, thread_rng, Rng}; use serial_test::serial; @@ -20,7 +31,8 @@ const NUM_PEOPLE: u32 = 100; const NUM_TRNASFER: u32 = 100; #[tokio::test] -async fn get_timestamp() -> Result<()> { +#[serial] +async fn txn_get_timestamp() -> Result<()> { const COUNT: usize = 1 << 8; // use a small number to make test fast let client = TransactionClient::new(pd_addrs()).await?; @@ -40,8 +52,8 @@ async fn get_timestamp() -> Result<()> { // Tests transactional get, put, delete, batch_get #[tokio::test] #[serial] -async fn crud() -> Result<()> { - clear_tikv().await; +async fn txn_crud() -> Result<()> { + init().await?; let client = TransactionClient::new(pd_addrs()).await?; let mut txn = client.begin_optimistic().await?; @@ -124,8 +136,8 @@ async fn crud() -> Result<()> { // Tests transactional insert and delete-your-writes cases #[tokio::test] #[serial] -async fn insert_duplicate_keys() -> Result<()> { - clear_tikv().await; +async fn txn_insert_duplicate_keys() -> Result<()> { + init().await?; let client = TransactionClient::new(pd_addrs()).await?; // Initialize TiKV store with {foo => bar} @@ -148,8 +160,8 @@ async fn insert_duplicate_keys() -> Result<()> { #[tokio::test] #[serial] -async fn pessimistic() -> Result<()> { - clear_tikv().await; +async fn txn_pessimistic() -> Result<()> { + init().await?; let client = TransactionClient::new(pd_addrs()).await?; let mut txn = client.begin_pessimistic().await?; @@ -167,7 +179,7 @@ async fn pessimistic() -> Result<()> { #[tokio::test] #[serial] async fn raw_bank_transfer() -> Result<()> { - clear_tikv().await; + init().await?; let client = RawClient::new(pd_addrs()).await?; let mut rng = thread_rng(); @@ -211,20 +223,15 @@ async fn raw_bank_transfer() -> Result<()> { Ok(()) } -/// Tests transactional API when there are multiple regions. -/// Write large volumes of data to enforce region splitting. -/// In order to test `scan`, data is uniformly inserted. -// FIXME: this test is stupid. We should use pd-ctl or config files to make -// multiple regions, instead of bulk writing. #[tokio::test] #[serial] -async fn txn_write_million() -> Result<()> { - const NUM_BITS_TXN: u32 = 13; +async fn txn_read() -> Result<()> { + const NUM_BITS_TXN: u32 = 4; const NUM_BITS_KEY_PER_TXN: u32 = 4; let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); let value = "large_value".repeat(10); - clear_tikv().await; + init().await?; let client = TransactionClient::new(pd_addrs()).await?; for i in 0..2u32.pow(NUM_BITS_TXN) { @@ -248,50 +255,49 @@ async fn txn_write_million() -> Result<()> { assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN)); txn.commit().await?; } - /* FIXME: scan all keys will make the message size exceed its limit - // test scan - let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough - let snapshot = client.snapshot( - client.current_timestamp().await?, - TransactionOptions::default(), - ); - let res = snapshot.scan(vec![].., limit).await?; - assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); + // test scan + let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough + let mut snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::default(), + ); + let res = snapshot.scan(vec![].., limit).await?; + assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); - // scan by small range and combine them - let mut rng = thread_rng(); - let mut keys = gen_u32_keys(200, &mut rng) - .iter() - .cloned() - .collect::>(); - keys.sort(); + // scan by small range and combine them + let mut rng = thread_rng(); + let mut keys = gen_u32_keys(200, &mut rng) + .iter() + .cloned() + .collect::>(); + keys.sort(); - let mut sum = 0; + let mut sum = 0; - // empty key to key[0] - let snapshot = client.snapshot( - client.current_timestamp().await?, - TransactionOptions::default(), - ); - let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?; - sum += res.count(); + // empty key to key[0] + let mut snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::default(), + ); + let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?; + sum += res.count(); - // key[i] .. key[i+1] - for i in 0..keys.len() - 1 { - let res = snapshot - .scan(keys[i].clone()..keys[i + 1].clone(), limit) - .await?; - sum += res.count(); - } + // key[i] .. key[i+1] + for i in 0..keys.len() - 1 { + let res = snapshot + .scan(keys[i].clone()..keys[i + 1].clone(), limit) + .await?; + sum += res.count(); + } - // keys[last] to unbounded - let res = snapshot.scan(keys[keys.len() - 1].clone().., limit).await?; - sum += res.count(); + // keys[last] to unbounded + let res = snapshot.scan(keys[keys.len() - 1].clone().., limit).await?; + sum += res.count(); + + assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); - assert_eq!(sum, 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); - */ // test batch_get and batch_get_for_update - const SKIP_BITS: u32 = 12; // do not retrieve all because there's a limit of message size + const SKIP_BITS: u32 = 0; // do not retrieve all because there's a limit of message size let mut cur = 0u32; let keys = iter::repeat_with(|| { let v = cur; @@ -316,14 +322,15 @@ async fn txn_write_million() -> Result<()> { #[tokio::test] #[serial] async fn txn_bank_transfer() -> Result<()> { - clear_tikv().await; + init().await?; let client = TransactionClient::new(pd_addrs()).await?; let mut rng = thread_rng(); + let options = TransactionOptions::new_optimistic() + .use_async_commit() + .drop_check(tikv_client::CheckLevel::Warn); let people = gen_u32_keys(NUM_PEOPLE, &mut rng); - let mut txn = client - .begin_with_options(TransactionOptions::new_optimistic()) - .await?; + let mut txn = client.begin_with_options(options.clone()).await?; let mut sum: u32 = 0; for person in &people { let init = rng.gen::() as u32; @@ -334,9 +341,7 @@ async fn txn_bank_transfer() -> Result<()> { // transfer for _ in 0..NUM_TRNASFER { - let mut txn = client - .begin_with_options(TransactionOptions::new_optimistic().use_async_commit()) - .await?; + let mut txn = client.begin_with_options(options.clone()).await?; let chosen_people = people.iter().choose_multiple(&mut rng, 2); let alice = chosen_people[0]; let mut alice_balance = get_txn_u32(&mut txn, alice.clone()).await?; @@ -370,7 +375,7 @@ async fn txn_bank_transfer() -> Result<()> { #[tokio::test] #[serial] async fn raw_req() -> Result<()> { - clear_tikv().await; + init().await?; let client = RawClient::new(pd_addrs()).await?; // empty; get non-existent key @@ -499,8 +504,8 @@ async fn raw_req() -> Result<()> { /// Only checks if we successfully update safepoint to PD. #[tokio::test] #[serial] -async fn test_update_safepoint() -> Result<()> { - clear_tikv().await; +async fn txn_update_safepoint() -> Result<()> { + init().await?; let client = TransactionClient::new(pd_addrs()).await?; let res = client.gc(client.current_timestamp().await?).await?; assert!(res); @@ -508,19 +513,14 @@ async fn test_update_safepoint() -> Result<()> { } /// Tests raw API when there are multiple regions. -/// Write large volumes of data to enforce region splitting. -/// In order to test `scan`, data is uniformly inserted. -/// -/// Ignoring this because we don't want to mess up transactional tests. #[tokio::test] #[serial] -#[ignore] async fn raw_write_million() -> Result<()> { - const NUM_BITS_TXN: u32 = 9; - const NUM_BITS_KEY_PER_TXN: u32 = 10; + const NUM_BITS_TXN: u32 = 4; + const NUM_BITS_KEY_PER_TXN: u32 = 4; let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); - clear_tikv().await; + init().await?; let client = RawClient::new(pd_addrs()).await?; for i in 0..2u32.pow(NUM_BITS_TXN) { @@ -555,8 +555,8 @@ async fn raw_write_million() -> Result<()> { let _ = client .batch_scan(iter::repeat(vec![]..).take(batch_num), limit) .await?; - // FIXME: `each_limit` parameter does no work as expected. - // It limits the entries on each region of each rangqe, instead of each range. + // FIXME: `each_limit` parameter does no work as expected. It limits the + // entries on each region of each rangqe, instead of each range. // assert_eq!(res.len(), limit as usize * batch_num); } @@ -565,11 +565,12 @@ async fn raw_write_million() -> Result<()> { #[tokio::test] #[serial] -async fn pessimistic_rollback() -> Result<()> { - clear_tikv().await; +async fn txn_pessimistic_rollback() -> Result<()> { + init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; let mut preload_txn = client.begin_optimistic().await?; let key1 = vec![1]; + let key2 = vec![2]; let value = key1.clone(); preload_txn.put(key1.clone(), value).await?; @@ -582,27 +583,27 @@ async fn pessimistic_rollback() -> Result<()> { result?; } - // for _ in 0..100 { - // let mut txn = client.begin_pessimistic().await?; - // let result = txn - // .batch_get_for_update(vec![key1.clone(), key2.clone()]) - // .await; - // txn.rollback().await?; - // let _ = result?; - // } + for _ in 0..100 { + let mut txn = client.begin_pessimistic().await?; + let result = txn + .batch_get_for_update(vec![key1.clone(), key2.clone()]) + .await; + txn.rollback().await?; + let _ = result?; + } Ok(()) } #[tokio::test] #[serial] -async fn pessimistic_delete() -> Result<()> { - clear_tikv().await; +async fn txn_pessimistic_delete() -> Result<()> { + init().await?; let client = TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; - // The transaction will lock the keys and must release the locks on commit, even when values are - // not written to the DB. + // The transaction will lock the keys and must release the locks on commit, + // even when values are not written to the DB. let mut txn = client.begin_pessimistic().await?; txn.put(vec![1], vec![42]).await?; txn.delete(vec![1]).await?; @@ -639,8 +640,8 @@ async fn pessimistic_delete() -> Result<()> { #[tokio::test] #[serial] -async fn lock_keys() -> Result<()> { - clear_tikv().await; +async fn txn_lock_keys() -> Result<()> { + init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; let k1 = b"key1".to_vec(); @@ -673,8 +674,8 @@ async fn lock_keys() -> Result<()> { #[tokio::test] #[serial] -async fn get_for_update() -> Result<()> { - clear_tikv().await; +async fn txn_get_for_update() -> Result<()> { + init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; let key1 = "key".to_owned(); let key2 = "another key".to_owned(); @@ -717,8 +718,8 @@ async fn get_for_update() -> Result<()> { #[tokio::test] #[serial] -async fn pessimistic_heartbeat() -> Result<()> { - clear_tikv().await; +async fn txn_pessimistic_heartbeat() -> Result<()> { + init().await?; let key1 = "key1".to_owned(); let key2 = "key2".to_owned(); @@ -759,7 +760,7 @@ async fn pessimistic_heartbeat() -> Result<()> { #[tokio::test] #[serial] async fn raw_cas() -> Result<()> { - clear_tikv().await; + init().await?; let client = RawClient::new(pd_addrs()).await?.with_atomic_for_cas(); let key = "key".to_owned(); let value = "value".to_owned();