mirror of https://github.com/tikv/client-rust.git
Merge pull request #211 from andylokandy/error
Use thiserror and rename Error to ClientError
This commit is contained in:
commit
0c7fd9594c
|
@ -1,20 +1,5 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
|
||||
|
||||
[[package]]
|
||||
name = "adler32"
|
||||
version = "1.1.0"
|
||||
|
@ -79,20 +64,6 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46254cf2fdcdf1badb5934448c1bcbe046a56537b3987d96c51a7afc5d03f293"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.12.3"
|
||||
|
@ -312,28 +283,6 @@ dependencies = [
|
|||
"rand 0.6.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "failure"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"failure_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "failure_derive"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.19",
|
||||
"quote 1.0.7",
|
||||
"syn 1.0.35",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.2.0"
|
||||
|
@ -503,12 +452,6 @@ dependencies = [
|
|||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724"
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.0"
|
||||
|
@ -836,15 +779,6 @@ dependencies = [
|
|||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f"
|
||||
dependencies = [
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.6.22"
|
||||
|
@ -961,12 +895,6 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5"
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.4.0"
|
||||
|
@ -1525,12 +1453,6 @@ version = "1.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
|
@ -1742,18 +1664,6 @@ dependencies = [
|
|||
"unicode-xid 0.2.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "synstructure"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.19",
|
||||
"quote 1.0.7",
|
||||
"syn 1.0.35",
|
||||
"unicode-xid 0.2.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tempdir"
|
||||
version = "0.3.7"
|
||||
|
@ -1824,7 +1734,6 @@ dependencies = [
|
|||
"clap",
|
||||
"derive-new",
|
||||
"fail",
|
||||
"failure",
|
||||
"futures 0.3.5",
|
||||
"futures-timer",
|
||||
"grpcio",
|
||||
|
@ -1841,6 +1750,7 @@ dependencies = [
|
|||
"serial_test",
|
||||
"simple_logger",
|
||||
"tempdir",
|
||||
"thiserror",
|
||||
"tikv-client-common",
|
||||
"tikv-client-pd",
|
||||
"tikv-client-proto",
|
||||
|
@ -1854,7 +1764,6 @@ version = "0.0.0"
|
|||
dependencies = [
|
||||
"clap",
|
||||
"fail",
|
||||
"failure",
|
||||
"futures 0.3.5",
|
||||
"grpcio",
|
||||
"lazy_static",
|
||||
|
@ -1863,6 +1772,7 @@ dependencies = [
|
|||
"proptest-derive",
|
||||
"regex",
|
||||
"tempdir",
|
||||
"thiserror",
|
||||
"tikv-client-proto",
|
||||
"tokio",
|
||||
]
|
||||
|
|
|
@ -18,8 +18,8 @@ integration-tests = []
|
|||
name = "tikv_client"
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
derive-new = "0.5"
|
||||
failure = "0.1"
|
||||
futures = { version = "0.3.5", features = ["async-await", "thread-pool"] }
|
||||
futures-timer = "3.0"
|
||||
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
|
||||
|
@ -30,7 +30,7 @@ rand = "0.7"
|
|||
regex = "1"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
async-trait = "0.1"
|
||||
thiserror = "1"
|
||||
tokio = { version = "0.2", features = ["sync"] }
|
||||
|
||||
tikv-client-common = { path = "tikv-client-common" }
|
||||
|
|
|
@ -25,9 +25,9 @@ async fn main() {
|
|||
.expect("Could not connect to tikv");
|
||||
|
||||
let key1: Key = b"key1".to_vec().into();
|
||||
let value1: Value = b"value1".to_vec().into();
|
||||
let value1: Value = b"value1".to_vec();
|
||||
let key2: Key = b"key2".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec();
|
||||
let mut txn0 = client.begin().await.expect("Could not begin a transaction");
|
||||
for (key, value) in vec![(key1, value1), (key2, value2)] {
|
||||
txn0.put(key, value).await.expect("Could not set key value");
|
||||
|
@ -49,13 +49,13 @@ async fn main() {
|
|||
// another txn cannot write to the locked key
|
||||
let mut txn2 = client.begin().await.expect("Could not begin a transaction");
|
||||
let key1: Key = b"key1".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec();
|
||||
txn2.put(key1, value2).await.unwrap();
|
||||
let result = txn2.commit().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
// while this txn can still write it
|
||||
let value3: Value = b"value3".to_vec().into();
|
||||
let value3: Value = b"value3".to_vec();
|
||||
txn1.put(key1.clone(), value3).await.unwrap();
|
||||
txn1.commit().await.unwrap();
|
||||
let txn3 = client.begin().await.expect("Could not begin a transaction");
|
||||
|
|
|
@ -111,7 +111,7 @@ async fn main() -> Result<()> {
|
|||
.expect("Could not batch scan");
|
||||
let vals: Vec<_> = kv_pairs
|
||||
.into_iter()
|
||||
.map(|p| String::from_utf8(p.1).unwrap().to_owned())
|
||||
.map(|p| String::from_utf8(p.1).unwrap())
|
||||
.collect();
|
||||
assert_eq!(
|
||||
&vals,
|
||||
|
|
|
@ -56,9 +56,9 @@ async fn main() {
|
|||
|
||||
// set
|
||||
let key1: Key = b"key1".to_vec().into();
|
||||
let value1: Value = b"value1".to_vec().into();
|
||||
let value1: Value = b"value1".to_vec();
|
||||
let key2: Key = b"key2".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec().into();
|
||||
let value2: Value = b"value2".to_vec();
|
||||
puts(&txn, vec![(key1, value1), (key2, value2)]).await;
|
||||
|
||||
// get
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::{io::Write, ptr};
|
||||
use tikv_client_common::Error;
|
||||
use tikv_client_common::internal_err;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
|
@ -79,10 +79,7 @@ pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
|
|||
loop {
|
||||
let marker_offset = read_offset + ENC_GROUP_SIZE;
|
||||
if marker_offset >= data.len() {
|
||||
return Err(Error::internal_error(format!(
|
||||
"unexpected EOF, original key = {:?}",
|
||||
data
|
||||
)));
|
||||
return Err(internal_err!("unexpected EOF, original key = {:?}", data));
|
||||
};
|
||||
|
||||
unsafe {
|
||||
|
@ -109,7 +106,7 @@ pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
|
|||
|
||||
if pad_size > 0 {
|
||||
if pad_size > ENC_GROUP_SIZE {
|
||||
return Err(Error::internal_error("invalid key padding"));
|
||||
return Err(internal_err!("invalid key padding"));
|
||||
}
|
||||
|
||||
// check the padding pattern whether validate or not
|
||||
|
@ -119,7 +116,7 @@ pub fn decode_bytes_in_place(data: &mut Vec<u8>, desc: bool) -> Result<()> {
|
|||
&ENC_ASC_PADDING[..pad_size]
|
||||
};
|
||||
if &data[write_offset - pad_size..write_offset] != padding_slice {
|
||||
return Err(Error::internal_error("invalid key padding"));
|
||||
return Err(internal_err!("invalid key padding"));
|
||||
}
|
||||
unsafe {
|
||||
data.set_len(write_offset - pad_size);
|
||||
|
|
|
@ -110,4 +110,4 @@ pub use config::Config;
|
|||
#[doc(inline)]
|
||||
pub use region::{Region, RegionId, RegionVerId, StoreId};
|
||||
#[doc(inline)]
|
||||
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Result};
|
||||
pub use tikv_client_common::{security::SecurityManager, Error, Result};
|
||||
|
|
|
@ -142,7 +142,7 @@ impl PdClient for MockPdClient {
|
|||
match id {
|
||||
1 => Ok(Self::region1()),
|
||||
2 => Ok(Self::region2()),
|
||||
_ => Err(Error::region_not_found(id)),
|
||||
_ => Err(Error::RegionNotFound { region_id: id }),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -409,24 +409,21 @@ pub mod test {
|
|||
ranges1.1,
|
||||
vec![
|
||||
(k1.clone(), k2.clone()).into(),
|
||||
(k1.clone(), k_split.clone()).into()
|
||||
(k1, k_split.clone()).into()
|
||||
] as Vec<BoundRange>
|
||||
);
|
||||
assert_eq!(ranges2.0, 2);
|
||||
assert_eq!(
|
||||
ranges2.1,
|
||||
vec![(k_split.clone(), k3.clone()).into()] as Vec<BoundRange>
|
||||
vec![(k_split.clone(), k3).into()] as Vec<BoundRange>
|
||||
);
|
||||
assert_eq!(ranges3.0, 1);
|
||||
assert_eq!(
|
||||
ranges3.1,
|
||||
vec![(k2.clone(), k_split.clone()).into()] as Vec<BoundRange>
|
||||
vec![(k2, k_split.clone()).into()] as Vec<BoundRange>
|
||||
);
|
||||
assert_eq!(ranges4.0, 2);
|
||||
assert_eq!(
|
||||
ranges4.1,
|
||||
vec![(k_split.clone(), k4.clone()).into()] as Vec<BoundRange>
|
||||
);
|
||||
assert_eq!(ranges4.1, vec![(k_split, k4).into()] as Vec<BoundRange>);
|
||||
assert!(stream.next().is_none());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,18 +104,18 @@ impl RetryClient<Cluster> {
|
|||
.get_region(key.clone(), self.timeout)
|
||||
.await
|
||||
.and_then(|resp| {
|
||||
region_from_response(resp, || Error::region_for_key_not_found(key))
|
||||
region_from_response(resp, || Error::RegionForKeyNotFound { key })
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_region_by_id(self: Arc<Self>, id: RegionId) -> Result<Region> {
|
||||
pub async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<Region> {
|
||||
retry!(self, "get_region_by_id", |cluster| async {
|
||||
cluster
|
||||
.get_region_by_id(id, self.timeout)
|
||||
.get_region_by_id(region_id, self.timeout)
|
||||
.await
|
||||
.and_then(|resp| region_from_response(resp, || Error::region_not_found(id)))
|
||||
.and_then(|resp| region_from_response(resp, || Error::RegionNotFound { region_id }))
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -215,7 +215,7 @@ mod test {
|
|||
async fn reconnect(&self, _: u64) -> Result<()> {
|
||||
*self.reconnect_count.lock().unwrap() += 1;
|
||||
// Not actually unimplemented, we just don't care about the error.
|
||||
Err(Error::unimplemented())
|
||||
Err(Error::Unimplemented)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use tikv_client_common::Error;
|
||||
|
||||
use super::requests;
|
||||
use crate::{
|
||||
config::Config,
|
||||
pd::PdRpcClient,
|
||||
request::{KvRequest, OPTIMISTIC_BACKOFF},
|
||||
BoundRange, ColumnFamily, Error, Key, KvPair, Result, Value,
|
||||
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
|
||||
};
|
||||
use std::{sync::Arc, u32};
|
||||
|
||||
|
@ -168,7 +170,7 @@ impl Client {
|
|||
///
|
||||
/// # Examples
|
||||
/// ```rust,no_run
|
||||
/// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, RawClient, ToOwnedRange};
|
||||
/// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, ToOwnedRange};
|
||||
/// # use futures::prelude::*;
|
||||
/// # futures::executor::block_on(async {
|
||||
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
|
||||
|
@ -380,7 +382,10 @@ impl Client {
|
|||
key_only: bool,
|
||||
) -> Result<Vec<KvPair>> {
|
||||
if limit > MAX_RAW_KV_SCAN_LIMIT {
|
||||
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
|
||||
return Err(Error::MaxScanLimitExceeded {
|
||||
limit,
|
||||
max_limit: MAX_RAW_KV_SCAN_LIMIT,
|
||||
});
|
||||
}
|
||||
|
||||
let res = requests::new_raw_scan_request(range, limit, key_only, self.cf.clone())
|
||||
|
@ -399,10 +404,10 @@ impl Client {
|
|||
key_only: bool,
|
||||
) -> Result<Vec<KvPair>> {
|
||||
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
|
||||
return Err(Error::max_scan_limit_exceeded(
|
||||
each_limit,
|
||||
MAX_RAW_KV_SCAN_LIMIT,
|
||||
));
|
||||
return Err(Error::MaxScanLimitExceeded {
|
||||
limit: each_limit,
|
||||
max_limit: MAX_RAW_KV_SCAN_LIMIT,
|
||||
});
|
||||
}
|
||||
|
||||
requests::new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone())
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
|
||||
|
||||
pub use self::client::Client;
|
||||
use crate::{Error, ErrorKind};
|
||||
use crate::Error;
|
||||
use std::{convert::TryFrom, fmt};
|
||||
|
||||
mod client;
|
||||
|
@ -59,7 +59,7 @@ impl TryFrom<&str> for ColumnFamily {
|
|||
"lock" => Ok(ColumnFamily::Lock),
|
||||
"write" => Ok(ColumnFamily::Write),
|
||||
"ver_default" => Ok(ColumnFamily::VersionDefault),
|
||||
s => Err(ErrorKind::ColumnFamilyError(s.to_owned()).into()),
|
||||
s => Err(Error::ColumnFamilyError(s.to_owned())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,9 @@ impl Region {
|
|||
pub fn context(&self) -> Result<kvrpcpb::Context> {
|
||||
self.leader
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::leader_not_found(self.region.get_id()))
|
||||
.ok_or_else(|| Error::LeaderNotFound {
|
||||
region_id: self.region.get_id(),
|
||||
})
|
||||
.map(|l| {
|
||||
let mut ctx = kvrpcpb::Context::default();
|
||||
ctx.set_region_id(self.region.get_id());
|
||||
|
@ -83,7 +85,9 @@ impl Region {
|
|||
self.leader
|
||||
.as_ref()
|
||||
.cloned()
|
||||
.ok_or_else(|| Error::leader_not_found(self.id()))
|
||||
.ok_or_else(|| Error::LeaderNotFound {
|
||||
region_id: self.id(),
|
||||
})
|
||||
.map(|s| s.get_store_id())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use crate::{
|
|||
stats::tikv_stats,
|
||||
store::Store,
|
||||
transaction::{resolve_locks, HasLocks},
|
||||
BoundRange, Error, ErrorKind, Key, Result,
|
||||
BoundRange, Error, Key, Result,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::{prelude::*, stream::BoxStream};
|
||||
|
@ -140,7 +140,7 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized {
|
|||
mut lock_backoff: impl Backoff,
|
||||
) -> BoxStream<'static, Result<Self::RpcResponse>> {
|
||||
lock_backoff.next_delay_duration().map_or(
|
||||
stream::once(future::err(ErrorKind::ResolveLockError.into())).boxed(),
|
||||
stream::once(future::err(Error::ResolveLockError)).boxed(),
|
||||
move |delay_duration| {
|
||||
let fut = async move {
|
||||
futures_timer::Delay::new(delay_duration).await;
|
||||
|
@ -281,7 +281,7 @@ mod test {
|
|||
|
||||
impl HasRegionError for MockRpcResponse {
|
||||
fn region_error(&mut self) -> Option<Error> {
|
||||
Some(Error::region_not_found(1))
|
||||
Some(Error::RegionNotFound { region_id: 1 })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -248,10 +248,10 @@ mod tests {
|
|||
async fn set_and_get_from_buffer() {
|
||||
let buffer = Buffer::default();
|
||||
buffer
|
||||
.put(b"key1".to_vec().into(), b"value1".to_vec().into())
|
||||
.put(b"key1".to_vec().into(), b"value1".to_vec())
|
||||
.await;
|
||||
buffer
|
||||
.put(b"key2".to_vec().into(), b"value2".to_vec().into())
|
||||
.put(b"key2".to_vec().into(), b"value2".to_vec())
|
||||
.await;
|
||||
assert_eq!(
|
||||
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
|
||||
|
@ -261,9 +261,7 @@ mod tests {
|
|||
);
|
||||
|
||||
buffer.delete(b"key2".to_vec().into()).await;
|
||||
buffer
|
||||
.put(b"key1".to_vec().into(), b"value".to_vec().into())
|
||||
.await;
|
||||
buffer.put(b"key1".to_vec().into(), b"value".to_vec()).await;
|
||||
assert_eq!(
|
||||
block_on(buffer.batch_get_or_else(
|
||||
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
|
||||
|
@ -271,10 +269,7 @@ mod tests {
|
|||
))
|
||||
.unwrap()
|
||||
.collect::<Vec<_>>(),
|
||||
vec![KvPair(
|
||||
Key::from(b"key1".to_vec()),
|
||||
Value::from(b"value".to_vec())
|
||||
),]
|
||||
vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec()),]
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -285,17 +280,17 @@ mod tests {
|
|||
let k1_ = k1.clone();
|
||||
let k2: Key = b"key2".to_vec().into();
|
||||
let k2_ = k2.clone();
|
||||
let v1: Value = b"value1".to_vec().into();
|
||||
let v1: Value = b"value1".to_vec();
|
||||
let v1_ = v1.clone();
|
||||
let v1__ = v1.clone();
|
||||
let v2: Value = b"value2".to_vec().into();
|
||||
let v2: Value = b"value2".to_vec();
|
||||
let v2_ = v2.clone();
|
||||
|
||||
let buffer = Buffer::default();
|
||||
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
|
||||
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
|
||||
assert_eq!(r1.unwrap().unwrap(), v1.clone());
|
||||
assert_eq!(r2.unwrap().unwrap(), v1.clone());
|
||||
assert_eq!(r1.unwrap().unwrap(), v1);
|
||||
assert_eq!(r2.unwrap().unwrap(), v1);
|
||||
|
||||
let buffer = Buffer::default();
|
||||
let r1 = block_on(
|
||||
|
@ -316,13 +311,10 @@ mod tests {
|
|||
KvPair(k2.clone(), v2.clone())
|
||||
]
|
||||
);
|
||||
assert_eq!(r2.unwrap().unwrap(), v2.clone());
|
||||
assert_eq!(r2.unwrap().unwrap(), v2);
|
||||
assert_eq!(
|
||||
r3.unwrap().collect::<Vec<_>>(),
|
||||
vec![
|
||||
KvPair(k1.clone(), v1.clone()),
|
||||
KvPair(k2.clone(), v2.clone())
|
||||
]
|
||||
vec![KvPair(k1, v1), KvPair(k2, v2)]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
|||
request::{KvRequest, OPTIMISTIC_BACKOFF},
|
||||
timestamp::TimestampExt,
|
||||
transaction::requests,
|
||||
ErrorKind, Key, RegionVerId, Result,
|
||||
Error, Key, RegionVerId, Result,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
|
@ -101,14 +101,12 @@ async fn resolve_lock_with_retry(
|
|||
Ok(_) => {
|
||||
return Ok(region.ver_id());
|
||||
}
|
||||
Err(e) => match e.kind() {
|
||||
ErrorKind::RegionError(_) => {
|
||||
// Retry on region error
|
||||
error = Some(e);
|
||||
continue;
|
||||
}
|
||||
_ => return Err(e),
|
||||
},
|
||||
Err(e @ Error::RegionError(_)) => {
|
||||
// Retry on region error
|
||||
error = Some(e);
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Err(error.expect("no error is impossible"))
|
||||
|
@ -153,7 +151,7 @@ mod tests {
|
|||
// Test resolve lock over retry limit
|
||||
fail::cfg("region-error", "10*return").unwrap();
|
||||
let key: Key = vec![100].into();
|
||||
executor::block_on(resolve_lock_with_retry(key, 3, 4, client.clone()))
|
||||
executor::block_on(resolve_lock_with_retry(key, 3, 4, client))
|
||||
.expect_err("should return error");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ use crate::{
|
|||
request::{KvRequest, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF},
|
||||
timestamp::TimestampExt,
|
||||
transaction::{buffer::Buffer, requests::*},
|
||||
BoundRange, Error, ErrorKind, Key, KvPair, Result, Value,
|
||||
BoundRange, Error, Key, KvPair, Result, Value,
|
||||
};
|
||||
use derive_new::new;
|
||||
use futures::{executor::ThreadPool, prelude::*, stream::BoxStream};
|
||||
|
@ -189,7 +189,7 @@ impl Transaction {
|
|||
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
|
||||
self.check_allow_operation()?;
|
||||
if !self.is_pessimistic() {
|
||||
Err(ErrorKind::InvalidTransactionType.into())
|
||||
Err(Error::InvalidTransactionType)
|
||||
} else {
|
||||
let key = key.into();
|
||||
self.pessimistic_lock(iter::once(key.clone())).await?;
|
||||
|
@ -275,7 +275,7 @@ impl Transaction {
|
|||
) -> Result<impl Iterator<Item = KvPair>> {
|
||||
self.check_allow_operation()?;
|
||||
if !self.is_pessimistic() {
|
||||
Err(ErrorKind::InvalidTransactionType.into())
|
||||
Err(Error::InvalidTransactionType)
|
||||
} else {
|
||||
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
|
||||
self.pessimistic_lock(keys.clone()).await?;
|
||||
|
@ -462,7 +462,7 @@ impl Transaction {
|
|||
self.status,
|
||||
TransactionStatus::StartedCommit | TransactionStatus::Active
|
||||
) {
|
||||
return Err(ErrorKind::OperationAfterCommitError.into());
|
||||
return Err(Error::OperationAfterCommitError);
|
||||
}
|
||||
self.status = TransactionStatus::StartedCommit;
|
||||
|
||||
|
@ -490,7 +490,7 @@ impl Transaction {
|
|||
self.status,
|
||||
TransactionStatus::StartedRollback | TransactionStatus::Active
|
||||
) {
|
||||
return Err(ErrorKind::OperationAfterCommitError.into());
|
||||
return Err(Error::OperationAfterCommitError);
|
||||
}
|
||||
self.status = TransactionStatus::StartedRollback;
|
||||
|
||||
|
@ -571,9 +571,7 @@ impl Transaction {
|
|||
TransactionStatus::Committed
|
||||
| TransactionStatus::Rolledback
|
||||
| TransactionStatus::StartedCommit
|
||||
| TransactionStatus::StartedRollback => {
|
||||
Err(ErrorKind::OperationAfterCommitError.into())
|
||||
}
|
||||
| TransactionStatus::StartedRollback => Err(Error::OperationAfterCommitError),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -628,7 +626,7 @@ impl TwoPhaseCommitter {
|
|||
Ok(commit_ts) => commit_ts,
|
||||
Err(e) => {
|
||||
return if self.undetermined {
|
||||
Err(Error::undetermined_error(e))
|
||||
Err(Error::UndeterminedError(Box::new(e)))
|
||||
} else {
|
||||
Err(e)
|
||||
};
|
||||
|
@ -685,7 +683,7 @@ impl TwoPhaseCommitter {
|
|||
|
||||
if self.style.try_one_pc && response.len() == 1 {
|
||||
if response[0].one_pc_commit_ts == 0 {
|
||||
return Err(ErrorKind::OnePcFailure.into());
|
||||
return Err(Error::OnePcFailure);
|
||||
}
|
||||
|
||||
return Ok(response[0].one_pc_commit_ts);
|
||||
|
@ -715,7 +713,7 @@ impl TwoPhaseCommitter {
|
|||
// We don't know whether the transaction is committed or not if we fail to receive
|
||||
// the response. Then, we mark the transaction as undetermined and propagate the
|
||||
// error to the user.
|
||||
if let ErrorKind::Grpc(_) = e.kind() {
|
||||
if let Error::Grpc(_) = e {
|
||||
self.undetermined = true;
|
||||
}
|
||||
})
|
||||
|
|
|
@ -5,7 +5,7 @@ edition = "2018"
|
|||
|
||||
|
||||
[dependencies]
|
||||
failure = "0.1"
|
||||
thiserror = "1"
|
||||
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
|
||||
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
|
||||
lazy_static = "1"
|
||||
|
|
|
@ -1,190 +1,79 @@
|
|||
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use failure::{Backtrace, Context, Fail};
|
||||
use std::{
|
||||
fmt::{self, Display},
|
||||
result,
|
||||
};
|
||||
use tikv_client_proto::errorpb;
|
||||
|
||||
/// The error type used in tikv-client.
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
inner: Box<Context<ErrorKind>>,
|
||||
}
|
||||
use std::result;
|
||||
use thiserror::Error;
|
||||
|
||||
/// An error originating from the TiKV client or dependencies.
|
||||
#[derive(Debug, Fail)]
|
||||
#[derive(Debug, Error)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum ErrorKind {
|
||||
pub enum Error {
|
||||
/// Feature is not implemented.
|
||||
#[fail(display = "Unimplemented feature")]
|
||||
#[error("Unimplemented feature")]
|
||||
Unimplemented,
|
||||
/// Failed to resolve a lock
|
||||
#[fail(display = "Failed to resolve lock")]
|
||||
#[error("Failed to resolve lock")]
|
||||
ResolveLockError,
|
||||
/// Will raise this error when using a pessimistic txn only operation on an optimistic txn
|
||||
#[fail(display = "Invalid operation for this type of transaction")]
|
||||
#[error("Invalid operation for this type of transaction")]
|
||||
InvalidTransactionType,
|
||||
/// It's not allowed to perform operations in a transaction after it has been committed or rolled back.
|
||||
#[fail(
|
||||
display = "Cannot read or write data after any attempt to commit or roll back the transaction"
|
||||
)]
|
||||
#[error("Cannot read or write data after any attempt to commit or roll back the transaction")]
|
||||
OperationAfterCommitError,
|
||||
/// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc.
|
||||
#[fail(display = "1PC transaction could not be committed.")]
|
||||
#[error("1PC transaction could not be committed.")]
|
||||
OnePcFailure,
|
||||
/// Wraps a `std::io::Error`.
|
||||
#[fail(display = "IO error: {}", _0)]
|
||||
Io(#[fail(cause)] std::io::Error),
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
/// Wraps a `grpcio::Error`.
|
||||
#[fail(display = "gRPC error: {}", _0)]
|
||||
Grpc(#[fail(cause)] grpcio::Error),
|
||||
#[error("gRPC error: {0}")]
|
||||
Grpc(#[from] grpcio::Error),
|
||||
/// Represents that a futures oneshot channel was cancelled.
|
||||
#[fail(display = "A futures oneshot channel was canceled. {}", _0)]
|
||||
Canceled(#[fail(cause)] futures::channel::oneshot::Canceled),
|
||||
#[error("A futures oneshot channel was canceled. {0}")]
|
||||
Canceled(#[from] futures::channel::oneshot::Canceled),
|
||||
/// Errors caused by changes of region information
|
||||
#[fail(display = "Region error: {:?}", _0)]
|
||||
#[error("Region error: {0:?}")]
|
||||
RegionError(tikv_client_proto::errorpb::Error),
|
||||
/// Whether the transaction is committed or not is undetermined
|
||||
#[fail(display = "Whether the transaction is committed or not is undetermined")]
|
||||
UndeterminedError(#[fail(cause)] Error),
|
||||
#[error("Whether the transaction is committed or not is undetermined")]
|
||||
UndeterminedError(Box<Error>),
|
||||
/// Wraps `tikv_client_proto::kvrpcpb::KeyError`
|
||||
#[fail(display = "{:?}", _0)]
|
||||
#[error("{0:?}")]
|
||||
KeyError(tikv_client_proto::kvrpcpb::KeyError),
|
||||
/// Multiple errors
|
||||
#[fail(display = "Multiple errors: {:?}", _0)]
|
||||
#[error("Multiple errors: {0:?}")]
|
||||
MultipleErrors(Vec<Error>),
|
||||
/// Invalid ColumnFamily
|
||||
#[fail(display = "Unsupported column family {}", _0)]
|
||||
#[error("Unsupported column family {}", _0)]
|
||||
ColumnFamilyError(String),
|
||||
/// No region is found for the given key.
|
||||
#[fail(display = "Region is not found for key: {:?}", key)]
|
||||
#[error("Region is not found for key: {:?}", key)]
|
||||
RegionForKeyNotFound { key: Vec<u8> },
|
||||
/// No region is found for the given id.
|
||||
#[fail(display = "Region {} is not found", region_id)]
|
||||
#[error("Region {} is not found", region_id)]
|
||||
RegionNotFound { region_id: u64 },
|
||||
/// No leader is found for the given id.
|
||||
#[fail(display = "Leader of region {} is not found", region_id)]
|
||||
#[error("Leader of region {} is not found", region_id)]
|
||||
LeaderNotFound { region_id: u64 },
|
||||
/// Scan limit exceeds the maximum
|
||||
#[fail(display = "Limit {} exceeds max scan limit {}", limit, max_limit)]
|
||||
#[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
|
||||
MaxScanLimitExceeded { limit: u32, max_limit: u32 },
|
||||
/// A string error returned by TiKV server
|
||||
#[fail(display = "Kv error. {}", message)]
|
||||
#[error("Kv error. {}", message)]
|
||||
KvError { message: String },
|
||||
#[fail(display = "{}", message)]
|
||||
#[error("{}", message)]
|
||||
InternalError { message: String },
|
||||
}
|
||||
|
||||
impl Fail for Error {
|
||||
fn cause(&self) -> Option<&dyn Fail> {
|
||||
self.inner.cause()
|
||||
}
|
||||
|
||||
fn backtrace(&self) -> Option<&Backtrace> {
|
||||
self.inner.backtrace()
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
Display::fmt(&self.inner, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn kind(&self) -> &ErrorKind {
|
||||
self.inner.get_context()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn unimplemented() -> Self {
|
||||
Error::from(ErrorKind::Unimplemented)
|
||||
}
|
||||
|
||||
pub fn region_for_key_not_found(key: Vec<u8>) -> Self {
|
||||
Error::from(ErrorKind::RegionForKeyNotFound { key })
|
||||
}
|
||||
|
||||
pub fn region_error(error: tikv_client_proto::errorpb::Error) -> Self {
|
||||
Error::from(ErrorKind::RegionError(error))
|
||||
}
|
||||
|
||||
pub fn region_not_found(region_id: u64) -> Self {
|
||||
Error::from(ErrorKind::RegionNotFound { region_id })
|
||||
}
|
||||
|
||||
pub fn leader_not_found(region_id: u64) -> Self {
|
||||
Error::from(ErrorKind::LeaderNotFound { region_id })
|
||||
}
|
||||
|
||||
pub fn max_scan_limit_exceeded(limit: u32, max_limit: u32) -> Self {
|
||||
Error::from(ErrorKind::MaxScanLimitExceeded { limit, max_limit })
|
||||
}
|
||||
|
||||
pub fn kv_error(message: String) -> Self {
|
||||
Error::from(ErrorKind::KvError { message })
|
||||
}
|
||||
|
||||
pub fn internal_error(message: impl Into<String>) -> Self {
|
||||
Error::from(ErrorKind::InternalError {
|
||||
message: message.into(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn multiple_errors(errors: Vec<Error>) -> Self {
|
||||
Error::from(ErrorKind::MultipleErrors(errors))
|
||||
}
|
||||
|
||||
pub fn undetermined_error(error: Error) -> Self {
|
||||
Error::from(ErrorKind::UndeterminedError(error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<errorpb::Error> for Error {
|
||||
fn from(e: errorpb::Error) -> Error {
|
||||
Error::region_error(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ErrorKind> for Error {
|
||||
fn from(kind: ErrorKind) -> Error {
|
||||
Error {
|
||||
inner: Box::new(Context::new(kind)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Context<ErrorKind>> for Error {
|
||||
fn from(inner: Context<ErrorKind>) -> Error {
|
||||
Error {
|
||||
inner: Box::new(inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
Error::from(ErrorKind::Io(err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<grpcio::Error> for Error {
|
||||
fn from(err: grpcio::Error) -> Self {
|
||||
Error::from(ErrorKind::Grpc(err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<futures::channel::oneshot::Canceled> for Error {
|
||||
fn from(err: futures::channel::oneshot::Canceled) -> Self {
|
||||
Error::from(ErrorKind::Canceled(err))
|
||||
impl From<tikv_client_proto::errorpb::Error> for Error {
|
||||
fn from(e: tikv_client_proto::errorpb::Error) -> Error {
|
||||
Error::RegionError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tikv_client_proto::kvrpcpb::KeyError> for Error {
|
||||
fn from(err: tikv_client_proto::kvrpcpb::KeyError) -> Self {
|
||||
Error::from(ErrorKind::KeyError(err))
|
||||
fn from(e: tikv_client_proto::kvrpcpb::KeyError) -> Error {
|
||||
Error::KeyError(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -194,10 +83,9 @@ pub type Result<T> = result::Result<T, Error>;
|
|||
#[macro_export]
|
||||
macro_rules! internal_err {
|
||||
($e:expr) => ({
|
||||
let kind = $crate::Error::internal_error(
|
||||
format!("[{}:{}]: {}", file!(), line!(), $e)
|
||||
);
|
||||
$crate::Error::from(kind)
|
||||
$crate::Error::InternalError {
|
||||
message: format!("[{}:{}]: {}", file!(), line!(), $e)
|
||||
}
|
||||
});
|
||||
($f:tt, $($arg:expr),+) => ({
|
||||
internal_err!(format!($f, $($arg),+))
|
||||
|
|
|
@ -6,4 +6,4 @@ pub mod security;
|
|||
extern crate log;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use crate::errors::{Error, ErrorKind, Result};
|
||||
pub use crate::errors::{Error, Result};
|
||||
|
|
|
@ -21,6 +21,7 @@ use futures::{
|
|||
};
|
||||
use grpcio::WriteFlags;
|
||||
use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread};
|
||||
use tikv_client_common::internal_err;
|
||||
use tikv_client_proto::pdpb::*;
|
||||
|
||||
/// It is an empirical value.
|
||||
|
@ -66,7 +67,7 @@ impl TimestampOracle {
|
|||
self.request_tx
|
||||
.send(request)
|
||||
.await
|
||||
.map_err(|_| Error::internal_error("TimestampRequest channel is closed"))?;
|
||||
.map_err(|_| internal_err!("TimestampRequest channel is closed"))?;
|
||||
Ok(response.await?)
|
||||
}
|
||||
}
|
||||
|
@ -187,7 +188,7 @@ fn allocate_timestamps(
|
|||
let tail_ts = resp
|
||||
.timestamp
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;
|
||||
.ok_or_else(|| internal_err!("No timestamp in TsoResponse"))?;
|
||||
|
||||
let mut offset = resp.count;
|
||||
if let Some(RequestGroup {
|
||||
|
@ -196,8 +197,8 @@ fn allocate_timestamps(
|
|||
}) = pending_requests.pop_front()
|
||||
{
|
||||
if tso_request.count != offset {
|
||||
return Err(Error::internal_error(
|
||||
"PD gives different number of timestamps than expected",
|
||||
return Err(internal_err!(
|
||||
"PD gives different number of timestamps than expected"
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -210,9 +211,7 @@ fn allocate_timestamps(
|
|||
let _ = request.send(ts);
|
||||
}
|
||||
} else {
|
||||
return Err(Error::internal_error(
|
||||
"PD gives more TsoResponse than expected",
|
||||
));
|
||||
return Err(internal_err!("PD gives more TsoResponse than expected"));
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -78,7 +78,9 @@ macro_rules! has_str_error {
|
|||
if self.get_error().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Error::kv_error(self.take_error()))
|
||||
Some(Error::KvError {
|
||||
message: self.take_error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,6 +151,6 @@ fn extract_errors(error_iter: impl Iterator<Item = Option<kvrpcpb::KeyError>>) -
|
|||
} else if errors.len() == 1 {
|
||||
Some(errors.into_iter().next().unwrap())
|
||||
} else {
|
||||
Some(Error::multiple_errors(errors))
|
||||
Some(Error::MultipleErrors(errors))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,4 +10,4 @@ pub use crate::{
|
|||
errors::{HasError, HasRegionError},
|
||||
request::Request,
|
||||
};
|
||||
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Result};
|
||||
pub use tikv_client_common::{security::SecurityManager, Error, Result};
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
|
||||
|
||||
use crate::{ErrorKind, Result};
|
||||
use crate::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use grpcio::CallOption;
|
||||
use std::any::Any;
|
||||
|
@ -27,7 +27,7 @@ macro_rules! impl_request {
|
|||
.$fun(self, options)?
|
||||
.await
|
||||
.map(|r| Box::new(r) as Box<dyn Any>)
|
||||
.map_err(|e| ErrorKind::Grpc(e).into())
|
||||
.map_err(|e| Error::Grpc(e))
|
||||
}
|
||||
|
||||
fn label(&self) -> &'static str {
|
||||
|
|
Loading…
Reference in New Issue