polish code

Signed-off-by: Renkai <gaelookair@gmail.com>
This commit is contained in:
Renkai 2020-05-20 16:43:07 +08:00
parent 5d8ea81bda
commit 570ebc3f41
4 changed files with 16 additions and 25 deletions

View File

@ -31,7 +31,7 @@ serde = "1.0"
serde_derive = "1.0"
futures-timer = "3.0"
async-trait = "0.1.27"
tokio = { version = "0.2", features = ["sync" ] }
tokio = { version = "0.2", features = ["sync"] }
[dependencies.prometheus]
version = "0.8"

View File

@ -87,7 +87,7 @@ async fn main() -> Result<()> {
let end = "k2";
let pairs = client
.with_key_only(true)
.scan((start..=end).to_owned(), 10 as u32)
.scan((start..=end).to_owned(), 10)
.await
.expect("Could not scan");

View File

@ -37,7 +37,7 @@ pub struct Cluster {
pub id: u64,
pub last_connected: Instant,
pub(super) client: pdpb::PdClient,
pub members: pdpb::GetMembersResponse,
members: pdpb::GetMembersResponse,
tso: TimestampOracle,
}
@ -191,28 +191,21 @@ impl Connection {
Ok(cluster)
}
// Re-establish connection with PD leader in synchronized fashion.
// Re-establish connection with PD leader in asynchronous fashion.
pub async fn reconnect(
&self,
mut cluster_guard: RwLockWriteGuard<'_, Cluster>,
interval: u64,
timeout: Duration,
) -> Result<()> {
if cluster_guard.last_connected.elapsed() < Duration::from_secs(interval) {
// Avoid unnecessary updating.
return Ok(());
}
let cluster_id = cluster_guard.id;
let previous_members = &cluster_guard.members;
warn!("updating pd client, blocking the tokio core");
warn!("updating pd client");
let start = Instant::now();
let (client, members) = self.try_connect_leader(previous_members, timeout).await?;
let tso = TimestampOracle::new(cluster_id, &client)?;
let (client, members) = self
.try_connect_leader(&cluster_guard.members, timeout)
.await?;
let tso = TimestampOracle::new(cluster_guard.id, &client)?;
let last_connected = Instant::now();
let cluster = Cluster {
id: cluster_id,
id: cluster_guard.id,
last_connected,
client,
members,

View File

@ -110,7 +110,7 @@ impl fmt::Debug for RetryClient {
#[async_trait]
trait Reconnect {
type Cl;
async fn reconnect(&self, interval: u64) -> Result<()>;
async fn reconnect(&self) -> Result<()>;
async fn with_cluster<T, F: Fn(&Self::Cl) -> T + Send + Sync>(&self, f: F) -> T;
}
@ -118,16 +118,14 @@ trait Reconnect {
impl Reconnect for RetryClient<Cluster> {
type Cl = Cluster;
async fn reconnect(&self, interval: u64) -> Result<()> {
async fn reconnect(&self) -> Result<()> {
let reconnect_begin = Instant::now();
let write_lock = self.cluster.write().await;
// If last_update is larger or equal than reconnect_begin,
// a concurrent reconnect is just succeed when this thread trying to get write lock
let should_connect = reconnect_begin > write_lock.last_connected;
if should_connect {
self.connection
.reconnect(write_lock, interval, self.timeout)
.await?;
self.connection.reconnect(write_lock, self.timeout).await?;
Ok(())
} else {
Ok(())
@ -156,7 +154,7 @@ where
// Reconnect.
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = client.reconnect(RECONNECT_INTERVAL_SEC).await {
while let Err(e) = client.reconnect().await {
reconnect_count -= 1;
if reconnect_count == 0 {
return Err(e);
@ -187,7 +185,7 @@ mod test {
impl Reconnect for MockClient {
type Cl = ();
async fn reconnect(&self, _: u64) -> Result<()> {
async fn reconnect(&self) -> Result<()> {
*self.reconnect_count.lock().unwrap() += 1;
// Not actually unimplemented, we just don't care about the error.
Err(Error::unimplemented())
@ -226,7 +224,7 @@ mod test {
impl Reconnect for MockClient {
type Cl = ();
async fn reconnect(&self, _: u64) -> Result<()> {
async fn reconnect(&self) -> Result<()> {
Ok(())
}