pessimistic locks use MAX_TTL (#329)

* pessimistic locks use MAX_TTL

Signed-off-by: ekexium <ekexium@gmail.com>

* fix clippy and test

Signed-off-by: ekexium <ekexium@gmail.com>

* fix the newTTL, use MAX_TTL

Signed-off-by: ekexium <ekexium@gmail.com>

Co-authored-by: Andy Lok <andylokandy@hotmail.com>
This commit is contained in:
Ziqian Qin 2022-01-25 14:18:10 +08:00 committed by GitHub
parent c045d1e6bd
commit 196b06eb9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 4 deletions

View File

@ -48,6 +48,7 @@ impl Config {
/// # use tikv_client::Config; /// # use tikv_client::Config;
/// let config = Config::default().with_security("root.ca", "internal.cert", "internal.key"); /// let config = Config::default().with_security("root.ca", "internal.cert", "internal.key");
/// ``` /// ```
#[must_use]
pub fn with_security( pub fn with_security(
mut self, mut self,
ca_path: impl Into<PathBuf>, ca_path: impl Into<PathBuf>,
@ -74,6 +75,7 @@ impl Config {
/// # use std::time::Duration; /// # use std::time::Duration;
/// let config = Config::default().with_timeout(Duration::from_secs(10)); /// let config = Config::default().with_timeout(Duration::from_secs(10));
/// ``` /// ```
#[must_use]
pub fn with_timeout(mut self, timeout: Duration) -> Self { pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout; self.timeout = timeout;
self self

View File

@ -120,6 +120,7 @@ impl Key {
/// Return the MVCC-encoded representation of the key. /// Return the MVCC-encoded representation of the key.
#[inline] #[inline]
#[must_use]
pub fn to_encoded(&self) -> Key { pub fn to_encoded(&self) -> Key {
let len = codec::max_encoded_bytes_size(self.0.len()); let len = codec::max_encoded_bytes_size(self.0.len());
let mut encoded = Vec::with_capacity(len); let mut encoded = Vec::with_capacity(len);

View File

@ -130,6 +130,7 @@ impl Client<PdRpcClient> {
/// let get_request = client.get("foo".to_owned()); /// let get_request = client.get("foo".to_owned());
/// # }); /// # });
/// ``` /// ```
#[must_use]
pub fn with_cf(&self, cf: ColumnFamily) -> Self { pub fn with_cf(&self, cf: ColumnFamily) -> Self {
Client { Client {
rpc: self.rpc.clone(), rpc: self.rpc.clone(),
@ -146,6 +147,7 @@ impl Client<PdRpcClient> {
/// the atomicity of CAS, write operations like [`put`](Client::put) or /// the atomicity of CAS, write operations like [`put`](Client::put) or
/// [`delete`](Client::delete) in atomic mode are more expensive. Some /// [`delete`](Client::delete) in atomic mode are more expensive. Some
/// operations are not supported in the mode. /// operations are not supported in the mode.
#[must_use]
pub fn with_atomic_for_cas(&self) -> Self { pub fn with_atomic_for_cas(&self) -> Self {
Client { Client {
rpc: self.rpc.clone(), rpc: self.rpc.clone(),

View File

@ -664,7 +664,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let request = new_heart_beat_request( let request = new_heart_beat_request(
self.timestamp.clone(), self.timestamp.clone(),
primary_key, primary_key,
self.start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
); );
let plan = PlanBuilder::new(self.rpc.clone(), request) let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone()) .resolve_lock(self.options.retry_options.lock_backoff.clone())
@ -743,7 +743,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys.clone().into_iter(), keys.clone().into_iter(),
primary_lock, primary_lock,
self.timestamp.clone(), self.timestamp.clone(),
DEFAULT_LOCK_TTL, MAX_TTL,
for_update_ts, for_update_ts,
need_value, need_value,
); );
@ -822,7 +822,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let request = new_heart_beat_request( let request = new_heart_beat_request(
start_ts.clone(), start_ts.clone(),
primary_key.clone(), primary_key.clone(),
start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL, start_instant.elapsed().as_millis() as u64 + MAX_TTL,
); );
let plan = PlanBuilder::new(rpc.clone(), request) let plan = PlanBuilder::new(rpc.clone(), request)
.retry_multi_region(region_backoff.clone()) .retry_multi_region(region_backoff.clone())
@ -945,42 +945,49 @@ impl TransactionOptions {
} }
/// Try to use async commit. /// Try to use async commit.
#[must_use]
pub fn use_async_commit(mut self) -> TransactionOptions { pub fn use_async_commit(mut self) -> TransactionOptions {
self.async_commit = true; self.async_commit = true;
self self
} }
/// Try to use 1pc. /// Try to use 1pc.
#[must_use]
pub fn try_one_pc(mut self) -> TransactionOptions { pub fn try_one_pc(mut self) -> TransactionOptions {
self.try_one_pc = true; self.try_one_pc = true;
self self
} }
/// Make the transaction read only. /// Make the transaction read only.
#[must_use]
pub fn read_only(mut self) -> TransactionOptions { pub fn read_only(mut self) -> TransactionOptions {
self.read_only = true; self.read_only = true;
self self
} }
/// Don't automatically resolve locks and retry if keys are locked. /// Don't automatically resolve locks and retry if keys are locked.
#[must_use]
pub fn no_resolve_locks(mut self) -> TransactionOptions { pub fn no_resolve_locks(mut self) -> TransactionOptions {
self.retry_options.lock_backoff = Backoff::no_backoff(); self.retry_options.lock_backoff = Backoff::no_backoff();
self self
} }
/// Don't automatically resolve regions with PD if we have outdated region information. /// Don't automatically resolve regions with PD if we have outdated region information.
#[must_use]
pub fn no_resolve_regions(mut self) -> TransactionOptions { pub fn no_resolve_regions(mut self) -> TransactionOptions {
self.retry_options.region_backoff = Backoff::no_backoff(); self.retry_options.region_backoff = Backoff::no_backoff();
self self
} }
/// Set RetryOptions. /// Set RetryOptions.
#[must_use]
pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions { pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions {
self.retry_options = options; self.retry_options = options;
self self
} }
/// Set the behavior when dropping a transaction without an attempt to commit or rollback it. /// Set the behavior when dropping a transaction without an attempt to commit or rollback it.
#[must_use]
pub fn drop_check(mut self, level: CheckLevel) -> TransactionOptions { pub fn drop_check(mut self, level: CheckLevel) -> TransactionOptions {
self.check_level = level; self.check_level = level;
self self
@ -998,6 +1005,7 @@ impl TransactionOptions {
} }
} }
#[must_use]
pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions { pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions {
self.heartbeat_option = heartbeat_option; self.heartbeat_option = heartbeat_option;
self self

View File

@ -742,7 +742,7 @@ async fn txn_pessimistic_heartbeat() -> Result<()> {
.await .await
.unwrap(); .unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; tokio::time::sleep(tokio::time::Duration::from_secs(23)).await;
// use other txns to check these locks // use other txns to check these locks
let mut t3 = client let mut t3 = client