From 893f26e9df49049236f98fc9414b8b28c1e1a321 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 14 Aug 2019 15:32:42 +0800 Subject: [PATCH] Remove mvcc apis of old style Signed-off-by: Yilin Chen --- src/kv_client/client.rs | 262 ++------------------------------- src/kv_client/requests/mvcc.rs | 12 +- 2 files changed, 17 insertions(+), 257 deletions(-) diff --git a/src/kv_client/client.rs b/src/kv_client/client.rs index fddcd3e..fc5ac00 100644 --- a/src/kv_client/client.rs +++ b/src/kv_client/client.rs @@ -10,14 +10,13 @@ use futures::prelude::*; use grpcio::CallOption; use kvproto::kvrpcpb; use kvproto::tikvpb::TikvClient; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use crate::{ kv_client::{requests::KvRequest, HasError}, - pd::Region, stats::tikv_stats, transaction::TxnInfo, - ErrorKind, Key, Result, + ErrorKind, Result, }; /// This client handles requests for a single TiKV node. It converts the data @@ -37,254 +36,6 @@ impl super::KvClient for KvRpcClient { } } -pub struct TransactionRegionClient { - region: Region, - timeout: Duration, - client: Arc, -} - -// FIXME use `request` method instead. -macro_rules! txn_request { - ($region:expr, $type:ty) => {{ - let mut req = <$type>::default(); - // FIXME don't unwrap - req.set_context($region.context().unwrap()); - req - }}; -} - -impl From for kvrpcpb::TxnInfo { - fn from(txn_info: TxnInfo) -> kvrpcpb::TxnInfo { - let mut pb = kvrpcpb::TxnInfo::default(); - pb.set_txn(txn_info.txn); - pb.set_status(txn_info.status); - pb - } -} - -impl TransactionRegionClient { - pub fn kv_get( - &self, - version: u64, - key: Key, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::GetRequest); - req.set_key(key.into()); - req.set_version(version); - - map_errors_and_trace( - "kv_get", - self.client - .clone() - .kv_get_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_scan( - &self, - version: u64, - start_key: Key, - end_key: Key, - limit: u32, - key_only: bool, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::ScanRequest); - req.set_start_key(start_key.into()); - req.set_end_key(end_key.into()); - req.set_version(version); - req.set_limit(limit); - req.set_key_only(key_only); - - map_errors_and_trace( - "kv_scan", - self.client - .clone() - .kv_scan_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_prewrite( - &self, - mutations: impl Iterator, - primary_lock: Key, - start_version: u64, - lock_ttl: u64, - skip_constraint_check: bool, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::PrewriteRequest); - req.set_mutations(mutations.collect()); - req.set_primary_lock(primary_lock.into()); - req.set_start_version(start_version); - req.set_lock_ttl(lock_ttl); - req.set_skip_constraint_check(skip_constraint_check); - - map_errors_and_trace( - "kv_prewrite", - self.client - .clone() - .kv_prewrite_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_commit( - &self, - keys: impl Iterator, - start_version: u64, - commit_version: u64, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::CommitRequest); - req.set_keys(keys.map(|x| x.into()).collect()); - req.set_start_version(start_version); - req.set_commit_version(commit_version); - - map_errors_and_trace( - "kv_commit", - self.client - .clone() - .kv_commit_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_import( - &self, - mutations: impl Iterator, - commit_version: u64, - ) -> impl Future> { - let mut req = kvrpcpb::ImportRequest::default(); - req.set_mutations(mutations.collect()); - req.set_commit_version(commit_version); - - map_errors_and_trace( - "kv_import", - self.client - .clone() - .kv_import_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_cleanup( - &self, - key: Key, - start_version: u64, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::CleanupRequest); - req.set_key(key.into()); - req.set_start_version(start_version); - - map_errors_and_trace( - "kv_cleanup", - self.client - .clone() - .kv_cleanup_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_batch_get( - &self, - keys: impl Iterator, - version: u64, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::BatchGetRequest); - req.set_keys(keys.map(|x| x.into()).collect()); - req.set_version(version); - - map_errors_and_trace( - "kv_batch_get", - self.client - .clone() - .kv_batch_get_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_batch_rollback( - &self, - keys: impl Iterator, - start_version: u64, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::BatchRollbackRequest); - req.set_keys(keys.map(|x| x.into()).collect()); - req.set_start_version(start_version); - - map_errors_and_trace( - "kv_batch_rollback", - self.client - .clone() - .kv_batch_rollback_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_scan_lock( - &self, - start_key: Key, - max_version: u64, - limit: u32, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::ScanLockRequest); - req.set_start_key(start_key.into()); - req.set_max_version(max_version); - req.set_limit(limit); - - map_errors_and_trace( - "kv_scan_lock", - self.client - .clone() - .kv_scan_lock_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_resolve_lock( - &self, - txn_infos: impl Iterator, - start_version: u64, - commit_version: u64, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::ResolveLockRequest); - req.set_start_version(start_version); - req.set_commit_version(commit_version); - req.set_txn_infos(txn_infos.map(Into::into).collect()); - - map_errors_and_trace( - "kv_resolve_lock", - self.client - .clone() - .kv_resolve_lock_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_gc(&self, safe_point: u64) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::GcRequest); - req.set_safe_point(safe_point); - - map_errors_and_trace( - "kv_gc", - self.client - .clone() - .kv_gc_async_opt(&req, self.call_options()), - ) - } - - pub fn kv_delete_range( - &self, - start_key: Key, - end_key: Key, - ) -> impl Future> { - let mut req = txn_request!(self.region, kvrpcpb::DeleteRangeRequest); - req.set_start_key(start_key.into()); - req.set_end_key(end_key.into()); - - map_errors_and_trace( - "kv_delete_range", - self.client - .clone() - .kv_delete_range_async_opt(&req, self.call_options()), - ) - } - - fn call_options(&self) -> CallOption { - CallOption::default().timeout(self.timeout) - } -} - fn map_errors_and_trace( request_name: &'static str, fut: ::grpcio::Result, @@ -311,3 +62,12 @@ where }) .map(move |r| context.done(r)) } + +impl From for kvrpcpb::TxnInfo { + fn from(txn_info: TxnInfo) -> kvrpcpb::TxnInfo { + let mut pb = kvrpcpb::TxnInfo::default(); + pb.set_txn(txn_info.txn); + pb.set_status(txn_info.status); + pb + } +} diff --git a/src/kv_client/requests/mvcc.rs b/src/kv_client/requests/mvcc.rs index 10b40b7..1c463df 100644 --- a/src/kv_client/requests/mvcc.rs +++ b/src/kv_client/requests/mvcc.rs @@ -12,12 +12,12 @@ use std::mem; use std::sync::Arc; #[derive(Clone)] -pub struct Get { +pub struct MvccGet { pub key: Key, pub version: u64, } -impl KvRequest for Get { +impl KvRequest for MvccGet { type Result = Option; type RpcRequest = kvrpcpb::GetRequest; type RpcResponse = kvrpcpb::GetResponse; @@ -68,12 +68,12 @@ impl KvRequest for Get { } #[derive(Clone)] -pub struct BatchGet { +pub struct MvccBatchGet { pub keys: Vec, pub version: u64, } -impl KvRequest for BatchGet { +impl KvRequest for MvccBatchGet { type Result = Vec; type RpcRequest = kvrpcpb::BatchGetRequest; type RpcResponse = kvrpcpb::BatchGetResponse; @@ -124,7 +124,7 @@ impl KvRequest for BatchGet { } #[derive(Clone)] -pub struct Scan { +pub struct MvccScan { pub range: BoundRange, // TODO this limit is currently treated as a per-region limit, not a total // limit. @@ -134,7 +134,7 @@ pub struct Scan { pub version: u64, } -impl KvRequest for Scan { +impl KvRequest for MvccScan { type Result = Vec; type RpcRequest = kvrpcpb::ScanRequest; type RpcResponse = kvrpcpb::ScanResponse;