feat: implement atomic requests

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2021-03-25 10:50:30 +08:00
parent 12f2aa8492
commit 9c898959f3
7 changed files with 202 additions and 37 deletions

View File

@ -173,16 +173,15 @@ impl Client {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
self.put_inner(key, value, false).await
}
/// Create a new *atomic* 'put' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
pub async fn atomic_put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_inner(key, value, true).await
}
/// Create a new 'batch put' request.
@ -206,15 +205,19 @@ impl Client {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
self.batch_put_inner(pairs, false).await
}
/// Create a new *atomic* 'batch put' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the setting of the values
/// associated with the given keys.
pub async fn atomic_batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_inner(pairs, true).await
}
/// Create a new 'delete' request.
@ -235,16 +238,29 @@ impl Client {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
let request = new_raw_delete_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
self.delete_inner(key, false).await
}
/// Create a new *atomic* 'delete' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the deletion of the given key.
///
/// It does not return an error if the key does not exist in TiKV.
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = client.delete(key);
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub async fn atomic_delete(&self, key: impl Into<Key>) -> Result<()> {
self.delete_inner(key, true).await
}
/// Create a new 'batch delete' request.
@ -422,6 +438,30 @@ impl Client {
.collect())
}
/// Create a new *atomic* 'compare and set' request.
///
/// Once resolved this request will result in an atomic `compare and set' operation for the given key.
///
/// If the value retrived is equal to `current_value`, `new_value` is written.
///
/// # Return Value
/// A tuple is returned if successful: the previous value and whether the value is swapped
pub async fn atomic_compare_and_swap(
&self,
key: impl Into<Key>,
previous_value: Option<Value>,
new_value: Value,
) -> Result<(Option<Value>, bool)> {
let req = new_cas_request(key.into(), new_value, previous_value, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.post_process_default()
.plan();
plan.execute().await
}
async fn scan_inner(
&self,
range: impl Into<BoundRange>,
@ -476,4 +516,52 @@ impl Client {
.plan();
plan.execute().await
}
async fn put_inner(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
atomic: bool,
) -> Result<()> {
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}
async fn batch_put_inner(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
atomic: bool,
) -> Result<()> {
let request =
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(OPTIMISTIC_BACKOFF)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}
async fn delete_inner(&self, key: impl Into<Key>, atomic: bool) -> Result<()> {
let request = new_raw_delete_request(key.into(), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}
}

View File

@ -22,19 +22,25 @@ pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf)
requests::new_raw_put_request(key.into(), value, cf, atomic)
}
pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
}
pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
requests::new_raw_delete_request(key.into(), cf)
pub fn new_raw_delete_request(
key: Key,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawDeleteRequest {
requests::new_raw_delete_request(key.into(), cf, atomic)
}
pub fn new_raw_batch_delete_request(
@ -76,3 +82,12 @@ pub fn new_raw_batch_scan_request(
) -> kvrpcpb::RawBatchScanRequest {
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
}
pub fn new_cas_request(
key: Key,
value: Value,
previous_value: Option<Value>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawCasRequest {
requests::new_cas_request(key.into(), value, previous_value, cf)
}

View File

@ -76,11 +76,13 @@ pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);
req
}
@ -98,10 +100,12 @@ impl SingleKey for kvrpcpb::RawPutRequest {
pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);
req
}
@ -132,10 +136,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
}
}
pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
pub fn new_raw_delete_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawDeleteRequest {
let mut req = kvrpcpb::RawDeleteRequest::default();
req.set_key(key);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);
req
}
@ -267,6 +276,46 @@ impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
}
}
pub fn new_cas_request(
key: Vec<u8>,
value: Vec<u8>,
previous_value: Option<Vec<u8>>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawCasRequest {
let mut req = kvrpcpb::RawCasRequest::default();
req.set_key(key);
req.set_value(value);
match previous_value {
Some(v) => req.set_previous_value(v),
None => req.set_previous_not_exist(true),
}
req.maybe_set_cf(cf);
req
}
impl KvRequest for kvrpcpb::RawCasRequest {
type Response = kvrpcpb::RawCasResponse;
}
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}
impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
type Out = (Option<Value>, bool); // (previous_value, swapped)
fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
let input = input?;
if input.previous_not_exist {
Ok((None, input.succeed))
} else {
Ok((Some(input.previous_value), input.succeed))
}
}
}
macro_rules! impl_raw_rpc_request {
($name: ident) => {
impl RawRpcRequest for kvrpcpb::$name {
@ -286,6 +335,7 @@ impl_raw_rpc_request!(RawBatchDeleteRequest);
impl_raw_rpc_request!(RawScanRequest);
impl_raw_rpc_request!(RawBatchScanRequest);
impl_raw_rpc_request!(RawDeleteRangeRequest);
impl_raw_rpc_request!(RawCasRequest);
impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
@ -296,6 +346,7 @@ impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
impl HasLocks for kvrpcpb::RawScanResponse {}
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
impl HasLocks for kvrpcpb::RawCasResponse {}
#[cfg(test)]
mod test {

View File

@ -276,8 +276,11 @@ impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
}
}
/// When executed, the plan extracts errors from its inner plan, and
/// returns an `Err` wrapping the error.
/// When executed, the plan extracts errors from its inner plan, and returns an
/// `Err` wrapping the error.
///
/// We usually need to apply this plan if (and only if) the output of the inner
/// plan is of a response type.
///
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
/// where `response` contains unresolved errors (`error` and `region_error`).

View File

@ -35,7 +35,7 @@ impl TimestampExt for Timestamp {
Self {
physical: version >> PHYSICAL_SHIFT_BITS,
logical: version & LOGICAL_MASK,
// We only support global transactions
// Now we only support global transactions
suffix_bits: 0,
}
}

View File

@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::RawBatchDeleteResponse);
has_region_error!(kvrpcpb::RawDeleteRangeResponse);
has_region_error!(kvrpcpb::RawScanResponse);
has_region_error!(kvrpcpb::RawBatchScanResponse);
has_region_error!(kvrpcpb::RawCasResponse);
macro_rules! has_key_error {
($type:ty) => {
@ -100,6 +101,7 @@ has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
has_str_error!(kvrpcpb::RawBatchDeleteResponse);
has_str_error!(kvrpcpb::RawDeleteRangeResponse);
has_str_error!(kvrpcpb::RawCasResponse);
has_str_error!(kvrpcpb::ImportResponse);
has_str_error!(kvrpcpb::DeleteRangeResponse);

View File

@ -66,6 +66,12 @@ impl_request!(
raw_delete_range_async_opt,
"raw_delete_range"
);
impl_request!(
RawCasRequest,
raw_compare_and_swap_async_opt,
"raw_compare_and_swap"
);
impl_request!(GetRequest, kv_get_async_opt, "kv_get");
impl_request!(ScanRequest, kv_scan_async_opt, "kv_scan");
impl_request!(PrewriteRequest, kv_prewrite_async_opt, "kv_prewrite");