Combine workspace into a unicrate (#402)

Signed-off-by: Andy Lok <andylokandy@hotmail.com>
Andy Lok 2023-07-10 10:59:30 +08:00 committed by GitHub
parent bae5ee9d09
commit e49bc09496
93 changed files with 349 additions and 658 deletions

View File

@ -26,26 +26,20 @@ fail = "0.4"
futures = { version = "0.3" }
lazy_static = "1"
log = "0.4"
pin-project = "1"
prometheus = { version = "0.13", features = ["push"], default-features = false }
prost = "0.11"
rand = "0.8"
regex = "1"
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
slog = { version = "2.3", features = [
"max_level_trace",
"release_max_level_debug",
] }
slog-term = { version = "2.4" }
thiserror = "1"
tikv-client-common = { version = "0.2.0", path = "tikv-client-common" }
tikv-client-pd = { version = "0.2.0", path = "tikv-client-pd" }
tikv-client-proto = { version = "0.2.0", path = "tikv-client-proto" }
tikv-client-store = { version = "0.2.0", path = "tikv-client-store" }
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = "0.9"
tonic = { version = "0.9", features = ["tls"] }
[dev-dependencies]
tempfile = "3.6"
clap = "2"
env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
@ -59,13 +53,11 @@ serial_test = "0.5.0"
simple_logger = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
[workspace]
members = [
"tikv-client-common",
"tikv-client-pd",
"tikv-client-proto",
"tikv-client-store",
]
[build-dependencies]
glob = "0.3"
tonic-build = "0.9"
# Suppress doctest bug (https://stackoverflow.com/questions/66074003/how-to-turn-off-cargo-doc-test-and-compile-for-a-specific-module-in-rust)
tonic-disable-doctest = "0.1.0"
[[test]]
name = "failpoint_tests"

View File

@ -1,7 +1,10 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use tonic_disable_doctest::BuilderEx;
fn main() {
tonic_build::configure()
.disable_doctests_for_types([".google.api.HttpRule"])
.build_server(false)
.include_file("mod.rs")
.compile(
@ -9,7 +12,7 @@ fn main() {
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
&["include", "proto"],
&["proto/include", "proto"],
)
.unwrap();
}

View File

@ -27,7 +27,7 @@ async fn main() {
};
// init
let client = Client::new_with_config(args.pd, config, None)
let client = Client::new_with_config(args.pd, config)
.await
.expect("Could not connect to tikv");

View File

@ -35,7 +35,7 @@ async fn main() -> Result<()> {
// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
let client = Client::new_with_config(args.pd, config, None).await?;
let client = Client::new_with_config(args.pd, config).await?;
// Requests are created from the connected client. These calls return structures which
// implement `Future`. This means the `Future` must be resolved before the action ever takes

View File

@ -89,7 +89,7 @@ async fn main() {
Config::default()
};
let txn = Client::new_with_config(args.pd, config, None)
let txn = Client::new_with_config(args.pd, config)
.await
.expect("Could not connect to tikv");

View File

@ -58,13 +58,13 @@ pub enum Error {
Canceled(#[from] futures::channel::oneshot::Canceled),
/// Errors caused by changes of region information
#[error("Region error: {0:?}")]
RegionError(Box<tikv_client_proto::errorpb::Error>),
RegionError(Box<crate::proto::errorpb::Error>),
/// Whether the transaction is committed or not is undetermined
#[error("Whether the transaction is committed or not is undetermined")]
UndeterminedError(Box<Error>),
/// Wraps `tikv_client_proto::kvrpcpb::KeyError`
/// Wraps `crate::proto::kvrpcpb::KeyError`
#[error("{0:?}")]
KeyError(Box<tikv_client_proto::kvrpcpb::KeyError>),
KeyError(Box<crate::proto::kvrpcpb::KeyError>),
/// Multiple errors generated from the ExtractError plan.
#[error("Multiple errors: {0:?}")]
ExtractedErrors(Vec<Error>),
@ -105,14 +105,14 @@ pub enum Error {
},
}
impl From<tikv_client_proto::errorpb::Error> for Error {
fn from(e: tikv_client_proto::errorpb::Error) -> Error {
impl From<crate::proto::errorpb::Error> for Error {
fn from(e: crate::proto::errorpb::Error) -> Error {
Error::RegionError(Box::new(e))
}
}
impl From<tikv_client_proto::kvrpcpb::KeyError> for Error {
fn from(e: tikv_client_proto::kvrpcpb::KeyError) -> Error {
impl From<crate::proto::kvrpcpb::KeyError> for Error {
fn from(e: crate::proto::kvrpcpb::KeyError) -> Error {
Error::KeyError(Box::new(e))
}
}
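// Annotation (not part of this diff): a minimal sketch of how the `From`
// impls above are used, letting proto-level errors convert into crate errors:
//
//     fn handle(e: crate::proto::errorpb::Error) -> Error {
//         e.into() // yields Error::RegionError(Box::new(e))
//     }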

src/common/mod.rs Normal file
View File

@ -0,0 +1,7 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
mod errors;
pub mod security;
pub use self::errors::Error;
pub use self::errors::Result;

View File

@ -6,13 +6,14 @@ use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
// use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment};
use log::info;
use regex::Regex;
use tonic::transport::Certificate;
use tonic::transport::Channel;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use crate::internal_err;
use crate::Result;
lazy_static::lazy_static! {
@ -116,7 +117,7 @@ mod tests {
let example_ca = temp.path().join("ca");
let example_cert = temp.path().join("cert");
let example_pem = temp.path().join("key");
for (id, f) in (&[&example_ca, &example_cert, &example_pem])
for (id, f) in [&example_ca, &example_cert, &example_pem]
.iter()
.enumerate()
{
@ -125,7 +126,7 @@ mod tests {
let cert_path: PathBuf = format!("{}", example_cert.display()).into();
let key_path: PathBuf = format!("{}", example_pem.display()).into();
let ca_path: PathBuf = format!("{}", example_ca.display()).into();
let mgr = SecurityManager::load(&ca_path, &cert_path, &key_path).unwrap();
let mgr = SecurityManager::load(ca_path, cert_path, &key_path).unwrap();
assert_eq!(mgr.ca, vec![0]);
assert_eq!(mgr.cert, vec![1]);
let key = load_pem_file("private key", &key_path).unwrap();

View File

@ -14,9 +14,9 @@ use std::ops::RangeToInclusive;
#[cfg(test)]
use proptest_derive::Arbitrary;
use tikv_client_proto::kvrpcpb;
use super::Key;
use crate::proto::kvrpcpb;
/// A struct for expressing ranges. This type is semi-opaque and is not really meant for users to
/// deal with directly. Most functions which operate on ranges will accept any types which

View File

@ -1,8 +1,7 @@
use std::io::Write;
use std::ptr;
use tikv_client_common::internal_err;
use crate::internal_err;
use crate::Result;
const ENC_GROUP_SIZE: usize = 8;
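// Annotation (assumption, not part of this diff): ENC_GROUP_SIZE is the group
// width of TiKV's memcomparable key encoding, which chunks a key into 8-byte
// groups, zero-pads the final group, and appends a marker byte, e.g. for an
// ascending key:
//
//     [1, 2]  ->  [1, 2, 0, 0, 0, 0, 0, 0, 0xF9]  // marker = 0xFF - 6 pads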

View File

@ -12,11 +12,11 @@ use proptest::arbitrary::any_with;
use proptest::collection::size_range;
#[cfg(test)]
use proptest_derive::Arbitrary;
use tikv_client_proto::kvrpcpb;
use super::HexRepr;
use crate::kv::codec::BytesEncoder;
use crate::kv::codec::{self};
use crate::proto::kvrpcpb;
const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB

View File

@ -5,11 +5,11 @@ use std::str;
#[cfg(test)]
use proptest_derive::Arbitrary;
use tikv_client_proto::kvrpcpb;
use super::HexRepr;
use super::Key;
use super::Value;
use crate::proto::kvrpcpb;
/// A key/value pair.
///

View File

@ -67,7 +67,7 @@
//! # use futures::prelude::*;
//! # fn main() -> Result<()> {
//! # futures::executor::block_on(async {
//! let client = RawClient::new(vec!["127.0.0.1:2379"], None).await?;
//! let client = RawClient::new(vec!["127.0.0.1:2379"]).await?;
//! client.put("key".to_owned(), "value".to_owned()).await?;
//! let value = client.get("key".to_owned()).await?;
//! # Ok(())
@ -81,7 +81,7 @@
//! # use futures::prelude::*;
//! # fn main() -> Result<()> {
//! # futures::executor::block_on(async {
//! let txn_client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;
//! let txn_client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;
//! let mut txn = txn_client.begin_optimistic().await?;
//! txn.put("key".to_owned(), "value".to_owned()).await?;
//! let value = txn.get("key".to_owned()).await?;
@ -95,17 +95,17 @@
#![allow(clippy::field_reassign_with_default)]
#![allow(clippy::arc_with_non_send_sync)]
#[macro_use]
pub mod request;
#[macro_use]
#[doc(hidden)]
pub mod transaction;
mod backoff;
mod common;
mod compat;
mod config;
mod kv;
mod pd;
mod proto;
#[doc(hidden)]
pub mod raw;
mod region;
@ -120,18 +120,14 @@ mod mock;
#[cfg(test)]
mod proptests;
#[macro_use]
extern crate slog;
extern crate slog_term;
#[doc(inline)]
pub use common::security::SecurityManager;
#[doc(inline)]
pub use common::Error;
#[doc(inline)]
pub use common::Result;
#[doc(inline)]
pub use config::Config;
#[doc(inline)]
pub use tikv_client_common::security::SecurityManager;
#[doc(inline)]
pub use tikv_client_common::Error;
#[doc(inline)]
pub use tikv_client_common::Result;
#[doc(inline)]
pub use crate::backoff::Backoff;

View File

@ -10,20 +10,18 @@ use std::sync::Arc;
use async_trait::async_trait;
use derive_new::new;
use slog::Drain;
use slog::Logger;
use tikv_client_proto::metapb::RegionEpoch;
use tikv_client_proto::metapb::{self};
use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::Request;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::pd::RetryClient;
use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
use crate::Config;
use crate::Error;
use crate::Key;
@ -34,14 +32,6 @@ use crate::Timestamp;
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
PdRpcClient::new(
config.clone(),
|_| MockKvConnect,
@ -53,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
))
},
false,
logger,
)
.await
.unwrap()

View File

@ -6,24 +6,24 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::prelude::*;
use futures::stream::BoxStream;
use slog::Logger;
use tikv_client_pd::Cluster;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::TikvConnect;
use log::info;
use tokio::sync::RwLock;
use crate::compat::stream_fn;
use crate::kv::codec;
use crate::pd::retry::RetryClientTrait;
use crate::pd::Cluster;
use crate::pd::RetryClient;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::TikvConnect;
use crate::BoundRange;
use crate::Config;
use crate::Key;
@ -210,7 +210,6 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
}
#[async_trait]
@ -263,14 +262,12 @@ impl PdRpcClient<TikvConnect, Cluster> {
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_codec,
logger,
)
.await
}
@ -282,7 +279,6 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
@ -307,7 +303,6 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
kv_connect: kv_connect(security_mgr),
enable_codec,
region_cache: RegionCache::new(pd),
logger,
})
}
@ -315,7 +310,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
if let Some(client) = self.kv_client_cache.read().await.get(address) {
return Ok(client.clone());
};
info!(self.logger, "connect to tikv endpoint: {:?}", address);
info!("connect to tikv endpoint: {:?}", address);
match self.kv_connect.connect(address).await {
Ok(client) => {
self.kv_client_cache

View File

@ -6,16 +6,19 @@ use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use tikv_client_common::internal_err;
use tikv_client_proto::pdpb::Timestamp;
use tikv_client_proto::pdpb::{self};
use log::error;
use log::info;
use log::warn;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::Request;
use crate::timestamp::TimestampOracle;
use super::timestamp::TimestampOracle;
use crate::internal_err;
use crate::proto::pdpb;
use crate::Result;
use crate::SecurityManager;
use crate::Timestamp;
/// A PD cluster.
pub struct Cluster {
@ -28,7 +31,7 @@ pub struct Cluster {
macro_rules! pd_request {
($cluster_id:expr, $type:ty) => {{
let mut request = <$type>::default();
let mut header = ::tikv_client_proto::pdpb::RequestHeader::default();
let mut header = pdpb::RequestHeader::default();
header.cluster_id = $cluster_id;
request.header = Some(header);
request
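// Annotation (assumption, not part of this diff): a sketch of how the macro
// above is typically invoked, stamping the cluster id into a PD request:
//
//     let req = pd_request!(cluster_id, pdpb::GetMembersRequest);
//     assert_eq!(req.header.as_ref().unwrap().cluster_id, cluster_id);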

View File

@ -1,7 +1,11 @@
mod client;
mod cluster;
mod retry;
mod timestamp;
pub use client::PdClient;
pub use client::PdRpcClient;
pub use retry::RetryClient;
pub use retry::RetryClientTrait;
pub use self::client::PdClient;
pub use self::client::PdRpcClient;
pub use self::cluster::Cluster;
pub use self::cluster::Connection;
pub use self::retry::RetryClient;
pub use self::retry::RetryClientTrait;

View File

@ -8,14 +8,14 @@ use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use tikv_client_pd::Cluster;
use tikv_client_pd::Connection;
use tikv_client_proto::metapb;
use tikv_client_proto::pdpb::Timestamp;
use tikv_client_proto::pdpb::{self};
use tokio::sync::RwLock;
use tokio::time::sleep;
use crate::pd::Cluster;
use crate::pd::Connection;
use crate::proto::metapb;
use crate::proto::pdpb::Timestamp;
use crate::proto::pdpb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::region::StoreId;
@ -234,9 +234,9 @@ mod test {
use futures::executor;
use futures::future::ready;
use tikv_client_common::internal_err;
use super::*;
use crate::internal_err;
#[tokio::test(flavor = "multi_thread")]
async fn test_reconnect() {

View File

@ -21,15 +21,16 @@ use futures::task::AtomicWaker;
use futures::task::Context;
use futures::task::Poll;
use log::debug;
use log::info;
use pin_project::pin_project;
use tikv_client_common::internal_err;
use tikv_client_proto::pdpb::pd_client::PdClient;
use tikv_client_proto::pdpb::*;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use crate::internal_err;
use crate::proto::pdpb::pd_client::PdClient;
use crate::proto::pdpb::*;
use crate::Result;
/// It is an empirical value.

View File

@ -1,6 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
#![allow(clippy::large_enum_variant)]
#![allow(clippy::enum_variant_names)]
pub use protos::*;
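// Annotation (assumption, not part of this diff): `protos` is expected to be
// the module tree emitted by `include_file("mod.rs")` in build.rs, pulled in
// along the lines of:
//
//     mod protos {
//         include!(concat!(env!("OUT_DIR"), "/mod.rs"));
//     }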

View File

@ -5,15 +5,14 @@ use std::str::FromStr;
use std::sync::Arc;
use std::u32;
use slog::Drain;
use slog::Logger;
use tikv_client_common::Error;
use tikv_client_proto::metapb;
use log::debug;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::common::Error;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::Collect;
use crate::request::CollectSingle;
@ -40,7 +39,6 @@ pub struct Client<PdC: PdClient = PdRpcClient> {
cf: Option<ColumnFamily>,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
logger: Logger,
}
impl Clone for Client {
@ -49,7 +47,6 @@ impl Clone for Client {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
atomic: self.atomic,
logger: self.logger.clone(),
}
}
}
@ -67,14 +64,11 @@ impl Client<PdRpcClient> {
/// # use tikv_client::RawClient;
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// # });
/// ```
pub async fn new<S: Into<String>>(
pd_endpoints: Vec<S>,
logger: Option<Logger>,
) -> Result<Self> {
Self::new_with_config(pd_endpoints, Config::default(), logger).await
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Self> {
Self::new_with_config(pd_endpoints, Config::default()).await
}
/// Create a raw [`Client`] with a custom configuration, and connect to the TiKV cluster.
@ -93,7 +87,6 @@ impl Client<PdRpcClient> {
/// let client = RawClient::new_with_config(
/// vec!["192.168.0.100"],
/// Config::default().with_timeout(Duration::from_secs(60)),
/// None,
/// )
/// .await
/// .unwrap();
@ -102,27 +95,13 @@ impl Client<PdRpcClient> {
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
optional_logger: Option<Logger>,
) -> Result<Self> {
let logger = optional_logger.unwrap_or_else(|| {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
)
});
debug!(logger, "creating new raw client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?);
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
Ok(Client {
rpc,
cf: None,
atomic: false,
logger,
})
}
@ -142,7 +121,7 @@ impl Client<PdRpcClient> {
/// # use futures::prelude::*;
/// # use std::convert::TryInto;
/// # futures::executor::block_on(async {
/// let client = RawClient::new(vec!["192.168.0.100"], None)
/// let client = RawClient::new(vec!["192.168.0.100"])
/// .await
/// .unwrap()
/// .with_cf(ColumnFamily::Write);
@ -156,7 +135,6 @@ impl Client<PdRpcClient> {
rpc: self.rpc.clone(),
cf: Some(cf),
atomic: self.atomic,
logger: self.logger.clone(),
}
}
@ -173,7 +151,6 @@ impl Client<PdRpcClient> {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
atomic: true,
logger: self.logger.clone(),
}
}
}
@ -191,7 +168,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Value, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = client.get(key);
/// let result: Option<Value> = req.await.unwrap();
@ -203,7 +180,7 @@ impl<PdC: PdClient> Client<PdC> {
/// Same as [`get`](Client::get) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn get_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<Option<Value>> {
debug!(self.logger, "invoking raw get request");
debug!("invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
@ -225,7 +202,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{KvPair, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let req = client.batch_get(keys);
/// let result: Vec<KvPair> = req.await.unwrap();
@ -244,7 +221,7 @@ impl<PdC: PdClient> Client<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
debug!("invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
@ -264,7 +241,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Value, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// let req = client.put(key, val);
@ -282,7 +259,7 @@ impl<PdC: PdClient> Client<PdC> {
value: impl Into<Value>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw put request");
debug!("invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
@ -302,7 +279,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let kvpair1 = ("PD".to_owned(), "Go".to_owned());
/// let kvpair2 = ("TiKV".to_owned(), "Rust".to_owned());
/// let iterable = vec![kvpair1, kvpair2];
@ -323,7 +300,7 @@ impl<PdC: PdClient> Client<PdC> {
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
debug!("invoking raw batch_put request");
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
self.cf.clone(),
@ -348,7 +325,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = client.delete(key);
/// let result: () = req.await.unwrap();
@ -360,7 +337,7 @@ impl<PdC: PdClient> Client<PdC> {
/// Same as [`delete`](Client::delete) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn delete_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<()> {
debug!(self.logger, "invoking raw delete request");
debug!("invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
@ -382,7 +359,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let req = client.batch_delete(keys);
/// let result: () = req.await.unwrap();
@ -398,7 +375,7 @@ impl<PdC: PdClient> Client<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_delete request");
debug!("invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
@ -419,7 +396,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.delete_range(inclusive_range.into_owned());
/// let result: () = req.await.unwrap();
@ -435,7 +412,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw delete_range request");
debug!("invoking raw delete_range request");
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
@ -459,7 +436,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan(inclusive_range.into_owned(), 2);
/// let result: Vec<KvPair> = req.await.unwrap();
@ -476,7 +453,7 @@ impl<PdC: PdClient> Client<PdC> {
limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw scan request");
debug!("invoking raw scan request");
self.scan_inner(range.into(), limit, false, backoff).await
}
@ -493,7 +470,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_keys(inclusive_range.into_owned(), 2);
/// let result: Vec<Key> = req.await.unwrap();
@ -511,7 +488,7 @@ impl<PdC: PdClient> Client<PdC> {
limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw scan_keys request");
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true, backoff)
.await?
@ -535,7 +512,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range1 = "TiDB"..="TiKV";
/// let inclusive_range2 = "TiKV"..="TiSpark";
/// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
@ -559,7 +536,7 @@ impl<PdC: PdClient> Client<PdC> {
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_scan request");
debug!("invoking raw batch_scan request");
self.batch_scan_inner(ranges, each_limit, false, backoff)
.await
}
@ -579,7 +556,7 @@ impl<PdC: PdClient> Client<PdC> {
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range1 = "TiDB"..="TiKV";
/// let inclusive_range2 = "TiKV"..="TiSpark";
/// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
@ -603,7 +580,7 @@ impl<PdC: PdClient> Client<PdC> {
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw batch_scan_keys request");
debug!("invoking raw batch_scan_keys request");
Ok(self
.batch_scan_inner(ranges, each_limit, true, backoff)
.await?
@ -642,7 +619,7 @@ impl<PdC: PdClient> Client<PdC> {
new_value: impl Into<Value>,
backoff: Backoff,
) -> Result<(Option<Value>, bool)> {
debug!(self.logger, "invoking raw compare_and_swap request");
debug!("invoking raw compare_and_swap request");
self.assert_atomic()?;
let req = new_cas_request(
key.into(),
@ -775,23 +752,14 @@ mod tests {
use std::any::Any;
use std::sync::Arc;
use tikv_client_proto::kvrpcpb;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::Result;
#[tokio::test]
async fn test_raw_coprocessor() -> Result<()> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
if let Some(req) = req.downcast_ref::<kvrpcpb::RawCoprocessorRequest>() {
@ -811,7 +779,6 @@ mod tests {
rpc: pd_client,
cf: Some(ColumnFamily::Default),
atomic: false,
logger,
};
let resps = client
.coprocessor(

View File

@ -8,9 +8,8 @@ use std::iter::Iterator;
use std::ops::Range;
use std::sync::Arc;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
use crate::raw::requests;
use crate::BoundRange;
use crate::ColumnFamily;

View File

@ -7,15 +7,14 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::BoxStream;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tikv_client_store::Request;
use tonic::transport::Channel;
use super::RawRpcRequest;
use crate::collect_first;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::request::plan::ResponseWithShard;
use crate::request::Collect;
use crate::request::CollectSingle;
@ -25,9 +24,13 @@ use crate::request::Merge;
use crate::request::Process;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_ranges;
use crate::store::RegionStore;
use crate::store::Request;
use crate::transaction::HasLocks;
use crate::util::iter::FlatMapOkIterExt;
use crate::ColumnFamily;
@ -486,13 +489,13 @@ mod test {
use std::any::Any;
use futures::executor;
use tikv_client_proto::kvrpcpb;
use super::*;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::request::Plan;
use crate::Key;

View File

@ -1,9 +1,9 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use derive_new::new;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::metapb;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
use crate::Error;
use crate::Key;
use crate::Result;

View File

@ -5,15 +5,15 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tikv_client_common::Error;
use tikv_client_pd::Cluster;
use tikv_client_proto::metapb::Store;
use tikv_client_proto::metapb::{self};
use tokio::sync::Notify;
use tokio::sync::RwLock;
use crate::common::Error;
use crate::pd::Cluster;
use crate::pd::RetryClient;
use crate::pd::RetryClientTrait;
use crate::proto::metapb::Store;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
@ -244,13 +244,13 @@ mod test {
use std::sync::Arc;
use async_trait::async_trait;
use tikv_client_common::Error;
use tikv_client_proto::metapb::RegionEpoch;
use tikv_client_proto::metapb::{self};
use tokio::sync::Mutex;
use super::RegionCache;
use crate::common::Error;
use crate::pd::RetryClientTrait;
use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::Key;
@ -297,15 +297,15 @@ mod test {
async fn get_store(
self: Arc<Self>,
_id: crate::region::StoreId,
) -> Result<tikv_client_proto::metapb::Store> {
) -> Result<crate::proto::metapb::Store> {
todo!()
}
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<tikv_client_proto::metapb::Store>> {
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<crate::proto::metapb::Store>> {
todo!()
}
async fn get_timestamp(self: Arc<Self>) -> Result<tikv_client_proto::pdpb::Timestamp> {
async fn get_timestamp(self: Arc<Self>) -> Result<crate::proto::pdpb::Timestamp> {
todo!()
}

View File

@ -2,8 +2,6 @@
use async_trait::async_trait;
use derive_new::new;
use tikv_client_store::HasKeyErrors;
use tikv_client_store::Request;
pub use self::plan::Collect;
pub use self::plan::CollectError;
@ -30,11 +28,12 @@ use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::backoff::PESSIMISTIC_BACKOFF;
use crate::store::HasKeyErrors;
use crate::store::Request;
use crate::transaction::HasLocks;
pub mod plan;
mod plan_builder;
#[macro_use]
mod shard;
/// Abstracts any request sent to a TiKV server.
@ -83,16 +82,16 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::pdpb::Timestamp;
use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tikv_client_store::HasRegionError;
use tonic::transport::Channel;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::store::store_stream_for_keys;
use crate::store::HasRegionError;
use crate::transaction::lowering::new_commit_request;
use crate::Error;
use crate::Key;
@ -110,8 +109,8 @@ mod test {
}
impl HasRegionError for MockRpcResponse {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
Some(tikv_client_proto::errorpb::Error::default())
fn region_error(&mut self) -> Option<crate::proto::errorpb::Error> {
Some(crate::proto::errorpb::Error::default())
}
}

View File

@ -7,23 +7,25 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use futures::future::try_join_all;
use futures::prelude::*;
use tikv_client_proto::errorpb;
use tikv_client_proto::errorpb::EpochNotMatch;
use tikv_client_proto::kvrpcpb;
use tikv_client_store::HasKeyErrors;
use tikv_client_store::HasRegionError;
use tikv_client_store::HasRegionErrors;
use tikv_client_store::KvClient;
use log::debug;
use log::info;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use crate::backoff::Backoff;
use crate::pd::PdClient;
use crate::proto::errorpb;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb;
use crate::request::shard::HasNextBatch;
use crate::request::KvRequest;
use crate::request::NextBatch;
use crate::request::Shardable;
use crate::stats::tikv_stats;
use crate::store::HasKeyErrors;
use crate::store::HasRegionError;
use crate::store::HasRegionErrors;
use crate::store::KvClient;
use crate::store::RegionStore;
use crate::transaction::resolve_locks;
use crate::transaction::HasLocks;
@ -491,7 +493,6 @@ impl Merge<CleanupLocksResult> for Collect {
}
pub struct CleanupLocks<P: Plan, PdC: PdClient> {
pub logger: slog::Logger,
pub inner: P,
pub ctx: ResolveLocksContext,
pub options: ResolveLocksOptions,
@ -503,7 +504,6 @@ pub struct CleanupLocks<P: Plan, PdC: PdClient> {
impl<P: Plan, PdC: PdClient> Clone for CleanupLocks<P, PdC> {
fn clone(&self) -> Self {
CleanupLocks {
logger: self.logger.clone(),
inner: self.inner.clone(),
ctx: self.ctx.clone(),
options: self.options,
@ -523,8 +523,7 @@ where P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError
async fn execute(&self) -> Result<Self::Result> {
let mut result = CleanupLocksResult::default();
let mut inner = self.inner.clone();
let mut lock_resolver =
crate::transaction::LockResolver::new(self.logger.clone(), self.ctx.clone());
let mut lock_resolver = crate::transaction::LockResolver::new(self.ctx.clone());
let region = &self.store.as_ref().unwrap().region_with_leader;
let mut has_more_batch = true;
@ -533,17 +532,11 @@ where P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError
// Propagate errors to `retry_multi_region` for retry.
if let Some(e) = scan_lock_resp.key_errors() {
info!(
self.logger,
"CleanupLocks::execute, inner key errors:{:?}", e
);
info!("CleanupLocks::execute, inner key errors:{:?}", e);
result.key_error = Some(e);
return Ok(result);
} else if let Some(e) = scan_lock_resp.region_error() {
info!(
self.logger,
"CleanupLocks::execute, inner region error:{}", e.message
);
info!("CleanupLocks::execute, inner region error:{}", e.message);
result.region_error = Some(e);
return Ok(result);
}
@ -551,7 +544,7 @@ where P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError
// Iterate to next batch of inner.
match scan_lock_resp.has_next_batch() {
Some(range) if region.contains(range.0.as_ref()) => {
debug!(self.logger, "CleanupLocks::execute, next range:{:?}", range);
debug!("CleanupLocks::execute, next range:{:?}", range);
inner.next_batch(range);
}
_ => has_more_batch = false,
@ -571,11 +564,7 @@ where P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError
.filter(|l| l.use_async_commit)
.collect::<Vec<_>>();
}
debug!(
self.logger,
"CleanupLocks::execute, meet locks:{}",
locks.len()
);
debug!("CleanupLocks::execute, meet locks:{}", locks.len());
let lock_size = locks.len();
match lock_resolver
@ -714,10 +703,10 @@ impl<Resp: HasRegionError, Shard> HasRegionError for ResponseWithShard<Resp, Sha
mod test {
use futures::stream::BoxStream;
use futures::stream::{self};
use tikv_client_proto::kvrpcpb::BatchGetResponse;
use super::*;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb::BatchGetResponse;
#[derive(Clone)]
struct ErrPlan;

View File

@ -3,10 +3,6 @@
use std::marker::PhantomData;
use std::sync::Arc;
use tikv_client_store::HasKeyErrors;
use tikv_client_store::HasRegionError;
use tikv_client_store::HasRegionErrors;
use super::plan::PreserveShard;
use crate::backoff::Backoff;
use crate::pd::PdClient;
@ -25,6 +21,9 @@ use crate::request::ProcessResponse;
use crate::request::ResolveLock;
use crate::request::RetryableMultiRegion;
use crate::request::Shardable;
use crate::store::HasKeyErrors;
use crate::store::HasRegionError;
use crate::store::HasRegionErrors;
use crate::store::RegionStore;
use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
@ -84,7 +83,6 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
pub fn cleanup_locks(
self,
logger: slog::Logger, // TODO: add logger to PlanBuilder.
ctx: ResolveLocksContext,
options: ResolveLocksOptions,
backoff: Backoff,
@ -96,7 +94,6 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: CleanupLocks {
logger,
inner: self.plan,
ctx,
options,

View File

@ -6,10 +6,10 @@ use std::time::Duration;
use async_trait::async_trait;
use derive_new::new;
use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tonic::transport::Channel;
use crate::request::Request;
use super::Request;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::Result;
use crate::SecurityManager;

View File

@ -2,18 +2,17 @@
use std::fmt::Display;
use tikv_client_proto::kvrpcpb;
use crate::proto::kvrpcpb;
use crate::Error;
// Those that can have a single region error
pub trait HasRegionError {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error>;
fn region_error(&mut self) -> Option<crate::proto::errorpb::Error>;
}
// Those that can have multiple region errors
pub trait HasRegionErrors {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>>;
fn region_errors(&mut self) -> Option<Vec<crate::proto::errorpb::Error>>;
}
pub trait HasKeyErrors {
@ -21,7 +20,7 @@ pub trait HasKeyErrors {
}
impl<T: HasRegionError> HasRegionErrors for T {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>> {
fn region_errors(&mut self) -> Option<Vec<crate::proto::errorpb::Error>> {
self.region_error().map(|e| vec![e])
}
}
@ -29,7 +28,7 @@ impl<T: HasRegionError> HasRegionErrors for T {
macro_rules! has_region_error {
($type:ty) => {
impl HasRegionError for $type {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
fn region_error(&mut self) -> Option<crate::proto::errorpb::Error> {
self.region_error.take().map(|e| e.into())
}
}
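// Annotation (assumption, not part of this diff): the macro above is applied
// to response types carrying a `region_error` field, e.g.:
//
//     has_region_error!(kvrpcpb::GetResponse);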
@ -183,13 +182,13 @@ impl<T: HasKeyErrors> HasKeyErrors for Vec<T> {
}
impl<T: HasRegionError, E> HasRegionError for Result<T, E> {
fn region_error(&mut self) -> Option<tikv_client_proto::errorpb::Error> {
fn region_error(&mut self) -> Option<crate::proto::errorpb::Error> {
self.as_mut().ok().and_then(|t| t.region_error())
}
}
impl<T: HasRegionError> HasRegionErrors for Vec<T> {
fn region_errors(&mut self) -> Option<Vec<tikv_client_proto::errorpb::Error>> {
fn region_errors(&mut self) -> Option<Vec<crate::proto::errorpb::Error>> {
let errors: Vec<_> = self.iter_mut().filter_map(|x| x.region_error()).collect();
if errors.is_empty() {
None
@ -212,18 +211,17 @@ fn extract_errors(
#[cfg(test)]
mod test {
use tikv_client_common::internal_err;
use tikv_client_common::Error;
use tikv_client_proto::kvrpcpb;
use super::HasKeyErrors;
use crate::common::Error;
use crate::internal_err;
use crate::proto::kvrpcpb;
#[test]
fn result_haslocks() {
let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse::default());
assert!(resp.key_errors().is_none());
let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse {
error: Some(kvrpcpb::KeyError::default()).into(),
error: Some(kvrpcpb::KeyError::default()),
..Default::default()
});
assert!(resp.key_errors().is_some());

View File

@ -1,5 +1,9 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
mod client;
mod errors;
mod request;
use std::cmp::max;
use std::cmp::min;
use std::sync::Arc;
@ -8,12 +12,16 @@ use async_trait::async_trait;
use derive_new::new;
use futures::prelude::*;
use futures::stream::BoxStream;
use tikv_client_proto::kvrpcpb;
use tikv_client_store::KvClient;
use tikv_client_store::KvConnect;
use tikv_client_store::TikvConnect;
pub use self::client::KvClient;
pub use self::client::KvConnect;
pub use self::client::TikvConnect;
pub use self::errors::HasKeyErrors;
pub use self::errors::HasRegionError;
pub use self::errors::HasRegionErrors;
pub use self::request::Request;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::region::RegionWithLeader;
use crate::BoundRange;
use crate::Key;

View File

@ -4,11 +4,11 @@ use std::any::Any;
use std::time::Duration;
use async_trait::async_trait;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::tikvpb::tikv_client::TikvClient;
use tonic::transport::Channel;
use tonic::IntoRequest;
use crate::proto::kvrpcpb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::Error;
use crate::Result;

View File

@ -6,7 +6,7 @@
use std::convert::TryInto;
pub use tikv_client_proto::pdpb::Timestamp;
pub use crate::proto::pdpb::Timestamp;
const PHYSICAL_SHIFT_BITS: i64 = 18;
const LOGICAL_MASK: i64 = (1 << PHYSICAL_SHIFT_BITS) - 1;
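// Annotation (not part of this diff): the TSO arithmetic these constants
// imply, composing a version from its physical and logical parts:
//
//     version  = (physical << PHYSICAL_SHIFT_BITS) | logical
//     physical = version >> PHYSICAL_SHIFT_BITS
//     logical  = version & LOGICAL_MASK
//     // e.g. physical = 3, logical = 5  ->  version = 3 * 262_144 + 5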

View File

@ -5,8 +5,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::future::Future;
use tikv_client_proto::kvrpcpb;
use crate::proto::kvrpcpb;
use crate::BoundRange;
use crate::Key;
use crate::KvPair;
@ -399,9 +398,9 @@ impl MutationValue {
mod tests {
use futures::executor::block_on;
use futures::future::ready;
use tikv_client_common::internal_err;
use super::*;
use crate::internal_err;
#[test]
fn set_and_get_from_buffer() {

View File

@ -2,14 +2,14 @@
use std::sync::Arc;
use slog::Drain;
use slog::Logger;
use tikv_client_proto::pdpb::Timestamp;
use log::debug;
use log::info;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::pdpb::Timestamp;
use crate::request::plan::CleanupLocksResult;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
@ -44,14 +44,12 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
/// awaited to execute.
pub struct Client {
pd: Arc<PdRpcClient>,
logger: Logger,
}
impl Clone for Client {
fn clone(&self) -> Self {
Self {
pd: self.pd.clone(),
logger: self.logger.clone(),
}
}
}
@ -69,17 +67,12 @@ impl Client {
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// # });
/// ```
pub async fn new<S: Into<String>>(
pd_endpoints: Vec<S>,
logger: Option<Logger>,
) -> Result<Client> {
// debug!(self.logger, "creating transactional client");
Self::new_with_config(pd_endpoints, Config::default(), logger).await
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
// debug!("creating transactional client");
Self::new_with_config(pd_endpoints, Config::default()).await
}
/// Create a transactional [`Client`] with a custom configuration, and connect to the TiKV cluster.
@ -98,7 +91,6 @@ impl Client {
/// let client = TransactionClient::new_with_config(
/// vec!["192.168.0.100"],
/// Config::default().with_timeout(Duration::from_secs(60)),
/// None,
/// )
/// .await
/// .unwrap();
@ -107,22 +99,11 @@ impl Client {
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
optional_logger: Option<Logger>,
) -> Result<Client> {
let logger = optional_logger.unwrap_or_else(|| {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
)
});
debug!(logger, "creating new transactional client");
debug!("creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?);
Ok(Client { pd, logger })
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?);
Ok(Client { pd })
}
/// Creates a new optimistic [`Transaction`].
@ -139,16 +120,14 @@ impl Client {
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client.begin_optimistic().await.unwrap();
/// // ... Issue some commands.
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_optimistic(&self) -> Result<Transaction> {
debug!(self.logger, "creating new optimistic transaction");
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
}
@ -164,16 +143,14 @@ impl Client {
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client.begin_pessimistic().await.unwrap();
/// // ... Issue some commands.
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
debug!(self.logger, "creating new pessimistic transaction");
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
}
@ -186,9 +163,7 @@ impl Client {
/// # use tikv_client::{Config, TransactionClient, TransactionOptions};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client
/// .begin_with_options(TransactionOptions::default().use_async_commit())
/// .await
@ -198,16 +173,15 @@ impl Client {
/// # });
/// ```
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
debug!(self.logger, "creating new customized transaction");
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}
/// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
debug!(self.logger, "creating new snapshot");
let logger = self.logger.new(o!("child" => 1));
Snapshot::new(self.new_transaction(timestamp, options.read_only()), logger)
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
/// Retrieve the current [`Timestamp`].
@ -218,9 +192,7 @@ impl Client {
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let timestamp = client.current_timestamp().await.unwrap();
/// # });
/// ```
@ -243,7 +215,7 @@ impl Client {
/// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We skip the second step "delete ranges" which is an optimization for TiDB.
pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
debug!(self.logger, "invoking transactional gc request");
debug!("invoking transactional gc request");
let options = ResolveLocksOptions {
batch_size: SCAN_LOCK_BATCH_SIZE,
@ -258,7 +230,7 @@ impl Client {
.update_safepoint(safepoint.version())
.await?;
if !res {
info!(self.logger, "new safepoint != user-specified safepoint");
info!("new safepoint != user-specified safepoint");
}
Ok(res)
}
@ -269,13 +241,13 @@ impl Client {
safepoint: &Timestamp,
options: ResolveLocksOptions,
) -> Result<CleanupLocksResult> {
debug!(self.logger, "invoking cleanup async commit locks");
debug!("invoking cleanup async commit locks");
// scan all locks with ts <= safepoint
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.cleanup_locks(self.logger.clone(), ctx.clone(), options, backoff)
.cleanup_locks(ctx.clone(), options, backoff)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(crate::request::Collect)
@ -291,7 +263,7 @@ impl Client {
safepoint: &Timestamp,
range: impl Into<BoundRange>,
batch_size: u32,
) -> Result<Vec<tikv_client_proto::kvrpcpb::LockInfo>> {
) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
@ -301,7 +273,6 @@ impl Client {
}
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
let logger = self.logger.new(o!("child" => 1));
Transaction::new(timestamp, self.pd.clone(), options, logger)
Transaction::new(timestamp, self.pd.clone(), options)
}
}
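// Annotation (not part of this diff): a sketch of driving the GC path above,
// assuming an already-connected transactional client:
//
//     let safepoint = client.current_timestamp().await?;
//     let all_resolved = client.gc(safepoint).await?;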

View File

@ -6,16 +6,16 @@ use std::sync::Arc;
use fail::fail_point;
use log::debug;
use slog::Logger;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::kvrpcpb::TxnInfo;
use tikv_client_proto::pdpb::Timestamp;
use log::error;
use tokio::sync::RwLock;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::TxnInfo;
use crate::proto::pdpb::Timestamp;
use crate::region::RegionVerId;
use crate::request::Collect;
use crate::request::CollectSingle;
@ -198,13 +198,12 @@ impl ResolveLocksContext {
}
pub struct LockResolver {
logger: Logger,
ctx: ResolveLocksContext,
}
impl LockResolver {
pub fn new(logger: Logger, ctx: ResolveLocksContext) -> Self {
Self { logger, ctx }
pub fn new(ctx: ResolveLocksContext) -> Self {
Self { ctx }
}
/// _Cleanup_ the given locks. Returns whether all the given locks are resolved.
@ -252,8 +251,7 @@ impl LockResolver {
let secondary_status = self
.check_all_secondaries(pd_client.clone(), lock_info.secondaries.clone(), txn_id)
.await?;
slog_debug!(
self.logger,
debug!(
"secondary status, txn_id:{}, commit_ts:{:?}, min_commit_version:{}, fallback_2pc:{}",
txn_id,
secondary_status
@ -265,11 +263,7 @@ impl LockResolver {
);
if secondary_status.fallback_2pc {
slog_debug!(
self.logger,
"fallback to 2pc, txn_id:{}, check_txn_status again",
txn_id
);
debug!("fallback to 2pc, txn_id:{}, check_txn_status again", txn_id);
status = self
.check_txn_status(
pd_client.clone(),
@ -296,7 +290,6 @@ impl LockResolver {
match &status.kind {
TransactionStatusKind::Locked(..) => {
error!(
self.logger,
"cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
txn_id
);
@ -307,8 +300,7 @@ impl LockResolver {
};
}
slog_debug!(
self.logger,
debug!(
"batch resolve locks, region:{:?}, txn:{:?}",
store.region_with_leader.ver_id(),
txn_infos
@ -427,11 +419,10 @@ pub trait HasLocks {
mod tests {
use std::any::Any;
use tikv_client_proto::errorpb;
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::errorpb;
#[tokio::test]
async fn test_resolve_lock_with_retry() {

View File

@ -2,9 +2,8 @@
use std::iter::Iterator;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::pdpb::Timestamp;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
/// This module provides constructor functions for requests which take arguments as high-level
/// types (i.e., the types from the client crate) and convert these to the types used in the
/// generated protobuf code, then call the low-level ctor functions in the requests module.

View File

@ -20,10 +20,9 @@ pub use transaction::TransactionOptions;
mod buffer;
mod client;
pub mod lowering;
#[macro_use]
mod requests;
mod lock;
pub mod lowering;
mod requests;
pub use lock::LockResolver;
pub use lock::ResolveLocksContext;
pub use lock::ResolveLocksOptions;

View File

@ -8,17 +8,17 @@ use either::Either;
use futures::stream::BoxStream;
use futures::stream::{self};
use futures::StreamExt;
use tikv_client_common::Error::PessimisticLockError;
use tikv_client_proto::kvrpcpb::Action;
use tikv_client_proto::kvrpcpb::LockInfo;
use tikv_client_proto::kvrpcpb::TxnHeartBeatResponse;
use tikv_client_proto::kvrpcpb::TxnInfo;
use tikv_client_proto::kvrpcpb::{self};
use tikv_client_proto::pdpb::Timestamp;
use super::transaction::TXN_COMMIT_BATCH_SIZE;
use crate::collect_first;
use crate::common::Error::PessimisticLockError;
use crate::pd::PdClient;
use crate::proto::kvrpcpb::Action;
use crate::proto::kvrpcpb::LockInfo;
use crate::proto::kvrpcpb::TxnHeartBeatResponse;
use crate::proto::kvrpcpb::TxnInfo;
use crate::proto::kvrpcpb::{self};
use crate::proto::pdpb::Timestamp;
use crate::request::Batchable;
use crate::request::Collect;
use crate::request::CollectSingle;
@ -32,6 +32,9 @@ use crate::request::Process;
use crate::request::ResponseWithShard;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_range;
use crate::store::RegionStore;
@ -862,10 +865,9 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
#[cfg(test)]
#[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))]
mod tests {
use tikv_client_common::Error::PessimisticLockError;
use tikv_client_common::Error::ResolveLockError;
use tikv_client_proto::kvrpcpb;
use crate::common::Error::PessimisticLockError;
use crate::common::Error::ResolveLockError;
use crate::proto::kvrpcpb;
use crate::request::plan::Merge;
use crate::request::CollectWithShard;
use crate::request::ResponseWithShard;

View File

@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use derive_new::new;
use slog::Logger;
use log::debug;
use crate::BoundRange;
use crate::Key;
@ -20,19 +20,18 @@ use crate::Value;
#[derive(new)]
pub struct Snapshot {
transaction: Transaction,
logger: Logger,
}
impl Snapshot {
/// Get the value associated with the given key.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking get request on snapshot");
debug!("invoking get request on snapshot");
self.transaction.get(key).await
}
/// Check whether the key exists.
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
debug!(self.logger, "invoking key_exists request on snapshot");
debug!("invoking key_exists request on snapshot");
self.transaction.key_exists(key).await
}
@ -41,7 +40,7 @@ impl Snapshot {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking batch_get request on snapshot");
debug!("invoking batch_get request on snapshot");
self.transaction.batch_get(keys).await
}
@ -51,7 +50,7 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking scan request on snapshot");
debug!("invoking scan request on snapshot");
self.transaction.scan(range, limit).await
}
@ -61,7 +60,7 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!(self.logger, "invoking scan_keys request on snapshot");
debug!("invoking scan_keys request on snapshot");
self.transaction.scan_keys(range, limit).await
}
@ -71,7 +70,7 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking scan_reverse request on snapshot");
debug!("invoking scan_reverse request on snapshot");
self.transaction.scan_reverse(range, limit).await
}
@ -81,10 +80,7 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!(
self.logger,
"invoking scan_keys_reverse request on snapshot"
);
debug!("invoking scan_keys_reverse request on snapshot");
self.transaction.scan_keys_reverse(range, limit).await
}
}
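With the `slog::Logger` field gone, `Snapshot` logs through the `log` facade, so whichever backend the application installs receives these `debug!` lines. A minimal sketch, assuming the `env_logger` backend (already a dev-dependency of this repo); any `log`-compatible logger works:

```rust
fn main() {
    // Install a `log` backend once, application-wide.
    env_logger::Builder::from_default_env()
        .filter_level(log::LevelFilter::Debug)
        .init();
    // ... construct a TransactionClient, take a snapshot, and call
    // Snapshot::get / scan as usual; their debug! output now flows here ...
}
```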

View File

@ -7,9 +7,8 @@ use std::time::Instant;
use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use slog::Logger;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::pdpb::Timestamp;
use log::debug;
use log::warn;
use tokio::sync::RwLock;
use tokio::time::Duration;
@ -17,6 +16,8 @@ use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::Collect;
use crate::request::CollectError;
use crate::request::CollectSingle;
@ -65,9 +66,7 @@ use crate::Value;
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
/// .await
/// .unwrap();
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let foo = txn.get("foo".to_owned()).await.unwrap().unwrap();
/// txn.put("bar".to_owned(), foo).await.unwrap();
@ -82,7 +81,6 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
options: TransactionOptions,
is_heartbeat_started: bool,
start_instant: Instant,
logger: Logger,
}
impl<PdC: PdClient> Transaction<PdC> {
@ -90,7 +88,6 @@ impl<PdC: PdClient> Transaction<PdC> {
timestamp: Timestamp,
rpc: Arc<PdC>,
options: TransactionOptions,
logger: Logger,
) -> Transaction<PdC> {
let status = if options.read_only {
TransactionStatus::ReadOnly
@ -105,7 +102,6 @@ impl<PdC: PdClient> Transaction<PdC> {
options,
is_heartbeat_started: false,
start_instant: std::time::Instant::now(),
logger,
}
}
@ -121,14 +117,14 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Value, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Option<Value> = txn.get(key).await.unwrap();
/// # });
/// ```
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get request");
debug!("invoking transactional get request");
self.check_allow_operation().await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
@ -182,7 +178,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Value, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_pessimistic().await.unwrap();
/// let key = "TiKV".to_owned();
/// let result: Value = txn.get_for_update(key).await.unwrap().unwrap();
@ -192,7 +188,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking transactional get_for_update request");
debug!("invoking transactional get_for_update request");
self.check_allow_operation().await?;
if !self.is_pessimistic() {
let key = key.into();
@ -216,14 +212,14 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Value, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_pessimistic().await.unwrap();
/// let exists = txn.key_exists("k1".to_owned()).await.unwrap();
/// txn.commit().await.unwrap();
/// # });
/// ```
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
debug!(self.logger, "invoking transactional key_exists request");
debug!("invoking transactional key_exists request");
Ok(self.get(key).await?.is_some())
}
@ -242,7 +238,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use futures::prelude::*;
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let result: HashMap<Key, Value> = txn
@ -259,7 +255,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking transactional batch_get request");
debug!("invoking transactional batch_get request");
self.check_allow_operation().await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
@ -295,7 +291,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use futures::prelude::*;
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_pessimistic().await.unwrap();
/// let keys = vec!["foo".to_owned(), "bar".to_owned()];
/// let result: Vec<KvPair> = txn
@ -311,10 +307,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!(
self.logger,
"invoking transactional batch_get_for_update request"
);
debug!("invoking transactional batch_get_for_update request");
self.check_allow_operation().await?;
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
if !self.is_pessimistic() {
@ -340,7 +333,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use futures::prelude::*;
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"foo".to_vec().into();
/// let key2: Key = b"bar".to_vec().into();
@ -358,7 +351,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking transactional scan request");
debug!("invoking transactional scan request");
self.scan_inner(range, limit, false, false).await
}
@ -376,7 +369,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use futures::prelude::*;
/// # use std::collections::HashMap;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key1: Key = b"foo".to_vec().into();
/// let key2: Key = b"bar".to_vec().into();
@ -394,7 +387,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!(self.logger, "invoking transactional scan_keys request");
debug!("invoking transactional scan_keys request");
Ok(self
.scan_inner(range, limit, true, false)
.await?
@ -409,7 +402,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!(self.logger, "invoking transactional scan_reverse request");
debug!("invoking transactional scan_reverse request");
self.scan_inner(range, limit, false, true).await
}
@ -421,10 +414,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!(
self.logger,
"invoking transactional scan_keys_reverse request"
);
debug!("invoking transactional scan_keys_reverse request");
Ok(self
.scan_inner(range, limit, true, true)
.await?
@ -439,7 +429,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Key, Value, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "foo".to_owned();
/// let val = "FOO".to_owned();
@ -448,7 +438,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional put request");
debug!("invoking transactional put request");
self.check_allow_operation().await?;
let key = key.into();
if self.is_pessimistic() {
@ -470,7 +460,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Key, Value, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "foo".to_owned();
/// let val = "FOO".to_owned();
@ -479,7 +469,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking transactional insert request");
debug!("invoking transactional insert request");
self.check_allow_operation().await?;
let key = key.into();
if self.buffer.get(&key).is_some() {
@ -506,7 +496,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Key, Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let key = "foo".to_owned();
/// txn.delete(key);
@ -514,7 +504,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
debug!(self.logger, "invoking transactional delete request");
debug!("invoking transactional delete request");
self.check_allow_operation().await?;
let key = key.into();
if self.is_pessimistic() {
@ -541,7 +531,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Config, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// // ... Do some actions.
@ -552,7 +542,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
debug!(self.logger, "invoking transactional lock_keys request");
debug!("invoking transactional lock_keys request");
self.check_allow_operation().await?;
match self.options.kind {
TransactionKind::Optimistic => {
@ -577,14 +567,14 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Config, Timestamp, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// // ... Do some actions.
/// let result: Timestamp = txn.commit().await.unwrap().unwrap();
/// # });
/// ```
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!(self.logger, "commiting transaction");
debug!("commiting transaction");
{
let mut status = self.status.write().await;
if !matches!(
@ -613,7 +603,6 @@ impl<PdC: PdClient> Transaction<PdC> {
self.options.clone(),
self.buffer.get_write_size() as u64,
self.start_instant,
self.logger.new(o!("child" => 1)),
)
.commit()
.await;
@ -635,14 +624,14 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # use tikv_client::{Config, Timestamp, TransactionClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100"], None).await.unwrap();
/// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// // ... Do some actions.
/// txn.rollback().await.unwrap();
/// # });
/// ```
pub async fn rollback(&mut self) -> Result<()> {
debug!(self.logger, "rolling back transaction");
debug!("rolling back transaction");
{
let status = self.status.read().await;
if !matches!(
@ -670,7 +659,6 @@ impl<PdC: PdClient> Transaction<PdC> {
self.options.clone(),
self.buffer.get_write_size() as u64,
self.start_instant,
self.logger.new(o!("child" => 1)),
)
.rollback()
.await;
@ -692,7 +680,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Returns the TTL set on the transaction's locks by TiKV.
#[doc(hidden)]
pub async fn send_heart_beat(&mut self) -> Result<u64> {
debug!(self.logger, "sending heart_beat");
debug!("sending heart_beat");
self.check_allow_operation().await?;
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
@ -760,7 +748,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "acquiring pessimistic lock");
debug!("acquiring pessimistic lock");
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
@ -830,7 +818,7 @@ impl<PdC: PdClient> Transaction<PdC> {
start_version: Timestamp,
for_update_ts: Timestamp,
) -> Result<()> {
debug!(self.logger, "rollback pessimistic lock");
debug!("rollback pessimistic lock");
let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() {
@ -873,7 +861,7 @@ impl<PdC: PdClient> Transaction<PdC> {
}
async fn start_auto_heartbeat(&mut self) {
debug!(self.logger, "starting auto_heartbeat");
debug!("starting auto_heartbeat");
if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
return;
}
@ -931,7 +919,7 @@ impl<PdC: PdClient> Transaction<PdC> {
impl<PdC: PdClient> Drop for Transaction<PdC> {
fn drop(&mut self) {
debug!(self.logger, "dropping transaction");
debug!("dropping transaction");
if std::thread::panicking() {
return;
}
@ -941,10 +929,9 @@ impl<PdC: PdClient> Drop for Transaction<PdC> {
CheckLevel::Panic => {
panic!("Dropping an active transaction. Consider committing or rolling it back.")
}
CheckLevel::Warn => warn!(
self.logger,
"Dropping an active transaction. Consider committing or rolling it back."
),
CheckLevel::Warn => {
warn!("Dropping an active transaction. Consider committing or rolling it back.")
}
CheckLevel::None => {}
}
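The drop behavior matched on above is selected per transaction through `TransactionOptions`. A hedged sketch, assuming `drop_check` is the option setter and that `CheckLevel` lives in the `transaction` module (both are assumptions, not confirmed by this diff):

```rust
use tikv_client::transaction::CheckLevel;
use tikv_client::{TransactionClient, TransactionOptions};

async fn demo(client: &TransactionClient) -> tikv_client::Result<()> {
    // Warn instead of panicking if this transaction is dropped while active.
    let opts = TransactionOptions::new_optimistic().drop_check(CheckLevel::Warn);
    let mut txn = client.begin_with_options(opts).await?;
    txn.put("k".to_owned(), "v".to_owned()).await?;
    txn.commit().await?; // committed before drop, so Drop stays silent
    Ok(())
}
```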
@ -1149,12 +1136,11 @@ struct Committer<PdC: PdClient = PdRpcClient> {
undetermined: bool,
write_size: u64,
start_instant: Instant,
logger: Logger,
}
impl<PdC: PdClient> Committer<PdC> {
async fn commit(mut self) -> Result<Option<Timestamp>> {
debug!(self.logger, "committing");
debug!("committing");
let min_commit_ts = self.prewrite().await?;
@ -1193,7 +1179,7 @@ impl<PdC: PdClient> Committer<PdC> {
}
async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
debug!(self.logger, "prewriting");
debug!("prewriting");
let primary_lock = self.primary_key.clone().unwrap();
let elapsed = self.start_instant.elapsed().as_millis() as u64;
let lock_ttl = self.calc_txn_lock_ttl();
@ -1255,7 +1241,7 @@ impl<PdC: PdClient> Committer<PdC> {
/// Commits the primary key and returns the commit version
async fn commit_primary(&mut self) -> Result<Timestamp> {
debug!(self.logger, "committing primary");
debug!("committing primary");
let primary_key = self.primary_key.clone().into_iter();
let commit_version = self.rpc.clone().get_timestamp().await?;
let req = new_commit_request(
@ -1283,7 +1269,7 @@ impl<PdC: PdClient> Committer<PdC> {
}
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
debug!(self.logger, "committing secondary");
debug!("committing secondary");
let mutations_len = self.mutations.len();
let primary_only = mutations_len == 1;
#[cfg(not(feature = "integration-tests"))]
@ -1293,7 +1279,6 @@ impl<PdC: PdClient> Committer<PdC> {
let mutations = self.mutations.into_iter().take({
// Truncate the mutations to `percent/100` of their original length.
// Return an error when the truncated length would be zero.
let logger = self.logger.clone();
let fp = || -> Result<usize> {
let mut new_len = mutations_len;
fail_point!("before-commit-secondary", |percent| {
@ -1305,10 +1290,8 @@ impl<PdC: PdClient> Committer<PdC> {
))
} else {
debug!(
logger,
"failpoint: before-commit-secondary truncate mutation {} -> {}",
mutations_len,
new_len
mutations_len, new_len
);
Ok(new_len)
}
@ -1340,7 +1323,7 @@ impl<PdC: PdClient> Committer<PdC> {
}
async fn rollback(self) -> Result<()> {
debug!(self.logger, "rolling back");
debug!("rolling back");
if self.options.kind == TransactionKind::Optimistic && self.mutations.is_empty() {
return Ok(());
}
@ -1410,27 +1393,17 @@ mod tests {
use std::time::Duration;
use fail::FailScenario;
use slog::Drain;
use slog::Logger;
use tikv_client_proto::kvrpcpb;
use tikv_client_proto::pdpb::Timestamp;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::transaction::HeartbeatOption;
use crate::Transaction;
use crate::TransactionOptions;
#[tokio::test]
async fn test_optimistic_heartbeat() -> Result<(), io::Error> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
let scenario = FailScenario::setup();
fail::cfg("after-prewrite", "sleep(1500)").unwrap();
let heartbeats = Arc::new(AtomicUsize::new(0));
@ -1453,7 +1426,6 @@ mod tests {
pd_client,
TransactionOptions::new_optimistic()
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
logger.new(o!("child" => 1)),
);
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
let heartbeat_txn_handle = tokio::task::spawn_blocking(move || {
@ -1468,15 +1440,6 @@ mod tests {
#[tokio::test]
async fn test_pessimistic_heartbeat() -> Result<(), io::Error> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
let heartbeats = Arc::new(AtomicUsize::new(0));
let heartbeats_cloned = heartbeats.clone();
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
@ -1502,7 +1465,6 @@ mod tests {
pd_client,
TransactionOptions::new_pessimistic()
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
logger.new(o!("child" => 1)),
);
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
assert_eq!(heartbeats.load(Ordering::SeqCst), 0);

View File

@ -2,10 +2,10 @@
//! This module provides utility functions to control and retrieve information
//! from PD, using its HTTP API.
use tikv_client_common::Error;
use tikv_client_common::Result;
use tikv_client::Error;
use super::pd_addrs;
use crate::common::Result;
pub async fn get_region_count() -> Result<u64> {
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))
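A hedged completion of `get_region_count`, since the hunk cuts off here: it assumes reqwest's `json` feature, that PD's `/pd/api/v1/regions` response carries a top-level `count` field, and that the error enum has a `StringError` variant (all assumptions, not shown in this diff):

```rust
use serde_derive::Deserialize;

// Assumed shape of the PD response; only the field we need is modeled.
#[derive(Deserialize)]
struct RegionsInfo {
    count: u64,
}

pub async fn get_region_count() -> Result<u64> {
    let url = format!("http://{}/pd/api/v1/regions", pd_addrs()[0]);
    let resp = reqwest::get(url)
        .await
        .map_err(|e| Error::StringError(e.to_string()))?;
    let info: RegionsInfo = resp
        .json()
        .await
        .map_err(|e| Error::StringError(e.to_string()))?;
    Ok(info.count)
}
```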

View File

@ -10,7 +10,6 @@ use std::time::Duration;
use log::info;
use log::warn;
use rand::Rng;
use slog::Drain;
use tikv_client::ColumnFamily;
use tikv_client::Key;
use tikv_client::RawClient;
@ -33,7 +32,7 @@ pub async fn clear_tikv() {
// DEFAULT_REGION_BACKOFF is not long enough for the CI environment, so set a longer backoff.
let backoff = tikv_client::Backoff::no_jitter_backoff(100, 30000, 20);
for cf in cfs {
let raw_client = RawClient::new(pd_addrs(), None).await.unwrap().with_cf(cf);
let raw_client = RawClient::new(pd_addrs()).await.unwrap().with_cf(cf);
raw_client
.delete_range_opt(vec![].., backoff.clone())
.await
@ -78,7 +77,7 @@ async fn ensure_region_split(
// 1. write plenty of transactional keys
// 2. wait until regions split
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_optimistic().await?;
for key in keys.into_iter() {
txn.put(key.into(), vec![0, 0, 0, 0]).await?;
@ -118,17 +117,6 @@ pub fn pd_addrs() -> Vec<String> {
.collect()
}
pub fn new_logger(level: slog::Level) -> slog::Logger {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
slog::Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(level)
.fuse(),
slog::o!(),
)
}
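With `new_logger` removed, tests that want output can install any `log`-compatible backend instead. A sketch of a drop-in replacement, assuming the `simple_logger` dev-dependency already present in the root manifest:

```rust
pub fn init_test_logger() {
    simple_logger::SimpleLogger::new()
        .with_level(log::LevelFilter::Info)
        .init()
        .ok(); // tolerate repeated initialization across tests
}
```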
// helper function
pub async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
let x = client.get(key).await?.unwrap();

View File

@ -9,9 +9,9 @@ use std::time::Duration;
use common::*;
use fail::FailScenario;
use log::info;
use rand::thread_rng;
use serial_test::serial;
use slog::info;
use tikv_client::transaction::Client;
use tikv_client::transaction::HeartbeatOption;
use tikv_client::transaction::ResolveLocksOptions;
@ -34,7 +34,7 @@ async fn txn_optimistic_heartbeat() -> Result<()> {
let key1 = "key1".to_owned();
let key2 = "key2".to_owned();
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
// CheckLevel::Panic makes this case unstable; use the Warn level for now.
// See https://github.com/tikv/client-rust/issues/389
@ -99,8 +99,6 @@ async fn txn_optimistic_heartbeat() -> Result<()> {
#[tokio::test]
#[serial]
async fn txn_cleanup_locks_batch_size() -> Result<()> {
let logger = new_logger(slog::Level::Info);
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
@ -112,7 +110,7 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
fail::cfg("before-cleanup-locks", "off").unwrap();
}}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
@ -135,21 +133,19 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
#[tokio::test]
#[serial]
async fn txn_cleanup_async_commit_locks() -> Result<()> {
let logger = new_logger(slog::Level::Info);
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
// no commit
{
info!(logger, "test no commit");
info!("test no commit");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
@ -168,14 +164,14 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
// partial commit
{
info!(logger, "test partial commit");
info!("test partial commit");
let percent = 50;
fail::cfg("before-commit-secondary", &format!("return({percent})")).unwrap();
defer! {
fail::cfg("before-commit-secondary", "off").unwrap()
}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, true, false).await?;
thread::sleep(Duration::from_secs(1)); // Wait for async commit to complete.
assert_eq!(count_locks(&client).await?, keys.len() * percent / 100);
@ -195,8 +191,8 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
// all committed
{
info!(logger, "test all committed");
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
info!("test all committed");
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, true, false).await?;
let safepoint = client.current_timestamp().await?;
@ -223,21 +219,19 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
#[tokio::test]
#[serial]
async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
let logger = new_logger(slog::Level::Info);
init().await?;
let scenario = FailScenario::setup();
info!(logger, "test range clean lock");
info!("test range clean lock");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, true, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
info!(logger, "total keys' count {}", keys.len());
info!("total keys' count {}", keys.len());
let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone());
sorted_keys.sort();
let start_key = sorted_keys[1].clone();
@ -270,21 +264,19 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
#[tokio::test]
#[serial]
async fn txn_cleanup_2pc_locks() -> Result<()> {
let logger = new_logger(slog::Level::Info);
init().await?;
let scenario = FailScenario::setup();
let full_range = ..;
// no commit
{
info!(logger, "test no commit");
info!("test no commit");
fail::cfg("after-prewrite", "return").unwrap();
defer! {
fail::cfg("after-prewrite", "off").unwrap()
}
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, false, true).await?;
assert_eq!(count_locks(&client).await?, keys.len());
@ -313,8 +305,8 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
// all committed
{
info!(logger, "test all committed");
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
info!("test all committed");
let client = TransactionClient::new(pd_addrs()).await?;
let keys = write_data(&client, false, false).await?;
assert_eq!(count_locks(&client).await?, 0);
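The `fail::cfg("before-commit-secondary", "return(50)")` calls in these tests pair with the `fail_point!` closure shown in the committer hunk earlier: the string payload is handed back to the closure as a percentage. A minimal sketch of that round trip, assuming the fail crate's standard `cfg`/`fail_point!` API with the `failpoints` feature enabled:

```rust
use fail::fail_point;

fn truncated_len(mutations_len: usize) -> usize {
    let mut new_len = mutations_len;
    // With fail::cfg("before-commit-secondary", "return(50)") active, the
    // closure receives Some("50") and the commit keeps only half the mutations.
    fail_point!("before-commit-secondary", |percent| {
        let percent: usize = percent.unwrap().parse().unwrap();
        new_len = mutations_len * percent / 100;
        new_len
    });
    new_len
}
```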

View File

@ -40,7 +40,7 @@ const NUM_TRNASFER: u32 = 100;
#[serial]
async fn txn_get_timestamp() -> Result<()> {
const COUNT: usize = 1 << 8; // use a small number to make test fast
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut versions = future::join_all((0..COUNT).map(|_| client.current_timestamp()))
.await
@ -61,7 +61,7 @@ async fn txn_get_timestamp() -> Result<()> {
async fn txn_crud() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_optimistic().await?;
// Get non-existent keys
@ -145,7 +145,7 @@ async fn txn_crud() -> Result<()> {
async fn txn_insert_duplicate_keys() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
// Initialize TiKV store with {foo => bar}
let mut txn = client.begin_optimistic().await?;
txn.put("foo".to_owned(), "bar".to_owned()).await?;
@ -169,7 +169,7 @@ async fn txn_insert_duplicate_keys() -> Result<()> {
async fn txn_pessimistic() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_pessimistic().await?;
txn.put("foo".to_owned(), "foo".to_owned()).await.unwrap();
@ -186,7 +186,7 @@ async fn txn_pessimistic() -> Result<()> {
async fn txn_split_batch() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut txn = client.begin_optimistic().await?;
let mut rng = thread_rng();
@ -224,7 +224,7 @@ async fn txn_split_batch() -> Result<()> {
#[serial]
async fn raw_bank_transfer() -> Result<()> {
init().await?;
let client = RawClient::new(pd_addrs(), None).await?;
let client = RawClient::new(pd_addrs()).await?;
let mut rng = thread_rng();
let people = gen_u32_keys(NUM_PEOPLE, &mut rng);
@ -276,7 +276,7 @@ async fn txn_read() -> Result<()> {
let value = "large_value".repeat(10);
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) {
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
@ -368,7 +368,7 @@ async fn txn_read() -> Result<()> {
#[serial]
async fn txn_bank_transfer() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut rng = thread_rng();
let options = TransactionOptions::new_optimistic()
.use_async_commit()
@ -421,7 +421,7 @@ async fn txn_bank_transfer() -> Result<()> {
#[serial]
async fn raw_req() -> Result<()> {
init().await?;
let client = RawClient::new(pd_addrs(), None).await?;
let client = RawClient::new(pd_addrs()).await?;
// empty; get non-existent key
let res = client.get("k1".to_owned()).await;
@ -551,7 +551,7 @@ async fn raw_req() -> Result<()> {
#[serial]
async fn txn_update_safepoint() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let res = client.gc(client.current_timestamp().await?).await?;
assert!(res);
Ok(())
@ -566,7 +566,7 @@ async fn raw_write_million() -> Result<()> {
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
init().await?;
let client = RawClient::new(pd_addrs(), None).await?;
let client = RawClient::new(pd_addrs()).await?;
for i in 0..2u32.pow(NUM_BITS_TXN) {
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
@ -612,7 +612,7 @@ async fn raw_write_million() -> Result<()> {
#[serial]
async fn txn_pessimistic_rollback() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let mut preload_txn = client.begin_optimistic().await?;
let key1 = vec![1];
let key2 = vec![2];
@ -644,7 +644,7 @@ async fn txn_pessimistic_rollback() -> Result<()> {
#[serial]
async fn txn_pessimistic_delete() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
// The transaction will lock the keys and must release the locks on commit,
// even when values are not written to the DB.
@ -695,7 +695,7 @@ async fn txn_pessimistic_delete() -> Result<()> {
#[serial]
async fn txn_lock_keys() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let k1 = b"key1".to_vec();
let k2 = b"key2".to_vec();
@ -729,7 +729,7 @@ async fn txn_lock_keys() -> Result<()> {
#[serial]
async fn txn_lock_keys_error_handle() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
// Keys in `k` should locate in different regions. See `init()` for boundary of regions.
let k: Vec<Key> = vec![
@ -767,7 +767,7 @@ async fn txn_lock_keys_error_handle() -> Result<()> {
#[serial]
async fn txn_get_for_update() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let key1 = "key".to_owned();
let key2 = "another key".to_owned();
let value1 = b"some value".to_owned();
@ -814,7 +814,7 @@ async fn txn_pessimistic_heartbeat() -> Result<()> {
let key1 = "key1".to_owned();
let key2 = "key2".to_owned();
let client = TransactionClient::new(pd_addrs(), None).await?;
let client = TransactionClient::new(pd_addrs()).await?;
let mut heartbeat_txn = client
.begin_with_options(TransactionOptions::new_pessimistic())
@ -854,9 +854,7 @@ async fn txn_pessimistic_heartbeat() -> Result<()> {
#[serial]
async fn raw_cas() -> Result<()> {
init().await?;
let client = RawClient::new(pd_addrs(), None)
.await?
.with_atomic_for_cas();
let client = RawClient::new(pd_addrs()).await?.with_atomic_for_cas();
let key = "key".to_owned();
let value = "value".to_owned();
let new_value = "new value".to_owned();
@ -899,7 +897,7 @@ async fn raw_cas() -> Result<()> {
client.batch_delete(vec![key.clone()]).await.err().unwrap(),
Error::UnsupportedMode
));
let client = RawClient::new(pd_addrs(), None).await?;
let client = RawClient::new(pd_addrs()).await?;
assert!(matches!(
client
.compare_and_swap(key.clone(), None, vec![])
@ -916,7 +914,7 @@ async fn raw_cas() -> Result<()> {
#[serial]
async fn txn_scan() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let k1 = b"a".to_vec();
let v = b"b".to_vec();
@ -939,7 +937,7 @@ async fn txn_scan() -> Result<()> {
#[serial]
async fn txn_scan_reverse() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let k1 = b"a1".to_vec();
let k2 = b"a2".to_vec();
@ -975,7 +973,7 @@ async fn txn_scan_reverse() -> Result<()> {
#[serial]
async fn txn_key_exists() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
let key = "key".to_owned();
let value = "value".to_owned();
let mut t1 = client.begin_optimistic().await?;

View File

@ -1,30 +0,0 @@
[package]
name = "tikv-client-common"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
repository = "https://github.com/tikv/client-rust"
description = "Common components of the TiKV Rust client"
[dependencies]
futures = { version = "0.3", features = [
"compat",
"async-await",
"thread-pool",
] }
lazy_static = "1"
log = "0.4"
regex = "1"
semver = "1"
thiserror = "1"
tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" }
tokio = "1"
tonic = { version = "0.9", features = ["tls"] }
[dev-dependencies]
clap = "2"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"
tempfile = "3"

View File

@ -1,11 +0,0 @@
#[macro_use]
mod errors;
pub mod security;
#[macro_use]
extern crate log;
#[doc(inline)]
pub use crate::errors::Error;
#[doc(inline)]
pub use crate::errors::Result;

View File

@ -1,24 +0,0 @@
[package]
name = "tikv-client-pd"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
repository = "https://github.com/tikv/client-rust"
description = "Low level PD components for the TiKV Rust client"
[dependencies]
async-trait = "0.1"
futures = "0.3"
log = "0.4"
pin-project = "1"
tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" }
tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" }
tokio = { version = "1", features = ["sync"] }
tonic = "0.9"
[dev-dependencies]
clap = "2"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"

View File

@ -1,18 +0,0 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#[doc(inline)]
pub use cluster::Cluster;
#[doc(inline)]
pub use cluster::Connection;
#[doc(inline)]
pub use tikv_client_common::security::SecurityManager;
#[doc(inline)]
pub use tikv_client_common::Error;
#[doc(inline)]
pub use tikv_client_common::Result;
#[macro_use]
extern crate log;
mod cluster;
mod timestamp;

View File

@ -1,22 +0,0 @@
[package]
name = "tikv-client-proto"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
repository = "https://github.com/tikv/client-rust"
description = "Protobuf specs for the TiKV Rust client"
build = "build.rs"
[build-dependencies]
glob = "0.3.1"
tonic-build = "0.9"
[dependencies]
futures = "0.3"
lazy_static = { version = "1" }
prost = "0.11"
tonic = "0.9"
[lib]
doctest = false

View File

@ -1,7 +0,0 @@
# TiKV client protobuf definitions
This crate builds Rust protobufs required by the TiKV client.
The protobuf definitions live in the `proto` and `include` directories. They are copied from the [kvproto repo](https://github.com/pingcap/kvproto) because the kvproto crate is difficult to publish.
To update the protos, copy them all from that repo. They will be rebuilt automatically when you build the client (or this crate).

View File

@ -1,21 +0,0 @@
[package]
name = "tikv-client-store"
version = "0.2.0"
edition = "2018"
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
repository = "https://github.com/tikv/client-rust"
description = "Low level TiKV node components of the TiKV Rust client"
[dependencies]
async-trait = "0.1"
derive-new = "0.5"
futures = { version = "0.3", features = [
"compat",
"async-await",
"thread-pool",
] }
log = "0.4"
tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" }
tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" }
tonic = "0.9"

View File

@ -1,24 +0,0 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
mod client;
mod errors;
mod request;
pub use tikv_client_common::security::SecurityManager;
pub use tikv_client_common::Error;
pub use tikv_client_common::Result;
#[doc(inline)]
pub use crate::client::KvClient;
#[doc(inline)]
pub use crate::client::KvConnect;
#[doc(inline)]
pub use crate::client::TikvConnect;
#[doc(inline)]
pub use crate::errors::HasKeyErrors;
#[doc(inline)]
pub use crate::errors::HasRegionError;
#[doc(inline)]
pub use crate::errors::HasRegionErrors;
#[doc(inline)]
pub use crate::request::Request;
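For downstream users, the practical effect of deleting these sub-crates is an import migration to the single `tikv-client` crate. A hedged before/after sketch; of the items above, only `Error` and `Result` are confirmed as public re-exports by the hunks in this commit:

```rust
// Before: items pulled from the workspace sub-crates.
// use tikv_client_common::{Error, Result};
// use tikv_client_store::{HasKeyErrors, Request};

// After: the unicrate re-exports what remains public.
use tikv_client::{Error, Result};
```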