diff --git a/.github/config/tikv_rawkv.toml b/.github/config/tikv_rawkv.toml
index 5ee1bfe07e..c339b48639 100644
--- a/.github/config/tikv_rawkv.toml
+++ b/.github/config/tikv_rawkv.toml
@@ -2,17 +2,20 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
-capacity = "8G"
+capacity = "6G"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
-[storage]
-enable-ttl = true
-
[rocksdb]
max-open-files = 10000
[raftdb]
max-open-files = 10000
+[storage.block-cache]
+capacity = "128MB"
+
+[storage]
+reserve-space = "0MB"
+enable-ttl = true
diff --git a/.github/config/tikv_txnkv.toml b/.github/config/tikv_txnkv.toml
index c083cfa31b..e327632e58 100644
--- a/.github/config/tikv_txnkv.toml
+++ b/.github/config/tikv_txnkv.toml
@@ -2,7 +2,7 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
-capacity = "8G"
+capacity = "6G"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
@@ -12,3 +12,9 @@ max-open-files = 10000
[raftdb]
max-open-files = 10000
+
+[storage.block-cache]
+capacity = "128MB"
+
+[storage]
+reserve-space = "0MB"
diff --git a/.github/config/tikv_v2.toml b/.github/config/tikv_v2.toml
new file mode 100644
index 0000000000..a1b5b65706
--- /dev/null
+++ b/.github/config/tikv_v2.toml
@@ -0,0 +1,17 @@
+# TiKV Configuration.
+
+[raftstore]
+pd-heartbeat-tick-interval = "2s"
+pd-store-heartbeat-tick-interval = "5s"
+split-region-check-tick-interval = "1s"
+
+[rocksdb]
+max-open-files = 10000
+
+[raftdb]
+max-open-files = 10000
+
+[storage]
+reserve-space = "0MB"
+api-version = 2
+enable-ttl = true
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c2fbf68aa8..6511ec63c1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -31,7 +31,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- tikv_version: [nightly, v5.0.4, v5.3.0, v5.4.0]
+ tikv_version: [v5.0.6, v5.3.4, v5.4.3]
+ fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -40,29 +41,41 @@ jobs:
java-version: '8.0'
distribution: 'adopt'
- name: Install TiUP
- run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
+ run: |
+ curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
+ /home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }}
- name: Start TiUP Playground
run: |
# Start TiKV in APIV1TTL
- /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> raw.out 2>&1 &
-
- # The first run of `tiup` has to download all components so it'll take longer.
- sleep 1m 30s
+ touch tiup-v1ttl.log
+ /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag rawkv --mode tikv-slim --kv 1 --without-monitor --kv.port 20160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup-v1ttl.log &
+ timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1ttl.log)
+ cat tiup-v1ttl.log
+ echo "Wait for bootstrap"
+ sleep 10s
# Start TiKV in APIV1
- /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> txn.out 2>&1 &
+ touch tiup-v1.log
+ /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag txnkv --mode tikv-slim --kv 1 --without-monitor --kv.port 30160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2381 2>&1 >> tiup-v1.log &
+ timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1.log)
+ cat tiup-v1.log
+ echo "Wait for bootstrap"
+ sleep 10s
- sleep 30s
+ # Get PD address
+ echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
+ echo "TXNKV_PD_ADDRESSES=127.0.0.1:2381" >> $GITHUB_ENV
- # Parse PD address from `tiup` output
- echo "RAWKV_PD_ADDRESSES=$(cat raw.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
- echo "TXNKV_PD_ADDRESSES=$(cat txn.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
-
- # Log the output
- echo "$(cat raw.out)" >&2
- echo "$(cat txn.out)" >&2
- name: Run Integration Test
run: mvn clean test
+ - name: Print TiKV logs
+ if: failure()
+ run: |
+ echo "RawKV TiKV logs"
+ cat /home/runner/.tiup/data/rawkv/tikv-0/tikv.log
+
+ echo "TxnKV TiKV logs"
+ cat /home/runner/.tiup/data/txnkv/tikv-0/tikv.log
- name: Upload coverage
uses: codecov/codecov-action@v2
with:
diff --git a/.github/workflows/ci_v2.yml b/.github/workflows/ci_v2.yml
new file mode 100644
index 0000000000..be69782a47
--- /dev/null
+++ b/.github/workflows/ci_v2.yml
@@ -0,0 +1,52 @@
+name: CI (APIv2)
+
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+
+jobs:
+ integration-test:
+ name: Integration Test - ${{ matrix.tikv_version }}
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ tikv_version: [v6.5.3, v7.1.1, nightly]
+ fail-fast: false
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 8
+ uses: actions/setup-java@v2
+ with:
+ java-version: '8.0'
+ distribution: 'adopt'
+ - name: Install TiUP
+ run: |
+ curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
+ /home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }}
+ - name: Start TiUP Playground
+ run: |
+ # Start TiKV in APIV2
+ touch tiup.log
+ /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --tag kv --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_v2.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup.log &
+ timeout 300 grep -q "PD Endpoints:" <(tail -f tiup.log)
+ cat tiup.log
+
+ # Get PD address
+ echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
+ echo "TXNKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
+
+ - name: Run Integration Test
+ run: mvn clean test
+ - name: Print TiKV logs
+ if: failure()
+ run: |
+ echo "TiKV logs"
+ cat /home/runner/.tiup/data/kv/tikv-0/tikv.log
+ - name: Upload coverage
+ uses: codecov/codecov-action@v2
+ with:
+ files: ${{ github.workspace }}/target/site/jacoco/jacoco.xml
+ fail_ci_if_error: true
+ verbose: true
diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml
index 4e1cf90a4f..cd5c12f84c 100644
--- a/.github/workflows/license-checker.yml
+++ b/.github/workflows/license-checker.yml
@@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
- uses: apache/skywalking-eyes@main
+ uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
diff --git a/.github/workflows/stale-checker.yml b/.github/workflows/stale-checker.yml
deleted file mode 100644
index 478fc504f7..0000000000
--- a/.github/workflows/stale-checker.yml
+++ /dev/null
@@ -1,19 +0,0 @@
-name: 'Stale Checker'
-on:
- schedule:
- - cron: '0 0 * * *'
-
-jobs:
- stale:
- runs-on: ubuntu-latest
- steps:
- - uses: actions/stale@v4
- with:
- days-before-stale: 30
- stale-issue-message: 'This issue is stale because it has been open 30 days with no activity.'
- stale-issue-label: 'status/stale'
- days-before-issue-close: -1
- stale-pr-message: 'This PR is stale because it has been open 30 days with no activity. Remove the status/stale label or comment or this PR will be closed in 7 days.'
- stale-pr-label: 'status/stale'
- days-before-pr-close: 7
- close-pr-message: 'This PR was closed because it has been stalled for 7 days with no activity.'
diff --git a/README.md b/README.md
index 0acd816deb..b2d64294b8 100644
--- a/README.md
+++ b/README.md
@@ -4,18 +4,17 @@
## TiKV JAVA Client
-A Java client for [TiDB](https://github.com/pingcap/tidb)/[TiKV](https://github.com/tikv/tikv).
+A Java client for [TiKV](https://github.com/tikv/tikv).
It is supposed to:
+ Communicate via [gRPC](http://www.grpc.io/)
+ Talk to Placement Driver searching for a region
-+ Talk to TiKV for reading/writing data and the resulted data is encoded/decoded just like what we do in TiDB.
-+ Talk to Coprocessor for calculation pushdown
++ Talk to TiKV for reading/writing data
## Quick Start
-> TiKV Java Client is designed to communicate with [pd](https://github.com/tikv/pd) and [tikv](https://github.com/tikv/tikv), please run TiKV and PD in advance.
+> TiKV Java Client is designed to communicate with [PD](https://github.com/tikv/pd) and [TiKV](https://github.com/tikv/tikv), please run PD and TiKV in advance.
-Build java client from source file:
+Build Java client from source file:
```sh
mvn clean install -Dmaven.test.skip=true
@@ -27,11 +26,27 @@ Add maven dependency to `pom.xml`:
org.tikv
tikv-client-java
- 3.1.0
+ 3.3.0
```
-Create a RawKVClient and communicates with TiKV:
+Create a transactional `KVClient` and communicates with TiKV:
+
+```java
+import org.tikv.common.TiConfiguration;
+import org.tikv.common.TiSession;
+import org.tikv.txn.KVClient;
+
+public class Main {
+ public static void main(String[] args) throws Exception {
+ TiConfiguration conf = TiConfiguration.createDefault(YOUR_PD_ADDRESSES);
+ TiSession session = TiSession.create(conf);
+ KVClient client = session.createKVClient();
+ }
+}
+```
+
+Or create a `RawKVClient` if you don't need the transaction semantic:
```java
import org.tikv.common.TiConfiguration;
@@ -39,8 +54,7 @@ import org.tikv.common.TiSession;
import org.tikv.raw.RawKVClient;
public class Main {
- public static void main() {
- // You MUST create a raw configuration if you are using RawKVClient.
+ public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault(YOUR_PD_ADDRESSES);
TiSession session = TiSession.create(conf);
RawKVClient client = session.createRawClient();
@@ -48,7 +62,7 @@ public class Main {
}
```
-Find more demo in [KVRawClientTest](https://github.com/birdstorm/KVRawClientTest/)
+Find more demo in [TiKV Java Client User Documents](https://tikv.github.io/client-java/examples/introduction.html)
## Documentation
diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md
index 36db8fa621..1916a51b1a 100644
--- a/docs/src/SUMMARY.md
+++ b/docs/src/SUMMARY.md
@@ -4,6 +4,8 @@
- [Introduction](./introduction/introduction.md)
+- [Production Readiness](./production-readiness.md)
+
- [Start With Examples](./examples/introduction.md)
- [Quick Start](./examples/quick-start.md)
- [Interact with TiKV RawKV API](./examples/rawkv.md)
diff --git a/docs/src/examples/quick-start.md b/docs/src/examples/quick-start.md
index 36078f2673..09c6b18645 100644
--- a/docs/src/examples/quick-start.md
+++ b/docs/src/examples/quick-start.md
@@ -24,7 +24,7 @@ Add maven dependency to `pom.xml`.
org.tikv
tikv-client-java
- 3.1.0
+ 3.3.0
org.slf4j
diff --git a/docs/src/examples/rawkv.md b/docs/src/examples/rawkv.md
index 182cc8d09c..e4c9bcacad 100644
--- a/docs/src/examples/rawkv.md
+++ b/docs/src/examples/rawkv.md
@@ -15,7 +15,7 @@ import org.tikv.raw.RawKVClient;
import org.tikv.shade.com.google.protobuf.ByteString;
public class Main {
- public static void main() {
+ public static void main(String[] args) throws Exception {
// You MUST create a raw configuration if you are using RawKVClient.
TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379");
TiSession session = TiSession.create(conf);
@@ -61,7 +61,7 @@ To enable the API V2 mode, users need to specify the API version of the client.
import org.tikv.common.TiConfiguration.ApiVersion;
public class Main {
- public static void main() {
+ public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379");
conf.setApiVersion(ApiVersion.V2);
try(TiSession session = TiSession.create(conf)) {
diff --git a/docs/src/examples/txnkv.md b/docs/src/examples/txnkv.md
index 9bcfb430c4..e3e2e0d4a7 100644
--- a/docs/src/examples/txnkv.md
+++ b/docs/src/examples/txnkv.md
@@ -23,7 +23,7 @@ import org.tikv.txn.TwoPhaseCommitter;
public class App {
public static void main(String[] args) throws Exception {
- TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2389");
+ TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
try (TiSession session = TiSession.create(conf)) {
// two-phrase write
long startTS = session.getTimestamp().getVersion();
diff --git a/docs/src/production-readiness.md b/docs/src/production-readiness.md
new file mode 100644
index 0000000000..6604363a30
--- /dev/null
+++ b/docs/src/production-readiness.md
@@ -0,0 +1,18 @@
+# Production Readiness
+
+In general, the latest [release](https://github.com/tikv/client-java/releases) of TiKV Java Client is ready for production use. But it is not battle-tested as full featured client for TiKV in all use cases. This page will give you more details.
+
+## RawKV
+All RawKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml).
+
+At this time, RawKV has been used in the production environment of some commercial customers in latency sensitive systems. But they only use part of the RawKV APIs (mainly including `raw_put`, `raw_get`, `raw_compare_and_swap`, and `raw_batch_put`).
+
+## TxnKV
+All TxnKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml).
+
+In addition, TxnKV has been used in the [TiSpark](https://docs.pingcap.com/tidb/stable/tispark-overview) and [TiBigData](https://github.com/tidb-incubator/TiBigData) project to integrate data from TiDB to Big Data ecosystem. TiSpark and TiBigData are used in the production system of some commercial customers and internet companies.
+
+Similar to RawKV, only part of APIs are used in this scenario (mainly including `prewrite/commit` and `coprocessor`). And this use case doesn't care about latency but throughput and reliability.
+
+## TiDB Cloud
+Directly using TiKV is not possible on TiDB Cloud due to the fact that client has to access the whole cluster, which has security issues. And TiKV managed service is not coming soon as it's not contained in [roadmap](https://docs.pingcap.com/tidbcloud/tidb-cloud-roadmap) yet.
diff --git a/pom.xml b/pom.xml
index 1b98009f30..dd34f060b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
org.tikv
tikv-client-java
- 3.3.0-SNAPSHOT
+ 3.3.4-SNAPSHOT
jar
TiKV Java Client
A Java Client for TiKV
@@ -54,7 +54,7 @@
1.8
UTF-8
UTF-8
- 3.5.1
+ 3.18.0
1.2.17
1.7.16
1.48.0
@@ -62,7 +62,7 @@
2.8.9
1.6.6
2.13.2
- 2.13.2.2
+ 2.13.4.2
3.0.1
0.4.1
2.9.9
diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java
index 43383bbda8..e3695b6916 100644
--- a/src/main/java/org/tikv/common/PDClient.java
+++ b/src/main/java/org/tikv/common/PDClient.java
@@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.ScatterRegionResponse;
import org.tikv.kvproto.Pdpb.Timestamp;
import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;
+import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;
public class PDClient extends AbstractGRPCClient
implements ReadOnlyPDClient {
@@ -383,6 +384,17 @@ public class PDClient extends AbstractGRPCClient
return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
}
+ private Supplier buildUpdateServiceGCSafePointRequest(
+ ByteString serviceId, long ttl, long safePoint) {
+ return () ->
+ UpdateServiceGCSafePointRequest.newBuilder()
+ .setHeader(header)
+ .setSafePoint(safePoint)
+ .setServiceId(serviceId)
+ .setTTL(ttl)
+ .build();
+ }
+
private PDErrorHandler buildPDErrorHandler() {
return new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);
@@ -419,6 +431,20 @@ public class PDClient extends AbstractGRPCClient
return conf.getReplicaRead();
}
+ @Override
+ public Long updateServiceGCSafePoint(
+ String serviceId, long ttl, long safePoint, BackOffer backOffer) {
+ return callWithRetry(
+ backOffer,
+ PDGrpc.getUpdateServiceGCSafePointMethod(),
+ buildUpdateServiceGCSafePointRequest(
+ ByteString.copyFromUtf8(serviceId), ttl, safePoint),
+ new PDErrorHandler<>(
+ r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
+ this))
+ .getMinSafePoint();
+ }
+
@Override
public void close() throws InterruptedException {
etcdClient.close();
diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java
index ddf1855e61..58ad9b2a62 100644
--- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java
+++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java
@@ -72,4 +72,16 @@ public interface ReadOnlyPDClient {
Long getClusterId();
RequestKeyCodec getCodec();
+
+ /**
+ * Update ServiceGCSafePoint
+ *
+ * @param serviceId ServiceId
+ * @param ttl TTL in seconds
+ * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good
+ * practice
+ * @return the MinSafePoint of all services. If this value is greater than safePoint, it means
+ * update failed
+ */
+ Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer);
}
diff --git a/src/main/java/org/tikv/common/apiversion/CodecUtils.java b/src/main/java/org/tikv/common/apiversion/CodecUtils.java
index 1c6cfea9fa..a2b0725be5 100644
--- a/src/main/java/org/tikv/common/apiversion/CodecUtils.java
+++ b/src/main/java/org/tikv/common/apiversion/CodecUtils.java
@@ -23,7 +23,7 @@ import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
// TODO(iosmanthus): use ByteString.wrap to avoid once more copying.
-class CodecUtils {
+public class CodecUtils {
public static ByteString encode(ByteString key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java
index 11db11c641..ab86fb5e02 100644
--- a/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java
+++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java
@@ -81,22 +81,21 @@ public class RequestKeyV2Codec implements RequestKeyCodec {
if (!start.isEmpty()) {
start = CodecUtils.decode(start);
- if (ByteString.unsignedLexicographicalComparator().compare(start, keyPrefix) < 0) {
- start = ByteString.EMPTY;
- } else {
- start = decodeKey(start);
- }
}
if (!end.isEmpty()) {
end = CodecUtils.decode(end);
- if (ByteString.unsignedLexicographicalComparator().compare(end, infiniteEndKey) >= 0) {
- end = ByteString.EMPTY;
- } else {
- end = decodeKey(end);
- }
}
+ if (ByteString.unsignedLexicographicalComparator().compare(start, infiniteEndKey) >= 0
+ || (!end.isEmpty()
+ && ByteString.unsignedLexicographicalComparator().compare(end, keyPrefix) <= 0)) {
+ throw new IllegalArgumentException("region out of keyspace" + region.toString());
+ }
+
+ start = start.startsWith(keyPrefix) ? start.substring(keyPrefix.size()) : ByteString.EMPTY;
+ end = end.startsWith(keyPrefix) ? end.substring(keyPrefix.size()) : ByteString.EMPTY;
+
return builder.setStartKey(start).setEndKey(end).build();
}
}
diff --git a/src/main/java/org/tikv/common/codec/RowV2.java b/src/main/java/org/tikv/common/codec/RowV2.java
index 44891e4e91..6893894a7b 100644
--- a/src/main/java/org/tikv/common/codec/RowV2.java
+++ b/src/main/java/org/tikv/common/codec/RowV2.java
@@ -147,7 +147,7 @@ public class RowV2 {
if (this.large) {
v = this.colIDs32[h];
} else {
- v = this.colIDs[h];
+ v = this.colIDs[h] & 0xFF;
}
if (v < colID) {
i = h + 1;
diff --git a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java
index 10d21942c9..ca7d73bac3 100644
--- a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java
+++ b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java
@@ -97,6 +97,8 @@ public class CacheInvalidateEvent implements Serializable {
public enum CacheType implements Serializable {
REGION_STORE,
+ STORE,
+ REGION,
REQ_FAILED,
LEADER
}
diff --git a/src/main/java/org/tikv/common/expression/ColumnRef.java b/src/main/java/org/tikv/common/expression/ColumnRef.java
index 0a6ed6e4b0..61746cd2e1 100644
--- a/src/main/java/org/tikv/common/expression/ColumnRef.java
+++ b/src/main/java/org/tikv/common/expression/ColumnRef.java
@@ -123,9 +123,9 @@ public class ColumnRef extends Expression {
@Override
public int hashCode() {
if (isResolved()) {
- return Objects.hash(this.name, this.dataType);
+ return Objects.hash(this.name.toLowerCase(), this.dataType);
} else {
- return Objects.hashCode(name);
+ return Objects.hashCode(name.toLowerCase());
}
}
diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
index c30f7b6f6f..debbccf7ee 100644
--- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
+++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
@@ -21,10 +21,13 @@ import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
+import org.tikv.common.event.CacheInvalidateEvent;
+import org.tikv.common.event.CacheInvalidateEvent.CacheType;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
@@ -43,6 +46,11 @@ public class RegionErrorHandler implements ErrorHandler {
private final Function getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
+ private final List> cacheInvalidateCallBackList;
+
+ private final ExecutorService callBackThreadPool;
+ private final int INVALID_STORE_ID = 0;
+ private final int INVALID_REGION_ID = 0;
public RegionErrorHandler(
RegionManager regionManager,
@@ -51,6 +59,8 @@ public class RegionErrorHandler implements ErrorHandler {
this.recv = recv;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
+ this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList();
+ this.callBackThreadPool = regionManager.getCallBackThreadPool();
}
@Override
@@ -107,6 +117,7 @@ public class RegionErrorHandler implements ErrorHandler {
if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
+ notifyRegionLeaderError(recv.getRegion());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
@@ -116,15 +127,14 @@ public class RegionErrorHandler implements ErrorHandler {
// this error is reported from raftstore:
// store_id requested at the moment is inconsistent with that expected
// Solution:re-fetch from PD
- long storeId = recv.getRegion().getLeader().getStoreId();
+ long storeId = error.getStoreNotMatch().getRequestStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));
-
- this.regionManager.invalidateRegion(recv.getRegion());
- this.regionManager.invalidateStore(storeId);
+ // may request store which is not leader.
+ invalidateRegionStoreCache(recv.getRegion(), storeId);
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
@@ -143,8 +153,6 @@ public class RegionErrorHandler implements ErrorHandler {
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
- backOffer.doBackOff(
- BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
@@ -179,7 +187,7 @@ public class RegionErrorHandler implements ErrorHandler {
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
- invalidateRegionStoreCache(recv.getRegion());
+ invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
@@ -196,6 +204,7 @@ public class RegionErrorHandler implements ErrorHandler {
private boolean onRegionEpochNotMatch(BackOffer backOffer, List currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
+ notifyRegionCacheInvalidate(recv.getRegion());
return false;
}
@@ -220,7 +229,16 @@ public class RegionErrorHandler implements ErrorHandler {
// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for (Metapb.Region meta : currentRegions) {
// The region needs to be decoded to plain format.
- meta = regionManager.getPDClient().getCodec().decodeRegion(meta);
+ try {
+ meta = regionManager.getPDClient().getCodec().decodeRegion(meta);
+ } catch (Exception e) {
+ logger.warn("ignore invalid region: " + meta.toString());
+ // if the region is invalid, ignore it since the following situation might appear.
+ // Assuming a region with range [r000, z), then it splits into:
+ // [r000, x) [x, z), the right region is invalid for keyspace `r000`.
+ // We should only care about the valid region.
+ continue;
+ }
TiRegion region = regionManager.createRegion(meta, backOffer);
newRegions.add(region);
if (recv.getRegion().getVerID() == region.getVerID()) {
@@ -229,6 +247,7 @@ public class RegionErrorHandler implements ErrorHandler {
}
if (needInvalidateOld) {
+ notifyRegionCacheInvalidate(recv.getRegion());
this.regionManager.onRegionStale(recv.getRegion());
}
@@ -271,8 +290,51 @@ public class RegionErrorHandler implements ErrorHandler {
return recv.getRegion();
}
- private void invalidateRegionStoreCache(TiRegion ctxRegion) {
+ private void notifyRegionRequestError(
+ TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) {
+ CacheInvalidateEvent event;
+ // When store(region) id is invalid,
+ // it implies that the error was not caused by store(region) error.
+ switch (type) {
+ case REGION:
+ case LEADER:
+ event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type);
+ break;
+ case REGION_STORE:
+ event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type);
+ break;
+ case REQ_FAILED:
+ event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type);
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type);
+ }
+ if (cacheInvalidateCallBackList != null) {
+ for (Function cacheInvalidateCallBack :
+ cacheInvalidateCallBackList) {
+ callBackThreadPool.submit(
+ () -> {
+ try {
+ cacheInvalidateCallBack.apply(event);
+ } catch (Exception e) {
+ logger.error(String.format("CacheInvalidCallBack failed %s", e));
+ }
+ });
+ }
+ }
+ }
+
+ private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) {
regionManager.invalidateRegion(ctxRegion);
- regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
+ regionManager.invalidateStore(storeId);
+ notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE);
+ }
+
+ private void notifyRegionCacheInvalidate(TiRegion ctxRegion) {
+ notifyRegionRequestError(ctxRegion, 0, CacheType.REGION);
+ }
+
+ private void notifyRegionLeaderError(TiRegion ctxRegion) {
+ notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER);
}
}
diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java
index 44c8137510..9678d9e813 100644
--- a/src/main/java/org/tikv/common/region/RegionManager.java
+++ b/src/main/java/org/tikv/common/region/RegionManager.java
@@ -23,13 +23,18 @@ import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
+import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
@@ -68,9 +73,36 @@ public class RegionManager {
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;
+ private final CopyOnWriteArrayList>
+ cacheInvalidateCallbackList;
+ private final ExecutorService callBackThreadPool;
+ private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
+ this(conf, pdClient, channelFactory, 1);
+ }
+
+ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
+ this(conf, pdClient, 1);
+ }
+
+ public RegionManager(
+ TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
+ this.cache = new RegionCache();
+ this.pdClient = pdClient;
+ this.conf = conf;
+ this.storeChecker = null;
+ this.executor = null;
+ this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
+ this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
+ }
+
+ public RegionManager(
+ TiConfiguration conf,
+ ReadOnlyPDClient pdClient,
+ ChannelFactory channelFactory,
+ int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
@@ -81,26 +113,34 @@ public class RegionManager {
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
- }
-
- public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
- this.cache = new RegionCache();
- this.pdClient = pdClient;
- this.conf = conf;
- this.storeChecker = null;
- this.executor = null;
+ this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
+ this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}
public synchronized void close() {
if (this.executor != null) {
this.executor.shutdownNow();
}
+ this.callBackThreadPool.shutdownNow();
}
public ReadOnlyPDClient getPDClient() {
return this.pdClient;
}
+ public ExecutorService getCallBackThreadPool() {
+ return callBackThreadPool;
+ }
+
+ public List> getCacheInvalidateCallbackList() {
+ return cacheInvalidateCallbackList;
+ }
+
+ public void addCacheInvalidateCallback(
+ Function cacheInvalidateCallback) {
+ this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
+ }
+
public void invalidateAll() {
cache.invalidateAll();
}
@@ -137,8 +177,13 @@ public class RegionManager {
Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key);
region =
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
+ logger.debug(
+ String.format(
+ "get region id: %d with leader: %d",
+ region.getId(), region.getLeader().getStoreId()));
}
} catch (Exception e) {
+ logger.warn("Get region failed: ", e);
return null;
} finally {
requestTimer.observeDuration();
@@ -188,20 +233,40 @@ public class RegionManager {
TiStore store = null;
if (storeType == TiStoreType.TiKV) {
- Peer peer = region.getCurrentReplica();
- store = getStoreById(peer.getStoreId(), backOffer);
+ // check from the first replica in case it recovers
+ List replicaList = region.getReplicaList();
+ for (int i = 0; i < replicaList.size(); i++) {
+ Peer peer = replicaList.get(i);
+ store = getStoreById(peer.getStoreId(), backOffer);
+ if (store.isReachable()) {
+ // update replica's index
+ region.setReplicaIdx(i);
+ break;
+ }
+ logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId());
+ }
+ // Does not set unreachable store to null in case it is incompatible with GrpcForward
+ if (store == null || !store.isReachable()) {
+ logger.warn("No TiKV store available for region: " + region);
+ }
} else {
- outerLoop:
+ List tiflashStores = new ArrayList<>();
for (Peer peer : region.getLearnerList()) {
TiStore s = getStoreById(peer.getStoreId(), backOffer);
- for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
- if (label.getKey().equals(storeType.getLabelKey())
- && label.getValue().equals(storeType.getLabelValue())) {
- store = s;
- break outerLoop;
- }
+ if (!s.isReachable()) {
+ continue;
+ }
+ if (s.isTiFlash()) {
+ tiflashStores.add(s);
}
}
+ // select a tiflash with Round-Robin strategy
+ if (tiflashStores.size() > 0) {
+ store =
+ tiflashStores.get(
+ Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size()));
+ }
+
if (store == null) {
// clear the region cache, so we may get the learner peer next time
cache.invalidateRegion(region);
diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java
index ba742c872b..22607b2bdb 100644
--- a/src/main/java/org/tikv/common/region/RegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java
@@ -357,7 +357,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this,
lockResolverClient,
resp -> resp.hasRegionError() ? resp.getRegionError() : null,
- resp -> null,
+ resp -> resp.hasError() ? resp.getError() : null,
resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
version,
forWrite);
@@ -366,13 +366,14 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
// we need to update region after retry
region = regionManager.getRegionByKey(startKey, backOffer);
- if (isScanSuccess(backOffer, resp)) {
- return doScan(resp);
+ if (handleScanResponse(backOffer, resp, version, forWrite)) {
+ return resp.getPairsList();
}
}
}
- private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
+ private boolean handleScanResponse(
+ BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("ScanResponse failed without a cause");
@@ -381,28 +382,35 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
return false;
}
- return true;
- }
- // TODO: resolve locks after scan
- private List doScan(ScanResponse resp) {
- // Check if kvPair contains error, it should be a Lock if hasError is true.
- List kvPairs = resp.getPairsList();
- List newKvPairs = new ArrayList<>();
- for (KvPair kvPair : kvPairs) {
+ // Resolve locks
+ // Note: Memory lock conflict is returned by both `ScanResponse.error` &
+ // `ScanResponse.pairs[0].error`, while other key errors are returned by
+ // `ScanResponse.pairs.error`
+ // See https://github.com/pingcap/kvproto/pull/697
+ List locks = new ArrayList<>();
+ for (KvPair kvPair : resp.getPairsList()) {
if (kvPair.hasError()) {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
- newKvPairs.add(
- KvPair.newBuilder()
- .setError(kvPair.getError())
- .setValue(kvPair.getValue())
- .setKey(lock.getKey())
- .build());
- } else {
- newKvPairs.add(codec.decodeKvPair(kvPair));
+ locks.add(lock);
}
}
- return Collections.unmodifiableList(newKvPairs);
+ if (!locks.isEmpty()) {
+ ResolveLockResult resolveLockResult =
+ lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
+ addResolvedLocks(version, resolveLockResult.getResolvedLocks());
+
+ long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
+ if (msBeforeExpired > 0) {
+ // if not resolve all locks, we wait and retry
+ backOffer.doBackOffWithMaxSleep(
+ BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
+ }
+
+ return false;
+ }
+
+ return true;
}
public List scan(BackOffer backOffer, ByteString startKey, long version) {
diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
index 8d305649c4..3ae3f40d1f 100644
--- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
+++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
@@ -20,17 +20,22 @@ import io.grpc.ManagedChannel;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
+import io.grpc.stub.ClientCalls;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;
+import org.tikv.kvproto.Mpp;
+import org.tikv.kvproto.Mpp.IsAliveRequest;
+import org.tikv.kvproto.TikvGrpc;
public class StoreHealthyChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
@@ -75,6 +80,30 @@ public class StoreHealthyChecker implements Runnable {
private boolean checkStoreHealth(TiStore store) {
String addressStr = store.getStore().getAddress();
+ if (store.isTiFlash()) {
+ return checkTiFlashHealth(addressStr);
+ }
+ return checkTiKVHealth(addressStr);
+ }
+
+ private boolean checkTiFlashHealth(String addressStr) {
+ try {
+ ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
+ TikvGrpc.TikvBlockingStub stub =
+ TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
+ Supplier factory = () -> Mpp.IsAliveRequest.newBuilder().build();
+ Mpp.IsAliveResponse resp =
+ ClientCalls.blockingUnaryCall(
+ stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
+ return resp != null && resp.getAvailable();
+ } catch (Exception e) {
+ logger.info(
+ "fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e);
+ return false;
+ }
+ }
+
+ private boolean checkTiKVHealth(String addressStr) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
HealthGrpc.HealthBlockingStub stub =
@@ -83,6 +112,7 @@ public class StoreHealthyChecker implements Runnable {
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
+ logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e);
return false;
}
}
diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java
index 3c0ce8e48d..9db3397c5e 100644
--- a/src/main/java/org/tikv/common/region/TiRegion.java
+++ b/src/main/java/org/tikv/common/region/TiRegion.java
@@ -126,6 +126,14 @@ public class TiRegion implements Serializable {
return getCurrentReplica();
}
+ public void setReplicaIdx(int idx) {
+ replicaIdx = idx;
+ }
+
+ public List getReplicaList() {
+ return replicaList;
+ }
+
private boolean isLeader(Peer peer) {
return getLeader().equals(peer);
}
diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java
index 8513e2b56e..5feaa246fe 100644
--- a/src/main/java/org/tikv/common/region/TiStore.java
+++ b/src/main/java/org/tikv/common/region/TiStore.java
@@ -105,4 +105,14 @@ public class TiStore implements Serializable {
public long getId() {
return this.store.getId();
}
+
+ public boolean isTiFlash() {
+ for (Metapb.StoreLabel label : store.getLabelsList()) {
+ if (label.getKey().equals(TiStoreType.TiFlash.getLabelKey())
+ && label.getValue().equals(TiStoreType.TiFlash.getLabelValue())) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/src/main/java/org/tikv/common/util/Pair.java b/src/main/java/org/tikv/common/util/Pair.java
index 803880a9d1..65ae082e78 100644
--- a/src/main/java/org/tikv/common/util/Pair.java
+++ b/src/main/java/org/tikv/common/util/Pair.java
@@ -18,6 +18,7 @@
package org.tikv.common.util;
import java.io.Serializable;
+import java.util.Objects;
public class Pair implements Serializable {
public final F first;
@@ -36,4 +37,21 @@ public class Pair implements Serializable {
public String toString() {
return String.format("[%s:%s]", first, second);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Pair, ?> pair = (Pair, ?>) o;
+ return Objects.equals(first, pair.first) && Objects.equals(second, pair.second);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(first, second);
+ }
}
diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java
index dfa9b8b296..e8c83c5446 100644
--- a/src/main/java/org/tikv/txn/KVClient.java
+++ b/src/main/java/org/tikv/txn/KVClient.java
@@ -50,6 +50,7 @@ public class KVClient implements AutoCloseable {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService executorService;
+ private Set resolvedLocks = Collections.emptySet();
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) {
Objects.requireNonNull(conf, "conf is null");
@@ -223,6 +224,10 @@ public class KVClient implements AutoCloseable {
if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
+ // set resolvedLocks for the new client
+ if (!resolvedLocks.isEmpty()) {
+ client.addResolvedLocks(version, resolvedLocks);
+ }
try {
return client.batchGet(backOffer, batch.getKeys(), version);
} catch (final TiKVException e) {
@@ -230,7 +235,8 @@ public class KVClient implements AutoCloseable {
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
- // retry
+ // get resolved locks and retry
+ resolvedLocks = client.getResolvedLocks(version);
return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
}
} else {
diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java
new file mode 100644
index 0000000000..5e4f0a992a
--- /dev/null
+++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2022 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import org.junit.Test;
+import org.tikv.common.event.CacheInvalidateEvent;
+import org.tikv.common.region.RegionManager;
+import org.tikv.common.region.RegionStoreClient;
+import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
+import org.tikv.common.region.TiStore;
+import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.ConcreteBackOffer;
+import org.tikv.kvproto.Errorpb;
+import org.tikv.kvproto.Errorpb.EpochNotMatch;
+import org.tikv.kvproto.Errorpb.NotLeader;
+import org.tikv.kvproto.Errorpb.StoreNotMatch;
+import org.tikv.kvproto.Metapb;
+
+public class CacheInvalidCallBackTest extends MockServerTest {
+
+ private RegionStoreClient createClient(
+ String version, Function cacheInvalidateCallBack) {
+ Metapb.Store meta =
+ Metapb.Store.newBuilder()
+ .setAddress(LOCAL_ADDR + ":" + port)
+ .setId(1)
+ .setState(Metapb.StoreState.Up)
+ .setVersion(version)
+ .build();
+ TiStore store = new TiStore(meta);
+
+ RegionManager manager = new RegionManager(session.getConf(), session.getPDClient());
+ manager.addCacheInvalidateCallback(cacheInvalidateCallBack);
+ RegionStoreClientBuilder builder =
+ new RegionStoreClientBuilder(
+ session.getConf(), session.getChannelFactory(), manager, session.getPDClient());
+
+ return builder.build(region, store);
+ }
+
+ @Test
+ public void testcacheInvalidCallBack() {
+ String version = "3.0.12";
+ CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack();
+ doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack);
+ }
+
+ public void doRawGetTest(
+ RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) {
+ server.put("key1", "value1");
+ Optional value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
+ assertEquals(ByteString.copyFromUtf8("value1"), value.get());
+ try {
+ server.putError(
+ "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance()));
+ client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1"));
+ fail();
+ } catch (Exception e) {
+ assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size());
+ }
+ server.putError(
+ "failure",
+ () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance()));
+ try {
+ client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
+ fail();
+ } catch (Exception e) {
+ sleep(1000);
+ assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size());
+ }
+ server.putError(
+ "store_not_match",
+ () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance()));
+ try {
+ client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
+ fail();
+ } catch (Exception e) {
+ sleep(1000);
+ assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size());
+ }
+ server.clearAllMap();
+ client.close();
+ }
+
+ private void sleep(int time) {
+ try {
+ Thread.sleep(time);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ }
+
+ private BackOffer defaultBackOff() {
+ return ConcreteBackOffer.newCustomBackOff(1000);
+ }
+
+ static class CacheInvalidateCallBack implements Function {
+
+ public List cacheInvalidateEvents = new ArrayList<>();
+
+ @Override
+ public Void apply(CacheInvalidateEvent cacheInvalidateEvent) {
+ cacheInvalidateEvents.add(cacheInvalidateEvent);
+ return null;
+ }
+ }
+}
diff --git a/src/test/java/org/tikv/common/GrpcUtils.java b/src/test/java/org/tikv/common/GrpcUtils.java
index e6793f01f1..e7a268f6c3 100644
--- a/src/test/java/org/tikv/common/GrpcUtils.java
+++ b/src/test/java/org/tikv/common/GrpcUtils.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.kvproto.Metapb.Peer;
+import org.tikv.kvproto.Metapb.PeerRole;
import org.tikv.kvproto.Metapb.Region;
import org.tikv.kvproto.Metapb.RegionEpoch;
import org.tikv.kvproto.Metapb.Store;
@@ -61,6 +62,10 @@ public class GrpcUtils {
return Peer.newBuilder().setStoreId(storeId).setId(id).build();
}
+ public static Peer makeLearnerPeer(long id, long storeId) {
+ return Peer.newBuilder().setRole(PeerRole.Learner).setStoreId(storeId).setId(id).build();
+ }
+
public static ByteString encodeKey(byte[] key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key);
diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java
index 69d8a55ee0..ea09270cfc 100644
--- a/src/test/java/org/tikv/common/KVMockServer.java
+++ b/src/test/java/org/tikv/common/KVMockServer.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
+import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
@@ -67,6 +68,10 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
private final Map> keyErrMap = new HashMap<>();
+ private final Map> lockMap = new HashMap<>();
+ private final Map> txnStatusMap =
+ new HashMap<>();
+
// for KV error
public static final int ABORT = 1;
public static final int RETRY = 2;
@@ -117,9 +122,68 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder);
}
+ public void removeError(String key) {
+ regionErrMap.remove(toRawKey(key.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ // putWithLock is used to "prewrite" key-value without "commit"
+ public void putWithLock(
+ ByteString key, ByteString value, ByteString primaryKey, Long startTs, Long ttl) {
+ put(key, value);
+
+ Kvrpcpb.LockInfo.Builder lock =
+ Kvrpcpb.LockInfo.newBuilder()
+ .setPrimaryLock(primaryKey)
+ .setLockVersion(startTs)
+ .setKey(key)
+ .setLockTtl(ttl);
+ lockMap.put(toRawKey(key), () -> lock);
+ }
+
+ public void removeLock(ByteString key) {
+ lockMap.remove(toRawKey(key));
+ }
+
+ public boolean hasLock(ByteString key) {
+ return lockMap.containsKey(toRawKey(key));
+ }
+
+ // putTxnStatus is used to save transaction status
+ // commitTs > 0: committed
+ // commitTs == 0 && key is empty: rollback
+ // commitTs == 0 && key not empty: locked by key
+ public void putTxnStatus(Long startTs, Long commitTs, ByteString key) {
+ if (commitTs > 0 || (commitTs == 0 && key.isEmpty())) { // committed || rollback
+ Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
+ Kvrpcpb.CheckTxnStatusResponse.newBuilder()
+ .setCommitVersion(commitTs)
+ .setLockTtl(0)
+ .setAction(Kvrpcpb.Action.NoAction);
+ txnStatusMap.put(startTs, () -> txnStatus);
+ } else { // locked
+ Kvrpcpb.LockInfo.Builder lock = lockMap.get(toRawKey(key)).get();
+ Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
+ Kvrpcpb.CheckTxnStatusResponse.newBuilder()
+ .setCommitVersion(commitTs)
+ .setLockTtl(lock.getLockTtl())
+ .setAction(Kvrpcpb.Action.NoAction)
+ .setLockInfo(lock);
+ txnStatusMap.put(startTs, () -> txnStatus);
+ }
+ }
+
+ // putTxnStatus is used to save transaction status
+ // commitTs > 0: committed
+ // commitTs == 0: rollback
+ public void putTxnStatus(Long startTs, Long commitTs) {
+ putTxnStatus(startTs, commitTs, ByteString.EMPTY);
+ }
+
public void clearAllMap() {
dataMap.clear();
regionErrMap.clear();
+ lockMap.clear();
+ txnStatusMap.clear();
}
private Errorpb.Error verifyContext(Context context) throws Exception {
@@ -255,9 +319,12 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
return;
}
+ Supplier lock = lockMap.get(key);
Supplier errProvider = keyErrMap.remove(key);
if (errProvider != null) {
builder.setError(errProvider.get().build());
+ } else if (lock != null) {
+ builder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
} else {
ByteString value = dataMap.get(key);
builder.setValue(value);
@@ -299,11 +366,17 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
kvs.entrySet()
.stream()
.map(
- kv ->
- Kvrpcpb.KvPair.newBuilder()
- .setKey(kv.getKey().toByteString())
- .setValue(kv.getValue())
- .build())
+ kv -> {
+ Kvrpcpb.KvPair.Builder kvBuilder =
+ Kvrpcpb.KvPair.newBuilder()
+ .setKey(kv.getKey().toByteString())
+ .setValue(kv.getValue());
+ Supplier lock = lockMap.get(kv.getKey());
+ if (lock != null) {
+ kvBuilder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
+ }
+ return kvBuilder.build();
+ })
.collect(Collectors.toList()));
}
responseObserver.onNext(builder.build());
@@ -354,6 +427,96 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
}
}
+ @Override
+ public void kvCheckTxnStatus(
+ org.tikv.kvproto.Kvrpcpb.CheckTxnStatusRequest request,
+ io.grpc.stub.StreamObserver
+ responseObserver) {
+ logger.info("KVMockServer.kvCheckTxnStatus");
+ try {
+ Long startTs = request.getLockTs();
+ Long currentTs = request.getCurrentTs();
+ logger.info("kvCheckTxnStatus for txn: " + startTs);
+ Kvrpcpb.CheckTxnStatusResponse.Builder builder = Kvrpcpb.CheckTxnStatusResponse.newBuilder();
+
+ Error e = verifyContext(request.getContext());
+ if (e != null) {
+ responseObserver.onNext(builder.setRegionError(e).build());
+ responseObserver.onCompleted();
+ return;
+ }
+
+ Supplier txnStatus = txnStatusMap.get(startTs);
+ if (txnStatus != null) {
+ Kvrpcpb.CheckTxnStatusResponse resp = txnStatus.get().build();
+ if (resp.getCommitVersion() == 0
+ && resp.getLockTtl() > 0
+ && TiTimestamp.extractPhysical(startTs) + resp.getLockInfo().getLockTtl()
+ < TiTimestamp.extractPhysical(currentTs)) {
+ ByteString key = resp.getLockInfo().getKey();
+ logger.info(
+ String.format(
+ "kvCheckTxnStatus rollback expired txn: %d, remove lock: %s",
+ startTs, key.toStringUtf8()));
+ removeLock(key);
+ putTxnStatus(startTs, 0L, ByteString.EMPTY);
+ resp = txnStatusMap.get(startTs).get().build();
+ }
+ logger.info("kvCheckTxnStatus resp: " + resp);
+ responseObserver.onNext(resp);
+ } else {
+ builder.setError(
+ Kvrpcpb.KeyError.newBuilder()
+ .setTxnNotFound(
+ Kvrpcpb.TxnNotFound.newBuilder()
+ .setPrimaryKey(request.getPrimaryKey())
+ .setStartTs(startTs)));
+ logger.info("kvCheckTxnStatus, TxnNotFound");
+ responseObserver.onNext(builder.build());
+ }
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("kvCheckTxnStatus error: " + e);
+ responseObserver.onError(Status.INTERNAL.asRuntimeException());
+ }
+ }
+
+ @Override
+ public void kvResolveLock(
+ org.tikv.kvproto.Kvrpcpb.ResolveLockRequest request,
+ io.grpc.stub.StreamObserver responseObserver) {
+ logger.info("KVMockServer.kvResolveLock");
+ try {
+ Long startTs = request.getStartVersion();
+ Long commitTs = request.getCommitVersion();
+ logger.info(
+ String.format(
+ "kvResolveLock for txn: %d, commitTs: %d, keys: %d",
+ startTs, commitTs, request.getKeysCount()));
+ Kvrpcpb.ResolveLockResponse.Builder builder = Kvrpcpb.ResolveLockResponse.newBuilder();
+
+ Error e = verifyContext(request.getContext());
+ if (e != null) {
+ responseObserver.onNext(builder.setRegionError(e).build());
+ responseObserver.onCompleted();
+ return;
+ }
+
+ if (request.getKeysCount() == 0) {
+ lockMap.entrySet().removeIf(entry -> entry.getValue().get().getLockVersion() == startTs);
+ } else {
+ for (int i = 0; i < request.getKeysCount(); i++) {
+ removeLock(request.getKeys(i));
+ }
+ }
+
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ responseObserver.onError(Status.INTERNAL.asRuntimeException());
+ }
+ }
+
@Override
public void coprocessor(
org.tikv.kvproto.Coprocessor.Request requestWrap,
diff --git a/src/test/java/org/tikv/common/MockRegionManager.java b/src/test/java/org/tikv/common/MockRegionManager.java
new file mode 100644
index 0000000000..cac65312d0
--- /dev/null
+++ b/src/test/java/org/tikv/common/MockRegionManager.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2022 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.common;
+
+import static org.tikv.common.GrpcUtils.encodeKey;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.tikv.common.apiversion.RequestKeyV1TxnCodec;
+import org.tikv.common.key.Key;
+import org.tikv.common.region.RegionManager;
+import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
+import org.tikv.common.region.TiStoreType;
+import org.tikv.common.util.KeyRangeUtils;
+import org.tikv.common.util.Pair;
+import org.tikv.kvproto.Coprocessor.KeyRange;
+import org.tikv.kvproto.Kvrpcpb.CommandPri;
+import org.tikv.kvproto.Kvrpcpb.IsolationLevel;
+import org.tikv.kvproto.Metapb;
+import org.tikv.kvproto.Metapb.Peer;
+import org.tikv.kvproto.Metapb.Region;
+
+public class MockRegionManager extends RegionManager {
+
+ private final Map mockRegionMap;
+
+ private static TiRegion region(long id, KeyRange range) {
+ RequestKeyV1TxnCodec v1 = new RequestKeyV1TxnCodec();
+
+ TiConfiguration configuration = new TiConfiguration();
+ configuration.setIsolationLevel(IsolationLevel.RC);
+ configuration.setCommandPriority(CommandPri.Low);
+ Region r =
+ Metapb.Region.newBuilder()
+ .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
+ .setId(id)
+ .setStartKey(encodeKey(range.getStart().toByteArray()))
+ .setEndKey(encodeKey(range.getEnd().toByteArray()))
+ .addPeers(Peer.getDefaultInstance())
+ .build();
+
+ List s = ImmutableList.of(Metapb.Store.newBuilder().setId(id).build());
+
+ return new TiRegion(
+ configuration,
+ v1.decodeRegion(r),
+ null,
+ r.getPeersList(),
+ s.stream().map(TiStore::new).collect(Collectors.toList()));
+ }
+
+ public MockRegionManager(List ranges) {
+ super(null, null);
+ mockRegionMap =
+ ranges.stream().collect(Collectors.toMap(kr -> kr, kr -> region(ranges.indexOf(kr), kr)));
+ }
+
+ @Override
+ public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
+ for (Map.Entry entry : mockRegionMap.entrySet()) {
+ KeyRange range = entry.getKey();
+ if (KeyRangeUtils.makeRange(range.getStart(), range.getEnd()).contains(Key.toRawKey(key))) {
+ TiRegion region = entry.getValue();
+ return Pair.create(
+ region, new TiStore(Metapb.Store.newBuilder().setId(region.getId()).build()));
+ }
+ }
+ return null;
+ }
+}
diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java
index 02cab4c46f..db9ae5694b 100644
--- a/src/test/java/org/tikv/common/MockServerTest.java
+++ b/src/test/java/org/tikv/common/MockServerTest.java
@@ -39,6 +39,8 @@ public class MockServerTest extends PDMockServerTest {
public void setup() throws IOException {
super.setup();
+ port = GrpcUtils.getFreePort();
+
Metapb.Region r =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
@@ -51,7 +53,7 @@ public class MockServerTest extends PDMockServerTest {
List s =
ImmutableList.of(
Metapb.Store.newBuilder()
- .setAddress("localhost:1234")
+ .setAddress(LOCAL_ADDR + ":" + port)
.setVersion("5.0.0")
.setId(13)
.build());
@@ -70,6 +72,6 @@ public class MockServerTest extends PDMockServerTest {
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
- port = server.start(region);
+ server.start(region, port);
}
}
diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java
index a8074d9457..6837334fee 100644
--- a/src/test/java/org/tikv/common/PDClientMockTest.java
+++ b/src/test/java/org/tikv/common/PDClientMockTest.java
@@ -74,9 +74,12 @@ public class PDClientMockTest extends PDMockServerTest {
@Test
public void testTso() throws Exception {
try (PDClient client = session.getPDClient()) {
+ Long current = System.currentTimeMillis();
TiTimestamp ts = client.getTimestamp(defaultBackOff());
- // Test pdServer is set to generate physical == logical + 1
- assertEquals(ts.getPhysical(), ts.getLogical() + 1);
+ // Test pdServer is set to generate physical to current, logical to 1
+ assertTrue(ts.getPhysical() >= current);
+ assertTrue(ts.getPhysical() < current + 100);
+ assertEquals(ts.getLogical(), 1);
}
}
diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java
index 723034f1e3..99ccb66bbb 100644
--- a/src/test/java/org/tikv/common/PDMockServer.java
+++ b/src/test/java/org/tikv/common/PDMockServer.java
@@ -75,8 +75,17 @@ public class PDMockServer extends PDGrpc.PDImplBase {
@Override
public StreamObserver tso(StreamObserver resp) {
return new StreamObserver() {
- private int physical = 1;
- private int logical = 0;
+ private long physical = System.currentTimeMillis();
+ private long logical = 0;
+
+ private void updateTso() {
+ logical++;
+ if (logical >= (1 << 18)) {
+ logical = 0;
+ physical++;
+ }
+ physical = Math.max(physical, System.currentTimeMillis());
+ }
@Override
public void onNext(TsoRequest value) {}
@@ -86,7 +95,8 @@ public class PDMockServer extends PDGrpc.PDImplBase {
@Override
public void onCompleted() {
- resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical++, logical++));
+ updateTso();
+ resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical, logical));
resp.onCompleted();
}
};
diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java
index 6052640f9b..eddd22a6c6 100644
--- a/src/test/java/org/tikv/common/RegionManagerTest.java
+++ b/src/test/java/org/tikv/common/RegionManagerTest.java
@@ -31,6 +31,7 @@ import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
+import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
@@ -135,7 +136,7 @@ public class RegionManagerTest extends PDMockServerTest {
Pair pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
- assertEquals(pair.first.getId(), storeId);
+ assertEquals(pair.second.getId(), 10);
}
@Test
@@ -179,4 +180,52 @@ public class RegionManagerTest extends PDMockServerTest {
} catch (Exception ignored) {
}
}
+
+ @Test
+ public void getRegionStorePairByKeyWithTiFlash() {
+
+ ByteString startKey = ByteString.copyFrom(new byte[] {1});
+ ByteString endKey = ByteString.copyFrom(new byte[] {10});
+ ByteString searchKey = ByteString.copyFrom(new byte[] {5});
+ String testAddress = "testAddress";
+ long firstStoreId = 233;
+ long secondStoreId = 234;
+ int confVer = 1026;
+ int ver = 1027;
+ long regionId = 233;
+ leader.addGetRegionListener(
+ request ->
+ GrpcUtils.makeGetRegionResponse(
+ leader.getClusterId(),
+ GrpcUtils.makeRegion(
+ regionId,
+ GrpcUtils.encodeKey(startKey.toByteArray()),
+ GrpcUtils.encodeKey(endKey.toByteArray()),
+ GrpcUtils.makeRegionEpoch(confVer, ver),
+ GrpcUtils.makeLearnerPeer(1, firstStoreId),
+ GrpcUtils.makeLearnerPeer(2, secondStoreId))));
+
+ AtomicInteger i = new AtomicInteger(0);
+ long[] ids = new long[] {firstStoreId, secondStoreId};
+ leader.addGetStoreListener(
+ (request ->
+ GrpcUtils.makeGetStoreResponse(
+ leader.getClusterId(),
+ GrpcUtils.makeStore(
+ ids[i.getAndIncrement()],
+ testAddress,
+ StoreState.Up,
+ GrpcUtils.makeStoreLabel("engine", "tiflash"),
+ GrpcUtils.makeStoreLabel("k1", "v1"),
+ GrpcUtils.makeStoreLabel("k2", "v2")))));
+
+ Pair pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
+ assertEquals(pair.first.getId(), regionId);
+ assertEquals(pair.second.getId(), firstStoreId);
+
+ Pair secondPair =
+ mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
+ assertEquals(secondPair.first.getId(), regionId);
+ assertEquals(secondPair.second.getId(), secondStoreId);
+ }
}
diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java
index 1a03ad80e2..bb288c48ae 100644
--- a/src/test/java/org/tikv/common/RegionStoreClientTest.java
+++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java
@@ -17,15 +17,16 @@
package org.tikv.common;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.exception.KeyException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
@@ -40,6 +41,7 @@ import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
public class RegionStoreClientTest extends MockServerTest {
+ private static final Logger logger = LoggerFactory.getLogger(MockServerTest.class);
private RegionStoreClient createClientV2() {
return createClient("2.1.19");
@@ -49,6 +51,10 @@ public class RegionStoreClientTest extends MockServerTest {
return createClient("3.0.12");
}
+ private RegionStoreClient createClientV4() {
+ return createClient("6.1.0");
+ }
+
private RegionStoreClient createClient(String version) {
Metapb.Store meta =
Metapb.Store.newBuilder()
@@ -161,30 +167,130 @@ public class RegionStoreClientTest extends MockServerTest {
@Test
public void scanTest() {
- doScanTest(createClientV3());
+ doScanTest(createClientV4());
}
public void doScanTest(RegionStoreClient client) {
+ Long startTs = session.getTimestamp().getVersion();
+
server.put("key1", "value1");
server.put("key2", "value2");
server.put("key4", "value4");
server.put("key5", "value5");
- List kvs = client.scan(defaultBackOff(), ByteString.copyFromUtf8("key2"), 1);
- assertEquals(3, kvs.size());
+
+ // put lock will expire in 1s
+ ByteString key6 = ByteString.copyFromUtf8("key6");
+ server.putWithLock(key6, ByteString.copyFromUtf8("value6"), key6, startTs, 100L);
+ server.putTxnStatus(startTs, 0L, key6);
+ assertTrue(server.hasLock(key6));
+
+ List kvs =
+ client.scan(
+ defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion());
+ assertEquals(4, kvs.size());
kvs.forEach(
kv ->
assertEquals(
kv.getKey().toStringUtf8().replace("key", "value"), kv.getValue().toStringUtf8()));
+ assertFalse(server.hasLock(key6));
+ // put region error
server.putError(
"error1",
() -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance()));
try {
- client.scan(defaultBackOff(), ByteString.copyFromUtf8("error1"), 1);
+ client.scan(
+ defaultBackOff(), ByteString.copyFromUtf8("error1"), session.getTimestamp().getVersion());
fail();
} catch (Exception e) {
assertTrue(true);
}
+ server.removeError("error1");
+
+ // put lock
+ Long startTs7 = session.getTimestamp().getVersion();
+ ByteString key7 = ByteString.copyFromUtf8("key7");
+ server.putWithLock(key7, ByteString.copyFromUtf8("value7"), key7, startTs7, 3000L);
+ server.putTxnStatus(startTs7, 0L, key7);
+ assertTrue(server.hasLock(key7));
+ try {
+ client.scan(
+ defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion());
+ fail();
+ } catch (Exception e) {
+ KeyException keyException = (KeyException) e.getCause();
+ assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock"));
+ }
+ assertTrue(server.hasLock(key7));
+
+ server.clearAllMap();
+ client.close();
+ }
+
+ @Test
+ public void resolveLocksTest() {
+ doResolveLocksTest(createClientV4());
+ }
+
+ public void doResolveLocksTest(RegionStoreClient client) {
+ ByteString primaryKey = ByteString.copyFromUtf8("primary");
+ server.put(primaryKey, ByteString.copyFromUtf8("value0"));
+
+ // get with committed lock
+ {
+ Long startTs = session.getTimestamp().getVersion();
+ Long commitTs = session.getTimestamp().getVersion();
+ logger.info("startTs: " + startTs);
+
+ ByteString key1 = ByteString.copyFromUtf8("key1");
+ ByteString value1 = ByteString.copyFromUtf8("value1");
+ server.putWithLock(key1, value1, primaryKey, startTs, 1L);
+ server.putTxnStatus(startTs, commitTs);
+ assertTrue(server.hasLock(key1));
+
+ ByteString expected1 = client.get(defaultBackOff(), key1, 200);
+ assertEquals(value1, expected1);
+ assertFalse(server.hasLock(key1));
+ }
+
+ // get with not expired lock.
+ {
+ Long startTs = session.getTimestamp().getVersion();
+ logger.info("startTs: " + startTs);
+
+ ByteString key2 = ByteString.copyFromUtf8("key2");
+ ByteString value2 = ByteString.copyFromUtf8("value2");
+ server.putWithLock(key2, value2, key2, startTs, 3000L);
+ server.putTxnStatus(startTs, 0L, key2);
+ assertTrue(server.hasLock(key2));
+
+ try {
+ client.get(defaultBackOff(), key2, session.getTimestamp().getVersion());
+ fail();
+ } catch (Exception e) {
+ KeyException keyException = (KeyException) e.getCause();
+ assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock"));
+ }
+ assertTrue(server.hasLock(key2));
+ }
+
+ // get with expired lock.
+ {
+ Long startTs = session.getTimestamp().getVersion();
+ logger.info("startTs: " + startTs);
+
+ ByteString key3 = ByteString.copyFromUtf8("key3");
+ ByteString value3 = ByteString.copyFromUtf8("value3");
+ server.putWithLock(key3, value3, key3, startTs, 100L);
+ server.putTxnStatus(startTs, 0L, key3);
+ assertTrue(server.hasLock(key3));
+
+ ByteString expected3 =
+ client.get(defaultBackOff(), key3, session.getTimestamp().getVersion());
+ assertEquals(expected3, value3);
+ assertFalse(server.hasLock(key3));
+ }
+
server.clearAllMap();
client.close();
}
diff --git a/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java
index 871a20cdf2..ed97fcdb81 100644
--- a/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java
+++ b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java
@@ -17,8 +17,7 @@
package org.tikv.common.apiversion;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
@@ -177,5 +176,85 @@ public class RequestKeyCodecTest {
decoded = v2.decodeRegion(region);
assertEquals(start, decoded.getStartKey());
assertEquals(ByteString.EMPTY, decoded.getEndKey());
+
+ // test region out of keyspace
+ {
+ ByteString m_123 = CodecUtils.encode(ByteString.copyFromUtf8("m_123"));
+ ByteString m_124 = CodecUtils.encode(ByteString.copyFromUtf8("m_124"));
+ ByteString infiniteEndKey_0 =
+ CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFrom(new byte[] {0})));
+ ByteString t_123 = CodecUtils.encode(ByteString.copyFromUtf8("t_123"));
+ ByteString y_123 = CodecUtils.encode(ByteString.copyFromUtf8("y_123"));
+
+ ByteString[][] outOfKeyspaceCases = {
+ {ByteString.EMPTY, CodecUtils.encode(v2.keyPrefix)}, // ["", "r000"/"x000")
+ {ByteString.EMPTY, m_123},
+ {m_123, m_124},
+ {m_124, CodecUtils.encode(v2.keyPrefix)},
+ {CodecUtils.encode(v2.infiniteEndKey), ByteString.EMPTY}, // ["r001"/"x001", "")
+ {CodecUtils.encode(v2.infiniteEndKey), infiniteEndKey_0},
+ {infiniteEndKey_0, t_123},
+ {y_123, ByteString.EMPTY}, // "y_123" is bigger than "infiniteEndKey" for both raw & txn.
+ };
+
+ for (ByteString[] testCase : outOfKeyspaceCases) {
+ region = Region.newBuilder().setStartKey(testCase[0]).setEndKey(testCase[1]).build();
+ try {
+ decoded = v2.decodeRegion(region);
+ fail(String.format("[%s,%s): %s", testCase[0], testCase[1], decoded.toString()));
+ } catch (Exception ignored) {
+ }
+ }
+ }
+
+ // case: regionStartKey == "" < keyPrefix < regionEndKey < infiniteEndKey
+ region =
+ Region.newBuilder()
+ .setStartKey(ByteString.EMPTY)
+ .setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
+ .build();
+ decoded = v2.decodeRegion(region);
+ assertTrue(decoded.getStartKey().isEmpty());
+ assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey());
+
+ // case: "" < regionStartKey < keyPrefix < regionEndKey < infiniteEndKey < ""
+ region =
+ Region.newBuilder()
+ .setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123")))
+ .setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
+ .build();
+ decoded = v2.decodeRegion(region);
+ assertEquals(ByteString.EMPTY, decoded.getStartKey());
+ assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey());
+
+ // case: "" < regionStartKey < keyPrefix < infiniteEndKey < regionEndKey < ""
+ region =
+ Region.newBuilder()
+ .setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123")))
+ .setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0"))))
+ .build();
+ decoded = v2.decodeRegion(region);
+ assertEquals(ByteString.EMPTY, decoded.getStartKey());
+ assertEquals(ByteString.EMPTY, decoded.getEndKey());
+
+ // case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey < ""
+ region =
+ Region.newBuilder()
+ .setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
+ .setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0"))))
+ .build();
+ decoded = v2.decodeRegion(region);
+ assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey());
+ assertTrue(decoded.getEndKey().isEmpty());
+
+ // case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey == ""
+ region =
+ Region.newBuilder()
+ .setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
+ .setEndKey(ByteString.EMPTY)
+ .build();
+ decoded = v2.decodeRegion(region);
+ assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey());
+ assertTrue(decoded.getEndKey().isEmpty());
}
}
diff --git a/src/test/java/org/tikv/common/util/PairTest.java b/src/test/java/org/tikv/common/util/PairTest.java
new file mode 100644
index 0000000000..b1fd0c6bc9
--- /dev/null
+++ b/src/test/java/org/tikv/common/util/PairTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2023 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.common.util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.junit.Test;
+import org.tikv.common.PDMockServerTest;
+import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
+import org.tikv.kvproto.Metapb;
+import org.tikv.kvproto.Metapb.Peer;
+
+public class PairTest extends PDMockServerTest {
+
+ @Test
+ public void testPair() {
+ Metapb.Region r =
+ Metapb.Region.newBuilder()
+ .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
+ .setId(233)
+ .setStartKey(ByteString.EMPTY)
+ .setEndKey(ByteString.EMPTY)
+ .addPeers(Peer.getDefaultInstance())
+ .build();
+ List s =
+ ImmutableList.of(
+ Metapb.Store.newBuilder()
+ .setAddress(LOCAL_ADDR + ":" + 4000)
+ .setVersion("5.0.0")
+ .setId(1)
+ .build());
+
+ TiRegion region =
+ new TiRegion(
+ session.getConf(),
+ r,
+ r.getPeers(0),
+ r.getPeersList(),
+ s.stream().map(TiStore::new).collect(Collectors.toList()));
+ TiStore store = new TiStore(s.get(0));
+
+ Map, List> groupKeyMap = new HashMap<>();
+
+ for (int i = 0; i < 10; i++) {
+ Pair pair = Pair.create(region, store);
+ groupKeyMap
+ .computeIfAbsent(pair, e -> new ArrayList<>())
+ .add(ByteString.copyFromUtf8("test"));
+ }
+ Pair pair = Pair.create(region, store);
+ assert (groupKeyMap.get(pair).size() == 10);
+ }
+}
diff --git a/src/test/java/org/tikv/txn/BatchGetTest.java b/src/test/java/org/tikv/txn/BatchGetTest.java
new file mode 100644
index 0000000000..cbdff1b392
--- /dev/null
+++ b/src/test/java/org/tikv/txn/BatchGetTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2022 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.txn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.tikv.common.BytePairWrapper;
+import org.tikv.common.ByteWrapper;
+import org.tikv.common.exception.KeyException;
+import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.ConcreteBackOffer;
+import org.tikv.kvproto.Kvrpcpb.KvPair;
+
+public class BatchGetTest extends TXNTest {
+
+ @Test
+ public void BatchGetResolveLockTest() throws Exception {
+ long lockTTL = 20000L;
+ String key1 = "batchGetResolveLockTestKey1";
+ String key2 = "batchGetResolveLockTestKey2";
+ String val1 = "val1";
+ String val2 = "val2";
+ String val1_update = "val1_update";
+ String val2_update = "val2_update";
+
+ // put key1 and key2
+ putKV(key1, val1);
+ putKV(key2, val2);
+
+ // run 2PC background
+ new Thread(
+ () -> {
+ long startTS = session.getTimestamp().getVersion();
+ try (TwoPhaseCommitter twoPhaseCommitter =
+ new TwoPhaseCommitter(session, startTS, lockTTL)) {
+ byte[] primaryKey = key1.getBytes("UTF-8");
+ byte[] secondary = key2.getBytes("UTF-8");
+ // prewrite primary key
+ twoPhaseCommitter.prewritePrimaryKey(
+ ConcreteBackOffer.newCustomBackOff(5000),
+ primaryKey,
+ val1_update.getBytes("UTF-8"));
+ List pairs =
+ Arrays.asList(new BytePairWrapper(secondary, val2_update.getBytes("UTF-8")));
+ // prewrite secondary key
+ twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 5000);
+
+ // get commitTS
+ long commitTS = session.getTimestamp().getVersion();
+ Thread.sleep(5000);
+ // commit primary key
+ twoPhaseCommitter.commitPrimaryKey(
+ ConcreteBackOffer.newCustomBackOff(5000), primaryKey, commitTS);
+ // commit secondary key
+ List keys = Arrays.asList(new ByteWrapper(secondary));
+ twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 5000);
+ } catch (Exception e) {
+ KeyException keyException = (KeyException) e.getCause().getCause();
+ assertNotSame("", keyException.getKeyErr().getCommitTsExpired().toString());
+ }
+ })
+ .start();
+
+ // wait 2PC get commitTS
+ Thread.sleep(2000);
+ // batch get key1 and key2
+ try (KVClient kvClient = session.createKVClient()) {
+ long version = session.getTimestamp().getVersion();
+ ByteString k1 = ByteString.copyFromUtf8(key1);
+ ByteString k2 = ByteString.copyFromUtf8(key2);
+
+ BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(5000);
+ List kvPairs = kvClient.batchGet(backOffer, Arrays.asList(k1, k2), version);
+ // Since TiKV v4.0.0 write locked key will not block read. it is supported by Min Commit
+ // Timestamp
+ assertEquals(ByteString.copyFromUtf8(val1), kvPairs.get(0).getValue());
+ assertEquals(ByteString.copyFromUtf8(val2), kvPairs.get(1).getValue());
+ System.out.println(kvPairs);
+ // wait 2PC finish
+ Thread.sleep(10000);
+ }
+ }
+}
diff --git a/src/test/java/org/tikv/txn/TXNTest.java b/src/test/java/org/tikv/txn/TXNTest.java
index 92af0383da..386ad8182e 100644
--- a/src/test/java/org/tikv/txn/TXNTest.java
+++ b/src/test/java/org/tikv/txn/TXNTest.java
@@ -41,7 +41,7 @@ import org.tikv.kvproto.Kvrpcpb;
public class TXNTest extends BaseTxnKVTest {
static final int DEFAULT_TTL = 10;
- private TiSession session;
+ public TiSession session;
RegionStoreClient.RegionStoreClientBuilder builder;
@Before
diff --git a/src/test/java/org/tikv/util/RangeSplitterTest.java b/src/test/java/org/tikv/util/RangeSplitterTest.java
new file mode 100644
index 0000000000..7207f959d4
--- /dev/null
+++ b/src/test/java/org/tikv/util/RangeSplitterTest.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2022 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import gnu.trove.list.array.TLongArrayList;
+import gnu.trove.map.hash.TLongObjectHashMap;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.tikv.common.MockRegionManager;
+import org.tikv.common.codec.Codec.IntegerCodec;
+import org.tikv.common.codec.CodecDataOutput;
+import org.tikv.common.key.RowKey;
+import org.tikv.common.key.RowKey.DecodeResult.Status;
+import org.tikv.common.util.RangeSplitter;
+import org.tikv.kvproto.Coprocessor.KeyRange;
+
+public class RangeSplitterTest {
+
+ private static KeyRange keyRange(Long s, Long e) {
+ ByteString sKey = ByteString.EMPTY;
+ ByteString eKey = ByteString.EMPTY;
+ if (s != null) {
+ CodecDataOutput cdo = new CodecDataOutput();
+ IntegerCodec.writeLongFully(cdo, s, true);
+ sKey = cdo.toByteString();
+ }
+
+ if (e != null) {
+ CodecDataOutput cdo = new CodecDataOutput();
+ IntegerCodec.writeLongFully(cdo, e, true);
+ eKey = cdo.toByteString();
+ }
+
+ return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
+ }
+
+ private static KeyRange keyRangeByHandle(long tableId, Long s, Long e) {
+ return keyRangeByHandle(tableId, s, Status.EQUAL, e, Status.EQUAL);
+ }
+
+ private static KeyRange keyRangeByHandle(long tableId, Long s, Status ss, Long e, Status es) {
+ ByteString sKey = shiftByStatus(handleToByteString(tableId, s), ss);
+ ByteString eKey = shiftByStatus(handleToByteString(tableId, e), es);
+
+ return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
+ }
+
+ private static ByteString shiftByStatus(ByteString v, Status s) {
+ switch (s) {
+ case EQUAL:
+ return v;
+ case LESS:
+ return v.substring(0, v.size() - 1);
+ case GREATER:
+ return v.concat(ByteString.copyFrom(new byte[] {1, 0}));
+ default:
+ throw new IllegalArgumentException("Only EQUAL,LESS,GREATER allowed");
+ }
+ }
+
+ private static ByteString handleToByteString(long tableId, Long k) {
+ if (k != null) {
+ return RowKey.toRowKey(tableId, k).toByteString();
+ }
+ return ByteString.EMPTY;
+ }
+
+ @Test
+ public void splitRangeByRegionTest() {
+ MockRegionManager mgr =
+ new MockRegionManager(
+ ImmutableList.of(keyRange(null, 30L), keyRange(30L, 50L), keyRange(50L, null)));
+ RangeSplitter s = RangeSplitter.newSplitter(mgr);
+ List tasks =
+ s.splitRangeByRegion(
+ ImmutableList.of(
+ keyRange(0L, 40L), keyRange(41L, 42L), keyRange(45L, 50L), keyRange(70L, 1000L)));
+
+ assertEquals(tasks.get(0).getRegion().getId(), 0);
+ assertEquals(tasks.get(0).getRanges().size(), 1);
+ KeyRange range = tasks.get(0).getRanges().get(0);
+ assertEquals(tasks.get(0).getRanges().get(0), keyRange(0L, 30L));
+
+ assertEquals(tasks.get(1).getRegion().getId(), 1);
+ assertEquals(tasks.get(1).getRanges().get(0), keyRange(30L, 40L));
+ assertEquals(tasks.get(1).getRanges().get(1), keyRange(41L, 42L));
+ assertEquals(tasks.get(1).getRanges().get(2), keyRange(45L, 50L));
+ assertEquals(tasks.get(1).getRanges().size(), 3);
+
+ assertEquals(tasks.get(2).getRegion().getId(), 2);
+ assertEquals(tasks.get(2).getRanges().size(), 1);
+ assertEquals(tasks.get(2).getRanges().get(0), keyRange(70L, 1000L));
+ }
+
+ @Test
+ public void splitAndSortHandlesByRegionTest() {
+ final long tableId = 1;
+ List handles = new ArrayList<>();
+ handles.add(1L);
+ handles.add(5L);
+ handles.add(4L);
+ handles.add(3L);
+ handles.add(10L);
+ handles.add(2L);
+ handles.add(100L);
+ handles.add(101L);
+ handles.add(99L);
+ handles.add(88L);
+ handles.add(-1L);
+ handles.add(-255L);
+ handles.add(-100L);
+ handles.add(-99L);
+ handles.add(-98L);
+ handles.add(Long.MIN_VALUE);
+ handles.add(8960L);
+ handles.add(8959L);
+ handles.add(19999L);
+ handles.add(15001L);
+
+ MockRegionManager mgr =
+ new MockRegionManager(
+ ImmutableList.of(
+ keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL),
+ keyRangeByHandle(tableId, -100L, Status.EQUAL, 10L, Status.GREATER),
+ keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL),
+ keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER),
+ keyRangeByHandle(tableId, 100L, Status.GREATER, 9000L, Status.LESS),
+ keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL),
+ keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL)));
+
+ RangeSplitter s = RangeSplitter.newSplitter(mgr);
+ List tasks =
+ new ArrayList<>(
+ s.splitAndSortHandlesByRegion(
+ ImmutableList.of(tableId),
+ new TLongArrayList(handles.stream().mapToLong(t -> t).toArray())));
+ tasks.sort(
+ (l, r) -> {
+ Long regionIdLeft = l.getRegion().getId();
+ Long regionIdRight = r.getRegion().getId();
+ return regionIdLeft.compareTo(regionIdRight);
+ });
+
+ // [-INF, -100): [Long.MIN_VALUE, Long.MIN_VALUE + 1), [-255, -254)
+ assertEquals(tasks.get(0).getRegion().getId(), 0);
+ assertEquals(tasks.get(0).getRanges().size(), 2);
+ assertEquals(
+ tasks.get(0).getRanges().get(0),
+ keyRangeByHandle(tableId, Long.MIN_VALUE, Long.MIN_VALUE + 1));
+ assertEquals(tasks.get(0).getRanges().get(1), keyRangeByHandle(tableId, -255L, -254L));
+
+ // [-100, 10.x): [-100, -97), [-1, 0), [1, 6), [10, 11)
+ assertEquals(tasks.get(1).getRegion().getId(), 1);
+ assertEquals(tasks.get(1).getRanges().size(), 4);
+ assertEquals(tasks.get(1).getRanges().get(0), keyRangeByHandle(tableId, -100L, -97L));
+ assertEquals(tasks.get(1).getRanges().get(1), keyRangeByHandle(tableId, -1L, 0L));
+ assertEquals(tasks.get(1).getRanges().get(2), keyRangeByHandle(tableId, 1L, 6L));
+ assertEquals(tasks.get(1).getRanges().get(3), keyRangeByHandle(tableId, 10L, 11L));
+
+ // [10.x, 50): empty
+ // [50, 100.x): [88, 89) [99, 101)
+ assertEquals(tasks.get(2).getRegion().getId(), 3);
+ assertEquals(tasks.get(2).getRanges().size(), 2);
+ assertEquals(tasks.get(2).getRanges().get(0), keyRangeByHandle(tableId, 88L, 89L));
+ assertEquals(tasks.get(2).getRanges().get(1), keyRangeByHandle(tableId, 99L, 101L));
+
+ // [100.x, less than 8960): [101, 102) [8959, 8960)
+ assertEquals(tasks.get(3).getRegion().getId(), 4);
+ assertEquals(tasks.get(3).getRanges().size(), 2);
+ assertEquals(tasks.get(3).getRanges().get(0), keyRangeByHandle(tableId, 101L, 102L));
+ assertEquals(tasks.get(3).getRanges().get(1), keyRangeByHandle(tableId, 8959L, 8960L));
+
+ // [less than 8960, 16000): [9000, 9001), [15001, 15002)
+ assertEquals(tasks.get(4).getRegion().getId(), 5);
+ assertEquals(tasks.get(4).getRanges().size(), 2);
+ assertEquals(tasks.get(4).getRanges().get(0), keyRangeByHandle(tableId, 8960L, 8961L));
+ assertEquals(tasks.get(4).getRanges().get(1), keyRangeByHandle(tableId, 15001L, 15002L));
+
+ // [16000, INF): [19999, 20000)
+ assertEquals(tasks.get(5).getRegion().getId(), 6);
+ assertEquals(tasks.get(5).getRanges().size(), 1);
+ assertEquals(tasks.get(5).getRanges().get(0), keyRangeByHandle(tableId, 19999L, 20000L));
+ }
+
+ @Test
+ public void groupByAndSortHandlesByRegionIdTest() {
+ final long tableId = 1;
+ List handles = new ArrayList<>();
+ handles.add(1L);
+ handles.add(5L);
+ handles.add(4L);
+ handles.add(3L);
+ handles.add(10L);
+ handles.add(11L);
+ handles.add(12L);
+ handles.add(2L);
+ handles.add(100L);
+ handles.add(101L);
+ handles.add(99L);
+ handles.add(88L);
+ handles.add(-1L);
+ handles.add(-255L);
+ handles.add(-100L);
+ handles.add(-99L);
+ handles.add(-98L);
+ handles.add(Long.MIN_VALUE);
+ handles.add(8960L);
+ handles.add(8959L);
+ handles.add(19999L);
+ handles.add(15001L);
+ handles.add(99999999999L);
+ handles.add(Long.MAX_VALUE);
+
+ MockRegionManager mgr =
+ new MockRegionManager(
+ ImmutableList.of(
+ keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL),
+ keyRangeByHandle(tableId, -100L, Status.EQUAL, 10L, Status.GREATER),
+ keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL),
+ keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER),
+ keyRangeByHandle(tableId, 100L, Status.GREATER, 9000L, Status.LESS),
+ keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL),
+ keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL)));
+
+ TLongObjectHashMap result = new TLongObjectHashMap<>();
+ RangeSplitter.newSplitter(mgr)
+ .groupByAndSortHandlesByRegionId(
+ tableId, new TLongArrayList(handles.stream().mapToLong(t -> t).toArray()))
+ .forEach((k, v) -> result.put(k.first.getId(), v));
+ assertEquals(2, result.get(0).size());
+ assertEquals(10, result.get(1).size());
+ assertEquals(2, result.get(2).size());
+ assertEquals(3, result.get(3).size());
+ assertEquals(2, result.get(4).size());
+ assertEquals(2, result.get(5).size());
+ assertEquals(3, result.get(6).size());
+ }
+}