Use failure for error management (#23)

* Use failure for error management.

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* Add error constructors

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* Remove NoSuchKey error

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-02-20 02:29:57 +08:00 committed by Ana Hobden
parent a5a6b152cd
commit 4e9a42fdf8
11 changed files with 280 additions and 174 deletions

View File

@ -15,7 +15,6 @@ name = "tikv_client"
futures = "0.1"
serde = "1.0"
serde_derive = "1.0"
quick-error = "1.2"
grpcio = { version = "0.4", features = [ "secure" ] }
protobuf = "~2.0"
tokio-core = "0.1"
@ -23,6 +22,7 @@ tokio-timer = "0.2"
fxhash = "0.2"
lazy_static = "0.2.1"
log = "0.3.9"
failure = "0.1"
[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"

View File

@ -70,10 +70,8 @@ fn main() -> Result<()> {
.expect("Could not delete value");
println!("Key: {:?} deleted", key);
client
.get(key)
.wait()
.expect_err("Get returned value for not existing key");
// Get returns None for non-existing key
assert!(client.get(key).wait()?.is_none());
let pairs: Vec<KvPair> = (1..3)
.map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i)))))

View File

@ -10,127 +10,245 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use failure::{Backtrace, Context, Fail};
use grpcio;
use quick_error::quick_error;
use std::{error, result};
use std::fmt::{self, Display};
use std::result;
quick_error! {
/// An error originating from the TiKV client or dependencies.
///
/// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/)
/// for errors. *This may change in future versions.*
#[derive(Debug)]
pub enum Error {
/// Wraps a `std::io::Error`.
Io(err: ::std::io::Error) {
from()
cause(err)
description(err.description())
}
/// Wraps a `grpcio::Error`.
Grpc(err: grpcio::Error) {
from()
cause(err)
description(err.description())
}
/// Represents that a futures oneshot channel was cancelled.
Canceled(err: ::futures::sync::oneshot::Canceled) {
from()
cause(err)
description(err.description())
}
/// An unknown error.
///
/// Generally, this is not an expected error. Please report it if encountered.
Other(err: Box<error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
display("unknown error {:?}", err)
}
/// A region was not found for the given key.
RegionForKeyNotFound(key: Vec<u8>) {
description("region is not found")
display("region is not found for key {:?}", key)
}
/// A region was not found.
RegionNotFound(region_id: u64, message: Option<String>) {
description("region is not found")
display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
/// The peer is not a leader of the given region.
NotLeader(region_id: u64, message: Option<String>) {
description("peer is not leader")
display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
/// The store does not match.
StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) {
description("store not match")
display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message)
}
/// The given key is not within the given region.
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
description("region is not found")
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
}
/// A stale epoch.
StaleEpoch(message: Option<String>) {
description("stale epoch")
display("{}", message.as_ref().unwrap_or(&"".to_owned()))
}
StaleCommand(message: String) {
description("stale command")
display("{}", message)
}
/// The server is too busy.
ServerIsBusy(reason: String, backoff: u64) {
description("server is busy")
display("server is busy: {:?}. Backoff {} ms", reason, backoff)
}
/// The given raft entry is too large for the region.
RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) {
description("raft entry too large")
display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message)
}
KeyError(message: String) {
description("key error")
display("{}", message)
}
KVError(message: String) {
description("kv error")
display("{}", message)
}
InternalError(message: String) {
description("internal error")
display("{}", message)
}
InvalidKeyRange {
description("invalid key range")
display("Only left closed intervals are supported")
}
Unimplemented {
description("unimplemented feature")
display("Unimplemented feature")
}
EmptyValue {
description("can not set empty value")
display("Can not set empty value")
}
NoSuchKey {
description("key does not exist")
display("Key doest not exist")
}
InvalidOverlappingRanges {
description("ranges can not be overlapping")
display("Ranges can not be overlapping")
}
MaxScanLimitExceeded(limit: u32, max_limit: u32) {
description("limit exceeds max scan limit")
display("Limit {} excceds max scan limit {}", limit, max_limit)
#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
}
/// An error originating from the TiKV client or dependencies.
#[derive(Debug, Fail)]
pub enum ErrorKind {
/// Wraps a `std::io::Error`.
#[fail(display = "IO error: {}", _0)]
Io(#[fail(cause)] std::io::Error),
/// Wraps a `grpcio::Error`.
#[fail(display = "gRPC error: {}", _0)]
Grpc(#[fail(cause)] grpcio::Error),
/// Represents that a futures oneshot channel was cancelled.
#[fail(display = "A futures oneshot channel was canceled. {}", _0)]
Canceled(#[fail(cause)] futures::sync::oneshot::Canceled),
/// Feature is not implemented.
#[fail(display = "Unimplemented feature")]
Unimplemented,
// No region is found for the given key.
#[fail(display = "Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
/// The peer is not the leader for the region.
#[fail(display = "Peer is not leader for region {}. {}", region_id, message)]
NotLeader { region_id: u64, message: String },
/// Stale epoch
#[fail(display = "Stale epoch. {}", message)]
StaleEpoch { message: String },
/// No region is found for the given id.
#[fail(display = "Region {} is not found. {}", region_id, message)]
RegionNotFound { region_id: u64, message: String },
/// Invalid key range to scan. Only left bounded intervals are supported.
#[fail(display = "Only left bounded intervals are supported")]
InvalidKeyRange,
/// Cannot set an empty value
#[fail(display = "Cannot set an empty value")]
EmptyValue,
/// Scan limit exceeds the maximum
#[fail(display = "Limit {} exceeds max scan limit {}", limit, max_limit)]
MaxScanLimitExceeded { limit: u32, max_limit: u32 },
/// Wraps `kvproto::kvrpcpb::KeyError`
#[fail(display = "{:?}", _0)]
KeyError(kvproto::kvrpcpb::KeyError),
/// A string error returned by TiKV server
#[fail(display = "Kv error. {}", message)]
KvError { message: String },
/// Reconstructed `kvproto::errorpb::KeyNotInRegion`
#[fail(
display = "Key {:?} is not in region {}: [{:?}, {:?})",
key, region_id, start_key, end_key
)]
KeyNotInRegion {
key: Vec<u8>,
region_id: u64,
start_key: Vec<u8>,
end_key: Vec<u8>,
},
/// Reconstructed `kvproto::errorpb::ServerIsBusy`
#[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)]
ServerIsBusy { reason: String, backoff_ms: u64 },
/// Represents `kvproto::errorpb::StaleCommand` with additional error message
#[fail(display = "Stale command. {}", message)]
StaleCommand { message: String },
/// Represents `kvproto::errorpb::StoreNotMatch` with additional error message
#[fail(
display = "Requesting store {} when actual store is {}. {}",
request_store_id, actual_store_id, message
)]
StoreNotMatch {
request_store_id: u64,
actual_store_id: u64,
message: String,
},
/// Represents `kvproto::errorpb::RaftEntryTooLarge` with additional error message
#[fail(
display = "{} bytes raft entry of region {} is too large. {}",
entry_size, region_id, message
)]
RaftEntryTooLarge {
region_id: u64,
entry_size: u64,
message: String,
},
#[fail(display = "{}", message)]
InternalError { message: String },
}
impl Fail for Error {
fn cause(&self) -> Option<&Fail> {
self.inner.cause()
}
fn backtrace(&self) -> Option<&Backtrace> {
self.inner.backtrace()
}
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}
impl Error {
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}
pub(crate) fn unimplemented() -> Self {
Error::from(ErrorKind::Unimplemented)
}
pub(crate) fn region_for_key_not_found(key: Vec<u8>) -> Self {
Error::from(ErrorKind::RegionForKeyNotFound { key })
}
pub(crate) fn not_leader(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::NotLeader {
region_id,
message: message.unwrap_or_default(),
})
}
pub(crate) fn stale_epoch(message: Option<String>) -> Self {
Error::from(ErrorKind::StaleEpoch {
message: message.unwrap_or_default(),
})
}
pub(crate) fn region_not_found(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::RegionNotFound {
region_id,
message: message.unwrap_or_default(),
})
}
pub(crate) fn invalid_key_range() -> Self {
Error::from(ErrorKind::InvalidKeyRange)
}
pub(crate) fn empty_value() -> Self {
Error::from(ErrorKind::EmptyValue)
}
pub(crate) fn max_scan_limit_exceeded(limit: u32, max_limit: u32) -> Self {
Error::from(ErrorKind::MaxScanLimitExceeded { limit, max_limit })
}
pub(crate) fn kv_error(message: String) -> Self {
Error::from(ErrorKind::KvError { message })
}
pub(crate) fn key_not_in_region(mut e: kvproto::errorpb::KeyNotInRegion) -> Self {
Error::from(ErrorKind::KeyNotInRegion {
key: e.take_key(),
region_id: e.get_region_id(),
start_key: e.take_start_key(),
end_key: e.take_end_key(),
})
}
pub(crate) fn server_is_busy(mut e: kvproto::errorpb::ServerIsBusy) -> Self {
Error::from(ErrorKind::ServerIsBusy {
reason: e.take_reason(),
backoff_ms: e.get_backoff_ms(),
})
}
pub(crate) fn stale_command(message: String) -> Self {
Error::from(ErrorKind::StaleCommand { message })
}
pub(crate) fn store_not_match(e: kvproto::errorpb::StoreNotMatch, message: String) -> Self {
Error::from(ErrorKind::StoreNotMatch {
request_store_id: e.get_request_store_id(),
actual_store_id: e.get_actual_store_id(),
message,
})
}
pub(crate) fn raft_entry_too_large(
e: kvproto::errorpb::RaftEntryTooLarge,
message: String,
) -> Self {
Error::from(ErrorKind::RaftEntryTooLarge {
region_id: e.get_region_id(),
entry_size: e.get_entry_size(),
message,
})
}
pub(crate) fn internal_error(message: String) -> Self {
Error::from(ErrorKind::InternalError { message })
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error {
inner: Context::new(kind),
}
}
}
impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner }
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Error::from(ErrorKind::Io(err))
}
}
impl From<grpcio::Error> for Error {
fn from(err: grpcio::Error) -> Self {
Error::from(ErrorKind::Grpc(err))
}
}
impl From<futures::sync::oneshot::Canceled> for Error {
fn from(err: futures::sync::oneshot::Canceled) -> Self {
Error::from(ErrorKind::Canceled(err))
}
}
impl From<kvproto::kvrpcpb::KeyError> for Error {
fn from(err: kvproto::kvrpcpb::KeyError) -> Self {
Error::from(ErrorKind::KeyError(err))
}
}
/// A result holding an [`Error`](enum.Error.html).
pub type Result<T> = result::Result<T, Error>;

