Add TTL support (#129)

* add ttl support

Signed-off-by: birdstorm <samuelwyf@hotmail.com>

* fix time unit

Signed-off-by: birdstorm <samuelwyf@hotmail.com>

* ignore test temporary

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2021-02-21 14:31:49 +09:00 committed by GitHub
parent 10d8efbbc4
commit 45219438f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 118 additions and 18 deletions

View File

@ -658,7 +658,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
String otherError = response.getOtherError();
if (otherError != null && !otherError.isEmpty()) {
if (!otherError.isEmpty()) {
logger.warn(String.format("Other error occurred, message: %s", otherError));
throw new GrpcException(otherError);
}
@ -816,7 +816,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
throw new TiClientInternalException("RawGetResponse failed without a cause");
}
String error = resp.getError();
if (error != null && !error.isEmpty()) {
if (!error.isEmpty()) {
throw new KeyException(resp.getError());
}
if (resp.hasRegionError()) {
@ -854,13 +854,14 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
public void rawPut(BackOffer backOffer, ByteString key, ByteString value) {
public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long ttl) {
Supplier<RawPutRequest> factory =
() ->
RawPutRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setValue(value)
.setTtl(ttl)
.build();
KVErrorHandler<RawPutResponse> handler =
@ -919,7 +920,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
return resp.getPairsList();
}
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs) {
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl) {
if (kvPairs.isEmpty()) {
return;
}
@ -928,6 +929,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
RawBatchPutRequest.newBuilder()
.setContext(region.getContext())
.addAllPairs(kvPairs)
.setTtl(ttl)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
@ -940,13 +942,13 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
handleRawBatchPut(resp);
}
public void rawBatchPut(BackOffer backOffer, Batch batch) {
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl) {
List<KvPair> pairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
pairs.add(
KvPair.newBuilder().setKey(batch.keys.get(i)).setValue(batch.values.get(i)).build());
}
rawBatchPut(backOffer, pairs);
rawBatchPut(backOffer, pairs, ttl);
}
private void handleRawBatchPut(RawBatchPutResponse resp) {

View File

@ -84,11 +84,22 @@ public class RawKVClient implements AutoCloseable {
* @param value raw value
*/
public void put(ByteString key, ByteString value) {
put(key, value, 0);
}
/**
* Put a raw key-value pair to TiKV
*
* @param key raw key
* @param value raw value
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
public void put(ByteString key, ByteString value, long ttl) {
BackOffer backOffer = defaultBackOff();
for (int i = 0; i < MAX_RETRY_LIMIT; i++) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value);
client.rawPut(backOffer, key, value, ttl);
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -103,12 +114,27 @@ public class RawKVClient implements AutoCloseable {
* @param kvPairs kvPairs
*/
public void batchPut(Map<ByteString, ByteString> kvPairs) {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs);
batchPut(kvPairs, 0);
}
/**
* Put a set of raw key-value pair to TiKV
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
}
private void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
batchPut(backOffer, keys, values, 0);
}
private void batchPut(
BackOffer backOffer, List<ByteString> keys, List<ByteString> values, long ttl) {
Map<ByteString, ByteString> keysToValues = mapKeysToValues(keys, values);
doSendBatchPut(backOffer, keysToValues);
doSendBatchPut(backOffer, keysToValues, ttl);
}
/**
@ -332,7 +358,7 @@ public class RawKVClient implements AutoCloseable {
deleteRange(key, endKey);
}
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs) {
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
ExecutorCompletionService<Object> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);
@ -350,33 +376,34 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch));
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch, ttl));
}
getTasks(completionService, batches, BackOffer.RAWKV_MAX_BACKOFF);
}
private Object doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch) {
private Object doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
TiRegion oldRegion = batch.region;
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
if (oldRegion.equals(currentRegion)) {
try (RegionStoreClient client = clientBuilder.build(batch.region); ) {
client.rawBatchPut(backOffer, batch);
client.rawBatchPut(backOffer, batch, ttl);
return null;
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// retry
return doSendBatchPutWithRefetchRegion(backOffer, batch);
return doSendBatchPutWithRefetchRegion(backOffer, batch, ttl);
}
} else {
return doSendBatchPutWithRefetchRegion(backOffer, batch);
return doSendBatchPutWithRefetchRegion(backOffer, batch, ttl);
}
}
private Object doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
private Object doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch, long ttl) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
List<Batch> retryBatches = new ArrayList<>();
@ -391,7 +418,7 @@ public class RawKVClient implements AutoCloseable {
for (Batch retryBatch : retryBatches) {
// recursive calls
doSendBatchPutInBatchesWithRetry(backOffer, retryBatch);
doSendBatchPutInBatchesWithRetry(backOffer, retryBatch, ttl);
}
return null;

View File

@ -20,7 +20,12 @@ import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.kvproto.Metapb.*;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Region;
import org.tikv.kvproto.Metapb.RegionEpoch;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreLabel;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb.*;
public class GrpcUtils {

View File

@ -187,6 +187,12 @@ public class RawKVClientTest {
}
prepare();
// TODO: check whether cluster supports ttl
// long ttl = 10;
// rawTTLTest(10, ttl, benchmark);
prepare();
} catch (final TiKVException e) {
logger.warn("Test fails with Exception " + e);
}
@ -195,6 +201,7 @@ public class RawKVClientTest {
private void prepare() {
logger.info("Initializing test");
data.clear();
List<Kvrpcpb.KvPair> remainingKeys = rawKeys();
int sz = remainingKeys.size();
logger.info("deleting " + sz);
@ -538,6 +545,56 @@ public class RawKVClientTest {
}
}
private void rawTTLTest(int cases, long ttl, boolean benchmark) {
logger.info("ttl testing");
if (benchmark) {
for (int i = 0; i < cases; i++) {
ByteString key = orderedKeys.get(i), value = values.get(i);
data.put(key, value);
}
long start = System.currentTimeMillis();
int base = cases / WORKER_CNT;
for (int cnt = 0; cnt < WORKER_CNT; cnt++) {
int i = cnt;
completionService.submit(
() -> {
for (int j = 0; j < base; j++) {
int num = i * base + j;
ByteString key = orderedKeys.get(num), value = values.get(num);
client.put(key, value, ttl);
}
return null;
});
}
awaitTimeOut(100);
long end = System.currentTimeMillis();
logger.info(
cases
+ " ttl put: "
+ (end - start) / 1000.0
+ "s workers="
+ WORKER_CNT
+ " put="
+ rawKeys().size());
} else {
for (int i = 0; i < cases; i++) {
ByteString key = randomKeys.get(i), value = values.get(r.nextInt(KEY_POOL_SIZE));
data.put(key, value);
checkPutTTL(key, value, ttl);
}
try {
Thread.sleep(ttl * 1000);
} catch (InterruptedException e) {
throw new TiKVException(e);
}
for (int i = 0; i < cases; i++) {
ByteString key = randomKeys.get(i);
checkGetTTLTimeOut(key);
}
}
}
private void checkBatchGet(List<ByteString> keys) {
List<Kvrpcpb.KvPair> result = client.batchGet(keys);
for (Kvrpcpb.KvPair kvPair : result) {
@ -615,6 +672,15 @@ public class RawKVClientTest {
assert result.isEmpty();
}
private void checkPutTTL(ByteString key, ByteString value, long ttl) {
client.put(key, value, ttl);
assert client.get(key).equals(value);
}
private void checkGetTTLTimeOut(ByteString key) {
assert client.get(key).isEmpty();
}
private void checkEmpty(ByteString key) {
assert client.get(key).isEmpty();
}