Merge pull request #158 from nrc/refactor

Refactor PD crate to make it more independent
Authored by Nick Cameron on 2020-07-27 21:40:09 +12:00; committed by GitHub
commit 9d9dca4103
17 changed files with 303 additions and 299 deletions

Cargo.lock (generated)

@@ -1820,6 +1820,7 @@ dependencies = [
name = "tikv-client-pd"
version = "0.0.0"
dependencies = [
"async-trait",
"clap",
"derive-new",
"fail",


@@ -100,3 +100,5 @@ pub use crate::transaction::{Client as TransactionClient, Snapshot, Transaction}
pub use tikv_client_common::{
BoundRange, Config, Error, ErrorKind, Key, KvPair, Result, Timestamp, ToOwnedRange, Value,
};
#[doc(inline)]
pub use tikv_client_store::region::{Region, RegionId, RegionVerId, StoreId};


@@ -15,8 +15,7 @@ use futures::future::{ready, BoxFuture, FutureExt};
use grpcio::CallOption;
use kvproto::{errorpb, kvrpcpb, metapb, tikvpb::TikvClient};
use std::{future::Future, sync::Arc, time::Duration};
use tikv_client_common::{Region, RegionId};
use tikv_client_store::{HasError, KvClient, KvConnect, Store};
use tikv_client_store::{HasError, KvClient, KvConnect, Region, RegionId, Store};
/// Create a `PdRpcClient` with its internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.


@@ -1,6 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{pd::RetryClient, Config, Key};
use crate::{pd::RetryClient, Config, Key, Region, RegionId};
use futures::{
future::{ready, BoxFuture, Either},
prelude::*,
@@ -15,7 +15,7 @@ use std::{
use tikv_client_common::{
compat::{stream_fn, ClientFutureExt},
security::SecurityManager,
BoundRange, Region, RegionId, Result, Timestamp,
BoundRange, Result, Timestamp,
};
use tikv_client_pd::Cluster;
use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};


@@ -2,18 +2,21 @@
//! A utility module for managing and retrying PD requests.
use crate::{Region, RegionId, StoreId};
use async_trait::async_trait;
use futures::prelude::*;
use futures_timer::Delay;
use grpcio::Environment;
use kvproto::metapb;
use kvproto::{
metapb,
pdpb::{self, Timestamp},
};
use std::{
fmt,
sync::Arc,
time::{Duration, Instant},
};
use tikv_client_common::{security::SecurityManager, Region, RegionId, Result, StoreId, Timestamp};
use tikv_client_pd::cluster::{Cluster, Connection};
use tikv_client_common::{security::SecurityManager, stats::pd_stats, Error, Result};
use tikv_client_pd::{Cluster, Connection};
use tokio::sync::RwLock;
// FIXME: these numbers and how they are used are all just cargo-culted in, there
@@ -24,7 +27,8 @@ const LEADER_CHANGE_RETRY: usize = 10;
/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
pub struct RetryClient<Cl = Cluster> {
cluster: RwLock<Cl>,
// Tuple is the cluster and the time of the cluster's last reconnect.
cluster: RwLock<(Cl, Instant)>,
connection: Connection,
timeout: Duration,
}
@@ -39,13 +43,39 @@ impl<Cl> RetryClient<Cl> {
) -> RetryClient<Cl> {
let connection = Connection::new(env, security_mgr);
RetryClient {
cluster: RwLock::new(cluster),
cluster: RwLock::new((cluster, Instant::now())),
connection,
timeout,
}
}
}
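// Retry a PD request: make up to LEADER_CHANGE_RETRY attempts at the wrapped call,
// reconnecting to the cluster after each failure (the reconnect itself is retried up
// to MAX_REQUEST_COUNT times, with a delay between attempts).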
macro_rules! retry {
($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
let context = pd_stats($tag);
let mut last_err = Ok(());
for _ in 0..LEADER_CHANGE_RETRY {
let $cluster = &$self.cluster.read().await.0;
match context.done($call.await) {
Ok(r) => return Ok(r),
Err(e) => last_err = Err(e),
}
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
reconnect_count -= 1;
if reconnect_count == 0 {
return Err(e);
}
Delay::new(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await;
}
}
last_err?;
unreachable!();
}};
}
impl RetryClient<Cluster> {
pub async fn connect(
env: Arc<Environment>,
@@ -54,7 +84,10 @@ impl RetryClient<Cluster> {
timeout: Duration,
) -> Result<RetryClient> {
let connection = Connection::new(env, security_mgr);
let cluster = RwLock::new(connection.connect_cluster(endpoints, timeout).await?);
let cluster = RwLock::new((
connection.connect_cluster(endpoints, timeout).await?,
Instant::now(),
));
Ok(RetryClient {
cluster,
connection,
@@ -64,31 +97,49 @@ impl RetryClient<Cluster> {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
let timeout = self.timeout;
retry_request(self, move |cluster| {
cluster.get_region(key.clone(), timeout)
retry!(self, "get_region", |cluster| {
let key = key.clone();
async {
cluster
.get_region(key.clone(), self.timeout)
.await
.and_then(|resp| {
region_from_response(resp, || Error::region_for_key_not_found(key))
})
}
})
.await
}
pub async fn get_region_by_id(self: Arc<Self>, id: RegionId) -> Result<Region> {
let timeout = self.timeout;
retry_request(self, move |cluster| cluster.get_region_by_id(id, timeout)).await
retry!(self, "get_region_by_id", |cluster| async {
cluster
.get_region_by_id(id, self.timeout)
.await
.and_then(|resp| region_from_response(resp, || Error::region_not_found(id)))
})
}
pub async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
let timeout = self.timeout;
retry_request(self, move |cluster| cluster.get_store(id, timeout)).await
retry!(self, "get_store", |cluster| async {
cluster
.get_store(id, self.timeout)
.await
.map(|mut resp| resp.take_store())
})
}
#[allow(dead_code)]
pub async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
let timeout = self.timeout;
retry_request(self, move |cluster| cluster.get_all_stores(timeout)).await
retry!(self, "get_all_stores", |cluster| async {
cluster
.get_all_stores(self.timeout)
.await
.map(|mut resp| resp.take_stores().into_iter().map(Into::into).collect())
})
}
pub async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
retry_request(self, move |cluster| cluster.get_timestamp()).await
retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
}
}
@@ -100,12 +151,19 @@ impl fmt::Debug for RetryClient {
}
}
fn region_from_response(
resp: pdpb::GetRegionResponse,
err: impl FnOnce() -> Error,
) -> Result<Region> {
let region = resp.region.ok_or_else(err)?;
Ok(Region::new(region, resp.leader))
}
// A node-like thing that can be connected to.
#[async_trait]
trait Reconnect {
type Cl;
async fn reconnect(&self, interval_sec: u64) -> Result<()>;
async fn with_cluster<T, F: Fn(&Self::Cl) -> T + Send + Sync>(&self, f: F) -> T;
}
#[async_trait]
@@ -114,56 +172,22 @@ impl Reconnect for RetryClient<Cluster> {
async fn reconnect(&self, interval_sec: u64) -> Result<()> {
let reconnect_begin = Instant::now();
let write_lock = self.cluster.write().await;
let mut lock = self.cluster.write().await;
let (cluster, last_connected) = &mut *lock;
// If `last_connected + interval_sec` is greater than or equal to `reconnect_begin`,
// a concurrent reconnect succeeded while this thread was waiting for the write lock,
// so there is no need to reconnect again.
let should_connect =
reconnect_begin > write_lock.last_connected + Duration::from_secs(interval_sec);
let should_connect = reconnect_begin > *last_connected + Duration::from_secs(interval_sec);
if should_connect {
self.connection.reconnect(write_lock, self.timeout).await?;
self.connection.reconnect(cluster, self.timeout).await?;
*last_connected = Instant::now();
}
Ok(())
}
async fn with_cluster<T, F: Fn(&Cluster) -> T + Send + Sync>(&self, f: F) -> T {
f(&*self.cluster.read().await)
}
}
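
The reconnect guard above can also be read in isolation; the following is a minimal, self-contained sketch of the same pattern (illustrative only, with a stubbed cluster type, not part of this diff):

use std::time::{Duration, Instant};
use tokio::sync::RwLock;

struct Reconnector {
    // Stand-in for the real cluster handle, paired with the time of the last reconnect.
    cluster: RwLock<((), Instant)>,
}

impl Reconnector {
    // Returns true if this call actually performed a reconnect.
    async fn reconnect(&self, interval_sec: u64) -> bool {
        let begin = Instant::now();
        let mut guard = self.cluster.write().await;
        let (_cluster, last_connected) = &mut *guard;
        // If another task reconnected while we were waiting for the write lock,
        // `last_connected` is recent and the reconnect can be skipped.
        if begin > *last_connected + Duration::from_secs(interval_sec) {
            // ... re-establish the connection here ...
            *last_connected = Instant::now();
            true
        } else {
            false
        }
    }
}
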
async fn retry_request<Rc, Resp, Func, RespFuture>(client: Arc<Rc>, func: Func) -> Result<Resp>
where
Rc: Reconnect,
Resp: Send + 'static,
Func: Fn(&Rc::Cl) -> RespFuture + Send + Sync,
RespFuture: Future<Output = Result<Resp>> + Send + 'static,
{
let mut last_err = Ok(());
for _ in 0..LEADER_CHANGE_RETRY {
let fut = client.with_cluster(&func).await;
match fut.await {
Ok(r) => return Ok(r),
Err(e) => last_err = Err(e),
}
// Reconnect.
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = client.reconnect(RECONNECT_INTERVAL_SEC).await {
reconnect_count -= 1;
if reconnect_count == 0 {
return Err(e);
}
Delay::new(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await;
}
}
last_err?;
unreachable!();
}
#[cfg(test)]
mod test {
use super::*;
use crate::Error;
use futures::{executor, future::ready};
use std::sync::Mutex;
@@ -171,6 +195,7 @@ mod test {
fn test_reconnect() {
struct MockClient {
reconnect_count: Mutex<usize>,
cluster: RwLock<((), Instant)>,
}
#[async_trait]
@@ -182,82 +207,101 @@
// Not actually unimplemented; we just don't care about the error.
Err(Error::unimplemented())
}
async fn with_cluster<T, F: Fn(&Self::Cl) -> T + Send + Sync>(&self, f: F) -> T {
f(&())
}
}
let client = Arc::new(MockClient {
reconnect_count: Mutex::new(0),
});
fn ready_err(_: &()) -> impl Future<Output = Result<()>> + Send + 'static {
ready(Err(internal_err!("whoops")))
async fn retry_err(client: Arc<MockClient>) -> Result<()> {
retry!(client, "test", |_c| ready(Err(internal_err!("whoops"))))
}
let result = executor::block_on(retry_request(client.clone(), ready_err));
assert!(result.is_err());
assert_eq!(*client.reconnect_count.lock().unwrap(), MAX_REQUEST_COUNT);
async fn retry_ok(client: Arc<MockClient>) -> Result<()> {
retry!(client, "test", |_c| ready(Ok::<_, Error>(())))
}
*client.reconnect_count.lock().unwrap() = 0;
let result = executor::block_on(retry_request(client.clone(), |_| ready(Ok(()))));
assert!(result.is_ok());
assert_eq!(*client.reconnect_count.lock().unwrap(), 0);
executor::block_on(async {
let client = Arc::new(MockClient {
reconnect_count: Mutex::new(0),
cluster: RwLock::new(((), Instant::now())),
});
assert!(retry_err(client.clone()).await.is_err());
assert_eq!(*client.reconnect_count.lock().unwrap(), MAX_REQUEST_COUNT);
*client.reconnect_count.lock().unwrap() = 0;
assert!(retry_ok(client.clone()).await.is_ok());
assert_eq!(*client.reconnect_count.lock().unwrap(), 0);
})
}
#[test]
fn test_retry() {
struct MockClient {
retry_count: Mutex<usize>,
cluster: RwLock<(Mutex<usize>, Instant)>,
}
#[async_trait]
impl Reconnect for MockClient {
type Cl = ();
type Cl = Mutex<usize>;
async fn reconnect(&self, _: u64) -> Result<()> {
Ok(())
}
async fn with_cluster<T, F: Fn(&Self::Cl) -> T + Send + Sync>(&self, f: F) -> T {
*self.retry_count.lock().unwrap() += 1;
f(&())
}
}
let client = Arc::new(MockClient {
retry_count: Mutex::new(0),
});
let max_retries = Arc::new(Mutex::new(1000));
async fn retry_max_err(
client: Arc<MockClient>,
max_retries: Arc<Mutex<usize>>,
) -> Result<()> {
retry!(client, "test", |c| {
let mut c = c.lock().unwrap();
*c += 1;
let result = executor::block_on(retry_request(client.clone(), |_| {
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
}
}));
assert!(result.is_err());
assert_eq!(*client.retry_count.lock().unwrap(), LEADER_CHANGE_RETRY);
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
}
})
}
let client = Arc::new(MockClient {
retry_count: Mutex::new(0),
});
let max_retries = Arc::new(Mutex::new(2));
async fn retry_max_ok(
client: Arc<MockClient>,
max_retries: Arc<Mutex<usize>>,
) -> Result<()> {
retry!(client, "test", |c| {
let mut c = c.lock().unwrap();
*c += 1;
let result = executor::block_on(retry_request(client.clone(), |_| {
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
}
}));
assert!(result.is_ok());
assert_eq!(*client.retry_count.lock().unwrap(), 2);
let mut max_retries = max_retries.lock().unwrap();
*max_retries -= 1;
if *max_retries == 0 {
ready(Ok(()))
} else {
ready(Err(internal_err!("whoops")))
}
})
}
executor::block_on(async {
let client = Arc::new(MockClient {
cluster: RwLock::new((Mutex::new(0), Instant::now())),
});
let max_retries = Arc::new(Mutex::new(1000));
assert!(retry_max_err(client.clone(), max_retries).await.is_err());
assert_eq!(
*client.cluster.read().await.0.lock().unwrap(),
LEADER_CHANGE_RETRY
);
let client = Arc::new(MockClient {
cluster: RwLock::new((Mutex::new(0), Instant::now())),
});
let max_retries = Arc::new(Mutex::new(2));
assert!(retry_max_ok(client.clone(), max_retries).await.is_ok());
assert_eq!(*client.cluster.read().await.0.lock().unwrap(), 2);
})
}
}


@@ -1,10 +1,10 @@
use crate::{pd::PdClient, request::KvRequest, transaction::requests};
use kvproto::kvrpcpb;
use crate::{pd::PdClient, request::KvRequest, transaction::requests, RegionVerId};
use kvproto::{kvrpcpb, pdpb::Timestamp};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tikv_client_common::{ErrorKind, Key, RegionVerId, Result, Timestamp};
use tikv_client_common::{ErrorKind, Key, Result, TimestampExt};
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;


@@ -6,9 +6,9 @@ use crate::{
BoundRange, Error, Key, KvPair, Result, Value,
};
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
use kvproto::{kvrpcpb, tikvpb::TikvClient};
use kvproto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient};
use std::{mem, sync::Arc};
use tikv_client_common::Timestamp;
use tikv_client_common::TimestampExt;
use tikv_client_store::{KvClient, RpcFnType, Store};
impl KvRequest for kvrpcpb::GetRequest {
@@ -65,7 +65,7 @@ impl HasLocks for kvrpcpb::GetResponse {
pub fn new_mvcc_get_request(key: impl Into<Key>, timestamp: Timestamp) -> kvrpcpb::GetRequest {
let mut req = kvrpcpb::GetRequest::default();
req.set_key(key.into().into());
req.set_version(timestamp.into_version());
req.set_version(timestamp.version());
req
}
@@ -118,7 +118,7 @@ pub fn new_mvcc_get_batch_request(
) -> kvrpcpb::BatchGetRequest {
let mut req = kvrpcpb::BatchGetRequest::default();
req.set_keys(keys.into_iter().map(Into::into).collect());
req.set_version(timestamp.into_version());
req.set_version(timestamp.version());
req
}
@@ -177,7 +177,7 @@ pub fn new_mvcc_scan_request(
req.set_end_key(end_key.unwrap_or_default().into());
req.set_limit(limit);
req.set_key_only(key_only);
req.set_version(timestamp.into_version());
req.set_version(timestamp.version());
req
}


@@ -7,9 +7,9 @@ use crate::{
};
use derive_new::new;
use futures::{executor::ThreadPool, prelude::*, stream::BoxStream};
use kvproto::kvrpcpb;
use kvproto::{kvrpcpb, pdpb::Timestamp};
use std::{mem, ops::RangeBounds, sync::Arc};
use tikv_client_common::{BoundRange, Error, ErrorKind, Key, KvPair, Result, Timestamp, Value};
use tikv_client_common::{BoundRange, Error, ErrorKind, Key, KvPair, Result, TimestampExt, Value};
/// An undo-able set of actions on the dataset.
///
@@ -67,7 +67,7 @@ impl Transaction {
let key = key.into();
self.buffer
.get_or_else(key, |key| {
new_mvcc_get_request(key, self.timestamp).execute(self.rpc.clone())
new_mvcc_get_request(key, self.timestamp.clone()).execute(self.rpc.clone())
})
.await
}
@@ -95,7 +95,7 @@ impl Transaction {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = (Key, Option<Value>)>> {
let timestamp = self.timestamp;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
self.buffer
.batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| {
@@ -130,7 +130,7 @@ impl Transaction {
limit: u32,
key_only: bool,
) -> Result<impl Iterator<Item = KvPair>> {
let timestamp = self.timestamp;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
let pairs = new_mvcc_scan_request(range, timestamp, limit, key_only)
.execute(rpc)
@@ -217,7 +217,7 @@ impl Transaction {
pub async fn commit(&mut self) -> Result<()> {
TwoPhaseCommitter::new(
self.buffer.to_proto_mutations(),
self.timestamp.into_version(),
self.timestamp.version(),
self.bg_worker.clone(),
self.rpc.clone(),
)
@@ -287,7 +287,7 @@ impl TwoPhaseCommitter {
/// Commits the primary key and returns the commit version
async fn commit_primary(&mut self) -> Result<u64> {
let primary_key = vec![self.mutations[0].key.clone().into()];
let commit_version = self.rpc.clone().get_timestamp().await?.into_version();
let commit_version = self.rpc.clone().get_timestamp().await?.version();
new_commit_request(primary_key, self.start_version, commit_version)
.execute(self.rpc.clone())
.inspect_err(|e| {


@@ -4,11 +4,9 @@ pub mod compat;
pub mod config;
pub mod errors;
pub mod kv;
pub mod region;
pub mod security;
pub mod stats;
pub mod store_builder;
pub mod timestamp;
mod timestamp;
#[macro_use]
extern crate lazy_static;
@@ -28,8 +26,4 @@ pub use crate::errors::Result;
#[doc(inline)]
pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::region::{Region, RegionId, RegionVerId, StoreId};
#[doc(inline)]
pub use crate::store_builder::StoreBuilder;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
pub use crate::timestamp::{Timestamp, TimestampExt};


@@ -1,27 +1,28 @@
//! A timestamp returned from the timestamp oracle.
//!
//! The version used in transactions can be converted from a timestamp.
//! The lower 18 (PHYSICAL_SHIFT_BITS) bits are the logical part of the timestamp.
//! The higher bits of the version are the physical part of the timestamp.
pub use kvproto::pdpb::Timestamp;
use std::convert::TryInto;
const PHYSICAL_SHIFT_BITS: i64 = 18;
const LOGICAL_MASK: i64 = (1 << PHYSICAL_SHIFT_BITS) - 1;
/// A timestamp returned from the timestamp oracle.
///
/// The version used in transactions can be converted from a timestamp.
/// The lower 18 (PHYSICAL_SHIFT_BITS) bits are the logical part of the timestamp.
/// The higher bits of the version are the physical part of the timestamp.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub struct Timestamp {
pub physical: i64,
pub logical: i64,
pub trait TimestampExt {
fn version(&self) -> u64;
fn from_version(version: u64) -> Self;
}
impl Timestamp {
pub fn into_version(self) -> u64 {
impl TimestampExt for Timestamp {
fn version(&self) -> u64 {
((self.physical << PHYSICAL_SHIFT_BITS) + self.logical)
.try_into()
.expect("Overflow converting timestamp to version")
}
pub fn from_version(version: u64) -> Self {
fn from_version(version: u64) -> Self {
let version = version as i64;
Self {
physical: version >> PHYSICAL_SHIFT_BITS,
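
As a worked example of the version encoding described above (an illustrative sketch, not part of the diff; it assumes the prost-generated `Timestamp` derives `Default` and `PartialEq`, and uses the `Timestamp`/`TimestampExt` re-exports shown earlier):

use tikv_client_common::{Timestamp, TimestampExt};

fn main() {
    // version = (physical << PHYSICAL_SHIFT_BITS) + logical, with PHYSICAL_SHIFT_BITS = 18.
    let ts = Timestamp { physical: 3, logical: 7, ..Default::default() };
    assert_eq!(ts.version(), (3u64 << 18) + 7); // 786_439
    // The conversion round-trips.
    assert_eq!(Timestamp::from_version(786_439), ts);
}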


@@ -3,21 +3,20 @@ name = "tikv-client-pd"
version = "0.0.0"
edition = "2018"
[dependencies]
async-trait = "0.1"
derive-new = "0.5"
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "1e28226154c374788f38d3a542fc505cd74720f3", features = [ "prost-codec" ], default-features = false }
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
tokio = { version = "0.2", features = ["sync"] }
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "1e28226154c374788f38d3a542fc505cd74720f3", features = [ "prost-codec" ], default-features = false }
log = "0.4"
tikv-client-common = { path = "../tikv-client-common" }
tokio = { version = "0.2", features = ["sync"] }
[dev-dependencies]
clap = "2.32"
tempdir = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "macros"] }
fail = { version = "0.3", features = [ "failpoints" ] }
proptest = "0.9"
proptest-derive = "0.1.0"
fail = { version = "0.3", features = [ "failpoints" ] }
tempdir = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "macros"] }


@@ -1,21 +1,23 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
// FIXME: Remove this when txn is done.
#![allow(dead_code)]
use crate::timestamp::TimestampOracle;
use futures::prelude::*;
use crate::{timestamp::TimestampOracle, PdResponse};
use async_trait::async_trait;
use grpcio::{CallOption, Environment};
use kvproto::{metapb, pdpb};
use kvproto::pdpb::{self, Timestamp};
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use tikv_client_common::{
security::SecurityManager, stats::pd_stats, Error, Region, RegionId, Result, StoreId, Timestamp,
};
use tokio::sync::RwLockWriteGuard;
use tikv_client_common::{internal_err, security::SecurityManager, Error, Result};
/// A PD cluster.
pub struct Cluster {
id: u64,
client: pdpb::PdClient,
members: pdpb::GetMembersResponse,
tso: TimestampOracle,
}
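// Build a default request of the given pdpb type with its header carrying the cluster id.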
macro_rules! pd_request {
($cluster_id:expr, $type:ty) => {{
@@ -27,127 +29,41 @@
}};
}
/// A PD cluster.
pub struct Cluster {
pub id: u64,
pub last_connected: Instant,
pub(super) client: pdpb::PdClient,
members: pdpb::GetMembersResponse,
tso: TimestampOracle,
}
// These methods make a single attempt to make a request.
impl Cluster {
pub fn get_region(
pub async fn get_region(
&self,
key: Vec<u8>,
timeout: Duration,
) -> impl Future<Output = Result<Region>> {
let context = pd_stats("get_region");
let option = CallOption::default().timeout(timeout);
) -> Result<pdpb::GetRegionResponse> {
let mut req = pd_request!(self.id, pdpb::GetRegionRequest);
req.set_region_key(key.clone());
self.client
.get_region_async_opt(&req, option)
.unwrap()
.map(move |r| context.done(r.map_err(|e| e.into())))
.and_then(move |resp| {
if resp.get_header().has_error() {
return future::ready(Err(internal_err!(resp
.get_header()
.get_error()
.get_message())));
}
let region = resp
.region
.ok_or_else(|| Error::region_for_key_not_found(key));
let leader = resp.leader;
future::ready(region.map(move |r| Region::new(r, leader)))
})
req.send(&self.client, timeout).await
}
pub fn get_region_by_id(
pub async fn get_region_by_id(
&self,
id: RegionId,
id: u64,
timeout: Duration,
) -> impl Future<Output = Result<Region>> {
let context = pd_stats("get_region_by_id");
let option = CallOption::default().timeout(timeout);
) -> Result<pdpb::GetRegionResponse> {
let mut req = pd_request!(self.id, pdpb::GetRegionByIdRequest);
req.set_region_id(id);
self.client
.get_region_by_id_async_opt(&req, option)
.unwrap()
.map(move |r| context.done(r.map_err(|e| e.into())))
.and_then(move |resp| {
if resp.get_header().has_error() {
return future::ready(Err(internal_err!(resp
.get_header()
.get_error()
.get_message())));
}
let region = resp.region.ok_or_else(|| Error::region_not_found(id));
let leader = resp.leader;
future::ready(region.map(move |r| Region::new(r, leader)))
})
req.send(&self.client, timeout).await
}
pub fn get_store(
&self,
id: StoreId,
timeout: Duration,
) -> impl Future<Output = Result<metapb::Store>> {
let context = pd_stats("get_store");
let option = CallOption::default().timeout(timeout);
pub async fn get_store(&self, id: u64, timeout: Duration) -> Result<pdpb::GetStoreResponse> {
let mut req = pd_request!(self.id, pdpb::GetStoreRequest);
req.set_store_id(id);
self.client
.get_store_async_opt(&req, option)
.unwrap()
.map(move |r| context.done(r.map_err(|e| e.into())))
.and_then(|mut resp| {
if resp.get_header().has_error() {
return future::ready(Err(internal_err!(resp
.get_header()
.get_error()
.get_message())));
}
future::ready(Ok(resp.take_store()))
})
req.send(&self.client, timeout).await
}
pub fn get_all_stores(
&self,
timeout: Duration,
) -> impl Future<Output = Result<Vec<metapb::Store>>> {
let context = pd_stats("get_all_stores");
let option = CallOption::default().timeout(timeout);
pub async fn get_all_stores(&self, timeout: Duration) -> Result<pdpb::GetAllStoresResponse> {
let req = pd_request!(self.id, pdpb::GetAllStoresRequest);
self.client
.get_all_stores_async_opt(&req, option)
.unwrap()
.map(move |r| context.done(r.map_err(|e| e.into())))
.and_then(|mut resp| {
if resp.get_header().has_error() {
return future::ready(Err(internal_err!(resp
.get_header()
.get_error()
.get_message())));
}
future::ready(Ok(resp.take_stores().into_iter().map(Into::into).collect()))
})
req.send(&self.client, timeout).await
}
pub fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
self.tso.clone().get_timestamp()
pub async fn get_timestamp(&self) -> Result<Timestamp> {
self.tso.clone().get_timestamp().await
}
}
@@ -174,7 +90,6 @@ impl Connection {
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {
id,
last_connected: Instant::now(),
members,
client,
tso,
@@ -183,28 +98,19 @@
}
// Re-establish the connection with the PD leader asynchronously.
pub async fn reconnect(
&self,
mut cluster_guard: RwLockWriteGuard<'_, Cluster>,
timeout: Duration,
) -> Result<()> {
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
let (client, members) = self
.try_connect_leader(&cluster_guard.members, timeout)
.await?;
let tso = TimestampOracle::new(cluster_guard.id, &client)?;
let last_connected = Instant::now();
let cluster = Cluster {
id: cluster_guard.id,
last_connected,
let (client, members) = self.try_connect_leader(&cluster.members, timeout).await?;
let tso = TimestampOracle::new(cluster.id, &client)?;
*cluster = Cluster {
id: cluster.id,
client,
members,
tso,
};
warn!("updating PD client done, spent {:?}", start.elapsed());
*cluster_guard = cluster;
info!("updating PD client done, spent {:?}", start.elapsed());
Ok(())
}
@@ -226,7 +132,7 @@
Ok(resp) => resp,
// Ignore failed PD node.
Err(e) => {
error!("PD endpoint {} failed to respond: {:?}", ep, e);
warn!("PD endpoint {} failed to respond: {:?}", ep, e);
continue;
}
};
@@ -349,3 +255,59 @@
Err(internal_err!("failed to connect to {:?}", members))
}
}
type GrpcResult<T> = std::result::Result<T, grpcio::Error>;
#[async_trait]
trait PdMessage {
type Response: PdResponse;
async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response>;
async fn send(&self, client: &pdpb::PdClient, timeout: Duration) -> Result<Self::Response> {
let option = CallOption::default().timeout(timeout);
let response = self.rpc(client, option).await?;
if response.header().has_error() {
Err(internal_err!(response.header().get_error().get_message()))
} else {
Ok(response)
}
}
}
#[async_trait]
impl PdMessage for pdpb::GetRegionRequest {
type Response = pdpb::GetRegionResponse;
async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_region_async_opt(self, opt).unwrap().await
}
}
#[async_trait]
impl PdMessage for pdpb::GetRegionByIdRequest {
type Response = pdpb::GetRegionResponse;
async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_region_by_id_async_opt(self, opt).unwrap().await
}
}
#[async_trait]
impl PdMessage for pdpb::GetStoreRequest {
type Response = pdpb::GetStoreResponse;
async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_store_async_opt(self, opt).unwrap().await
}
}
#[async_trait]
impl PdMessage for pdpb::GetAllStoresRequest {
type Response = pdpb::GetAllStoresResponse;
async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_all_stores_async_opt(self, opt).unwrap().await
}
}
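
For illustration, supporting an additional PD RPC only requires another `PdMessage` impl like the ones above. The following is a sketch, not part of this diff, written in the context of the file shown: it assumes the grpcio-generated client exposes `get_members_async_opt` and that `PdResponse` is also implemented for `pdpb::GetMembersResponse`.

#[async_trait]
impl PdMessage for pdpb::GetMembersRequest {
    type Response = pdpb::GetMembersResponse;

    async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
        // Assumed method on the generated PD client, mirroring the calls above.
        client.get_members_async_opt(self, opt).unwrap().await
    }
}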


@@ -1,15 +1,12 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
pub use cluster::Cluster;
pub use cluster::{Cluster, Connection};
use kvproto::pdpb;
#[macro_use]
extern crate tikv_client_common;
#[macro_use]
extern crate log;
#[macro_use]
pub mod cluster;
mod cluster;
mod timestamp;
trait PdResponse {


@@ -21,7 +21,7 @@ use futures::{
use grpcio::WriteFlags;
use kvproto::pdpb::*;
use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread};
use tikv_client_common::{Error, Result, Timestamp};
use tikv_client_common::{Error, Result};
/// It is an empirical value.
const MAX_BATCH_SIZE: usize = 64;
@@ -33,7 +33,7 @@ type TimestampRequest = oneshot::Sender<Timestamp>;
/// The timestamp oracle (TSO) which provides monotonically increasing timestamps.
#[derive(Clone)]
pub struct TimestampOracle {
pub(crate) struct TimestampOracle {
/// The sender of a bounded channel that carries requests for single timestamps
/// to the TSO worker thread. A bounded channel is used to prevent unexpected
/// memory growth.
@@ -43,7 +43,7 @@ }
}
impl TimestampOracle {
pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
pub(crate) fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE);
// FIXME: use tso_opt
let (rpc_sender, rpc_receiver) = pd_client.tso()?;
@@ -61,7 +61,7 @@ impl TimestampOracle {
Ok(TimestampOracle { request_tx })
}
pub async fn get_timestamp(mut self) -> Result<Timestamp> {
pub(crate) async fn get_timestamp(mut self) -> Result<Timestamp> {
let (request, response) = oneshot::channel();
self.request_tx
.send(request)


@@ -4,16 +4,21 @@
extern crate log;
mod errors;
pub mod region;
mod store_builder;
pub use crate::errors::{HasError, HasRegionError};
#[doc(inline)]
pub use crate::region::{Region, RegionId, RegionVerId, StoreId};
#[doc(inline)]
pub use crate::store_builder::StoreBuilder;
pub use kvproto::tikvpb::TikvClient;
pub use self::errors::{HasError, HasRegionError};
use derive_new::new;
use futures::{future::BoxFuture, prelude::*};
use grpcio::{CallOption, Environment};
pub use kvproto::tikvpb::TikvClient;
use std::{sync::Arc, time::Duration};
use tikv_client_common::{
security::SecurityManager, stats::tikv_stats, ErrorKind, Region, Result, StoreBuilder,
};
use tikv_client_common::{security::SecurityManager, stats::tikv_stats, ErrorKind, Result};
/// A trait for connecting to TiKV stores.
pub trait KvConnect: Sized + Send + Sync + 'static {


@@ -1,6 +1,6 @@
use crate::{Error, Key, Result};
use derive_new::new;
use kvproto::{kvrpcpb, metapb};
use tikv_client_common::{Error, Key, Result};
pub type RegionId = u64;
pub type StoreId = u64;