From bcd11f34ca2b59c4a0ec0691a40adaf6e534e183 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 7 Mar 2023 14:21:08 +0800 Subject: [PATCH] [to #773] implement UpdateServiceGCSafePoint (#723) * implement UpdateServiceGCSafePoint Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 26 +++++++++++++++++++ .../org/tikv/common/ReadOnlyPDClient.java | 12 +++++++++ 2 files changed, 38 insertions(+) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 43383bbda8..e3695b6916 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.ScatterRegionResponse; import org.tikv.kvproto.Pdpb.Timestamp; import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; +import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest; public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { @@ -383,6 +384,17 @@ public class PDClient extends AbstractGRPCClient return () -> GetAllStoresRequest.newBuilder().setHeader(header).build(); } + private Supplier buildUpdateServiceGCSafePointRequest( + ByteString serviceId, long ttl, long safePoint) { + return () -> + UpdateServiceGCSafePointRequest.newBuilder() + .setHeader(header) + .setSafePoint(safePoint) + .setServiceId(serviceId) + .setTTL(ttl) + .build(); + } + private PDErrorHandler buildPDErrorHandler() { return new PDErrorHandler<>( r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this); @@ -419,6 +431,20 @@ public class PDClient extends AbstractGRPCClient return conf.getReplicaRead(); } + @Override + public Long updateServiceGCSafePoint( + String serviceId, long ttl, long safePoint, BackOffer backOffer) { + return callWithRetry( + backOffer, + PDGrpc.getUpdateServiceGCSafePointMethod(), + buildUpdateServiceGCSafePointRequest( + ByteString.copyFromUtf8(serviceId), ttl, safePoint), + new PDErrorHandler<>( + r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, + this)) + .getMinSafePoint(); + } + @Override public void close() throws InterruptedException { etcdClient.close(); diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index ddf1855e61..58ad9b2a62 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -72,4 +72,16 @@ public interface ReadOnlyPDClient { Long getClusterId(); RequestKeyCodec getCodec(); + + /** + * Update ServiceGCSafePoint + * + * @param serviceId ServiceId + * @param ttl TTL in seconds + * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good + * practice + * @return the MinSafePoint of all services. If this value is greater than safePoint, it means + * update failed + */ + Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); }