From 9ca9aa79c6e28a878e9ee9574fd96bbc2688ccea Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 13 Oct 2020 16:22:56 +0800 Subject: [PATCH] GC: initial implementation (#182) Signed-off-by: ekexium --- src/mock.rs | 4 +++ src/pd/client.rs | 6 ++++ src/pd/retry.rs | 9 ++++++ src/request.rs | 1 + src/transaction/client.rs | 55 ++++++++++++++++++++++++++++++---- src/transaction/requests.rs | 56 +++++++++++++++++++++++++++++++++++ tests/integration_tests.rs | 11 +++++++ tikv-client-pd/src/cluster.rs | 22 ++++++++++++++ tikv-client-pd/src/lib.rs | 6 ++++ 9 files changed, 165 insertions(+), 5 deletions(-) diff --git a/src/mock.rs b/src/mock.rs index 257c8e8..d77b639 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -146,6 +146,10 @@ impl PdClient for MockPdClient { fn get_timestamp(self: Arc) -> BoxFuture<'static, Result> { unimplemented!() } + + fn update_safepoint(self: Arc, _safepoint: u64) -> BoxFuture<'static, Result> { + unimplemented!() + } } impl DispatchHook for kvrpcpb::ResolveLockRequest { diff --git a/src/pd/client.rs b/src/pd/client.rs index 715a111..6ef7288 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -59,6 +59,8 @@ pub trait PdClient: Send + Sync + 'static { fn get_timestamp(self: Arc) -> BoxFuture<'static, Result>; + fn update_safepoint(self: Arc, safepoint: u64) -> BoxFuture<'static, Result>; + // In transactional API, `key` is in raw format fn store_for_key( self: Arc, @@ -255,6 +257,10 @@ impl PdClient for PdRpcClient { fn get_timestamp(self: Arc) -> BoxFuture<'static, Result> { self.pd.clone().get_timestamp().boxed() } + + fn update_safepoint(self: Arc, safepoint: u64) -> BoxFuture<'static, Result> { + self.pd.clone().update_safepoint(safepoint).boxed() + } } impl PdRpcClient { diff --git a/src/pd/retry.rs b/src/pd/retry.rs index e8baa5e..e740fca 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -142,6 +142,15 @@ impl RetryClient { pub async fn get_timestamp(self: Arc) -> Result { retry!(self, "get_timestamp", |cluster| cluster.get_timestamp()) } + + pub async fn update_safepoint(self: Arc, safepoint: u64) -> Result { + retry!(self, "update_gc_safepoint", |cluster| async { + cluster + .update_safepoint(safepoint, self.timeout) + .await + .map(|resp| resp.get_new_safe_point() == safepoint) + }) + } } impl fmt::Debug for RetryClient { diff --git a/src/request.rs b/src/request.rs index d69ac56..a26d02f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -283,6 +283,7 @@ impl_kv_rpc_request!(CleanupRequest); impl_kv_rpc_request!(BatchGetRequest); impl_kv_rpc_request!(BatchRollbackRequest); impl_kv_rpc_request!(ResolveLockRequest); +impl_kv_rpc_request!(ScanLockRequest); #[cfg(test)] mod test { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d6bc6ab..745dee7 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,14 +1,17 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use super::{requests::new_scan_lock_request, resolve_locks}; use crate::{ - pd::PdRpcClient, + pd::{PdClient, PdRpcClient}, + request::KvRequest, transaction::{Snapshot, Transaction}, }; - -use crate::pd::PdClient; use futures::executor::ThreadPool; -use std::sync::Arc; -use tikv_client_common::{Config, Result, Timestamp}; +use kvproto::kvrpcpb; +use std::{mem, sync::Arc}; +use tikv_client_common::{Config, Result, Timestamp, TimestampExt}; + +const SCAN_LOCK_BATCH_SIZE: u32 = 1024; // TODO: cargo-culted value /// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster. pub struct Client { @@ -88,6 +91,48 @@ impl Client { self.pd.clone().get_timestamp().await } + /// Cleans stale MVCC records in TiKV. + /// + /// It is done by: + /// 1. resolve all locks with ts <= safepoint + /// 2. update safepoint to PD + /// + /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). + /// We omit the second step "delete ranges" which is an optimization for TiDB. + pub async fn gc(&self, safepoint: Timestamp) -> Result { + // scan all locks with ts <= safepoint + let mut locks: Vec = vec![]; + let mut start_key = vec![]; + loop { + let req = new_scan_lock_request( + mem::take(&mut start_key), + safepoint.clone(), + SCAN_LOCK_BATCH_SIZE, + ); + let res: Vec = req.execute(self.pd.clone()).await?; + if res.is_empty() { + break; + } + start_key = res.last().unwrap().key.clone(); + start_key.push(0); + locks.extend(res); + } + + // resolve locks + resolve_locks(locks, self.pd.clone()).await?; + + // update safepoint to PD + let res: bool = self + .pd + .clone() + .update_safepoint(safepoint.version()) + .await?; + if !res { + info!("new safepoint != user-specified safepoint"); + } + Ok(res) + } + fn new_transaction(&self, timestamp: Timestamp) -> Transaction { Transaction::new( timestamp, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index cddcc97..2646816 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -475,7 +475,63 @@ pub fn new_batch_rollback_request( req } +impl KvRequest for kvrpcpb::ScanLockRequest { + type Result = Vec; + + type RpcResponse = kvrpcpb::ScanLockResponse; + + type KeyData = (Key, Key); // end_key should always be empty. Used to satisfy `store_stream_for_range` + + const REQUEST_NAME: &'static str = "kv_scan_lock"; + + const RPC_FN: RpcFnType = TikvClient::kv_scan_lock_async_opt; + + fn store_stream( + &mut self, + pd_client: Arc, + ) -> BoxStream<'static, Result<(Self::KeyData, Store)>> { + let start_key = mem::take(&mut self.start_key); + let range = BoundRange::from((start_key, vec![])); + store_stream_for_range(range, pd_client) + } + + fn make_rpc_request( + &self, + (start_key, _): Self::KeyData, + store: &Store, + ) -> Self { + let mut req = self.request_from_store(store); + req.set_max_version(self.max_version); + req.set_start_key(start_key.into()); + req.set_limit(self.limit); + req + } + + fn map_result(mut result: Self::RpcResponse) -> Self::Result { + result.take_locks() + } + + fn reduce( + results: BoxStream<'static, Result>, + ) -> BoxFuture<'static, Result> { + results.try_concat().boxed() + } +} + +pub fn new_scan_lock_request( + start_key: impl Into, + safepoint: Timestamp, + limit: u32, +) -> kvrpcpb::ScanLockRequest { + let mut req = kvrpcpb::ScanLockRequest::default(); + req.set_start_key(start_key.into().into()); + req.set_max_version(safepoint.version()); + req.set_limit(limit); + req +} + impl HasLocks for kvrpcpb::CommitResponse {} impl HasLocks for kvrpcpb::CleanupResponse {} impl HasLocks for kvrpcpb::BatchRollbackResponse {} impl HasLocks for kvrpcpb::ResolveLockResponse {} +impl HasLocks for kvrpcpb::ScanLockResponse {} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6c7653e..1f4a12f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -438,6 +438,17 @@ async fn raw_req() -> Fallible<()> { Fallible::Ok(()) } +/// Only checks if we successfully update safepoint to PD. +#[tokio::test] +#[serial] +async fn test_update_safepoint() -> Fallible<()> { + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = TransactionClient::new(config).await?; + let res = client.gc(client.current_timestamp().await?).await?; + assert!(res); + Fallible::Ok(()) +} /// Tests raw API when there are multiple regions. /// Write large volumes of data to enforce region splitting. /// In order to test `scan`, data is uniformly inserted. diff --git a/tikv-client-pd/src/cluster.rs b/tikv-client-pd/src/cluster.rs index 8f08f4b..3b2fdab 100644 --- a/tikv-client-pd/src/cluster.rs +++ b/tikv-client-pd/src/cluster.rs @@ -65,6 +65,16 @@ impl Cluster { pub async fn get_timestamp(&self) -> Result { self.tso.clone().get_timestamp().await } + + pub async fn update_safepoint( + &self, + safepoint: u64, + timeout: Duration, + ) -> Result { + let mut req = pd_request!(self.id, pdpb::UpdateGcSafePointRequest); + req.set_safe_point(safepoint); + req.send(&self.client, timeout).await + } } /// An object for connecting and reconnecting to a PD cluster. @@ -310,3 +320,15 @@ impl PdMessage for pdpb::GetAllStoresRequest { client.get_all_stores_async_opt(self, opt).unwrap().await } } + +#[async_trait] +impl PdMessage for pdpb::UpdateGcSafePointRequest { + type Response = pdpb::UpdateGcSafePointResponse; + + async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { + client + .update_gc_safe_point_async_opt(self, opt) + .unwrap() + .await + } +} diff --git a/tikv-client-pd/src/lib.rs b/tikv-client-pd/src/lib.rs index c08a59d..38bfaeb 100644 --- a/tikv-client-pd/src/lib.rs +++ b/tikv-client-pd/src/lib.rs @@ -30,3 +30,9 @@ impl PdResponse for pdpb::GetAllStoresResponse { self.get_header() } } + +impl PdResponse for pdpb::UpdateGcSafePointResponse { + fn header(&self) -> &pdpb::ResponseHeader { + self.get_header() + } +}