propagates error instead of panicking when getting context returns error

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2020-10-26 22:15:15 +08:00
parent a399f8157a
commit e516617b36
3 changed files with 142 additions and 81 deletions

View File

@ -30,12 +30,16 @@ impl KvRequest for kvrpcpb::RawGetRequest {
store_stream_for_key(key, pd_client) store_stream_for_key(key, pd_client)
} }
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into()); req.set_key(key.into());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn map_result(mut resp: Self::RpcResponse) -> Self::Result { fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
@ -74,12 +78,16 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
const REQUEST_NAME: &'static str = "raw_batch_get"; const REQUEST_NAME: &'static str = "raw_batch_get";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_get_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_get_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect()); req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -120,13 +128,17 @@ impl KvRequest for kvrpcpb::RawPutRequest {
const REQUEST_NAME: &'static str = "raw_put"; const REQUEST_NAME: &'static str = "raw_put";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_put_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_put_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.0.into()); req.set_key(key.0.into());
req.set_value(key.1); req.set_value(key.1);
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -171,12 +183,16 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
const REQUEST_NAME: &'static str = "raw_batch_put"; const REQUEST_NAME: &'static str = "raw_batch_put";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_put_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_put_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, pairs: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
pairs: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_pairs(pairs.into_iter().map(Into::into).collect()); req.set_pairs(pairs.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -215,12 +231,16 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
const REQUEST_NAME: &'static str = "raw_delete"; const REQUEST_NAME: &'static str = "raw_delete";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_delete_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_delete_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into()); req.set_key(key.into());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -261,12 +281,16 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
const REQUEST_NAME: &'static str = "raw_batch_delete"; const REQUEST_NAME: &'static str = "raw_batch_delete";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_delete_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_delete_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect()); req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -309,13 +333,13 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
&self, &self,
(start_key, end_key): Self::KeyData, (start_key, end_key): Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into()); req.set_start_key(start_key.into());
req.set_end_key(end_key.into()); req.set_end_key(end_key.into());
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -363,15 +387,15 @@ impl KvRequest for kvrpcpb::RawScanRequest {
&self, &self,
(start_key, end_key): Self::KeyData, (start_key, end_key): Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into()); req.set_start_key(start_key.into());
req.set_end_key(end_key.into()); req.set_end_key(end_key.into());
req.set_limit(self.limit); req.set_limit(self.limit);
req.set_key_only(self.key_only); req.set_key_only(self.key_only);
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -419,14 +443,18 @@ impl KvRequest for kvrpcpb::RawBatchScanRequest {
const REQUEST_NAME: &'static str = "raw_batch_scan"; const REQUEST_NAME: &'static str = "raw_batch_scan";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_scan_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::raw_batch_scan_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, ranges: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
ranges: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_ranges(ranges.into_iter().map(Into::into).collect()); req.set_ranges(ranges.into_iter().map(Into::into).collect());
req.set_each_limit(self.each_limit); req.set_each_limit(self.each_limit);
req.set_key_only(self.key_only); req.set_key_only(self.key_only);
req.set_cf(self.cf.clone()); req.set_cf(self.cf.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(

View File

@ -64,20 +64,31 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
stores stores
.and_then(move |(key_data, store)| { .and_then(move |(key_data, store)| {
let request = self.make_rpc_request(key_data, &store); let request = self.make_rpc_request(key_data, &store);
let mut req = None;
let mut err = None;
match request {
Ok(r) => req = Some(r),
Err(e) => err = Some(e),
}
self.dispatch_hook(store.call_options()) self.dispatch_hook(store.call_options())
.unwrap_or_else(|| { .unwrap_or_else(|| {
store.dispatch( if req.is_some() {
Self::REQUEST_NAME, store.dispatch(
Self::RPC_FN( Self::REQUEST_NAME,
&store.client.get_rpc_client(), Self::RPC_FN(
&request, &store.client.get_rpc_client(),
store.call_options(), req.as_ref().unwrap(),
), store.call_options(),
) ),
)
} else {
future::err(err.unwrap()).boxed()
}
}) })
.map_ok(move |response| (request, response)) .map_ok(move |response| (req, response))
}) })
.map_ok(move |(request, mut response)| { .map_ok(move |(request, mut response)| {
let request = request.unwrap();
if let Some(region_error) = response.region_error() { if let Some(region_error) = response.region_error() {
return request.on_region_error( return request.on_region_error(
region_error, region_error,
@ -164,7 +175,11 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
pd_client: Arc<PdC>, pd_client: Arc<PdC>,
) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>>; ) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>>;
fn make_rpc_request<KvC: KvClient>(&self, key_data: Self::KeyData, store: &Store<KvC>) -> Self; fn make_rpc_request<KvC: KvClient>(
&self,
key_data: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self>;
fn map_result(result: Self::RpcResponse) -> Self::Result; fn map_result(result: Self::RpcResponse) -> Self::Result;
@ -172,19 +187,13 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
results: BoxStream<'static, Result<Self::Result>>, results: BoxStream<'static, Result<Self::Result>>,
) -> BoxFuture<'static, Result<Self::Result>>; ) -> BoxFuture<'static, Result<Self::Result>>;
fn request_from_store<KvC: KvClient>(&self, store: &Store<KvC>) -> Self fn request_from_store<KvC: KvClient>(&self, store: &Store<KvC>) -> Result<Self>
where where
Self: Default + KvRpcRequest, Self: Default + KvRpcRequest,
{ {
let mut request = Self::default(); let mut request = Self::default();
// FIXME propagate the error instead of using `expect` request.set_context(store.region.context()?);
request.set_context( Ok(request)
store
.region
.context()
.expect("Cannot create context from region"),
);
request
} }
} }
@ -386,10 +395,10 @@ mod test {
&self, &self,
_key_data: Self::KeyData, _key_data: Self::KeyData,
_store: &Store<KvC>, _store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
Self { Ok(Self {
test_invoking_count: self.test_invoking_count.clone(), test_invoking_count: self.test_invoking_count.clone(),
} })
} }
fn map_result(_: Self::RpcResponse) -> Self::Result {} fn map_result(_: Self::RpcResponse) -> Self::Result {}

View File

@ -26,12 +26,16 @@ impl KvRequest for kvrpcpb::GetRequest {
store_stream_for_key(key, pd_client) store_stream_for_key(key, pd_client)
} }
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store::<KvC>(store); &self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store::<KvC>(store)?;
req.set_key(key.into()); req.set_key(key.into());
req.set_version(self.version); req.set_version(self.version);
req Ok(req)
} }
fn map_result(mut resp: Self::RpcResponse) -> Self::Result { fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
@ -76,12 +80,16 @@ impl KvRequest for kvrpcpb::BatchGetRequest {
const REQUEST_NAME: &'static str = "kv_batch_get"; const REQUEST_NAME: &'static str = "kv_batch_get";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_get_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_get_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys.into_iter().map(Into::into).collect()); req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_version(self.version); req.set_version(self.version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -134,15 +142,15 @@ impl KvRequest for kvrpcpb::ScanRequest {
&self, &self,
(start_key, end_key): Self::KeyData, (start_key, end_key): Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_start_key(start_key.into()); req.set_start_key(start_key.into());
req.set_end_key(end_key.into()); req.set_end_key(end_key.into());
req.set_limit(self.limit); req.set_limit(self.limit);
req.set_key_only(self.key_only); req.set_key_only(self.key_only);
req.set_version(self.version); req.set_version(self.version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -205,7 +213,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest {
&self, &self,
(context, keys): Self::KeyData, (context, keys): Self::KeyData,
_store: &Store<KvC>, _store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = Self::default(); let mut req = Self::default();
req.set_context(context); req.set_context(context);
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
@ -213,7 +221,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest {
req.set_txn_infos(self.txn_infos.clone()); req.set_txn_infos(self.txn_infos.clone());
req.set_keys(keys); req.set_keys(keys);
req Ok(req)
} }
fn on_region_error( fn on_region_error(
@ -274,12 +282,16 @@ impl KvRequest for kvrpcpb::CleanupRequest {
const REQUEST_NAME: &'static str = "kv_cleanup"; const REQUEST_NAME: &'static str = "kv_cleanup";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_cleanup_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_cleanup_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, key: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
key: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_key(key.into()); req.set_key(key.into());
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -323,8 +335,8 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
&self, &self,
mutations: Self::KeyData, mutations: Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_mutations(mutations); req.set_mutations(mutations);
req.set_primary_lock(self.primary_lock.clone()); req.set_primary_lock(self.primary_lock.clone());
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
@ -334,7 +346,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest {
req.set_for_update_ts(self.for_update_ts); req.set_for_update_ts(self.for_update_ts);
req.set_is_pessimistic_lock(self.is_pessimistic_lock.clone()); req.set_is_pessimistic_lock(self.is_pessimistic_lock.clone());
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -404,13 +416,17 @@ impl KvRequest for kvrpcpb::CommitRequest {
const REQUEST_NAME: &'static str = "kv_commit"; const REQUEST_NAME: &'static str = "kv_commit";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_commit_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_commit_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys); req.set_keys(keys);
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
req.set_commit_version(self.commit_version); req.set_commit_version(self.commit_version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -453,12 +469,16 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest {
const REQUEST_NAME: &'static str = "kv_batch_rollback"; const REQUEST_NAME: &'static str = "kv_batch_rollback";
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_rollback_async_opt; const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_batch_rollback_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys); req.set_keys(keys);
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -500,12 +520,16 @@ impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
const RPC_FN: RpcFnType<Self, Self::RpcResponse> = const RPC_FN: RpcFnType<Self, Self::RpcResponse> =
TikvClient::kv_pessimistic_rollback_async_opt; TikvClient::kv_pessimistic_rollback_async_opt;
fn make_rpc_request<KvC: KvClient>(&self, keys: Self::KeyData, store: &Store<KvC>) -> Self { fn make_rpc_request<KvC: KvClient>(
let mut req = self.request_from_store(store); &self,
keys: Self::KeyData,
store: &Store<KvC>,
) -> Result<Self> {
let mut req = self.request_from_store(store)?;
req.set_keys(keys); req.set_keys(keys);
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
req Ok(req)
} }
fn store_stream<PdC: PdClient>( fn store_stream<PdC: PdClient>(
@ -561,8 +585,8 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
&self, &self,
mutations: Self::KeyData, mutations: Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_mutations(mutations); req.set_mutations(mutations);
req.set_primary_lock(self.primary_lock.clone()); req.set_primary_lock(self.primary_lock.clone());
req.set_start_version(self.start_version); req.set_start_version(self.start_version);
@ -574,7 +598,7 @@ impl KvRequest for kvrpcpb::PessimisticLockRequest {
req.set_return_values(self.return_values); req.set_return_values(self.return_values);
req.set_min_commit_ts(self.min_commit_ts); req.set_min_commit_ts(self.min_commit_ts);
req Ok(req)
} }
fn map_result(_result: Self::RpcResponse) -> Self::Result {} fn map_result(_result: Self::RpcResponse) -> Self::Result {}
@ -645,12 +669,12 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
&self, &self,
(start_key, _): Self::KeyData, (start_key, _): Self::KeyData,
store: &Store<KvC>, store: &Store<KvC>,
) -> Self { ) -> Result<Self> {
let mut req = self.request_from_store(store); let mut req = self.request_from_store(store)?;
req.set_max_version(self.max_version); req.set_max_version(self.max_version);
req.set_start_key(start_key.into()); req.set_start_key(start_key.into());
req.set_limit(self.limit); req.set_limit(self.limit);
req Ok(req)
} }
fn map_result(mut result: Self::RpcResponse) -> Self::Result { fn map_result(mut result: Self::RpcResponse) -> Self::Result {