Add atomic api support (#140)

* add atomic api support

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2021-03-15 19:58:25 +08:00 committed by GitHub
parent 56331e2f98
commit c9507c2fa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 359 additions and 81 deletions

View File

@ -18,7 +18,7 @@ CURRENT_DIR=`pwd`
TIKV_CLIENT_HOME="$(cd "`dirname "$0"`"/..; pwd)"
cd $TIKV_CLIENT_HOME
kvproto_hash=b2375dcc80adc9c9423bd010592c045241f29d5a
kvproto_hash=70a5912413a95aa47c069044dd531efa69ad7549
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926

View File

@ -30,6 +30,7 @@ public class ConfigUtils {
public static final String TIKV_BATCH_GET_CONCURRENCY = "tikv.batch_get_concurrency";
public static final String TIKV_BATCH_PUT_CONCURRENCY = "tikv.batch_put_concurrency";
public static final String TIKV_BATCH_DELETE_CONCURRENCY = "tikv.batch_delete_concurrency";
public static final String TIKV_BATCH_SCAN_CONCURRENCY = "tikv.batch_scan_concurrency";
public static final String TIKV_DELETE_RANGE_CONCURRENCY = "tikv.delete_range_concurrency";
@ -46,6 +47,7 @@ public class ConfigUtils {
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_SCAN_BATCH_SIZE = 10240;
@ -59,6 +61,7 @@ public class ConfigUtils {
public static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
public static final int DEF_BATCH_GET_CONCURRENCY = 20;
public static final int DEF_BATCH_PUT_CONCURRENCY = 20;
public static final int DEF_BATCH_DELETE_CONCURRENCY = 20;
public static final int DEF_BATCH_SCAN_CONCURRENCY = 5;
public static final int DEF_DELETE_RANGE_CONCURRENCY = 20;
public static final Kvrpcpb.CommandPri DEF_COMMAND_PRIORITY = Kvrpcpb.CommandPri.Low;

View File

@ -23,7 +23,6 @@ import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@ -128,13 +127,8 @@ public class KVClient implements AutoCloseable {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
List<Batch> batches =
getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
@ -170,14 +164,8 @@ public class KVClient implements AutoCloseable {
private List<KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
List<Batch> retryBatches =
getBatches(backOffer, batch.keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
ArrayList<KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {

View File

@ -46,6 +46,7 @@ public class TiConfiguration implements Serializable {
}
private static void loadFromDefaultProperties() {
setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES);
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
@ -55,6 +56,7 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_TABLE_SCAN_CONCURRENCY, DEF_TABLE_SCAN_CONCURRENCY);
setIfMissing(TIKV_BATCH_GET_CONCURRENCY, DEF_BATCH_GET_CONCURRENCY);
setIfMissing(TIKV_BATCH_PUT_CONCURRENCY, DEF_BATCH_PUT_CONCURRENCY);
setIfMissing(TIKV_BATCH_DELETE_CONCURRENCY, DEF_BATCH_DELETE_CONCURRENCY);
setIfMissing(TIKV_BATCH_SCAN_CONCURRENCY, DEF_BATCH_SCAN_CONCURRENCY);
setIfMissing(TIKV_DELETE_RANGE_CONCURRENCY, DEF_DELETE_RANGE_CONCURRENCY);
setIfMissing(TIKV_REQUEST_COMMAND_PRIORITY, LOW_COMMAND_PRIORITY);
@ -222,6 +224,7 @@ public class TiConfiguration implements Serializable {
private int tableScanConcurrency = getInt(TIKV_TABLE_SCAN_CONCURRENCY);
private int batchGetConcurrency = getInt(TIKV_BATCH_GET_CONCURRENCY);
private int batchPutConcurrency = getInt(TIKV_BATCH_PUT_CONCURRENCY);
private int batchDeleteConcurrency = getInt(TIKV_BATCH_DELETE_CONCURRENCY);
private int batchScanConcurrency = getInt(TIKV_BATCH_SCAN_CONCURRENCY);
private int deleteRangeConcurrency = getInt(TIKV_DELETE_RANGE_CONCURRENCY);
private CommandPri commandPriority = getCommandPri(TIKV_REQUEST_COMMAND_PRIORITY);
@ -241,6 +244,10 @@ public class TiConfiguration implements Serializable {
RAW
}
public static TiConfiguration createDefault() {
return new TiConfiguration();
}
public static TiConfiguration createDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
@ -248,6 +255,12 @@ public class TiConfiguration implements Serializable {
return conf;
}
public static TiConfiguration createRawDefault() {
TiConfiguration conf = new TiConfiguration();
conf.kvMode = KVMode.RAW;
return conf;
}
public static TiConfiguration createRawDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
@ -360,6 +373,15 @@ public class TiConfiguration implements Serializable {
return this;
}
public int getBatchDeleteConcurrency() {
return batchDeleteConcurrency;
}
public TiConfiguration setBatchDeleteConcurrency(int batchDeleteConcurrency) {
this.batchDeleteConcurrency = batchDeleteConcurrency;
return this;
}
public int getBatchScanConcurrency() {
return batchScanConcurrency;
}

View File

@ -67,6 +67,7 @@ public class TiSession implements AutoCloseable {
private volatile ExecutorService tableScanThreadPool;
private volatile ExecutorService batchGetThreadPool;
private volatile ExecutorService batchPutThreadPool;
private volatile ExecutorService batchDeleteThreadPool;
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
@ -279,6 +280,25 @@ public class TiSession implements AutoCloseable {
return res;
}
public ExecutorService getThreadPoolForBatchDelete() {
ExecutorService res = batchDeleteThreadPool;
if (res == null) {
synchronized (this) {
if (batchDeleteThreadPool == null) {
batchDeleteThreadPool =
Executors.newFixedThreadPool(
conf.getBatchDeleteConcurrency(),
new ThreadFactoryBuilder()
.setNameFormat("batchDelete-thread-%d")
.setDaemon(true)
.build());
}
res = batchDeleteThreadPool;
}
}
return res;
}
public ExecutorService getThreadPoolForBatchScan() {
ExecutorService res = batchScanThreadPool;
if (res == null) {
@ -459,6 +479,9 @@ public class TiSession implements AutoCloseable {
if (batchPutThreadPool != null) {
batchPutThreadPool.shutdownNow();
}
if (batchDeleteThreadPool != null) {
batchDeleteThreadPool.shutdownNow();
}
if (batchScanThreadPool != null) {
batchScanThreadPool.shutdownNow();
}

View File

@ -30,6 +30,7 @@ public abstract class RetryPolicy<RespT> {
Histogram.build()
.name("client_java_grpc_single_requests_latency")
.help("grpc request latency.")
.labelNames("type")
.register();
// handles PD and TiKV's error.
@ -58,7 +59,7 @@ public abstract class RetryPolicy<RespT> {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
try {
result = proc.call();
} finally {

View File

@ -953,6 +953,53 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
public ByteString rawPutIfAbsent(
BackOffer backOffer, ByteString key, ByteString value, long ttl) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer();
try {
Supplier<RawCASRequest> factory =
() ->
RawCASRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setValue(value)
.setPreviousNotExist(true)
.setTtl(ttl)
.build();
KVErrorHandler<RawCASResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawCASResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawCompareAndSetMethod(), factory, handler);
return rawPutIfAbsentHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
private ByteString rawPutIfAbsentHelper(RawCASResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawPutResponse failed without a cause");
}
String error = resp.getError();
if (!error.isEmpty()) {
throw new KeyException(resp.getError());
}
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (!resp.getNotEqual()) {
return ByteString.EMPTY;
}
return resp.getValue();
}
public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_get").startTimer();
@ -991,7 +1038,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
return resp.getPairsList();
}
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl) {
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomic) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer();
try {
@ -1004,6 +1051,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setContext(region.getContext())
.addAllPairs(kvPairs)
.setTtl(ttl)
.setForCas(atomic)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
@ -1019,13 +1067,13 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl) {
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
List<KvPair> pairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
pairs.add(
KvPair.newBuilder().setKey(batch.keys.get(i)).setValue(batch.values.get(i)).build());
}
rawBatchPut(backOffer, pairs, ttl);
rawBatchPut(backOffer, pairs, ttl, atomic);
}
private void handleRawBatchPut(RawBatchPutResponse resp) {
@ -1033,6 +1081,52 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
}
String error = resp.getError();
if (!error.isEmpty()) {
throw new KeyException(resp.getError());
}
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
}
public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomic) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_delete").startTimer();
try {
if (keys.isEmpty()) {
return;
}
Supplier<RawBatchDeleteRequest> factory =
() ->
RawBatchDeleteRequest.newBuilder()
.setContext(region.getContext())
.addAllKeys(keys)
.setForCas(atomic)
.build();
KVErrorHandler<RawBatchDeleteResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchDeleteResponse resp =
callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler);
handleRawBatchDelete(resp);
} finally {
requestTimer.observeDuration();
}
}
private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawBatchDeleteResponse failed without a cause");
}
String error = resp.getError();
if (!error.isEmpty()) {
throw new KeyException(resp.getError());
}
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Kvrpcpb;
@ -90,6 +91,23 @@ public class ClientUtils {
}
}
public static List<Batch> getBatches(
BackOffer backOffer,
List<ByteString> keys,
int batchSize,
int batchLimit,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), batchSize, batchLimit);
}
return retryBatches;
}
public static Map<TiRegion, List<ByteString>> groupKeysByRegion(
RegionManager regionManager, Set<ByteString> keys, BackOffer backoffer) {
return groupKeysByRegion(regionManager, new ArrayList<>(keys), backoffer, true);

View File

@ -41,6 +41,7 @@ public class RawKVClient implements AutoCloseable {
private final TiConfiguration conf;
private final ExecutorService batchGetThreadPool;
private final ExecutorService batchPutThreadPool;
private final ExecutorService batchDeleteThreadPool;
private final ExecutorService batchScanThreadPool;
private final ExecutorService deleteRangeThreadPool;
private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
@ -50,6 +51,7 @@ public class RawKVClient implements AutoCloseable {
private static final int MAX_RAW_BATCH_LIMIT = 1024;
private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB
private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K
private static final int RAW_BATCH_DELETE_SIZE = 16 * 1024; // 16 K
private static final int RAW_BATCH_SCAN_SIZE = 16;
private static final int RAW_BATCH_PAIR_COUNT = 512;
@ -84,6 +86,7 @@ public class RawKVClient implements AutoCloseable {
this.clientBuilder = clientBuilder;
this.batchGetThreadPool = session.getThreadPoolForBatchGet();
this.batchPutThreadPool = session.getThreadPoolForBatchPut();
this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
}
@ -132,26 +135,41 @@ public class RawKVClient implements AutoCloseable {
}
/**
* Put a set of raw key-value pair to TiKV
* Put a key-value pair if it does not exist. This API is atomic.
*
* @param kvPairs kvPairs
* @param key key
* @param value value
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
public void batchPut(Map<ByteString, ByteString> kvPairs) {
batchPut(kvPairs, 0);
public ByteString putIfAbsent(ByteString key, ByteString value) {
return putIfAbsent(key, value, 0L);
}
/**
* Put a set of raw key-value pair to TiKV
* Put a key-value pair with TTL if it does not exist. This API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put";
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put_if_absent";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
@ -160,14 +178,56 @@ public class RawKVClient implements AutoCloseable {
}
}
private void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
batchPut(backOffer, keys, values, 0);
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
*/
public void batchPut(Map<ByteString, ByteString> kvPairs) {
batchPut(kvPairs, 0);
}
private void batchPut(
BackOffer backOffer, List<ByteString> keys, List<ByteString> values, long ttl) {
Map<ByteString, ByteString> keysToValues = mapKeysToValues(keys, values);
doSendBatchPut(backOffer, keysToValues, ttl);
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, false);
}
/**
* Put a set of raw key-value pair to TiKV, this API is atomic
*
* @param kvPairs kvPairs
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs) {
batchPutAtomic(kvPairs, 0);
}
/**
* Put a set of raw key-value pair to TiKV, this API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, true);
}
private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
/**
@ -221,6 +281,40 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Delete a list of raw key-value pair from TiKV if key exists
*
* @param keys list of raw key
*/
public void batchDelete(List<ByteString> keys) {
batchDelete(keys, false);
}
/**
* Delete a list of raw key-value pair from TiKV if key exists, this API is atomic
*
* @param keys list of raw key
*/
public void batchDeleteAtomic(List<ByteString> keys) {
batchDelete(keys, true);
}
private void batchDelete(List<ByteString> keys, boolean atomic) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
doSendBatchDelete(backOffer, keys, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
/**
* Get the TTL of a raw key from TiKV if key exists
*
@ -493,7 +587,8 @@ public class RawKVClient implements AutoCloseable {
deleteRange(key, endKey);
}
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
private void doSendBatchPut(
BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);
@ -518,15 +613,16 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch, ttl));
() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch, ttl, atomic));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
private List<Batch> doSendBatchPutInBatchesWithRetry(
BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
try (RegionStoreClient client = clientBuilder.build(batch.region)) {
client.rawBatchPut(backOffer, batch, ttl);
client.rawBatchPut(backOffer, batch, ttl, atomic);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
@ -559,19 +655,8 @@ public class RawKVClient implements AutoCloseable {
ExecutorCompletionService<Pair<List<Batch>, List<KvPair>>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
batches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT);
}
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch));
}
List<Batch> batches =
getBatches(backOffer, keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
Queue<List<Batch>> taskQueue = new LinkedList<>();
List<KvPair> result = new ArrayList<>();
@ -608,16 +693,50 @@ public class RawKVClient implements AutoCloseable {
}
private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
return getBatches(
backOffer, batch.keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT);
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boolean atomic) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchDeleteThreadPool);
List<Batch> batches =
getBatches(backOffer, keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
Queue<List<Batch>> taskQueue = new LinkedList<>();
taskQueue.offer(batches);
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(singleBatchBackOffer, batch, atomic));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}
return retryBatches;
private List<Batch> doSendBatchDeleteInBatchesWithRetry(
BackOffer backOffer, Batch batch, boolean atomic) {
RegionStoreClient client = clientBuilder.build(batch.region);
try {
client.rawBatchDelete(backOffer, batch.keys, atomic);
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
return doSendBatchDeleteWithRefetchRegion(backOffer, batch);
}
}
private List<Batch> doSendBatchDeleteWithRefetchRegion(BackOffer backOffer, Batch batch) {
return getBatches(
backOffer, batch.keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}
private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteString key2) {

View File

@ -136,13 +136,8 @@ public class KVClient implements AutoCloseable {
ExecutorCompletionService<List<Kvrpcpb.KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
List<Batch> batches =
getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
@ -178,14 +173,8 @@ public class KVClient implements AutoCloseable {
private List<Kvrpcpb.KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
List<Batch> retryBatches =
getBatches(backOffer, batch.keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
ArrayList<Kvrpcpb.KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {

View File

@ -21,7 +21,6 @@ import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
public class RawKVClientTest {
private static final String DEFAULT_PD_ADDRESS = "127.0.0.1:2379";
private static final String RAW_PREFIX = "raw_\u0001_";
private static final int KEY_POOL_SIZE = 1000000;
private static final int TEST_CASES = 10000;
@ -71,7 +70,7 @@ public class RawKVClientTest {
@Before
public void setup() throws IOException {
try {
TiConfiguration conf = TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS);
TiConfiguration conf = TiConfiguration.createRawDefault();
session = TiSession.create(conf);
initialized = false;
if (client == null) {
@ -92,6 +91,27 @@ public class RawKVClientTest {
}
}
@Test
public void atomicAPITest() {
if (!initialized) return;
long ttl = 10;
ByteString key = ByteString.copyFromUtf8("key_atomic");
ByteString value = ByteString.copyFromUtf8("value");
ByteString value2 = ByteString.copyFromUtf8("value2");
client.delete(key);
ByteString res1 = client.putIfAbsent(key, value, ttl);
assert res1.isEmpty();
ByteString res2 = client.putIfAbsent(key, value2, ttl);
assert res2.equals(value);
try {
Thread.sleep(ttl * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ByteString res3 = client.putIfAbsent(key, value, ttl);
assert res3.isEmpty();
}
@Test
public void getKeyTTLTest() {
if (!initialized) return;
@ -104,7 +124,8 @@ public class RawKVClientTest {
logger.info("current ttl of key is " + t);
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Long t = client.getKeyTTL(key);
@ -697,7 +718,7 @@ public class RawKVClientTest {
try {
Thread.sleep(ttl * 1000);
} catch (InterruptedException e) {
throw new TiKVException(e);
Thread.currentThread().interrupt();
}
for (int i = 0; i < cases; i++) {
ByteString key = randomKeys.get(i);