mirror of https://github.com/tikv/client-rust.git
*: Support API v2 (part 1) (#415)
* API v2 part1 Signed-off-by: Ping Yu <yuping@pingcap.com> * inplace encoding Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * export proto Signed-off-by: Ping Yu <yuping@pingcap.com> * fix set_context Signed-off-by: Ping Yu <yuping@pingcap.com> * add Codec parameter to Transaction & Snapshot Signed-off-by: Ping Yu <yuping@pingcap.com> --------- Signed-off-by: Ping Yu <yuping@pingcap.com>
This commit is contained in:
parent
abf22ba680
commit
4b0e844a40
|
@ -94,6 +94,8 @@
|
|||
|
||||
pub mod backoff;
|
||||
#[doc(hidden)]
|
||||
pub mod proto; // export `proto` to enable user customized codec
|
||||
#[doc(hidden)]
|
||||
pub mod raw;
|
||||
pub mod request;
|
||||
#[doc(hidden)]
|
||||
|
@ -104,7 +106,6 @@ mod compat;
|
|||
mod config;
|
||||
mod kv;
|
||||
mod pd;
|
||||
mod proto;
|
||||
mod region;
|
||||
mod region_cache;
|
||||
mod stats;
|
||||
|
@ -145,6 +146,8 @@ pub use crate::raw::Client as RawClient;
|
|||
#[doc(inline)]
|
||||
pub use crate::raw::ColumnFamily;
|
||||
#[doc(inline)]
|
||||
pub use crate::request::codec;
|
||||
#[doc(inline)]
|
||||
pub use crate::request::RetryOptions;
|
||||
#[doc(inline)]
|
||||
pub use crate::timestamp::Timestamp;
|
||||
|
|
21
src/mock.rs
21
src/mock.rs
|
@ -18,6 +18,7 @@ use crate::proto::metapb::RegionEpoch;
|
|||
use crate::proto::metapb::{self};
|
||||
use crate::region::RegionId;
|
||||
use crate::region::RegionWithLeader;
|
||||
use crate::request::codec::ApiV1TxnCodec;
|
||||
use crate::store::KvClient;
|
||||
use crate::store::KvConnect;
|
||||
use crate::store::RegionStore;
|
||||
|
@ -30,7 +31,7 @@ use crate::Timestamp;
|
|||
|
||||
/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
|
||||
/// client can be tested without doing any RPC calls.
|
||||
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
|
||||
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
|
||||
let config = Config::default();
|
||||
PdRpcClient::new(
|
||||
config.clone(),
|
||||
|
@ -43,6 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
|
|||
))
|
||||
},
|
||||
false,
|
||||
Some(ApiV1TxnCodec::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -71,9 +73,18 @@ pub struct MockKvConnect;
|
|||
|
||||
pub struct MockCluster;
|
||||
|
||||
#[derive(new)]
|
||||
pub struct MockPdClient {
|
||||
client: MockKvClient,
|
||||
codec: ApiV1TxnCodec,
|
||||
}
|
||||
|
||||
impl MockPdClient {
|
||||
pub fn new(client: MockKvClient) -> MockPdClient {
|
||||
MockPdClient {
|
||||
client,
|
||||
codec: ApiV1TxnCodec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -102,6 +113,7 @@ impl MockPdClient {
|
|||
pub fn default() -> MockPdClient {
|
||||
MockPdClient {
|
||||
client: MockKvClient::default(),
|
||||
codec: ApiV1TxnCodec::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,6 +177,7 @@ impl MockPdClient {
|
|||
|
||||
#[async_trait]
|
||||
impl PdClient for MockPdClient {
|
||||
type Codec = ApiV1TxnCodec;
|
||||
type KvClient = MockKvClient;
|
||||
|
||||
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
|
||||
|
@ -210,4 +223,8 @@ impl PdClient for MockPdClient {
|
|||
}
|
||||
|
||||
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
|
||||
|
||||
fn get_codec(&self) -> &Self::Codec {
|
||||
&self.codec
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ use crate::region::RegionId;
|
|||
use crate::region::RegionVerId;
|
||||
use crate::region::RegionWithLeader;
|
||||
use crate::region_cache::RegionCache;
|
||||
use crate::request::codec::{ApiV1TxnCodec, Codec};
|
||||
use crate::store::KvClient;
|
||||
use crate::store::KvConnect;
|
||||
use crate::store::RegionStore;
|
||||
|
@ -50,6 +51,7 @@ use crate::Timestamp;
|
|||
/// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
|
||||
#[async_trait]
|
||||
pub trait PdClient: Send + Sync + 'static {
|
||||
type Codec: Codec;
|
||||
type KvClient: KvClient + Send + Sync + 'static;
|
||||
|
||||
/// In transactional API, `region` is decoded (keys in raw format).
|
||||
|
@ -189,8 +191,11 @@ pub trait PdClient: Send + Sync + 'static {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
|
||||
if enable_codec {
|
||||
fn decode_region(
|
||||
mut region: RegionWithLeader,
|
||||
enable_mvcc_codec: bool,
|
||||
) -> Result<RegionWithLeader> {
|
||||
if enable_mvcc_codec {
|
||||
codec::decode_bytes_in_place(&mut region.region.start_key, false)?;
|
||||
codec::decode_bytes_in_place(&mut region.region.end_key, false)?;
|
||||
}
|
||||
|
@ -200,20 +205,30 @@ pub trait PdClient: Send + Sync + 'static {
|
|||
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
|
||||
|
||||
async fn invalidate_region_cache(&self, ver_id: RegionVerId);
|
||||
|
||||
/// Get the codec carried by `PdClient`.
|
||||
/// The purpose of carrying the codec is to avoid passing it on so many calling paths.
|
||||
fn get_codec(&self) -> &Self::Codec;
|
||||
}
|
||||
|
||||
/// This client converts requests for the logical TiKV cluster into requests
|
||||
/// for a single TiKV store using PD and internal logic.
|
||||
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
|
||||
pub struct PdRpcClient<
|
||||
Cod: Codec = ApiV1TxnCodec,
|
||||
KvC: KvConnect + Send + Sync + 'static = TikvConnect,
|
||||
Cl = Cluster,
|
||||
> {
|
||||
pd: Arc<RetryClient<Cl>>,
|
||||
kv_connect: KvC,
|
||||
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
|
||||
enable_codec: bool,
|
||||
enable_mvcc_codec: bool,
|
||||
region_cache: RegionCache<RetryClient<Cl>>,
|
||||
codec: Option<Cod>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
||||
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<Cod, KvC> {
|
||||
type Codec = Cod;
|
||||
type KvClient = KvC::KvClient;
|
||||
|
||||
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
|
||||
|
@ -224,20 +239,20 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
|||
}
|
||||
|
||||
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
|
||||
let enable_codec = self.enable_codec;
|
||||
let key = if enable_codec {
|
||||
let enable_mvcc_codec = self.enable_mvcc_codec;
|
||||
let key = if enable_mvcc_codec {
|
||||
key.to_encoded()
|
||||
} else {
|
||||
key.clone()
|
||||
};
|
||||
|
||||
let region = self.region_cache.get_region_by_key(&key).await?;
|
||||
Self::decode_region(region, enable_codec)
|
||||
Self::decode_region(region, enable_mvcc_codec)
|
||||
}
|
||||
|
||||
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
|
||||
let region = self.region_cache.get_region_by_id(id).await?;
|
||||
Self::decode_region(region, self.enable_codec)
|
||||
Self::decode_region(region, self.enable_mvcc_codec)
|
||||
}
|
||||
|
||||
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
|
||||
|
@ -255,31 +270,40 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
|
|||
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
|
||||
self.region_cache.invalidate_region_cache(ver_id).await
|
||||
}
|
||||
|
||||
fn get_codec(&self) -> &Self::Codec {
|
||||
self.codec
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("codec not set"))
|
||||
}
|
||||
}
|
||||
|
||||
impl PdRpcClient<TikvConnect, Cluster> {
|
||||
impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
|
||||
pub async fn connect(
|
||||
pd_endpoints: &[String],
|
||||
config: Config,
|
||||
enable_codec: bool,
|
||||
) -> Result<PdRpcClient> {
|
||||
enable_mvcc_codec: bool, // TODO: infer from `codec`.
|
||||
codec: Option<Cod>,
|
||||
) -> Result<PdRpcClient<Cod>> {
|
||||
PdRpcClient::new(
|
||||
config.clone(),
|
||||
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|
||||
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
|
||||
enable_codec,
|
||||
enable_mvcc_codec,
|
||||
codec,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
||||
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, KvC, Cl> {
|
||||
pub async fn new<PdFut, MakeKvC, MakePd>(
|
||||
config: Config,
|
||||
kv_connect: MakeKvC,
|
||||
pd: MakePd,
|
||||
enable_codec: bool,
|
||||
) -> Result<PdRpcClient<KvC, Cl>>
|
||||
enable_mvcc_codec: bool,
|
||||
codec: Option<Cod>,
|
||||
) -> Result<PdRpcClient<Cod, KvC, Cl>>
|
||||
where
|
||||
PdFut: Future<Output = Result<RetryClient<Cl>>>,
|
||||
MakeKvC: FnOnce(Arc<SecurityManager>) -> KvC,
|
||||
|
@ -301,8 +325,9 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
|||
pd: pd.clone(),
|
||||
kv_client_cache,
|
||||
kv_connect: kv_connect(security_mgr),
|
||||
enable_codec,
|
||||
enable_mvcc_codec,
|
||||
region_cache: RegionCache::new(pd),
|
||||
codec,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -322,6 +347,10 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
|
|||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_codec(&mut self, codec: Cod) {
|
||||
self.codec = Some(codec);
|
||||
}
|
||||
}
|
||||
|
||||
fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
|
||||
|
|
|
@ -15,6 +15,7 @@ use crate::pd::PdClient;
|
|||
use crate::pd::PdRpcClient;
|
||||
use crate::proto::metapb;
|
||||
use crate::raw::lowering::*;
|
||||
use crate::request::codec::{ApiV1RawCodec, Codec, EncodedRequest};
|
||||
use crate::request::Collect;
|
||||
use crate::request::CollectSingle;
|
||||
use crate::request::Plan;
|
||||
|
@ -35,7 +36,11 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
|
|||
///
|
||||
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
|
||||
/// awaited to execute.
|
||||
pub struct Client<PdC: PdClient = PdRpcClient> {
|
||||
pub struct Client<Cod = ApiV1RawCodec, PdC = PdRpcClient<Cod>>
|
||||
where
|
||||
Cod: Codec,
|
||||
PdC: PdClient<Codec = Cod>,
|
||||
{
|
||||
rpc: Arc<PdC>,
|
||||
cf: Option<ColumnFamily>,
|
||||
backoff: Backoff,
|
||||
|
@ -54,7 +59,7 @@ impl Clone for Client {
|
|||
}
|
||||
}
|
||||
|
||||
impl Client<PdRpcClient> {
|
||||
impl Client<ApiV1RawCodec, PdRpcClient<ApiV1RawCodec>> {
|
||||
/// Create a raw [`Client`] and connect to the TiKV cluster.
|
||||
///
|
||||
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
|
||||
|
@ -100,7 +105,10 @@ impl Client<PdRpcClient> {
|
|||
config: Config,
|
||||
) -> Result<Self> {
|
||||
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
|
||||
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
|
||||
let rpc = Arc::new(
|
||||
PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default()))
|
||||
.await?,
|
||||
);
|
||||
Ok(Client {
|
||||
rpc,
|
||||
cf: None,
|
||||
|
@ -142,7 +150,9 @@ impl Client<PdRpcClient> {
|
|||
atomic: self.atomic,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Cod: Codec> Client<Cod, PdRpcClient<Cod>> {
|
||||
/// Set the [`Backoff`] strategy for retrying requests.
|
||||
/// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF).
|
||||
/// See [`Backoff`] for more information.
|
||||
|
@ -189,7 +199,7 @@ impl Client<PdRpcClient> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<PdC: PdClient> Client<PdC> {
|
||||
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
|
||||
/// Create a new 'get' request.
|
||||
///
|
||||
/// Once resolved this request will result in the fetching of the value associated with the
|
||||
|
@ -211,7 +221,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
|
||||
debug!("invoking raw get request");
|
||||
let request = new_raw_get_request(key.into(), self.cf.clone());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
.post_process_default()
|
||||
|
@ -243,7 +254,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
) -> Result<Vec<KvPair>> {
|
||||
debug!("invoking raw batch_get request");
|
||||
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(Collect)
|
||||
.plan();
|
||||
|
@ -271,7 +283,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
|
||||
debug!("invoking raw put request");
|
||||
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
.extract_error()
|
||||
|
@ -307,7 +320,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
self.cf.clone(),
|
||||
self.atomic,
|
||||
);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.extract_error()
|
||||
.plan();
|
||||
|
@ -335,7 +349,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
|
||||
debug!("invoking raw delete request");
|
||||
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
.extract_error()
|
||||
|
@ -366,7 +381,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
self.assert_non_atomic()?;
|
||||
let request =
|
||||
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.extract_error()
|
||||
.plan();
|
||||
|
@ -393,7 +409,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
debug!("invoking raw delete_range request");
|
||||
self.assert_non_atomic()?;
|
||||
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.extract_error()
|
||||
.plan();
|
||||
|
@ -549,7 +566,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
previous_value.into(),
|
||||
self.cf.clone(),
|
||||
);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
.post_process_default()
|
||||
|
@ -572,7 +590,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
ranges.into_iter().map(Into::into),
|
||||
request_builder,
|
||||
);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.preserve_shard()
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.post_process_default()
|
||||
|
@ -606,7 +625,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
while cur_limit > 0 {
|
||||
let request =
|
||||
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
|
||||
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.single_region_with_store(region_store.clone())
|
||||
.await?
|
||||
.plan()
|
||||
|
@ -661,7 +681,8 @@ impl<PdC: PdClient> Client<PdC> {
|
|||
key_only,
|
||||
self.cf.clone(),
|
||||
);
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.retry_multi_region(self.backoff.clone())
|
||||
.merge(Collect)
|
||||
.plan();
|
||||
|
|
|
@ -13,6 +13,7 @@ use super::RawRpcRequest;
|
|||
use crate::collect_first;
|
||||
use crate::pd::PdClient;
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::proto::kvrpcpb::ApiVersion;
|
||||
use crate::proto::metapb;
|
||||
use crate::proto::tikvpb::tikv_client::TikvClient;
|
||||
use crate::request::plan::ResponseWithShard;
|
||||
|
@ -161,7 +162,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.pairs = shard;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -292,7 +293,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.ranges = shard;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -397,6 +398,10 @@ impl Request for RawCoprocessorRequest {
|
|||
fn set_context(&mut self, context: kvrpcpb::Context) {
|
||||
self.inner.set_context(context);
|
||||
}
|
||||
|
||||
fn set_api_version(&mut self, api_version: ApiVersion) {
|
||||
self.inner.set_api_version(api_version);
|
||||
}
|
||||
}
|
||||
|
||||
impl KvRequest for RawCoprocessorRequest {
|
||||
|
@ -496,6 +501,7 @@ mod test {
|
|||
use crate::mock::MockKvClient;
|
||||
use crate::mock::MockPdClient;
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::request::codec::EncodedRequest;
|
||||
use crate::request::Plan;
|
||||
use crate::Key;
|
||||
|
||||
|
@ -530,7 +536,8 @@ mod test {
|
|||
key_only: true,
|
||||
..Default::default()
|
||||
};
|
||||
let plan = crate::request::PlanBuilder::new(client, scan)
|
||||
let encoded_scan = EncodedRequest::new(scan, client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(client, encoded_scan)
|
||||
.resolve_lock(OPTIMISTIC_BACKOFF)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.merge(Collect)
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::request::KvRequest;
|
||||
|
||||
pub trait Codec: Clone + Sync + Send + 'static {
|
||||
fn encode_request<R: KvRequest>(&self, _req: &mut R) {}
|
||||
// TODO: fn decode_response()
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ApiV1TxnCodec {}
|
||||
|
||||
impl Codec for ApiV1TxnCodec {}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ApiV1RawCodec {}
|
||||
|
||||
impl Codec for ApiV1RawCodec {}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiV2TxnCodec {
|
||||
_keyspace_id: u32,
|
||||
}
|
||||
|
||||
impl ApiV2TxnCodec {
|
||||
pub fn new(keyspace_id: u32) -> Self {
|
||||
Self {
|
||||
_keyspace_id: keyspace_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Codec for ApiV2TxnCodec {
|
||||
fn encode_request<R: KvRequest>(&self, req: &mut R) {
|
||||
req.set_api_version(kvrpcpb::ApiVersion::V2);
|
||||
// TODO: req.encode_request(self);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: pub struct ApiV2RawCodec
|
||||
|
||||
// EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake.
|
||||
#[derive(Clone)]
|
||||
pub struct EncodedRequest<Req: KvRequest> {
|
||||
pub inner: Req,
|
||||
}
|
||||
|
||||
impl<Req: KvRequest> EncodedRequest<Req> {
|
||||
pub fn new<C: Codec>(mut req: Req, codec: &C) -> Self {
|
||||
codec.encode_request(&mut req);
|
||||
Self { inner: req }
|
||||
}
|
||||
}
|
|
@ -32,6 +32,7 @@ use crate::store::HasKeyErrors;
|
|||
use crate::store::Request;
|
||||
use crate::transaction::HasLocks;
|
||||
|
||||
pub mod codec;
|
||||
pub mod plan;
|
||||
mod plan_builder;
|
||||
mod shard;
|
||||
|
@ -41,6 +42,9 @@ mod shard;
|
|||
pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
|
||||
/// The expected response to the request.
|
||||
type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;
|
||||
|
||||
// TODO: fn encode_request()
|
||||
// TODO: fn decode_response()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, new, Eq, PartialEq)]
|
||||
|
@ -87,9 +91,12 @@ mod test {
|
|||
use super::*;
|
||||
use crate::mock::MockKvClient;
|
||||
use crate::mock::MockPdClient;
|
||||
use crate::pd::PdClient;
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::proto::kvrpcpb::ApiVersion;
|
||||
use crate::proto::pdpb::Timestamp;
|
||||
use crate::proto::tikvpb::tikv_client::TikvClient;
|
||||
use crate::request::codec::EncodedRequest;
|
||||
use crate::store::store_stream_for_keys;
|
||||
use crate::store::HasRegionError;
|
||||
use crate::transaction::lowering::new_commit_request;
|
||||
|
@ -138,6 +145,8 @@ mod test {
|
|||
fn set_context(&mut self, _: kvrpcpb::Context) {
|
||||
unreachable!();
|
||||
}
|
||||
|
||||
fn set_api_version(&mut self, _api_version: ApiVersion) {}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -183,7 +192,8 @@ mod test {
|
|||
|_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
|
||||
)));
|
||||
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
|
||||
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
|
||||
.extract_error()
|
||||
|
@ -207,16 +217,17 @@ mod test {
|
|||
|
||||
let key: Key = "key".to_owned().into();
|
||||
let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());
|
||||
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
|
||||
|
||||
// does not extract error
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone())
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
|
||||
.resolve_lock(OPTIMISTIC_BACKOFF)
|
||||
.retry_multi_region(OPTIMISTIC_BACKOFF)
|
||||
.plan();
|
||||
assert!(plan.execute().await.is_ok());
|
||||
|
||||
// extract error
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.resolve_lock(OPTIMISTIC_BACKOFF)
|
||||
.retry_multi_region(OPTIMISTIC_BACKOFF)
|
||||
.extract_error()
|
||||
|
|
|
@ -6,6 +6,7 @@ use std::sync::Arc;
|
|||
use super::plan::PreserveShard;
|
||||
use crate::backoff::Backoff;
|
||||
use crate::pd::PdClient;
|
||||
use crate::request::codec::EncodedRequest;
|
||||
use crate::request::plan::CleanupLocks;
|
||||
use crate::request::shard::HasNextBatch;
|
||||
use crate::request::DefaultProcessor;
|
||||
|
@ -46,11 +47,11 @@ pub struct Targetted;
|
|||
impl PlanBuilderPhase for Targetted {}
|
||||
|
||||
impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
|
||||
pub fn new(pd_client: Arc<PdC>, request: Req) -> Self {
|
||||
pub fn new(pd_client: Arc<PdC>, encoded_request: EncodedRequest<Req>) -> Self {
|
||||
PlanBuilder {
|
||||
pd_client,
|
||||
plan: Dispatch {
|
||||
request,
|
||||
request: encoded_request.inner,
|
||||
kv_client: None,
|
||||
},
|
||||
phantom: PhantomData,
|
||||
|
|
|
@ -163,7 +163,7 @@ macro_rules! shardable_key {
|
|||
mut shard: Self::Shard,
|
||||
store: &$crate::store::RegionStore,
|
||||
) -> $crate::Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
assert!(shard.len() == 1);
|
||||
self.key = shard.pop().unwrap();
|
||||
Ok(())
|
||||
|
@ -196,7 +196,7 @@ macro_rules! shardable_keys {
|
|||
shard: Self::Shard,
|
||||
store: &$crate::store::RegionStore,
|
||||
) -> $crate::Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.keys = shard.into_iter().map(Into::into).collect();
|
||||
Ok(())
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ macro_rules! shardable_range {
|
|||
shard: Self::Shard,
|
||||
store: &$crate::store::RegionStore,
|
||||
) -> $crate::Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
|
||||
self.start_key = shard.0.into();
|
||||
self.end_key = shard.1.into();
|
||||
|
|
|
@ -21,7 +21,10 @@ pub trait Request: Any + Sync + Send + 'static {
|
|||
) -> Result<Box<dyn Any>>;
|
||||
fn label(&self) -> &'static str;
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
/// Set the context for the request.
|
||||
/// Should always use `set_context` other than modify the `self.context` directly.
|
||||
fn set_context(&mut self, context: kvrpcpb::Context);
|
||||
fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion);
|
||||
}
|
||||
|
||||
macro_rules! impl_request {
|
||||
|
@ -52,7 +55,18 @@ macro_rules! impl_request {
|
|||
}
|
||||
|
||||
fn set_context(&mut self, context: kvrpcpb::Context) {
|
||||
let api_version = self
|
||||
.context
|
||||
.as_ref()
|
||||
.map(|c| c.api_version)
|
||||
.unwrap_or_default();
|
||||
self.context = Some(context);
|
||||
self.set_api_version(kvrpcpb::ApiVersion::from_i32(api_version).unwrap());
|
||||
}
|
||||
|
||||
fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
|
||||
let context = self.context.get_or_insert(kvrpcpb::Context::default());
|
||||
context.api_version = api_version.into();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -10,6 +10,7 @@ use crate::config::Config;
|
|||
use crate::pd::PdClient;
|
||||
use crate::pd::PdRpcClient;
|
||||
use crate::proto::pdpb::Timestamp;
|
||||
use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
|
||||
use crate::request::plan::CleanupLocksResult;
|
||||
use crate::request::Plan;
|
||||
use crate::timestamp::TimestampExt;
|
||||
|
@ -42,11 +43,11 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
|
|||
///
|
||||
/// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
|
||||
/// awaited to execute.
|
||||
pub struct Client {
|
||||
pd: Arc<PdRpcClient>,
|
||||
pub struct Client<Cod: Codec = ApiV1TxnCodec> {
|
||||
pd: Arc<PdRpcClient<Cod>>,
|
||||
}
|
||||
|
||||
impl Clone for Client {
|
||||
impl<Cod: Codec> Clone for Client<Cod> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
pd: self.pd.clone(),
|
||||
|
@ -54,7 +55,7 @@ impl Clone for Client {
|
|||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
impl Client<ApiV1TxnCodec> {
|
||||
/// Create a transactional [`Client`] and connect to the TiKV cluster.
|
||||
///
|
||||
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
|
||||
|
@ -71,7 +72,6 @@ impl Client {
|
|||
/// # });
|
||||
/// ```
|
||||
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
|
||||
// debug!("creating transactional client");
|
||||
Self::new_with_config(pd_endpoints, Config::default()).await
|
||||
}
|
||||
|
||||
|
@ -100,9 +100,35 @@ impl Client {
|
|||
pd_endpoints: Vec<S>,
|
||||
config: Config,
|
||||
) -> Result<Client> {
|
||||
Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Client<ApiV2TxnCodec> {
|
||||
pub async fn new_with_config_v2<S: Into<String>>(
|
||||
_keyspace_name: &str,
|
||||
pd_endpoints: Vec<S>,
|
||||
config: Config,
|
||||
) -> Result<Client<ApiV2TxnCodec>> {
|
||||
debug!("creating new transactional client APIv2");
|
||||
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
|
||||
let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?;
|
||||
let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name)
|
||||
pd.set_codec(ApiV2TxnCodec::new(keyspace_id));
|
||||
Ok(Client { pd: Arc::new(pd) })
|
||||
}
|
||||
}
|
||||
|
||||
impl<Cod: Codec> Client<Cod> {
|
||||
pub async fn new_with_codec<S: Into<String>>(
|
||||
pd_endpoints: Vec<S>,
|
||||
config: Config,
|
||||
codec: Cod,
|
||||
) -> Result<Client<Cod>> {
|
||||
debug!("creating new transactional client");
|
||||
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
|
||||
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?);
|
||||
let pd =
|
||||
Arc::new(PdRpcClient::<Cod>::connect(&pd_endpoints, config, true, Some(codec)).await?);
|
||||
Ok(Client { pd })
|
||||
}
|
||||
|
||||
|
@ -126,7 +152,7 @@ impl Client {
|
|||
/// transaction.commit().await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub async fn begin_optimistic(&self) -> Result<Transaction> {
|
||||
pub async fn begin_optimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
|
||||
debug!("creating new optimistic transaction");
|
||||
let timestamp = self.current_timestamp().await?;
|
||||
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
|
||||
|
@ -149,7 +175,7 @@ impl Client {
|
|||
/// transaction.commit().await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
|
||||
pub async fn begin_pessimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
|
||||
debug!("creating new pessimistic transaction");
|
||||
let timestamp = self.current_timestamp().await?;
|
||||
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
|
||||
|
@ -172,14 +198,21 @@ impl Client {
|
|||
/// transaction.commit().await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
|
||||
pub async fn begin_with_options(
|
||||
&self,
|
||||
options: TransactionOptions,
|
||||
) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
|
||||
debug!("creating new customized transaction");
|
||||
let timestamp = self.current_timestamp().await?;
|
||||
Ok(self.new_transaction(timestamp, options))
|
||||
}
|
||||
|
||||
/// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
|
||||
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
|
||||
pub fn snapshot(
|
||||
&self,
|
||||
timestamp: Timestamp,
|
||||
options: TransactionOptions,
|
||||
) -> Snapshot<Cod, PdRpcClient<Cod>> {
|
||||
debug!("creating new snapshot");
|
||||
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
|
||||
}
|
||||
|
@ -246,7 +279,8 @@ impl Client {
|
|||
let ctx = ResolveLocksContext::default();
|
||||
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
|
||||
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
|
||||
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
|
||||
.cleanup_locks(ctx.clone(), options, backoff)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.extract_error()
|
||||
|
@ -265,14 +299,19 @@ impl Client {
|
|||
batch_size: u32,
|
||||
) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
|
||||
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
|
||||
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.merge(crate::request::Collect)
|
||||
.plan();
|
||||
plan.execute().await
|
||||
}
|
||||
|
||||
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
|
||||
fn new_transaction(
|
||||
&self,
|
||||
timestamp: Timestamp,
|
||||
options: TransactionOptions,
|
||||
) -> Transaction<Cod, PdRpcClient<Cod>> {
|
||||
Transaction::new(timestamp, self.pd.clone(), options)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ use crate::proto::kvrpcpb;
|
|||
use crate::proto::kvrpcpb::TxnInfo;
|
||||
use crate::proto::pdpb::Timestamp;
|
||||
use crate::region::RegionVerId;
|
||||
use crate::request::codec::EncodedRequest;
|
||||
use crate::request::Collect;
|
||||
use crate::request::CollectSingle;
|
||||
use crate::request::Plan;
|
||||
|
@ -77,7 +78,8 @@ pub async fn resolve_locks(
|
|||
Some(&commit_version) => commit_version,
|
||||
None => {
|
||||
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.resolve_lock(OPTIMISTIC_BACKOFF)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.merge(CollectSingle)
|
||||
|
@ -118,8 +120,8 @@ async fn resolve_lock_with_retry(
|
|||
let store = pd_client.clone().store_for_key(key.into()).await?;
|
||||
let ver_id = store.region_with_leader.ver_id();
|
||||
let request = requests::new_resolve_lock_request(start_version, commit_version);
|
||||
// The only place where single-region is used
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.single_region_with_store(store)
|
||||
.await?
|
||||
.resolve_lock(Backoff::no_backoff())
|
||||
|
@ -359,7 +361,8 @@ impl LockResolver {
|
|||
force_sync_commit,
|
||||
resolving_pessimistic_lock,
|
||||
);
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.merge(CollectSingle)
|
||||
.extract_error()
|
||||
|
@ -383,7 +386,8 @@ impl LockResolver {
|
|||
txn_id: u64,
|
||||
) -> Result<SecondaryLocksStatus> {
|
||||
let req = new_check_secondary_locks_request(keys, txn_id);
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.extract_error()
|
||||
.merge(Collect)
|
||||
|
@ -399,7 +403,8 @@ impl LockResolver {
|
|||
) -> Result<RegionVerId> {
|
||||
let ver_id = store.region_with_leader.ver_id();
|
||||
let request = requests::new_batch_resolve_lock_request(txn_infos.clone());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
|
||||
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
|
||||
.single_region_with_store(store.clone())
|
||||
.await?
|
||||
.extract_error()
|
||||
|
|
|
@ -38,6 +38,7 @@ use crate::shardable_range;
|
|||
use crate::store::store_stream_for_keys;
|
||||
use crate::store::store_stream_for_range;
|
||||
use crate::store::RegionStore;
|
||||
use crate::store::Request;
|
||||
use crate::timestamp::TimestampExt;
|
||||
use crate::transaction::HasLocks;
|
||||
use crate::util::iter::FlatMapOkIterExt;
|
||||
|
@ -294,7 +295,7 @@ impl Shardable for kvrpcpb::PrewriteRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
|
||||
// Only need to set secondary keys if we're sending the primary key.
|
||||
if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
|
||||
|
@ -361,7 +362,7 @@ impl Shardable for kvrpcpb::CommitRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.keys = shard.into_iter().map(Into::into).collect();
|
||||
Ok(())
|
||||
}
|
||||
|
@ -452,7 +453,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.mutations = shard;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -553,7 +554,7 @@ impl Shardable for kvrpcpb::ScanLockRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
self.start_key = shard.0;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -614,7 +615,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
assert!(shard.len() == 1);
|
||||
self.primary_lock = shard.pop().unwrap();
|
||||
Ok(())
|
||||
|
@ -672,7 +673,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
|
|||
}
|
||||
|
||||
fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
|
||||
self.context = Some(store.region_with_leader.context()?);
|
||||
self.set_context(store.region_with_leader.context()?);
|
||||
assert!(shard.len() == 1);
|
||||
self.primary_key = shard.pop().unwrap();
|
||||
Ok(())
|
||||
|
|
|
@ -2,7 +2,11 @@
|
|||
|
||||
use derive_new::new;
|
||||
use log::debug;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use crate::codec::ApiV1TxnCodec;
|
||||
use crate::pd::{PdClient, PdRpcClient};
|
||||
use crate::request::codec::Codec;
|
||||
use crate::BoundRange;
|
||||
use crate::Key;
|
||||
use crate::KvPair;
|
||||
|
@ -18,11 +22,12 @@ use crate::Value;
|
|||
///
|
||||
/// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
|
||||
#[derive(new)]
|
||||
pub struct Snapshot {
|
||||
transaction: Transaction,
|
||||
pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
|
||||
transaction: Transaction<Cod, PdC>,
|
||||
phantom: PhantomData<Cod>,
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
|
||||
/// Get the value associated with the given key.
|
||||
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
|
||||
debug!("invoking get request on snapshot");
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use std::iter;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
|
@ -14,10 +15,12 @@ use tokio::time::Duration;
|
|||
|
||||
use crate::backoff::Backoff;
|
||||
use crate::backoff::DEFAULT_REGION_BACKOFF;
|
||||
use crate::codec::ApiV1TxnCodec;
|
||||
use crate::pd::PdClient;
|
||||
use crate::pd::PdRpcClient;
|
||||
use crate::proto::kvrpcpb;
|
||||
use crate::proto::pdpb::Timestamp;
|
||||
use crate::request::codec::{Codec, EncodedRequest};
|
||||
use crate::request::Collect;
|
||||
use crate::request::CollectError;
|
||||
use crate::request::CollectSingle;
|
||||
|
@ -73,7 +76,7 @@ use crate::Value;
|
|||
/// txn.commit().await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub struct Transaction<PdC: PdClient = PdRpcClient> {
|
||||
pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
|
||||
status: Arc<RwLock<TransactionStatus>>,
|
||||
timestamp: Timestamp,
|
||||
buffer: Buffer,
|
||||
|
@ -81,14 +84,15 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
|
|||
options: TransactionOptions,
|
||||
is_heartbeat_started: bool,
|
||||
start_instant: Instant,
|
||||
phantom: PhantomData<Cod>,
|
||||
}
|
||||
|
||||
impl<PdC: PdClient> Transaction<PdC> {
|
||||
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
|
||||
pub(crate) fn new(
|
||||
timestamp: Timestamp,
|
||||
rpc: Arc<PdC>,
|
||||
options: TransactionOptions,
|
||||
) -> Transaction<PdC> {
|
||||
) -> Transaction<Cod, PdC> {
|
||||
let status = if options.read_only {
|
||||
TransactionStatus::ReadOnly
|
||||
} else {
|
||||
|
@ -102,6 +106,7 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
options,
|
||||
is_heartbeat_started: false,
|
||||
start_instant: std::time::Instant::now(),
|
||||
phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -134,7 +139,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
self.buffer
|
||||
.get_or_else(key, |key| async move {
|
||||
let request = new_get_request(key, timestamp);
|
||||
let plan = PlanBuilder::new(rpc, request)
|
||||
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
|
||||
let plan = PlanBuilder::new(rpc, encoded_req)
|
||||
.resolve_lock(retry_options.lock_backoff)
|
||||
.retry_multi_region(DEFAULT_REGION_BACKOFF)
|
||||
.merge(CollectSingle)
|
||||
|
@ -264,7 +270,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
self.buffer
|
||||
.batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move {
|
||||
let request = new_batch_get_request(keys, timestamp);
|
||||
let plan = PlanBuilder::new(rpc, request)
|
||||
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
|
||||
let plan = PlanBuilder::new(rpc, encoded_req)
|
||||
.resolve_lock(retry_options.lock_backoff)
|
||||
.retry_multi_region(retry_options.region_backoff)
|
||||
.merge(Collect)
|
||||
|
@ -691,7 +698,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
primary_key,
|
||||
self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
|
||||
);
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff.clone())
|
||||
.retry_multi_region(self.options.retry_options.region_backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
|
@ -721,7 +729,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
move |new_range, new_limit| async move {
|
||||
let request =
|
||||
new_scan_request(new_range, timestamp, new_limit, key_only, reverse);
|
||||
let plan = PlanBuilder::new(rpc, request)
|
||||
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
|
||||
let plan = PlanBuilder::new(rpc, encoded_req)
|
||||
.resolve_lock(retry_options.lock_backoff)
|
||||
.retry_multi_region(retry_options.region_backoff)
|
||||
.merge(Collect)
|
||||
|
@ -776,7 +785,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
for_update_ts.clone(),
|
||||
need_value,
|
||||
);
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff.clone())
|
||||
.preserve_shard()
|
||||
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
|
||||
|
@ -830,7 +840,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
start_version,
|
||||
for_update_ts,
|
||||
);
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff.clone())
|
||||
.retry_multi_region(self.options.retry_options.region_backoff.clone())
|
||||
.extract_error()
|
||||
|
@ -900,7 +911,8 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
primary_key.clone(),
|
||||
start_instant.elapsed().as_millis() as u64 + MAX_TTL,
|
||||
);
|
||||
let plan = PlanBuilder::new(rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
|
||||
let plan = PlanBuilder::new(rpc.clone(), encoded_req)
|
||||
.retry_multi_region(region_backoff.clone())
|
||||
.merge(CollectSingle)
|
||||
.plan();
|
||||
|
@ -917,7 +929,7 @@ impl<PdC: PdClient> Transaction<PdC> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<PdC: PdClient> Drop for Transaction<PdC> {
|
||||
impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
|
||||
fn drop(&mut self) {
|
||||
debug!("dropping transaction");
|
||||
if std::thread::panicking() {
|
||||
|
@ -1209,7 +1221,8 @@ impl<PdC: PdClient> Committer<PdC> {
|
|||
.collect();
|
||||
// FIXME set max_commit_ts and min_commit_ts
|
||||
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), request)
|
||||
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff.clone())
|
||||
.retry_multi_region(self.options.retry_options.region_backoff.clone())
|
||||
.merge(CollectError)
|
||||
|
@ -1249,7 +1262,8 @@ impl<PdC: PdClient> Committer<PdC> {
|
|||
self.start_version.clone(),
|
||||
commit_version.clone(),
|
||||
);
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff.clone())
|
||||
.retry_multi_region(self.options.retry_options.region_backoff.clone())
|
||||
.extract_error()
|
||||
|
@ -1313,7 +1327,8 @@ impl<PdC: PdClient> Committer<PdC> {
|
|||
.filter(|key| &primary_key != key);
|
||||
new_commit_request(keys, self.start_version, commit_version)
|
||||
};
|
||||
let plan = PlanBuilder::new(self.rpc, req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc, encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff)
|
||||
.retry_multi_region(self.options.retry_options.region_backoff)
|
||||
.extract_error()
|
||||
|
@ -1334,7 +1349,8 @@ impl<PdC: PdClient> Committer<PdC> {
|
|||
match self.options.kind {
|
||||
TransactionKind::Optimistic => {
|
||||
let req = new_batch_rollback_request(keys, self.start_version);
|
||||
let plan = PlanBuilder::new(self.rpc, req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc, encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff)
|
||||
.retry_multi_region(self.options.retry_options.region_backoff)
|
||||
.extract_error()
|
||||
|
@ -1343,7 +1359,8 @@ impl<PdC: PdClient> Committer<PdC> {
|
|||
}
|
||||
TransactionKind::Pessimistic(for_update_ts) => {
|
||||
let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts);
|
||||
let plan = PlanBuilder::new(self.rpc, req)
|
||||
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
|
||||
let plan = PlanBuilder::new(self.rpc, encoded_req)
|
||||
.resolve_lock(self.options.retry_options.lock_backoff)
|
||||
.retry_multi_region(self.options.retry_options.region_backoff)
|
||||
.extract_error()
|
||||
|
|
Loading…
Reference in New Issue