mirror of https://github.com/tikv/client-rust.git
Merge remote-tracking branch 'upstream/master' into atomic
Signed-off-by: ekexium <ekexium@gmail.com>
Commit 6e02f61a2f
@@ -1 +0,0 @@
-nightly-2021-03-15
@@ -3,7 +3,7 @@
 use tikv_client_common::Error;
 
 use crate::{
-    backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
+    backoff::DEFAULT_REGION_BACKOFF,
     config::Config,
     pd::PdRpcClient,
     raw::lowering::*,
@@ -115,7 +115,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .post_process_default()
             .plan();
@@ -146,7 +145,6 @@ impl Client {
     ) -> Result<Vec<KvPair>> {
         let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -284,7 +282,6 @@ impl Client {
         let request =
             new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -311,7 +308,6 @@ impl Client {
     pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
         let request = new_raw_delete_range_request(range.into(), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -482,7 +478,6 @@ impl Client {
 
         let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -514,7 +509,6 @@ impl Client {
             self.cf.clone(),
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -532,7 +526,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
@@ -548,7 +541,6 @@ impl Client {
         let request =
             new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -562,7 +554,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
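All of the raw-client hunks above make the same edit: each command's request plan drops its lock-resolution step and keeps only region selection, region-level retry, and result handling. For orientation, here is a minimal usage sketch of the public raw API these plans serve, written in the style of the crate's doc examples; the PD endpoint is a placeholder and the endpoint-list constructor shown is an assumption about the public API of this version.

```rust
use tikv_client::RawClient;

fn main() -> Result<(), tikv_client::Error> {
    futures::executor::block_on(async {
        // Placeholder PD endpoint; point this at a real cluster.
        let client = RawClient::new(vec!["127.0.0.1:2379"]).await?;

        // Each call below is backed by one of the request plans shown above.
        client.put("k1".to_owned(), "v1".to_owned()).await?;
        let value = client.get("k1".to_owned()).await?;
        assert_eq!(value, Some("v1".to_owned().into()));
        client.delete("k1".to_owned()).await?;
        Ok(())
    })
}
```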
@@ -21,37 +21,38 @@ struct InnerBuffer {
 impl InnerBuffer {
     fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
         let key = key.into();
-        if !matches!(entry, BufferEntry::Cached(_)) {
+        if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
             self.primary_key.get_or_insert_with(|| key.clone());
         }
         self.entry_map.insert(key, entry);
     }
 
-    pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
-        self.primary_key.get_or_insert(key.clone())
+    /// Set the primary key if it is not set
+    pub fn primary_key_or(&mut self, key: &Key) {
+        self.primary_key.get_or_insert(key.clone());
     }
 }
 
 /// A caching layer which buffers reads and writes in a transaction.
 pub struct Buffer {
-    mutations: Mutex<InnerBuffer>,
+    inner: Mutex<InnerBuffer>,
 }
 
 impl Buffer {
     pub fn new(is_pessimistic: bool) -> Buffer {
         Buffer {
-            mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
+            inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
         }
     }
 
     /// Get the primary key of the buffer.
     pub async fn get_primary_key(&self) -> Option<Key> {
-        self.mutations.lock().await.primary_key.clone()
+        self.inner.lock().await.primary_key.clone()
     }
 
-    /// Get the primary key of the buffer, if not exists, use `key` as the primary key.
-    pub async fn get_primary_key_or(&self, key: &Key) -> Key {
-        self.mutations.lock().await.get_primary_key_or(key).clone()
+    /// Set the primary key if it is not set
+    pub async fn primary_key_or(&self, key: &Key) {
+        self.inner.lock().await.primary_key_or(key);
     }
 
     /// Get a value from the buffer.
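The `insert` change above tightens which entries may become the transaction's primary key: cached reads were already excluded, and `CheckNotExist` markers now are too, so only real mutations qualify. A self-contained sketch of that rule, using a simplified stand-in for the crate-internal `BufferEntry` enum:

```rust
// Simplified stand-in for the crate-internal BufferEntry; the real variants
// carry more data, but the primary-key rule only inspects the variant.
enum BufferEntry {
    Put(Vec<u8>),
    Del,
    Cached(Option<Vec<u8>>),
    CheckNotExist,
}

// Mirrors the `matches!` guard in InnerBuffer::insert above.
fn may_become_primary_key(entry: &BufferEntry) -> bool {
    !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist)
}

fn main() {
    assert!(may_become_primary_key(&BufferEntry::Put(b"v".to_vec())));
    assert!(may_become_primary_key(&BufferEntry::Del));
    assert!(!may_become_primary_key(&BufferEntry::Cached(None)));
    assert!(!may_become_primary_key(&BufferEntry::CheckNotExist));
}
```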
@@ -74,7 +75,7 @@ impl Buffer {
             MutationValue::Determined(value) => Ok(value),
             MutationValue::Undetermined => {
                 let value = f(key.clone()).await?;
-                let mut mutations = self.mutations.lock().await;
+                let mut mutations = self.inner.lock().await;
                 Self::update_cache(&mut mutations, key, value.clone());
                 Ok(value)
             }
@@ -95,7 +96,7 @@ impl Buffer {
         Fut: Future<Output = Result<Vec<KvPair>>>,
     {
         let (cached_results, undetermined_keys) = {
-            let mutations = self.mutations.lock().await;
+            let mutations = self.inner.lock().await;
             // Partition the keys into those we have buffered and those we have to
             // get from the store.
             let (undetermined_keys, cached_results): (
@@ -121,7 +122,7 @@ impl Buffer {
         };
 
         let fetched_results = f(Box::new(undetermined_keys)).await?;
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         for kvpair in &fetched_results {
             let key = kvpair.0.clone();
             let value = Some(kvpair.1.clone());
@@ -144,7 +145,7 @@ impl Buffer {
         Fut: Future<Output = Result<Vec<KvPair>>>,
     {
         // read from local buffer
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let mutation_range = mutations.entry_map.range(range.clone());
 
         // fetch from TiKV
@@ -190,8 +191,8 @@ impl Buffer {
 
     /// Lock the given key if necessary.
     pub async fn lock(&self, key: Key) {
-        let mutations = &mut self.mutations.lock().await;
-        mutations.primary_key.get_or_insert(key.clone());
+        let mutations = &mut self.inner.lock().await;
+        mutations.primary_key.get_or_insert_with(|| key.clone());
         let value = mutations
             .entry_map
             .entry(key)
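Besides the field rename, `lock` switches from `get_or_insert` to `get_or_insert_with`, so the key is only cloned when no primary key has been chosen yet. A standalone illustration of that standard-library `Option` behavior:

```rust
fn main() {
    let mut primary_key: Option<String> = None;

    // The closure runs (and the clone happens) only because the Option is None.
    primary_key.get_or_insert_with(|| "k1".to_string());
    assert_eq!(primary_key.as_deref(), Some("k1"));

    // On a later call the existing value wins and the closure is never evaluated.
    primary_key.get_or_insert_with(|| "k2".to_string());
    assert_eq!(primary_key.as_deref(), Some("k1"));

    // `get_or_insert(x)` keeps the same "first value wins" rule, but its
    // argument is always constructed, even when it ends up unused.
    primary_key.get_or_insert("k3".to_string());
    assert_eq!(primary_key.as_deref(), Some("k1"));
}
```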
@@ -205,15 +206,12 @@ impl Buffer {
 
     /// Insert a value into the buffer (does not write through).
     pub async fn put(&self, key: Key, value: Value) {
-        self.mutations
-            .lock()
-            .await
-            .insert(key, BufferEntry::Put(value));
+        self.inner.lock().await.insert(key, BufferEntry::Put(value));
     }
 
     /// Mark a value as Insert mutation into the buffer (does not write through).
     pub async fn insert(&self, key: Key, value: Value) {
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let mut entry = mutations.entry_map.entry(key.clone());
         match entry {
             Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -225,7 +223,7 @@ impl Buffer {
 
     /// Mark a value as deleted.
     pub async fn delete(&self, key: Key) {
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let is_pessimistic = mutations.is_pessimistic;
         let mut entry = mutations.entry_map.entry(key.clone());
 
@@ -241,7 +239,7 @@ impl Buffer {
 
     /// Converts the buffered mutations to the proto buffer version
    pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
-        self.mutations
+        self.inner
             .lock()
             .await
             .entry_map
@@ -251,7 +249,7 @@ impl Buffer {
     }
 
     async fn get_from_mutations(&self, key: &Key) -> MutationValue {
-        self.mutations
+        self.inner
             .lock()
             .await
             .entry_map
@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
     }
 
     /// Create a `get for update` request.
-    /// Once resolved this request will pessimistically lock and fetch the latest
-    /// value associated with the given key at **current timestamp**.
     ///
-    /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
+    /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
+    /// UPDATE` in TiDB, and has different behavior in optimistic and
+    /// pessimistic transactions.
     ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
+    /// # Optimistic transaction
     ///
+    /// It reads at the "start timestamp" and caches the value, just like normal
+    /// get requests. The lock is written in prewrite and commit, so it cannot
+    /// prevent concurrent transactions from writing the same key, but can only
+    /// prevent itself from committing.
     ///
-    /// It can only be used in pessimistic mode.
+    /// # Pessimistic transaction
     ///
+    /// It reads at the "current timestamp" and thus does not cache the value.
+    /// So following read requests won't be affected by the `get_for_update`.
+    /// A lock will be acquired immediately with this request, which prevents
+    /// concurrent transactions from mutating the keys.
+    ///
+    /// The "current timestamp" (also called `for_update_ts` of the request) is
+    /// fetched immediately from the timestamp oracle.
+    ///
+    /// Note: The behavior of the request under pessimistic transaction does not
+    /// follow snapshot isolation.
+    ///
     /// # Examples
     /// ```rust,no_run
@@ -146,7 +158,9 @@ impl<PdC: PdClient> Transaction<PdC> {
     pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         self.check_allow_operation().await?;
         if !self.is_pessimistic() {
-            Err(Error::InvalidTransactionType)
+            let key = key.into();
+            self.lock_keys(iter::once(key.clone())).await?;
+            self.get(key).await
         } else {
             let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
             debug_assert!(pairs.len() <= 1);
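With this change `get_for_update` no longer rejects optimistic transactions: in optimistic mode it locks the key via `lock_keys` and reads through the ordinary buffered `get`, while pessimistic mode keeps the immediate `pessimistic_lock`. A short usage sketch of the pessimistic path, in the style of the crate's doc examples; the PD addresses are placeholders:

```rust
use tikv_client::{TransactionClient, Value};

fn main() {
    futures::executor::block_on(async {
        // Placeholder PD endpoints, as in the doc examples above.
        let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"])
            .await
            .unwrap();

        let mut txn = client.begin_pessimistic().await.unwrap();
        // Locks "key" immediately and reads its latest value (not snapshot-isolated).
        let latest: Option<Value> = txn.get_for_update("key".to_owned()).await.unwrap();
        println!("latest value: {:?}", latest);
        txn.commit().await.unwrap();
    });
}
```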
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
 
     /// Create a new 'batch get for update' request.
     ///
-    /// Once resolved this request will pessimistically lock the keys and
-    /// fetch the values associated with the given keys.
+    /// Similar [`get_for_update`](Transaction::get_for_update), but it works
+    /// for a batch of keys.
     ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
-    ///
-    /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
-    ///
-    /// It can only be used in pessimistic mode.
+    /// Non-existent entries will not appear in the result. The order of the
+    /// keys is not retained in the result.
     ///
     /// # Examples
     /// ```rust,no_run
-    /// # use tikv_client::{Key, Value, Config, TransactionClient};
+    /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
     /// # use futures::prelude::*;
     /// # use std::collections::HashMap;
     /// # futures::executor::block_on(async {
     /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
     /// let mut txn = client.begin_pessimistic().await.unwrap();
     /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
-    /// let result: HashMap<Key, Value> = txn
+    /// let result: Vec<KvPair> = txn
     ///     .batch_get_for_update(keys)
     ///     .await
-    ///     .unwrap()
-    ///     .map(|pair| (pair.0, pair.1))
-    ///     .collect();
+    ///     .unwrap();
     /// // now "TiKV" and "TiDB" are both locked
     /// // Finish the transaction...
     /// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl<PdC: PdClient> Transaction<PdC> {
     pub async fn batch_get_for_update(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
-    ) -> Result<impl Iterator<Item = KvPair>> {
+    ) -> Result<Vec<KvPair>> {
         self.check_allow_operation().await?;
+        let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
         if !self.is_pessimistic() {
-            return Err(Error::InvalidTransactionType);
+            self.lock_keys(keys.clone()).await?;
+            Ok(self.batch_get(keys).await?.collect())
+        } else {
+            self.pessimistic_lock(keys, true).await
         }
-        let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
-        Ok(self.pessimistic_lock(keys, true).await?.into_iter())
     }
 
     /// Create a new 'scan' request.
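Because `batch_get_for_update` now resolves to a plain `Vec<KvPair>`, callers that used to collect the returned iterator can consume the vector directly, or convert it when a map is more convenient; the integration-test hunks below do exactly that. A small sketch of the conversion, assuming a `pairs` value obtained from the call:

```rust
use std::collections::HashMap;
use tikv_client::{Key, KvPair, Value};

/// `pairs` is what `txn.batch_get_for_update(keys).await?` now returns.
fn into_map(pairs: Vec<KvPair>) -> HashMap<Key, Value> {
    // A KvPair converts into a (Key, Value) tuple, so `From::from` is enough,
    // mirroring the updated integration test further down.
    pairs.into_iter().map(From::from).collect()
}
```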
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
                 }
             }
             TransactionKind::Pessimistic(_) => {
-                self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
-                    .await?;
+                let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
+                self.pessimistic_lock(keys.into_iter(), false).await?;
             }
         }
         Ok(())
@@ -649,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
         }
 
         let first_key = keys[0].clone().key();
-        let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
+        // we do not set the primary key here, because pessimistic lock request
+        // can fail, in which case the keys may not be part of the transaction.
+        let primary_lock = self
+            .buffer
+            .get_primary_key()
+            .await
+            .unwrap_or_else(|| first_key.clone());
         let for_update_ts = self.rpc.clone().get_timestamp().await?;
         self.options.push_for_update_ts(for_update_ts.clone());
         let request = new_pessimistic_lock_request(
@@ -669,6 +683,9 @@ impl<PdC: PdClient> Transaction<PdC> {
             .plan();
         let pairs = plan.execute().await;
 
+        // primary key will be set here if needed
+        self.buffer.primary_key_or(&first_key).await;
+
         self.start_auto_heartbeat().await;
 
         for key in keys {
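The two hunks above change when the primary key is recorded: a candidate is chosen before the pessimistic lock request is sent, but it is only stored in the buffer after the request succeeds, so a failed lock leaves the transaction's primary key untouched. A simplified, self-contained sketch of that ordering, with the lock RPC faked as a closure:

```rust
/// Simplified stand-in for the flow above: pick a candidate primary key, run the
/// fallible lock step, and only record the key once the step has succeeded.
fn lock_with_primary<F>(primary: &mut Option<String>, key: &str, lock_rpc: F) -> Result<(), String>
where
    F: Fn(&str) -> Result<(), String>,
{
    // Use the existing primary key if one is set, otherwise fall back to this
    // key, without storing anything yet.
    let _primary_for_request = primary.clone().unwrap_or_else(|| key.to_string());

    // If the lock request fails, `primary` is left exactly as it was.
    lock_rpc(key)?;

    // The primary key is set here if needed, mirroring `buffer.primary_key_or`.
    primary.get_or_insert_with(|| key.to_string());
    Ok(())
}

fn main() {
    let mut primary: Option<String> = None;

    assert!(lock_with_primary(&mut primary, "k1", |_| Err("region error".to_string())).is_err());
    assert_eq!(primary, None); // the failure did not pick a primary key

    assert!(lock_with_primary(&mut primary, "k1", |_| Ok(())).is_ok());
    assert_eq!(primary.as_deref(), Some("k1"));
}
```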
@@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> {
     let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
     assert_eq!(res.len(), keys.len());
 
-    let res = txn
-        .batch_get_for_update(keys.clone())
-        .await?
-        .collect::<Vec<_>>();
+    let res = txn.batch_get_for_update(keys.clone()).await?;
     assert_eq!(res.len(), keys.len());
 
     txn.commit().await?;
@@ -686,7 +683,8 @@ async fn get_for_update() -> Result<()> {
 
     let mut t1 = client.begin_pessimistic().await?;
     let mut t2 = client.begin_pessimistic().await?;
-
+    let mut t3 = client.begin_optimistic().await?;
+    let mut t4 = client.begin_optimistic().await?;
     let mut t0 = client.begin_pessimistic().await?;
     t0.put(key1.clone(), value1).await?;
     t0.put(key2.clone(), value2).await?;
@@ -700,12 +698,19 @@ async fn get_for_update() -> Result<()> {
     let res: HashMap<_, _> = t2
         .batch_get_for_update(keys.clone())
         .await?
+        .into_iter()
+        .map(From::from)
         .collect();
     t2.commit().await?;
-    assert!(res.get(&key1.into()).unwrap() == &value1);
+    assert!(res.get(&key1.clone().into()).unwrap() == &value1);
     assert!(res.get(&key2.into()).unwrap() == &value2);
 
+    assert!(t3.get_for_update(key1).await?.is_none());
+    assert!(t3.commit().await.is_err());
+
+    assert!(t4.batch_get_for_update(keys).await?.len() == 0);
+    assert!(t4.commit().await.is_err());
+
     Ok(())
 }
 
@@ -26,7 +26,6 @@ message RawCoprocessorResponse {
     bytes data = 1;
 
     errorpb.Error region_error = 2;
-
     // Error message for cases like if no coprocessor with a matching name is found
     // or on a version mismatch between plugin_api and the coprocessor.
     string other_error = 4;
@@ -100,7 +100,7 @@ service Tikv {
     rpc EstablishMPPConnection(mpp.EstablishMPPConnectionRequest) returns (stream mpp.MPPDataPacket) {}
 
     /// CheckLeader sends all information (includes region term and epoch) to other stores.
-    /// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose
+    /// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose
     /// term and epoch match with local information in the store.
     /// After the client collected all responses from all stores, it checks if got a quorum of responses from
     /// other stores for every region, and decides to advance resolved ts from these regions.
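The CheckLeader comment above describes a simple quorum rule: a region's resolved timestamp may only advance once a majority of the other stores confirm matching term and epoch. A rough, illustrative Rust sketch of that counting step, with plain integers standing in for the protobuf region types:

```rust
use std::collections::HashMap;

/// Given, per responding store, the region ids it confirmed, return the regions
/// confirmed by a strict majority of `total_stores`. Illustrative only; the real
/// client exchanges protobuf messages rather than bare ids.
fn regions_with_quorum(responses: &[Vec<u64>], total_stores: usize) -> Vec<u64> {
    let mut confirmations: HashMap<u64, usize> = HashMap::new();
    for store_response in responses {
        for &region_id in store_response {
            *confirmations.entry(region_id).or_insert(0) += 1;
        }
    }
    let mut confirmed: Vec<u64> = confirmations
        .into_iter()
        .filter(|&(_, count)| count > total_stores / 2)
        .map(|(region_id, _)| region_id)
        .collect();
    confirmed.sort_unstable();
    confirmed
}

fn main() {
    // Three stores: regions 1 and 2 are confirmed by two of them, region 3 by one.
    let responses = vec![vec![1, 2], vec![1, 2, 3], vec![]];
    assert_eq!(regions_with_quorum(&responses, 3), vec![1, 2]);
}
```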