Some renaming (and adds one method)

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2020-12-22 08:39:39 +13:00
parent 6c3b9a74f2
commit b46022b7db
6 changed files with 74 additions and 42 deletions

View File

@ -28,7 +28,10 @@ async fn main() {
let value1: Value = b"value1".to_vec();
let key2: Key = b"key2".to_vec().into();
let value2: Value = b"value2".to_vec();
let mut txn0 = client.begin().await.expect("Could not begin a transaction");
let mut txn0 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for (key, value) in vec![(key1, value1), (key2, value2)] {
txn0.put(key, value).await.expect("Could not set key value");
}
@ -47,7 +50,10 @@ async fn main() {
println!("{:?}", (&key1, value));
{
// another txn cannot write to the locked key
let mut txn2 = client.begin().await.expect("Could not begin a transaction");
let mut txn2 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let key1: Key = b"key1".to_vec().into();
let value2: Value = b"value2".to_vec();
txn2.put(key1, value2).await.unwrap();
@ -58,7 +64,10 @@ async fn main() {
let value3: Value = b"value3".to_vec();
txn1.put(key1.clone(), value3).await.unwrap();
txn1.commit().await.unwrap();
let mut txn3 = client.begin().await.expect("Could not begin a transaction");
let mut txn3 = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let result = txn3.get(key1.clone()).await.unwrap().unwrap();
txn3.commit()
.await

View File

@ -6,7 +6,10 @@ use crate::common::parse_args;
use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value};
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for pair in pairs {
let (key, value) = pair.into().into();
txn.put(key, value).await.expect("Could not set key value");
@ -15,7 +18,10 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
}
async fn get(client: &Client, key: Key) -> Option<Value> {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
let res = txn.get(key).await.expect("Could not get value");
txn.commit()
.await
@ -24,7 +30,10 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
}
async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
txn.scan(range, limit)
.await
.expect("Could not scan key-value pairs in range")
@ -33,7 +42,10 @@ async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
}
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
let mut txn = client
.begin_optimistic()
.await
.expect("Could not begin a transaction");
for key in keys {
txn.delete(key).await.expect("Could not delete the key");
}

View File

@ -6,7 +6,7 @@ use crate::{
pd::{PdClient, PdRpcClient},
request::{KvRequest, OPTIMISTIC_BACKOFF},
timestamp::TimestampExt,
transaction::{Snapshot, Transaction, TransactionStyle},
transaction::{Snapshot, Transaction, TransactionOptions},
Result,
};
use futures::executor::ThreadPool;
@ -99,17 +99,17 @@ impl Client {
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client.begin().await.unwrap();
/// let mut transaction = client.begin_optimistic().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result = commit.await.unwrap();
/// # });
/// ```
pub async fn begin(&self) -> Result<Transaction> {
pub async fn begin_optimistic(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(
timestamp,
TransactionStyle::new_optimistic(self.config.try_one_pc),
TransactionOptions::new_optimistic(self.config.try_one_pc),
false,
))
}
@ -135,16 +135,21 @@ impl Client {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(
timestamp,
TransactionStyle::new_pessimistic(self.config.try_one_pc),
TransactionOptions::new_pessimistic(self.config.try_one_pc),
false,
))
}
pub async fn begin_with_options(&self, opts: TransactionOptions) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, opts, false))
}
/// Creates a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot {
Snapshot::new(self.new_transaction(
timestamp,
TransactionStyle::new_optimistic(self.config.try_one_pc),
TransactionOptions::new_optimistic(self.config.try_one_pc),
true,
))
}
@ -212,7 +217,7 @@ impl Client {
fn new_transaction(
&self,
timestamp: Timestamp,
style: TransactionStyle,
style: TransactionOptions,
read_only: bool,
) -> Transaction {
Transaction::new(

View File

@ -12,7 +12,7 @@ pub use client::Client;
pub(crate) use lock::{resolve_locks, HasLocks};
pub use snapshot::Snapshot;
pub use transaction::Transaction;
use transaction::TransactionStyle;
use transaction::TransactionOptions;
mod buffer;
mod client;

View File

@ -35,31 +35,37 @@ pub enum TransactionKind {
Pessimistic(u64),
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct TransactionStyle {
impl Default for TransactionKind {
fn default() -> TransactionKind {
TransactionKind::Pessimistic(0)
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)]
pub struct TransactionOptions {
kind: TransactionKind,
try_one_pc: bool,
async_commit: bool,
}
impl TransactionStyle {
pub fn new_optimistic(try_one_pc: bool) -> TransactionStyle {
TransactionStyle {
impl TransactionOptions {
pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Optimistic,
try_one_pc,
async_commit: false,
}
}
pub fn new_pessimistic(try_one_pc: bool) -> TransactionStyle {
TransactionStyle {
pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Pessimistic(0),
try_one_pc,
async_commit: false,
}
}
pub fn async_commit(mut self) -> TransactionStyle {
pub fn async_commit(mut self) -> TransactionOptions {
self.async_commit = true;
self
}
@ -101,7 +107,7 @@ impl TransactionStyle {
/// use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let txn = client.begin().await.unwrap();
/// let txn = client.begin_optimistic().await.unwrap();
/// # });
/// ```
pub struct Transaction {
@ -110,7 +116,7 @@ pub struct Transaction {
buffer: Buffer,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
style: TransactionOptions,
}
impl Transaction {
@ -118,7 +124,7 @@ impl Transaction {
timestamp: Timestamp,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
style: TransactionOptions,
read_only: bool,
) -> Transaction {
let status = if read_only {
@ -149,7 +155,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Option<Value> = txn.get(key).await.unwrap();
/// // Finish the transaction...
@ -216,7 +222,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = txn
/// .batch_get(keys)
@ -297,7 +303,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<KvPair> = txn
@ -331,7 +337,7 @@ impl Transaction {
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<Key> = txn
@ -369,7 +375,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// txn.put(key, val);
@ -397,7 +403,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// txn.delete(key);
/// // Finish the transaction...
@ -429,7 +435,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// // ... Do some actions.
/// txn.commit().await.unwrap();
@ -451,7 +457,7 @@ impl Transaction {
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin().await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: u64 = req.await.unwrap();
@ -600,7 +606,7 @@ struct TwoPhaseCommitter {
start_version: u64,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
style: TransactionStyle,
style: TransactionOptions,
#[new(default)]
undetermined: bool,
}

View File

@ -55,7 +55,7 @@ async fn crud() -> Result<()> {
clear_tikv().await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
@ -91,7 +91,7 @@ async fn crud() -> Result<()> {
txn.commit().await?;
// Read from TiKV then update and delete
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
@ -199,13 +199,13 @@ async fn txn_write_million() -> Result<()> {
.map(|u| u.to_be_bytes().to_vec())
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) {
txn.put(k.clone(), v).await?;
}
txn.commit().await?;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
let res = txn.batch_get(keys).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
txn.commit().await?;
@ -258,7 +258,7 @@ async fn txn_bank_transfer() -> Result<()> {
let mut rng = thread_rng();
let people = gen_u32_keys(NUM_PEOPLE, &mut rng);
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
let mut sum: u32 = 0;
for person in &people {
let init = rng.gen::<u8>() as u32;
@ -269,7 +269,7 @@ async fn txn_bank_transfer() -> Result<()> {
// transfer
for _ in 0..NUM_TRNASFER {
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
txn.use_async_commit();
let chosen_people = people.iter().choose_multiple(&mut rng, 2);
let alice = chosen_people[0];
@ -291,7 +291,7 @@ async fn txn_bank_transfer() -> Result<()> {
// check
let mut new_sum = 0;
let mut txn = client.begin().await?;
let mut txn = client.begin_optimistic().await?;
for person in people.iter() {
new_sum += get_txn_u32(&txn, person.clone()).await?;
}