[to #773] implement UpdateServiceGCSafePoint (#723)

* implement UpdateServiceGCSafePoint

Signed-off-by: shiyuhang <1136742008@qq.com>
This commit is contained in:
shi yuhang 2023-03-07 14:21:08 +08:00 committed by GitHub
parent 2c48b4a358
commit bcd11f34ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 0 deletions

View File

@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.ScatterRegionResponse;
import org.tikv.kvproto.Pdpb.Timestamp; import org.tikv.kvproto.Pdpb.Timestamp;
import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse; import org.tikv.kvproto.Pdpb.TsoResponse;
import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub> public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
implements ReadOnlyPDClient { implements ReadOnlyPDClient {
@ -383,6 +384,17 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
return () -> GetAllStoresRequest.newBuilder().setHeader(header).build(); return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
} }
private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest(
ByteString serviceId, long ttl, long safePoint) {
return () ->
UpdateServiceGCSafePointRequest.newBuilder()
.setHeader(header)
.setSafePoint(safePoint)
.setServiceId(serviceId)
.setTTL(ttl)
.build();
}
private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() { private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() {
return new PDErrorHandler<>( return new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this); r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);
@ -419,6 +431,20 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
return conf.getReplicaRead(); return conf.getReplicaRead();
} }
@Override
public Long updateServiceGCSafePoint(
String serviceId, long ttl, long safePoint, BackOffer backOffer) {
return callWithRetry(
backOffer,
PDGrpc.getUpdateServiceGCSafePointMethod(),
buildUpdateServiceGCSafePointRequest(
ByteString.copyFromUtf8(serviceId), ttl, safePoint),
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this))
.getMinSafePoint();
}
@Override @Override
public void close() throws InterruptedException { public void close() throws InterruptedException {
etcdClient.close(); etcdClient.close();

View File

@ -72,4 +72,16 @@ public interface ReadOnlyPDClient {
Long getClusterId(); Long getClusterId();
RequestKeyCodec getCodec(); RequestKeyCodec getCodec();
/**
* Update ServiceGCSafePoint
*
* @param serviceId ServiceId
* @param ttl TTL in seconds
* @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good
* practice
* @return the MinSafePoint of all services. If this value is greater than safePoint, it means
* update failed
*/
Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer);
} }