View File

@ -10,8 +10,9 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#![recursion_limit = "128"]
#![type_length_limit = "1572864"]
// Long and nested future chains can quickly result in large generic types.
#![type_length_limit = "16777216"]
//! TiKV Client for Rust.
//!
@ -131,6 +132,8 @@ pub mod transaction;
#[doc(inline)]
pub use crate::errors::Error;
#[doc(inline)]
pub use crate::errors::ErrorKind;
#[doc(inline)]
pub use crate::errors::Result;
struct HexRepr<'a>(pub &'a [u8]);
@ -596,7 +599,7 @@ fn range_to_keys(range: (Bound<Key>, Bound<Key>)) -> Result<(Key, Option<Key>)>
}
v
}
Bound::Unbounded => return Err(Error::InvalidKeyRange),
Bound::Unbounded => Err(Error::invalid_key_range())?,
};
let end = match range.1 {
Bound::Included(v) => Some(v),

View File

@ -679,7 +679,7 @@ impl RequestInner for ScanInner {
fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> {
if self.limit > MAX_RAW_KV_SCAN_LIMIT {
Box::new(future::err(Error::MaxScanLimitExceeded(
Box::new(future::err(Error::max_scan_limit_exceeded(
self.limit,
MAX_RAW_KV_SCAN_LIMIT,
)))
@ -751,13 +751,13 @@ impl RequestInner for BatchScanInner {
fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> {
if self.each_limit > MAX_RAW_KV_SCAN_LIMIT {
Box::new(future::err(Error::MaxScanLimitExceeded(
Box::new(future::err(Error::max_scan_limit_exceeded(
self.each_limit,
MAX_RAW_KV_SCAN_LIMIT,
)))
} else if self.ranges.iter().any(Result::is_err) {
// All errors must be InvalidKeyRange so we can simply return a new InvalidKeyRange
Box::new(future::err(Error::InvalidKeyRange))
Box::new(future::err(Error::invalid_key_range()))
} else {
Box::new(client.raw_batch_scan(
self.ranges.into_iter().map(Result::unwrap).collect(),

View File

@ -285,7 +285,7 @@ impl RpcClient {
cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> {
if value.is_empty() {
Either::A(future::err(Error::EmptyValue))
Either::A(future::err(Error::empty_value()))
} else {
Either::B(
Self::raw(self.inner(), &key, cf)
@ -300,7 +300,7 @@ impl RpcClient {
cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> {
if pairs.iter().any(|p| p.value().is_empty()) {
Either::A(future::err(Error::EmptyValue))
Either::A(future::err(Error::empty_value()))
} else {
let inner = self.inner();
Either::B(
@ -416,7 +416,7 @@ impl RpcClient {
) -> impl Future<Item = Vec<KvPair>, Error = Error> {
drop(ranges);
drop(cf);
future::err(Error::Unimplemented)
future::err(Error::unimplemented())
}
pub fn raw_delete_range(

View File

@ -102,7 +102,7 @@ impl PdClient {
let region = if resp.has_region() {
resp.take_region()
} else {
return Err(Error::RegionForKeyNotFound(key));
Err(Error::region_for_key_not_found(key))?
};
let leader = if resp.has_leader() {
Some(resp.take_leader())
@ -128,7 +128,7 @@ impl PdClient {
let region = if resp.has_region() {
resp.take_region()
} else {
return Err(Error::RegionNotFound(region_id, None));
Err(Error::region_not_found(region_id, None))?
};
let leader = if resp.has_leader() {
Some(resp.take_leader())
@ -155,7 +155,7 @@ impl PdClient {
let cli = &cli.rl().client;
executor(cli, option)
.unwrap()
.map_err(Error::Grpc)
.map_err(Into::into)
.and_then(|r| {
{
let header = r.header();

View File

@ -139,12 +139,12 @@ impl PdReactor {
let (tx, rx) = client.wl().client.tso().unwrap();
let tso_rx = client.wl().reactor.tso_rx.take().unwrap();
handle.spawn(
tx.sink_map_err(Error::Grpc)
tx.sink_map_err(Into::into)
.send_all(tso_rx.then(|r| match r {
Ok(r) => Ok((r, WriteFlags::default())),
Err(()) => Err(internal_err!("failed to recv tso requests")),
}))
.then(|r| match r {
.then(|r: Result<_>| match r {
Ok((mut sender, _)) => {
sender.get_mut().cancel();
Ok(())
@ -220,7 +220,7 @@ impl PdReactor {
// Schedule tso request to run.
self.schedule(PdTask::Request);
}
rx.map_err(Error::Canceled).then(move |r| context.done(r))
rx.map_err(Into::into).then(move |r| context.done(r))
}
}
@ -362,10 +362,10 @@ fn connect(
) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> {
let client = security_mgr.connect(env, addr, pdpb_grpc::PdClient::new)?;
let option = CallOption::default().timeout(timeout);
match client.get_members_opt(&pdpb::GetMembersRequest::new(), option) {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
}
let resp = client
.get_members_opt(&pdpb::GetMembersRequest::new(), option)
.map_err(Error::from)?;
Ok((client, resp))
}
fn try_connect(

View File

@ -76,7 +76,7 @@ impl Region {
pub fn context(&self) -> Result<kvrpcpb::Context> {
self.leader
.as_ref()
.ok_or_else(|| Error::NotLeader(self.region.get_id(), None))
.ok_or_else(|| Error::not_leader(self.region.get_id(), None))
.map(|l| {
let mut ctx = kvrpcpb::Context::default();
ctx.set_region_id(self.region.get_id());
@ -113,7 +113,7 @@ impl Region {
.as_ref()
.map(Clone::clone)
.map(Into::into)
.ok_or_else(|| Error::StaleEpoch(None))
.ok_or_else(|| Error::stale_epoch(None))
}
pub fn meta(&self) -> metapb::Region {

View File

@ -25,7 +25,7 @@ use crate::{
tikv::context::{request_context, RequestContext},
},
transaction::{Mutation, TxnInfo},
Error, Key, KvPair, Result, Value,
Error, ErrorKind, Key, KvPair, Result, Value,
};
trait HasRegionError {
@ -41,39 +41,29 @@ impl From<errorpb::Error> for Error {
let message = e.take_message();
if e.has_not_leader() {
let e = e.get_not_leader();
Error::NotLeader(
e.get_region_id(),
Some(format!("{}. Leader: {:?}", message, e.get_leader())),
)
let message = format!("{}. Leader: {:?}", message, e.get_leader());
Error::not_leader(e.get_region_id(), Some(message))
} else if e.has_region_not_found() {
Error::RegionNotFound(e.get_region_not_found().get_region_id(), Some(message))
Error::region_not_found(e.get_region_not_found().get_region_id(), Some(message))
} else if e.has_key_not_in_region() {
let mut e = e.take_key_not_in_region();
Error::KeyNotInRegion(
e.take_key(),
e.get_region_id(),
e.take_start_key(),
e.take_end_key(),
)
Error::key_not_in_region(e.take_key_not_in_region())
} else if e.has_stale_epoch() {
Error::StaleEpoch(Some(format!(
let message = format!(
"{}. New epoch: {:?}",
message,
e.get_stale_epoch().get_new_regions()
)))
);
Error::stale_epoch(Some(message))
} else if e.has_server_is_busy() {
let mut e = e.take_server_is_busy();
Error::ServerIsBusy(e.take_reason(), e.get_backoff_ms())
Error::server_is_busy(e.take_server_is_busy())
} else if e.has_stale_command() {
Error::StaleCommand(message)
Error::stale_command(message)
} else if e.has_store_not_match() {
let e = e.get_store_not_match();
Error::StoreNotMatch(e.get_request_store_id(), e.get_actual_store_id(), message)
Error::store_not_match(e.take_store_not_match(), message)
} else if e.has_raft_entry_too_large() {
let e = e.get_raft_entry_too_large();
Error::RaftEntryTooLarge(e.get_region_id(), e.get_entry_size(), message)
Error::raft_entry_too_large(e.take_raft_entry_too_large(), message)
} else {
Error::InternalError(message)
Error::internal_error(message)
}
}
}
@ -114,12 +104,6 @@ has_region_error!(kvrpcpb::RawDeleteRangeResponse);
has_region_error!(kvrpcpb::RawScanResponse);
has_region_error!(kvrpcpb::RawBatchScanResponse);
impl From<kvrpcpb::KeyError> for Error {
fn from(e: kvrpcpb::KeyError) -> Error {
Error::KeyError(format!("{:?}", e))
}
}
macro_rules! has_key_error {
($type:ty) => {
impl HasError for $type {
@ -149,7 +133,7 @@ macro_rules! has_str_error {
if self.get_error().is_empty() {
None
} else {
Some(Error::KVError(self.take_error()))
Some(Error::kv_error(self.take_error()) )
}
}
}
@ -632,7 +616,7 @@ impl KvClient {
)
.unwrap()
.then(|r| match r {
Err(e) => Err(Error::Grpc(e)),
Err(e) => Err(ErrorKind::Grpc(e))?,
Ok(mut r) => {
if let Some(e) = r.region_error() {
Err(e)

View File

@ -22,7 +22,10 @@ use tokio_timer::{self, timer::Handle};
macro_rules! internal_err {
($e:expr) => ({
$crate::Error::InternalError(format!("[{}:{}]: {}", file!(), line!(), $e))
let kind = $crate::Error::internal_error(
format!("[{}:{}]: {}", file!(), line!(), $e)
);
$crate::Error::from(kind)
});
($f:tt, $($arg:expr),+) => ({
internal_err!(format!($f, $($arg),+))