Merge pull request #213 from nrc/options

Refactor the transaction API re how we set options, and add some options to skip resolve and retry
This commit is contained in:
Nick Cameron 2021-01-06 10:22:50 +13:00 committed by GitHub
commit 2d657d74be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 520 additions and 469 deletions

View File

@ -24,12 +24,15 @@ async fn main() {
.await
.expect("Could not connect to tikv");
let key1: Key = b"key1".to_vec().into();
let key1: Key = b"key01".to_vec().into();
let value1: Value = b"value1".to_vec();
let key2: Key = b"key2".to_vec().into();
let key2: Key = b"key02".to_vec().into();
let value2: Value = b"value2".to_vec();
let mut txn0 = client.begin().await.expect("Could not begin a transaction");
for (key, value) in vec![(key1, value1), (key2, value2)] {
let mut txn0 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for (key, value) in vec![(key1.clone(), value1), (key2, value2)] {
txn0.put(key, value).await.expect("Could not set key value");
}
txn0.commit().await.expect("Could not commit");
@ -39,7 +42,6 @@ async fn main() {
.await
.expect("Could not begin a transaction");
// lock the key
let key1: Key = b"key1".to_vec().into();
let value = txn1
.get_for_update(key1.clone())
.await
@ -47,10 +49,12 @@ async fn main() {
println!("{:?}", (&key1, value));
{
// another txn cannot write to the locked key
let mut txn2 = client.begin().await.expect("Could not begin a transaction");
let key1: Key = b"key1".to_vec().into();
let mut txn2 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let value2: Value = b"value2".to_vec();
txn2.put(key1, value2).await.unwrap();
txn2.put(key1.clone(), value2).await.unwrap();
let result = txn2.commit().await;
assert!(result.is_err());
}
@ -58,7 +62,10 @@ async fn main() {
let value3: Value = b"value3".to_vec();
txn1.put(key1.clone(), value3).await.unwrap();
txn1.commit().await.unwrap();
let mut txn3 = client.begin().await.expect("Could not begin a transaction");
let mut txn3 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let result = txn3.get(key1.clone()).await.unwrap().unwrap();
txn3.commit()
.await

View File

@ -6,7 +6,10 @@ use crate::common::parse_args;
use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value};
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for pair in pairs {
let (key, value) = pair.into().into();
txn.put(key, value).await.expect("Could not set key value");
@ -15,7 +18,10 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
}
async fn get(client: &Client, key: Key) -> Option<Value> {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let res = txn.get(key).await.expect("Could not get value");
txn.commit()
.await
@ -24,7 +30,10 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
}
async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
txn.scan(range, limit)
.await
.expect("Could not scan key-value pairs in range")
@ -33,7 +42,10 @@ async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
}
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for key in keys {
txn.delete(key).await.expect("Could not delete the key");
}

View File

@ -5,162 +5,12 @@
use rand::{thread_rng, Rng};
use std::time::Duration;
pub trait Backoff: Clone + Send + 'static {
// Returns the delay period for next retry. If the maximum retry count is hit returns None.
fn next_delay_duration(&mut self) -> Option<Duration>;
}
// NoBackoff means that we don't want any retry here.
#[derive(Clone)]
pub struct NoBackoff;
impl Backoff for NoBackoff {
fn next_delay_duration(&mut self) -> Option<Duration> {
None
}
}
// Exponential backoff means that the retry delay should multiply a constant
// after each attempt, up to a maximum value. After each attempt, the new retry
// delay should be:
//
// new_delay = min(max_delay, base_delay * 2 ** attempts)
#[derive(Clone)]
pub struct NoJitterBackoff {
current_attempts: u32,
max_attempts: u32,
current_delay_ms: u64,
max_delay_ms: u64,
}
impl NoJitterBackoff {
pub const fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self {
Self {
current_attempts: 0,
max_attempts,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
}
impl Backoff for NoJitterBackoff {
fn next_delay_duration(&mut self) -> Option<Duration> {
if self.current_attempts >= self.max_attempts {
return None;
}
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
self.current_attempts += 1;
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
}
// Adds Jitter to the basic exponential backoff. Returns a random value between
// zero and the calculated exponential backoff:
//
// temp = min(max_delay, base_delay * 2 ** attempts)
// new_delay = random_between(0, temp)
#[derive(Clone)]
pub struct FullJitterBackoff {
current_attempts: u32,
max_attempts: u32,
current_delay_ms: u64,
max_delay_ms: u64,
}
impl FullJitterBackoff {
#[allow(dead_code)]
pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self {
if base_delay_ms == 0 || max_delay_ms == 0 {
panic!("Both base_delay_ms and max_delay_ms must be positive");
}
Self {
current_attempts: 0,
max_attempts,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
}
impl Backoff for FullJitterBackoff {
fn next_delay_duration(&mut self) -> Option<Duration> {
if self.current_attempts >= self.max_attempts {
return None;
}
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
let mut rng = thread_rng();
let delay_ms: u64 = rng.gen_range(0, delay_ms);
self.current_attempts += 1;
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
}
// Equal Jitter limits the random value should be equal or greater than half of
// the calculated exponential backoff:
//
// temp = min(max_delay, base_delay * 2 ** attempts)
// new_delay = random_between(temp / 2, temp)
#[derive(Clone)]
pub struct EqualJitterBackoff {
current_attempts: u32,
max_attempts: u32,
current_delay_ms: u64,
max_delay_ms: u64,
}
impl EqualJitterBackoff {
#[allow(dead_code)]
pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self {
if base_delay_ms < 2 || max_delay_ms < 2 {
panic!("Both base_delay_ms and max_delay_ms must be greater than 1");
}
Self {
current_attempts: 0,
max_attempts,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
}
impl Backoff for EqualJitterBackoff {
fn next_delay_duration(&mut self) -> Option<Duration> {
if self.current_attempts >= self.max_attempts {
return None;
}
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
let half_delay_ms = delay_ms >> 1;
let mut rng = thread_rng();
let delay_ms: u64 = rng.gen_range(0, half_delay_ms) + half_delay_ms;
self.current_attempts += 1;
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
}
// Decorrelated Jitter is always calculated with the previous backoff
// (the initial value is base_delay):
//
// temp = random_between(base_delay, previous_delay * 3)
// new_delay = min(max_delay, temp)
#[derive(Clone)]
pub struct DecorrelatedJitterBackoff {
/// When a request is retried, we can backoff for some time to avoid saturating the network.
///
/// `Backoff` is an object which determines how long to wait for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Backoff {
kind: BackoffKind,
current_attempts: u32,
max_attempts: u32,
base_delay_ms: u64,
@ -168,14 +18,156 @@ pub struct DecorrelatedJitterBackoff {
max_delay_ms: u64,
}
impl DecorrelatedJitterBackoff {
#[allow(dead_code)]
pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self {
if base_delay_ms == 0 {
panic!("base_delay_ms must be positive");
impl Backoff {
// Returns the delay period for next retry. If the maximum retry count is hit returns None.
pub fn next_delay_duration(&mut self) -> Option<Duration> {
if self.current_attempts >= self.max_attempts {
return None;
}
self.current_attempts += 1;
Self {
match self.kind {
BackoffKind::None => None,
BackoffKind::NoJitter => {
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
BackoffKind::FullJitter => {
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
let mut rng = thread_rng();
let delay_ms: u64 = rng.gen_range(0, delay_ms);
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
BackoffKind::EqualJitter => {
let delay_ms = self.max_delay_ms.min(self.current_delay_ms);
let half_delay_ms = delay_ms >> 1;
let mut rng = thread_rng();
let delay_ms: u64 = rng.gen_range(0, half_delay_ms) + half_delay_ms;
self.current_delay_ms <<= 1;
Some(Duration::from_millis(delay_ms))
}
BackoffKind::DecorrelatedJitter => {
let mut rng = thread_rng();
let delay_ms: u64 = rng
.gen_range(0, self.current_delay_ms * 3 - self.base_delay_ms)
+ self.base_delay_ms;
let delay_ms = delay_ms.min(self.max_delay_ms);
self.current_delay_ms = delay_ms;
Some(Duration::from_millis(delay_ms))
}
}
}
/// True if we should not backoff at all (usually indicates that we should not retry a request).
pub fn is_none(&self) -> bool {
self.kind == BackoffKind::None
}
/// Don't wait. Usually indicates that we should not retry a request.
pub const fn no_backoff() -> Backoff {
Backoff {
kind: BackoffKind::None,
current_attempts: 0,
max_attempts: 0,
base_delay_ms: 0,
current_delay_ms: 0,
max_delay_ms: 0,
}
}
// Exponential backoff means that the retry delay should multiply a constant
// after each attempt, up to a maximum value. After each attempt, the new retry
// delay should be:
//
// new_delay = min(max_delay, base_delay * 2 ** attempts)
pub const fn no_jitter_backoff(
base_delay_ms: u64,
max_delay_ms: u64,
max_attempts: u32,
) -> Backoff {
Backoff {
kind: BackoffKind::NoJitter,
current_attempts: 0,
max_attempts,
base_delay_ms,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
// Adds Jitter to the basic exponential backoff. Returns a random value between
// zero and the calculated exponential backoff:
//
// temp = min(max_delay, base_delay * 2 ** attempts)
// new_delay = random_between(0, temp)
pub fn full_jitter_backoff(
base_delay_ms: u64,
max_delay_ms: u64,
max_attempts: u32,
) -> Backoff {
assert!(
base_delay_ms > 0 && max_delay_ms > 0,
"Both base_delay_ms and max_delay_ms must be positive"
);
Backoff {
kind: BackoffKind::FullJitter,
current_attempts: 0,
max_attempts,
base_delay_ms,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
// Equal Jitter limits the random value should be equal or greater than half of
// the calculated exponential backoff:
//
// temp = min(max_delay, base_delay * 2 ** attempts)
// new_delay = random_between(temp / 2, temp)
pub fn equal_jitter_backoff(
base_delay_ms: u64,
max_delay_ms: u64,
max_attempts: u32,
) -> Backoff {
assert!(
base_delay_ms > 1 && max_delay_ms > 1,
"Both base_delay_ms and max_delay_ms must be greater than 1"
);
Backoff {
kind: BackoffKind::EqualJitter,
current_attempts: 0,
max_attempts,
base_delay_ms,
current_delay_ms: base_delay_ms,
max_delay_ms,
}
}
// Decorrelated Jitter is always calculated with the previous backoff
// (the initial value is base_delay):
//
// temp = random_between(base_delay, previous_delay * 3)
// new_delay = min(max_delay, temp)
pub fn decorrelated_jitter_backoff(
base_delay_ms: u64,
max_delay_ms: u64,
max_attempts: u32,
) -> Backoff {
assert!(base_delay_ms > 0, "base_delay_ms must be positive");
Backoff {
kind: BackoffKind::DecorrelatedJitter,
current_attempts: 0,
max_attempts,
base_delay_ms,
@ -185,23 +177,14 @@ impl DecorrelatedJitterBackoff {
}
}
impl Backoff for DecorrelatedJitterBackoff {
fn next_delay_duration(&mut self) -> Option<Duration> {
if self.current_attempts >= self.max_attempts {
return None;
}
let mut rng = thread_rng();
let delay_ms: u64 =
rng.gen_range(0, self.current_delay_ms * 3 - self.base_delay_ms) + self.base_delay_ms;
let delay_ms = delay_ms.min(self.max_delay_ms);
self.current_attempts += 1;
self.current_delay_ms = delay_ms;
Some(Duration::from_millis(delay_ms))
}
/// The pattern for computing backoff times.
#[derive(Debug, Clone, PartialEq, Eq)]
enum BackoffKind {
None,
NoJitter,
FullJitter,
EqualJitter,
DecorrelatedJitter,
}
#[cfg(test)]
@ -212,22 +195,10 @@ mod test {
#[test]
fn test_no_jitter_backoff() {
// Tests for zero attempts.
let mut backoff = NoJitterBackoff {
current_attempts: 0,
max_attempts: 0,
current_delay_ms: 0,
max_delay_ms: 0,
};
let mut backoff = Backoff::no_jitter_backoff(0, 0, 0);
assert_eq!(backoff.next_delay_duration(), None);
let mut backoff = NoJitterBackoff {
current_attempts: 0,
max_attempts: 3,
current_delay_ms: 2,
max_delay_ms: 7,
};
let mut backoff = Backoff::no_jitter_backoff(2, 7, 3);
assert_eq!(
backoff.next_delay_duration(),
Some(Duration::from_millis(2))
@ -245,7 +216,7 @@ mod test {
#[test]
fn test_full_jitter_backoff() {
let mut backoff = FullJitterBackoff::new(2, 7, 3);
let mut backoff = Backoff::full_jitter_backoff(2, 7, 3);
assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(2));
assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(4));
assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(7));
@ -255,18 +226,18 @@ mod test {
#[test]
#[should_panic(expected = "Both base_delay_ms and max_delay_ms must be positive")]
fn test_full_jitter_backoff_with_invalid_base_delay_ms() {
FullJitterBackoff::new(0, 7, 3);
Backoff::full_jitter_backoff(0, 7, 3);
}
#[test]
#[should_panic(expected = "Both base_delay_ms and max_delay_ms must be positive")]
fn test_full_jitter_backoff_with_invalid_max_delay_ms() {
FullJitterBackoff::new(2, 0, 3);
Backoff::full_jitter_backoff(2, 0, 3);
}
#[test]
fn test_equal_jitter_backoff() {
let mut backoff = EqualJitterBackoff::new(2, 7, 3);
let mut backoff = Backoff::equal_jitter_backoff(2, 7, 3);
let first_delay_dur = backoff.next_delay_duration().unwrap();
assert!(first_delay_dur >= Duration::from_millis(1));
@ -286,18 +257,18 @@ mod test {
#[test]
#[should_panic(expected = "Both base_delay_ms and max_delay_ms must be greater than 1")]
fn test_equal_jitter_backoff_with_invalid_base_delay_ms() {
EqualJitterBackoff::new(1, 7, 3);
Backoff::equal_jitter_backoff(1, 7, 3);
}
#[test]
#[should_panic(expected = "Both base_delay_ms and max_delay_ms must be greater than 1")]
fn test_equal_jitter_backoff_with_invalid_max_delay_ms() {
EqualJitterBackoff::new(2, 1, 3);
Backoff::equal_jitter_backoff(2, 1, 3);
}
#[test]
fn test_decorrelated_jitter_backoff() {
let mut backoff = DecorrelatedJitterBackoff::new(2, 7, 3);
let mut backoff = Backoff::decorrelated_jitter_backoff(2, 7, 3);
let first_delay_dur = backoff.next_delay_duration().unwrap();
assert!(first_delay_dur >= Duration::from_millis(2));
@ -319,6 +290,6 @@ mod test {
#[test]
#[should_panic(expected = "base_delay_ms must be positive")]
fn test_decorrelated_jitter_backoff_with_invalid_base_delay_ms() {
DecorrelatedJitterBackoff::new(0, 7, 3);
Backoff::decorrelated_jitter_backoff(0, 7, 3);
}
}

View File

@ -27,7 +27,6 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub try_one_pc: bool,
}
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
@ -39,7 +38,6 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
try_one_pc: false,
}
}
}
@ -81,9 +79,4 @@ impl Config {
self.timeout = timeout;
self
}
pub fn try_one_pc(mut self) -> Self {
self.try_one_pc = true;
self
}
}

View File

@ -98,14 +98,21 @@ mod proptests;
#[macro_use]
extern crate log;
#[doc(inline)]
pub use crate::backoff::Backoff;
#[doc(inline)]
pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::{Timestamp, TimestampExt};
#[doc(inline)]
pub use crate::transaction::{Client as TransactionClient, Snapshot, Transaction};
pub use crate::transaction::{
Client as TransactionClient, Snapshot, Transaction, TransactionOptions,
};
#[doc(inline)]
pub use config::Config;
#[doc(inline)]
pub use region::{Region, RegionId, RegionVerId, StoreId};

View File

@ -6,7 +6,7 @@ use super::requests;
use crate::{
config::Config,
pd::PdRpcClient,
request::{KvRequest, OPTIMISTIC_BACKOFF},
request::{KvRequest, RetryOptions},
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
use std::{sync::Arc, u32};
@ -111,7 +111,7 @@ impl Client {
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
requests::new_raw_get_request(key, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -138,7 +138,7 @@ impl Client {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
requests::new_raw_batch_get_request(keys, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -160,7 +160,7 @@ impl Client {
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
requests::new_raw_put_request(key, value, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -186,7 +186,7 @@ impl Client {
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
requests::new_raw_batch_put_request(pairs, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -209,7 +209,7 @@ impl Client {
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
requests::new_raw_delete_request(key, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -232,7 +232,7 @@ impl Client {
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
requests::new_raw_batch_delete_request(keys, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -253,7 +253,7 @@ impl Client {
/// ```
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
requests::new_raw_delete_range_request(range, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -389,7 +389,7 @@ impl Client {
}
let res = requests::new_raw_scan_request(range, limit, key_only, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await;
res.map(|mut s| {
s.truncate(limit as usize);
@ -411,7 +411,7 @@ impl Client {
}
requests::new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
}

View File

@ -467,7 +467,7 @@ mod test {
use super::*;
use crate::{
mock::{MockKvClient, MockPdClient},
request::OPTIMISTIC_BACKOFF,
request::RetryOptions,
};
use futures::executor;
use std::any::Any;
@ -502,7 +502,8 @@ mod test {
key_only: true,
..Default::default()
};
let scan = executor::block_on(scan.execute(client, OPTIMISTIC_BACKOFF)).unwrap();
let scan =
executor::block_on(scan.execute(client, RetryOptions::default_optimistic())).unwrap();
assert_eq!(scan.len(), 10);
// TODO test the keys returned.

View File

@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::{Backoff, NoBackoff, NoJitterBackoff},
backoff::Backoff,
pd::PdClient,
stats::tikv_stats,
store::Store,
@ -9,6 +9,7 @@ use crate::{
BoundRange, Error, Key, Result,
};
use async_trait::async_trait;
use derive_new::new;
use futures::{prelude::*, stream::BoxStream};
use std::{
cmp::{max, min},
@ -16,9 +17,9 @@ use std::{
};
use tikv_client_store::{HasError, HasRegionError, Request};
const DEFAULT_REGION_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10);
pub const OPTIMISTIC_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10);
pub const PESSIMISTIC_BACKOFF: NoBackoff = NoBackoff;
const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_backoff();
#[async_trait]
pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
@ -30,13 +31,13 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
/// is the part which differs among the requests.
type KeyData: Send;
async fn execute<Pd: PdClient, B: Backoff>(
async fn execute<Pd: PdClient>(
self,
pd_client: Arc<Pd>,
lock_backoff: B,
retry: RetryOptions,
) -> Result<Self::Result> {
Self::reduce(
self.response_stream(pd_client, lock_backoff)
self.response_stream(pd_client, retry)
.and_then(|mut response| match response.error() {
Some(e) => future::err(e),
None => future::ok(response),
@ -50,16 +51,15 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
fn response_stream(
self,
pd_client: Arc<impl PdClient>,
lock_backoff: impl Backoff,
retry: RetryOptions,
) -> BoxStream<'static, Result<Self::RpcResponse>> {
self.retry_response_stream(pd_client, DEFAULT_REGION_BACKOFF, lock_backoff)
self.retry_response_stream(pd_client, retry)
}
fn retry_response_stream(
mut self,
pd_client: Arc<impl PdClient>,
region_backoff: impl Backoff,
lock_backoff: impl Backoff,
retry: RetryOptions,
) -> BoxStream<'static, Result<Self::RpcResponse>> {
let stores = self.store_stream(pd_client.clone());
stores
@ -75,29 +75,22 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
})
.map_ok(move |(request, mut response)| {
if let Some(region_error) = response.region_error() {
return request.on_region_error(
region_error,
pd_client.clone(),
region_backoff.clone(),
lock_backoff.clone(),
);
return request.on_region_error(region_error, pd_client.clone(), retry.clone());
}
// Resolve locks
let locks = response.take_locks();
if !locks.is_empty() {
if retry.lock_backoff.is_none() {
return stream::once(future::err(Error::ResolveLockError)).boxed();
}
let pd_client = pd_client.clone();
let region_backoff = region_backoff.clone();
let lock_backoff = lock_backoff.clone();
let retry = retry.clone();
return resolve_locks(locks, pd_client.clone())
.map_ok(move |resolved| {
if !resolved {
request.on_resolve_lock_failed(
pd_client,
region_backoff,
lock_backoff,
)
request.on_resolve_lock_failed(pd_client, retry)
} else {
request.response_stream(pd_client, OPTIMISTIC_BACKOFF)
request.response_stream(pd_client, retry)
}
})
.try_flatten_stream()
@ -113,10 +106,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
self,
region_error: Error,
pd_client: Arc<impl PdClient>,
mut region_backoff: impl Backoff,
lock_backoff: impl Backoff,
mut retry: RetryOptions,
) -> BoxStream<'static, Result<Self::RpcResponse>> {
region_backoff.next_delay_duration().map_or(
retry.region_backoff.next_delay_duration().map_or(
stream::once(future::err(region_error)).boxed(),
move |delay_duration| {
let fut = async move {
@ -124,11 +116,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
Ok(())
};
fut.map_ok(move |_| {
self.retry_response_stream(pd_client, region_backoff, lock_backoff)
})
.try_flatten_stream()
.boxed()
fut.map_ok(move |_| self.retry_response_stream(pd_client, retry))
.try_flatten_stream()
.boxed()
},
)
}
@ -136,21 +126,18 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
fn on_resolve_lock_failed(
self,
pd_client: Arc<impl PdClient>,
region_backoff: impl Backoff,
mut lock_backoff: impl Backoff,
mut retry: RetryOptions,
) -> BoxStream<'static, Result<Self::RpcResponse>> {
lock_backoff.next_delay_duration().map_or(
retry.lock_backoff.next_delay_duration().map_or(
stream::once(future::err(Error::ResolveLockError)).boxed(),
move |delay_duration| {
let fut = async move {
futures_timer::Delay::new(delay_duration).await;
Ok(())
};
fut.map_ok(move |_| {
self.retry_response_stream(pd_client, region_backoff, lock_backoff)
})
.try_flatten_stream()
.boxed()
fut.map_ok(move |_| self.retry_response_stream(pd_client, retry))
.try_flatten_stream()
.boxed()
},
)
}
@ -176,6 +163,30 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
}
}
#[derive(Clone, Debug, new, Eq, PartialEq)]
pub struct RetryOptions {
/// How to retry when there is a region error and we need to resolve regions with PD.
pub region_backoff: Backoff,
/// How to retry when a key is locked.
pub lock_backoff: Backoff,
}
impl RetryOptions {
pub const fn default_optimistic() -> RetryOptions {
RetryOptions {
region_backoff: DEFAULT_REGION_BACKOFF,
lock_backoff: OPTIMISTIC_BACKOFF,
}
}
pub const fn default_pessimistic() -> RetryOptions {
RetryOptions {
region_backoff: DEFAULT_REGION_BACKOFF,
lock_backoff: PESSIMISTIC_BACKOFF,
}
}
}
pub fn store_stream_for_key<KeyData, PdC>(
key_data: KeyData,
pd_client: Arc<PdC>,
@ -352,9 +363,11 @@ mod test {
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
)));
let region_backoff = NoJitterBackoff::new(1, 1, 3);
let lock_backoff = NoJitterBackoff::new(1, 1, 3);
let stream = request.retry_response_stream(pd_client, region_backoff, lock_backoff);
let retry = RetryOptions {
region_backoff: Backoff::no_jitter_backoff(1, 1, 3),
lock_backoff: Backoff::no_jitter_backoff(1, 1, 3),
};
let stream = request.retry_response_stream(pd_client, retry);
executor::block_on(async { stream.collect::<Vec<Result<MockRpcResponse>>>().await });

View File

@ -4,9 +4,9 @@ use super::{requests::new_scan_lock_request, resolve_locks};
use crate::{
config::Config,
pd::{PdClient, PdRpcClient},
request::{KvRequest, OPTIMISTIC_BACKOFF},
request::{KvRequest, RetryOptions},
timestamp::TimestampExt,
transaction::{Snapshot, Transaction, TransactionStyle},
transaction::{Snapshot, Transaction, TransactionOptions},
Result,
};
use futures::executor::ThreadPool;
@ -37,7 +37,6 @@ pub struct Client {
/// The thread pool for background tasks including committing secondary keys and failed
/// transaction cleanups.
bg_worker: ThreadPool,
config: Config,
}
impl Client {
@ -78,11 +77,7 @@ impl Client {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let bg_worker = ThreadPool::new()?;
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?);
Ok(Client {
pd,
bg_worker,
config,
})
Ok(Client { pd, bg_worker })
}
/// Creates a new [`Transaction`](Transaction) in optimistic mode.
@ -99,19 +94,15 @@ impl Client {
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client.begin().await.unwrap();
/// let mut transaction = client.begin_optimistic().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result = commit.await.unwrap();
/// # });
/// ```
pub async fn begin(&self) -> Result<Transaction> {
pub async fn begin_optimistic(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(
timestamp,
TransactionStyle::new_optimistic(self.config.try_one_pc),
false,
))
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
}
/// Creates a new [`Transaction`](Transaction) in pessimistic mode.
@ -133,20 +124,34 @@ impl Client {
/// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(
timestamp,
TransactionStyle::new_pessimistic(self.config.try_one_pc),
false,
))
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
}
/// Creates a new customized [`Transaction`](Transaction).
///
/// # Examples
/// ```rust,no_run
/// use tikv_client::{Config, TransactionClient, TransactionOptions};
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client
/// .begin_with_options(TransactionOptions::default().use_async_commit())
/// .await
/// .unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result = commit.await.unwrap();
/// # });
/// ```
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}
/// Creates a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot {
Snapshot::new(self.new_transaction(
timestamp,
TransactionStyle::new_optimistic(self.config.try_one_pc),
true,
))
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
/// Retrieves the current [`Timestamp`](Timestamp).
@ -184,8 +189,9 @@ impl Client {
safepoint.clone(),
SCAN_LOCK_BATCH_SIZE,
);
let res: Vec<kvrpcpb::LockInfo> =
req.execute(self.pd.clone(), OPTIMISTIC_BACKOFF).await?;
let res: Vec<kvrpcpb::LockInfo> = req
.execute(self.pd.clone(), RetryOptions::default_optimistic())
.await?;
if res.is_empty() {
break;
}
@ -209,18 +215,7 @@ impl Client {
Ok(res)
}
fn new_transaction(
&self,
timestamp: Timestamp,
style: TransactionStyle,
read_only: bool,
) -> Transaction {
Transaction::new(
timestamp,
self.bg_worker.clone(),
self.pd.clone(),
style,
read_only,
)
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone(), options)
}
}

View File

@ -1,6 +1,6 @@
use crate::{
pd::PdClient,
request::{KvRequest, OPTIMISTIC_BACKOFF},
request::{KvRequest, RetryOptions},
timestamp::TimestampExt,
transaction::requests,
Error, Key, RegionVerId, Result,
@ -54,7 +54,7 @@ pub async fn resolve_locks(
Some(&commit_version) => commit_version,
None => {
let commit_version = requests::new_cleanup_request(primary_key, lock.lock_version)
.execute(pd_client.clone(), OPTIMISTIC_BACKOFF)
.execute(pd_client.clone(), RetryOptions::default_optimistic())
.await?;
commit_versions.insert(lock.lock_version, commit_version);
commit_version
@ -95,7 +95,7 @@ async fn resolve_lock_with_retry(
}
};
match requests::new_resolve_lock_request(context, start_version, commit_version)
.execute(pd_client.clone(), OPTIMISTIC_BACKOFF)
.execute(pd_client.clone(), RetryOptions::default_optimistic())
.await
{
Ok(_) => {

View File

@ -11,8 +11,7 @@
pub use client::Client;
pub(crate) use lock::{resolve_locks, HasLocks};
pub use snapshot::Snapshot;
pub use transaction::Transaction;
use transaction::TransactionStyle;
pub use transaction::{Transaction, TransactionOptions};
mod buffer;
mod client;

View File

@ -1,9 +1,11 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::Backoff,
pd::PdClient,
request::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest},
request::{
store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest,
RetryOptions,
},
store::Store,
timestamp::TimestampExt,
transaction::HasLocks,
@ -201,8 +203,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest {
self,
region_error: Error,
_pd_client: Arc<impl PdClient>,
_region_backoff: impl Backoff,
_lock_backoff: impl Backoff,
_: RetryOptions,
) -> BoxStream<'static, Result<Self::RpcResponse>> {
stream::once(future::err(region_error)).boxed()
}

View File

@ -1,8 +1,9 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::Backoff,
pd::{PdClient, PdRpcClient},
request::{KvRequest, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF},
request::{KvRequest, RetryOptions},
timestamp::TimestampExt,
transaction::{buffer::Buffer, requests::*},
BoundRange, Error, Key, KvPair, Result, Value,
@ -12,70 +13,7 @@ use futures::{executor::ThreadPool, prelude::*, stream::BoxStream};
use std::{iter, ops::RangeBounds, sync::Arc};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
#[derive(PartialEq)]
enum TransactionStatus {
/// The transaction is read-only [`Snapshot`](super::Snapshot::Snapshot), no need to commit or rollback or panic on drop.
ReadOnly,
/// The transaction have not been committed or rolled back.
Active,
/// The transaction has committed.
Committed,
/// The transaction has tried to commit. Only `commit` is allowed.
StartedCommit,
/// The transaction has rolled back.
Rolledback,
/// The transaction has tried to rollback. Only `rollback` is allowed.
StartedRollback,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum TransactionKind {
Optimistic,
/// Argument is for_update_ts
Pessimistic(u64),
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct TransactionStyle {
kind: TransactionKind,
try_one_pc: bool,
async_commit: bool,
}
impl TransactionStyle {
pub fn new_optimistic(try_one_pc: bool) -> TransactionStyle {
TransactionStyle {
kind: TransactionKind::Optimistic,
try_one_pc,
async_commit: false,
}
}
pub fn new_pessimistic(try_one_pc: bool) -> TransactionStyle {
TransactionStyle {
kind: TransactionKind::Pessimistic(0),
try_one_pc,
async_commit: false,
}
}
pub fn async_commit(mut self) -> TransactionStyle {
self.async_commit = true;
self
}
fn push_for_update_ts(&mut self, for_update_ts: u64) {
match &mut self.kind {
TransactionKind::Optimistic => unreachable!(),
TransactionKind::Pessimistic(old_for_update_ts) => {
self.kind =
TransactionKind::Pessimistic(std::cmp::max(*old_for_update_ts, for_update_ts));
}
}
}
}
/// A undo-able set of actions on the dataset.
/// An undo-able set of actions on the dataset.
///
/// Using a transaction you can prepare a set of actions (such as `get`, or `put`) on data at a
/// particular timestamp called `start_ts` obtained from the placement driver.
@ -101,7 +39,7 @@ impl TransactionStyle {
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let txn = client.begin().await.unwrap();
/// let txn = client.begin_optimistic().await.unwrap();
/// # });
/// ```
pub struct Transaction {
@ -110,7 +48,7 @@ pub struct Transaction {
buffer: Buffer,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
options: TransactionOptions,
}
impl Transaction {
@ -118,10 +56,9 @@ impl Transaction {
timestamp: Timestamp,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
read_only: bool,
options: TransactionOptions,
) -> Transaction {
let status = if read_only {
let status = if options.read_only {
TransactionStatus::ReadOnly
} else {
TransactionStatus::Active
@ -132,7 +69,7 @@ impl Transaction {
buffer: Default::default(),
bg_worker,
rpc,
style,
options,
}
}
@ -149,7 +86,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Option<Value> = txn.get(key).await.unwrap();
/// // Finish the transaction...
@ -162,7 +99,7 @@ impl Transaction {
self.buffer
.get_or_else(key, |key| {
new_mvcc_get_request(key, self.timestamp.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
})
.await
}
@ -196,7 +133,7 @@ impl Transaction {
self.buffer
.get_or_else(key, |key| {
new_mvcc_get_request(key, self.timestamp.clone())
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
})
.await
}
@ -216,7 +153,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = txn
/// .batch_get(keys)
@ -237,7 +174,8 @@ impl Transaction {
let rpc = self.rpc.clone();
self.buffer
.batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| {
new_mvcc_get_batch_request(keys, timestamp).execute(rpc, OPTIMISTIC_BACKOFF)
new_mvcc_get_batch_request(keys, timestamp)
.execute(rpc, RetryOptions::default_optimistic())
})
.await
}
@ -297,7 +235,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<KvPair> = txn
@ -331,7 +269,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<Key> = txn
@ -369,7 +307,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// txn.put(key, val);
@ -397,7 +335,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// txn.delete(key);
/// // Finish the transaction...
@ -429,7 +367,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// // ... Do some actions.
/// txn.commit().await.unwrap();
@ -451,7 +389,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: u64 = req.await.unwrap();
@ -466,12 +404,12 @@ impl Transaction {
}
self.status = TransactionStatus::StartedCommit;
let res = TwoPhaseCommitter::new(
let res = Committer::new(
self.buffer.to_proto_mutations().await,
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
self.style,
self.options.clone(),
)
.commit()
.await;
@ -494,12 +432,12 @@ impl Transaction {
}
self.status = TransactionStatus::StartedRollback;
let res = TwoPhaseCommitter::new(
let res = Committer::new(
self.buffer.to_proto_mutations().await,
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
self.style,
self.options.clone(),
)
.rollback()
.await;
@ -523,7 +461,7 @@ impl Transaction {
self.buffer
.scan_and_fetch(range.into(), limit, move |new_range, new_limit| {
new_mvcc_scan_request(new_range, timestamp, new_limit, key_only)
.execute(rpc, OPTIMISTIC_BACKOFF)
.execute(rpc, RetryOptions::default_optimistic())
})
.await
}
@ -539,7 +477,7 @@ impl Transaction {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
assert!(
matches!(self.style.kind, TransactionKind::Pessimistic(_)),
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
);
@ -552,7 +490,7 @@ impl Transaction {
let primary_lock = keys[0].clone();
let lock_ttl = DEFAULT_LOCK_TTL;
let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap().version();
self.style.push_for_update_ts(for_update_ts);
self.options.push_for_update_ts(for_update_ts);
new_pessimistic_lock_request(
keys,
primary_lock.into(),
@ -560,7 +498,7 @@ impl Transaction {
lock_ttl,
for_update_ts,
)
.execute(self.rpc.clone(), PESSIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_pessimistic())
.await
}
@ -576,11 +514,114 @@ impl Transaction {
}
fn is_pessimistic(&self) -> bool {
matches!(self.style.kind, TransactionKind::Pessimistic(_))
matches!(self.options.kind, TransactionKind::Pessimistic(_))
}
}
impl Drop for Transaction {
fn drop(&mut self) {
if self.status == TransactionStatus::Active {
panic!("Dropping an active transaction. Consider commit or rollback it.")
}
}
}
/// Optimistic or pessimistic transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum TransactionKind {
Optimistic,
/// Argument is for_update_ts
Pessimistic(u64),
}
/// Options for configuring a transaction.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct TransactionOptions {
/// Optimistic or pessimistic (default) transaction.
kind: TransactionKind,
/// Try using 1pc rather than 2pc (default is to always use 2pc).
try_one_pc: bool,
/// Try to use async commit (default is not to).
async_commit: bool,
/// Is the transaction read only? (Default is no).
read_only: bool,
/// How to retry in the event of certain errors.
retry_options: RetryOptions,
}
impl Default for TransactionOptions {
fn default() -> TransactionOptions {
Self::new_pessimistic()
}
}
impl TransactionOptions {
/// Default options for an optimistic transaction.
pub fn new_optimistic() -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Optimistic,
try_one_pc: false,
async_commit: false,
read_only: false,
retry_options: RetryOptions::default_optimistic(),
}
}
pub fn use_async_commit(&mut self) {
self.style = self.style.async_commit();
/// Default options for a pessimistic transaction.
pub fn new_pessimistic() -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Pessimistic(0),
try_one_pc: false,
async_commit: false,
read_only: false,
retry_options: RetryOptions::default_pessimistic(),
}
}
/// Try to use async commit.
pub fn use_async_commit(mut self) -> TransactionOptions {
self.async_commit = true;
self
}
/// Try to use 1pc.
pub fn try_one_pc(mut self) -> TransactionOptions {
self.try_one_pc = true;
self
}
/// Make the transaction read only.
pub fn read_only(mut self) -> TransactionOptions {
self.read_only = true;
self
}
/// Don't automatically resolve locks and retry if keys are locked.
pub fn no_resolve_locks(mut self) -> TransactionOptions {
self.retry_options.lock_backoff = Backoff::no_backoff();
self
}
/// Don't automatically resolve regions with PD if we have outdated region information.
pub fn no_resolve_regions(mut self) -> TransactionOptions {
self.retry_options.region_backoff = Backoff::no_backoff();
self
}
/// Set RetryOptions.
pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions {
self.retry_options = options;
self
}
fn push_for_update_ts(&mut self, for_update_ts: u64) {
match &mut self.kind {
TransactionKind::Optimistic => unreachable!(),
TransactionKind::Pessimistic(old_for_update_ts) => {
self.kind =
TransactionKind::Pessimistic(std::cmp::max(*old_for_update_ts, for_update_ts));
}
}
}
}
@ -595,17 +636,17 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
///
/// The committer implements `prewrite`, `commit` and `rollback` functions.
#[derive(new)]
struct TwoPhaseCommitter {
struct Committer {
mutations: Vec<kvrpcpb::Mutation>,
start_version: u64,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
options: TransactionOptions,
#[new(default)]
undetermined: bool,
}
impl TwoPhaseCommitter {
impl Committer {
async fn commit(mut self) -> Result<u64> {
if self.mutations.is_empty() {
return Ok(0);
@ -614,11 +655,11 @@ impl TwoPhaseCommitter {
let min_commit_ts = self.prewrite().await?;
// If we didn't use 1pc, prewrite will set `try_one_pc` to false.
if self.style.try_one_pc {
if self.options.try_one_pc {
return Ok(min_commit_ts);
}
let commit_ts = if self.style.async_commit {
let commit_ts = if self.options.async_commit {
assert_ne!(min_commit_ts, 0);
min_commit_ts
} else {
@ -647,7 +688,7 @@ impl TwoPhaseCommitter {
let primary_lock = self.mutations[0].key.clone().into();
// TODO: calculate TTL for big transactions
let lock_ttl = DEFAULT_LOCK_TTL;
let mut request = match self.style.kind {
let mut request = match self.options.kind {
TransactionKind::Optimistic => new_prewrite_request(
self.mutations.clone(),
primary_lock,
@ -663,25 +704,16 @@ impl TwoPhaseCommitter {
),
};
request.use_async_commit = self.style.async_commit;
request.try_one_pc = self.style.try_one_pc;
request.use_async_commit = self.options.async_commit;
request.try_one_pc = self.options.try_one_pc;
request.secondaries = self.mutations[1..].iter().map(|m| m.key.clone()).collect();
// FIXME set max_commit_ts and min_commit_ts
let response = match self.style.kind {
TransactionKind::Optimistic => {
request
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.await?
}
TransactionKind::Pessimistic(_) => {
request
.execute(self.rpc.clone(), PESSIMISTIC_BACKOFF)
.await?
}
};
let response = request
.execute(self.rpc.clone(), self.options.retry_options.clone())
.await?;
if self.style.try_one_pc && response.len() == 1 {
if self.options.try_one_pc && response.len() == 1 {
if response[0].one_pc_commit_ts == 0 {
return Err(Error::OnePcFailure);
}
@ -689,7 +721,7 @@ impl TwoPhaseCommitter {
return Ok(response[0].one_pc_commit_ts);
}
self.style.try_one_pc = false;
self.options.try_one_pc = false;
let min_commit_ts = response
.iter()
@ -708,7 +740,7 @@ impl TwoPhaseCommitter {
let primary_key = vec![self.mutations[0].key.clone().into()];
let commit_version = self.rpc.clone().get_timestamp().await?.version();
new_commit_request(primary_key, self.start_version, commit_version)
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.inspect_err(|e| {
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
@ -727,7 +759,7 @@ impl TwoPhaseCommitter {
let mutations = self.mutations.into_iter();
// Only skip the primary if we committed it earlier (i.e., we are not using async commit).
let keys = if self.style.async_commit {
let keys = if self.options.async_commit {
mutations.skip(0)
} else {
if primary_only {
@ -738,7 +770,7 @@ impl TwoPhaseCommitter {
.map(|mutation| mutation.key.into())
.collect();
new_commit_request(keys, self.start_version, commit_version)
.execute(self.rpc.clone(), OPTIMISTIC_BACKOFF)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -748,26 +780,34 @@ impl TwoPhaseCommitter {
.into_iter()
.map(|mutation| mutation.key.into())
.collect();
match self.style.kind {
match self.options.kind {
TransactionKind::Optimistic if keys.is_empty() => Ok(()),
TransactionKind::Optimistic => {
new_batch_rollback_request(keys, self.start_version)
.execute(self.rpc, OPTIMISTIC_BACKOFF)
.execute(self.rpc, RetryOptions::default_optimistic())
.await
}
TransactionKind::Pessimistic(for_update_ts) => {
new_pessimistic_rollback_request(keys, self.start_version, for_update_ts)
.execute(self.rpc, OPTIMISTIC_BACKOFF)
.execute(self.rpc, RetryOptions::default_optimistic())
.await
}
}
}
}
impl Drop for Transaction {
fn drop(&mut self) {
if self.status == TransactionStatus::Active {
panic!("Dropping an active transaction. Consider commit or rollback it.")
}
}
#[derive(PartialEq)]
enum TransactionStatus {
/// The transaction is read-only [`Snapshot`](super::Snapshot::Snapshot), no need to commit or rollback or panic on drop.
ReadOnly,
/// The transaction have not been committed or rolled back.
Active,
/// The transaction has committed.
Committed,
/// The transaction has tried to commit. Only `commit` is allowed.
StartedCommit,
/// The transaction has rolled back.
Rolledback,
/// The transaction has tried to rollback. Only `rollback` is allowed.
StartedRollback,
}

View File

@ -9,7 +9,8 @@ use std::{
env, iter,
};
use tikv_client::{
ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value,
ColumnFamily, Key, KvPair, RawClient, Result, Transaction, TransactionClient,
TransactionOptions, Value,
};
// Parameters used in test
@ -55,7 +56,7 @@ async fn crud() -> Result<()> {
clear_tikv().await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
@ -91,7 +92,7 @@ async fn crud() -> Result<()> {
txn.commit().await?;
// Read from TiKV then update and delete
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
@ -114,7 +115,10 @@ async fn crud() -> Result<()> {
txn.commit().await?;
// Read again from TiKV
let snapshot = client.snapshot(client.current_timestamp().await?);
let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let batch_get_res: HashMap<Key, Value> = snapshot
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
@ -199,13 +203,13 @@ async fn txn_write_million() -> Result<()> {
.map(|u| u.to_be_bytes().to_vec())
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
txn.put(k.clone(), v).await?;
}
txn.commit().await?;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
let res = txn.batch_get(keys).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
txn.commit().await?;
@ -213,7 +217,10 @@ async fn txn_write_million() -> Result<()> {
// test scan
let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
let snapshot = client.snapshot(client.current_timestamp().await?);
let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let res = snapshot.scan(vec![].., limit).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
@ -228,7 +235,10 @@ async fn txn_write_million() -> Result<()> {
let mut sum = 0;
// empty key to key[0]
let snapshot = client.snapshot(client.current_timestamp().await?);
let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?;
sum += res.count();
@ -253,12 +263,13 @@ async fn txn_write_million() -> Result<()> {
#[serial]
async fn txn_bank_transfer() -> Result<()> {
clear_tikv().await?;
let config = Config::default().try_one_pc();
let client = TransactionClient::new_with_config(pd_addrs(), config).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut rng = thread_rng();
let people = gen_u32_keys(NUM_PEOPLE, &mut rng);
let mut txn = client.begin().await?;
let mut txn = client
.begin_with_options(TransactionOptions::new_optimistic().try_one_pc())
.await?;
let mut sum: u32 = 0;
for person in &people {
let init = rng.gen::<u8>() as u32;
@ -269,8 +280,9 @@ async fn txn_bank_transfer() -> Result<()> {
// transfer
for _ in 0..NUM_TRNASFER {
let mut txn = client.begin().await?;
txn.use_async_commit();
let mut txn = client
.begin_with_options(TransactionOptions::new_optimistic().use_async_commit())
.await?;
let chosen_people = people.iter().choose_multiple(&mut rng, 2);
let alice = chosen_people[0];
let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?;
@ -291,7 +303,7 @@ async fn txn_bank_transfer() -> Result<()> {
// check
let mut new_sum = 0;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
for person in people.iter() {
new_sum += get_txn_u32(&txn, person.clone()).await?;
}