Mirror of https://github.com/tikv/client-rust.git

GC: initial implementation (#182)

Signed-off-by: ekexium <ekexium@gmail.com>

Parent: 540b50ace4
Commit: 9ca9aa79c6
@@ -146,6 +146,10 @@ impl PdClient for MockPdClient {
    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
        unimplemented!()
    }

    fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> BoxFuture<'static, Result<bool>> {
        unimplemented!()
    }
}

impl DispatchHook for kvrpcpb::ResolveLockRequest {
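Not part of the commit, but for orientation: the mock leaves the new method panicking. A test that wants to exercise the GC path could stub the body roughly as in the sketch below, inside the same `impl PdClient for MockPdClient` block, assuming the `futures` `FutureExt`/`future` helpers are in scope.

    // Hedged sketch only: report the safepoint update as accepted instead of panicking.
    fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> BoxFuture<'static, Result<bool>> {
        futures::future::ready(Ok(true)).boxed()
    }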
@@ -59,6 +59,8 @@ pub trait PdClient: Send + Sync + 'static {
    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;

    fn update_safepoint(self: Arc<Self>, safepoint: u64) -> BoxFuture<'static, Result<bool>>;

    // In transactional API, `key` is in raw format
    fn store_for_key(
        self: Arc<Self>,
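Not part of the commit: a minimal sketch of driving the new trait method from code that is generic over `PdClient`. The helper name is hypothetical; `version()` comes from `TimestampExt`.

// Hypothetical helper, for illustration only.
async fn advance_safepoint<PdC: PdClient>(pd: Arc<PdC>, safepoint: Timestamp) -> Result<bool> {
    // Callers are expected to resolve stale locks first (see `Client::gc` below).
    pd.update_safepoint(safepoint.version()).await
}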
@@ -255,6 +257,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
        self.pd.clone().get_timestamp().boxed()
    }

    fn update_safepoint(self: Arc<Self>, safepoint: u64) -> BoxFuture<'static, Result<bool>> {
        self.pd.clone().update_safepoint(safepoint).boxed()
    }
}

impl PdRpcClient<TikvConnect, Cluster> {
@@ -142,6 +142,15 @@ impl RetryClient<Cluster> {
    pub async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
        retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
    }

    pub async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
        retry!(self, "update_gc_safepoint", |cluster| async {
            cluster
                .update_safepoint(safepoint, self.timeout)
                .await
                .map(|resp| resp.get_new_safe_point() == safepoint)
        })
    }
}

impl fmt::Debug for RetryClient {
@@ -283,6 +283,7 @@ impl_kv_rpc_request!(CleanupRequest);
impl_kv_rpc_request!(BatchGetRequest);
impl_kv_rpc_request!(BatchRollbackRequest);
impl_kv_rpc_request!(ResolveLockRequest);
impl_kv_rpc_request!(ScanLockRequest);

#[cfg(test)]
mod test {
@@ -1,14 +1,17 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{requests::new_scan_lock_request, resolve_locks};
use crate::{
    pd::PdRpcClient,
    pd::{PdClient, PdRpcClient},
    request::KvRequest,
    transaction::{Snapshot, Transaction},
};

use crate::pd::PdClient;
use futures::executor::ThreadPool;
use std::sync::Arc;
use tikv_client_common::{Config, Result, Timestamp};
use kvproto::kvrpcpb;
use std::{mem, sync::Arc};
use tikv_client_common::{Config, Result, Timestamp, TimestampExt};

const SCAN_LOCK_BATCH_SIZE: u32 = 1024; // TODO: cargo-culted value

/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client {
@@ -88,6 +91,48 @@ impl Client {
        self.pd.clone().get_timestamp().await
    }

    /// Cleans stale MVCC records in TiKV.
    ///
    /// It is done by:
    /// 1. resolve all locks with ts <= safepoint
    /// 2. update safepoint to PD
    ///
    /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
    /// We omit the second step "delete ranges" which is an optimization for TiDB.
    pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
        // scan all locks with ts <= safepoint
        let mut locks: Vec<kvrpcpb::LockInfo> = vec![];
        let mut start_key = vec![];
        loop {
            let req = new_scan_lock_request(
                mem::take(&mut start_key),
                safepoint.clone(),
                SCAN_LOCK_BATCH_SIZE,
            );
            let res: Vec<kvrpcpb::LockInfo> = req.execute(self.pd.clone()).await?;
            if res.is_empty() {
                break;
            }
            start_key = res.last().unwrap().key.clone();
            start_key.push(0);
            locks.extend(res);
        }

        // resolve locks
        resolve_locks(locks, self.pd.clone()).await?;

        // update safepoint to PD
        let res: bool = self
            .pd
            .clone()
            .update_safepoint(safepoint.version())
            .await?;
        if !res {
            info!("new safepoint != user-specified safepoint");
        }
        Ok(res)
    }

    fn new_transaction(&self, timestamp: Timestamp) -> Transaction {
        Transaction::new(
            timestamp,
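Not part of the commit: a hedged end-to-end sketch of calling the new `gc` API, modelled on the integration test added later in this diff. The PD address is illustrative, and the constructor usage mirrors that test.

// Illustrative only; assumes a reachable PD endpoint and the crate's `Result` alias.
async fn run_gc() -> Result<()> {
    let client = Client::new(Config::new(vec!["127.0.0.1:2379"])).await?;
    // Use the current timestamp as the safepoint: anything older becomes GC-able.
    let safepoint = client.current_timestamp().await?;
    // Resolves all locks with ts <= safepoint, then publishes the safepoint to PD.
    let accepted = client.gc(safepoint).await?;
    assert!(accepted, "PD adopted a different safepoint");
    Ok(())
}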
@@ -475,7 +475,63 @@ pub fn new_batch_rollback_request(
    req
}

impl KvRequest for kvrpcpb::ScanLockRequest {
    type Result = Vec<kvrpcpb::LockInfo>;

    type RpcResponse = kvrpcpb::ScanLockResponse;

    type KeyData = (Key, Key); // end_key should always be empty. Used to satisfy `store_stream_for_range`

    const REQUEST_NAME: &'static str = "kv_scan_lock";

    const RPC_FN: RpcFnType<Self, Self::RpcResponse> = TikvClient::kv_scan_lock_async_opt;

    fn store_stream<PdC: PdClient>(
        &mut self,
        pd_client: Arc<PdC>,
    ) -> BoxStream<'static, Result<(Self::KeyData, Store<PdC::KvClient>)>> {
        let start_key = mem::take(&mut self.start_key);
        let range = BoundRange::from((start_key, vec![]));
        store_stream_for_range(range, pd_client)
    }

    fn make_rpc_request<KvC: KvClient>(
        &self,
        (start_key, _): Self::KeyData,
        store: &Store<KvC>,
    ) -> Self {
        let mut req = self.request_from_store(store);
        req.set_max_version(self.max_version);
        req.set_start_key(start_key.into());
        req.set_limit(self.limit);
        req
    }

    fn map_result(mut result: Self::RpcResponse) -> Self::Result {
        result.take_locks()
    }

    fn reduce(
        results: BoxStream<'static, Result<Self::Result>>,
    ) -> BoxFuture<'static, Result<Self::Result>> {
        results.try_concat().boxed()
    }
}

pub fn new_scan_lock_request(
    start_key: impl Into<Key>,
    safepoint: Timestamp,
    limit: u32,
) -> kvrpcpb::ScanLockRequest {
    let mut req = kvrpcpb::ScanLockRequest::default();
    req.set_start_key(start_key.into().into());
    req.set_max_version(safepoint.version());
    req.set_limit(limit);
    req
}

impl HasLocks for kvrpcpb::CommitResponse {}
impl HasLocks for kvrpcpb::CleanupResponse {}
impl HasLocks for kvrpcpb::BatchRollbackResponse {}
impl HasLocks for kvrpcpb::ResolveLockResponse {}
impl HasLocks for kvrpcpb::ScanLockResponse {}
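Not part of the commit: a sketch of the paging convention `new_scan_lock_request` is built for, mirroring the scan loop inside `Client::gc` above. `fetch_all_locks` is a hypothetical helper introduced only for illustration.

// Hypothetical helper; same loop `Client::gc` uses, extracted to show how the
// new request composes with `KvRequest::execute`.
async fn fetch_all_locks<PdC: PdClient>(
    pd_client: Arc<PdC>,
    safepoint: Timestamp,
    batch_size: u32,
) -> Result<Vec<kvrpcpb::LockInfo>> {
    let mut locks = Vec::new();
    let mut start_key = vec![];
    loop {
        let req = new_scan_lock_request(mem::take(&mut start_key), safepoint.clone(), batch_size);
        let page = req.execute(pd_client.clone()).await?;
        if page.is_empty() {
            break;
        }
        // Resume just past the last key returned: appending a zero byte yields the
        // smallest key strictly greater than it.
        start_key = page.last().unwrap().key.clone();
        start_key.push(0);
        locks.extend(page);
    }
    Ok(locks)
}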
@@ -438,6 +438,17 @@ async fn raw_req() -> Fallible<()> {
    Fallible::Ok(())
}

/// Only checks if we successfully update safepoint to PD.
#[tokio::test]
#[serial]
async fn test_update_safepoint() -> Fallible<()> {
    clear_tikv().await?;
    let config = Config::new(pd_addrs());
    let client = TransactionClient::new(config).await?;
    let res = client.gc(client.current_timestamp().await?).await?;
    assert!(res);
    Fallible::Ok(())
}
/// Tests raw API when there are multiple regions.
/// Write large volumes of data to enforce region splitting.
/// In order to test `scan`, data is uniformly inserted.
@@ -65,6 +65,16 @@ impl Cluster {
    pub async fn get_timestamp(&self) -> Result<Timestamp> {
        self.tso.clone().get_timestamp().await
    }

    pub async fn update_safepoint(
        &self,
        safepoint: u64,
        timeout: Duration,
    ) -> Result<pdpb::UpdateGcSafePointResponse> {
        let mut req = pd_request!(self.id, pdpb::UpdateGcSafePointRequest);
        req.set_safe_point(safepoint);
        req.send(&self.client, timeout).await
    }
}

/// An object for connecting and reconnecting to a PD cluster.
@@ -310,3 +320,15 @@ impl PdMessage for pdpb::GetAllStoresRequest {
        client.get_all_stores_async_opt(self, opt).unwrap().await
    }
}

#[async_trait]
impl PdMessage for pdpb::UpdateGcSafePointRequest {
    type Response = pdpb::UpdateGcSafePointResponse;

    async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
        client
            .update_gc_safe_point_async_opt(self, opt)
            .unwrap()
            .await
    }
}
@@ -30,3 +30,9 @@ impl PdResponse for pdpb::GetAllStoresResponse {
        self.get_header()
    }
}

impl PdResponse for pdpb::UpdateGcSafePointResponse {
    fn header(&self) -> &pdpb::ResponseHeader {
        self.get_header()
    }
}