#![allow(dead_code)] mod ctl; use std::collections::HashSet; use std::convert::TryInto; use std::env; use std::time::Duration; use log::info; use log::warn; use rand::Rng; use tikv_client::Key; use tikv_client::RawClient; use tikv_client::Result; use tikv_client::Transaction; use tikv_client::TransactionClient; use tikv_client::{ColumnFamily, Snapshot, TransactionOptions}; use tokio::time::sleep; const ENV_PD_ADDRS: &str = "PD_ADDRS"; const ENV_ENABLE_MULIT_REGION: &str = "MULTI_REGION"; const REGION_SPLIT_TIME_LIMIT: Duration = Duration::from_secs(15); // Delete all entries in TiKV to leave a clean space for following tests. pub async fn clear_tikv() { let cfs = vec![ ColumnFamily::Default, ColumnFamily::Lock, ColumnFamily::Write, ]; // DEFAULT_REGION_BACKOFF is not long enough for CI environment. So set a longer backoff. let backoff = tikv_client::Backoff::no_jitter_backoff(100, 30000, 20); for cf in cfs { let raw_client = RawClient::new(pd_addrs()).await.unwrap().with_cf(cf); raw_client .with_backoff(backoff.clone()) .delete_range(vec![]..) .await .unwrap(); } } // To test with multiple regions, prewrite some data. Tests that hope to test // with multiple regions should use keys in the corresponding ranges. pub async fn init() -> Result<()> { if env::var(ENV_ENABLE_MULIT_REGION).is_ok() { // 1000 keys: 0..1000 let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1)) .take(1000) .map(|x| x.to_be_bytes().to_vec()); // 1024 keys: 0..u32::MAX let count = 1024; let step = u32::MAX / count; let keys_2 = std::iter::successors(Some(0u32), |x| Some(x + step)) .take(count as usize - 1) .map(|x| x.to_be_bytes().to_vec()); // 10 regions is enough for most tests ensure_region_split(keys_1.chain(keys_2), 10).await?; } clear_tikv().await; let region_cnt = ctl::get_region_count().await?; // print log for debug convenience println!("init finish with {region_cnt} regions"); Ok(()) } async fn ensure_region_split( keys: impl IntoIterator>, region_count: u32, ) -> Result<()> { if ctl::get_region_count().await? as u32 >= region_count { return Ok(()); } // 1. write plenty transactional keys // 2. wait until regions split let client = TransactionClient::new(pd_addrs()).await?; let mut txn = client.begin_optimistic().await?; for key in keys.into_iter() { txn.put(key.into(), vec![0, 0, 0, 0]).await?; } txn.commit().await?; let mut txn = client.begin_optimistic().await?; let _ = txn.scan(vec![].., 2048).await?; txn.commit().await?; info!("splitting regions..."); let start_time = std::time::Instant::now(); loop { if ctl::get_region_count().await? as u32 >= region_count { break; } if start_time.elapsed() > REGION_SPLIT_TIME_LIMIT { warn!("Stop splitting regions: time limit exceeded"); break; } sleep(Duration::from_millis(200)).await; } Ok(()) } pub fn pd_addrs() -> Vec { env::var(ENV_PD_ADDRS) .unwrap_or_else(|_| { info!( "Environment variable {} is not found. Using {:?} as default.", ENV_PD_ADDRS, "127.0.0.1:2379" ); "127.0.0.1:2379".to_owned() }) .split(',') .map(From::from) .collect() } // helper function pub async fn get_u32(client: &RawClient, key: Vec) -> Result { let x = client.get(key).await?.unwrap(); let boxed_slice = x.into_boxed_slice(); let array: Box<[u8; 4]> = boxed_slice .try_into() .expect("Value should not exceed u32 (4 * u8)"); Ok(u32::from_be_bytes(*array)) } // helper function pub async fn get_txn_u32(txn: &mut Transaction, key: Vec) -> Result { let x = txn.get(key).await?.unwrap(); let boxed_slice = x.into_boxed_slice(); let array: Box<[u8; 4]> = boxed_slice .try_into() .expect("Value should not exceed u32 (4 * u8)"); Ok(u32::from_be_bytes(*array)) } // helper function pub fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet> { let mut set = HashSet::new(); for _ in 0..num { set.insert(rng.gen::().to_be_bytes().to_vec()); } set } pub async fn snapshot(client: &TransactionClient, is_pessimistic: bool) -> Result { let options = if is_pessimistic { TransactionOptions::new_pessimistic() } else { TransactionOptions::new_optimistic() }; Ok(client.snapshot(client.current_timestamp().await?, options)) } /// Copied from https://github.com/tikv/tikv/blob/d86a449d7f5b656cef28576f166e73291f501d77/components/tikv_util/src/macros.rs#L55 /// Simulates Go's defer. /// /// Please note that, different from go, this defer is bound to scope. /// When exiting the scope, its deferred calls are executed in last-in-first-out /// order. #[doc(hidden)] #[macro_export] macro_rules! defer { ($t:expr) => { let __ctx = $crate::DeferContext::new(|| $t); }; } /// Invokes the wrapped closure when dropped. pub struct DeferContext { t: Option, } impl DeferContext { pub fn new(t: T) -> DeferContext { DeferContext { t: Some(t) } } } impl Drop for DeferContext { fn drop(&mut self) { self.t.take().unwrap()() } }