Merge pull request #228 from andylokandy/not_founds

Distinguish not existing key and existing key with empty value in get_for_update
This commit is contained in:
Nick Cameron 2021-02-04 15:26:54 +13:00 committed by GitHub
commit c59911a211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 6 deletions

View File

@ -521,7 +521,7 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
// We need to pair keys and values returned somewhere.
// But it's blocked by the structure of the program that `map_result` only accepts the response as input
// Before we fix this `batch_get_for_update` is problematic.
type Result = Vec<Vec<u8>>;
type Result = Vec<Option<Vec<u8>>>;
type RpcResponse = kvrpcpb::PessimisticLockResponse;
type KeyData = Vec<kvrpcpb::Mutation>;
@ -551,7 +551,24 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
resp.take_values()
let values = resp.take_values();
let not_founds = resp.take_not_founds();
if not_founds.is_empty() {
// Legacy TiKV does not distiguish not existing key and existing key
// that with empty value. We assume that key does not exist if value
// is emtpy.
values
.into_iter()
.map(|v| if v.is_empty() { None } else { Some(v) })
.collect()
} else {
assert_eq!(values.len(), not_founds.len());
values
.into_iter()
.zip(not_founds.into_iter())
.map(|(v, not_found)| if not_found { None } else { Some(v) })
.collect()
}
}
async fn reduce(results: BoxStream<'static, Result<Self::Result>>) -> Result<Self::Result> {

View File

@ -128,13 +128,13 @@ impl Transaction {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_pessimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Value = txn.get_for_update(key).await.unwrap();
/// let result: Value = txn.get_for_update(key).await.unwrap().unwrap();
/// // now the key "TiKV" is locked, other transactions cannot modify it
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Value> {
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
self.check_allow_operation()?;
if !self.is_pessimistic() {
Err(Error::InvalidTransactionType)
@ -142,7 +142,7 @@ impl Transaction {
let key = key.into();
let mut values = self.pessimistic_lock(iter::once(key.clone()), true).await?;
assert!(values.len() == 1);
Ok(values.swap_remove(0))
Ok(values.pop().unwrap())
}
}
@ -545,7 +545,7 @@ impl Transaction {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
need_value: bool,
) -> Result<Vec<Vec<u8>>> {
) -> Result<Vec<Option<Vec<u8>>>> {
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"

View File

@ -160,6 +160,10 @@ message PessimisticLockResponse {
// The values is set if 'return_values' is true in the request and no error.
// If 'force' is true, this field is not used.
repeated bytes values = 5;
// Indicates whether the values at the same index is correspond to an existing key.
// In legacy TiKV, this field is not used even 'force' is false. In that case, an empty value indicates
// two possible situations: (1) the key does not exist. (2) the key exists but the value is empty.
repeated bool not_founds = 6;
}
// Unlock keys locked using `PessimisticLockRequest`.