Improve rustfmt config (#401)

Signed-off-by: Andy Lok <andylokandy@hotmail.com>
This commit is contained in:
Andy Lok 2023-07-09 17:07:13 +08:00 committed by GitHub
parent 0a1de19450
commit 797960edfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 960 additions and 594 deletions

View File

@ -1,8 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use clap::{crate_version, App, Arg};
use std::path::PathBuf; use std::path::PathBuf;
use clap::crate_version;
use clap::App;
use clap::Arg;
pub struct CommandArgs { pub struct CommandArgs {
pub pd: Vec<String>, pub pd: Vec<String>,
pub ca: Option<PathBuf>, pub ca: Option<PathBuf>,

View File

@ -2,8 +2,13 @@
mod common; mod common;
use tikv_client::Config;
use tikv_client::Key;
use tikv_client::TransactionClient as Client;
use tikv_client::TransactionOptions;
use tikv_client::Value;
use crate::common::parse_args; use crate::common::parse_args;
use tikv_client::{Config, Key, TransactionClient as Client, TransactionOptions, Value};
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {

View File

@ -4,8 +4,15 @@
mod common; mod common;
use tikv_client::Config;
use tikv_client::IntoOwnedRange;
use tikv_client::Key;
use tikv_client::KvPair;
use tikv_client::RawClient as Client;
use tikv_client::Result;
use tikv_client::Value;
use crate::common::parse_args; use crate::common::parse_args;
use tikv_client::{Config, IntoOwnedRange, Key, KvPair, RawClient as Client, Result, Value};
const KEY: &str = "TiKV"; const KEY: &str = "TiKV";
const VALUE: &str = "Rust"; const VALUE: &str = "Rust";
@ -93,10 +100,10 @@ async fn main() -> Result<()> {
.expect("Could not scan"); .expect("Could not scan");
let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect(); let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!( assert_eq!(&keys, &[
&keys, Key::from("k1".to_owned()),
&[Key::from("k1".to_owned()), Key::from("k2".to_owned()),] Key::from("k2".to_owned()),
); ]);
println!("Scanning from {start:?} to {end:?} gives: {keys:?}"); println!("Scanning from {start:?} to {end:?} gives: {keys:?}");
let k1 = "k1"; let k1 = "k1";
@ -115,18 +122,15 @@ async fn main() -> Result<()> {
.into_iter() .into_iter()
.map(|p| String::from_utf8(p.1).unwrap()) .map(|p| String::from_utf8(p.1).unwrap())
.collect(); .collect();
assert_eq!( assert_eq!(&vals, &[
&vals, "v1".to_owned(),
&[ "v2".to_owned(),
"v1".to_owned(), "v2".to_owned(),
"v2".to_owned(), "v3".to_owned(),
"v2".to_owned(), "v1".to_owned(),
"v3".to_owned(), "v2".to_owned(),
"v1".to_owned(), "v3".to_owned()
"v2".to_owned(), ]);
"v3".to_owned()
]
);
println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}"); println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}");
// Cleanly exit. // Cleanly exit.

View File

@ -2,8 +2,14 @@
mod common; mod common;
use tikv_client::BoundRange;
use tikv_client::Config;
use tikv_client::Key;
use tikv_client::KvPair;
use tikv_client::TransactionClient as Client;
use tikv_client::Value;
use crate::common::parse_args; use crate::common::parse_args;
use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value};
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) { async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client let mut txn = client
@ -104,10 +110,10 @@ async fn main() {
let key1_exists = key_exists(&txn, key1.clone()).await; let key1_exists = key_exists(&txn, key1.clone()).await;
let key2: Key = b"key_not_exist".to_vec().into(); let key2: Key = b"key_not_exist".to_vec().into();
let key2_exists = key_exists(&txn, key2.clone()).await; let key2_exists = key_exists(&txn, key2.clone()).await;
println!( println!("check exists {:?}", vec![
"check exists {:?}", (key1, key1_exists),
vec![(key1, key1_exists), (key2, key2_exists)] (key2, key2_exists)
); ]);
// scan // scan
let key1: Key = b"key1".to_vec().into(); let key1: Key = b"key1".to_vec().into();

View File

@ -1,3 +1,10 @@
imports_granularity="Crate" edition = "2021"
version = "Two"
reorder_imports = true
imports_granularity = "Item"
group_imports = "StdExternalCrate"
where_single_line = true
trailing_comma = "Vertical"
overflow_delimited_expr = true
format_code_in_doc_comments = true format_code_in_doc_comments = true
edition = "2018" normalize_comments = true

View File

@ -2,9 +2,11 @@
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
use rand::{thread_rng, Rng};
use std::time::Duration; use std::time::Duration;
use rand::thread_rng;
use rand::Rng;
pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
@ -198,9 +200,10 @@ enum BackoffKind {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*;
use std::convert::TryInto; use std::convert::TryInto;
use super::*;
#[test] #[test]
fn test_no_jitter_backoff() { fn test_no_jitter_backoff() {
// Tests for zero attempts. // Tests for zero attempts.

View File

@ -3,13 +3,13 @@
//! This module contains utility types and functions for making the transition //! This module contains utility types and functions for making the transition
//! from futures 0.1 to 1.0 easier. //! from futures 0.1 to 1.0 easier.
use futures::{
prelude::*,
ready,
task::{Context, Poll},
};
use std::pin::Pin; use std::pin::Pin;
use futures::prelude::*;
use futures::ready;
use futures::task::Context;
use futures::task::Poll;
/// A future implementing a tail-recursive loop. /// A future implementing a tail-recursive loop.
/// ///
/// Created by the `loop_fn` function. /// Created by the `loop_fn` function.

View File

@ -1,7 +1,10 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf;
use std::{path::PathBuf, time::Duration}; use std::time::Duration;
use serde_derive::Deserialize;
use serde_derive::Serialize;
/// The configuration for either a [`RawClient`](crate::RawClient) or a /// The configuration for either a [`RawClient`](crate::RawClient) or a
/// [`TransactionClient`](crate::TransactionClient). /// [`TransactionClient`](crate::TransactionClient).

View File

@ -1,12 +1,16 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::{ use std::borrow::Borrow;
borrow::Borrow, use std::cmp::Eq;
cmp::{Eq, PartialEq}, use std::cmp::PartialEq;
ops::{ use std::ops::Bound;
Bound, Range, RangeBounds, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive, use std::ops::Range;
}, use std::ops::RangeBounds;
}; use std::ops::RangeFrom;
use std::ops::RangeFull;
use std::ops::RangeInclusive;
use std::ops::RangeTo;
use std::ops::RangeToInclusive;
#[cfg(test)] #[cfg(test)]
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;

View File

@ -1,4 +1,6 @@
use std::{io::Write, ptr}; use std::io::Write;
use std::ptr;
use tikv_client_common::internal_err; use tikv_client_common::internal_err;
use crate::Result; use crate::Result;
@ -153,26 +155,18 @@ pub mod test {
#[test] #[test]
fn test_enc_dec_bytes() { fn test_enc_dec_bytes() {
let pairs = vec![ let pairs = vec![
( (vec![], vec![0, 0, 0, 0, 0, 0, 0, 0, 247], vec![
vec![], 255, 255, 255, 255, 255, 255, 255, 255, 8,
vec![0, 0, 0, 0, 0, 0, 0, 0, 247], ]),
vec![255, 255, 255, 255, 255, 255, 255, 255, 8], (vec![0], vec![0, 0, 0, 0, 0, 0, 0, 0, 248], vec![
), 255, 255, 255, 255, 255, 255, 255, 255, 7,
( ]),
vec![0], (vec![1, 2, 3], vec![1, 2, 3, 0, 0, 0, 0, 0, 250], vec![
vec![0, 0, 0, 0, 0, 0, 0, 0, 248], 254, 253, 252, 255, 255, 255, 255, 255, 5,
vec![255, 255, 255, 255, 255, 255, 255, 255, 7], ]),
), (vec![1, 2, 3, 0], vec![1, 2, 3, 0, 0, 0, 0, 0, 251], vec![
( 254, 253, 252, 255, 255, 255, 255, 255, 4,
vec![1, 2, 3], ]),
vec![1, 2, 3, 0, 0, 0, 0, 0, 250],
vec![254, 253, 252, 255, 255, 255, 255, 255, 5],
),
(
vec![1, 2, 3, 0],
vec![1, 2, 3, 0, 0, 0, 0, 0, 251],
vec![254, 253, 252, 255, 255, 255, 255, 255, 4],
),
( (
vec![1, 2, 3, 4, 5, 6, 7], vec![1, 2, 3, 4, 5, 6, 7],
vec![1, 2, 3, 4, 5, 6, 7, 0, 254], vec![1, 2, 3, 4, 5, 6, 7, 0, 254],

View File

@ -1,15 +1,23 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::HexRepr; use std::fmt;
use crate::kv::codec::{self, BytesEncoder}; use std::ops::Bound;
use std::u8;
#[allow(unused_imports)] #[allow(unused_imports)]
#[cfg(test)] #[cfg(test)]
use proptest::{arbitrary::any_with, collection::size_range}; use proptest::arbitrary::any_with;
#[allow(unused_imports)]
#[cfg(test)]
use proptest::collection::size_range;
#[cfg(test)] #[cfg(test)]
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
use std::{fmt, ops::Bound, u8};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use super::HexRepr;
use crate::kv::codec::BytesEncoder;
use crate::kv::codec::{self};
const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
/// The key part of a key/value pair. /// The key part of a key/value pair.

View File

@ -1,11 +1,16 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use super::{HexRepr, Key, Value}; use std::fmt;
use std::str;
#[cfg(test)] #[cfg(test)]
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
use std::{fmt, str};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use super::HexRepr;
use super::Key;
use super::Value;
/// A key/value pair. /// A key/value pair.
/// ///
/// # Examples /// # Examples

View File

@ -1,5 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::{fmt, u8}; use std::fmt;
use std::u8;
mod bound_range; mod bound_range;
pub mod codec; pub mod codec;
@ -7,7 +8,8 @@ mod key;
mod kvpair; mod kvpair;
mod value; mod value;
pub use bound_range::{BoundRange, IntoOwnedRange}; pub use bound_range::BoundRange;
pub use bound_range::IntoOwnedRange;
pub use key::Key; pub use key::Key;
pub use kvpair::KvPair; pub use kvpair::KvPair;
pub use value::Value; pub use value::Value;

View File

@ -124,22 +124,48 @@ mod proptests;
extern crate slog; extern crate slog;
extern crate slog_term; extern crate slog_term;
#[doc(inline)]
pub use config::Config;
#[doc(inline)]
pub use tikv_client_common::security::SecurityManager;
#[doc(inline)]
pub use tikv_client_common::Error;
#[doc(inline)]
pub use tikv_client_common::Result;
#[doc(inline)] #[doc(inline)]
pub use crate::backoff::Backoff; pub use crate::backoff::Backoff;
#[doc(inline)] #[doc(inline)]
pub use crate::kv::{BoundRange, IntoOwnedRange, Key, KvPair, Value}; pub use crate::kv::BoundRange;
#[doc(inline)] #[doc(inline)]
pub use crate::raw::{lowering as raw_lowering, Client as RawClient, ColumnFamily}; pub use crate::kv::IntoOwnedRange;
#[doc(inline)]
pub use crate::kv::Key;
#[doc(inline)]
pub use crate::kv::KvPair;
#[doc(inline)]
pub use crate::kv::Value;
#[doc(inline)]
pub use crate::raw::lowering as raw_lowering;
#[doc(inline)]
pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)] #[doc(inline)]
pub use crate::request::RetryOptions; pub use crate::request::RetryOptions;
#[doc(inline)] #[doc(inline)]
pub use crate::timestamp::{Timestamp, TimestampExt}; pub use crate::timestamp::Timestamp;
#[doc(inline)] #[doc(inline)]
pub use crate::transaction::{ pub use crate::timestamp::TimestampExt;
lowering as transaction_lowering, CheckLevel, Client as TransactionClient, Snapshot,
Transaction, TransactionOptions,
};
#[doc(inline)] #[doc(inline)]
pub use config::Config; pub use crate::transaction::lowering as transaction_lowering;
#[doc(inline)] #[doc(inline)]
pub use tikv_client_common::{security::SecurityManager, Error, Result}; pub use crate::transaction::CheckLevel;
#[doc(inline)]
pub use crate::transaction::Client as TransactionClient;
#[doc(inline)]
pub use crate::transaction::Snapshot;
#[doc(inline)]
pub use crate::transaction::Transaction;
#[doc(inline)]
pub use crate::transaction::TransactionOptions;

View File

@ -5,18 +5,30 @@
//! The goal is to be able to test functionality independently of the rest of //! The goal is to be able to test functionality independently of the rest of
//! the system, in particular without requiring a TiKV or PD server, or RPC layer. //! the system, in particular without requiring a TiKV or PD server, or RPC layer.
use crate::{ use std::any::Any;
pd::{PdClient, PdRpcClient, RetryClient}, use std::sync::Arc;
region::{RegionId, RegionWithLeader},
store::RegionStore,
Config, Error, Key, Result, Timestamp,
};
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new; use derive_new::new;
use slog::{Drain, Logger}; use slog::Drain;
use std::{any::Any, sync::Arc}; use slog::Logger;
use tikv_client_proto::metapb::{self, RegionEpoch}; use tikv_client_proto::metapb::RegionEpoch;
use tikv_client_store::{KvClient, KvConnect, Request}; use tikv_client_proto::metapb::{self};
use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::Request;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::pd::RetryClient;
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::store::RegionStore;
use crate::Config;
use crate::Error;
use crate::Key;
use crate::Result;
use crate::Timestamp;
/// Create a `PdRpcClient` with it's internals replaced with mocks so that the /// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls. /// client can be tested without doing any RPC calls.
@ -56,9 +68,7 @@ pub struct MockKvClient {
impl MockKvClient { impl MockKvClient {
pub fn with_dispatch_hook<F>(dispatch: F) -> MockKvClient pub fn with_dispatch_hook<F>(dispatch: F) -> MockKvClient
where where F: Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static {
F: Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static,
{
MockKvClient { MockKvClient {
addr: String::new(), addr: String::new(),
dispatch: Some(Arc::new(dispatch)), dispatch: Some(Arc::new(dispatch)),

View File

@ -1,23 +1,36 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use std::collections::HashMap;
compat::stream_fn, use std::sync::Arc;
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
store::RegionStore,
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
};
use async_trait::async_trait; use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream}; use futures::prelude::*;
use futures::stream::BoxStream;
use slog::Logger; use slog::Logger;
use std::{collections::HashMap, sync::Arc};
use tikv_client_pd::Cluster; use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_proto::kvrpcpb;
use tikv_client_store::{KvClient, KvConnect, TikvConnect}; use tikv_client_proto::metapb;
use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::TikvConnect;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::compat::stream_fn;
use crate::kv::codec;
use crate::pd::retry::RetryClientTrait;
use crate::pd::RetryClient;
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::store::RegionStore;
use crate::BoundRange;
use crate::Config;
use crate::Key;
use crate::Result;
use crate::SecurityManager;
use crate::Timestamp;
/// The PdClient handles all the encoding stuff. /// The PdClient handles all the encoding stuff.
/// ///
/// Raw APIs does not require encoding/decoding at all. /// Raw APIs does not require encoding/decoding at all.
@ -325,11 +338,12 @@ fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
#[cfg(test)] #[cfg(test)]
pub mod test { pub mod test {
use futures::executor;
use futures::executor::block_on;
use super::*; use super::*;
use crate::mock::*; use crate::mock::*;
use futures::{executor, executor::block_on};
#[tokio::test] #[tokio::test]
async fn test_kv_client_caching() { async fn test_kv_client_caching() {
let client = block_on(pd_rpc_client()); let client = block_on(pd_rpc_client());
@ -363,19 +377,16 @@ pub mod test {
let mut stream = executor::block_on_stream(stream); let mut stream = executor::block_on_stream(stream);
let result: Vec<Key> = stream.next().unwrap().unwrap().1; let result: Vec<Key> = stream.next().unwrap().unwrap().1;
assert_eq!( assert_eq!(result, vec![
result, vec![1].into(),
vec![ vec![2].into(),
vec![1].into(), vec![3].into(),
vec![2].into(), vec![5, 2].into()
vec![3].into(), ]);
vec![5, 2].into() assert_eq!(stream.next().unwrap().unwrap().1, vec![
] vec![12].into(),
); vec![11, 4].into()
assert_eq!( ]);
stream.next().unwrap().unwrap().1,
vec![vec![12].into(), vec![11, 4].into()]
);
assert!(stream.next().is_none()); assert!(stream.next().is_none());
} }
@ -417,13 +428,10 @@ pub mod test {
let ranges4 = stream.next().unwrap().unwrap(); let ranges4 = stream.next().unwrap().unwrap();
assert_eq!(ranges1.0.id(), 1); assert_eq!(ranges1.0.id(), 1);
assert_eq!( assert_eq!(ranges1.1, vec![
ranges1.1, make_key_range(k1.clone(), k2.clone()),
vec![ make_key_range(k1, k_split.clone()),
make_key_range(k1.clone(), k2.clone()), ]);
make_key_range(k1, k_split.clone()),
]
);
assert_eq!(ranges2.0.id(), 2); assert_eq!(ranges2.0.id(), 2);
assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3)]); assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3)]);
assert_eq!(ranges3.0.id(), 1); assert_eq!(ranges3.0.id(), 1);

View File

@ -1,5 +1,7 @@
mod client; mod client;
mod retry; mod retry;
pub use client::{PdClient, PdRpcClient}; pub use client::PdClient;
pub use retry::{RetryClient, RetryClientTrait}; pub use client::PdRpcClient;
pub use retry::RetryClient;
pub use retry::RetryClientTrait;

View File

@ -2,23 +2,27 @@
//! A utility module for managing and retrying PD requests. //! A utility module for managing and retrying PD requests.
use crate::{ use std::fmt;
region::{RegionId, RegionWithLeader, StoreId}, use std::sync::Arc;
stats::pd_stats, use std::time::Duration;
Error, Result, SecurityManager, use std::time::Instant;
};
use async_trait::async_trait; use async_trait::async_trait;
use std::{ use tikv_client_pd::Cluster;
fmt, use tikv_client_pd::Connection;
sync::Arc, use tikv_client_proto::metapb;
time::{Duration, Instant}, use tikv_client_proto::pdpb::Timestamp;
}; use tikv_client_proto::pdpb::{self};
use tikv_client_pd::{Cluster, Connection}; use tokio::sync::RwLock;
use tikv_client_proto::{ use tokio::time::sleep;
metapb,
pdpb::{self, Timestamp}, use crate::region::RegionId;
}; use crate::region::RegionWithLeader;
use tokio::{sync::RwLock, time::sleep}; use crate::region::StoreId;
use crate::stats::pd_stats;
use crate::Error;
use crate::Result;
use crate::SecurityManager;
// FIXME: these numbers and how they are used are all just cargo-culted in, there // FIXME: these numbers and how they are used are all just cargo-culted in, there
// may be more optimal values. // may be more optimal values.
@ -224,14 +228,16 @@ impl Reconnect for RetryClient<Cluster> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use std::sync::atomic::AtomicUsize;
use futures::{executor, future::ready}; use std::sync::atomic::Ordering;
use std::sync::{ use std::sync::Mutex;
atomic::{AtomicUsize, Ordering},
Mutex, use futures::executor;
}; use futures::future::ready;
use tikv_client_common::internal_err; use tikv_client_common::internal_err;
use super::*;
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
async fn test_reconnect() { async fn test_reconnect() {
struct MockClient { struct MockClient {

View File

@ -4,29 +4,27 @@
// directory tests don't have access to `cfg(tests)` functions and we don't want to force // directory tests don't have access to `cfg(tests)` functions and we don't want to force
// users to depend on proptest or manually enable features to test. // users to depend on proptest or manually enable features to test.
/* // Temporarily disabled
* Temporarily disabled //
// use proptest::strategy::Strategy;
use proptest::strategy::Strategy; // use std::env::var;
use std::env::var; //
// mod raw;
mod raw; // pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS"; // pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16; //
// pub fn arb_batch<T: core::fmt::Debug>(
pub fn arb_batch<T: core::fmt::Debug>( // single_strategy: impl Strategy<Value = T>,
single_strategy: impl Strategy<Value = T>, // max_batch_size: impl Into<Option<usize>>,
max_batch_size: impl Into<Option<usize>>, // ) -> impl Strategy<Value = Vec<T>> {
) -> impl Strategy<Value = Vec<T>> { // let max_batch_size = max_batch_size.into().unwrap_or(PROPTEST_BATCH_SIZE_MAX);
let max_batch_size = max_batch_size.into().unwrap_or(PROPTEST_BATCH_SIZE_MAX); // proptest::collection::vec(single_strategy, 0..max_batch_size)
proptest::collection::vec(single_strategy, 0..max_batch_size) // }
} //
// pub fn pd_addrs() -> Vec<String> {
pub fn pd_addrs() -> Vec<String> { // var(ENV_PD_ADDRS)
var(ENV_PD_ADDRS) // .expect(&format!("Expected {}:", ENV_PD_ADDRS))
.expect(&format!("Expected {}:", ENV_PD_ADDRS)) // .split(",")
.split(",") // .map(From::from)
.map(From::from) // .collect()
.collect() // }
}
*/

View File

@ -1,20 +1,30 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use core::ops::Range; use core::ops::Range;
use std::{str::FromStr, sync::Arc, u32}; use std::str::FromStr;
use std::sync::Arc;
use std::u32;
use slog::{Drain, Logger}; use slog::Drain;
use slog::Logger;
use tikv_client_common::Error; use tikv_client_common::Error;
use tikv_client_proto::metapb; use tikv_client_proto::metapb;
use crate::{ use crate::backoff::DEFAULT_REGION_BACKOFF;
backoff::DEFAULT_REGION_BACKOFF, use crate::config::Config;
config::Config, use crate::pd::PdClient;
pd::{PdClient, PdRpcClient}, use crate::pd::PdRpcClient;
raw::lowering::*, use crate::raw::lowering::*;
request::{Collect, CollectSingle, Plan}, use crate::request::Collect;
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value, use crate::request::CollectSingle;
}; use crate::request::Plan;
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Value;
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
@ -762,14 +772,16 @@ impl<PdC: PdClient> Client<PdC> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use std::any::Any;
use crate::{ use std::sync::Arc;
mock::{MockKvClient, MockPdClient},
Result,
};
use std::{any::Any, sync::Arc};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::Result;
#[tokio::test] #[tokio::test]
async fn test_raw_coprocessor() -> Result<()> { async fn test_raw_coprocessor() -> Result<()> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
@ -813,26 +825,21 @@ mod tests {
.into_iter() .into_iter()
.map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges)) .map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges))
.collect(); .collect();
assert_eq!( assert_eq!(resps, vec![
resps, ("1:[Key(05)..Key(0A)]".to_string(), vec![
vec![ Key::from(vec![5])..Key::from(vec![10])
( ]),
"1:[Key(05)..Key(0A)]".to_string(), (
vec![Key::from(vec![5])..Key::from(vec![10])] "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
), vec![
( Key::from(vec![10])..Key::from(vec![15]),
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(), Key::from(vec![20])..Key::from(vec![250, 250])
vec![ ]
Key::from(vec![10])..Key::from(vec![15]), ),
Key::from(vec![20])..Key::from(vec![250, 250]) ("3:[Key(FAFA)..Key()]".to_string(), vec![
] Key::from(vec![250, 250])..Key::from(vec![])
), ])
( ]);
"3:[Key(FAFA)..Key()]".to_string(),
vec![Key::from(vec![250, 250])..Key::from(vec![])]
)
]
);
Ok(()) Ok(())
} }
} }

View File

@ -4,11 +4,19 @@
//! types (i.e., the types from the client crate) and converts these to the types used in the //! types (i.e., the types from the client crate) and converts these to the types used in the
//! generated protobuf code, then calls the low-level ctor functions in the requests module. //! generated protobuf code, then calls the low-level ctor functions in the requests module.
use std::{iter::Iterator, ops::Range, sync::Arc}; use std::iter::Iterator;
use std::ops::Range;
use std::sync::Arc;
use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value}; use crate::raw::requests;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Key;
use crate::KvPair;
use crate::Value;
pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest { pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
requests::new_raw_get_request(key.into(), cf) requests::new_raw_get_request(key.into(), cf)

View File

@ -9,9 +9,11 @@
//! //!
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace. //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
use std::convert::TryFrom;
use std::fmt;
pub use self::client::Client; pub use self::client::Client;
use crate::Error; use crate::Error;
use std::{convert::TryFrom, fmt};
mod client; mod client;
pub mod lowering; pub mod lowering;

View File

@ -1,26 +1,40 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::{any::Any, ops::Range, sync::Arc, time::Duration}; use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use tikv_client_proto::{kvrpcpb, metapb, tikvpb::tikv_client::TikvClient}; use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tikv_client_store::Request; use tikv_client_store::Request;
use tonic::transport::Channel; use tonic::transport::Channel;
use super::RawRpcRequest; use super::RawRpcRequest;
use crate::{ use crate::collect_first;
collect_first, use crate::pd::PdClient;
pd::PdClient, use crate::request::plan::ResponseWithShard;
request::{ use crate::request::Collect;
plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, use crate::request::CollectSingle;
Process, Shardable, SingleKey, use crate::request::DefaultProcessor;
}, use crate::request::KvRequest;
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore}, use crate::request::Merge;
transaction::HasLocks, use crate::request::Process;
util::iter::FlatMapOkIterExt, use crate::request::Shardable;
ColumnFamily, Key, KvPair, Result, Value, use crate::request::SingleKey;
}; use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_ranges;
use crate::store::RegionStore;
use crate::transaction::HasLocks;
use crate::util::iter::FlatMapOkIterExt;
use crate::ColumnFamily;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Value;
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest { pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
let mut req = kvrpcpb::RawGetRequest::default(); let mut req = kvrpcpb::RawGetRequest::default();
@ -469,17 +483,19 @@ impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*;
use crate::{
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
mock::{MockKvClient, MockPdClient},
request::Plan,
Key,
};
use futures::executor;
use std::any::Any; use std::any::Any;
use futures::executor;
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use super::*;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::request::Plan;
use crate::Key;
#[test] #[test]
#[ignore] #[ignore]
fn test_raw_scan() { fn test_raw_scan() {

View File

@ -1,8 +1,12 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{Error, Key, Result};
use derive_new::new; use derive_new::new;
use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use crate::Error;
use crate::Key;
use crate::Result;
/// The ID of a region /// The ID of a region
pub type RegionId = u64; pub type RegionId = u64;

View File

@ -1,18 +1,25 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use std::collections::BTreeMap;
pd::{RetryClient, RetryClientTrait}, use std::collections::HashMap;
region::{RegionId, RegionVerId, RegionWithLeader, StoreId}, use std::collections::HashSet;
Key, Result, use std::sync::Arc;
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
use tikv_client_common::Error; use tikv_client_common::Error;
use tikv_client_pd::Cluster; use tikv_client_pd::Cluster;
use tikv_client_proto::metapb::{self, Store}; use tikv_client_proto::metapb::Store;
use tokio::sync::{Notify, RwLock}; use tikv_client_proto::metapb::{self};
use tokio::sync::Notify;
use tokio::sync::RwLock;
use crate::pd::RetryClient;
use crate::pd::RetryClientTrait;
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region::StoreId;
use crate::Key;
use crate::Result;
const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4; const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4;
@ -229,24 +236,26 @@ impl<C: RetryClientTrait> RegionCache<C> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::RegionCache; use std::collections::BTreeMap;
use crate::{ use std::collections::HashMap;
pd::RetryClientTrait, use std::collections::HashSet;
region::{RegionId, RegionWithLeader}, use std::sync::atomic::AtomicU64;
Key, Result, use std::sync::atomic::Ordering::SeqCst;
}; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering::SeqCst},
Arc,
},
};
use tikv_client_common::Error; use tikv_client_common::Error;
use tikv_client_proto::metapb::{self, RegionEpoch}; use tikv_client_proto::metapb::RegionEpoch;
use tikv_client_proto::metapb::{self};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use super::RegionCache;
use crate::pd::RetryClientTrait;
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::Key;
use crate::Result;
#[derive(Default)] #[derive(Default)]
struct MockRetryClient { struct MockRetryClient {
pub regions: Mutex<HashMap<RegionId, RegionWithLeader>>, pub regions: Mutex<HashMap<RegionId, RegionWithLeader>>,
@ -309,9 +318,11 @@ mod test {
async fn cache_is_used() -> Result<()> { async fn cache_is_used() -> Result<()> {
let retry_client = Arc::new(MockRetryClient::default()); let retry_client = Arc::new(MockRetryClient::default());
let cache = RegionCache::new(retry_client.clone()); let cache = RegionCache::new(retry_client.clone());
retry_client.regions.lock().await.insert( retry_client
1, .regions
RegionWithLeader { .lock()
.await
.insert(1, RegionWithLeader {
region: metapb::Region { region: metapb::Region {
id: 1, id: 1,
start_key: vec![], start_key: vec![],
@ -326,11 +337,12 @@ mod test {
store_id: 1, store_id: 1,
..Default::default() ..Default::default()
}), }),
}, });
); retry_client
retry_client.regions.lock().await.insert( .regions
2, .lock()
RegionWithLeader { .await
.insert(2, RegionWithLeader {
region: metapb::Region { region: metapb::Region {
id: 2, id: 2,
start_key: vec![101], start_key: vec![101],
@ -345,8 +357,7 @@ mod test {
store_id: 2, store_id: 2,
..Default::default() ..Default::default()
}), }),
}, });
);
assert_eq!(retry_client.get_region_count.load(SeqCst), 0); assert_eq!(retry_client.get_region_count.load(SeqCst), 0);
@ -367,13 +378,10 @@ mod test {
// update leader should work // update leader should work
cache cache
.update_leader( .update_leader(cache.get_region_by_id(2).await?.ver_id(), metapb::Peer {
cache.get_region_by_id(2).await?.ver_id(), store_id: 102,
metapb::Peer { ..Default::default()
store_id: 102, })
..Default::default()
},
)
.await?; .await?;
assert_eq!( assert_eq!(
cache.get_region_by_id(2).await?.leader.unwrap().store_id, cache.get_region_by_id(2).await?.leader.unwrap().store_id,

View File

@ -1,22 +1,36 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF},
transaction::HasLocks,
};
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new; use derive_new::new;
use tikv_client_store::{HasKeyErrors, Request}; use tikv_client_store::HasKeyErrors;
use tikv_client_store::Request;
pub use self::{ pub use self::plan::Collect;
plan::{ pub use self::plan::CollectError;
Collect, CollectError, CollectSingle, CollectWithShard, DefaultProcessor, Dispatch, pub use self::plan::CollectSingle;
ExtractError, Merge, MergeResponse, Plan, Process, ProcessResponse, ResolveLock, pub use self::plan::CollectWithShard;
ResponseWithShard, RetryableMultiRegion, pub use self::plan::DefaultProcessor;
}, pub use self::plan::Dispatch;
plan_builder::{PlanBuilder, SingleKey}, pub use self::plan::ExtractError;
shard::{Batchable, HasNextBatch, NextBatch, Shardable}, pub use self::plan::Merge;
}; pub use self::plan::MergeResponse;
pub use self::plan::Plan;
pub use self::plan::Process;
pub use self::plan::ProcessResponse;
pub use self::plan::ResolveLock;
pub use self::plan::ResponseWithShard;
pub use self::plan::RetryableMultiRegion;
pub use self::plan_builder::PlanBuilder;
pub use self::plan_builder::SingleKey;
pub use self::shard::Batchable;
pub use self::shard::HasNextBatch;
pub use self::shard::NextBatch;
pub use self::shard::Shardable;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::backoff::PESSIMISTIC_BACKOFF;
use crate::transaction::HasLocks;
pub mod plan; pub mod plan;
mod plan_builder; mod plan_builder;
@ -63,23 +77,27 @@ impl RetryOptions {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use std::any::Any;
use crate::{ use std::iter;
mock::{MockKvClient, MockPdClient}, use std::sync::atomic::AtomicUsize;
store::store_stream_for_keys, use std::sync::Arc;
transaction::lowering::new_commit_request, use std::time::Duration;
Error, Key, Result,
}; use tikv_client_proto::kvrpcpb;
use std::{ use tikv_client_proto::pdpb::Timestamp;
any::Any, use tikv_client_proto::tikvpb::tikv_client::TikvClient;
iter,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::tikv_client::TikvClient};
use tikv_client_store::HasRegionError; use tikv_client_store::HasRegionError;
use tonic::transport::Channel; use tonic::transport::Channel;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::store::store_stream_for_keys;
use crate::transaction::lowering::new_commit_request;
use crate::Error;
use crate::Key;
use crate::Result;
#[tokio::test] #[tokio::test]
async fn test_region_retry() { async fn test_region_retry() {
#[derive(Clone)] #[derive(Clone)]

View File

@ -1,24 +1,37 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use std::{marker::PhantomData, sync::Arc}; use std::marker::PhantomData;
use std::sync::Arc;
use async_recursion::async_recursion; use async_recursion::async_recursion;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{future::try_join_all, prelude::*}; use futures::future::try_join_all;
use tikv_client_proto::{errorpb, errorpb::EpochNotMatch, kvrpcpb}; use futures::prelude::*;
use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors, KvClient}; use tikv_client_proto::errorpb;
use tokio::{sync::Semaphore, time::sleep}; use tikv_client_proto::errorpb::EpochNotMatch;
use tikv_client_proto::kvrpcpb;
use tikv_client_store::HasKeyErrors;
use tikv_client_store::HasRegionError;
use tikv_client_store::HasRegionErrors;
use tikv_client_store::KvClient;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use crate::{ use crate::backoff::Backoff;
backoff::Backoff, use crate::pd::PdClient;
pd::PdClient, use crate::request::shard::HasNextBatch;
request::{shard::HasNextBatch, KvRequest, NextBatch, Shardable}, use crate::request::KvRequest;
stats::tikv_stats, use crate::request::NextBatch;
store::RegionStore, use crate::request::Shardable;
transaction::{resolve_locks, HasLocks, ResolveLocksContext, ResolveLocksOptions}, use crate::stats::tikv_stats;
util::iter::FlatMapOkIterExt, use crate::store::RegionStore;
Error, Result, use crate::transaction::resolve_locks;
}; use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::ResolveLocksOptions;
use crate::util::iter::FlatMapOkIterExt;
use crate::Error;
use crate::Result;
/// A plan for how to execute a request. A user builds up a plan with various /// A plan for how to execute a request. A user builds up a plan with various
/// options, then exectutes it. /// options, then exectutes it.
@ -72,8 +85,7 @@ pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
} }
impl<P: Plan + Shardable, PdC: PdClient> RetryableMultiRegion<P, PdC> impl<P: Plan + Shardable, PdC: PdClient> RetryableMultiRegion<P, PdC>
where where P::Result: HasKeyErrors + HasRegionError
P::Result: HasKeyErrors + HasRegionError,
{ {
// A plan may involve multiple shards // A plan may involve multiple shards
#[async_recursion] #[async_recursion]
@ -272,8 +284,7 @@ impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
#[async_trait] #[async_trait]
impl<P: Plan + Shardable, PdC: PdClient> Plan for RetryableMultiRegion<P, PdC> impl<P: Plan + Shardable, PdC: PdClient> Plan for RetryableMultiRegion<P, PdC>
where where P::Result: HasKeyErrors + HasRegionError
P::Result: HasKeyErrors + HasRegionError,
{ {
type Result = Vec<Result<P::Result>>; type Result = Vec<Result<P::Result>>;
@ -403,8 +414,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
#[async_trait] #[async_trait]
impl<P: Plan, PdC: PdClient> Plan for ResolveLock<P, PdC> impl<P: Plan, PdC: PdClient> Plan for ResolveLock<P, PdC>
where where P::Result: HasLocks
P::Result: HasLocks,
{ {
type Result = P::Result; type Result = P::Result;
@ -506,8 +516,7 @@ impl<P: Plan, PdC: PdClient> Clone for CleanupLocks<P, PdC> {
#[async_trait] #[async_trait]
impl<P: Plan + Shardable + NextBatch, PdC: PdClient> Plan for CleanupLocks<P, PdC> impl<P: Plan + Shardable + NextBatch, PdC: PdClient> Plan for CleanupLocks<P, PdC>
where where P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError
P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError,
{ {
type Result = CleanupLocksResult; type Result = CleanupLocksResult;
@ -622,8 +631,7 @@ impl<P: Plan> Clone for ExtractError<P> {
#[async_trait] #[async_trait]
impl<P: Plan> Plan for ExtractError<P> impl<P: Plan> Plan for ExtractError<P>
where where P::Result: HasKeyErrors + HasRegionErrors
P::Result: HasKeyErrors + HasRegionErrors,
{ {
type Result = P::Result; type Result = P::Result;
@ -665,8 +673,7 @@ impl<P: Plan + Shardable> Clone for PreserveShard<P> {
#[async_trait] #[async_trait]
impl<P> Plan for PreserveShard<P> impl<P> Plan for PreserveShard<P>
where where P: Plan + Shardable
P: Plan + Shardable,
{ {
type Result = ResponseWithShard<P::Result, P::Shard>; type Result = ResponseWithShard<P::Result, P::Shard>;
@ -705,10 +712,12 @@ impl<Resp: HasRegionError, Shard> HasRegionError for ResponseWithShard<Resp, Sha
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use futures::stream::BoxStream;
use futures::stream::{self};
use tikv_client_proto::kvrpcpb::BatchGetResponse;
use super::*; use super::*;
use crate::mock::MockPdClient; use crate::mock::MockPdClient;
use futures::stream::{self, BoxStream};
use tikv_client_proto::kvrpcpb::BatchGetResponse;
#[derive(Clone)] #[derive(Clone)]
struct ErrPlan; struct ErrPlan;

View File

@ -1,20 +1,35 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use std::marker::PhantomData;
use std::sync::Arc;
use tikv_client_store::HasKeyErrors;
use tikv_client_store::HasRegionError;
use tikv_client_store::HasRegionErrors;
use super::plan::PreserveShard; use super::plan::PreserveShard;
use crate::{ use crate::backoff::Backoff;
backoff::Backoff, use crate::pd::PdClient;
pd::PdClient, use crate::request::plan::CleanupLocks;
request::{ use crate::request::shard::HasNextBatch;
plan::CleanupLocks, shard::HasNextBatch, DefaultProcessor, Dispatch, ExtractError, use crate::request::DefaultProcessor;
KvRequest, Merge, MergeResponse, NextBatch, Plan, Process, ProcessResponse, ResolveLock, use crate::request::Dispatch;
RetryableMultiRegion, Shardable, use crate::request::ExtractError;
}, use crate::request::KvRequest;
store::RegionStore, use crate::request::Merge;
transaction::{HasLocks, ResolveLocksContext, ResolveLocksOptions}, use crate::request::MergeResponse;
Result, use crate::request::NextBatch;
}; use crate::request::Plan;
use std::{marker::PhantomData, sync::Arc}; use crate::request::Process;
use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors}; use crate::request::ProcessResponse;
use crate::request::ResolveLock;
use crate::request::RetryableMultiRegion;
use crate::request::Shardable;
use crate::store::RegionStore;
use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::ResolveLocksOptions;
use crate::Result;
/// Builder type for plans (see that module for more). /// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> { pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
@ -55,9 +70,7 @@ impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> { impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a lock error, then resolve the lock and retry the request. /// If there is a lock error, then resolve the lock and retry the request.
pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph> pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
where where P::Result: HasLocks {
P::Result: HasLocks,
{
PlanBuilder { PlanBuilder {
pd_client: self.pd_client.clone(), pd_client: self.pd_client.clone(),
plan: ResolveLock { plan: ResolveLock {
@ -133,8 +146,7 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
} }
impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget> impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
where where P::Result: HasKeyErrors + HasRegionError
P::Result: HasKeyErrors + HasRegionError,
{ {
/// Split the request into shards sending a request to the region of each shard. /// Split the request into shards sending a request to the region of each shard.
pub fn retry_multi_region( pub fn retry_multi_region(
@ -194,8 +206,7 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
} }
impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget> impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
where where P::Result: HasKeyErrors
P::Result: HasKeyErrors,
{ {
pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> { pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> {
PlanBuilder { PlanBuilder {
@ -210,8 +221,7 @@ where
} }
impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
where where P::Result: HasKeyErrors + HasRegionErrors
P::Result: HasKeyErrors + HasRegionErrors,
{ {
pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> { pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
PlanBuilder { PlanBuilder {

View File

@ -1,15 +1,19 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use super::plan::PreserveShard;
use crate::{
pd::PdClient,
request::{plan::CleanupLocks, Dispatch, KvRequest, Plan, ResolveLock},
store::RegionStore,
Result,
};
use futures::stream::BoxStream;
use std::sync::Arc; use std::sync::Arc;
use futures::stream::BoxStream;
use super::plan::PreserveShard;
use crate::pd::PdClient;
use crate::request::plan::CleanupLocks;
use crate::request::Dispatch;
use crate::request::KvRequest;
use crate::request::Plan;
use crate::request::ResolveLock;
use crate::store::RegionStore;
use crate::Result;
macro_rules! impl_inner_shardable { macro_rules! impl_inner_shardable {
() => { () => {
type Shard = P::Shard; type Shard = P::Shard;
@ -230,7 +234,8 @@ macro_rules! shardable_range {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use rand::{thread_rng, Rng}; use rand::thread_rng;
use rand::Rng;
use super::Batchable; use super::Batchable;

View File

@ -1,11 +1,16 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use std::time::Duration;
use std::time::Instant;
use prometheus::register_histogram;
use prometheus::register_histogram_vec;
use prometheus::register_int_counter_vec;
use prometheus::Histogram;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use crate::Result; use crate::Result;
use prometheus::{
register_histogram, register_histogram_vec, register_int_counter_vec, Histogram, HistogramVec,
IntCounterVec,
};
use std::time::{Duration, Instant};
pub struct RequestStats { pub struct RequestStats {
start: Instant, start: Instant,

View File

@ -1,15 +1,23 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::PdClient, region::RegionWithLeader, BoundRange, Key, Result}; use std::cmp::max;
use std::cmp::min;
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new; use derive_new::new;
use futures::{prelude::*, stream::BoxStream}; use futures::prelude::*;
use std::{ use futures::stream::BoxStream;
cmp::{max, min},
sync::Arc,
};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use tikv_client_store::{KvClient, KvConnect, TikvConnect}; use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::TikvConnect;
use crate::pd::PdClient;
use crate::region::RegionWithLeader;
use crate::BoundRange;
use crate::Key;
use crate::Result;
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct RegionStore { pub struct RegionStore {

View File

@ -5,6 +5,7 @@
//! The higher bits of the version are the physical part of the timestamp. //! The higher bits of the version are the physical part of the timestamp.
use std::convert::TryInto; use std::convert::TryInto;
pub use tikv_client_proto::pdpb::Timestamp; pub use tikv_client_proto::pdpb::Timestamp;
const PHYSICAL_SHIFT_BITS: i64 = 18; const PHYSICAL_SHIFT_BITS: i64 = 18;

View File

@ -1,12 +1,18 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{BoundRange, Key, KvPair, Result, Value}; use std::collections::btree_map::Entry;
use std::{ use std::collections::BTreeMap;
collections::{btree_map::Entry, BTreeMap, HashMap}, use std::collections::HashMap;
future::Future, use std::future::Future;
};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use crate::BoundRange;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Value;
/// A caching layer which buffers reads and writes in a transaction. /// A caching layer which buffers reads and writes in a transaction.
pub struct Buffer { pub struct Buffer {
primary_key: Option<Key>, primary_key: Option<Key>,
@ -391,10 +397,12 @@ impl MutationValue {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use futures::executor::block_on;
use futures::{executor::block_on, future::ready}; use futures::future::ready;
use tikv_client_common::internal_err; use tikv_client_common::internal_err;
use super::*;
#[test] #[test]
fn set_and_get_from_buffer() { fn set_and_get_from_buffer() {
let mut buffer = Buffer::new(false); let mut buffer = Buffer::new(false);
@ -483,18 +491,15 @@ mod tests {
ready(Ok(vec![])) ready(Ok(vec![]))
}), }),
); );
assert_eq!( assert_eq!(r1.unwrap().collect::<Vec<_>>(), vec![
r1.unwrap().collect::<Vec<_>>(), KvPair(k1.clone(), v1.clone()),
vec![ KvPair(k2.clone(), v2.clone())
KvPair(k1.clone(), v1.clone()), ]);
KvPair(k2.clone(), v2.clone())
]
);
assert_eq!(r2.unwrap().unwrap(), v2); assert_eq!(r2.unwrap().unwrap(), v2);
assert_eq!( assert_eq!(r3.unwrap().collect::<Vec<_>>(), vec![
r3.unwrap().collect::<Vec<_>>(), KvPair(k1, v1),
vec![KvPair(k1, v1), KvPair(k2, v2)] KvPair(k2, v2)
); ]);
} }
// Check that multiple writes to the same key combine in the correct way. // Check that multiple writes to the same key combine in the correct way.

View File

@ -1,21 +1,28 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
backoff::DEFAULT_REGION_BACKOFF,
config::Config,
pd::{PdClient, PdRpcClient},
request::{plan::CleanupLocksResult, Plan},
timestamp::TimestampExt,
transaction::{
lock::ResolveLocksOptions, ResolveLocksContext, Snapshot, Transaction, TransactionOptions,
},
transaction_lowering::new_scan_lock_request,
Backoff, BoundRange, Result,
};
use slog::{Drain, Logger};
use std::sync::Arc; use std::sync::Arc;
use slog::Drain;
use slog::Logger;
use tikv_client_proto::pdpb::Timestamp; use tikv_client_proto::pdpb::Timestamp;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::request::plan::CleanupLocksResult;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::ResolveLocksContext;
use crate::transaction::Snapshot;
use crate::transaction::Transaction;
use crate::transaction::TransactionOptions;
use crate::transaction_lowering::new_scan_lock_request;
use crate::Backoff;
use crate::BoundRange;
use crate::Result;
// FIXME: cargo-culted value // FIXME: cargo-culted value
const SCAN_LOCK_BATCH_SIZE: u32 = 1024; const SCAN_LOCK_BATCH_SIZE: u32 = 1024;

View File

@ -1,31 +1,36 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use std::collections::HashMap;
backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF}, use std::collections::HashSet;
pd::PdClient, use std::sync::Arc;
region::RegionVerId,
request::{Collect, CollectSingle, Plan},
store::RegionStore,
timestamp::TimestampExt,
transaction::{
requests,
requests::{
new_check_secondary_locks_request, new_check_txn_status_request, SecondaryLocksStatus,
TransactionStatus, TransactionStatusKind,
},
},
Error, Result,
};
use fail::fail_point; use fail::fail_point;
use log::debug; use log::debug;
use slog::Logger; use slog::Logger;
use std::{ use tikv_client_proto::kvrpcpb;
collections::{HashMap, HashSet}, use tikv_client_proto::kvrpcpb::TxnInfo;
sync::Arc, use tikv_client_proto::pdpb::Timestamp;
};
use tikv_client_proto::{kvrpcpb, kvrpcpb::TxnInfo, pdpb::Timestamp};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::pd::PdClient;
use crate::region::RegionVerId;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::Plan;
use crate::store::RegionStore;
use crate::timestamp::TimestampExt;
use crate::transaction::requests;
use crate::transaction::requests::new_check_secondary_locks_request;
use crate::transaction::requests::new_check_txn_status_request;
use crate::transaction::requests::SecondaryLocksStatus;
use crate::transaction::requests::TransactionStatus;
use crate::transaction::requests::TransactionStatusKind;
use crate::Error;
use crate::Result;
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
/// _Resolves_ the given locks. Returns whether all the given locks are resolved. /// _Resolves_ the given locks. Returns whether all the given locks are resolved.
@ -251,7 +256,10 @@ impl LockResolver {
self.logger, self.logger,
"secondary status, txn_id:{}, commit_ts:{:?}, min_commit_version:{}, fallback_2pc:{}", "secondary status, txn_id:{}, commit_ts:{:?}, min_commit_version:{}, fallback_2pc:{}",
txn_id, txn_id,
secondary_status.commit_ts.as_ref().map_or(0, |ts| ts.version()), secondary_status
.commit_ts
.as_ref()
.map_or(0, |ts| ts.version()),
secondary_status.min_commit_ts, secondary_status.min_commit_ts,
secondary_status.fallback_2pc, secondary_status.fallback_2pc,
); );
@ -417,11 +425,14 @@ pub trait HasLocks {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use crate::mock::{MockKvClient, MockPdClient};
use std::any::Any; use std::any::Any;
use tikv_client_proto::errorpb; use tikv_client_proto::errorpb;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
#[tokio::test] #[tokio::test]
async fn test_resolve_lock_with_retry() { async fn test_resolve_lock_with_retry() {
// Test resolve lock within retry limit // Test resolve lock within retry limit

View File

@ -1,11 +1,26 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use std::iter::Iterator;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::pdpb::Timestamp;
/// This module provides constructor functions for requests which take arguments as high-level /// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and converts these to the types used in the /// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module. /// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::{timestamp::TimestampExt, transaction::requests, BoundRange, Key}; use crate::timestamp::TimestampExt;
use std::iter::Iterator; /// This module provides constructor functions for requests which take arguments as high-level
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; /// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::transaction::requests;
/// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::BoundRange;
/// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and converts these to the types used in the
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
use crate::Key;
pub fn new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest { pub fn new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest {
requests::new_get_request(key.into(), timestamp.version()) requests::new_get_request(key.into(), timestamp.version())

View File

@ -9,11 +9,14 @@
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace. //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
pub use client::Client; pub use client::Client;
pub(crate) use lock::{resolve_locks, HasLocks}; pub(crate) use lock::resolve_locks;
pub(crate) use lock::HasLocks;
pub use snapshot::Snapshot; pub use snapshot::Snapshot;
pub use transaction::CheckLevel;
#[doc(hidden)] #[doc(hidden)]
pub use transaction::HeartbeatOption; pub use transaction::HeartbeatOption;
pub use transaction::{CheckLevel, Transaction, TransactionOptions}; pub use transaction::Transaction;
pub use transaction::TransactionOptions;
mod buffer; mod buffer;
mod client; mod client;
@ -21,7 +24,9 @@ pub mod lowering;
#[macro_use] #[macro_use]
mod requests; mod requests;
mod lock; mod lock;
pub use lock::{LockResolver, ResolveLocksContext, ResolveLocksOptions}; pub use lock::LockResolver;
pub use lock::ResolveLocksContext;
pub use lock::ResolveLocksOptions;
mod snapshot; mod snapshot;
#[allow(clippy::module_inception)] #[allow(clippy::module_inception)]
mod transaction; mod transaction;

View File

@ -1,31 +1,46 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use std::cmp;
collect_first, use std::iter;
pd::PdClient, use std::sync::Arc;
request::{
Batchable, Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch,
KvRequest, Merge, NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range, RegionStore},
timestamp::TimestampExt,
transaction::HasLocks,
util::iter::FlatMapOkIterExt,
KvPair, Result, Value,
};
use either::Either; use either::Either;
use futures::{ use futures::stream::BoxStream;
stream::{self, BoxStream}, use futures::stream::{self};
StreamExt, use futures::StreamExt;
};
use std::{cmp, iter, sync::Arc};
use tikv_client_common::Error::PessimisticLockError; use tikv_client_common::Error::PessimisticLockError;
use tikv_client_proto::{ use tikv_client_proto::kvrpcpb::Action;
kvrpcpb::{self, Action, LockInfo, TxnHeartBeatResponse, TxnInfo}, use tikv_client_proto::kvrpcpb::LockInfo;
pdpb::Timestamp, use tikv_client_proto::kvrpcpb::TxnHeartBeatResponse;
}; use tikv_client_proto::kvrpcpb::TxnInfo;
use tikv_client_proto::kvrpcpb::{self};
use tikv_client_proto::pdpb::Timestamp;
use super::transaction::TXN_COMMIT_BATCH_SIZE; use super::transaction::TXN_COMMIT_BATCH_SIZE;
use crate::collect_first;
use crate::pd::PdClient;
use crate::request::Batchable;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::CollectWithShard;
use crate::request::DefaultProcessor;
use crate::request::HasNextBatch;
use crate::request::KvRequest;
use crate::request::Merge;
use crate::request::NextBatch;
use crate::request::Process;
use crate::request::ResponseWithShard;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_range;
use crate::store::RegionStore;
use crate::timestamp::TimestampExt;
use crate::transaction::HasLocks;
use crate::util::iter::FlatMapOkIterExt;
use crate::KvPair;
use crate::Result;
use crate::Value;
// implement HasLocks for a response type that has a `pairs` field, // implement HasLocks for a response type that has a `pairs` field,
// where locks can be extracted from both the `pairs` and `error` fields // where locks can be extracted from both the `pairs` and `error` fields
@ -490,11 +505,7 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Muta
} else { } else {
assert_eq!(kvpairs.len(), not_founds.len()); assert_eq!(kvpairs.len(), not_founds.len());
Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| { Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| {
if not_found { if not_found { None } else { Some(kvpair) }
None
} else {
Some(kvpair)
}
})) }))
} }
}) })
@ -851,13 +862,15 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
#[cfg(test)] #[cfg(test)]
#[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))]
mod tests { mod tests {
use crate::{ use tikv_client_common::Error::PessimisticLockError;
request::{plan::Merge, CollectWithShard, ResponseWithShard}, use tikv_client_common::Error::ResolveLockError;
KvPair,
};
use tikv_client_common::Error::{PessimisticLockError, ResolveLockError};
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use crate::request::plan::Merge;
use crate::request::CollectWithShard;
use crate::request::ResponseWithShard;
use crate::KvPair;
#[tokio::test] #[tokio::test]
async fn test_merge_pessimistic_lock_response() { async fn test_merge_pessimistic_lock_response() {
let (key1, key2, key3, key4) = (b"key1", b"key2", b"key3", b"key4"); let (key1, key2, key3, key4) = (b"key1", b"key2", b"key3", b"key4");
@ -918,13 +931,10 @@ mod tests {
]; ];
let result = merger.merge(input); let result = merger.merge(input);
assert_eq!( assert_eq!(result.unwrap(), vec![
result.unwrap(), KvPair::new(key1.to_vec(), value1.to_vec()),
vec![ KvPair::new(key4.to_vec(), value4.to_vec()),
KvPair::new(key1.to_vec(), value1.to_vec()), ]);
KvPair::new(key4.to_vec(), value4.to_vec()),
]
);
} }
{ {
let input = vec![ let input = vec![
@ -941,10 +951,12 @@ mod tests {
} = result.unwrap_err() } = result.unwrap_err()
{ {
assert!(matches!(*inner, ResolveLockError)); assert!(matches!(*inner, ResolveLockError));
assert_eq!( assert_eq!(success_keys, vec![
success_keys, key1.to_vec(),
vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()] key2.to_vec(),
); key3.to_vec(),
key4.to_vec()
]);
} else { } else {
panic!(); panic!();
} }

View File

@ -1,9 +1,15 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{BoundRange, Key, KvPair, Result, Transaction, Value};
use derive_new::new; use derive_new::new;
use slog::Logger; use slog::Logger;
use crate::BoundRange;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Transaction;
use crate::Value;
/// A read-only transaction which reads at the given timestamp. /// A read-only transaction which reads at the given timestamp.
/// ///
/// It behaves as if the snapshot was taken at the given timestamp, /// It behaves as if the snapshot was taken at the given timestamp,

View File

@ -1,22 +1,38 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{ use std::iter;
backoff::{Backoff, DEFAULT_REGION_BACKOFF}, use std::sync::Arc;
pd::{PdClient, PdRpcClient}, use std::time::Instant;
request::{
Collect, CollectError, CollectSingle, CollectWithShard, Plan, PlanBuilder, RetryOptions,
},
timestamp::TimestampExt,
transaction::{buffer::Buffer, lowering::*},
BoundRange, Error, Key, KvPair, Result, Value,
};
use derive_new::new; use derive_new::new;
use fail::fail_point; use fail::fail_point;
use futures::prelude::*; use futures::prelude::*;
use slog::Logger; use slog::Logger;
use std::{iter, sync::Arc, time::Instant}; use tikv_client_proto::kvrpcpb;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; use tikv_client_proto::pdpb::Timestamp;
use tokio::{sync::RwLock, time::Duration}; use tokio::sync::RwLock;
use tokio::time::Duration;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::request::Collect;
use crate::request::CollectError;
use crate::request::CollectSingle;
use crate::request::CollectWithShard;
use crate::request::Plan;
use crate::request::PlanBuilder;
use crate::request::RetryOptions;
use crate::timestamp::TimestampExt;
use crate::transaction::buffer::Buffer;
use crate::transaction::lowering::*;
use crate::BoundRange;
use crate::Error;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Value;
/// An undo-able set of actions on the dataset. /// An undo-able set of actions on the dataset.
/// ///
@ -1386,23 +1402,24 @@ enum TransactionStatus {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use std::any::Any;
mock::{MockKvClient, MockPdClient}, use std::io;
transaction::HeartbeatOption, use std::sync::atomic::AtomicUsize;
Transaction, TransactionOptions, use std::sync::atomic::Ordering;
}; use std::sync::Arc;
use std::time::Duration;
use fail::FailScenario; use fail::FailScenario;
use slog::{Drain, Logger}; use slog::Drain;
use std::{ use slog::Logger;
any::Any, use tikv_client_proto::kvrpcpb;
io, use tikv_client_proto::pdpb::Timestamp;
sync::{
atomic::{AtomicUsize, Ordering}, use crate::mock::MockKvClient;
Arc, use crate::mock::MockPdClient;
}, use crate::transaction::HeartbeatOption;
time::Duration, use crate::Transaction;
}; use crate::TransactionOptions;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
#[tokio::test] #[tokio::test]
async fn test_optimistic_heartbeat() -> Result<(), io::Error> { async fn test_optimistic_heartbeat() -> Result<(), io::Error> {

View File

@ -32,13 +32,13 @@ pub struct FlatMapOk<U, F, Ti, E> {
// FIXME: implement other iterator methods like size_hint, etc. // FIXME: implement other iterator methods like size_hint, etc.
impl< impl<
T: IntoIterator<IntoIter = Ti>, T: IntoIterator<IntoIter = Ti>,
U: Iterator<Item = std::result::Result<I, E>>, U: Iterator<Item = std::result::Result<I, E>>,
F: FnMut(I) -> T, F: FnMut(I) -> T,
Ti: Iterator<Item = T::Item>, Ti: Iterator<Item = T::Item>,
I, I,
E, E,
> Iterator for FlatMapOk<U, F, Ti, E> > Iterator for FlatMapOk<U, F, Ti, E>
{ {
type Item = std::result::Result<T::Item, E>; type Item = std::result::Result<T::Item, E>;
@ -92,9 +92,14 @@ mod test {
.into_iter() .into_iter()
.flat_map_ok(|i| vec![i, i, i].into_iter()) .flat_map_ok(|i| vec![i, i, i].into_iter())
.collect(); .collect();
assert_eq!( assert_eq!(result, vec![
result, Ok(0),
vec![Ok(0), Ok(0), Ok(0), Err(()), Ok(2), Ok(2), Ok(2)] Ok(0),
); Ok(0),
Err(()),
Ok(2),
Ok(2),
Ok(2)
]);
} }
} }

View File

@ -2,8 +2,10 @@
//! The module provides some utility functions to control and get information //! The module provides some utility functions to control and get information
//! from PD, using its HTTP API. //! from PD, using its HTTP API.
use tikv_client_common::Error;
use tikv_client_common::Result;
use super::pd_addrs; use super::pd_addrs;
use tikv_client_common::{Error, Result};
pub async fn get_region_count() -> Result<u64> { pub async fn get_region_count() -> Result<u64> {
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))

View File

@ -2,11 +2,21 @@
mod ctl; mod ctl;
use log::{info, warn}; use std::collections::HashSet;
use std::convert::TryInto;
use std::env;
use std::time::Duration;
use log::info;
use log::warn;
use rand::Rng; use rand::Rng;
use slog::Drain; use slog::Drain;
use std::{collections::HashSet, convert::TryInto, env, time::Duration}; use tikv_client::ColumnFamily;
use tikv_client::{ColumnFamily, Key, RawClient, Result, Transaction, TransactionClient}; use tikv_client::Key;
use tikv_client::RawClient;
use tikv_client::Result;
use tikv_client::Transaction;
use tikv_client::TransactionClient;
use tokio::time::sleep; use tokio::time::sleep;
const ENV_PD_ADDRS: &str = "PD_ADDRS"; const ENV_PD_ADDRS: &str = "PD_ADDRS";

View File

@ -2,16 +2,25 @@
mod common; mod common;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::thread;
use std::time::Duration;
use common::*; use common::*;
use fail::FailScenario; use fail::FailScenario;
use rand::thread_rng; use rand::thread_rng;
use serial_test::serial; use serial_test::serial;
use slog::info; use slog::info;
use std::{collections::HashSet, iter::FromIterator, thread, time::Duration}; use tikv_client::transaction::Client;
use tikv_client::{ use tikv_client::transaction::HeartbeatOption;
transaction::{Client, HeartbeatOption, ResolveLocksOptions}, use tikv_client::transaction::ResolveLocksOptions;
Backoff, CheckLevel, Result, RetryOptions, TransactionClient, TransactionOptions, use tikv_client::Backoff;
}; use tikv_client::CheckLevel;
use tikv_client::Result;
use tikv_client::RetryOptions;
use tikv_client::TransactionClient;
use tikv_client::TransactionOptions;
#[tokio::test] #[tokio::test]
#[serial] #[serial]

View File

@ -12,15 +12,25 @@
//! requirements on the region boundaries. //! requirements on the region boundaries.
mod common; mod common;
use std::collections::HashMap;
use std::iter;
use common::*; use common::*;
use futures::prelude::*; use futures::prelude::*;
use rand::{seq::IteratorRandom, thread_rng, Rng}; use rand::seq::IteratorRandom;
use rand::thread_rng;
use rand::Rng;
use serial_test::serial; use serial_test::serial;
use std::{collections::HashMap, iter}; use tikv_client::transaction::HeartbeatOption;
use tikv_client::{ use tikv_client::BoundRange;
transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, use tikv_client::Error;
TransactionClient, TransactionOptions, Value, use tikv_client::Key;
}; use tikv_client::KvPair;
use tikv_client::RawClient;
use tikv_client::Result;
use tikv_client::TransactionClient;
use tikv_client::TransactionOptions;
use tikv_client::Value;
// Parameters used in test // Parameters used in test
const NUM_PEOPLE: u32 = 100; const NUM_PEOPLE: u32 = 100;
@ -737,10 +747,11 @@ async fn txn_lock_keys_error_handle() -> Result<()> {
let mut t3 = client.begin_pessimistic().await?; let mut t3 = client.begin_pessimistic().await?;
t1.lock_keys(vec![k[0].clone(), k[1].clone()]).await?; t1.lock_keys(vec![k[0].clone(), k[1].clone()]).await?;
assert!(t2 assert!(
.lock_keys(vec![k[0].clone(), k[2].clone()]) t2.lock_keys(vec![k[0].clone(), k[2].clone()])
.await .await
.is_err()); .is_err()
);
t3.lock_keys(vec![k[2].clone(), k[3].clone()]).await?; t3.lock_keys(vec![k[2].clone(), k[3].clone()]).await?;
t1.rollback().await?; t1.rollback().await?;

View File

@ -1,6 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use std::result; use std::result;
use thiserror::Error; use thiserror::Error;
/// An error originating from the TiKV client or dependencies. /// An error originating from the TiKV client or dependencies.

View File

@ -6,4 +6,6 @@ pub mod security;
extern crate log; extern crate log;
#[doc(inline)] #[doc(inline)]
pub use crate::errors::{Error, Result}; pub use crate::errors::Error;
#[doc(inline)]
pub use crate::errors::Result;

View File

@ -1,15 +1,19 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::Result; use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
// use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment}; // use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment};
use regex::Regex; use regex::Regex;
use std::{ use tonic::transport::Certificate;
fs::File, use tonic::transport::Channel;
io::Read, use tonic::transport::ClientTlsConfig;
path::{Path, PathBuf}, use tonic::transport::Identity;
time::Duration,
}; use crate::Result;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref SCHEME_REG: Regex = Regex::new(r"^\s*(https?://)").unwrap(); static ref SCHEME_REG: Regex = Regex::new(r"^\s*(https?://)").unwrap();
@ -98,11 +102,14 @@ impl SecurityManager {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::{fs::File, io::Write, path::PathBuf};
use tempfile; use tempfile;
use super::*;
#[test] #[test]
fn test_security() { fn test_security() {
let temp = tempfile::tempdir().unwrap(); let temp = tempfile::tempdir().unwrap();

View File

@ -1,15 +1,21 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{timestamp::TimestampOracle, Result, SecurityManager}; use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait; use async_trait::async_trait;
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use tikv_client_common::internal_err; use tikv_client_common::internal_err;
use tikv_client_proto::pdpb::{self, Timestamp}; use tikv_client_proto::pdpb::Timestamp;
use tonic::{transport::Channel, IntoRequest, Request}; use tikv_client_proto::pdpb::{self};
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::Request;
use crate::timestamp::TimestampOracle;
use crate::Result;
use crate::SecurityManager;
/// A PD cluster. /// A PD cluster.
pub struct Cluster { pub struct Cluster {

View File

@ -1,9 +1,15 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#[doc(inline)] #[doc(inline)]
pub use cluster::{Cluster, Connection}; pub use cluster::Cluster;
#[doc(inline)] #[doc(inline)]
pub use tikv_client_common::{security::SecurityManager, Error, Result}; pub use cluster::Connection;
#[doc(inline)]
pub use tikv_client_common::security::SecurityManager;
#[doc(inline)]
pub use tikv_client_common::Error;
#[doc(inline)]
pub use tikv_client_common::Result;
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -11,20 +11,27 @@
//! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD //! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD
//! server and allocates timestamps for the requests. //! server and allocates timestamps for the requests.
use crate::Result; use std::collections::VecDeque;
use futures::{ use std::pin::Pin;
pin_mut, use std::sync::Arc;
prelude::*,
task::{AtomicWaker, Context, Poll}, use futures::pin_mut;
}; use futures::prelude::*;
use futures::task::AtomicWaker;
use futures::task::Context;
use futures::task::Poll;
use log::debug; use log::debug;
use pin_project::pin_project; use pin_project::pin_project;
use std::{collections::VecDeque, pin::Pin, sync::Arc};
use tikv_client_common::internal_err; use tikv_client_common::internal_err;
use tikv_client_proto::pdpb::{pd_client::PdClient, *}; use tikv_client_proto::pdpb::pd_client::PdClient;
use tokio::sync::{mpsc, oneshot, Mutex}; use tikv_client_proto::pdpb::*;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tonic::transport::Channel; use tonic::transport::Channel;
use crate::Result;
/// It is an empirical value. /// It is an empirical value.
const MAX_BATCH_SIZE: usize = 64; const MAX_BATCH_SIZE: usize = 64;

View File

@ -1,12 +1,18 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{request::Request, Result, SecurityManager}; use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new; use derive_new::new;
use std::{any::Any, sync::Arc, time::Duration};
use tikv_client_proto::tikvpb::tikv_client::TikvClient; use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tonic::transport::Channel; use tonic::transport::Channel;
use crate::request::Request;
use crate::Result;
use crate::SecurityManager;
/// A trait for connecting to TiKV stores. /// A trait for connecting to TiKV stores.
#[async_trait] #[async_trait]
pub trait KvConnect: Sized + Send + Sync + 'static { pub trait KvConnect: Sized + Send + Sync + 'static {

View File

@ -1,9 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::Error;
use std::fmt::Display; use std::fmt::Display;
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use crate::Error;
// Those that can have a single region error // Those that can have a single region error
pub trait HasRegionError { pub trait HasRegionError {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error>; fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error>;
@ -210,9 +212,11 @@ fn extract_errors(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::HasKeyErrors; use tikv_client_common::internal_err;
use tikv_client_common::{internal_err, Error}; use tikv_client_common::Error;
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use super::HasKeyErrors;
#[test] #[test]
fn result_haslocks() { fn result_haslocks() {
let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse::default()); let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse::default());

View File

@ -4,10 +4,21 @@ mod client;
mod errors; mod errors;
mod request; mod request;
pub use tikv_client_common::security::SecurityManager;
pub use tikv_client_common::Error;
pub use tikv_client_common::Result;
#[doc(inline)] #[doc(inline)]
pub use crate::{ pub use crate::client::KvClient;
client::{KvClient, KvConnect, TikvConnect}, #[doc(inline)]
errors::{HasKeyErrors, HasRegionError, HasRegionErrors}, pub use crate::client::KvConnect;
request::Request, #[doc(inline)]
}; pub use crate::client::TikvConnect;
pub use tikv_client_common::{security::SecurityManager, Error, Result}; #[doc(inline)]
pub use crate::errors::HasKeyErrors;
#[doc(inline)]
pub use crate::errors::HasRegionError;
#[doc(inline)]
pub use crate::errors::HasRegionErrors;
#[doc(inline)]
pub use crate::request::Request;

View File

@ -1,10 +1,16 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{Error, Result}; use std::any::Any;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use std::{any::Any, time::Duration}; use tikv_client_proto::kvrpcpb;
use tikv_client_proto::{kvrpcpb, tikvpb::tikv_client::TikvClient}; use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tonic::{transport::Channel, IntoRequest}; use tonic::transport::Channel;
use tonic::IntoRequest;
use crate::Error;
use crate::Result;
#[async_trait] #[async_trait]
pub trait Request: Any + Sync + Send + 'static { pub trait Request: Any + Sync + Send + 'static {