Add Batch APIs (#118)

This commit is contained in:
birdstorm 2021-02-01 11:38:12 +08:00 committed by GitHub
parent a627ad6f8d
commit 25e0ab3b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 692 additions and 194 deletions

View File

@ -17,17 +17,16 @@
package org.tikv.common;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static org.tikv.common.util.ClientUtils.getKvPairs;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +38,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb.KvPair;
@ -47,25 +47,17 @@ public class KVClient implements AutoCloseable {
private static final int BATCH_GET_SIZE = 16 * 1024;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService executorService;
private final ExecutorService batchGetThreadPool;
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(conf, "conf is null");
public KVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = conf;
this.conf = session.getConf();
this.clientBuilder = clientBuilder;
executorService =
Executors.newFixedThreadPool(
conf.getKvClientConcurrency(),
new ThreadFactoryBuilder().setNameFormat("kvclient-pool-%d").setDaemon(true).build());
this.batchGetThreadPool = session.getThreadPoolForBatchGet();
}
@Override
public void close() {
if (executorService != null) {
executorService.shutdownNow();
}
}
public void close() {}
/**
* Get a key-value pair from TiKV if key exists
@ -134,7 +126,7 @@ public class KVClient implements AutoCloseable {
private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
new ExecutorCompletionService<>(batchGetThreadPool);
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
List<Batch> batches = new ArrayList<>();
@ -149,18 +141,7 @@ public class KVClient implements AutoCloseable {
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
}
try {
List<KvPair> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
result.addAll(completionService.take().get());
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
return getKvPairs(completionService, batches);
}
private List<KvPair> doSendBatchGetInBatchesWithRetry(
@ -258,15 +239,4 @@ public class KVClient implements AutoCloseable {
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
}
/** A Batch containing the region and a list of keys to send */
private static final class Batch {
private final TiRegion region;
private final List<ByteString> keys;
Batch(TiRegion region, List<ByteString> keys) {
this.region = region;
this.keys = keys;
}
}
}

View File

@ -65,7 +65,7 @@ public class Snapshot {
}
public ByteString get(ByteString key) {
try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
return client.get(key, timestamp.getVersion());
}
}
@ -75,7 +75,7 @@ public class Snapshot {
for (byte[] key : keys) {
list.add(ByteString.copyFrom(key));
}
try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList =
client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());

View File

