mirror of https://github.com/tikv/client-rust.git
prevent too frequent reconnect
Signed-off-by: Renkai <gaelookair@gmail.com>
This commit is contained in:
parent
9c9abfee0d
commit
601b2442a9
|
@ -20,6 +20,7 @@ use crate::{
|
|||
transaction::Timestamp,
|
||||
Result,
|
||||
};
|
||||
use std::ops::Add;
|
||||
use std::time::Instant;
|
||||
|
||||
// FIXME: these numbers and how they are used are all just cargo-culted in, there
|
||||
|
@ -110,7 +111,7 @@ impl fmt::Debug for RetryClient {
|
|||
#[async_trait]
|
||||
trait Reconnect {
|
||||
type Cl;
|
||||
async fn reconnect(&self) -> Result<()>;
|
||||
async fn reconnect(&self, interval_sec: u64) -> Result<()>;
|
||||
async fn with_cluster<T, F: Fn(&Self::Cl) -> T + Send + Sync>(&self, f: F) -> T;
|
||||
}
|
||||
|
||||
|
@ -118,12 +119,15 @@ trait Reconnect {
|
|||
impl Reconnect for RetryClient<Cluster> {
|
||||
type Cl = Cluster;
|
||||
|
||||
async fn reconnect(&self) -> Result<()> {
|
||||
async fn reconnect(&self, interval_sec: u64) -> Result<()> {
|
||||
let reconnect_begin = Instant::now();
|
||||
let write_lock = self.cluster.write().await;
|
||||
// If last_update is larger or equal than reconnect_begin,
|
||||
// If `last_connected + interval_sec` is larger or equal than reconnect_begin,
|
||||
// a concurrent reconnect is just succeed when this thread trying to get write lock
|
||||
let should_connect = reconnect_begin > write_lock.last_connected;
|
||||
let should_connect = reconnect_begin
|
||||
> write_lock
|
||||
.last_connected
|
||||
.add(Duration::from_secs(interval_sec));
|
||||
if should_connect {
|
||||
self.connection.reconnect(write_lock, self.timeout).await?;
|
||||
Ok(())
|
||||
|
@ -154,7 +158,7 @@ where
|
|||
|
||||
// Reconnect.
|
||||
let mut reconnect_count = MAX_REQUEST_COUNT;
|
||||
while let Err(e) = client.reconnect().await {
|
||||
while let Err(e) = client.reconnect(RECONNECT_INTERVAL_SEC).await {
|
||||
reconnect_count -= 1;
|
||||
if reconnect_count == 0 {
|
||||
return Err(e);
|
||||
|
@ -185,7 +189,7 @@ mod test {
|
|||
impl Reconnect for MockClient {
|
||||
type Cl = ();
|
||||
|
||||
async fn reconnect(&self) -> Result<()> {
|
||||
async fn reconnect(&self, _: u64) -> Result<()> {
|
||||
*self.reconnect_count.lock().unwrap() += 1;
|
||||
// Not actually unimplemented, we just don't care about the error.
|
||||
Err(Error::unimplemented())
|
||||
|
@ -224,7 +228,7 @@ mod test {
|
|||
impl Reconnect for MockClient {
|
||||
type Cl = ();
|
||||
|
||||
async fn reconnect(&self) -> Result<()> {
|
||||
async fn reconnect(&self, _: u64) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue