client-rust/tests/failpoint_tests.rs

425 lines
13 KiB
Rust

#![cfg(feature = "integration-tests")]
mod common;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::thread;
use std::time::Duration;
use common::*;
use fail::FailScenario;
use log::info;
use rand::thread_rng;
use serial_test::serial;
use tikv_client::transaction::Client;
use tikv_client::transaction::HeartbeatOption;
use tikv_client::transaction::ResolveLocksOptions;
use tikv_client::Backoff;
use tikv_client::CheckLevel;
use tikv_client::Config;
use tikv_client::Result;
use tikv_client::RetryOptions;
use tikv_client::TransactionClient;
use tikv_client::TransactionOptions;
#[tokio::test]
#[serial]
async fn txn_optimistic_heartbeat() -> Result<()> {
init().await?;
let scenario = FailScenario::setup();
fail::cfg("after-prewrite", "sleep(6000)").unwrap();
defer! {{
fail::cfg("after-prewrite", "off").unwrap();
}}
let key1 = "key1".to_owned();
let key2 = "key2".to_owned();
let client =
TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace())
.await?;
// CheckLevel::Panic makes the case unstable, change to Warn level for now.
// See https://github.com/tikv/client-rust/issues/389
let mut heartbeat_txn = client
.begin_with_options(
TransactionOptions::new_optimistic()
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1)))
.drop_check(CheckLevel::Warn),
)
.await?;
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
let mut txn_without_heartbeat = client
.begin_with_options(
TransactionOptions::new_optimistic()
.heartbeat_option(HeartbeatOption::NoHeartbeat)
.drop_check(CheckLevel::Warn),
)
.await?;
txn_without_heartbeat
.put(key2.clone(), "fooo")
.await
.unwrap();
let heartbeat_txn_handle = tokio::task::spawn_blocking(move || {
assert!(futures::executor::block_on(heartbeat_txn.commit()).is_ok())
});
let txn_without_heartbeat_handle = tokio::task::spawn_blocking(move || {
assert!(futures::executor::block_on(txn_without_heartbeat.commit()).is_err())
});
// inital TTL is 3 seconds, before which TTL is valid regardless of heartbeat.
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
fail::cfg("after-prewrite", "off").unwrap();
// use other txns to check these locks
let mut t3 = client
.begin_with_options(
TransactionOptions::new_optimistic()
.no_resolve_locks()
.heartbeat_option(HeartbeatOption::NoHeartbeat)
.drop_check(CheckLevel::Warn),
)
.await?;
t3.put(key1.clone(), "gee").await?;
assert!(t3.commit().await.is_err());
let mut t4 = client
.begin_with_options(TransactionOptions::new_optimistic().drop_check(CheckLevel::Warn))
.await?;
t4.put(key2.clone(), "geee").await?;
t4.commit().await?;
heartbeat_txn_handle.await.unwrap();
txn_without_heartbeat_handle.await.unwrap();
scenario.teardown();
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_locks_batch_size() -> Result<()> {
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
fail::cfg("after-prewrite", "return").unwrap();
fail::cfg("before-cleanup-locks", "return").unwrap();
defer! {{
fail::cfg("after-prewrite", "off").unwrap();
fail::cfg("before-cleanup-locks", "off").unwrap();
}}
let client =
TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace())
.await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: false,
batch_size: 4,
};
let res = client
.cleanup_locks(full_range, &safepoint, options)
.await?;
assert_eq!(res.resolved_locks, keys.len());
assert_eq!(count_locks(&client).await?, keys.len());
scenario.teardown();
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_async_commit_locks() -> Result<()> {
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
// no commit
{
info!("test no commit");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new_with_config(
pd_addrs(),
Config::default().with_default_keyspace(),
)
.await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: true,
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
}
// partial commit
{
info!("test partial commit");
let percent = 50;
fail::cfg("before-commit-secondary", &format!("return({percent})")).unwrap();
defer! {
fail::cfg("before-commit-secondary", "off").unwrap()
}
let client = TransactionClient::new_with_config(
pd_addrs(),
Config::default().with_default_keyspace(),
)
.await?;
let keys = write_data(&client, true, false).await?;
thread::sleep(Duration::from_secs(1)); // Wait for async commit to complete.
assert_eq!(count_locks(&client).await?, keys.len() * percent / 100);
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: true,
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
}
// all committed
{
info!("test all committed");
let client = TransactionClient::new_with_config(
pd_addrs(),
Config::default().with_default_keyspace(),
)
.await?;
let keys = write_data(&client, true, false).await?;
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: true,
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
}
// TODO: test rollback
// TODO: test region error
scenario.teardown();
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
init().await?;
let scenario = FailScenario::setup();
info!("test range clean lock");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client =
TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace())
.await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
info!("total keys' count {}", keys.len());
let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone());
sorted_keys.sort();
let start_key = sorted_keys[1].clone();
let end_key = sorted_keys[sorted_keys.len() - 2].clone();
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: true,
..Default::default()
};
let res = client
.cleanup_locks(start_key..end_key, &safepoint, options)
.await?;
assert_eq!(res.resolved_locks, keys.len() - 3);
// cleanup all locks to avoid affecting following cases.
let options = ResolveLocksOptions {
async_commit_only: false,
..Default::default()
};
client.cleanup_locks(.., &safepoint, options).await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
scenario.teardown();
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_2pc_locks() -> Result<()> {
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
// no commit
{
info!("test no commit");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new_with_config(
pd_addrs(),
Config::default().with_default_keyspace(),
)
.await?;
let keys = write_data(&client, false, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
let safepoint = client.current_timestamp().await?;
{
let options = ResolveLocksOptions {
async_commit_only: true, // Skip 2pc locks.
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
assert_eq!(count_locks(&client).await?, keys.len());
}
let options = ResolveLocksOptions {
async_commit_only: false,
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
must_rollbacked(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
}
// all committed
{
info!("test all committed");
let client = TransactionClient::new_with_config(
pd_addrs(),
Config::default().with_default_keyspace(),
)
.await?;
let keys = write_data(&client, false, false).await?;
assert_eq!(count_locks(&client).await?, 0);
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: false,
..Default::default()
};
client
.cleanup_locks(full_range, &safepoint, options)
.await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
}
scenario.teardown();
Ok(())
}
async fn must_committed(client: &TransactionClient, keys: HashSet<Vec<u8>>) {
let ts = client.current_timestamp().await.unwrap();
let mut snapshot = client.snapshot(ts, TransactionOptions::default());
for key in keys {
let val = snapshot.get(key.clone()).await.unwrap();
assert_eq!(Some(key), val);
}
}
async fn must_rollbacked(client: &TransactionClient, keys: HashSet<Vec<u8>>) {
let ts = client.current_timestamp().await.unwrap();
let mut snapshot = client.snapshot(ts, TransactionOptions::default());
for key in keys {
let val = snapshot.get(key.clone()).await.unwrap();
assert_eq!(None, val);
}
}
async fn count_locks(client: &TransactionClient) -> Result<usize> {
let ts = client.current_timestamp().await.unwrap();
let locks = client.scan_locks(&ts, .., 1024).await?;
// De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes.
let locks_set: HashSet<Vec<u8>> = HashSet::from_iter(locks.into_iter().map(|l| l.key));
Ok(locks_set.len())
}
// Note: too many transactions or keys will make CI unstable due to timeout.
const TXN_COUNT: usize = 16;
const KEY_COUNT: usize = 32;
const REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 5000, 20);
const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
async fn write_data(
client: &Client,
async_commit: bool,
commit_error: bool,
) -> Result<HashSet<Vec<u8>>> {
let mut rng = thread_rng();
let keys = gen_u32_keys((TXN_COUNT * KEY_COUNT) as u32, &mut rng);
let mut txns = Vec::with_capacity(TXN_COUNT);
let mut options = TransactionOptions::new_optimistic()
.retry_options(RetryOptions {
region_backoff: REGION_BACKOFF,
lock_backoff: OPTIMISTIC_BACKOFF,
})
.drop_check(CheckLevel::Warn);
if async_commit {
options = options.use_async_commit();
}
for _ in 0..TXN_COUNT {
let txn = client.begin_with_options(options.clone()).await?;
txns.push(txn);
}
for (i, key) in keys.iter().enumerate() {
txns[i % TXN_COUNT]
.put(key.to_owned(), key.to_owned())
.await?;
}
for txn in &mut txns {
let res = txn.commit().await;
assert_eq!(res.is_err(), commit_error, "error: {res:?}");
}
Ok(keys)
}