update raw scan api (#78)

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2020-05-12 16:00:11 +08:00 committed by GitHub
parent 863b07d6d3
commit 2ca05e9256
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 19 deletions

View File

@ -126,9 +126,10 @@ ByteString get(ByteString key)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey)
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit)
```
```java
@ -136,7 +137,7 @@ List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey)
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit)

View File

@ -48,7 +48,13 @@ public class RawKVClient implements AutoCloseable {
private final ExecutorCompletionService<Object> completionService;
private static final Logger logger = Logger.getLogger(RawKVClient.class);
// https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
private static final int MAX_RAW_SCAN_LIMIT = 10240;
private static final int RAW_BATCH_PUT_SIZE = 16 * 1024;
private static final int RAW_BATCH_PAIR_COUNT = 512;
private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");
public RawKVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(conf, "conf is null");
@ -133,10 +139,12 @@ public class RawKVClient implements AutoCloseable {
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(conf, clientBuilder, startKey, endKey);
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
Iterator<Kvrpcpb.KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -146,7 +154,7 @@ public class RawKVClient implements AutoCloseable {
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
@ -295,13 +303,17 @@ public class RawKVClient implements AutoCloseable {
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey) {
return new RawScanIterator(conf, builder, startKey, endKey, Integer.MAX_VALUE);
ByteString endKey,
int limit) {
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit);
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(
TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, int limit) {
return new RawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit);
return rawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit);
}
private BackOffer defaultBackOff() {

View File

@ -30,9 +30,10 @@ public class RawKVClientTest {
private static final List<ByteString> orderedKeys;
private static final List<ByteString> randomKeys;
private static final List<ByteString> values;
private static final int limit;
private TreeMap<ByteString, ByteString> data;
private boolean initialized;
private Random r = new Random(1234);
private final Random r = new Random(1234);
private static final ByteStringComparator bsc = new ByteStringComparator();
private static final ExecutorService executors = Executors.newFixedThreadPool(WORKER_CNT);
private final ExecutorCompletionService<Object> completionService =
@ -44,6 +45,7 @@ public class RawKVClientTest {
orderedKeys = new ArrayList<>();
randomKeys = new ArrayList<>();
values = new ArrayList<>();
limit = 10000;
for (int i = 0; i < KEY_POOL_SIZE; i++) {
orderedKeys.add(rawKey(String.valueOf(i)));
randomKeys.add(getRandomRawKey());
@ -106,10 +108,10 @@ public class RawKVClientTest {
List<Kvrpcpb.KvPair> result2 = new ArrayList<>();
result.add(kv1);
result.add(kv2);
checkScan(key, key3, result);
checkScan(key1, key3, result);
checkScan(key, key3, result, limit);
checkScan(key1, key3, result, limit);
result2.add(kv1);
checkScan(key, key2, result2);
checkScan(key, key2, result2, limit);
checkDelete(key1);
checkDelete(key2);
} catch (final TiKVException e) {
@ -118,7 +120,7 @@ public class RawKVClientTest {
}
private List<Kvrpcpb.KvPair> rawKeys() {
return client.scan(RAW_START_KEY, RAW_END_KEY);
return client.scan(RAW_START_KEY, RAW_END_KEY, limit);
}
@Test
@ -349,7 +351,7 @@ public class RawKVClientTest {
startKey = endKey;
endKey = tmp;
}
client.scan(startKey, endKey);
client.scan(startKey, endKey, limit);
}
return null;
});
@ -366,7 +368,7 @@ public class RawKVClientTest {
startKey = endKey;
endKey = tmp;
}
checkScan(startKey, endKey, data);
checkScan(startKey, endKey, data, limit);
}
}
}
@ -415,13 +417,14 @@ public class RawKVClientTest {
}
}
private void checkScan(ByteString startKey, ByteString endKey, List<Kvrpcpb.KvPair> ans) {
List<Kvrpcpb.KvPair> result = client.scan(startKey, endKey);
private void checkScan(
ByteString startKey, ByteString endKey, List<Kvrpcpb.KvPair> ans, int limit) {
List<Kvrpcpb.KvPair> result = client.scan(startKey, endKey, limit);
assert result.equals(ans);
}
private void checkScan(
ByteString startKey, ByteString endKey, TreeMap<ByteString, ByteString> data) {
ByteString startKey, ByteString endKey, TreeMap<ByteString, ByteString> data, int limit) {
checkScan(
startKey,
endKey,
@ -434,7 +437,8 @@ public class RawKVClientTest {
.setKey(kvPair.getKey())
.setValue(kvPair.getValue())
.build())
.collect(Collectors.toList()));
.collect(Collectors.toList()),
limit);
}
private void checkDelete(ByteString key) {