Add request ctor functions taking high-level types

Signed-off-by: Nick Cameron <nrc@ncameron.org>
Nick Cameron 2021-01-21 13:48:28 +13:00
parent 71dd80e87d
commit 2baddc6158
12 changed files with 388 additions and 163 deletions
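
To see the shape of the change at a glance: client-facing code keeps working with high-level types (`Key`, `BoundRange`, `Timestamp`), a new `lowering` module converts them, and the `requests` module only ever sees protobuf-level values. The following is a minimal, self-contained sketch of that pattern; the type and function names are simplified stand-ins, not the crate's real items.

```rust
// Simplified stand-ins: `Key` plays the role of the client's high-level key
// type and `RawGetRequest` the generated protobuf request type.
struct Key(Vec<u8>);

impl From<Key> for Vec<u8> {
    fn from(key: Key) -> Self {
        key.0
    }
}

struct RawGetRequest {
    key: Vec<u8>,
}

// Low-level ctor, as in the `requests` modules: takes protobuf-level values.
fn requests_new_raw_get_request(key: Vec<u8>) -> RawGetRequest {
    RawGetRequest { key }
}

// Lowering ctor, as in the new `lowering` modules: accepts the high-level
// type and converts it before delegating to the low-level ctor.
fn lowering_new_raw_get_request(key: Key) -> RawGetRequest {
    requests_new_raw_get_request(key.into())
}

fn main() {
    let req = lowering_new_raw_get_request(Key(b"foo".to_vec()));
    assert_eq!(req.key, b"foo".to_vec());
}
```

Client methods sit one layer further out and keep their `impl Into<Key>` parameters, calling `.into()` at the boundary, which is the change visible in the raw client diff below.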


@ -103,14 +103,14 @@ pub use crate::backoff::Backoff;
#[doc(inline)]
pub use crate::kv::{BoundRange, IntoOwnedRange, Key, KvPair, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
pub use crate::raw::{lowering::*, Client as RawClient, ColumnFamily};
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::{Timestamp, TimestampExt};
#[doc(inline)]
pub use crate::transaction::{
CheckLevel, Client as TransactionClient, Snapshot, Transaction, TransactionOptions,
lowering::*, CheckLevel, Client as TransactionClient, Snapshot, Transaction, TransactionOptions,
};
#[doc(inline)]
pub use config::Config;


@ -2,10 +2,10 @@
use tikv_client_common::Error;
use super::requests;
use crate::{
config::Config,
pd::PdRpcClient,
raw::lowering::*,
request::{KvRequest, RetryOptions},
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
@ -110,7 +110,7 @@ impl Client {
/// # });
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
requests::new_raw_get_request(key, self.cf.clone())
new_raw_get_request(key.into(), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -137,7 +137,7 @@ impl Client {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
requests::new_raw_batch_get_request(keys, self.cf.clone())
new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -159,7 +159,7 @@ impl Client {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
requests::new_raw_put_request(key, value, self.cf.clone())
new_raw_put_request(key.into(), value.into(), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -185,7 +185,7 @@ impl Client {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
requests::new_raw_batch_put_request(pairs, self.cf.clone())
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -208,7 +208,7 @@ impl Client {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
requests::new_raw_delete_request(key, self.cf.clone())
new_raw_delete_request(key.into(), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -231,7 +231,7 @@ impl Client {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
requests::new_raw_batch_delete_request(keys, self.cf.clone())
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -252,7 +252,7 @@ impl Client {
/// # });
/// ```
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
requests::new_raw_delete_range_request(range, self.cf.clone())
new_raw_delete_range_request(range.into(), self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
@ -277,7 +277,7 @@ impl Client {
/// # });
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
self.scan_inner(range, limit, false).await
self.scan_inner(range.into(), limit, false).await
}
/// Create a new 'scan' request that only returns the keys.
@ -388,7 +388,7 @@ impl Client {
});
}
let res = requests::new_raw_scan_request(range, limit, key_only, self.cf.clone())
let res = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await;
res.map(|mut s| {
@ -410,8 +410,13 @@ impl Client {
});
}
requests::new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
new_raw_batch_scan_request(
ranges.into_iter().map(Into::into),
each_limit,
key_only,
self.cf.clone(),
)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
}

src/raw/lowering.rs (new file, 78 lines)

@ -0,0 +1,78 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
/// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
use std::iter::Iterator;
use tikv_client_proto::kvrpcpb;
pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
requests::new_raw_get_request(key.into(), cf)
}
pub fn new_raw_batch_get_request(
keys: impl Iterator<Item = Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchGetRequest {
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
}
pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf)
}
pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
}
pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
requests::new_raw_delete_request(key.into(), cf)
}
pub fn new_raw_batch_delete_request(
keys: impl Iterator<Item = Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchDeleteRequest {
requests::new_raw_batch_delete_request(keys.map(Into::into).collect(), cf)
}
pub fn new_raw_delete_range_request(
range: BoundRange,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawDeleteRangeRequest {
let (start_key, end_key) = range.into_keys();
requests::new_raw_delete_range_request(start_key.into(), end_key.unwrap_or_default().into(), cf)
}
pub fn new_raw_scan_request(
range: BoundRange,
limit: u32,
key_only: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let (start_key, end_key) = range.into_keys();
requests::new_raw_scan_request(
start_key.into(),
end_key.unwrap_or_default().into(),
limit,
key_only,
cf,
)
}
pub fn new_raw_batch_scan_request(
ranges: impl Iterator<Item = BoundRange>,
each_limit: u32,
key_only: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchScanRequest {
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
}
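
Below is a hedged sketch (not part of the diff) of how the new raw constructors can be driven through the crate-root re-exports added in lib.rs above. The constructor signatures follow src/raw/lowering.rs as shown; treating them as public, crate-root items is an assumption based on the `lowering::*` re-export.

```rust
use tikv_client::{new_raw_get_request, new_raw_scan_request, BoundRange, ColumnFamily, Key};

fn build_raw_requests(cf: Option<ColumnFamily>) {
    // Point get: the caller produces a high-level `Key` once; the lowering fn
    // converts it to the `Vec<u8>` the protobuf request expects.
    let get_req = new_raw_get_request(Key::from("k1".to_owned()), cf.clone());

    // Range scan: the lowering fn splits the `BoundRange` into start/end keys
    // via `into_keys()`, so callers never touch raw byte vectors.
    let range: BoundRange = ("k1".to_owned().."k9".to_owned()).into();
    let scan_req = new_raw_scan_request(range, 10, false, cf);

    // Both values are kvrpcpb requests, ready to be executed with retry
    // options via `KvRequest::execute`, as `Client::get` does above.
    let _ = (get_req, scan_req);
}
```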


@ -14,6 +14,7 @@ use crate::Error;
use std::{convert::TryFrom, fmt};
mod client;
pub mod lowering;
mod requests;
/// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.


@ -54,12 +54,9 @@ impl KvRequest for kvrpcpb::RawGetRequest {
}
}
pub fn new_raw_get_request(
key: impl Into<Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetRequest {
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
let mut req = kvrpcpb::RawGetRequest::default();
req.set_key(key.into().into());
req.set_key(key);
req.maybe_set_cf(cf);
req
@ -98,11 +95,11 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
}
pub fn new_raw_batch_get_request(
keys: impl IntoIterator<Item = impl Into<Key>>,
keys: Vec<Vec<u8>>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchGetRequest {
let mut req = kvrpcpb::RawBatchGetRequest::default();
req.set_keys(keys.into_iter().map(Into::into).map(Into::into).collect());
req.set_keys(keys);
req.maybe_set_cf(cf);
req
@ -144,13 +141,13 @@ impl KvRequest for kvrpcpb::RawPutRequest {
}
pub fn new_raw_put_request(
key: impl Into<Key>,
value: impl Into<Value>,
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key.into().into());
req.set_value(value.into());
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req
@ -187,11 +184,11 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
}
pub fn new_raw_batch_put_request(
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs.into_iter().map(Into::into).map(Into::into).collect());
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req
@ -229,12 +226,9 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
}
}
pub fn new_raw_delete_request(
key: impl Into<Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawDeleteRequest {
pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
let mut req = kvrpcpb::RawDeleteRequest::default();
req.set_key(key.into().into());
req.set_key(key);
req.maybe_set_cf(cf);
req
@ -271,11 +265,11 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
}
pub fn new_raw_batch_delete_request(
keys: impl IntoIterator<Item = impl Into<Key>>,
keys: Vec<Vec<u8>>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchDeleteRequest {
let mut req = kvrpcpb::RawBatchDeleteRequest::default();
req.set_keys(keys.into_iter().map(Into::into).map(Into::into).collect());
req.set_keys(keys);
req.maybe_set_cf(cf);
req
@ -316,13 +310,13 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
}
pub fn new_raw_delete_range_request(
range: impl Into<BoundRange>,
start_key: Vec<u8>,
end_key: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawDeleteRangeRequest {
let (start_key, end_key) = range.into().into_keys();
let mut req = kvrpcpb::RawDeleteRangeRequest::default();
req.set_start_key(start_key.into());
req.set_end_key(end_key.unwrap_or_default().into());
req.set_start_key(start_key);
req.set_end_key(end_key);
req.maybe_set_cf(cf);
req
@ -365,15 +359,15 @@ impl KvRequest for kvrpcpb::RawScanRequest {
}
pub fn new_raw_scan_request(
range: impl Into<BoundRange>,
start_key: Vec<u8>,
end_key: Vec<u8>,
limit: u32,
key_only: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let (start_key, end_key) = range.into().into_keys();
let mut req = kvrpcpb::RawScanRequest::default();
req.set_start_key(start_key.into());
req.set_end_key(end_key.unwrap_or_default().into());
req.set_start_key(start_key);
req.set_end_key(end_key);
req.set_limit(limit);
req.set_key_only(key_only);
req.maybe_set_cf(cf);
@ -418,13 +412,13 @@ impl KvRequest for kvrpcpb::RawBatchScanRequest {
}
pub fn new_raw_batch_scan_request(
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
ranges: Vec<kvrpcpb::KeyRange>,
each_limit: u32,
key_only: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchScanRequest {
let mut req = kvrpcpb::RawBatchScanRequest::default();
req.set_ranges(ranges.into_iter().map(Into::into).map(Into::into).collect());
req.set_ranges(ranges);
req.set_each_limit(each_limit);
req.set_key_only(key_only);
req.maybe_set_cf(cf);


@ -81,7 +81,7 @@ impl Buffer {
f: F,
) -> Result<impl Iterator<Item = KvPair>>
where
F: FnOnce(Vec<Key>) -> Fut,
F: FnOnce(Box<dyn Iterator<Item = Key>>) -> Fut,
Fut: Future<Output = Result<Vec<KvPair>>>,
{
let (cached_results, undetermined_keys) = {
@ -106,11 +106,11 @@ impl Buffer {
.into_iter()
.filter_map(|(k, v)| v.unwrap().map(|v| KvPair(k, v)));
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k).collect();
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k);
(cached_results, undetermined_keys)
};
let fetched_results = f(undetermined_keys).await?;
let fetched_results = f(Box::new(undetermined_keys)).await?;
let mut mutations = self.mutations.lock().await;
for kvpair in &fetched_results {
let key = kvpair.0.clone();


@ -185,8 +185,8 @@ impl Client {
let mut start_key = vec![];
loop {
let req = new_scan_lock_request(
mem::take(&mut start_key).into(),
safepoint.clone(),
mem::take(&mut start_key),
safepoint.version(),
SCAN_LOCK_BATCH_SIZE,
);
let res: Vec<kvrpcpb::LockInfo> = req


@ -39,8 +39,10 @@ pub async fn resolve_locks(
let mut commit_versions: HashMap<u64, u64> = HashMap::new();
let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
for lock in expired_locks {
let primary_key: Key = lock.primary_lock.into();
let region_ver_id = pd_client.region_for_key(&primary_key).await?.ver_id();
let region_ver_id = pd_client
.region_for_key(&lock.primary_lock.clone().into())
.await?
.ver_id();
// skip if the region is cleaned
if clean_regions
.get(&lock.lock_version)
@ -53,9 +55,10 @@ pub async fn resolve_locks(
let commit_version = match commit_versions.get(&lock.lock_version) {
Some(&commit_version) => commit_version,
None => {
let commit_version = requests::new_cleanup_request(primary_key, lock.lock_version)
.execute(pd_client.clone(), RetryOptions::default_optimistic())
.await?;
let commit_version =
requests::new_cleanup_request(lock.primary_lock, lock.lock_version)
.execute(pd_client.clone(), RetryOptions::default_optimistic())
.await?;
commit_versions.insert(lock.lock_version, commit_version);
commit_version
}

src/transaction/lowering.rs (new file, 148 lines)

@ -0,0 +1,148 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
/// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::{timestamp::TimestampExt, transaction::requests, BoundRange, Key};
use std::iter::Iterator;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
pub fn new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest {
requests::new_get_request(key.into(), timestamp.version())
}
pub fn new_get_batch_request(
keys: impl Iterator<Item = Key>,
timestamp: Timestamp,
) -> kvrpcpb::BatchGetRequest {
requests::new_get_batch_request(keys.map(Into::into).collect(), timestamp.version())
}
pub fn new_scan_request(
range: BoundRange,
timestamp: Timestamp,
limit: u32,
key_only: bool,
) -> kvrpcpb::ScanRequest {
let (start_key, end_key) = range.into_keys();
requests::new_scan_request(
start_key.into(),
end_key.unwrap_or_default().into(),
timestamp.version(),
limit,
key_only,
)
}
pub fn new_resolve_lock_request(
context: kvrpcpb::Context,
start_version: Timestamp,
commit_version: Timestamp,
) -> kvrpcpb::ResolveLockRequest {
requests::new_resolve_lock_request(context, start_version.version(), commit_version.version())
}
pub fn new_cleanup_request(key: Key, start_version: Timestamp) -> kvrpcpb::CleanupRequest {
requests::new_cleanup_request(key.into(), start_version.version())
}
pub fn new_prewrite_request(
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Key,
start_version: Timestamp,
lock_ttl: u64,
) -> kvrpcpb::PrewriteRequest {
requests::new_prewrite_request(
mutations,
primary_lock.into(),
start_version.version(),
lock_ttl,
)
}
pub fn new_pessimistic_prewrite_request(
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Key,
start_version: Timestamp,
lock_ttl: u64,
for_update_ts: Timestamp,
) -> kvrpcpb::PrewriteRequest {
requests::new_pessimistic_prewrite_request(
mutations,
primary_lock.into(),
start_version.version(),
lock_ttl,
for_update_ts.version(),
)
}
pub fn new_commit_request(
keys: impl Iterator<Item = Key>,
start_version: Timestamp,
commit_version: Timestamp,
) -> kvrpcpb::CommitRequest {
requests::new_commit_request(
keys.map(Into::into).collect(),
start_version.version(),
commit_version.version(),
)
}
pub fn new_batch_rollback_request(
keys: impl Iterator<Item = Key>,
start_version: Timestamp,
) -> kvrpcpb::BatchRollbackRequest {
requests::new_batch_rollback_request(keys.map(Into::into).collect(), start_version.version())
}
pub fn new_pessimistic_rollback_request(
keys: impl Iterator<Item = Key>,
start_version: Timestamp,
for_update_ts: Timestamp,
) -> kvrpcpb::PessimisticRollbackRequest {
requests::new_pessimistic_rollback_request(
keys.map(Into::into).collect(),
start_version.version(),
for_update_ts.version(),
)
}
pub fn new_pessimistic_lock_request(
keys: impl Iterator<Item = Key>,
primary_lock: Key,
start_version: Timestamp,
lock_ttl: u64,
for_update_ts: Timestamp,
need_value: bool,
) -> kvrpcpb::PessimisticLockRequest {
requests::new_pessimistic_lock_request(
keys.map(|key| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_key(key.into());
mutation
})
.collect(),
primary_lock.into(),
start_version.version(),
lock_ttl,
for_update_ts.version(),
need_value,
)
}
pub fn new_scan_lock_request(
start_key: Key,
safepoint: Timestamp,
limit: u32,
) -> kvrpcpb::ScanLockRequest {
requests::new_scan_lock_request(start_key.into(), safepoint.version(), limit)
}
pub fn new_heart_beat_request(
start_ts: Timestamp,
primary_lock: Key,
ttl: u64,
) -> kvrpcpb::TxnHeartBeatRequest {
requests::new_heart_beat_request(start_ts.version(), primary_lock.into(), ttl)
}
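
A similar hedged sketch (not part of the diff) for the transactional constructors, which now take `Key` and `Timestamp` instead of byte vectors and raw `u64` versions. The signatures follow src/transaction/lowering.rs as shown; the crate-root paths and the manually built timestamps are assumptions made for illustration (in the client, timestamps come from PD).

```rust
use tikv_client::{new_commit_request, new_scan_lock_request, Key, Timestamp, TimestampExt};

fn build_txn_requests() {
    let start_version = Timestamp::from_version(10);
    let commit_version = Timestamp::from_version(12);

    // Keys are passed as an iterator of the high-level `Key` type; the
    // lowering fn collects them into the `Vec<Vec<u8>>` of the protobuf type.
    let keys = vec![Key::from("a".to_owned()), Key::from("b".to_owned())];
    let commit_req = new_commit_request(keys.into_iter(), start_version, commit_version);

    // Scan-lock takes a `Key` start point and a `Timestamp` safepoint and
    // lowers them to the byte/version representation internally.
    let scan_lock_req =
        new_scan_lock_request(Key::from(b"".to_vec()), Timestamp::from_version(11), 1024);

    let _ = (commit_req, scan_lock_req);
}
```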


@ -15,6 +15,7 @@ pub use transaction::{CheckLevel, Transaction, TransactionOptions};
mod buffer;
mod client;
pub mod lowering;
#[macro_use]
mod requests;
mod lock;


@ -61,10 +61,10 @@ impl HasLocks for kvrpcpb::GetResponse {
}
}
pub fn new_mvcc_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest {
pub fn new_get_request(key: Vec<u8>, timestamp: u64) -> kvrpcpb::GetRequest {
let mut req = kvrpcpb::GetRequest::default();
req.set_key(key.into());
req.set_version(timestamp.version());
req.set_key(key);
req.set_version(timestamp);
req
}
@ -109,13 +109,10 @@ impl HasLocks for kvrpcpb::BatchGetResponse {
}
}
pub fn new_mvcc_get_batch_request(
keys: Vec<Key>,
timestamp: Timestamp,
) -> kvrpcpb::BatchGetRequest {
pub fn new_get_batch_request(keys: Vec<Vec<u8>>, timestamp: u64) -> kvrpcpb::BatchGetRequest {
let mut req = kvrpcpb::BatchGetRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_version(timestamp.version());
req.set_keys(keys);
req.set_version(timestamp);
req
}
@ -154,19 +151,19 @@ impl KvRequest for kvrpcpb::ScanRequest {
}
}
pub fn new_mvcc_scan_request(
range: BoundRange,
timestamp: Timestamp,
pub fn new_scan_request(
start_key: Vec<u8>,
end_key: Vec<u8>,
timestamp: u64,
limit: u32,
key_only: bool,
) -> kvrpcpb::ScanRequest {
let (start_key, end_key) = range.into_keys();
let mut req = kvrpcpb::ScanRequest::default();
req.set_start_key(start_key.into());
req.set_end_key(end_key.unwrap_or_default().into());
req.set_start_key(start_key);
req.set_end_key(end_key);
req.set_limit(limit);
req.set_key_only(key_only);
req.set_version(timestamp.version());
req.set_version(timestamp);
req
}
@ -279,9 +276,9 @@ impl KvRequest for kvrpcpb::CleanupRequest {
}
}
pub fn new_cleanup_request(key: impl Into<Key>, start_version: u64) -> kvrpcpb::CleanupRequest {
pub fn new_cleanup_request(key: Vec<u8>, start_version: u64) -> kvrpcpb::CleanupRequest {
let mut req = kvrpcpb::CleanupRequest::default();
req.set_key(key.into().into());
req.set_key(key);
req.set_start_version(start_version);
req
@ -352,13 +349,13 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
pub fn new_prewrite_request(
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Key,
primary_lock: Vec<u8>,
start_version: u64,
lock_ttl: u64,
) -> kvrpcpb::PrewriteRequest {
let mut req = kvrpcpb::PrewriteRequest::default();
req.set_mutations(mutations);
req.set_primary_lock(primary_lock.into());
req.set_primary_lock(primary_lock);
req.set_start_version(start_version);
req.set_lock_ttl(lock_ttl);
// TODO: Lite resolve lock is currently disabled
@ -369,7 +366,7 @@ pub fn new_prewrite_request(
pub fn new_pessimistic_prewrite_request(
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Key,
primary_lock: Vec<u8>,
start_version: u64,
lock_ttl: u64,
for_update_ts: u64,
@ -414,12 +411,12 @@ impl KvRequest for kvrpcpb::CommitRequest {
}
pub fn new_commit_request(
keys: Vec<Key>,
keys: Vec<Vec<u8>>,
start_version: u64,
commit_version: u64,
) -> kvrpcpb::CommitRequest {
let mut req = kvrpcpb::CommitRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_keys(keys);
req.set_start_version(start_version);
req.set_commit_version(commit_version);
@ -502,12 +499,12 @@ impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
}
pub fn new_pessimistic_rollback_request(
keys: Vec<Key>,
keys: Vec<Vec<u8>>,
start_version: u64,
for_update_ts: u64,
) -> kvrpcpb::PessimisticRollbackRequest {
let mut req = kvrpcpb::PessimisticRollbackRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_keys(keys);
req.set_start_version(start_version);
req.set_for_update_ts(for_update_ts);
@ -576,25 +573,16 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
}
pub fn new_pessimistic_lock_request(
keys: Vec<Key>,
primary_lock: Key,
mutations: Vec<kvrpcpb::Mutation>,
primary_lock: Vec<u8>,
start_version: u64,
lock_ttl: u64,
for_update_ts: u64,
need_value: bool,
) -> kvrpcpb::PessimisticLockRequest {
let mut req = kvrpcpb::PessimisticLockRequest::default();
let mutations = keys
.into_iter()
.map(|key| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_key(key.into());
mutation
})
.collect();
req.set_mutations(mutations);
req.set_primary_lock(primary_lock.into());
req.set_primary_lock(primary_lock);
req.set_start_version(start_version);
req.set_lock_ttl(lock_ttl);
req.set_for_update_ts(for_update_ts);
@ -642,13 +630,13 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
}
pub fn new_scan_lock_request(
start_key: Key,
safepoint: Timestamp,
start_key: Vec<u8>,
safepoint: u64,
limit: u32,
) -> kvrpcpb::ScanLockRequest {
let mut req = kvrpcpb::ScanLockRequest::default();
req.set_start_key(start_key.into());
req.set_max_version(safepoint.version());
req.set_start_key(start_key);
req.set_max_version(safepoint);
req.set_limit(limit);
req
}
@ -689,13 +677,13 @@ impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
}
pub fn new_heart_beat_request(
start_ts: Timestamp,
primary_lock: Key,
start_ts: u64,
primary_lock: Vec<u8>,
ttl: u64,
) -> kvrpcpb::TxnHeartBeatRequest {
let mut req = kvrpcpb::TxnHeartBeatRequest::default();
req.set_start_version(start_ts.version());
req.set_primary_lock(primary_lock.into());
req.set_start_version(start_ts);
req.set_primary_lock(primary_lock);
req.set_advise_lock_ttl(ttl);
req
}


@ -5,7 +5,7 @@ use crate::{
pd::{PdClient, PdRpcClient},
request::{KvRequest, RetryOptions},
timestamp::TimestampExt,
transaction::{buffer::Buffer, requests::*},
transaction::{buffer::Buffer, lowering::*},
BoundRange, Error, Key, KvPair, Result, Value,
};
use derive_new::new;
@ -98,7 +98,7 @@ impl Transaction {
let key = key.into();
self.buffer
.get_or_else(key, |key| {
new_mvcc_get_request(key, self.timestamp.clone())
new_get_request(key, self.timestamp.clone())
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
})
.await
@ -199,7 +199,7 @@ impl Transaction {
let rpc = self.rpc.clone();
self.buffer
.batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| {
new_mvcc_get_batch_request(keys, timestamp)
new_get_batch_request(keys, timestamp)
.execute(rpc, RetryOptions::default_optimistic())
})
.await
@ -462,21 +462,22 @@ impl Transaction {
Ok(())
}
/// Commits the actions of the transaction. On success, we return the commit timestamp.
/// Commits the actions of the transaction. On success, we return the commit timestamp (or None
/// if there was nothing to commit).
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Config, TransactionClient};
/// # use tikv_client::{Config, Timestamp, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: u64 = req.await.unwrap();
/// let result: Timestamp = req.await.unwrap().unwrap();
/// # });
/// ```
pub async fn commit(&mut self) -> Result<u64> {
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
if !matches!(
self.status,
TransactionStatus::StartedCommit | TransactionStatus::Active
@ -490,7 +491,7 @@ impl Transaction {
let res = Committer::new(
primary_key,
mutations,
self.timestamp.version(),
self.timestamp.clone(),
self.bg_worker.clone(),
self.rpc.clone(),
self.options.clone(),
@ -521,7 +522,7 @@ impl Transaction {
let res = Committer::new(
primary_key,
mutations,
self.timestamp.version(),
self.timestamp.clone(),
self.bg_worker.clone(),
self.rpc.clone(),
self.options.clone(),
@ -561,7 +562,7 @@ impl Transaction {
self.buffer
.scan_and_fetch(range.into(), limit, move |new_range, new_limit| {
new_mvcc_scan_request(new_range, timestamp, new_limit, key_only)
new_scan_request(new_range, timestamp, new_limit, key_only)
.execute(rpc, RetryOptions::default_optimistic())
})
.await
@ -585,14 +586,14 @@ impl Transaction {
let keys: Vec<Key> = keys.into_iter().collect();
let first_key = keys[0].clone();
let primary_lock = self.buffer.get_primary_key_or(&first_key.into()).await;
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
let lock_ttl = DEFAULT_LOCK_TTL;
let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap().version();
self.options.push_for_update_ts(for_update_ts);
let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap();
self.options.push_for_update_ts(for_update_ts.clone());
let values = new_pessimistic_lock_request(
keys.clone(),
keys.clone().into_iter(),
primary_lock,
self.timestamp.version(),
self.timestamp.clone(),
lock_ttl,
for_update_ts,
need_value,
@ -601,7 +602,7 @@ impl Transaction {
.await?;
for key in keys {
self.buffer.lock(key.into()).await;
self.buffer.lock(key).await;
}
Ok(values)
@ -640,15 +641,15 @@ impl Drop for Transaction {
}
/// Optimistic or pessimistic transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[derive(Clone, PartialEq, Debug)]
pub enum TransactionKind {
Optimistic,
/// Argument is for_update_ts
Pessimistic(u64),
Pessimistic(Timestamp),
}
/// Options for configuring a transaction.
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, PartialEq, Debug)]
pub struct TransactionOptions {
/// Optimistic or pessimistic (default) transaction.
kind: TransactionKind,
@ -693,7 +694,7 @@ impl TransactionOptions {
/// Default options for a pessimistic transaction.
pub fn new_pessimistic() -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Pessimistic(0),
kind: TransactionKind::Pessimistic(Timestamp::from_version(0)),
try_one_pc: false,
async_commit: false,
read_only: false,
@ -744,12 +745,14 @@ impl TransactionOptions {
self
}
fn push_for_update_ts(&mut self, for_update_ts: u64) {
fn push_for_update_ts(&mut self, for_update_ts: Timestamp) {
match &mut self.kind {
TransactionKind::Optimistic => unreachable!(),
TransactionKind::Pessimistic(old_for_update_ts) => {
self.kind =
TransactionKind::Pessimistic(std::cmp::max(*old_for_update_ts, for_update_ts));
self.kind = TransactionKind::Pessimistic(Timestamp::from_version(std::cmp::max(
old_for_update_ts.version(),
for_update_ts.version(),
)));
}
}
}
@ -769,7 +772,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
struct Committer {
primary_key: Option<Key>,
mutations: Vec<kvrpcpb::Mutation>,
start_version: u64,
start_version: Timestamp,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
options: TransactionOptions,
@ -778,10 +781,10 @@ struct Committer {
}
impl Committer {
async fn commit(mut self) -> Result<u64> {
async fn commit(mut self) -> Result<Option<Timestamp>> {
if self.mutations.is_empty() {
assert!(self.primary_key.is_none());
return Ok(0);
return Ok(None);
}
let min_commit_ts = self.prewrite().await?;
@ -792,8 +795,7 @@ impl Committer {
}
let commit_ts = if self.options.async_commit {
assert_ne!(min_commit_ts, 0);
min_commit_ts
min_commit_ts.unwrap()
} else {
match self.commit_primary().await {
Ok(commit_ts) => commit_ts,
@ -808,31 +810,31 @@ impl Committer {
};
self.bg_worker
.clone()
.spawn_ok(self.commit_secondary(commit_ts).map(|res| {
.spawn_ok(self.commit_secondary(commit_ts.clone()).map(|res| {
if let Err(e) = res {
warn!("Failed to commit secondary keys: {}", e);
}
}));
Ok(commit_ts)
Ok(Some(commit_ts))
}
async fn prewrite(&mut self) -> Result<u64> {
async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
let primary_lock = self.primary_key.clone().unwrap();
// TODO: calculate TTL for big transactions
let lock_ttl = DEFAULT_LOCK_TTL;
let mut request = match self.options.kind {
let mut request = match &self.options.kind {
TransactionKind::Optimistic => new_prewrite_request(
self.mutations.clone(),
primary_lock,
self.start_version,
self.start_version.clone(),
lock_ttl,
),
TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
self.mutations.clone(),
primary_lock,
self.start_version,
self.start_version.clone(),
lock_ttl,
for_update_ts,
for_update_ts.clone(),
),
};
@ -855,7 +857,7 @@ impl Committer {
return Err(Error::OnePcFailure);
}
return Ok(response[0].one_pc_commit_ts);
return Ok(Timestamp::try_from_version(response[0].one_pc_commit_ts));
}
self.options.try_one_pc = false;
@ -867,58 +869,63 @@ impl Committer {
r.min_commit_ts
})
.max()
.unwrap();
.map(|ts| Timestamp::from_version(ts));
Ok(min_commit_ts)
}
/// Commits the primary key and returns the commit version
async fn commit_primary(&mut self) -> Result<u64> {
let primary_key = vec![self.primary_key.clone().unwrap()];
let commit_version = self.rpc.clone().get_timestamp().await?.version();
new_commit_request(primary_key, self.start_version, commit_version)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.inspect_err(|e| {
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
// error to the user.
if let Error::Grpc(_) = e {
self.undetermined = true;
}
})
.await?;
async fn commit_primary(&mut self) -> Result<Timestamp> {
let primary_key = self.primary_key.clone().into_iter();
let commit_version = self.rpc.clone().get_timestamp().await?;
new_commit_request(
primary_key,
self.start_version.clone(),
commit_version.clone(),
)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.inspect_err(|e| {
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
// error to the user.
if let Error::Grpc(_) = e {
self.undetermined = true;
}
})
.await?;
Ok(commit_version)
}
async fn commit_secondary(self, commit_version: u64) -> Result<()> {
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
let primary_only = self.mutations.len() == 1;
let mutations = self.mutations.into_iter();
let keys: Vec<Key> = if self.options.async_commit {
mutations.map(|m| m.key.into()).collect()
let req = if self.options.async_commit {
let keys = mutations.map(|m| m.key.into());
new_commit_request(keys, self.start_version, commit_version)
} else if primary_only {
return Ok(());
} else {
let primary_key = self.primary_key.unwrap();
mutations
let keys = mutations
.map(|m| m.key.into())
.filter(|key| &primary_key != key)
.collect()
.filter(|key| &primary_key != key);
new_commit_request(keys, self.start_version, commit_version)
};
new_commit_request(keys, self.start_version, commit_version)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
req.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
async fn rollback(self) -> Result<()> {
let keys: Vec<_> = self
if self.options.kind == TransactionKind::Optimistic && self.mutations.is_empty() {
return Ok(());
}
let keys = self
.mutations
.into_iter()
.map(|mutation| mutation.key.into())
.collect();
.map(|mutation| mutation.key.into());
match self.options.kind {
TransactionKind::Optimistic if keys.is_empty() => Ok(()),
TransactionKind::Optimistic => {
new_batch_rollback_request(keys, self.start_version)
.execute(self.rpc, RetryOptions::default_optimistic())