Merge branch 'resolve_lock_in_range' of github.com:haojinming/client-rust into resolve_lock_in_range

This commit is contained in:
haojinming 2023-01-05 10:45:32 +08:00
commit 399efc0bc9
7 changed files with 82 additions and 32 deletions

View File

@ -502,7 +502,7 @@ impl<P: Plan, PdC: PdClient> Clone for CleanupLocks<P, PdC> {
logger: self.logger.clone(),
inner: self.inner.clone(),
ctx: self.ctx.clone(),
options: self.options,
options: self.options.clone(),
store: None,
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),

View File

@ -53,7 +53,11 @@ pub fn store_stream_for_range<PdC: PdClient>(
range: (Vec<u8>, Vec<u8>),
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
let bnd_range = BoundRange::from(range.clone());
let bnd_range = if range.1.is_empty() {
BoundRange::range_from(range.0.clone().into())
} else {
BoundRange::from(range.clone())
};
pd_client
.stores_for_range(bnd_range)
.map_ok(move |store| {
@ -67,25 +71,6 @@ pub fn store_stream_for_range<PdC: PdClient>(
.boxed()
}
pub fn store_stream_for_range_by_start_key<PdC: PdClient>(
start_key: Key,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Vec<u8>, RegionStore)>> {
let bnd_range = BoundRange::range_from(start_key.clone());
pd_client
.stores_for_range(bnd_range)
.map_ok(move |store| {
let region_range = store.region_with_leader.range();
(
range_intersection(region_range, (start_key.clone(), vec![].into()))
.0
.into(),
store,
)
})
.boxed()
}
/// The range used for request should be the intersection of `region_range` and `range`.
fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) {
let (lower, upper) = region_range;

View File

@ -13,7 +13,7 @@ use crate::{
Backoff, Result,
};
use slog::{Drain, Logger};
use std::{mem, sync::Arc};
use std::sync::Arc;
use tikv_client_proto::pdpb::Timestamp;
// FIXME: cargo-culted value
@ -263,11 +263,11 @@ impl Client {
) -> Result<CleanupLocksResult> {
debug!(self.logger, "invoking cleanup async commit locks");
// scan all locks with ts <= safepoint
let mut start_key = vec![];
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let req = new_scan_lock_request(
mem::take(&mut start_key),
options.start_key.clone(),
options.end_key.clone(),
safepoint.version(),
options.batch_size,
);
@ -287,9 +287,15 @@ impl Client {
&self,
safepoint: &Timestamp,
mut start_key: Vec<u8>,
mut end_key: Vec<u8>,
batch_size: u32,
) -> Result<Vec<tikv_client_proto::kvrpcpb::LockInfo>> {
let req = new_scan_lock_request(mem::take(&mut start_key), safepoint.version(), batch_size);
let req = new_scan_lock_request(
std::mem::take(&mut start_key),
std::mem::take(&mut end_key),
safepoint.version(),
batch_size,
);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)

View File

@ -149,9 +149,11 @@ pub struct ResolveLocksContext {
pub(crate) clean_regions: Arc<RwLock<HashMap<u64, HashSet<RegionVerId>>>>,
}
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Debug)]
pub struct ResolveLocksOptions {
pub async_commit_only: bool,
pub start_key: Vec<u8>,
pub end_key: Vec<u8>,
pub batch_size: u32,
}
@ -159,6 +161,8 @@ impl Default for ResolveLocksOptions {
fn default() -> Self {
Self {
async_commit_only: false,
start_key: vec![],
end_key: vec![],
batch_size: 1024,
}
}

View File

@ -162,10 +162,11 @@ pub fn new_pessimistic_lock_request(
pub fn new_scan_lock_request(
start_key: Key,
end_key: Key,
safepoint: Timestamp,
limit: u32,
) -> kvrpcpb::ScanLockRequest {
requests::new_scan_lock_request(start_key.into(), safepoint.version(), limit)
requests::new_scan_lock_request(start_key.into(), end_key.into(), safepoint.version(), limit)
}
pub fn new_heart_beat_request(

View File

@ -7,7 +7,7 @@ use crate::{
Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge,
NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore},
store::{store_stream_for_keys, store_stream_for_range, RegionStore},
timestamp::TimestampExt,
transaction::HasLocks,
util::iter::FlatMapOkIterExt,
@ -445,11 +445,13 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Muta
pub fn new_scan_lock_request(
start_key: Vec<u8>,
end_key: Vec<u8>,
safepoint: u64,
limit: u32,
) -> kvrpcpb::ScanLockRequest {
let mut req = kvrpcpb::ScanLockRequest::default();
req.set_start_key(start_key);
req.set_end_key(end_key);
req.set_max_version(safepoint);
req.set_limit(limit);
req
@ -460,18 +462,21 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
}
impl Shardable for kvrpcpb::ScanLockRequest {
type Shard = Vec<u8>;
type Shard = (Vec<u8>, Vec<u8>);
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone())
store_stream_for_range(
(self.start_key.clone(), self.end_key.clone()),
pd_client.clone(),
)
}
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.set_start_key(shard);
self.set_start_key(shard.0);
Ok(())
}
}
@ -479,6 +484,7 @@ impl Shardable for kvrpcpb::ScanLockRequest {
impl HasNextBatch for kvrpcpb::ScanLockResponse {
fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> {
self.get_locks().last().map(|lock| {
// TODO: if last key is larger or equal than ScanLockRequest.end_key, return None.
let mut start_key: Vec<u8> = lock.get_key().to_vec();
start_key.push(0);
(start_key, vec![])

View File

@ -99,6 +99,7 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
let options = ResolveLocksOptions {
async_commit_only: false,
batch_size: 4,
..Default::default()
};
let res = client.cleanup_locks(&safepoint, options).await?;
@ -190,6 +191,53 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
let logger = new_logger(slog::Level::Info);
init().await?;
let scenario = FailScenario::setup();
info!(logger, "test range clean lock");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
info!(logger, "total keys' count {}", keys.len());
let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone().into_iter());
sorted_keys.sort();
let start_key = sorted_keys[1].clone();
let end_key = sorted_keys[sorted_keys.len() - 2].clone();
let safepoint = client.current_timestamp().await?;
let options = ResolveLocksOptions {
async_commit_only: true,
start_key,
end_key,
..Default::default()
};
let res = client.cleanup_locks(&safepoint, options).await?;
assert_eq!(res.meet_locks, keys.len() - 3);
// cleanup all locks to avoid affecting following cases.
let options = ResolveLocksOptions {
async_commit_only: false,
..Default::default()
};
client.cleanup_locks(&safepoint, options).await?;
must_committed(&client, keys).await;
assert_eq!(count_locks(&client).await?, 0);
scenario.teardown();
Ok(())
}
#[tokio::test]
#[serial]
async fn txn_cleanup_2pc_locks() -> Result<()> {
@ -271,7 +319,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet<Vec<u8>>) {
async fn count_locks(client: &TransactionClient) -> Result<usize> {
let ts = client.current_timestamp().await.unwrap();
let locks = client.scan_locks(&ts, vec![], 1024).await?;
let locks = client.scan_locks(&ts, vec![], vec![], 1024).await?;
// De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes.
let locks_set: HashSet<Vec<u8>> =
HashSet::from_iter(locks.into_iter().map(|mut l| l.take_key()));