Merge pull request #221 from nrc/cmds

Implement some missing API
This commit is contained in:
Nick Cameron 2021-01-14 09:12:49 +13:00 committed by GitHub
commit 61299d840a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 267 additions and 10 deletions

View File

@ -14,11 +14,13 @@ const LOGICAL_MASK: i64 = (1 << PHYSICAL_SHIFT_BITS) - 1;
///
/// Currently the only implmentation of this trait is [`Timestamp`](Timestamp) in TiKV.
/// It contains a physical part (first 46 bits) and a logical part (last 18 bits).
pub trait TimestampExt {
pub trait TimestampExt: Sized {
/// Convert the timestamp to u64.
fn version(&self) -> u64;
/// Convert u64 to a timestamp.
fn from_version(version: u64) -> Self;
/// Convert u64 to an optional timestamp, where `0` represents no timestamp.
fn try_from_version(version: u64) -> Option<Self>;
}
impl TimestampExt for Timestamp {
@ -35,4 +37,12 @@ impl TimestampExt for Timestamp {
logical: version & LOGICAL_MASK,
}
}
fn try_from_version(version: u64) -> Option<Self> {
if version == 0 {
None
} else {
Some(Self::from_version(version))
}
}
}

View File

@ -13,7 +13,7 @@ use crate::{
};
use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use std::{iter, mem, sync::Arc};
use std::{collections::HashMap, iter, mem, sync::Arc};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
#[async_trait]
@ -519,6 +519,7 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
type Result = ();
type RpcResponse = kvrpcpb::PessimisticLockResponse;
type KeyData = Vec<kvrpcpb::Mutation>;
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
@ -591,6 +592,7 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
type Result = Vec<kvrpcpb::LockInfo>;
type RpcResponse = kvrpcpb::ScanLockResponse;
type KeyData = (Key, Key); // end_key should always be empty. Used to satisfy `store_stream_for_range`
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
@ -629,16 +631,200 @@ pub fn new_scan_lock_request(
req
}
#[async_trait]
impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
type Result = u64;
type RpcResponse = kvrpcpb::TxnHeartBeatResponse;
type KeyData = Key;
fn make_rpc_request(&self, key: Self::KeyData, store: &Store) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_start_version(self.start_version);
req.set_primary_lock(key.into());
req.set_advise_lock_ttl(self.advise_lock_ttl);
Ok(req)
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store)>> {
let key = mem::take(&mut self.primary_lock).into();
store_stream_for_key(key, pd_client)
}
fn map_result(resp: Self::RpcResponse) -> Self::Result {
resp.lock_ttl
}
async fn reduce(results: BoxStream<'static, Result<Self::Result>>) -> Result<Self::Result> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.await
}
}
pub fn new_heart_beat_request(
start_ts: Timestamp,
primary_lock: Key,
ttl: u64,
) -> kvrpcpb::TxnHeartBeatRequest {
let mut req = kvrpcpb::TxnHeartBeatRequest::default();
req.set_start_version(start_ts.version());
req.set_primary_lock(primary_lock.into());
req.set_advise_lock_ttl(ttl);
req
}
#[async_trait]
impl KvRequest for kvrpcpb::CheckTxnStatusRequest {
type Result = TransactionStatus;
type RpcResponse = kvrpcpb::CheckTxnStatusResponse;
type KeyData = Key;
fn make_rpc_request(&self, key: Self::KeyData, store: &Store) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_primary_key(key.into());
req.set_lock_ts(self.lock_ts);
req.set_caller_start_ts(self.caller_start_ts);
req.set_current_ts(self.current_ts);
req.set_rollback_if_not_exist(self.rollback_if_not_exist);
Ok(req)
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store)>> {
let key = mem::take(&mut self.primary_key);
store_stream_for_key(key.into(), pd_client)
}
fn map_result(resp: Self::RpcResponse) -> Self::Result {
resp.into()
}
async fn reduce(results: BoxStream<'static, Result<Self::Result>>) -> Result<Self::Result> {
results
.into_future()
.map(|(f, _)| f.expect("no results should be impossible"))
.await
}
}
#[derive(Debug, Clone)]
pub struct TransactionStatus {
pub kind: TransactionStatusKind,
pub action: kvrpcpb::Action,
}
impl From<kvrpcpb::CheckTxnStatusResponse> for TransactionStatus {
fn from(resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus {
TransactionStatus {
action: resp.get_action(),
kind: (resp.commit_version, resp.lock_ttl, resp.lock_info).into(),
}
}
}
#[derive(Debug, Clone)]
pub enum TransactionStatusKind {
Committed(Timestamp),
RolledBack,
Locked(u64, kvrpcpb::LockInfo),
}
impl From<(u64, u64, Option<kvrpcpb::LockInfo>)> for TransactionStatusKind {
fn from((ts, ttl, info): (u64, u64, Option<kvrpcpb::LockInfo>)) -> TransactionStatusKind {
match (ts, ttl, info) {
(0, 0, None) => TransactionStatusKind::RolledBack,
(ts, 0, None) => TransactionStatusKind::Committed(Timestamp::from_version(ts)),
(0, ttl, Some(info)) => TransactionStatusKind::Locked(ttl, info),
_ => unreachable!(),
}
}
}
#[async_trait]
impl KvRequest for kvrpcpb::CheckSecondaryLocksRequest {
type Result = SecondaryLocksStatus;
type RpcResponse = kvrpcpb::CheckSecondaryLocksResponse;
type KeyData = Vec<Key>;
fn make_rpc_request(&self, keys: Self::KeyData, store: &Store) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_start_version(self.start_version);
Ok(req)
}
fn store_stream<PdC: PdClient>(
&mut self,
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store)>> {
self.keys.sort();
let keys = mem::take(&mut self.keys);
store_stream_for_keys(keys, pd_client)
}
fn map_result(resp: Self::RpcResponse) -> Self::Result {
SecondaryLocksStatus {
locks: resp
.locks
.into_iter()
.map(|l| (l.key.clone().into(), l))
.collect(),
commit_ts: Timestamp::try_from_version(resp.commit_ts),
}
}
async fn reduce(results: BoxStream<'static, Result<Self::Result>>) -> Result<Self::Result> {
results
.try_fold(
SecondaryLocksStatus {
locks: HashMap::new(),
commit_ts: None,
},
|mut a, b| async move {
a.merge(b);
Ok(a)
},
)
.await
}
}
pub struct SecondaryLocksStatus {
pub locks: HashMap<Key, kvrpcpb::LockInfo>,
pub commit_ts: Option<Timestamp>,
}
impl SecondaryLocksStatus {
fn merge(&mut self, other: SecondaryLocksStatus) {
self.commit_ts = match (self.commit_ts.take(), other.commit_ts) {
(Some(a), Some(b)) => {
assert_eq!(a, b);
Some(a)
}
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
self.locks.extend(other.locks.into_iter());
}
}
impl HasLocks for kvrpcpb::CommitResponse {}
impl HasLocks for kvrpcpb::CleanupResponse {}
impl HasLocks for kvrpcpb::BatchRollbackResponse {}
impl HasLocks for kvrpcpb::PessimisticRollbackResponse {}
impl HasLocks for kvrpcpb::ResolveLockResponse {}
impl HasLocks for kvrpcpb::ScanLockResponse {}
impl HasLocks for kvrpcpb::PessimisticLockResponse {}
impl HasLocks for kvrpcpb::TxnHeartBeatResponse {}
impl HasLocks for kvrpcpb::CheckTxnStatusResponse {}
impl HasLocks for kvrpcpb::CheckSecondaryLocksResponse {}

View File

@ -454,6 +454,20 @@ impl Transaction {
res
}
/// Send a heart beat message to keep the transaction alive on the server and update its TTL.
///
/// Returns the TTL set on the lock by the server.
pub async fn send_heart_beat(&mut self) -> Result<u64> {
self.check_allow_operation()?;
let primary_key = match self.buffer.get_primary_key().await {
Some(k) => k,
None => return Err(Error::NoPrimaryKey),
};
new_heart_beat_request(self.timestamp.clone(), primary_key, DEFAULT_LOCK_TTL)
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
.await
}
async fn scan_inner(
&self,
range: impl Into<BoundRange>,
@ -526,7 +540,7 @@ impl Transaction {
impl Drop for Transaction {
fn drop(&mut self) {
if self.status == TransactionStatus::Active {
if !std::thread::panicking() && self.status == TransactionStatus::Active {
match self.options.check_level {
CheckLevel::Panic => {
panic!("Dropping an active transaction. Consider commit or rollback it.")

View File

@ -132,6 +132,23 @@ async fn crud() -> Result<()> {
Ok(())
}
#[tokio::test]
#[serial]
async fn pessimistic() -> Fallible<()> {
clear_tikv().await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_pessimistic().await?;
txn.put("foo".to_owned(), "foo".to_owned()).await.unwrap();
let ttl = txn.send_heart_beat().await.unwrap();
assert!(ttl > 0);
txn.commit().await.unwrap();
Fallible::Ok(())
}
/// bank transfer mainly tests raw put and get
#[tokio::test]
#[serial]

View File

@ -22,6 +22,9 @@ pub enum Error {
/// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc.
#[error("1PC transaction could not be committed.")]
OnePcFailure,
/// An operation requires a primary key, but the transaction was empty.
#[error("transaction has no primary key")]
NoPrimaryKey,
/// Wraps a `std::io::Error`.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),

View File

@ -37,6 +37,10 @@ has_region_error!(kvrpcpb::CleanupResponse);
has_region_error!(kvrpcpb::BatchGetResponse);
has_region_error!(kvrpcpb::ScanLockResponse);
has_region_error!(kvrpcpb::ResolveLockResponse);
has_region_error!(kvrpcpb::TxnHeartBeatResponse);
has_region_error!(kvrpcpb::CheckTxnStatusResponse);
has_region_error!(kvrpcpb::CheckSecondaryLocksResponse);
has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
@ -44,7 +48,6 @@ has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
has_region_error!(kvrpcpb::RawBatchDeleteResponse);
has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::RawDeleteRangeResponse);
has_region_error!(kvrpcpb::RawScanResponse);
has_region_error!(kvrpcpb::RawBatchScanResponse);
@ -70,6 +73,9 @@ has_key_error!(kvrpcpb::CleanupResponse);
has_key_error!(kvrpcpb::ScanLockResponse);
has_key_error!(kvrpcpb::ResolveLockResponse);
has_key_error!(kvrpcpb::GcResponse);
has_key_error!(kvrpcpb::TxnHeartBeatResponse);
has_key_error!(kvrpcpb::CheckTxnStatusResponse);
has_key_error!(kvrpcpb::CheckSecondaryLocksResponse);
macro_rules! has_str_error {
($type:ty) => {

View File

@ -93,3 +93,24 @@ impl_request!(
kv_pessimistic_lock_async_opt,
"kv_pessimistic_lock"
);
impl_request!(
TxnHeartBeatRequest,
kv_txn_heart_beat_async_opt,
"kv_txn_heart_beat"
);
impl_request!(
CheckTxnStatusRequest,
kv_check_txn_status_async_opt,
"kv_check_txn_status"
);
impl_request!(
CheckSecondaryLocksRequest,
kv_check_secondary_locks_async_opt,
"kv_check_secondary_locks_request"
);
impl_request!(GcRequest, kv_gc_async_opt, "kv_gc");
impl_request!(
DeleteRangeRequest,
kv_delete_range_async_opt,
"kv_delete_range"
);