@ -40,7 +40,9 @@ public class TiConfiguration implements Serializable {
private static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000;
private static final int DEF_INDEX_SCAN_CONCURRENCY = 5;
private static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
private static final int DEF_BATCH_GET_CONCURRENCY = 20;
private static final int DEF_BATCH_PUT_CONCURRENCY = 20;
private static final int DEF_BATCH_SCAN_CONCURRENCY = 5;
private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low;
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.SI;
private static final boolean DEF_SHOW_ROWID = false;
@ -65,7 +67,9 @@ public class TiConfiguration implements Serializable {
private int downgradeThreshold = DEF_REGION_SCAN_DOWNGRADE_THRESHOLD;
private int indexScanConcurrency = DEF_INDEX_SCAN_CONCURRENCY;
private int tableScanConcurrency = DEF_TABLE_SCAN_CONCURRENCY;
private int batchGetConcurrency = DEF_BATCH_GET_CONCURRENCY;
private int batchPutConcurrency = DEF_BATCH_PUT_CONCURRENCY;
private int batchScanConcurrency = DEF_BATCH_SCAN_CONCURRENCY;
private CommandPri commandPriority = DEF_COMMAND_PRIORITY;
private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL;
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
@ -204,6 +208,14 @@ public class TiConfiguration implements Serializable {
this.tableScanConcurrency = tableScanConcurrency;
}
/**
 * Returns the configured batch-get concurrency.
 *
 * <p>The field is initialised from DEF_BATCH_GET_CONCURRENCY (20); TiSession uses this value as
 * the size of its fixed batch-get thread pool.
 *
 * @return the current batch-get concurrency
 */
public int getBatchGetConcurrency() {
return batchGetConcurrency;
}
/**
 * Sets the batch-get concurrency. The value is stored as given; no range check is performed here.
 *
 * @param batchGetConcurrency new batch-get concurrency
 */
public void setBatchGetConcurrency(int batchGetConcurrency) {
this.batchGetConcurrency = batchGetConcurrency;
}
/**
 * Returns the configured batch-put concurrency; the field is initialised from
 * DEF_BATCH_PUT_CONCURRENCY (20).
 *
 * @return the current batch-put concurrency
 */
public int getBatchPutConcurrency() {
return batchPutConcurrency;
}
@ -212,6 +224,14 @@ public class TiConfiguration implements Serializable {
this.batchPutConcurrency = batchPutConcurrency;
}
/**
 * Returns the configured batch-scan concurrency.
 *
 * <p>The field is initialised from DEF_BATCH_SCAN_CONCURRENCY (5); TiSession uses this value as
 * the size of its fixed batch-scan thread pool.
 *
 * @return the current batch-scan concurrency
 */
public int getBatchScanConcurrency() {
return batchScanConcurrency;
}
/**
 * Sets the batch-scan concurrency. The value is stored as given; no range check is performed here.
 *
 * @param batchScanConcurrency new batch-scan concurrency
 */
public void setBatchScanConcurrency(int batchScanConcurrency) {
this.batchScanConcurrency = batchScanConcurrency;
}
/**
 * Returns the configured command priority; the field is initialised from DEF_COMMAND_PRIORITY
 * (CommandPri.Low).
 *
 * @return the current command priority
 */
public CommandPri getCommandPriority() {
return commandPriority;
}

View File

@ -59,7 +59,9 @@ public class TiSession implements AutoCloseable {
private volatile Catalog catalog;
private volatile ExecutorService indexScanThreadPool;
private volatile ExecutorService tableScanThreadPool;
private volatile ExecutorService batchGetThreadPool;
private volatile ExecutorService batchPutThreadPool;
private volatile ExecutorService batchScanThreadPool;
private volatile RegionManager regionManager;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
@ -230,6 +232,38 @@ public class TiSession implements AutoCloseable {
return res;
}
/**
 * Returns the executor used for batch-get requests, creating it on first use.
 *
 * <p>The pool is a fixed-size pool of daemon threads sized by
 * {@code conf.getBatchGetConcurrency()}. Creation is guarded by a double-checked lock on this
 * session over the volatile {@code batchGetThreadPool} field, so at most one pool is built.
 *
 * @return the batch-get executor of this session
 */
public ExecutorService getThreadPoolForBatchGet() {
  ExecutorService existing = batchGetThreadPool;
  if (existing != null) {
    // Fast path: already initialised, no locking needed.
    return existing;
  }
  synchronized (this) {
    // Re-check under the lock; another thread may have created the pool in the meantime.
    if (batchGetThreadPool == null) {
      batchGetThreadPool =
          Executors.newFixedThreadPool(
              conf.getBatchGetConcurrency(), new ThreadFactoryBuilder().setDaemon(true).build());
    }
    return batchGetThreadPool;
  }
}
/**
 * Returns the executor used for batch-scan requests, creating it on first use.
 *
 * <p>The pool is a fixed-size pool of daemon threads sized by
 * {@code conf.getBatchScanConcurrency()}. Creation is guarded by a double-checked lock on this
 * session over the volatile {@code batchScanThreadPool} field, so at most one pool is built.
 *
 * @return the batch-scan executor of this session
 */
public ExecutorService getThreadPoolForBatchScan() {
  ExecutorService existing = batchScanThreadPool;
  if (existing != null) {
    // Fast path: already initialised, no locking needed.
    return existing;
  }
  synchronized (this) {
    // Re-check under the lock; another thread may have created the pool in the meantime.
    if (batchScanThreadPool == null) {
      batchScanThreadPool =
          Executors.newFixedThreadPool(
              conf.getBatchScanConcurrency(), new ThreadFactoryBuilder().setDaemon(true).build());
    }
    return batchScanThreadPool;
  }
}
@VisibleForTesting
public ChannelFactory getChannelFactory() {
return channelFactory;

View File

@ -64,7 +64,7 @@ public class ConcreteScanIterator extends ScanIterator {
ByteString endKey,
long version,
int limit) {
super(conf, builder, startKey, endKey, limit);
super(conf, builder, startKey, endKey, limit, false);
this.version = version;
}

View File

@ -35,8 +35,9 @@ public class RawScanIterator extends ScanIterator {
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit) {
super(conf, builder, startKey, endKey, limit);
int limit,
boolean keyOnly) {
super(conf, builder, startKey, endKey, limit, keyOnly);
}
TiRegion loadCurrentRegionToCache() throws GrpcException {
@ -48,7 +49,7 @@ public class RawScanIterator extends ScanIterator {
currentCache = null;
} else {
try {
currentCache = client.rawScan(backOffer, startKey, limit);
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
continue;

View File

@ -35,6 +35,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected ByteString startKey;
protected int index = -1;
protected int limit;
protected boolean keyOnly;
protected boolean endOfScan = false;
protected Key endKey;
@ -46,7 +47,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit) {
int limit,
boolean keyOnly) {
this.startKey = requireNonNull(startKey, "start key is null");
if (startKey.isEmpty()) {
throw new IllegalArgumentException("start key cannot be empty");
@ -54,6 +56,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.equals(ByteString.EMPTY);
this.limit = limit;
this.keyOnly = keyOnly;
this.conf = conf;
this.builder = builder;
}

View File

@ -887,6 +887,38 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
/**
 * Sends a RawBatchGet request for the given keys to this client's region.
 *
 * @param backoffer back-off policy passed to {@code callWithRetry}
 * @param keys raw keys to look up
 * @return the pairs carried by the response, or a new empty list when {@code keys} is empty (no
 *     request is sent in that case)
 */
public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
  if (keys.isEmpty()) {
    return new ArrayList<>();
  }

  // The request is built through a supplier, so the region context is read each time the
  // supplier is invoked rather than once up front.
  Supplier<RawBatchGetRequest> requestFactory =
      () ->
          RawBatchGetRequest.newBuilder()
              .setContext(region.getContext())
              .addAllKeys(keys)
              .build();

  KVErrorHandler<RawBatchGetResponse> errorHandler =
      new KVErrorHandler<>(
          regionManager,
          this,
          region,
          response -> {
            if (response.hasRegionError()) {
              return response.getRegionError();
            }
            return null;
          });

  RawBatchGetResponse response =
      callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), requestFactory, errorHandler);
  return handleRawBatchGet(response);
}
/**
 * Validates a RawBatchGet response and extracts its key-value pairs.
 *
 * @param resp response returned by the RawBatchGet call, possibly {@code null}
 * @return the pairs list of the response
 * @throws TiClientInternalException if {@code resp} is {@code null}; the region manager is
 *     notified of the failed request first
 * @throws RegionException if the response carries a region error
 */
private List<KvPair> handleRawBatchGet(RawBatchGetResponse resp) {
  if (resp == null) {
    this.regionManager.onRequestFail(region);
    // The message names the batch-get response; it previously said "RawBatchPutResponse",
    // which pointed at the wrong operation.
    throw new TiClientInternalException("RawBatchGetResponse failed without a cause");
  }
  if (resp.hasRegionError()) {
    throw new RegionException(resp.getRegionError());
  }
  return resp.getPairsList();
}
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs) {
if (kvPairs.isEmpty()) {
return;
@ -908,6 +940,15 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
handleRawBatchPut(resp);
}
/**
 * Sends the keys and values of a batch as a RawBatchPut request.
 *
 * <p>Each key is paired with the value at the same position in {@code batch.values}, and the
 * resulting pairs are handed to {@link #rawBatchPut(BackOffer, List)}.
 *
 * @param backOffer back-off policy forwarded to the list-based overload
 * @param batch batch whose keys and values are written
 */
public void rawBatchPut(BackOffer backOffer, Batch batch) {
  List<KvPair> kvPairs = new ArrayList<>();
  int position = 0;
  for (ByteString key : batch.keys) {
    ByteString value = batch.values.get(position++);
    kvPairs.add(KvPair.newBuilder().setKey(key).setValue(value).build());
  }
  rawBatchPut(backOffer, kvPairs);
}
private void handleRawBatchPut(RawBatchPutResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
@ -927,7 +968,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param keyOnly true if value of KvPair is not needed
* @return KvPair list
*/
private List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
Supplier<RawScanRequest> factory =
() ->
RawScanRequest.newBuilder()
@ -947,12 +988,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
return rawScanHelper(resp);
}
public List<KvPair> rawScan(BackOffer backOffer, ByteString key) {
return rawScan(backOffer, key, getConf().getScanBatchSize());
}
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit) {
return rawScan(backOffer, key, limit, false);
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
}
private List<KvPair> rawScanHelper(RawScanResponse resp) {

View File

@ -0,0 +1,53 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.tikv.common.region.TiRegion;
/**
 * A Batch containing the region, a list of keys and/or values to send.
 *
 * <p>The given lists are stored by reference, not copied. {@code values} and {@code map} are
 * {@code null} for key-only batches.
 */
public class Batch {
  public final TiRegion region;
  public final List<ByteString> keys;
  public final List<ByteString> values;
  public final Map<ByteString, ByteString> map;

  /**
   * Creates a key-only batch; {@link #values} and {@link #map} are left {@code null}.
   *
   * @param region region the keys are sent to
   * @param keys keys of the batch
   */
  public Batch(TiRegion region, List<ByteString> keys) {
    this.region = region;
    this.keys = keys;
    this.values = null;
    this.map = null;
  }

  /**
   * Creates a key-value batch and builds the key-to-value lookup map.
   *
   * @param region region the pairs are sent to
   * @param keys keys of the batch
   * @param values values of the batch; {@code values.get(i)} belongs to {@code keys.get(i)}
   * @throws IllegalArgumentException if {@code keys} and {@code values} differ in size
   */
  public Batch(TiRegion region, List<ByteString> keys, List<ByteString> values) {
    this.region = region;
    this.keys = keys;
    this.values = values;
    this.map = toMap(keys, values);
  }

  /** Pairs keys and values by position; a later duplicate key overwrites an earlier one. */
  private static Map<ByteString, ByteString> toMap(
      List<ByteString> keys, List<ByteString> values) {
    // Explicit check instead of `assert`: assertions are disabled by default at runtime, so a
    // size mismatch would otherwise go unnoticed or surface as an IndexOutOfBoundsException.
    if (keys.size() != values.size()) {
      throw new IllegalArgumentException(
          "keys and values must have the same size, got "
              + keys.size()
              + " keys and "
              + values.size()
              + " values");
    }
    Map<ByteString, ByteString> kvMap = new HashMap<>();
    for (int i = 0; i < keys.size(); i++) {
      kvMap.put(keys.get(i), values.get(i));
    }
    return kvMap;
  }
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import org.tikv.common.exception.TiKVException;
import org.tikv.kvproto.Kvrpcpb;
/** Static helpers shared by the KV clients. Not instantiable. */
public final class ClientUtils {

  private ClientUtils() {
    // Static utility holder; prevents instantiation.
  }

  /**
   * Collects the results of previously submitted batch tasks into a single list.
   *
   * <p>Takes exactly {@code batches.size()} completed futures from {@code completionService}, in
   * completion order, and concatenates their results. {@code batches} is used only for its size.
   *
   * @param completionService service to which one task per batch has already been submitted
   * @param batches the submitted batches
   * @return all key-value pairs returned by the tasks
   * @throws TiKVException if the calling thread is interrupted while waiting (the interrupt flag
   *     is restored first) or if a task failed
   */
  public static List<Kvrpcpb.KvPair> getKvPairs(
      ExecutorCompletionService<List<Kvrpcpb.KvPair>> completionService, List<Batch> batches) {
    try {
      List<Kvrpcpb.KvPair> result = new ArrayList<>();
      for (int i = 0; i < batches.size(); i++) {
        result.addAll(completionService.take().get());
      }
      return result;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new TiKVException("Current thread interrupted.", e);
    } catch (ExecutionException e) {
      throw new TiKVException("Execution exception met.", e);
    }
  }
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2020 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import com.google.protobuf.ByteString;
/**
 * Immutable description of a single raw scan: start key, end key, limit and key-only flag.
 *
 * <p>Instances are created through {@link #newBuilder()}. Per the original author's note, a limit
 * of 0 is meant to scan all keys in the range; that handling is not implemented in this class.
 */
public class ScanOption {
  private final ByteString startKey;
  private final ByteString endKey;
  private final int limit;
  private final boolean keyOnly;

  private ScanOption(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
    this.startKey = startKey;
    this.endKey = endKey;
    this.limit = limit;
    this.keyOnly = keyOnly;
  }

  /** Returns a builder preset to empty start/end keys, limit 0 and keyOnly false. */
  public static ScanOptionBuilder newBuilder() {
    return new ScanOptionBuilder();
  }

  /** Returns the start key of the scan. */
  public ByteString getStartKey() {
    return startKey;
  }

  /** Returns the end key of the scan. */
  public ByteString getEndKey() {
    return endKey;
  }

  /** Returns the limit of the scan. */
  public int getLimit() {
    return limit;
  }

  /** Returns whether only keys are requested. */
  public boolean isKeyOnly() {
    return keyOnly;
  }

  /** Fluent builder for {@link ScanOption}; every setter returns this builder. */
  public static class ScanOptionBuilder {
    private ByteString startKey = ByteString.EMPTY;
    private ByteString endKey = ByteString.EMPTY;
    private int limit = 0;
    private boolean keyOnly = false;

    private ScanOptionBuilder() {}

    public ScanOptionBuilder setStartKey(ByteString startKey) {
      this.startKey = startKey;
      return this;
    }

    public ScanOptionBuilder setEndKey(ByteString endKey) {
      this.endKey = endKey;
      return this;
    }

    public ScanOptionBuilder setLimit(int limit) {
      this.limit = limit;
      return this;
    }

    public ScanOptionBuilder setKeyOnly(boolean keyOnly) {
      this.keyOnly = keyOnly;
      return this;
    }

    /** Creates a ScanOption from the values currently held by this builder. */
    public ScanOption build() {
      return new ScanOption(startKey, endKey, limit, keyOnly);
    }
  }
}

View File

@ -15,6 +15,8 @@
package org.tikv.raw;
import static org.tikv.common.util.ClientUtils.getKvPairs;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.HashMap;
@ -23,10 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,21 +38,23 @@ import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.common.util.*;
import org.tikv.kvproto.Kvrpcpb.KvPair;
public class RawKVClient implements AutoCloseable {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorCompletionService<Object> completionService;
private final ExecutorService batchGetThreadPool;
private final ExecutorService batchPutThreadPool;
private final ExecutorService batchScanThreadPool;
private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
private static final int MAX_RETRY_LIMIT = 3;
// https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
private static final int MAX_RAW_SCAN_LIMIT = 10240;
private static final int RAW_BATCH_PUT_SIZE = 16 * 1024;
private static final int RAW_BATCH_GET_SIZE = 16 * 1024;
private static final int RAW_BATCH_SCAN_SIZE = 16;
private static final int RAW_BATCH_PAIR_COUNT = 512;
private static final TiKVException ERR_RETRY_LIMIT_EXCEEDED =
@ -66,7 +67,9 @@ public class RawKVClient implements AutoCloseable {
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = session.getConf();
this.clientBuilder = clientBuilder;
this.completionService = new ExecutorCompletionService<>(session.getThreadPoolForBatchPut());
this.batchGetThreadPool = session.getThreadPoolForBatchGet();
this.batchPutThreadPool = session.getThreadPoolForBatchPut();
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
}
@Override
@ -98,27 +101,12 @@ public class RawKVClient implements AutoCloseable {
* @param kvPairs kvPairs
*/
public void batchPut(Map<ByteString, ByteString> kvPairs) {
batchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs);
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs);
}
private void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
Map<ByteString, ByteString> keysToValues = mapKeysToValues(keys, values);
batchPut(backOffer, keysToValues);
}
private void batchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(kvPairs.keySet());
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
batches,
entry.getKey(),
entry.getValue(),
entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE);
}
sendBatchPut(backOffer, batches);
doSendBatchPut(backOffer, keysToValues);
}
/**
@ -140,6 +128,50 @@ public class RawKVClient implements AutoCloseable {
throw ERR_RETRY_LIMIT_EXCEEDED;
}
/**
 * Get the raw key-value pairs for a list of keys from TiKV.
 *
 * <p>The keys are grouped by region and fetched through the batch-get thread pool, using the
 * client's default back-off policy.
 *
 * @param keys list of raw keys
 * @return the key-value pairs returned by the per-region batch requests (NOTE(review): how
 *     missing keys are represented is decided by the server response — confirm)
 */
public List<KvPair> batchGet(List<ByteString> keys) {
  return doSendBatchGet(defaultBackOff(), keys);
}
/**
 * Runs several raw scans concurrently on the batch-scan thread pool.
 *
 * <p>Each scan is tagged with its position in {@code ranges}, so the result list has the same
 * order as the input no matter which scan finishes first.
 *
 * @param ranges scan descriptions to execute
 * @return one result list per entry of {@code ranges}, in the same order; a new empty list when
 *     {@code ranges} is empty
 * @throws TiKVException if the calling thread is interrupted, a wait times out, or a scan fails
 */
public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
  if (ranges.isEmpty()) {
    return new ArrayList<>();
  }

  ExecutorCompletionService<Pair<Integer, List<KvPair>>> scanService =
      new ExecutorCompletionService<>(batchScanThreadPool);
  int submitted = 0;
  for (ScanOption option : ranges) {
    final int slot = submitted++;
    scanService.submit(() -> Pair.create(slot, scan(option)));
  }

  // Pre-fill every slot so results can be stored by index as they complete.
  List<List<KvPair>> resultsByRange = new ArrayList<>();
  for (int slot = 0; slot < submitted; slot++) {
    resultsByRange.add(new ArrayList<>());
  }

  try {
    for (int remaining = submitted; remaining > 0; remaining--) {
      Pair<Integer, List<KvPair>> finished =
          scanService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
      resultsByRange.set(finished.first, finished.second);
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new TiKVException("Current thread interrupted.", e);
  } catch (TimeoutException e) {
    throw new TiKVException("TimeOut Exceeded for current operation. ", e);
  } catch (ExecutionException e) {
    throw new TiKVException("Execution exception met.", e);
  }
  return resultsByRange;
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
@ -148,26 +180,31 @@ public class RawKVClient implements AutoCloseable {
* @param limit limit of key-value pairs scanned, must not exceed {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
Iterator<Kvrpcpb.KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
// Convenience overload: delegates to the four-argument scan with keyOnly = false.
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
return scan(startKey, endKey, limit, false);
}
/**
 * Scan raw key-value pairs from TiKV in range [startKey, endKey), optionally keys only.
 *
 * @param startKey raw start key
 * @param endKey raw end key
 * @param limit limit of key-value pairs scanned; values above MAX_RAW_SCAN_LIMIT are rejected by
 *     rawScanIterator
 * @param keyOnly flag forwarded to the scan iterator to request keys without values
 * @return everything the scan iterator yields, in iteration order
 */
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
  List<KvPair> collected = new ArrayList<>();
  rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly)
      .forEachRemaining(collected::add);
  return collected;
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
* Scan raw key-value pairs from TiKV in range [startKey, )
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs scanned, must not exceed {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(conf, clientBuilder, startKey, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
// Convenience overload: delegates to scan(startKey, limit, keyOnly) with keyOnly = false.
public List<KvPair> scan(ByteString startKey, int limit) {
return scan(startKey, limit, false);
}
/**
 * Scan raw key-value pairs starting at startKey with no end key, optionally keys only.
 *
 * <p>Delegates to the four-argument scan, passing {@code ByteString.EMPTY} as the end key.
 *
 * @param startKey raw start key
 * @param limit limit of key-value pairs scanned
 * @param keyOnly flag forwarded to the four-argument scan
 * @return list of key-value pairs scanned
 */
public List<KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
return scan(startKey, ByteString.EMPTY, limit, keyOnly);
}
/**
@ -177,11 +214,15 @@ public class RawKVClient implements AutoCloseable {
* @param endKey raw end key, exclusive
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
List<Kvrpcpb.KvPair> result = new ArrayList<>();
// Convenience overload: delegates to scan(startKey, endKey, keyOnly) with keyOnly = false.
public List<KvPair> scan(ByteString startKey, ByteString endKey) {
return scan(startKey, endKey, false);
}
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
List<KvPair> result = new ArrayList<>();
while (true) {
Iterator<Kvrpcpb.KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize());
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly);
if (!iterator.hasNext()) {
break;
}
@ -191,6 +232,14 @@ public class RawKVClient implements AutoCloseable {
return result;
}
/**
 * Runs the scan described by a ScanOption by unpacking it into the four-argument scan.
 *
 * @param scanOption start key, end key, limit and key-only flag of the scan
 * @return list of key-value pairs scanned
 */
private List<KvPair> scan(ScanOption scanOption) {
  return scan(
      scanOption.getStartKey(),
      scanOption.getEndKey(),
      scanOption.getLimit(),
      scanOption.isKeyOnly());
}
/**
* Delete a raw key-value pair from TiKV if key exists
*
@ -209,19 +258,6 @@ public class RawKVClient implements AutoCloseable {
}
}
/** A Batch containing the region, a list of keys and/or values to send */
private static final class Batch {
private final TiRegion region;
private final List<ByteString> keys;
private final List<ByteString> values;
public Batch(TiRegion region, List<ByteString> keys, List<ByteString> values) {
this.region = region;
this.keys = keys;
this.values = values;
}
}
/**
* Append batch to list and split them according to batch limit
*
@ -253,6 +289,155 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
 * Splits the keys of one region into batches of at most {@code limit} keys and appends them.
 *
 * <p>Each batch gets its own copy of its slice of {@code keys}. The previous version handed the
 * same mutable list to every Batch and then cleared it, and it started a new batch for every
 * index at or beyond {@code limit}, so batches lost or shared keys once a region held more than
 * {@code limit} keys.
 *
 * <p>NOTE(review): {@code limit} is applied as a key count, as in the original comparison
 * against the index; callers pass RAW_BATCH_GET_SIZE — confirm that a count (not a byte size)
 * is intended.
 *
 * @param batches list the new batches are appended to
 * @param region region the keys belong to
 * @param keys keys to split; nothing is appended when empty
 * @param limit maximum number of keys per batch, must be positive
 * @throws IllegalArgumentException if {@code limit} is not positive
 */
private void appendBatches(
    List<Batch> batches, TiRegion region, List<ByteString> keys, int limit) {
  if (limit <= 0) {
    throw new IllegalArgumentException("limit must be positive, got " + limit);
  }
  int total = keys.size();
  int start = 0;
  while (start < total) {
    // Compute the chunk length from the remaining size to avoid overflowing start + limit.
    int end = start + Math.min(limit, total - start);
    batches.add(new Batch(region, new ArrayList<>(keys.subList(start, end))));
    start = end;
  }
}
/**
 * Writes the given pairs by grouping keys per region, splitting them into batches and sending
 * the batches concurrently on the batch-put thread pool.
 *
 * <p>Every batch gets its own back-offer created from {@code backOffer}. The method returns once
 * all submitted batches have completed.
 *
 * @param backOffer back-off policy each per-batch back-offer is created from
 * @param kvPairs key-value pairs to write
 * @throws TiKVException if the calling thread is interrupted, a wait times out, or a batch fails
 */
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs) {
  ExecutorCompletionService<Object> putService =
      new ExecutorCompletionService<>(batchPutThreadPool);

  List<Batch> batches = new ArrayList<>();
  for (Map.Entry<TiRegion, List<ByteString>> regionKeys :
      groupKeysByRegion(kvPairs.keySet()).entrySet()) {
    List<ByteString> keysInRegion = regionKeys.getValue();
    // Look the values up in key order so keys and values stay aligned by position.
    List<ByteString> valuesInRegion =
        keysInRegion.stream().map(kvPairs::get).collect(Collectors.toList());
    appendBatches(
        batches, regionKeys.getKey(), keysInRegion, valuesInRegion, RAW_BATCH_PUT_SIZE);
  }

  for (Batch batch : batches) {
    BackOffer perBatchBackOffer = ConcreteBackOffer.create(backOffer);
    putService.submit(() -> doSendBatchPutInBatchesWithRetry(perBatchBackOffer, batch));
  }

  try {
    int pending = batches.size();
    while (pending > 0) {
      putService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
      pending--;
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new TiKVException("Current thread interrupted.", e);
  } catch (TimeoutException e) {
    throw new TiKVException("TimeOut Exceeded for current operation. ", e);
  } catch (ExecutionException e) {
    throw new TiKVException("Execution exception met.", e);
  }
}
/**
 * Sends one batch-put to its region, regrouping and retrying when the region changed or the
 * request failed.
 *
 * <p>The region currently covering the batch's start key is looked up first. If it still equals
 * the batch's region the batch is sent directly; otherwise, or if sending throws a
 * TiKVException (after backing off), the keys are regrouped by region and resent.
 *
 * @param backOffer back-off policy for this batch
 * @param batch batch to send
 * @return always {@code null}; the return type lets the method be submitted as a task
 */
private Object doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch) {
  TiRegion oldRegion = batch.region;
  TiRegion currentRegion =
      clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());

  if (oldRegion.equals(currentRegion)) {
    try (RegionStoreClient client = clientBuilder.build(batch.region)) {
      client.rawBatchPut(backOffer, batch);
      return null;
    } catch (final TiKVException e) {
      // TODO: any elegant way to re-split the ranges if fails?
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      // Log the cause as well, as the batch-get path does, so failed puts can be diagnosed.
      logger.warn("ReSplitting ranges for BatchPutRequest", e);
      // retry
      return doSendBatchPutWithRefetchRegion(backOffer, batch);
    }
  } else {
    return doSendBatchPutWithRefetchRegion(backOffer, batch);
  }
}
/**
 * Regroups the keys of a batch by region, rebuilds the batches and sends each one in turn.
 *
 * <p>Values are looked up through {@code batch.map}. The new batches are sent sequentially on
 * the calling thread, each through doSendBatchPutInBatchesWithRetry, so the two methods recurse
 * into each other.
 *
 * @param backOffer back-off policy shared by all retried batches
 * @param batch batch whose keys are regrouped
 * @return always {@code null}
 */
private Object doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
  List<Batch> regrouped = new ArrayList<>();
  groupKeysByRegion(batch.keys)
      .forEach(
          (region, keysInRegion) -> {
            List<ByteString> valuesInRegion =
                keysInRegion.stream().map(batch.map::get).collect(Collectors.toList());
            appendBatches(regrouped, region, keysInRegion, valuesInRegion, RAW_BATCH_PUT_SIZE);
          });

  // recursive calls
  regrouped.forEach(retryBatch -> doSendBatchPutInBatchesWithRetry(backOffer, retryBatch));
  return null;
}
/**
 * Reads the given keys by grouping them per region, splitting them into batches and fetching
 * the batches concurrently on the batch-get thread pool.
 *
 * @param backOffer back-off policy each per-batch back-offer is created from
 * @param keys raw keys to read
 * @return the pairs of all batches as gathered by getKvPairs
 */
private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys) {
  ExecutorCompletionService<List<KvPair>> getService =
      new ExecutorCompletionService<>(batchGetThreadPool);

  List<Batch> batches = new ArrayList<>();
  groupKeysByRegion(keys)
      .forEach(
          (region, keysInRegion) ->
              appendBatches(batches, region, keysInRegion, RAW_BATCH_GET_SIZE));

  batches.forEach(
      batch -> {
        BackOffer perBatchBackOffer = ConcreteBackOffer.create(backOffer);
        getService.submit(() -> doSendBatchGetInBatchesWithRetry(perBatchBackOffer, batch));
      });

  return getKvPairs(getService, batches);
}
/**
 * Fetches one batch from its region, regrouping and retrying when the region changed or the
 * request failed.
 *
 * <p>The region currently covering the batch's start key is looked up first. If it still equals
 * the batch's region the batch is fetched directly; otherwise the keys are regrouped by region
 * and fetched again. On a TiKVException the method backs off, invalidates the batch's region in
 * the region manager and retries through the regrouping path.
 *
 * @param backOffer back-off policy for this batch
 * @param batch batch to fetch
 * @return the key-value pairs returned for the batch's keys
 */
