From 25e0ab3b14cede6a8c8663b7626e187553439ed7 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 1 Feb 2021 11:38:12 +0800 Subject: [PATCH] Add Batch APIs (#118) --- src/main/java/org/tikv/common/KVClient.java | 50 +-- src/main/java/org/tikv/common/Snapshot.java | 4 +- .../java/org/tikv/common/TiConfiguration.java | 20 + src/main/java/org/tikv/common/TiSession.java | 34 ++ .../iterator/ConcreteScanIterator.java | 2 +- .../operation/iterator/RawScanIterator.java | 7 +- .../operation/iterator/ScanIterator.java | 5 +- .../tikv/common/region/RegionStoreClient.java | 51 ++- src/main/java/org/tikv/common/util/Batch.java | 53 +++ .../org/tikv/common/util/ClientUtils.java | 41 ++ .../java/org/tikv/common/util/ScanOption.java | 91 +++++ src/main/java/org/tikv/raw/RawKVClient.java | 355 ++++++++++++------ src/main/java/org/tikv/txn/KVClient.java | 28 +- .../java/org/tikv/raw/RawKVClientTest.java | 145 ++++++- 14 files changed, 692 insertions(+), 194 deletions(-) create mode 100644 src/main/java/org/tikv/common/util/Batch.java create mode 100644 src/main/java/org/tikv/common/util/ClientUtils.java create mode 100644 src/main/java/org/tikv/common/util/ScanOption.java diff --git a/src/main/java/org/tikv/common/KVClient.java b/src/main/java/org/tikv/common/KVClient.java index 0a01423c4e..ff61be6540 100644 --- a/src/main/java/org/tikv/common/KVClient.java +++ b/src/main/java/org/tikv/common/KVClient.java @@ -17,17 +17,16 @@ package org.tikv.common; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.tikv.common.util.ClientUtils.getKvPairs; + import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +38,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb.KvPair; @@ -47,25 +47,17 @@ public class KVClient implements AutoCloseable { private static final int BATCH_GET_SIZE = 16 * 1024; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; - private final ExecutorService executorService; + private final ExecutorService batchGetThreadPool; - public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) { - Objects.requireNonNull(conf, "conf is null"); + public KVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { Objects.requireNonNull(clientBuilder, "clientBuilder is null"); - this.conf = conf; + this.conf = session.getConf(); this.clientBuilder = clientBuilder; - executorService = - Executors.newFixedThreadPool( - conf.getKvClientConcurrency(), - new ThreadFactoryBuilder().setNameFormat("kvclient-pool-%d").setDaemon(true).build()); + this.batchGetThreadPool = session.getThreadPoolForBatchGet(); } @Override - public void close() { - if (executorService != null) { - executorService.shutdownNow(); - } - } + public void close() {} /** * Get a key-value pair from TiKV if key exists @@ -134,7 +126,7 @@ public class KVClient implements AutoCloseable { private List doSendBatchGet(BackOffer backOffer, List keys, long version) { ExecutorCompletionService> completionService = - new ExecutorCompletionService<>(executorService); + new ExecutorCompletionService<>(batchGetThreadPool); Map> groupKeys = groupKeysByRegion(keys); List batches = new ArrayList<>(); @@ -149,18 +141,7 @@ public class KVClient implements AutoCloseable { () -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version)); } - try { - List result = new ArrayList<>(); - for (int i = 0; i < batches.size(); i++) { - result.addAll(completionService.take().get()); - } - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TiKVException("Current thread interrupted.", e); - } catch (ExecutionException e) { - throw new TiKVException("Execution exception met.", e); - } + return getKvPairs(completionService, batches); } private List doSendBatchGetInBatchesWithRetry( @@ -258,15 +239,4 @@ public class KVClient implements AutoCloseable { int limit) { return new ConcreteScanIterator(conf, builder, startKey, version, limit); } - - /** A Batch containing the region and a list of keys to send */ - private static final class Batch { - private final TiRegion region; - private final List keys; - - Batch(TiRegion region, List keys) { - this.region = region; - this.keys = keys; - } - } } diff --git a/src/main/java/org/tikv/common/Snapshot.java b/src/main/java/org/tikv/common/Snapshot.java index ad3c823003..820c40e71b 100644 --- a/src/main/java/org/tikv/common/Snapshot.java +++ b/src/main/java/org/tikv/common/Snapshot.java @@ -65,7 +65,7 @@ public class Snapshot { } public ByteString get(ByteString key) { - try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) { + try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) { return client.get(key, timestamp.getVersion()); } } @@ -75,7 +75,7 @@ public class Snapshot { for (byte[] key : keys) { list.add(ByteString.copyFrom(key)); } - try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) { + try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) { List kvPairList = client.batchGet( ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion()); diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index f96bf6fdb2..5bb90d9ad1 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -40,7 +40,9 @@ public class TiConfiguration implements Serializable { private static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000; private static final int DEF_INDEX_SCAN_CONCURRENCY = 5; private static final int DEF_TABLE_SCAN_CONCURRENCY = 512; + private static final int DEF_BATCH_GET_CONCURRENCY = 20; private static final int DEF_BATCH_PUT_CONCURRENCY = 20; + private static final int DEF_BATCH_SCAN_CONCURRENCY = 5; private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low; private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.SI; private static final boolean DEF_SHOW_ROWID = false; @@ -65,7 +67,9 @@ public class TiConfiguration implements Serializable { private int downgradeThreshold = DEF_REGION_SCAN_DOWNGRADE_THRESHOLD; private int indexScanConcurrency = DEF_INDEX_SCAN_CONCURRENCY; private int tableScanConcurrency = DEF_TABLE_SCAN_CONCURRENCY; + private int batchGetConcurrency = DEF_BATCH_GET_CONCURRENCY; private int batchPutConcurrency = DEF_BATCH_PUT_CONCURRENCY; + private int batchScanConcurrency = DEF_BATCH_SCAN_CONCURRENCY; private CommandPri commandPriority = DEF_COMMAND_PRIORITY; private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL; private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE; @@ -204,6 +208,14 @@ public class TiConfiguration implements Serializable { this.tableScanConcurrency = tableScanConcurrency; } + public int getBatchGetConcurrency() { + return batchGetConcurrency; + } + + public void setBatchGetConcurrency(int batchGetConcurrency) { + this.batchGetConcurrency = batchGetConcurrency; + } + public int getBatchPutConcurrency() { return batchPutConcurrency; } @@ -212,6 +224,14 @@ public class TiConfiguration implements Serializable { this.batchPutConcurrency = batchPutConcurrency; } + public int getBatchScanConcurrency() { + return batchScanConcurrency; + } + + public void setBatchScanConcurrency(int batchScanConcurrency) { + this.batchScanConcurrency = batchScanConcurrency; + } + public CommandPri getCommandPriority() { return commandPriority; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 5495a6ddd6..5f8a4de6c7 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -59,7 +59,9 @@ public class TiSession implements AutoCloseable { private volatile Catalog catalog; private volatile ExecutorService indexScanThreadPool; private volatile ExecutorService tableScanThreadPool; + private volatile ExecutorService batchGetThreadPool; private volatile ExecutorService batchPutThreadPool; + private volatile ExecutorService batchScanThreadPool; private volatile RegionManager regionManager; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private boolean isClosed = false; @@ -230,6 +232,38 @@ public class TiSession implements AutoCloseable { return res; } + public ExecutorService getThreadPoolForBatchGet() { + ExecutorService res = batchGetThreadPool; + if (res == null) { + synchronized (this) { + if (batchGetThreadPool == null) { + batchGetThreadPool = + Executors.newFixedThreadPool( + conf.getBatchGetConcurrency(), + new ThreadFactoryBuilder().setDaemon(true).build()); + } + res = batchGetThreadPool; + } + } + return res; + } + + public ExecutorService getThreadPoolForBatchScan() { + ExecutorService res = batchScanThreadPool; + if (res == null) { + synchronized (this) { + if (batchScanThreadPool == null) { + batchScanThreadPool = + Executors.newFixedThreadPool( + conf.getBatchScanConcurrency(), + new ThreadFactoryBuilder().setDaemon(true).build()); + } + res = batchScanThreadPool; + } + } + return res; + } + @VisibleForTesting public ChannelFactory getChannelFactory() { return channelFactory; diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java index d30d3ea7b7..8bf0a537c1 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java @@ -64,7 +64,7 @@ public class ConcreteScanIterator extends ScanIterator { ByteString endKey, long version, int limit) { - super(conf, builder, startKey, endKey, limit); + super(conf, builder, startKey, endKey, limit, false); this.version = version; } diff --git a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java index d2989ff013..fbd9a09412 100644 --- a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java @@ -35,8 +35,9 @@ public class RawScanIterator extends ScanIterator { RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - int limit) { - super(conf, builder, startKey, endKey, limit); + int limit, + boolean keyOnly) { + super(conf, builder, startKey, endKey, limit, keyOnly); } TiRegion loadCurrentRegionToCache() throws GrpcException { @@ -48,7 +49,7 @@ public class RawScanIterator extends ScanIterator { currentCache = null; } else { try { - currentCache = client.rawScan(backOffer, startKey, limit); + currentCache = client.rawScan(backOffer, startKey, limit, keyOnly); } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); continue; diff --git a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java index b3027121de..ad487932e1 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java @@ -35,6 +35,7 @@ public abstract class ScanIterator implements Iterator { protected ByteString startKey; protected int index = -1; protected int limit; + protected boolean keyOnly; protected boolean endOfScan = false; protected Key endKey; @@ -46,7 +47,8 @@ public abstract class ScanIterator implements Iterator { RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - int limit) { + int limit, + boolean keyOnly) { this.startKey = requireNonNull(startKey, "start key is null"); if (startKey.isEmpty()) { throw new IllegalArgumentException("start key cannot be empty"); @@ -54,6 +56,7 @@ public abstract class ScanIterator implements Iterator { this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null")); this.hasEndKey = !endKey.equals(ByteString.EMPTY); this.limit = limit; + this.keyOnly = keyOnly; this.conf = conf; this.builder = builder; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index b0a3866bf1..eff5d4b9db 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -887,6 +887,38 @@ public class RegionStoreClient extends AbstractRegionStoreClient { } } + public List rawBatchGet(BackOffer backoffer, List keys) { + if (keys.isEmpty()) { + return new ArrayList<>(); + } + Supplier factory = + () -> + RawBatchGetRequest.newBuilder() + .setContext(region.getContext()) + .addAllKeys(keys) + .build(); + KVErrorHandler handler = + new KVErrorHandler<>( + regionManager, + this, + region, + resp -> resp.hasRegionError() ? resp.getRegionError() : null); + RawBatchGetResponse resp = + callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler); + return handleRawBatchGet(resp); + } + + private List handleRawBatchGet(RawBatchGetResponse resp) { + if (resp == null) { + this.regionManager.onRequestFail(region); + throw new TiClientInternalException("RawBatchPutResponse failed without a cause"); + } + if (resp.hasRegionError()) { + throw new RegionException(resp.getRegionError()); + } + return resp.getPairsList(); + } + public void rawBatchPut(BackOffer backOffer, List kvPairs) { if (kvPairs.isEmpty()) { return; @@ -908,6 +940,15 @@ public class RegionStoreClient extends AbstractRegionStoreClient { handleRawBatchPut(resp); } + public void rawBatchPut(BackOffer backOffer, Batch batch) { + List pairs = new ArrayList<>(); + for (int i = 0; i < batch.keys.size(); i++) { + pairs.add( + KvPair.newBuilder().setKey(batch.keys.get(i)).setValue(batch.values.get(i)).build()); + } + rawBatchPut(backOffer, pairs); + } + private void handleRawBatchPut(RawBatchPutResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); @@ -927,7 +968,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { * @param keyOnly true if value of KvPair is not needed * @return KvPair list */ - private List rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) { + public List rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) { Supplier factory = () -> RawScanRequest.newBuilder() @@ -947,12 +988,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { return rawScanHelper(resp); } - public List rawScan(BackOffer backOffer, ByteString key) { - return rawScan(backOffer, key, getConf().getScanBatchSize()); - } - - public List rawScan(BackOffer backOffer, ByteString key, int limit) { - return rawScan(backOffer, key, limit, false); + public List rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) { + return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly); } private List rawScanHelper(RawScanResponse resp) { diff --git a/src/main/java/org/tikv/common/util/Batch.java b/src/main/java/org/tikv/common/util/Batch.java new file mode 100644 index 0000000000..1412070411 --- /dev/null +++ b/src/main/java/org/tikv/common/util/Batch.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.util; + +import com.google.protobuf.ByteString; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.tikv.common.region.TiRegion; + +/** A Batch containing the region, a list of keys and/or values to send */ +public class Batch { + public final TiRegion region; + public final List keys; + public final List values; + public final Map map; + + public Batch(TiRegion region, List keys) { + this.region = region; + this.keys = keys; + this.values = null; + this.map = null; + } + + public Batch(TiRegion region, List keys, List values) { + this.region = region; + this.keys = keys; + this.values = values; + this.map = toMap(keys, values); + } + + private Map toMap(List keys, List values) { + assert keys.size() == values.size(); + Map kvMap = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + kvMap.put(keys.get(i), values.get(i)); + } + return kvMap; + } +} diff --git a/src/main/java/org/tikv/common/util/ClientUtils.java b/src/main/java/org/tikv/common/util/ClientUtils.java new file mode 100644 index 0000000000..5f4f963782 --- /dev/null +++ b/src/main/java/org/tikv/common/util/ClientUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; + +public class ClientUtils { + public static List getKvPairs( + ExecutorCompletionService> completionService, List batches) { + try { + List result = new ArrayList<>(); + for (int i = 0; i < batches.size(); i++) { + result.addAll(completionService.take().get()); + } + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TiKVException("Current thread interrupted.", e); + } catch (ExecutionException e) { + throw new TiKVException("Execution exception met.", e); + } + } +} diff --git a/src/main/java/org/tikv/common/util/ScanOption.java b/src/main/java/org/tikv/common/util/ScanOption.java new file mode 100644 index 0000000000..749d664ae0 --- /dev/null +++ b/src/main/java/org/tikv/common/util/ScanOption.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.util; + +import com.google.protobuf.ByteString; + +/** if Limit of a ScanBatch is 0, it means to scan all keys in */ +public class ScanOption { + private final ByteString startKey; + private final ByteString endKey; + private final int limit; + private final boolean keyOnly; + + private ScanOption(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { + this.startKey = startKey; + this.endKey = endKey; + this.limit = limit; + this.keyOnly = keyOnly; + } + + public static ScanOptionBuilder newBuilder() { + return new ScanOptionBuilder(); + } + + public ByteString getStartKey() { + return startKey; + } + + public ByteString getEndKey() { + return endKey; + } + + public int getLimit() { + return limit; + } + + public boolean isKeyOnly() { + return keyOnly; + } + + public static class ScanOptionBuilder { + private ByteString startKey; + private ByteString endKey; + private int limit; + private boolean keyOnly; + + private ScanOptionBuilder() { + this.startKey = ByteString.EMPTY; + this.endKey = ByteString.EMPTY; + this.limit = 0; + this.keyOnly = false; + } + + public ScanOption build() { + return new ScanOption(startKey, endKey, limit, keyOnly); + } + + public ScanOptionBuilder setStartKey(ByteString startKey) { + this.startKey = startKey; + return this; + } + + public ScanOptionBuilder setEndKey(ByteString endKey) { + this.endKey = endKey; + return this; + } + + public ScanOptionBuilder setLimit(int limit) { + this.limit = limit; + return this; + } + + public ScanOptionBuilder setKeyOnly(boolean keyOnly) { + this.keyOnly = keyOnly; + return this; + } + } +} diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 19b409f347..50dbe8e373 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -15,6 +15,8 @@ package org.tikv.raw; +import static org.tikv.common.util.ClientUtils.getKvPairs; + import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.HashMap; @@ -23,10 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,21 +38,23 @@ import org.tikv.common.operation.iterator.RawScanIterator; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.BackOffFunction; -import org.tikv.common.util.BackOffer; -import org.tikv.common.util.ConcreteBackOffer; -import org.tikv.kvproto.Kvrpcpb; +import org.tikv.common.util.*; +import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements AutoCloseable { private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; - private final ExecutorCompletionService completionService; + private final ExecutorService batchGetThreadPool; + private final ExecutorService batchPutThreadPool; + private final ExecutorService batchScanThreadPool; private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class); private static final int MAX_RETRY_LIMIT = 3; // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go private static final int MAX_RAW_SCAN_LIMIT = 10240; private static final int RAW_BATCH_PUT_SIZE = 16 * 1024; + private static final int RAW_BATCH_GET_SIZE = 16 * 1024; + private static final int RAW_BATCH_SCAN_SIZE = 16; private static final int RAW_BATCH_PAIR_COUNT = 512; private static final TiKVException ERR_RETRY_LIMIT_EXCEEDED = @@ -66,7 +67,9 @@ public class RawKVClient implements AutoCloseable { Objects.requireNonNull(clientBuilder, "clientBuilder is null"); this.conf = session.getConf(); this.clientBuilder = clientBuilder; - this.completionService = new ExecutorCompletionService<>(session.getThreadPoolForBatchPut()); + this.batchGetThreadPool = session.getThreadPoolForBatchGet(); + this.batchPutThreadPool = session.getThreadPoolForBatchPut(); + this.batchScanThreadPool = session.getThreadPoolForBatchScan(); } @Override @@ -98,27 +101,12 @@ public class RawKVClient implements AutoCloseable { * @param kvPairs kvPairs */ public void batchPut(Map kvPairs) { - batchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs); + doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs); } private void batchPut(BackOffer backOffer, List keys, List values) { Map keysToValues = mapKeysToValues(keys, values); - batchPut(backOffer, keysToValues); - } - - private void batchPut(BackOffer backOffer, Map kvPairs) { - Map> groupKeys = groupKeysByRegion(kvPairs.keySet()); - List batches = new ArrayList<>(); - - for (Map.Entry> entry : groupKeys.entrySet()) { - appendBatches( - batches, - entry.getKey(), - entry.getValue(), - entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()), - RAW_BATCH_PUT_SIZE); - } - sendBatchPut(backOffer, batches); + doSendBatchPut(backOffer, keysToValues); } /** @@ -140,6 +128,50 @@ public class RawKVClient implements AutoCloseable { throw ERR_RETRY_LIMIT_EXCEEDED; } + /** + * Get a list of raw key-value pair from TiKV if key exists + * + * @param keys list of raw key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + public List batchGet(List keys) { + BackOffer backOffer = defaultBackOff(); + return doSendBatchGet(backOffer, keys); + } + + public List> batchScan(List ranges) { + if (ranges.isEmpty()) { + return new ArrayList<>(); + } + ExecutorCompletionService>> completionService = + new ExecutorCompletionService<>(batchScanThreadPool); + int num = 0; + for (ScanOption scanOption : ranges) { + int i = num; + completionService.submit(() -> Pair.create(i, scan(scanOption))); + ++num; + } + List> scanResults = new ArrayList<>(); + for (int i = 0; i < num; i++) { + scanResults.add(new ArrayList<>()); + } + for (int i = 0; i < num; i++) { + try { + Pair> scanResult = + completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS); + scanResults.set(scanResult.first, scanResult.second); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TiKVException("Current thread interrupted.", e); + } catch (TimeoutException e) { + throw new TiKVException("TimeOut Exceeded for current operation. ", e); + } catch (ExecutionException e) { + throw new TiKVException("Execution exception met.", e); + } + } + return scanResults; + } + /** * Scan raw key-value pairs from TiKV in range [startKey, endKey) * @@ -148,26 +180,31 @@ public class RawKVClient implements AutoCloseable { * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} * @return list of key-value pairs in range */ - public List scan(ByteString startKey, ByteString endKey, int limit) { - Iterator iterator = - rawScanIterator(conf, clientBuilder, startKey, endKey, limit); - List result = new ArrayList<>(); + public List scan(ByteString startKey, ByteString endKey, int limit) { + return scan(startKey, endKey, limit, false); + } + + public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { + Iterator iterator = + rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly); + List result = new ArrayList<>(); iterator.forEachRemaining(result::add); return result; } /** - * Scan raw key-value pairs from TiKV in range [startKey, endKey) + * Scan raw key-value pairs from TiKV in range [startKey, ♾) * * @param startKey raw start key, inclusive * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} * @return list of key-value pairs in range */ - public List scan(ByteString startKey, int limit) { - Iterator iterator = rawScanIterator(conf, clientBuilder, startKey, limit); - List result = new ArrayList<>(); - iterator.forEachRemaining(result::add); - return result; + public List scan(ByteString startKey, int limit) { + return scan(startKey, limit, false); + } + + public List scan(ByteString startKey, int limit, boolean keyOnly) { + return scan(startKey, ByteString.EMPTY, limit, keyOnly); } /** @@ -177,11 +214,15 @@ public class RawKVClient implements AutoCloseable { * @param endKey raw end key, exclusive * @return list of key-value pairs in range */ - public List scan(ByteString startKey, ByteString endKey) { - List result = new ArrayList<>(); + public List scan(ByteString startKey, ByteString endKey) { + return scan(startKey, endKey, false); + } + + public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { + List result = new ArrayList<>(); while (true) { - Iterator iterator = - rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize()); + Iterator iterator = + rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly); if (!iterator.hasNext()) { break; } @@ -191,6 +232,14 @@ public class RawKVClient implements AutoCloseable { return result; } + private List scan(ScanOption scanOption) { + ByteString startKey = scanOption.getStartKey(); + ByteString endKey = scanOption.getEndKey(); + int limit = scanOption.getLimit(); + boolean keyOnly = scanOption.isKeyOnly(); + return scan(startKey, endKey, limit, keyOnly); + } + /** * Delete a raw key-value pair from TiKV if key exists * @@ -209,19 +258,6 @@ public class RawKVClient implements AutoCloseable { } } - /** A Batch containing the region, a list of keys and/or values to send */ - private static final class Batch { - private final TiRegion region; - private final List keys; - private final List values; - - public Batch(TiRegion region, List keys, List values) { - this.region = region; - this.keys = keys; - this.values = values; - } - } - /** * Append batch to list and split them according to batch limit * @@ -253,6 +289,155 @@ public class RawKVClient implements AutoCloseable { } } + private void appendBatches( + List batches, TiRegion region, List keys, int limit) { + List tmpKeys = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + if (i >= limit) { + batches.add(new Batch(region, tmpKeys)); + tmpKeys.clear(); + } + tmpKeys.add(keys.get(i)); + } + if (!tmpKeys.isEmpty()) { + batches.add(new Batch(region, tmpKeys)); + } + } + + private void doSendBatchPut(BackOffer backOffer, Map kvPairs) { + ExecutorCompletionService completionService = + new ExecutorCompletionService<>(batchPutThreadPool); + + Map> groupKeys = groupKeysByRegion(kvPairs.keySet()); + List batches = new ArrayList<>(); + + for (Map.Entry> entry : groupKeys.entrySet()) { + appendBatches( + batches, + entry.getKey(), + entry.getValue(), + entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()), + RAW_BATCH_PUT_SIZE); + } + + for (Batch batch : batches) { + BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer); + completionService.submit(() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch)); + } + try { + for (int i = 0; i < batches.size(); i++) { + completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TiKVException("Current thread interrupted.", e); + } catch (TimeoutException e) { + throw new TiKVException("TimeOut Exceeded for current operation. ", e); + } catch (ExecutionException e) { + throw new TiKVException("Execution exception met.", e); + } + } + + private Object doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch) { + TiRegion oldRegion = batch.region; + TiRegion currentRegion = + clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey()); + + if (oldRegion.equals(currentRegion)) { + try (RegionStoreClient client = clientBuilder.build(batch.region); ) { + client.rawBatchPut(backOffer, batch); + return null; + } catch (final TiKVException e) { + // TODO: any elegant way to re-split the ranges if fails? + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("ReSplitting ranges for BatchPutRequest"); + // retry + return doSendBatchPutWithRefetchRegion(backOffer, batch); + } + } else { + return doSendBatchPutWithRefetchRegion(backOffer, batch); + } + } + + private Object doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) { + Map> groupKeys = groupKeysByRegion(batch.keys); + List retryBatches = new ArrayList<>(); + + for (Map.Entry> entry : groupKeys.entrySet()) { + appendBatches( + retryBatches, + entry.getKey(), + entry.getValue(), + entry.getValue().stream().map(batch.map::get).collect(Collectors.toList()), + RAW_BATCH_PUT_SIZE); + } + + for (Batch retryBatch : retryBatches) { + // recursive calls + doSendBatchPutInBatchesWithRetry(backOffer, retryBatch); + } + + return null; + } + + private List doSendBatchGet(BackOffer backOffer, List keys) { + ExecutorCompletionService> completionService = + new ExecutorCompletionService<>(batchGetThreadPool); + + Map> groupKeys = groupKeysByRegion(keys); + List batches = new ArrayList<>(); + + for (Map.Entry> entry : groupKeys.entrySet()) { + appendBatches(batches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE); + } + + for (Batch batch : batches) { + BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer); + completionService.submit(() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch)); + } + + return getKvPairs(completionService, batches); + } + + private List doSendBatchGetInBatchesWithRetry(BackOffer backOffer, Batch batch) { + TiRegion oldRegion = batch.region; + TiRegion currentRegion = + clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey()); + + if (oldRegion.equals(currentRegion)) { + RegionStoreClient client = clientBuilder.build(batch.region); + try { + return client.rawBatchGet(backOffer, batch.keys); + } catch (final TiKVException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + clientBuilder.getRegionManager().invalidateRegion(batch.region.getId()); + logger.warn("ReSplitting ranges for BatchGetRequest", e); + + // retry + return doSendBatchGetWithRefetchRegion(backOffer, batch); + } + } else { + return doSendBatchGetWithRefetchRegion(backOffer, batch); + } + } + + private List doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) { + Map> groupKeys = groupKeysByRegion(batch.keys); + List retryBatches = new ArrayList<>(); + + for (Map.Entry> entry : groupKeys.entrySet()) { + appendBatches(retryBatches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE); + } + + ArrayList results = new ArrayList<>(); + for (Batch retryBatch : retryBatches) { + // recursive calls + List batchResult = doSendBatchGetInBatchesWithRetry(backOffer, retryBatch); + results.addAll(batchResult); + } + return results; + } + /** * Group by list of keys according to its region * @@ -271,6 +456,11 @@ public class RawKVClient implements AutoCloseable { return groups; } + private Map> groupKeysByRegion(List keys) { + return keys.stream() + .collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey)); + } + private static Map mapKeysToValues( List keys, List values) { Map map = new HashMap<>(); @@ -280,66 +470,17 @@ public class RawKVClient implements AutoCloseable { return map; } - /** - * Send batchPut request concurrently - * - * @param backOffer current backOffer - * @param batches list of batch to send - */ - private void sendBatchPut(BackOffer backOffer, List batches) { - for (Batch batch : batches) { - completionService.submit( - () -> { - BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer); - List kvPairs = new ArrayList<>(); - for (int i = 0; i < batch.keys.size(); i++) { - kvPairs.add( - Kvrpcpb.KvPair.newBuilder() - .setKey(batch.keys.get(i)) - .setValue(batch.values.get(i)) - .build()); - } - try (RegionStoreClient client = clientBuilder.build(batch.region); ) { - client.rawBatchPut(singleBatchBackOffer, kvPairs); - } catch (final TiKVException e) { - // TODO: any elegant way to re-split the ranges if fails? - singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); - logger.warn("ReSplitting ranges for BatchPutRequest"); - // recursive calls - batchPut(singleBatchBackOffer, batch.keys, batch.values); - } - return null; - }); - } - try { - for (int i = 0; i < batches.size(); i++) { - completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TiKVException("Current thread interrupted.", e); - } catch (TimeoutException e) { - throw new TiKVException("TimeOut Exceeded for current operation. ", e); - } catch (ExecutionException e) { - throw new TiKVException("Execution exception met.", e); - } - } - - private Iterator rawScanIterator( + private Iterator rawScanIterator( TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - int limit) { + int limit, + boolean keyOnly) { if (limit > MAX_RAW_SCAN_LIMIT) { throw ERR_MAX_SCAN_LIMIT_EXCEEDED; } - return new RawScanIterator(conf, builder, startKey, endKey, limit); - } - - private Iterator rawScanIterator( - TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, int limit) { - return rawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit); + return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly); } private BackOffer defaultBackOff() { diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index 53c37d94f5..66462837bc 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -15,10 +15,11 @@ package org.tikv.txn; +import static org.tikv.common.util.ClientUtils.getKvPairs; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,6 +35,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb; @@ -147,18 +149,7 @@ public class KVClient implements AutoCloseable { () -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version)); } - try { - List result = new ArrayList<>(); - for (int i = 0; i < batches.size(); i++) { - result.addAll(completionService.take().get()); - } - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TiKVException("Current thread interrupted.", e); - } catch (ExecutionException e) { - throw new TiKVException("Execution exception met.", e); - } + return getKvPairs(completionService, batches); } private List doSendBatchGetInBatchesWithRetry( @@ -257,15 +248,4 @@ public class KVClient implements AutoCloseable { int limit) { return new ConcreteScanIterator(conf, builder, startKey, version, limit); } - - /** A Batch containing the region and a list of keys to send */ - private static final class Batch { - private final TiRegion region; - private final List keys; - - Batch(TiRegion region, List keys) { - this.region = region; - this.keys = keys; - } - } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 9fe79e2251..fa5b69f5ca 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -17,6 +17,7 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.util.FastByteComparisons; +import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; public class RawKVClientTest { @@ -26,7 +27,8 @@ public class RawKVClientTest { private static final int TEST_CASES = 10000; private static final int WORKER_CNT = 100; private static final ByteString RAW_START_KEY = ByteString.copyFromUtf8(RAW_PREFIX); - private static final ByteString RAW_END_KEY = Key.toRawKey(RAW_START_KEY).next().toByteString(); + private static final ByteString RAW_END_KEY = + Key.toRawKey(RAW_START_KEY).nextPrefix().toByteString(); private RawKVClient client; private static final List orderedKeys; private static final List randomKeys; @@ -130,15 +132,15 @@ public class RawKVClientTest { @Test public void validate() { if (!initialized) return; - baseTest(100, 100, 100, 100, false, false); - baseTest(100, 100, 100, 100, false, true); + baseTest(100, 100, 100, 100, false, false, false, false); + baseTest(100, 100, 100, 100, false, true, true, true); } /** Example of benchmarking base test */ public void benchmark() { if (!initialized) return; - baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, false); - baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, true); + baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, false, false, false); + baseTest(TEST_CASES, TEST_CASES, 200, 5000, true, true, true, true); } private void baseTest( @@ -147,7 +149,9 @@ public class RawKVClientTest { int scanCases, int deleteCases, boolean benchmark, - boolean batchPut) { + boolean batchPut, + boolean batchGet, + boolean batchScan) { if (putCases > KEY_POOL_SIZE) { logger.info("Number of distinct orderedKeys required exceeded pool size " + KEY_POOL_SIZE); return; @@ -165,8 +169,16 @@ public class RawKVClientTest { } else { rawPutTest(putCases, benchmark); } - rawGetTest(getCases, benchmark); - rawScanTest(scanCases, benchmark); + if (batchGet) { + rawBatchGetTest(getCases, benchmark); + } else { + rawGetTest(getCases, benchmark); + } + if (batchScan) { + rawBatchScanTest(scanCases, benchmark); + } else { + rawScanTest(scanCases, benchmark); + } rawDeleteTest(deleteCases, benchmark); prepare(); @@ -338,8 +350,44 @@ public class RawKVClientTest { } } + private void rawBatchGetTest(int getCases, boolean benchmark) { + logger.info("batchGet testing"); + if (benchmark) { + long start = System.currentTimeMillis(); + int base = getCases / WORKER_CNT; + for (int cnt = 0; cnt < WORKER_CNT; cnt++) { + int i = cnt; + completionService.submit( + () -> { + List keys = new ArrayList<>(); + for (int j = 0; j < base; j++) { + int num = i * base + j; + ByteString key = orderedKeys.get(num); + keys.add(key); + } + client.batchGet(keys); + return null; + }); + } + awaitTimeOut(200); + long end = System.currentTimeMillis(); + logger.info(getCases + " batchGet: " + (end - start) / 1000.0 + "s"); + } else { + int i = 0; + List keys = new ArrayList<>(); + for (Map.Entry pair : data.entrySet()) { + keys.add(pair.getKey()); + i++; + if (i >= getCases) { + break; + } + } + checkBatchGet(keys); + } + } + private void rawScanTest(int scanCases, boolean benchmark) { - logger.info("rawScan testing"); + logger.info("scan testing"); if (benchmark) { long start = System.currentTimeMillis(); int base = scanCases / WORKER_CNT; @@ -377,6 +425,57 @@ public class RawKVClientTest { } } + private void rawBatchScanTest(int scanCases, boolean benchmark) { + logger.info("batchScan testing"); + if (benchmark) { + long start = System.currentTimeMillis(); + int base = scanCases / WORKER_CNT; + for (int cnt = 0; cnt < WORKER_CNT; cnt++) { + int i = cnt; + completionService.submit( + () -> { + List scanOptions = new ArrayList<>(); + for (int j = 0; j < base; j++) { + int num = i * base + j; + ByteString startKey = randomKeys.get(num), endKey = randomKeys.get(num + 1); + if (bsc.compare(startKey, endKey) > 0) { + ByteString tmp = startKey; + startKey = endKey; + endKey = tmp; + } + ScanOption scanOption = + ScanOption.newBuilder() + .setStartKey(startKey) + .setEndKey(endKey) + .setLimit(limit) + .build(); + scanOptions.add(scanOption); + } + client.batchScan(scanOptions); + return null; + }); + } + awaitTimeOut(200); + long end = System.currentTimeMillis(); + logger.info(scanCases + " batchScan: " + (end - start) / 1000.0 + "s"); + } else { + List scanOptions = new ArrayList<>(); + for (int i = 0; i < scanCases; i++) { + ByteString startKey = randomKeys.get(r.nextInt(KEY_POOL_SIZE)), + endKey = randomKeys.get(r.nextInt(KEY_POOL_SIZE)); + if (bsc.compare(startKey, endKey) > 0) { + ByteString tmp = startKey; + startKey = endKey; + endKey = tmp; + } + ScanOption scanOption = + ScanOption.newBuilder().setStartKey(startKey).setEndKey(endKey).setLimit(limit).build(); + scanOptions.add(scanOption); + } + checkBatchScan(scanOptions); + } + } + private void rawDeleteTest(int deleteCases, boolean benchmark) { logger.info("delete testing"); if (benchmark) { @@ -409,6 +508,14 @@ public class RawKVClientTest { } } + private void checkBatchGet(List keys) { + List result = client.batchGet(keys); + for (Kvrpcpb.KvPair kvPair : result) { + assert data.containsKey(kvPair.getKey()); + assert kvPair.getValue().equals(data.get(kvPair.getKey())); + } + } + private void checkPut(ByteString key, ByteString value) { client.put(key, value); assert client.get(key).equals(value); @@ -445,6 +552,26 @@ public class RawKVClientTest { limit); } + private void checkBatchScan(List scanOptions) { + List> result = client.batchScan(scanOptions); + int i = 0; + for (ScanOption scanOption : scanOptions) { + List partialResult = + data.subMap(scanOption.getStartKey(), scanOption.getEndKey()) + .entrySet() + .stream() + .map( + kvPair -> + Kvrpcpb.KvPair.newBuilder() + .setKey(kvPair.getKey()) + .setValue(kvPair.getValue()) + .build()) + .collect(Collectors.toList()); + assert result.get(i).equals(partialResult); + i++; + } + } + private void checkDelete(ByteString key) { client.delete(key); checkEmpty(key);