Merge pull request #187 from ekexium/remove-expect

Propagates error instead of panicking when getting context returns error
This commit is contained in:
Nick Cameron 2020-10-28 14:14:30 +13:00 committed by GitHub
commit fa4a3f3c89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 81 deletions

View File

@ -30,12 +30,16 @@ impl KvRequest for kvrpcpb::RawGetRequest {
store_stream_for_key(key, pd_client)
}
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
@ -74,12 +78,16 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
const REQUEST_NAME: &'static str = "raw_batch_get";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_get_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -120,13 +128,17 @@ impl KvRequest for kvrpcpb::RawPutRequest {
const REQUEST_NAME: &'static str = "raw_put";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_put_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.0.into());
req.set_value(key.1);
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -171,12 +183,16 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
const REQUEST_NAME: &'static str = "raw_batch_put";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_put_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, pairs: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
pairs: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_pairs(pairs.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -215,12 +231,16 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
const REQUEST_NAME: &'static str = "raw_delete";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_delete_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -261,12 +281,16 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
const REQUEST_NAME: &'static str = "raw_batch_delete";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_delete_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -309,13 +333,13 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
&self,
(start_key, end_key): Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into());
req.set_end_key(end_key.into());
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -363,15 +387,15 @@ impl KvRequest for kvrpcpb::RawScanRequest {
&self,
(start_key, end_key): Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into());
req.set_end_key(end_key.into());
req.set_limit(self.limit);
req.set_key_only(self.key_only);
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -419,14 +443,18 @@ impl KvRequest for kvrpcpb::RawBatchScanRequest {
const REQUEST_NAME: &'static str = "raw_batch_scan";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_scan_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, ranges: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
ranges: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_ranges(ranges.into_iter().map(Into::into).collect());
req.set_each_limit(self.each_limit);
req.set_key_only(self.key_only);
req.set_cf(self.cf.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(

View File

@ -64,20 +64,23 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
stores
.and_then(move |(key_data, store)| {
let request = self.make_rpc_request(key_data, &store);
let (req, err) = match request {
Ok(r) => (Some(r), None),
Err(e) => (None, Some(e)),
};
self.dispatch_hook(store.call_options())
.unwrap_or_else(|| {
store.dispatch(
.unwrap_or_else(|| match (&req, err) {
(Some(req), None) => store.dispatch(
Self::REQUEST_NAME,
Self::RPC_FN(
&store.client.get_rpc_client(),
&request,
store.call_options(),
),
)
Self::RPC_FN(&store.client.get_rpc_client(), req, store.call_options()),
),
(None, Some(err)) => future::err(err).boxed(),
_ => unreachable!(),
})
.map_ok(move |response| (request, response))
.map_ok(move |response| (req, response))
})
.map_ok(move |(request, mut response)| {
let request = request.unwrap();
if let Some(region_error) = response.region_error() {
return request.on_region_error(
region_error,
@ -164,7 +167,11 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>>;
fn make_rpc_request<KvC: KvClient>(&self, key_data: Self::KeyData, store: &Store<KvC>) -> Self;
fn make_rpc_request<KvC: KvClient>(
&self,
key_data: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self>;
fn map_result(result: Self::RpcResponse) -> Self::Result;
@ -172,19 +179,13 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>>;
fn request_from_store<KvC: KvClient>(&self, store: &Store<KvC>) -> Self
fn request_from_store<KvC: KvClient>(&self, store: &Store<KvC>) -> Result<Self>
where
Self: Default + KvRpcRequest,
{
let mut request = Self::default();
// FIXME propagate the error instead of using `expect`
request.set_context(
store
.region
.context()
.expect("Cannot create context from region"),
);
request
request.set_context(store.region.context()?);
Ok(request)
}
}
@ -386,10 +387,10 @@ mod test {
&self,
_key_data: Self::KeyData,
_store: &Store<KvC>,
) -> Self {
Self {
) -> Result<Self> {
Ok(Self {
test_invoking_count: self.test_invoking_count.clone(),
}
})
}
fn map_result(_: Self::RpcResponse) -> Self::Result {}

View File

@ -26,12 +26,16 @@ impl KvRequest for kvrpcpb::GetRequest {
store_stream_for_key(key, pd_client)
}
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store::<KvC>(store);
fn make_rpc_request<KvC: KvClient>(
&self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store::<KvC>(store)?;
req.set_key(key.into());
req.set_version(self.version);
req
Ok(req)
}
fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
@ -76,12 +80,16 @@ impl KvRequest for kvrpcpb::BatchGetRequest {
const REQUEST_NAME: &'static str = "kv_batch_get";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_get_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_version(self.version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -134,15 +142,15 @@ impl KvRequest for kvrpcpb::ScanRequest {
&self,
(start_key, end_key): Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into());
req.set_end_key(end_key.into());
req.set_limit(self.limit);
req.set_key_only(self.key_only);
req.set_version(self.version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -205,7 +213,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest {
&self,
(context, keys): Self::KeyData,
_store: &Store<KvC>,
) -> Self {
) -> Result<Self> {
let mut req = Self::default();
req.set_context(context);
req.set_start_version(self.start_version);
@ -213,7 +221,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest {
req.set_txn_infos(self.txn_infos.clone());
req.set_keys(keys);
req
Ok(req)
}
fn on_region_error(
@ -274,12 +282,16 @@ impl KvRequest for kvrpcpb::CleanupRequest {
const REQUEST_NAME: &'static str = "kv_cleanup";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_cleanup_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into());
req.set_start_version(self.start_version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -323,8 +335,8 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
&self,
mutations: Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_mutations(mutations);
req.set_primary_lock(self.primary_lock.clone());
req.set_start_version(self.start_version);
@ -334,7 +346,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
req.set_for_update_ts(self.for_update_ts);
req.set_is_pessimistic_lock(self.is_pessimistic_lock.clone());
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -404,13 +416,17 @@ impl KvRequest for kvrpcpb::CommitRequest {
const REQUEST_NAME: &'static str = "kv_commit";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_commit_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys);
req.set_start_version(self.start_version);
req.set_commit_version(self.commit_version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -453,12 +469,16 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
const REQUEST_NAME: &'static str = "kv_batch_rollback";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_rollback_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys);
req.set_start_version(self.start_version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -500,12 +520,16 @@ impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
const RPC_FN: RpcFnType<Self, Self::RpcResponse> =
TikvClient::kv_pessimistic_rollback_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self {
let mut req = self.request_from_store(store);
fn make_rpc_request<KvC: KvClient>(
&self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys);
req.set_start_version(self.start_version);
req
Ok(req)
}
fn store_stream<PdC: PdClient>(
@ -561,8 +585,8 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
&self,
mutations: Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_mutations(mutations);
req.set_primary_lock(self.primary_lock.clone());
req.set_start_version(self.start_version);
@ -574,7 +598,7 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
req.set_return_values(self.return_values);
req.set_min_commit_ts(self.min_commit_ts);
req
Ok(req)
}
fn map_result(_result: Self::RpcResponse) -> Self::Result {}
@ -645,12 +669,12 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
&self,
(start_key, _): Self::KeyData,
store: &Store<KvC>,
) -> Self {
let mut req = self.request_from_store(store);
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_max_version(self.max_version);
req.set_start_key(start_key.into());
req.set_limit(self.limit);
req
Ok(req)
}
fn map_result(mut result: Self::RpcResponse) -> Self::Result {