mirror of https://github.com/tikv/client-java.git
This commit is contained in:
parent
36feccb3fa
commit
97983823cc
|
@ -30,6 +30,7 @@ import org.tikv.common.util.BackOffer;
|
|||
import org.tikv.kvproto.Kvrpcpb;
|
||||
|
||||
public class RawScanIterator extends ScanIterator {
|
||||
|
||||
private final BackOffer scanBackOffer;
|
||||
|
||||
public RawScanIterator(
|
||||
|
@ -67,11 +68,12 @@ public class RawScanIterator extends ScanIterator {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean notEndOfScan() {
|
||||
return limit > 0
|
||||
&& !(processingLastBatch
|
||||
&& (index >= currentCache.size()
|
||||
|| Key.toRawKey(currentCache.get(index).getKey()).compareTo(endKey) >= 0));
|
||||
private boolean endOfScan() {
|
||||
if (!processingLastBatch) {
|
||||
return false;
|
||||
}
|
||||
ByteString lastKey = currentCache.get(index).getKey();
|
||||
return !lastKey.isEmpty() && Key.toRawKey(lastKey).compareTo(endKey) >= 0;
|
||||
}
|
||||
|
||||
boolean isCacheDrained() {
|
||||
|
@ -90,7 +92,7 @@ public class RawScanIterator extends ScanIterator {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
return notEndOfScan();
|
||||
return !endOfScan();
|
||||
}
|
||||
|
||||
private Kvrpcpb.KvPair getCurrent() {
|
||||
|
|
|
@ -90,7 +90,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
|
|||
Key lastKey = Key.EMPTY;
|
||||
// Session should be single-threaded itself
|
||||
// so that we don't worry about conf change in the middle
|
||||
// of a transaction. Otherwise below code might lose data
|
||||
// of a transaction. Otherwise, below code might lose data
|
||||
if (currentCache.size() < limit) {
|
||||
startKey = curRegionEndKey;
|
||||
lastKey = Key.toRawKey(curRegionEndKey);
|
||||
|
|
|
@ -17,14 +17,32 @@
|
|||
|
||||
package org.tikv.raw;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -45,6 +63,7 @@ import org.tikv.common.util.ScanOption;
|
|||
import org.tikv.kvproto.Kvrpcpb;
|
||||
|
||||
public class RawKVClientTest extends BaseRawKVTest {
|
||||
|
||||
private static final String RAW_PREFIX = "raw_\u0001_";
|
||||
private static final int KEY_POOL_SIZE = 1000000;
|
||||
private static final int TEST_CASES = 10000;
|
||||
|
@ -360,6 +379,34 @@ public class RawKVClientTest extends BaseRawKVTest {
|
|||
return client.scan(RAW_START_KEY, RAW_END_KEY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scanTestForIssue540() {
|
||||
ByteString splitKeyA = ByteString.copyFromUtf8("splitKeyA");
|
||||
ByteString splitKeyB = ByteString.copyFromUtf8("splitKeyB");
|
||||
session.splitRegionAndScatter(
|
||||
ImmutableList.of(splitKeyA.toByteArray(), splitKeyB.toByteArray()));
|
||||
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);
|
||||
|
||||
client.put(ByteString.EMPTY, ByteString.EMPTY);
|
||||
client.put(splitKeyA, ByteString.EMPTY);
|
||||
Assert.assertEquals(0, client.scan(ByteString.EMPTY, 0).size());
|
||||
Assert.assertEquals(1, client.scan(ByteString.EMPTY, 1).size());
|
||||
Assert.assertEquals(2, client.scan(ByteString.EMPTY, 2).size());
|
||||
Assert.assertEquals(2, client.scan(ByteString.EMPTY, 3).size());
|
||||
|
||||
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);
|
||||
|
||||
client.put(ByteString.EMPTY, ByteString.EMPTY);
|
||||
client.put(splitKeyA, ByteString.EMPTY);
|
||||
client.put(splitKeyA.concat(ByteString.copyFromUtf8("1")), ByteString.EMPTY);
|
||||
client.put(splitKeyA.concat(ByteString.copyFromUtf8("2")), ByteString.EMPTY);
|
||||
client.put(splitKeyA.concat(ByteString.copyFromUtf8("3")), ByteString.EMPTY);
|
||||
client.put(splitKeyB.concat(ByteString.copyFromUtf8("1")), ByteString.EMPTY);
|
||||
Assert.assertEquals(6, client.scan(ByteString.EMPTY, 7).size());
|
||||
Assert.assertEquals(0, client.scan(ByteString.EMPTY, -1).size());
|
||||
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validate() {
|
||||
baseTest(100, 100, 100, 100, false, false, false, false, false);
|
||||
|
@ -449,7 +496,9 @@ public class RawKVClientTest extends BaseRawKVTest {
|
|||
int i = cnt;
|
||||
completionService.submit(
|
||||
() -> {
|
||||
for (int j = 0; j < base; j++) checkDelete(remainingKeys.get(i * base + j).getKey());
|
||||
for (int j = 0; j < base; j++) {
|
||||
checkDelete(remainingKeys.get(i * base + j).getKey());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
@ -955,6 +1004,7 @@ public class RawKVClientTest extends BaseRawKVTest {
|
|||
}
|
||||
|
||||
private static class ByteStringComparator implements Comparator<ByteString> {
|
||||
|
||||
@Override
|
||||
public int compare(ByteString startKey, ByteString endKey) {
|
||||
return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray());
|
||||
|
|
Loading…
Reference in New Issue