fix raw batch put (#84)

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2021-01-14 18:01:15 +08:00 committed by GitHub
parent 4b07ecf0ac
commit 80eecbd348
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 10 deletions

View File

@ -40,6 +40,7 @@ public class TiConfiguration implements Serializable {
private static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000;
private static final int DEF_INDEX_SCAN_CONCURRENCY = 5;
private static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
private static final int DEF_BATCH_PUT_CONCURRENCY = 20;
private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low;
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.SI;
private static final boolean DEF_SHOW_ROWID = false;
@ -64,6 +65,7 @@ public class TiConfiguration implements Serializable {
private int downgradeThreshold = DEF_REGION_SCAN_DOWNGRADE_THRESHOLD;
private int indexScanConcurrency = DEF_INDEX_SCAN_CONCURRENCY;
private int tableScanConcurrency = DEF_TABLE_SCAN_CONCURRENCY;
private int batchPutConcurrency = DEF_BATCH_PUT_CONCURRENCY;
private CommandPri commandPriority = DEF_COMMAND_PRIORITY;
private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL;
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
@ -202,6 +204,14 @@ public class TiConfiguration implements Serializable {
this.tableScanConcurrency = tableScanConcurrency;
}
public int getBatchPutConcurrency() {
return batchPutConcurrency;
}
public void setBatchPutConcurrency(int batchPutConcurrency) {
this.batchPutConcurrency = batchPutConcurrency;
}
public CommandPri getCommandPriority() {
return commandPriority;
}

View File

@ -59,6 +59,7 @@ public class TiSession implements AutoCloseable {
private volatile Catalog catalog;
private volatile ExecutorService indexScanThreadPool;
private volatile ExecutorService tableScanThreadPool;
private volatile ExecutorService batchPutThreadPool;
private volatile RegionManager regionManager;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
@ -91,7 +92,7 @@ public class TiSession implements AutoCloseable {
RegionManager regionMgr = new RegionManager(client);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, regionMgr, client);
return new RawKVClient(conf, builder);
return new RawKVClient(this, builder);
}
public KVClient createKVClient() {
@ -211,6 +212,22 @@ public class TiSession implements AutoCloseable {
return res;
}
public ExecutorService getThreadPoolForBatchPut() {
ExecutorService res = batchPutThreadPool;
if (res == null) {
synchronized (this) {
if (batchPutThreadPool == null) {
batchPutThreadPool =
Executors.newFixedThreadPool(
conf.getBatchPutConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
}
res = batchPutThreadPool;
}
}
return res;
}
@VisibleForTesting
public ChannelFactory getChannelFactory() {
return channelFactory;

View File

@ -25,14 +25,13 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.operation.iterator.RawScanIterator;
@ -61,13 +60,12 @@ public class RawKVClient implements AutoCloseable {
private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");
public RawKVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(conf, "conf is null");
public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(session, "session is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = conf;
this.conf = session.getConf();
this.clientBuilder = clientBuilder;
ExecutorService executors = Executors.newFixedThreadPool(conf.getRawClientConcurrency());
this.completionService = new ExecutorCompletionService<>(executors);
this.completionService = new ExecutorCompletionService<>(session.getThreadPoolForBatchPut());
}
@Override
@ -270,7 +268,6 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : batches) {
completionService.submit(
() -> {
RegionStoreClient client = clientBuilder.build(batch.region);
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
@ -280,7 +277,7 @@ public class RawKVClient implements AutoCloseable {
.setValue(batch.values.get(i))
.build());
}
try {
try (RegionStoreClient client = clientBuilder.build(batch.region); ) {
client.rawBatchPut(singleBatchBackOffer, kvPairs);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?