private List<KvPair> doSendBatchGetInBatchesWithRetry(BackOffer backOffer, Batch batch) {
  TiRegion oldRegion = batch.region;
  TiRegion currentRegion =
      clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());

  if (oldRegion.equals(currentRegion)) {
    // try-with-resources closes the client on every path, matching the batch-put sibling; it
    // was previously never closed. As in that sibling, a TiKVException raised while building
    // the client is now handled by the retry branch too.
    try (RegionStoreClient client = clientBuilder.build(batch.region)) {
      return client.rawBatchGet(backOffer, batch.keys);
    } catch (final TiKVException e) {
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
      logger.warn("ReSplitting ranges for BatchGetRequest", e);
      // retry
      return doSendBatchGetWithRefetchRegion(backOffer, batch);
    }
  } else {
    return doSendBatchGetWithRefetchRegion(backOffer, batch);
  }
}
/**
 * Regroups the keys of a batch by region, rebuilds the batches and fetches each one in turn.
 *
 * <p>The new batches are fetched sequentially on the calling thread, each through
 * doSendBatchGetInBatchesWithRetry, so the two methods recurse into each other.
 *
 * @param backOffer back-off policy shared by all retried batches
 * @param batch batch whose keys are regrouped
 * @return the concatenated results of all retried batches
 */
