Merge branch 'fix-txn-bank-transfer' of github.com:ekexium/client-rust into fix-txn-bank-transfer

This commit is contained in:
ekexium 2021-01-12 10:14:06 +08:00
commit 121352e173
9 changed files with 139 additions and 87 deletions

View File

@ -5,7 +5,7 @@
mod common;
use crate::common::parse_args;
use tikv_client::{Config, Key, KvPair, RawClient as Client, Result, ToOwnedRange, Value};
use tikv_client::{Config, IntoOwnedRange, Key, KvPair, RawClient as Client, Result, Value};
const KEY: &str = "TiKV";
const VALUE: &str = "Rust";
@ -86,7 +86,7 @@ async fn main() -> Result<()> {
let start = "k1";
let end = "k2";
let pairs = client
.scan((start..=end).to_owned(), 10)
.scan((start..=end).into_owned(), 10)
.await
.expect("Could not scan");

View File

@ -83,41 +83,41 @@ impl BoundRange {
///
/// # Examples
/// ```rust
/// use tikv_client::{BoundRange, Key, ToOwnedRange};
/// use tikv_client::{BoundRange, Key, IntoOwnedRange};
/// // Exclusive
/// let range = "a".."z";
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range.into_owned()).into_keys(),
/// (Key::from("a".to_owned()), Some(Key::from("z".to_owned()))),
/// );
/// // Inclusive
/// let range = "a"..="z";
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range.into_owned()).into_keys(),
/// (Key::from("a".to_owned()), Some(Key::from("z\0".to_owned()))),
/// );
/// // Open right
/// let range = "a".to_owned()..;
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range).into_keys(),
/// (Key::from("a".to_owned()), None),
/// );
/// // Left open right exclusive
/// let range = .."z";
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range.into_owned()).into_keys(),
/// (Key::from("".to_owned()), Some(Key::from("z".to_owned()))),
/// );
/// // Left open right inclusive
/// let range = ..="z";
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range.into_owned()).into_keys(),
/// (Key::from("".to_owned()), Some(Key::from("z\0".to_owned()))),
/// );
/// // Full range
/// let range = ..;
/// assert_eq!(
/// BoundRange::from(range.to_owned()).into_keys(),
/// BoundRange::from(range).into_keys(),
/// (Key::from("".to_owned()), None),
/// );
// ```
@ -249,9 +249,9 @@ impl<T: Into<Key> + Eq> From<(Bound<T>, Bound<T>)> for BoundRange {
}
}
impl Into<kvrpcpb::KeyRange> for BoundRange {
fn into(self) -> kvrpcpb::KeyRange {
let (start, end) = self.into_keys();
impl From<BoundRange> for kvrpcpb::KeyRange {
fn from(bound_range: BoundRange) -> Self {
let (start, end) = bound_range.into_keys();
let mut range = kvrpcpb::KeyRange::default();
range.set_start_key(start.into());
range.set_end_key(end.unwrap_or_default().into());
@ -271,36 +271,36 @@ impl From<kvrpcpb::KeyRange> for BoundRange {
///
/// # Examples
/// ```rust
/// # use tikv_client::{ToOwnedRange, BoundRange};
/// # use tikv_client::{IntoOwnedRange, BoundRange};
/// # use std::ops::*;
/// let r1: Range<&str> = "s".."e";
/// let r1: BoundRange = r1.to_owned();
/// let r1: BoundRange = r1.into_owned();
///
/// let r2: RangeFrom<&str> = "start"..;
/// let r2: BoundRange = r2.to_owned();
/// let r2: BoundRange = r2.into_owned();
///
/// let r3: RangeInclusive<&str> = "s"..="e";
/// let r3: BoundRange = r3.to_owned();
/// let r3: BoundRange = r3.into_owned();
///
/// let r4: RangeTo<&str> = .."z";
/// let r4: BoundRange = r4.to_owned();
/// let r4: BoundRange = r4.into_owned();
///
/// let k1: Vec<u8> = "start".to_owned().into_bytes();
/// let k2: Vec<u8> = "end".to_owned().into_bytes();
/// let r4: BoundRange = (&k1, &k2).to_owned();
/// let r5: BoundRange = (&k1, None).to_owned();
/// let r6: BoundRange = (&k1, Some(&k2)).to_owned();
/// let r4: BoundRange = (&k1, &k2).into_owned();
/// let r5: BoundRange = (&k1, None).into_owned();
/// let r6: BoundRange = (&k1, Some(&k2)).into_owned();
/// ```
pub trait ToOwnedRange {
pub trait IntoOwnedRange {
/// Transform a borrowed range of some form into an owned `BoundRange`.
fn to_owned(self) -> BoundRange;
fn into_owned(self) -> BoundRange;
}
#[test]
fn test_to_owned() {}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for Range<&U> {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange for Range<&U> {
fn into_owned(self) -> BoundRange {
From::from(Range {
start: self.start.to_owned(),
end: self.end.to_owned(),
@ -308,53 +308,55 @@ impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for RangeFrom<&U> {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange for RangeFrom<&U> {
fn into_owned(self) -> BoundRange {
From::from(RangeFrom {
start: self.start.to_owned(),
})
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for RangeTo<&U> {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange for RangeTo<&U> {
fn into_owned(self) -> BoundRange {
From::from(RangeTo {
end: self.end.to_owned(),
})
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for RangeInclusive<&U> {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange
for RangeInclusive<&U>
{
fn into_owned(self) -> BoundRange {
let (from, to) = self.into_inner();
From::from(RangeInclusive::new(from.to_owned(), to.to_owned()))
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange
for RangeToInclusive<&U>
{
fn to_owned(self) -> BoundRange {
fn into_owned(self) -> BoundRange {
From::from(RangeToInclusive {
end: self.end.to_owned(),
})
}
}
impl ToOwnedRange for RangeFull {
fn to_owned(self) -> BoundRange {
impl IntoOwnedRange for RangeFull {
fn into_owned(self) -> BoundRange {
From::from(self)
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for (&U, Option<&U>) {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange for (&U, Option<&U>) {
fn into_owned(self) -> BoundRange {
From::from((self.0.to_owned(), self.1.map(|u| u.to_owned())))
}
}
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> ToOwnedRange for (&U, &U) {
fn to_owned(self) -> BoundRange {
impl<T: Into<Key> + Borrow<U>, U: ToOwned<Owned = T> + ?Sized> IntoOwnedRange for (&U, &U) {
fn into_owned(self) -> BoundRange {
From::from((self.0.to_owned(), self.1.to_owned()))
}
}

View File

@ -140,15 +140,15 @@ impl From<String> for Key {
}
}
impl Into<Vec<u8>> for Key {
fn into(self) -> Vec<u8> {
self.0
impl From<Key> for Vec<u8> {
fn from(key: Key) -> Self {
key.0
}
}
impl<'a> Into<&'a [u8]> for &'a Key {
fn into(self) -> &'a [u8] {
&self.0
impl<'a> From<&'a Key> for &'a [u8] {
fn from(key: &'a Key) -> Self {
&key.0
}
}

View File

@ -90,9 +90,9 @@ where
}
}
impl Into<(Key, Value)> for KvPair {
fn into(self) -> (Key, Value) {
(self.0, self.1)
impl From<KvPair> for (Key, Value) {
fn from(pair: KvPair) -> Self {
(pair.0, pair.1)
}
}
@ -102,10 +102,10 @@ impl From<kvrpcpb::KvPair> for KvPair {
}
}
impl Into<kvrpcpb::KvPair> for KvPair {
fn into(self) -> kvrpcpb::KvPair {
impl From<KvPair> for kvrpcpb::KvPair {
fn from(pair: KvPair) -> Self {
let mut result = kvrpcpb::KvPair::default();
let (key, value) = self.into();
let (key, value) = pair.into();
result.set_key(key.into());
result.set_value(value);
result

View File

@ -7,7 +7,7 @@ mod key;
mod kvpair;
mod value;
pub use bound_range::{BoundRange, ToOwnedRange};
pub use bound_range::{BoundRange, IntoOwnedRange};
pub use key::Key;
pub use kvpair::KvPair;
pub use value::Value;

View File

@ -101,7 +101,7 @@ extern crate log;
#[doc(inline)]
pub use crate::backoff::Backoff;
#[doc(inline)]
pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
pub use crate::kv::{BoundRange, IntoOwnedRange, Key, KvPair, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)]

View File

@ -170,7 +170,7 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
@ -242,12 +242,12 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.delete_range(inclusive_range.to_owned());
/// let req = client.delete_range(inclusive_range.into_owned());
/// let result: () = req.await.unwrap();
/// # });
/// ```
@ -267,12 +267,12 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan(inclusive_range.to_owned(), 2);
/// let req = client.scan(inclusive_range.into_owned(), 2);
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
@ -290,12 +290,12 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_keys(inclusive_range.to_owned(), 2);
/// let req = client.scan_keys(inclusive_range.into_owned(), 2);
/// let result: Vec<Key> = req.await.unwrap();
/// # });
/// ```
@ -320,13 +320,13 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range1 = "TiDB"..="TiKV";
/// let inclusive_range2 = "TiKV"..="TiSpark";
/// let iterable = vec![inclusive_range1.to_owned(), inclusive_range2.to_owned()];
/// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
/// let req = client.batch_scan(iterable, 2);
/// let result = req.await;
/// # });
@ -351,13 +351,13 @@ impl Client {
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range1 = "TiDB"..="TiKV";
/// let inclusive_range2 = "TiKV"..="TiSpark";
/// let iterable = vec![inclusive_range1.to_owned(), inclusive_range2.to_owned()];
/// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
/// let req = client.batch_scan(iterable, 2);
/// let result = req.await;
/// # });

View File

@ -8,13 +8,42 @@ use std::{
use tikv_client_proto::kvrpcpb;
use tokio::sync::Mutex;
#[derive(Default)]
struct Mutations {
primary_key: Option<Key>,
key_mutation_map: BTreeMap<Key, Mutation>,
}
impl Mutations {
fn insert(&mut self, key: impl Into<Key>, mutation: Mutation) {
let key = key.into();
if !matches!(mutation, Mutation::Cached(_)) {
self.primary_key.get_or_insert_with(|| key.clone());
}
self.key_mutation_map.insert(key, mutation);
}
pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
self.primary_key.get_or_insert(key.clone())
}
}
/// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
pub struct Buffer {
mutations: Mutex<BTreeMap<Key, Mutation>>,
mutations: Mutex<Mutations>,
}
impl Buffer {
/// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> {
self.mutations.lock().await.primary_key.clone()
}
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
self.mutations.lock().await.get_primary_key_or(key).clone()
}
/// Get a value from the buffer. If the value is not present, run `f` to get
/// the value.
pub async fn get_or_else<F, Fut>(&self, key: Key, f: F) -> Result<Option<Value>>
@ -54,6 +83,7 @@ impl Buffer {
) = keys
.map(|key| {
let value = mutations
.key_mutation_map
.get(&key)
.map(Mutation::get_value)
.unwrap_or(MutationValue::Undetermined);
@ -92,7 +122,7 @@ impl Buffer {
{
// read from local buffer
let mut mutations = self.mutations.lock().await;
let mutation_range = mutations.range(range.clone());
let mutation_range = mutations.key_mutation_map.range(range.clone());
// fetch from TiKV
// fetch more entries because some of them may be deleted.
@ -137,8 +167,10 @@ impl Buffer {
/// Lock the given key if necessary.
pub async fn lock(&self, key: Key) {
let mut mutations = self.mutations.lock().await;
let mutations = &mut self.mutations.lock().await;
mutations.primary_key.get_or_insert(key.clone());
let value = mutations
.key_mutation_map
.entry(key)
// Mutated keys don't need a lock.
.or_insert(Mutation::Lock);
@ -166,6 +198,7 @@ impl Buffer {
self.mutations
.lock()
.await
.key_mutation_map
.iter()
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
.collect()
@ -175,6 +208,7 @@ impl Buffer {
self.mutations
.lock()
.await
.key_mutation_map
.get(&key)
.map(Mutation::get_value)
.unwrap_or(MutationValue::Undetermined)
@ -269,7 +303,10 @@ mod tests {
))
.unwrap()
.collect::<Vec<_>>(),
vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec()),]
vec![KvPair(
Key::from(b"key1".to_vec()),
Value::from(b"value".to_vec()),
),]
);
}

View File

@ -404,8 +404,11 @@ impl Transaction {
}
self.status = TransactionStatus::StartedCommit;
let primary_key = self.buffer.get_primary_key().await;
let mutations = self.buffer.to_proto_mutations().await;
let res = Committer::new(
self.buffer.to_proto_mutations().await,
primary_key,
mutations,
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
@ -432,8 +435,11 @@ impl Transaction {
}
self.status = TransactionStatus::StartedRollback;
let primary_key = self.buffer.get_primary_key().await;
let mutations = self.buffer.to_proto_mutations().await;
let res = Committer::new(
self.buffer.to_proto_mutations().await,
primary_key,
mutations,
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
@ -468,7 +474,7 @@ impl Transaction {
/// Pessimistically lock the keys.
///
/// Once resovled it acquires a lock on the key in TiKV.
/// Once resolved it acquires a lock on the key in TiKV.
/// The lock prevents other transactions from mutating the entry until it is released.
///
/// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
@ -481,19 +487,19 @@ impl Transaction {
"`pessimistic_lock` is only valid to use with pessimistic transactions"
);
let mut keys: Vec<Vec<u8>> = keys
let keys: Vec<Vec<u8>> = keys
.into_iter()
.map(|it| it.into())
.map(|it: Key| it.into())
.collect();
keys.sort();
let primary_lock = keys[0].clone();
let first_key = keys[0].clone();
let primary_lock = self.buffer.get_primary_key_or(&first_key.into()).await;
let lock_ttl = DEFAULT_LOCK_TTL;
let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap().version();
self.options.push_for_update_ts(for_update_ts);
new_pessimistic_lock_request(
keys,
primary_lock.into(),
primary_lock,
self.timestamp.version(),
lock_ttl,
for_update_ts,
@ -662,6 +668,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
/// The committer implements `prewrite`, `commit` and `rollback` functions.
#[derive(new)]
struct Committer {
primary_key: Option<Key>,
mutations: Vec<kvrpcpb::Mutation>,
start_version: u64,
bg_worker: ThreadPool,
@ -674,6 +681,7 @@ struct Committer {
impl Committer {
async fn commit(mut self) -> Result<u64> {
if self.mutations.is_empty() {
assert!(self.primary_key.is_none());
return Ok(0);
}
@ -710,7 +718,7 @@ impl Committer {
}
async fn prewrite(&mut self) -> Result<u64> {
let primary_lock = self.mutations[0].key.clone().into();
let primary_lock = self.primary_key.clone().unwrap();
// TODO: calculate TTL for big transactions
let lock_ttl = DEFAULT_LOCK_TTL;
let mut request = match self.options.kind {
@ -731,7 +739,12 @@ impl Committer {
request.use_async_commit = self.options.async_commit;
request.try_one_pc = self.options.try_one_pc;
request.secondaries = self.mutations[1..].iter().map(|m| m.key.clone()).collect();
request.secondaries = self
.mutations
.iter()
.filter(|m| self.primary_key.as_ref().unwrap() != m.key.as_ref())
.map(|m| m.key.clone())
.collect();
// FIXME set max_commit_ts and min_commit_ts
let response = request
@ -762,7 +775,7 @@ impl Committer {
/// Commits the primary key and returns the commit version
async fn commit_primary(&mut self) -> Result<u64> {
let primary_key = vec![self.mutations[0].key.clone().into()];
let primary_key = vec![self.primary_key.clone().unwrap()];
let commit_version = self.rpc.clone().get_timestamp().await?.version();
new_commit_request(primary_key, self.start_version, commit_version)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
@ -783,17 +796,17 @@ impl Committer {
let primary_only = self.mutations.len() == 1;
let mutations = self.mutations.into_iter();
// Only skip the primary if we committed it earlier (i.e., we are not using async commit).
let keys = if self.options.async_commit {
mutations.skip(0)
let keys: Vec<Key> = if self.options.async_commit {
mutations.map(|m| m.key.into()).collect()
} else if primary_only {
return Ok(());
} else {
if primary_only {
return Ok(());
}
mutations.skip(1)
}
.map(|mutation| mutation.key.into())
.collect();
let primary_key = self.primary_key.unwrap();
mutations
.map(|m| m.key.into())
.filter(|key| &primary_key != key)
.collect()
};
new_commit_request(keys, self.start_version, commit_version)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await