Merge pull request #172 from ekexium/rename

Make raw and transactional clients have similar API
This commit is contained in:
ekexium 2020-09-23 15:38:51 +08:00 committed by GitHub
commit 899b5267b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 18 deletions

View File

@ -9,7 +9,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
let mut txn = client.begin().await.expect("Could not begin a transaction");
for pair in pairs {
let (key, value) = pair.into().into();
txn.set(key, value).await.expect("Could not set key value");
txn.put(key, value).await.expect("Could not set key value");
}
txn.commit().await.expect("Could not commit transaction");
}
@ -21,7 +21,7 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
let mut txn = client.begin().await.expect("Could not begin a transaction");
txn.scan(range, limit, false)
txn.scan(range, limit)
.await
.expect("Could not scan key-value pairs in range")
.for_each(|pair| println!("{:?}", pair));

View File

@ -13,6 +13,7 @@ pub struct Client {
/// The thread pool for background tasks including committing secondary keys and failed
/// transaction cleanups.
bg_worker: ThreadPool,
key_only: bool,
}
impl Client {
@ -30,7 +31,19 @@ impl Client {
// TODO: PdRpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let pd = Arc::new(PdRpcClient::connect(&config).await?);
Ok(Client { pd, bg_worker })
Ok(Client {
pd,
bg_worker,
key_only: false,
})
}
pub fn with_key_only(&self, key_only: bool) -> Client {
Client {
pd: self.pd.clone(),
bg_worker: self.bg_worker.clone(),
key_only,
}
}
/// Creates a new [`Transaction`](Transaction).
@ -73,6 +86,11 @@ impl Client {
}
fn new_transaction(&self, timestamp: Timestamp) -> Transaction {
Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone())
Transaction::new(
timestamp,
self.bg_worker.clone(),
self.pd.clone(),
self.key_only,
)
}
}

View File

@ -32,9 +32,8 @@ impl Snapshot {
&self,
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
) -> Result<impl Iterator<Item = KvPair>> {
self.transaction.scan(range, limit, key_only).await
self.transaction.scan(range, limit).await
}
pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {

View File

@ -33,6 +33,7 @@ pub struct Transaction {
buffer: Buffer,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
key_only: bool,
}
impl Transaction {
@ -40,12 +41,14 @@ impl Transaction {
timestamp: Timestamp,
bg_worker: ThreadPool,
rpc: Arc<PdRpcClient>,
key_only: bool,
) -> Transaction {
Transaction {
timestamp,
buffer: Default::default(),
bg_worker,
rpc,
key_only,
}
}
@ -116,7 +119,7 @@ impl Transaction {
/// let key1: Key = b"TiKV".to_vec().into();
/// let key2: Key = b"TiDB".to_vec().into();
/// let result: Vec<KvPair> = txn
/// .scan(key1..key2, 10, false)
/// .scan(key1..key2, 10)
/// .await
/// .unwrap()
/// .collect();
@ -128,11 +131,11 @@ impl Transaction {
&self,
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
) -> Result<impl Iterator<Item = KvPair>> {
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let key_only = self.key_only;
self.buffer
.scan_and_fetch(range.into(), limit, move |new_range, new_limit| {
new_mvcc_scan_request(new_range, timestamp, new_limit, key_only).execute(rpc)
@ -154,12 +157,12 @@ impl Transaction {
/// let mut txn = client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// txn.set(key, val);
/// txn.put(key, val);
/// // Finish the transaction...
/// txn.commit().await.unwrap();
/// # });
/// ```
pub async fn set(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.buffer.put(key.into(), value.into());
Ok(())
}

View File

@ -35,11 +35,11 @@ async fn delete_all_raw() -> Fallible<()> {
async fn delete_all_txn() -> Fallible<()> {
let config = Config::new(pd_addrs());
let txn_client = TransactionClient::new(config).await?;
let txn_client = TransactionClient::new(config).await?.with_key_only(true);
let mut txn = txn_client.begin().await?;
loop {
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE, true).await?.peekable();
let mut keys = txn.scan(vec![].., SCAN_BATCH_SIZE).await?.peekable();
if keys.peek().is_none() {
break;
@ -91,8 +91,8 @@ async fn crud() -> Fallible<()> {
0
);
txn.set("foo".to_owned(), "bar".to_owned()).await?;
txn.set("bar".to_owned(), "foo".to_owned()).await?;
txn.put("foo".to_owned(), "bar".to_owned()).await?;
txn.put("bar".to_owned(), "foo".to_owned()).await?;
// Read buffered values
assert_eq!(
txn.get("foo".to_owned()).await?,
@ -132,7 +132,7 @@ async fn crud() -> Fallible<()> {
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.set("foo".to_owned(), "foo".to_owned()).await?;
txn.put("foo".to_owned(), "foo".to_owned()).await?;
txn.delete("bar".to_owned()).await?;
txn.commit().await?;
@ -177,6 +177,9 @@ async fn raw_bank_transfer() -> Fallible<()> {
let mut alice_balance = get_u32(&client, alice.clone()).await?;
let bob = chosen_people[1];
let mut bob_balance = get_u32(&client, bob.clone()).await?;
if alice_balance == 0 {
continue;
}
let transfer = rng.gen_range(0, alice_balance);
alice_balance -= transfer;
bob_balance += transfer;
@ -211,7 +214,7 @@ async fn txn_bank_transfer() -> Fallible<()> {
for person in &people {
let init = rng.gen::<u8>() as u32;
sum += init as u32;
txn.set(person.clone(), init.to_be_bytes().to_vec()).await?;
txn.put(person.clone(), init.to_be_bytes().to_vec()).await?;
}
txn.commit().await?;
@ -223,12 +226,15 @@ async fn txn_bank_transfer() -> Fallible<()> {
let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?;
let bob = chosen_people[1];
let mut bob_balance = get_txn_u32(&txn, bob.clone()).await?;
if alice_balance == 0 {
continue;
}
let transfer = rng.gen_range(0, alice_balance);
alice_balance -= transfer;
bob_balance += transfer;
txn.set(alice.clone(), alice_balance.to_be_bytes().to_vec())
txn.put(alice.clone(), alice_balance.to_be_bytes().to_vec())
.await?;
txn.set(bob.clone(), bob_balance.to_be_bytes().to_vec())
txn.put(bob.clone(), bob_balance.to_be_bytes().to_vec())
.await?;
txn.commit().await?;
}