private List<KvPair> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
  List<Batch> regrouped = new ArrayList<>();
  groupKeysByRegion(batch.keys)
      .forEach(
          (region, keysInRegion) ->
              appendBatches(regrouped, region, keysInRegion, RAW_BATCH_GET_SIZE));

  ArrayList<KvPair> collected = new ArrayList<>();
  // recursive calls
  regrouped.forEach(
      retryBatch -> collected.addAll(doSendBatchGetInBatchesWithRetry(backOffer, retryBatch)));
  return collected;
}
/**
* Group by list of keys according to its region
*
@ -271,6 +456,11 @@ public class RawKVClient implements AutoCloseable {
return groups;
}
/**
 * Groups a list of keys by the region the region manager reports for each key.
 *
 * @param keys keys to group
 * @return map from region to the keys that resolved to it, each list in encounter order
 */
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}
private static Map<ByteString, ByteString> mapKeysToValues(
List<ByteString> keys, List<ByteString> values) {
Map<ByteString, ByteString> map = new HashMap<>();
@ -280,66 +470,17 @@ public class RawKVClient implements AutoCloseable {
return map;
}
/**
 * Send batchPut request concurrently
 *
 * <p>One task per batch is submitted to {@code completionService}. Each task builds the key/value
 * pairs of its batch and sends them with {@code rawBatchPut}; on a {@code TiKVException} it backs
 * off and falls back to {@code batchPut} for the keys of that batch. This method then waits until
 * as many tasks have completed as batches were submitted.
 *
 * @param backOffer current backOffer
 * @param batches list of batch to send
 * @throws TiKVException if the calling thread is interrupted, if no task completes within the
 *     timeout, or if a task fails
 */
private void sendBatchPut(BackOffer backOffer, List<Batch> batches) {
  for (Batch batch : batches) {
    completionService.submit(
        () -> {
          BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
          List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
          for (int i = 0; i < batch.keys.size(); i++) {
            kvPairs.add(
                Kvrpcpb.KvPair.newBuilder()
                    .setKey(batch.keys.get(i))
                    .setValue(batch.values.get(i))
                    .build());
          }
          try (RegionStoreClient client = clientBuilder.build(batch.region)) {
            client.rawBatchPut(singleBatchBackOffer, kvPairs);
          } catch (final TiKVException e) {
            // TODO: any elegant way to re-split the ranges if fails?
            singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
            logger.warn("ReSplitting ranges for BatchPutRequest");
            // recursive calls
            batchPut(singleBatchBackOffer, batch.keys, batch.values);
          }
          return null;
        });
  }
  try {
    for (int i = 0; i < batches.size(); i++) {
      // take() only ever hands back an already-completed future, so a timed get() on it can
      // never time out. poll() with a timeout is what actually bounds the wait.
      // NOTE(review): the time unit is kept as in the original; confirm that
      // BackOffer.RAWKV_MAX_BACKOFF is really expressed in seconds.
      java.util.concurrent.Future<?> finished =
          completionService.poll(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
      if (finished == null) {
        throw new TimeoutException("No batchPut task completed within the timeout");
      }
      // Surfaces a task failure as ExecutionException.
      finished.get();
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new TiKVException("Current thread interrupted.", e);
  } catch (TimeoutException e) {
    throw new TiKVException("TimeOut Exceeded for current operation. ", e);
  } catch (ExecutionException e) {
    throw new TiKVException("Execution exception met.", e);
  }
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(
private Iterator<KvPair> rawScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit) {
int limit,
boolean keyOnly) {
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit);
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(
TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, int limit) {
return rawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit);
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly);
}
private BackOffer defaultBackOff() {

View File

@ -15,10 +15,11 @@
package org.tikv.txn;
import static org.tikv.common.util.ClientUtils.getKvPairs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -34,6 +35,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
@ -147,18 +149,7 @@ public class KVClient implements AutoCloseable {
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
}
try {
List<Kvrpcpb.KvPair> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
result.addAll(completionService.take().get());
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
return getKvPairs(completionService, batches);
}
private List<Kvrpcpb.KvPair> doSendBatchGetInBatchesWithRetry(
@ -257,15 +248,4 @@ public class KVClient implements AutoCloseable {
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
}
/**
 * Holder for one unit of work: a region together with the keys to send to it.
 *
 * <p>Both references are fixed at construction; the key list itself is stored as given, not
 * copied.
 */
private static final class Batch {
  /** Region the keys are sent to. */
  private final TiRegion region;

  /** Keys to send. */
  private final List<ByteString> keys;

  Batch(TiRegion region, List<ByteString> keys) {
    this.keys = keys;
    this.region = region;
  }
}
}

View File

@ -17,6 +17,7 @@ import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
public class RawKVClientTest {
@ -26,7 +27,8 @@ public class RawKVClientTest {
private static final int TEST_CASES = 10000;
private static final int WORKER_CNT = 100;
private static final ByteString RAW_START_KEY = ByteString.copyFromUtf8(RAW_PREFIX);
private static final ByteString RAW_END_KEY = Key.toRawKey(RAW_START_KEY).next().toByteString();
private static final ByteString RAW_END_KEY =
Key.toRawKey(RAW_START_KEY).nextPrefix().toByteString();
private RawKVClient client;
private static final List<ByteString> orderedKeys;
private static final List<ByteString> randomKeys;
@ -130,15 +132,15 @@ public class RawKVClientTest {
@Test
public void validate() {
if (!initialized) return;
baseTest(100, 100, 100, 100, false, false);
baseTest(100, 100, 100, 100, false, true);
baseTest(100, 100, 100, 100, false, false, false, false);
baseTest(100, 100, 100, 100, false, true, true, true);
}
/** Example of benchmarking base test */
public void benchmark() {
if (!initialized) return;
baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, false);
baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, true);
baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, false, false, false);
baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, true, true, true);
}
private void baseTest(
@ -147,7 +149,9 @@ public class RawKVClientTest {
int scanCases,
int deleteCases,
boolean benchmark,
boolean batchPut) {
boolean batchPut,
boolean batchGet,
boolean batchScan) {
if (putCases > KEY_POOL_SIZE) {
logger.info("Number of distinct orderedKeys required exceeded pool size " + KEY_POOL_SIZE);
return;
@ -165,8 +169,16 @@ public class RawKVClientTest {
} else {
rawPutTest(putCases, benchmark);
}
rawGetTest(getCases, benchmark);
rawScanTest(scanCases, benchmark);
if (batchGet) {
rawBatchGetTest(getCases, benchmark);
} else {
rawGetTest(getCases, benchmark);
}
if (batchScan) {
rawBatchScanTest(scanCases, benchmark);
} else {
rawScanTest(scanCases, benchmark);
}
rawDeleteTest(deleteCases, benchmark);
prepare();
@ -338,8 +350,44 @@ public class RawKVClientTest {
}
}
/**
 * Runs the batchGet scenario.
 *
 * <p>In benchmark mode, {@code WORKER_CNT} tasks are submitted; each issues one {@code batchGet}
 * over its own slice of {@code getCases / WORKER_CNT} consecutive entries of {@code orderedKeys},
 * and the elapsed time is logged. Otherwise keys are taken from {@code data} in iteration order
 * until {@code getCases} keys are collected (or {@code data} is exhausted) and verified through
 * {@code checkBatchGet}.
 *
 * @param getCases number of keys to fetch
 * @param benchmark whether to run the timed, multi-task variant
 */
private void rawBatchGetTest(int getCases, boolean benchmark) {
  logger.info("batchGet testing");

  if (!benchmark) {
    List<ByteString> keys = new ArrayList<>();
    for (ByteString key : data.keySet()) {
      keys.add(key);
      // Checked after adding, so at least one key is collected when data is non-empty.
      if (keys.size() >= getCases) {
        break;
      }
    }
    checkBatchGet(keys);
    return;
  }

  long start = System.currentTimeMillis();
  int base = getCases / WORKER_CNT;
  for (int worker = 0; worker < WORKER_CNT; worker++) {
    int first = worker * base;
    completionService.submit(
        () -> {
          List<ByteString> keys = new ArrayList<>();
          for (int num = first; num < first + base; num++) {
            keys.add(orderedKeys.get(num));
          }
          client.batchGet(keys);
          return null;
        });
  }
  awaitTimeOut(200);
  long end = System.currentTimeMillis();
  logger.info(getCases + " batchGet: " + (end - start) / 1000.0 + "s");
}
private void rawScanTest(int scanCases, boolean benchmark) {
logger.info("rawScan testing");
logger.info("scan testing");
if (benchmark) {
long start = System.currentTimeMillis();
int base = scanCases / WORKER_CNT;
@ -377,6 +425,57 @@ public class RawKVClientTest {
}
}
/**
 * Runs the batchScan scenario.
 *
 * <p>In benchmark mode, {@code WORKER_CNT} tasks are submitted; each builds {@code scanCases /
 * WORKER_CNT} scan options from neighbouring entries of {@code randomKeys} and issues one {@code
 * batchScan}, and the elapsed time is logged. Otherwise {@code scanCases} scan options are built
 * from randomly picked entries of {@code randomKeys} and verified through {@code
 * checkBatchScan}.
 *
 * @param scanCases number of scan ranges to issue
 * @param benchmark whether to run the timed, multi-task variant
 */
private void rawBatchScanTest(int scanCases, boolean benchmark) {
  logger.info("batchScan testing");
  if (benchmark) {
    long start = System.currentTimeMillis();
    int base = scanCases / WORKER_CNT;
    for (int cnt = 0; cnt < WORKER_CNT; cnt++) {
      int i = cnt;
      completionService.submit(
          () -> {
            List<ScanOption> scanOptions = new ArrayList<>();
            for (int j = 0; j < base; j++) {
              int num = i * base + j;
              scanOptions.add(orderedScanOption(randomKeys.get(num), randomKeys.get(num + 1)));
            }
            client.batchScan(scanOptions);
            return null;
          });
    }
    awaitTimeOut(200);
    long end = System.currentTimeMillis();
    logger.info(scanCases + " batchScan: " + (end - start) / 1000.0 + "s");
  } else {
    List<ScanOption> scanOptions = new ArrayList<>();
    for (int i = 0; i < scanCases; i++) {
      // Two separate statements keep the order of the random draws explicit: start, then end.
      ByteString first = randomKeys.get(r.nextInt(KEY_POOL_SIZE));
      ByteString second = randomKeys.get(r.nextInt(KEY_POOL_SIZE));
      scanOptions.add(orderedScanOption(first, second));
    }
    checkBatchScan(scanOptions);
  }
}

/**
 * Builds a scan option over the two given keys, using whichever compares smaller under {@code
 * bsc} as the start key and the other as the end key, with the test-wide {@code limit}.
 */
private ScanOption orderedScanOption(ByteString first, ByteString second) {
  boolean swapped = bsc.compare(first, second) > 0;
  return ScanOption.newBuilder()
      .setStartKey(swapped ? second : first)
      .setEndKey(swapped ? first : second)
      .setLimit(limit)
      .build();
}
private void rawDeleteTest(int deleteCases, boolean benchmark) {
logger.info("delete testing");
if (benchmark) {
@ -409,6 +508,14 @@ public class RawKVClientTest {
}
}
/**
 * Fetches {@code keys} with {@code client.batchGet} and asserts that every returned pair is
 * present in {@code data} with the same value.
 *
 * <p>Only the returned pairs are inspected; keys that are missing from the result are not
 * detected here.
 *
 * @param keys keys to fetch
 */
private void checkBatchGet(List<ByteString> keys) {
  for (Kvrpcpb.KvPair fetched : client.batchGet(keys)) {
    ByteString fetchedKey = fetched.getKey();
    assert data.containsKey(fetchedKey);
    assert fetched.getValue().equals(data.get(fetchedKey));
  }
}
private void checkPut(ByteString key, ByteString value) {
client.put(key, value);
assert client.get(key).equals(value);
@ -445,6 +552,26 @@ public class RawKVClientTest {
limit);
}
/**
 * Runs {@code client.batchScan} over {@code scanOptions} and asserts that the i-th result list
 * equals the entries of {@code data.subMap(startKey, endKey)} for the i-th scan option, converted
 * to {@code KvPair}s in map order.
 *
 * @param scanOptions scan ranges to issue and verify
 */
private void checkBatchScan(List<ScanOption> scanOptions) {
  List<List<Kvrpcpb.KvPair>> result = client.batchScan(scanOptions);
  for (int idx = 0; idx < scanOptions.size(); idx++) {
    ScanOption option = scanOptions.get(idx);
    List<Kvrpcpb.KvPair> expected = new ArrayList<>();
    for (Map.Entry<ByteString, ByteString> entry :
        data.subMap(option.getStartKey(), option.getEndKey()).entrySet()) {
      expected.add(
          Kvrpcpb.KvPair.newBuilder()
              .setKey(entry.getKey())
              .setValue(entry.getValue())
              .build());
    }
    assert result.get(idx).equals(expected);
  }
}
private void checkDelete(ByteString key) {
client.delete(key);
checkEmpty(key);