Add delay if all locks are not resolved

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-27 00:44:12 +08:00
parent 16a2abc9a2
commit a6505e5bbb
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
3 changed files with 19 additions and 4 deletions

View File

@ -29,6 +29,7 @@ regex = "1"
serde = "1.0"
serde_derive = "1.0"
tokio-timer = "0.2"
futures-timer = "0.3"
[dependencies.prometheus]
version = "0.4.2"

View File

@ -13,6 +13,9 @@ use futures::stream::BoxStream;
use grpcio::CallOption;
use kvproto::kvrpcpb;
use std::sync::Arc;
use std::time::Duration;
const LOCK_RETRY_DELAY_MS: u64 = 10;
pub trait KvRequest: Sync + Send + 'static + Sized {
type Result;
@ -58,6 +61,11 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
if !locks.is_empty() {
let pd_client = pd_client.clone();
return resolve_locks(locks, pd_client.clone())
.map_ok(|resolved| {
// TODO: backoff
let delay_ms = if resolved { 0 } else { LOCK_RETRY_DELAY_MS };
futures_timer::Delay::new(Duration::from_millis(delay_ms))
})
.map_ok(move |_| request.response_stream(pd_client))
.try_flatten_stream()
.boxed();

View File

@ -9,7 +9,7 @@ use std::sync::Arc;
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
/// _Resolves_ the given locks.
/// _Resolves_ the given locks. Returns whether all the given locks are resolved.
///
/// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock,
/// which means the key is finally either committed or rolled back, before we read the value of
@ -19,10 +19,16 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
pub async fn resolve_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
) -> Result<()> {
) -> Result<bool> {
let ts = pd_client.clone().get_timestamp().await?;
let mut has_live_locks = false;
let expired_locks = locks.into_iter().filter(|lock| {
ts.physical - Timestamp::from_version(lock.lock_version).physical >= lock.lock_ttl as i64
let expired = ts.physical - Timestamp::from_version(lock.lock_version).physical
>= lock.lock_ttl as i64;
if !expired {
has_live_locks = true;
}
expired
});
// records the commit version of each primary lock (representing the status of the transaction)
@ -63,7 +69,7 @@ pub async fn resolve_locks(
.or_insert_with(HashSet::new)
.insert(cleaned_region);
}
Ok(())
Ok(has_live_locks)
}
async fn resolve_lock_with_retry(