Add multi-threaded execution for batchPut

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2018-12-19 16:22:32 +08:00
parent 2a50639074
commit 106a7c3b3f
3 changed files with 150 additions and 42 deletions

View File

@ -45,6 +45,7 @@ public class TiConfiguration implements Serializable {
private static final boolean DEF_SHOW_ROWID = false;
private static final String DEF_DB_PREFIX = "";
private static final String DEF_KV_MODE = "KV";
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
private int timeout = DEF_TIMEOUT;
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
@ -63,6 +64,7 @@ public class TiConfiguration implements Serializable {
private boolean showRowId = DEF_SHOW_ROWID;
private String dbPrefix = DEF_DB_PREFIX;
private String kvMode = DEF_KV_MODE;
private int rawClientConcurrency = DEF_RAW_CLIENT_CONCURRENCY;
public static TiConfiguration createDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
@ -234,4 +236,12 @@ public class TiConfiguration implements Serializable {
/**
 * Replaces the KV mode stored in this configuration.
 *
 * @param kvMode the new KV mode string; stored as given, without validation
 */
public void setKvMode(String kvMode) {
  this.kvMode = kvMode;
}
/**
 * Returns the raw client concurrency held by this configuration.
 *
 * @return the configured value; {@code DEF_RAW_CLIENT_CONCURRENCY} unless it was overridden
 */
public int getRawClientConcurrency() {
  return this.rawClientConcurrency;
}
/**
 * Sets the raw client concurrency.
 *
 * <p>The raw client passes this value to {@code Executors.newFixedThreadPool}, which rejects
 * non-positive sizes, so an invalid value is refused here instead of surfacing later when a
 * client is constructed.
 *
 * @param rawClientConcurrency number of worker threads; must be greater than zero
 * @throws IllegalArgumentException if {@code rawClientConcurrency} is zero or negative
 */
public void setRawClientConcurrency(int rawClientConcurrency) {
  if (rawClientConcurrency <= 0) {
    throw new IllegalArgumentException(
        "rawClientConcurrency must be positive, got " + rawClientConcurrency);
  }
  this.rawClientConcurrency = rawClientConcurrency;
}
}

View File

@ -16,6 +16,10 @@
package org.tikv.raw;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.TiKVException;
@ -30,16 +34,21 @@ import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import java.util.*;
public class RawKVClient implements AutoCloseable {
private static final String DEFAULT_PD_ADDRESS = "127.0.0.1:2379";
private final TiSession session;
private final RegionManager regionManager;
private final ExecutorCompletionService<Object> completionService;
private static final Logger logger = Logger.getLogger(RawKVClient.class);
private static final int RAW_BATCH_PUT_SIZE = 16 * 1024;
/**
 * Creates a client backed by a new session built from the raw-mode default configuration.
 *
 * <p>Also creates a fixed-size thread pool, sized by the session configuration's raw client
 * concurrency, and wraps it in the completion service used for submitted tasks.
 *
 * @param addresses PD address string handed to {@code TiConfiguration.createRawDefault}
 */
private RawKVClient(String addresses) {
  TiConfiguration rawConf = TiConfiguration.createRawDefault(addresses);
  session = TiSession.create(rawConf);
  regionManager = session.getRegionManager();
  int poolSize = session.getConf().getRawClientConcurrency();
  // NOTE(review): the pool is only reachable through completionService, which exposes no
  // shutdown method; confirm these worker threads are stopped somewhere when the client closes.
  completionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(poolSize));
}
private RawKVClient() {
@ -79,41 +88,68 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
 * Puts the given keys and values using a freshly created raw-KV back-offer.
 *
 * <p>Delegates to the three-argument overload, which pairs {@code keys.get(i)} with {@code
 * values.get(i)} and clears the {@code keys} list it is given.
 *
 * @param keys keys to write
 * @param values values to write, in the same order as {@code keys}
 */
public void batchPut(List<ByteString> keys, List<ByteString> values) {
  BackOffer rawBackOffer = ConcreteBackOffer.newRawKVBackOff();
  batchPut(rawBackOffer, keys, values);
}
/**
* Put a list of raw key-value pairs to TiKV
*
* @param kvPairs kvPairs
* @param keys keys
* @param values values
*/
public void batchPut(List<Kvrpcpb.KvPair> kvPairs) {
BackOffer backOffer = defaultBackOff();
List<Kvrpcpb.KvPair> remainingPairs = new ArrayList<>();
while (true) {
kvPairs.addAll(remainingPairs);
remainingPairs.clear();
Map<Pair<TiRegion, Metapb.Store>, List<Kvrpcpb.KvPair>> regionMap = new HashMap<>();
for (Kvrpcpb.KvPair kvPair : kvPairs) {
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(kvPair.getKey());
regionMap.computeIfAbsent(pair, t -> new ArrayList<>()).add(kvPair);
}
public void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
Map<ByteString, ByteString> keysToValues = mapKeysToValues(keys, values);
Map<Long, List<ByteString>> groupKeys = groupKeysByRegion(keys);
keys.clear();
List<Batch> batches = new ArrayList<>();
for (Map.Entry<Pair<TiRegion, Metapb.Store>, List<Kvrpcpb.KvPair>> entry :
regionMap.entrySet()) {
RegionStoreClient client =
RegionStoreClient.create(entry.getKey().first, entry.getKey().second, session);
try {
client.rawBatchPut(defaultBackOff(), entry.getValue());
} catch (final TiKVException e) {
remainingPairs.addAll(entry.getValue());
}
for (Map.Entry<Long, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
batches,
entry.getKey(),
entry.getValue(),
entry.getValue().stream().map(keysToValues::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE);
}
for (Batch batch : batches) {
completionService.submit(
() -> {
Pair<TiRegion, Metapb.Store> pair =
regionManager.getRegionStorePairByRegionId(batch.regionId);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
kvPairs.add(
Kvrpcpb.KvPair.newBuilder()
.setKey(batch.keys.get(i))
.setValue(batch.values.get(i))
.build());
}
try {
client.rawBatchPut(backOffer, kvPairs);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// recursive calls
batchPut(backOffer, batch.keys, batch.values);
}
return null;
});
}
try {
for (int i = 0; i < batches.size(); i++) {
completionService.take().get(BackOffer.rawkvMaxBackoff, TimeUnit.SECONDS);
}
if (remainingPairs.isEmpty()) {
return;
}
// re-splitting ranges
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss,
new TiKVException("BatchPut encounter exception, need re-split the ranges"));
kvPairs.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}
@ -190,6 +226,61 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
 * A group of keys and their values destined for a single region.
 *
 * <p>The key and value lists are stored by reference, not copied, so later changes to the lists
 * passed in are visible through this batch. Entry {@code i} of {@code keys} corresponds to entry
 * {@code i} of {@code values}.
 */
private class Batch {
  // Assigned once in the constructor and never reassigned afterwards.
  private final long regionId;
  private final List<ByteString> keys;
  private final List<ByteString> values;

  /**
   * @param regionId id of the region the entries belong to
   * @param keys keys of the batch; the list is kept as-is, not copied
   * @param values values of the batch, in the same order as {@code keys}; kept as-is, not copied
   */
  public Batch(long regionId, List<ByteString> keys, List<ByteString> values) {
    this.regionId = regionId;
    this.keys = keys;
    this.values = values;
  }
}
/**
 * Splits one region's keys and values into batches of at most {@code limit} entries and appends
 * those batches to {@code batches}.
 *
 * <p>Every batch gets its own key and value lists. A {@code Batch} keeps the lists it is given by
 * reference, so reusing and clearing shared lists would empty batches that were already appended.
 *
 * @param batches destination list that receives the new batches
 * @param regionId id of the region all given keys belong to
 * @param keys keys to split
 * @param values values to split; {@code values.get(i)} belongs to {@code keys.get(i)}
 * @param limit maximum number of entries per batch
 */
private void appendBatches(
    List<Batch> batches,
    long regionId,
    List<ByteString> keys,
    List<ByteString> values,
    int limit) {
  List<ByteString> tmpKeys = new ArrayList<>();
  List<ByteString> tmpValues = new ArrayList<>();
  for (int i = 0; i < keys.size(); i++) {
    // Flush on the size of the current chunk, not on the absolute index, so a new batch starts
    // once per `limit` entries. The emptiness check keeps a non-positive limit from emitting
    // empty batches.
    if (!tmpKeys.isEmpty() && tmpKeys.size() >= limit) {
      batches.add(new Batch(regionId, tmpKeys, tmpValues));
      // Allocate fresh lists instead of clearing: the batch just added still points at the old
      // ones.
      tmpKeys = new ArrayList<>();
      tmpValues = new ArrayList<>();
    }
    tmpKeys.add(keys.get(i));
    tmpValues.add(values.get(i));
  }
  if (!tmpKeys.isEmpty()) {
    batches.add(new Batch(regionId, tmpKeys, tmpValues));
  }
}
/**
 * Buckets the given keys by the id of the region that holds each key.
 *
 * <p>The most recently resolved region is reused for as long as it contains the next key; the
 * region manager is asked again only when it does not.
 *
 * @param keys keys to group, processed in list order
 * @return map from region id to the keys found in that region, in encounter order
 */
private Map<Long, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
  Map<Long, List<ByteString>> keysByRegionId = new HashMap<>();
  TiRegion currentRegion = null;
  for (ByteString key : keys) {
    boolean stillInCurrentRegion = currentRegion != null && currentRegion.contains(key);
    if (!stillInCurrentRegion) {
      currentRegion = regionManager.getRegionByKey(key);
    }
    List<ByteString> bucket =
        keysByRegionId.computeIfAbsent(currentRegion.getId(), id -> new ArrayList<>());
    bucket.add(key);
  }
  return keysByRegionId;
}
/**
 * Builds a lookup from each key to the value at the same position in {@code values}.
 *
 * <p>If a key occurs more than once, the value of its last occurrence wins. Values beyond the
 * length of {@code keys} are ignored.
 *
 * @param keys keys, in order
 * @param values values, where {@code values.get(i)} belongs to {@code keys.get(i)}
 * @return a new mutable map from key to value
 */
static Map<ByteString, ByteString> mapKeysToValues(
    List<ByteString> keys, List<ByteString> values) {
  Map<ByteString, ByteString> keysToValues = new HashMap<>();
  int position = 0;
  for (ByteString key : keys) {
    keysToValues.put(key, values.get(position));
    position++;
  }
  return keysToValues;
}
/**
 * Creates a scan iterator over this client's session for the given key bounds.
 *
 * <p>{@code Integer.MAX_VALUE} is passed as the third constructor argument (presumably a result
 * limit; confirm against {@code RawScanIterator}).
 *
 * @param startKey first bound handed to the iterator
 * @param endKey second bound handed to the iterator
 * @return a new {@code RawScanIterator}
 */
private Iterator<Kvrpcpb.KvPair> rawScanIterator(ByteString startKey, ByteString endKey) {
  Iterator<Kvrpcpb.KvPair> scanIterator =
      new RawScanIterator(startKey, endKey, Integer.MAX_VALUE, session);
  return scanIterator;
}

View File

@ -1,5 +1,7 @@
package org.tikv.raw;
import static org.tikv.raw.RawKVClient.mapKeysToValues;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.*;
@ -244,7 +246,7 @@ public class RawKVClientTest {
}
private void rawBatchPutTest(int putCases, boolean benchmark) {
System.out.println("put testing");
System.out.println("batchPut testing");
if (benchmark) {
for (int i = 0; i < putCases; i++) {
ByteString key = orderedKeys.get(i), value = values.get(i);
@ -257,13 +259,15 @@ public class RawKVClientTest {
int i = cnt;
completionService.submit(
() -> {
List<Kvrpcpb.KvPair> list = new ArrayList<>();
List<ByteString> keyList = new ArrayList<>();
List<ByteString> valueList = new ArrayList<>();
for (int j = 0; j < base; j++) {
int num = i * base + j;
ByteString key = orderedKeys.get(num), value = values.get(num);
list.add(Kvrpcpb.KvPair.newBuilder().setKey(key).setValue(value).build());
keyList.add(key);
valueList.add(value);
}
client.batchPut(list);
client.batchPut(keyList, valueList);
return null;
});
}
@ -278,13 +282,15 @@ public class RawKVClientTest {
+ " put="
+ rawKeys().size());
} else {
List<Kvrpcpb.KvPair> list = new ArrayList<>();
List<ByteString> keyList = new ArrayList<>();
List<ByteString> valueList = new ArrayList<>();
for (int i = 0; i < putCases; i++) {
ByteString key = randomKeys.get(i), value = values.get(r.nextInt(KEY_POOL_SIZE));
data.put(key, value);
list.add(Kvrpcpb.KvPair.newBuilder().setKey(key).setValue(value).build());
keyList.add(key);
valueList.add(value);
}
checkBatchPut(list);
checkBatchPut(keyList, valueList);
}
}
@ -396,10 +402,11 @@ public class RawKVClientTest {
assert client.get(key).equals(value);
}
private void checkBatchPut(List<Kvrpcpb.KvPair> pairs) {
client.batchPut(pairs);
for (Kvrpcpb.KvPair pair : pairs) {
assert client.get(pair.getKey()).equals(pair.getValue());
/**
 * Writes the given keys and values with {@code batchPut} and asserts that each key then reads
 * back the value it was paired with.
 *
 * <p>The expected pairs are captured before the write because {@code batchPut} clears the key
 * list it receives. Building the expectations afterwards would leave nothing to verify.
 *
 * @param keyList keys to write; emptied by {@code batchPut}
 * @param valueList values to write, in the same order as {@code keyList}
 */
private void checkBatchPut(List<ByteString> keyList, List<ByteString> valueList) {
  // Snapshot first: after batchPut returns, keyList no longer holds the keys.
  Map<ByteString, ByteString> keysToValues = mapKeysToValues(keyList, valueList);
  client.batchPut(keyList, valueList);
  for (Map.Entry<ByteString, ByteString> entry : keysToValues.entrySet()) {
    assert client.get(entry.getKey()).equals(entry.getValue());
  }
}