// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF}, transaction::HasLocks, }; use async_trait::async_trait; use derive_new::new; use tikv_client_store::{HasKeyErrors, Request}; pub use self::{ plan::{ Collect, CollectError, CollectSingle, CollectWithShard, DefaultProcessor, Dispatch, ExtractError, Merge, MergeResponse, Plan, Process, ProcessResponse, ResolveLock, ResponseWithShard, RetryableMultiRegion, }, plan_builder::{PlanBuilder, SingleKey}, shard::{HasNextBatch, NextBatch, Shardable}, }; pub mod plan; mod plan_builder; #[macro_use] mod shard; /// Abstracts any request sent to a TiKV server. #[async_trait] pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; } #[derive(Clone, Debug, new, Eq, PartialEq)] pub struct RetryOptions { /// How to retry when there is a region error and we need to resolve regions with PD. pub region_backoff: Backoff, /// How to retry when a key is locked. pub lock_backoff: Backoff, } impl RetryOptions { pub const fn default_optimistic() -> RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: OPTIMISTIC_BACKOFF, } } pub const fn default_pessimistic() -> RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: PESSIMISTIC_BACKOFF, } } pub const fn none() -> RetryOptions { RetryOptions { region_backoff: Backoff::no_backoff(), lock_backoff: Backoff::no_backoff(), } } } #[cfg(test)] mod test { use super::*; use crate::{ mock::{MockKvClient, MockPdClient}, store::store_stream_for_keys, transaction::lowering::new_commit_request, Error, Key, Result, }; use grpcio::CallOption; use std::{ any::Any, iter, sync::{atomic::AtomicUsize, Arc}, }; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient}; use tikv_client_store::HasRegionError; #[tokio::test] async fn test_region_retry() { #[derive(Clone)] struct MockRpcResponse; impl HasKeyErrors for MockRpcResponse { fn key_errors(&mut self) -> Option> { None } } impl HasRegionError for MockRpcResponse { fn region_error(&mut self) -> Option { Some(tikv_client_proto::errorpb::Error::default()) } } impl HasLocks for MockRpcResponse {} #[derive(Clone)] struct MockKvRequest { test_invoking_count: Arc, } #[async_trait] impl Request for MockKvRequest { async fn dispatch(&self, _: &TikvClient, _: CallOption) -> Result> { Ok(Box::new(MockRpcResponse {})) } fn label(&self) -> &'static str { "mock" } fn as_any(&self) -> &dyn Any { self } fn set_context(&mut self, _: kvrpcpb::Context) { unreachable!(); } } #[async_trait] impl KvRequest for MockKvRequest { type Response = MockRpcResponse; } impl Shardable for MockKvRequest { type Shard = Vec>; fn shards( &self, pd_client: &std::sync::Arc, ) -> futures::stream::BoxStream< 'static, crate::Result<(Self::Shard, crate::store::RegionStore)>, > { // Increases by 1 for each call. self.test_invoking_count .fetch_add(1, std::sync::atomic::Ordering::SeqCst); store_stream_for_keys( Some(Key::from("mock_key".to_owned())).into_iter(), pd_client.clone(), ) } fn apply_shard( &mut self, _shard: Self::Shard, _store: &crate::store::RegionStore, ) -> crate::Result<()> { Ok(()) } } let invoking_count = Arc::new(AtomicUsize::new(0)); let request = MockKvRequest { test_invoking_count: invoking_count.clone(), }; let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); let _ = plan.execute().await; // Original call plus the 3 retries assert_eq!(invoking_count.load(std::sync::atomic::Ordering::SeqCst), 4); } #[tokio::test] async fn test_extract_error() { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { Ok(Box::new(kvrpcpb::CommitResponse { error: Some(kvrpcpb::KeyError::default()).into(), ..Default::default() }) as Box) }, ))); let key: Key = "key".to_owned().into(); let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default()); // does not extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone()) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); assert!(plan.execute().await.is_err()); } }