Implementing reverseScan

Signed-off-by: Jiaming Lu <jiaming.lu@simplytyped.cn>
This commit is contained in:
Jiaming Lu 2023-01-11 13:28:16 +08:00
parent d278e3ad19
commit 11cfac0739
8 changed files with 149 additions and 37 deletions

View File

@ -101,7 +101,25 @@ public class KVClient implements AutoCloseable {
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
return scan(startKey, endKey, version, false);
}
/**
* Scan key-value pairs from TiKV in range [startKey, endKey) or if reversely, [endKey, startKey)
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @param reverse whether to scan reversely
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version, boolean reverse)
throws GrpcException {
Iterator<KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -115,7 +133,7 @@ public class KVClient implements AutoCloseable {
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit, false);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -183,8 +201,9 @@ public class KVClient implements AutoCloseable {
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}
private Iterator<KvPair> scanIterator(
@ -192,7 +211,8 @@ public class KVClient implements AutoCloseable {
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}
}

View File

@ -157,7 +157,24 @@ public class Snapshot {
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);
}
/**
* scan all keys becofe startKey, inclusive
*
* @param startKey start of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScan(ByteString startKey) {
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE,
true);
}
/**
@ -173,7 +190,24 @@ public class Snapshot {
session.getRegionStoreClientBuilder(),
prefix,
nextPrefix,
timestamp.getVersion());
timestamp.getVersion(),
false);
}
/**
* scan all keys with prefix, reversely
*
* @param prefix prefix of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScanPrefix(ByteString prefix) {
ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString();
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
nextPrefix,
prefix,
timestamp.getVersion(),
true);
}
public TiConfiguration getConf() {

View File

@ -44,9 +44,10 @@ public class ConcreteScanIterator extends ScanIterator {
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
int limit,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, ByteString.EMPTY, version, limit);
this(conf, builder, startKey, ByteString.EMPTY, version, limit, reverse);
}
public ConcreteScanIterator(
@ -54,9 +55,10 @@ public class ConcreteScanIterator extends ScanIterator {
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
long version,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE, reverse);
}
private ConcreteScanIterator(
@ -65,8 +67,9 @@ public class ConcreteScanIterator extends ScanIterator {
ByteString startKey,
ByteString endKey,
long version,
int limit) {
super(conf, builder, startKey, endKey, limit, false);
int limit,
boolean reverse) {
super(conf, builder, startKey, endKey, limit, false, reverse);
this.version = version;
}
@ -76,7 +79,7 @@ public class ConcreteScanIterator extends ScanIterator {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
currentCache = client.scan(backOffer, startKey, version, reverse);
// If we get region before scan, we will use region from cache which
// may have wrong end key. This may miss some regions that split from old region.
// Client will get the newest region during scan. So we need to

View File

@ -39,8 +39,9 @@ public class RawScanIterator extends ScanIterator {
ByteString endKey,
int limit,
boolean keyOnly,
boolean reverse,
BackOffer scanBackOffer) {
super(conf, builder, startKey, endKey, limit, keyOnly);
super(conf, builder, startKey, endKey, limit, keyOnly, reverse);
this.scanBackOffer = scanBackOffer;
}
@ -56,7 +57,7 @@ public class RawScanIterator extends ScanIterator {
currentCache = null;
} else {
try {
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly, reverse);
// Client will get the newest region during scan. So we need to
// update region after scan.
region = client.getRegion();

View File

@ -43,6 +43,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected Key endKey;
protected boolean hasEndKey;
protected boolean processingLastBatch = false;
protected boolean reverse = false;
ScanIterator(
TiConfiguration conf,
@ -50,7 +51,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
boolean reverse) {
this.startKey = requireNonNull(startKey, "start key is null");
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.isEmpty();
@ -58,6 +60,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
this.keyOnly = keyOnly;
this.conf = conf;
this.builder = builder;
this.reverse = reverse;
}
/**

View File

@ -70,8 +70,7 @@ import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.*;
import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.CommitRequest;
@ -109,8 +108,6 @@ import org.tikv.kvproto.Kvrpcpb.SplitRegionRequest;
import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.AbstractLockResolverClient;
@ -336,7 +333,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly, boolean reverse) {
boolean forWrite = false;
while (true) {
Supplier<ScanRequest> request =
@ -348,6 +345,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setStartKey(codec.encodeKey(startKey))
.setVersion(version)
.setKeyOnly(keyOnly)
.setReverse(reverse)
.setLimit(getConf().getScanBatchSize())
.build();
@ -417,6 +415,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
return scan(backOffer, startKey, version, false);
}
public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean reverse) {
return scan(backOffer, startKey, version, false, reverse);
}
/**
* Prewrite batch keys
*
@ -1238,9 +1241,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param backOffer BackOffer
* @param key startKey
* @param keyOnly true if value of KvPair is not needed
* @param reverse
* @return KvPair list
*/
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, int limit, boolean keyOnly, boolean reverse) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
@ -1254,6 +1259,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setEndKey(range.second)
.setKeyOnly(keyOnly)
.setLimit(limit)
.setReverse(reverse)
.build();
};
@ -1271,8 +1277,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, boolean keyOnly, boolean reverse) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly, reverse);
}
private List<KvPair> rawScanHelper(RawScanResponse resp) {

View File

@ -1011,7 +1011,7 @@ public class RawKVClient implements RawKVClientBase {
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, false, backOffer);
}
/**

View File

@ -114,8 +114,37 @@ public class KVClient implements AutoCloseable {
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(startKey, endKey, version, false);
}
/**
* Scan key-value pairs from TiKV reversely in range (startKey, endKey]
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(endKey, startKey, version, true);
}
public List<Kvrpcpb.KvPair> scan(
ByteString startKey, ByteString endKey, long version, boolean reverse) throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
}
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit, boolean reverse)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator =
scanIterator(conf, clientBuilder, startKey, endKey, version);
scanIterator(conf, clientBuilder, startKey, version, limit, reverse);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -130,14 +159,27 @@ public class KVClient implements AutoCloseable {
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
return scan(startKey, version, limit, false);
}
/**
* Scan key-value pairs reversively from TiKV in range ('', endKey], maximum to `limit` pairs
*
* @param endKey start key, inclusive
* @param limit limit of kv pairs
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version, int limit)
throws GrpcException {
return scan(endKey, version, limit, true);
}
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version) throws GrpcException {
return scan(startKey, version, Integer.MAX_VALUE);
return scan(startKey, version, Integer.MAX_VALUE, false);
}
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version) throws GrpcException {
return scan(endKey, version, Integer.MAX_VALUE, true);
}
public synchronized void ingest(List<Pair<ByteString, ByteString>> list) throws GrpcException {
@ -264,8 +306,9 @@ public class KVClient implements AutoCloseable {
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}
private Iterator<Kvrpcpb.KvPair> scanIterator(
@ -273,8 +316,9 @@ public class KVClient implements AutoCloseable {
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}
private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList)