Merge branch 'master' of github.com:tikv/client-java into remove-shadowing-of-grpc-and-netty

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
This commit is contained in:
iosmanthus 2024-03-28 14:01:39 +08:00
commit fb372d73d9
No known key found for this signature in database
GPG Key ID: DEE5BAABFE092169
43 changed files with 1566 additions and 143 deletions

View File

@ -2,17 +2,20 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
capacity = "8G"
capacity = "6G"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
[storage]
enable-ttl = true
[rocksdb]
max-open-files = 10000
[raftdb]
max-open-files = 10000
[storage.block-cache]
capacity = "128MB"
[storage]
reserve-space = "0MB"
enable-ttl = true

View File

@ -2,7 +2,7 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
capacity = "8G"
capacity = "6G"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
@ -12,3 +12,9 @@ max-open-files = 10000
[raftdb]
max-open-files = 10000
[storage.block-cache]
capacity = "128MB"
[storage]
reserve-space = "0MB"

17
.github/config/tikv_v2.toml vendored Normal file
View File

@ -0,0 +1,17 @@
# TiKV Configuration.
[raftstore]
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
[rocksdb]
max-open-files = 10000
[raftdb]
max-open-files = 10000
[storage]
reserve-space = "0MB"
api-version = 2
enable-ttl = true

View File

@ -31,7 +31,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
tikv_version: [nightly, v5.0.4, v5.3.0, v5.4.0]
tikv_version: [v5.0.6, v5.3.4, v5.4.3]
fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@ -40,29 +41,41 @@ jobs:
java-version: '8.0'
distribution: 'adopt'
- name: Install TiUP
run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
run: |
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
/home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }}
- name: Start TiUP Playground
run: |
# Start TiKV in APIV1TTL
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> raw.out 2>&1 &
# The first run of `tiup` has to download all components so it'll take longer.
sleep 1m 30s
touch tiup-v1ttl.log
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag rawkv --mode tikv-slim --kv 1 --without-monitor --kv.port 20160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup-v1ttl.log &
timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1ttl.log)
cat tiup-v1ttl.log
echo "Wait for bootstrap"
sleep 10s
# Start TiKV in APIV1
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> txn.out 2>&1 &
touch tiup-v1.log
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag txnkv --mode tikv-slim --kv 1 --without-monitor --kv.port 30160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2381 2>&1 >> tiup-v1.log &
timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1.log)
cat tiup-v1.log
echo "Wait for bootstrap"
sleep 10s
sleep 30s
# Get PD address
echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
echo "TXNKV_PD_ADDRESSES=127.0.0.1:2381" >> $GITHUB_ENV
# Parse PD address from `tiup` output
echo "RAWKV_PD_ADDRESSES=$(cat raw.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
echo "TXNKV_PD_ADDRESSES=$(cat txn.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
# Log the output
echo "$(cat raw.out)" >&2
echo "$(cat txn.out)" >&2
- name: Run Integration Test
run: mvn clean test
- name: Print TiKV logs
if: failure()
run: |
echo "RawKV TiKV logs"
cat /home/runner/.tiup/data/rawkv/tikv-0/tikv.log
echo "TxnKV TiKV logs"
cat /home/runner/.tiup/data/txnkv/tikv-0/tikv.log
- name: Upload coverage
uses: codecov/codecov-action@v2
with:

52
.github/workflows/ci_v2.yml vendored Normal file
View File

@ -0,0 +1,52 @@
name: CI (APIv2)
on:
pull_request:
push:
branches:
- master
jobs:
integration-test:
name: Integration Test - ${{ matrix.tikv_version }}
runs-on: ubuntu-latest
strategy:
matrix:
tikv_version: [v6.5.3, v7.1.1, nightly]
fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
uses: actions/setup-java@v2
with:
java-version: '8.0'
distribution: 'adopt'
- name: Install TiUP
run: |
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
/home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }}
- name: Start TiUP Playground
run: |
# Start TiKV in APIV2
touch tiup.log
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --tag kv --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_v2.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup.log &
timeout 300 grep -q "PD Endpoints:" <(tail -f tiup.log)
cat tiup.log
# Get PD address
echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
echo "TXNKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV
- name: Run Integration Test
run: mvn clean test
- name: Print TiKV logs
if: failure()
run: |
echo "TiKV logs"
cat /home/runner/.tiup/data/kv/tikv-0/tikv.log
- name: Upload coverage
uses: codecov/codecov-action@v2
with:
files: ${{ github.workspace }}/target/site/jacoco/jacoco.xml
fail_ci_if_error: true
verbose: true

View File

@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:

View File

@ -1,19 +0,0 @@
name: 'Stale Checker'
on:
schedule:
- cron: '0 0 * * *'
jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v4
with:
days-before-stale: 30
stale-issue-message: 'This issue is stale because it has been open 30 days with no activity.'
stale-issue-label: 'status/stale'
days-before-issue-close: -1
stale-pr-message: 'This PR is stale because it has been open 30 days with no activity. Remove the status/stale label or comment or this PR will be closed in 7 days.'
stale-pr-label: 'status/stale'
days-before-pr-close: 7
close-pr-message: 'This PR was closed because it has been stalled for 7 days with no activity.'

View File

@ -4,18 +4,17 @@
## TiKV JAVA Client
A Java client for [TiDB](https://github.com/pingcap/tidb)/[TiKV](https://github.com/tikv/tikv).
A Java client for [TiKV](https://github.com/tikv/tikv).
It is supposed to:
+ Communicate via [gRPC](http://www.grpc.io/)
+ Talk to Placement Driver searching for a region
+ Talk to TiKV for reading/writing data and the resulted data is encoded/decoded just like what we do in TiDB.
+ Talk to Coprocessor for calculation pushdown
+ Talk to TiKV for reading/writing data
## Quick Start
> TiKV Java Client is designed to communicate with [pd](https://github.com/tikv/pd) and [tikv](https://github.com/tikv/tikv), please run TiKV and PD in advance.
> TiKV Java Client is designed to communicate with [PD](https://github.com/tikv/pd) and [TiKV](https://github.com/tikv/tikv), please run PD and TiKV in advance.
Build java client from source file:
Build Java client from source file:
```sh
mvn clean install -Dmaven.test.skip=true
@ -27,11 +26,27 @@ Add maven dependency to `pom.xml`:
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.1.0</version>
<version>3.3.0</version>
</dependency>
```
Create a RawKVClient and communicates with TiKV:
Create a transactional `KVClient` and communicates with TiKV:
```java
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.txn.KVClient;
public class Main {
public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createDefault(YOUR_PD_ADDRESSES);
TiSession session = TiSession.create(conf);
KVClient client = session.createKVClient();
}
}
```
Or create a `RawKVClient` if you don't need the transaction semantic:
```java
import org.tikv.common.TiConfiguration;
@ -39,8 +54,7 @@ import org.tikv.common.TiSession;
import org.tikv.raw.RawKVClient;
public class Main {
public static void main() {
// You MUST create a raw configuration if you are using RawKVClient.
public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault(YOUR_PD_ADDRESSES);
TiSession session = TiSession.create(conf);
RawKVClient client = session.createRawClient();
@ -48,7 +62,7 @@ public class Main {
}
```
Find more demo in [KVRawClientTest](https://github.com/birdstorm/KVRawClientTest/)
Find more demo in [TiKV Java Client User Documents](https://tikv.github.io/client-java/examples/introduction.html)
## Documentation

View File

@ -4,6 +4,8 @@
- [Introduction](./introduction/introduction.md)
- [Production Readiness](./production-readiness.md)
- [Start With Examples](./examples/introduction.md)
- [Quick Start](./examples/quick-start.md)
- [Interact with TiKV RawKV API](./examples/rawkv.md)

View File

@ -24,7 +24,7 @@ Add maven dependency to `pom.xml`.
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.1.0</version>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>

View File

@ -15,7 +15,7 @@ import org.tikv.raw.RawKVClient;
import org.tikv.shade.com.google.protobuf.ByteString;
public class Main {
public static void main() {
public static void main(String[] args) throws Exception {
// You MUST create a raw configuration if you are using RawKVClient.
TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379");
TiSession session = TiSession.create(conf);
@ -61,7 +61,7 @@ To enable the API V2 mode, users need to specify the API version of the client.
import org.tikv.common.TiConfiguration.ApiVersion;
public class Main {
public static void main() {
public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379");
conf.setApiVersion(ApiVersion.V2);
try(TiSession session = TiSession.create(conf)) {

View File

@ -23,7 +23,7 @@ import org.tikv.txn.TwoPhaseCommitter;
public class App {
public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2389");
TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
try (TiSession session = TiSession.create(conf)) {
// two-phrase write
long startTS = session.getTimestamp().getVersion();

View File

@ -0,0 +1,18 @@
# Production Readiness
In general, the latest [release](https://github.com/tikv/client-java/releases) of TiKV Java Client is ready for production use. But it is not battle-tested as full featured client for TiKV in all use cases. This page will give you more details.
## RawKV
All RawKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml).
At this time, RawKV has been used in the production environment of some commercial customers in latency sensitive systems. But they only use part of the RawKV APIs (mainly including `raw_put`, `raw_get`, `raw_compare_and_swap`, and `raw_batch_put`).
## TxnKV
All TxnKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml).
In addition, TxnKV has been used in the [TiSpark](https://docs.pingcap.com/tidb/stable/tispark-overview) and [TiBigData](https://github.com/tidb-incubator/TiBigData) project to integrate data from TiDB to Big Data ecosystem. TiSpark and TiBigData are used in the production system of some commercial customers and internet companies.
Similar to RawKV, only part of APIs are used in this scenario (mainly including `prewrite/commit` and `coprocessor`). And this use case doesn't care about latency but throughput and reliability.
## TiDB Cloud
Directly using TiKV is not possible on TiDB Cloud due to the fact that client has to access the whole cluster, which has security issues. And TiKV managed service is not coming soon as it's not contained in [roadmap](https://docs.pingcap.com/tidbcloud/tidb-cloud-roadmap) yet.

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>3.3.4-SNAPSHOT</version>
<packaging>jar</packaging>
<name>TiKV Java Client</name>
<description>A Java Client for TiKV</description>
@ -54,7 +54,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<protobuf.version>3.5.1</protobuf.version>
<protobuf.version>3.18.0</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.48.0</grpc.version>
@ -62,7 +62,7 @@
<gson.version>2.8.9</gson.version>
<powermock.version>1.6.6</powermock.version>
<jackson-annotations.version>2.13.2</jackson-annotations.version>
<jackson.version>2.13.2.2</jackson.version>
<jackson.version>2.13.4.2</jackson.version>
<trove4j.version>3.0.1</trove4j.version>
<jetcd.version>0.4.1</jetcd.version>
<joda-time.version>2.9.9</joda-time.version>

View File

@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.ScatterRegionResponse;
import org.tikv.kvproto.Pdpb.Timestamp;
import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;
import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
implements ReadOnlyPDClient {
@ -383,6 +384,17 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
}
private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest(
ByteString serviceId, long ttl, long safePoint) {
return () ->
UpdateServiceGCSafePointRequest.newBuilder()
.setHeader(header)
.setSafePoint(safePoint)
.setServiceId(serviceId)
.setTTL(ttl)
.build();
}
private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() {
return new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);
@ -419,6 +431,20 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
return conf.getReplicaRead();
}
@Override
public Long updateServiceGCSafePoint(
String serviceId, long ttl, long safePoint, BackOffer backOffer) {
return callWithRetry(
backOffer,
PDGrpc.getUpdateServiceGCSafePointMethod(),
buildUpdateServiceGCSafePointRequest(
ByteString.copyFromUtf8(serviceId), ttl, safePoint),
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this))
.getMinSafePoint();
}
@Override
public void close() throws InterruptedException {
etcdClient.close();

View File

@ -72,4 +72,16 @@ public interface ReadOnlyPDClient {
Long getClusterId();
RequestKeyCodec getCodec();
/**
* Update ServiceGCSafePoint
*
* @param serviceId ServiceId
* @param ttl TTL in seconds
* @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good
* practice
* @return the MinSafePoint of all services. If this value is greater than safePoint, it means
* update failed
*/
Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer);
}

View File

@ -23,7 +23,7 @@ import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
// TODO(iosmanthus): use ByteString.wrap to avoid once more copying.
class CodecUtils {
public class CodecUtils {
public static ByteString encode(ByteString key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());

View File

@ -81,22 +81,21 @@ public class RequestKeyV2Codec implements RequestKeyCodec {
if (!start.isEmpty()) {
start = CodecUtils.decode(start);
if (ByteString.unsignedLexicographicalComparator().compare(start, keyPrefix) < 0) {
start = ByteString.EMPTY;
} else {
start = decodeKey(start);
}
}
if (!end.isEmpty()) {
end = CodecUtils.decode(end);
if (ByteString.unsignedLexicographicalComparator().compare(end, infiniteEndKey) >= 0) {
end = ByteString.EMPTY;
} else {
end = decodeKey(end);
}
}
if (ByteString.unsignedLexicographicalComparator().compare(start, infiniteEndKey) >= 0
|| (!end.isEmpty()
&& ByteString.unsignedLexicographicalComparator().compare(end, keyPrefix) <= 0)) {
throw new IllegalArgumentException("region out of keyspace" + region.toString());
}
start = start.startsWith(keyPrefix) ? start.substring(keyPrefix.size()) : ByteString.EMPTY;
end = end.startsWith(keyPrefix) ? end.substring(keyPrefix.size()) : ByteString.EMPTY;
return builder.setStartKey(start).setEndKey(end).build();
}
}

View File

@ -147,7 +147,7 @@ public class RowV2 {
if (this.large) {
v = this.colIDs32[h];
} else {
v = this.colIDs[h];
v = this.colIDs[h] & 0xFF;
}
if (v < colID) {
i = h + 1;

View File

@ -97,6 +97,8 @@ public class CacheInvalidateEvent implements Serializable {
public enum CacheType implements Serializable {
REGION_STORE,
STORE,
REGION,
REQ_FAILED,
LEADER
}

View File

@ -123,9 +123,9 @@ public class ColumnRef extends Expression {
@Override
public int hashCode() {
if (isResolved()) {
return Objects.hash(this.name, this.dataType);
return Objects.hash(this.name.toLowerCase(), this.dataType);
} else {
return Objects.hashCode(name);
return Objects.hashCode(name.toLowerCase());
}
}

View File

@ -21,10 +21,13 @@ import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.event.CacheInvalidateEvent.CacheType;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
@ -43,6 +46,11 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private final Function<RespT, Errorpb.Error> getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
private final List<Function<CacheInvalidateEvent, Void>> cacheInvalidateCallBackList;
private final ExecutorService callBackThreadPool;
private final int INVALID_STORE_ID = 0;
private final int INVALID_REGION_ID = 0;
public RegionErrorHandler(
RegionManager regionManager,
@ -51,6 +59,8 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
this.recv = recv;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList();
this.callBackThreadPool = regionManager.getCallBackThreadPool();
}
@Override
@ -107,6 +117,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
notifyRegionLeaderError(recv.getRegion());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
@ -116,15 +127,14 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
// this error is reported from raftstore:
// store_id requested at the moment is inconsistent with that expected
// Solutionre-fetch from PD
long storeId = recv.getRegion().getLeader().getStoreId();
long storeId = error.getStoreNotMatch().getRequestStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// may request store which is not leader.
invalidateRegionStoreCache(recv.getRegion(), storeId);
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
@ -143,8 +153,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
@ -179,7 +187,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
invalidateRegionStoreCache(recv.getRegion());
invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
@ -196,6 +204,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
notifyRegionCacheInvalidate(recv.getRegion());
return false;
}
@ -220,7 +229,16 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for (Metapb.Region meta : currentRegions) {
// The region needs to be decoded to plain format.
meta = regionManager.getPDClient().getCodec().decodeRegion(meta);
try {
meta = regionManager.getPDClient().getCodec().decodeRegion(meta);
} catch (Exception e) {
logger.warn("ignore invalid region: " + meta.toString());
// if the region is invalid, ignore it since the following situation might appear.
// Assuming a region with range [r000, z), then it splits into:
// [r000, x) [x, z), the right region is invalid for keyspace `r000`.
// We should only care about the valid region.
continue;
}
TiRegion region = regionManager.createRegion(meta, backOffer);
newRegions.add(region);
if (recv.getRegion().getVerID() == region.getVerID()) {
@ -229,6 +247,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
}
if (needInvalidateOld) {
notifyRegionCacheInvalidate(recv.getRegion());
this.regionManager.onRegionStale(recv.getRegion());
}
@ -271,8 +290,51 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
return recv.getRegion();
}
private void invalidateRegionStoreCache(TiRegion ctxRegion) {
private void notifyRegionRequestError(
TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) {
CacheInvalidateEvent event;
// When store(region) id is invalid,
// it implies that the error was not caused by store(region) error.
switch (type) {
case REGION:
case LEADER:
event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type);
break;
case REGION_STORE:
event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type);
break;
case REQ_FAILED:
event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type);
break;
default:
throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type);
}
if (cacheInvalidateCallBackList != null) {
for (Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack :
cacheInvalidateCallBackList) {
callBackThreadPool.submit(
() -> {
try {
cacheInvalidateCallBack.apply(event);
} catch (Exception e) {
logger.error(String.format("CacheInvalidCallBack failed %s", e));
}
});
}
}
}
private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) {
regionManager.invalidateRegion(ctxRegion);
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
regionManager.invalidateStore(storeId);
notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE);
}
private void notifyRegionCacheInvalidate(TiRegion ctxRegion) {
notifyRegionRequestError(ctxRegion, 0, CacheType.REGION);
}
private void notifyRegionLeaderError(TiRegion ctxRegion) {
notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER);
}
}

View File

@ -23,13 +23,18 @@ import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
@ -68,9 +73,36 @@ public class RegionManager {
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;
private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>>
cacheInvalidateCallbackList;
private final ExecutorService callBackThreadPool;
private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
this(conf, pdClient, channelFactory, 1);
}
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this(conf, pdClient, 1);
}
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.storeChecker = null;
this.executor = null;
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}
public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient,
ChannelFactory channelFactory,
int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
@ -81,26 +113,34 @@ public class RegionManager {
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
}
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.storeChecker = null;
this.executor = null;
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}
public synchronized void close() {
if (this.executor != null) {
this.executor.shutdownNow();
}
this.callBackThreadPool.shutdownNow();
}
public ReadOnlyPDClient getPDClient() {
return this.pdClient;
}
public ExecutorService getCallBackThreadPool() {
return callBackThreadPool;
}
public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
return cacheInvalidateCallbackList;
}
public void addCacheInvalidateCallback(
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
}
public void invalidateAll() {
cache.invalidateAll();
}
@ -137,8 +177,13 @@ public class RegionManager {
Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
region =
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
logger.debug(
String.format(
"get region id: %d with leader: %d",
region.getId(), region.getLeader().getStoreId()));
}
} catch (Exception e) {
logger.warn("Get region failed: ", e);
return null;
} finally {
requestTimer.observeDuration();
@ -188,20 +233,40 @@ public class RegionManager {
TiStore store = null;
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = getStoreById(peer.getStoreId(), backOffer);
// check from the first replica in case it recovers
List<Peer> replicaList = region.getReplicaList();
for (int i = 0; i < replicaList.size(); i++) {
Peer peer = replicaList.get(i);
store = getStoreById(peer.getStoreId(), backOffer);
if (store.isReachable()) {
// update replica's index
region.setReplicaIdx(i);
break;
}
logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId());
}
// Does not set unreachable store to null in case it is incompatible with GrpcForward
if (store == null || !store.isReachable()) {
logger.warn("No TiKV store available for region: " + region);
}
} else {
outerLoop:
List<TiStore> tiflashStores = new ArrayList<>();
for (Peer peer : region.getLearnerList()) {
TiStore s = getStoreById(peer.getStoreId(), backOffer);
for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
store = s;
break outerLoop;
}
if (!s.isReachable()) {
continue;
}
if (s.isTiFlash()) {
tiflashStores.add(s);
}
}
// select a tiflash with Round-Robin strategy
if (tiflashStores.size() > 0) {
store =
tiflashStores.get(
Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size()));
}
if (store == null) {
// clear the region cache, so we may get the learner peer next time
cache.invalidateRegion(region);

View File

@ -357,7 +357,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this,
lockResolverClient,
resp -> resp.hasRegionError() ? resp.getRegionError() : null,
resp -> null,
resp -> resp.hasError() ? resp.getError() : null,
resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
version,
forWrite);
@ -366,13 +366,14 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
// we need to update region after retry
region = regionManager.getRegionByKey(startKey, backOffer);
if (isScanSuccess(backOffer, resp)) {
return doScan(resp);
if (handleScanResponse(backOffer, resp, version, forWrite)) {
return resp.getPairsList();
}
}
}
private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
private boolean handleScanResponse(
BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("ScanResponse failed without a cause");
@ -381,28 +382,35 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
return false;
}
return true;
}
// TODO: resolve locks after scan
private List<KvPair> doScan(ScanResponse resp) {
// Check if kvPair contains error, it should be a Lock if hasError is true.
List<KvPair> kvPairs = resp.getPairsList();
List<KvPair> newKvPairs = new ArrayList<>();
for (KvPair kvPair : kvPairs) {
// Resolve locks
// Note: Memory lock conflict is returned by both `ScanResponse.error` &
// `ScanResponse.pairs[0].error`, while other key errors are returned by
// `ScanResponse.pairs.error`
// See https://github.com/pingcap/kvproto/pull/697
List<Lock> locks = new ArrayList<>();
for (KvPair kvPair : resp.getPairsList()) {
if (kvPair.hasError()) {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
newKvPairs.add(
KvPair.newBuilder()
.setError(kvPair.getError())
.setValue(kvPair.getValue())
.setKey(lock.getKey())
.build());
} else {
newKvPairs.add(codec.decodeKvPair(kvPair));
locks.add(lock);
}
}
return Collections.unmodifiableList(newKvPairs);
if (!locks.isEmpty()) {
ResolveLockResult resolveLockResult =
lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
addResolvedLocks(version, resolveLockResult.getResolvedLocks());
long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
if (msBeforeExpired > 0) {
// if not resolve all locks, we wait and retry
backOffer.doBackOffWithMaxSleep(
BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
}
return false;
}
return true;
}
public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {

View File

@ -20,17 +20,22 @@ import io.grpc.ManagedChannel;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.ClientCalls;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Mpp;
import org.tikv.kvproto.Mpp.IsAliveRequest;
import org.tikv.kvproto.TikvGrpc;
public class StoreHealthyChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
@ -75,6 +80,30 @@ public class StoreHealthyChecker implements Runnable {
private boolean checkStoreHealth(TiStore store) {
String addressStr = store.getStore().getAddress();
if (store.isTiFlash()) {
return checkTiFlashHealth(addressStr);
}
return checkTiKVHealth(addressStr);
}
private boolean checkTiFlashHealth(String addressStr) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvGrpc.TikvBlockingStub stub =
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Supplier<IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
Mpp.IsAliveResponse resp =
ClientCalls.blockingUnaryCall(
stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
return resp != null && resp.getAvailable();
} catch (Exception e) {
logger.info(
"fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e);
return false;
}
}
private boolean checkTiKVHealth(String addressStr) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
HealthGrpc.HealthBlockingStub stub =
@ -83,6 +112,7 @@ public class StoreHealthyChecker implements Runnable {
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e);
return false;
}
}

View File

@ -126,6 +126,14 @@ public class TiRegion implements Serializable {
return getCurrentReplica();
}
public void setReplicaIdx(int idx) {
replicaIdx = idx;
}
public List<Peer> getReplicaList() {
return replicaList;
}
private boolean isLeader(Peer peer) {
return getLeader().equals(peer);
}

View File

@ -105,4 +105,14 @@ public class TiStore implements Serializable {
public long getId() {
return this.store.getId();
}
public boolean isTiFlash() {
for (Metapb.StoreLabel label : store.getLabelsList()) {
if (label.getKey().equals(TiStoreType.TiFlash.getLabelKey())
&& label.getValue().equals(TiStoreType.TiFlash.getLabelValue())) {
return true;
}
}
return false;
}
}

View File

@ -18,6 +18,7 @@
package org.tikv.common.util;
import java.io.Serializable;
import java.util.Objects;
public class Pair<F, S> implements Serializable {
public final F first;
@ -36,4 +37,21 @@ public class Pair<F, S> implements Serializable {
public String toString() {
return String.format("[%s:%s]", first, second);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Pair<?, ?> pair = (Pair<?, ?>) o;
return Objects.equals(first, pair.first) && Objects.equals(second, pair.second);
}
@Override
public int hashCode() {
return Objects.hash(first, second);
}
}

View File

@ -50,6 +50,7 @@ public class KVClient implements AutoCloseable {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService executorService;
private Set<Long> resolvedLocks = Collections.emptySet();
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) {
Objects.requireNonNull(conf, "conf is null");
@ -223,6 +224,10 @@ public class KVClient implements AutoCloseable {
if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
// set resolvedLocks for the new client
if (!resolvedLocks.isEmpty()) {
client.addResolvedLocks(version, resolvedLocks);
}
try {
return client.batchGet(backOffer, batch.getKeys(), version);
} catch (final TiKVException e) {
@ -230,7 +235,8 @@ public class KVClient implements AutoCloseable {
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
// get resolved locks and retry
resolvedLocks = client.getResolvedLocks(version);
return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
}
} else {

View File

@ -0,0 +1,130 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.junit.Test;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Errorpb.EpochNotMatch;
import org.tikv.kvproto.Errorpb.NotLeader;
import org.tikv.kvproto.Errorpb.StoreNotMatch;
import org.tikv.kvproto.Metapb;
public class CacheInvalidCallBackTest extends MockServerTest {
private RegionStoreClient createClient(
String version, Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack) {
Metapb.Store meta =
Metapb.Store.newBuilder()
.setAddress(LOCAL_ADDR + ":" + port)
.setId(1)
.setState(Metapb.StoreState.Up)
.setVersion(version)
.build();
TiStore store = new TiStore(meta);
RegionManager manager = new RegionManager(session.getConf(), session.getPDClient());
manager.addCacheInvalidateCallback(cacheInvalidateCallBack);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(
session.getConf(), session.getChannelFactory(), manager, session.getPDClient());
return builder.build(region, store);
}
@Test
public void testcacheInvalidCallBack() {
String version = "3.0.12";
CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack();
doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack);
}
public void doRawGetTest(
RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) {
server.put("key1", "value1");
Optional<ByteString> value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value.get());
try {
server.putError(
"error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance()));
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1"));
fail();
} catch (Exception e) {
assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.putError(
"failure",
() -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance()));
try {
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
fail();
} catch (Exception e) {
sleep(1000);
assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.putError(
"store_not_match",
() -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance()));
try {
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
fail();
} catch (Exception e) {
sleep(1000);
assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.clearAllMap();
client.close();
}
private void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
fail();
}
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(1000);
}
static class CacheInvalidateCallBack implements Function<CacheInvalidateEvent, Void> {
public List<CacheInvalidateEvent> cacheInvalidateEvents = new ArrayList<>();
@Override
public Void apply(CacheInvalidateEvent cacheInvalidateEvent) {
cacheInvalidateEvents.add(cacheInvalidateEvent);
return null;
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.Arrays;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.PeerRole;
import org.tikv.kvproto.Metapb.Region;
import org.tikv.kvproto.Metapb.RegionEpoch;
import org.tikv.kvproto.Metapb.Store;
@ -61,6 +62,10 @@ public class GrpcUtils {
return Peer.newBuilder().setStoreId(storeId).setId(id).build();
}
public static Peer makeLearnerPeer(long id, long storeId) {
return Peer.newBuilder().setRole(PeerRole.Learner).setStoreId(storeId).setId(id).build();
}
public static ByteString encodeKey(byte[] key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key);

View File

@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
@ -67,6 +68,10 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
private final Map<Key, Supplier<Kvrpcpb.KeyError.Builder>> keyErrMap = new HashMap<>();
private final Map<Key, Supplier<Kvrpcpb.LockInfo.Builder>> lockMap = new HashMap<>();
private final Map<Long, Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder>> txnStatusMap =
new HashMap<>();
// for KV error
public static final int ABORT = 1;
public static final int RETRY = 2;
@ -117,9 +122,68 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder);
}
public void removeError(String key) {
regionErrMap.remove(toRawKey(key.getBytes(StandardCharsets.UTF_8)));
}
// putWithLock is used to "prewrite" key-value without "commit"
public void putWithLock(
ByteString key, ByteString value, ByteString primaryKey, Long startTs, Long ttl) {
put(key, value);
Kvrpcpb.LockInfo.Builder lock =
Kvrpcpb.LockInfo.newBuilder()
.setPrimaryLock(primaryKey)
.setLockVersion(startTs)
.setKey(key)
.setLockTtl(ttl);
lockMap.put(toRawKey(key), () -> lock);
}
public void removeLock(ByteString key) {
lockMap.remove(toRawKey(key));
}
public boolean hasLock(ByteString key) {
return lockMap.containsKey(toRawKey(key));
}
// putTxnStatus is used to save transaction status
// commitTs > 0: committed
// commitTs == 0 && key is empty: rollback
// commitTs == 0 && key not empty: locked by key
public void putTxnStatus(Long startTs, Long commitTs, ByteString key) {
if (commitTs > 0 || (commitTs == 0 && key.isEmpty())) { // committed || rollback
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
.setCommitVersion(commitTs)
.setLockTtl(0)
.setAction(Kvrpcpb.Action.NoAction);
txnStatusMap.put(startTs, () -> txnStatus);
} else { // locked
Kvrpcpb.LockInfo.Builder lock = lockMap.get(toRawKey(key)).get();
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
.setCommitVersion(commitTs)
.setLockTtl(lock.getLockTtl())
.setAction(Kvrpcpb.Action.NoAction)
.setLockInfo(lock);
txnStatusMap.put(startTs, () -> txnStatus);
}
}
// putTxnStatus is used to save transaction status
// commitTs > 0: committed
// commitTs == 0: rollback
public void putTxnStatus(Long startTs, Long commitTs) {
putTxnStatus(startTs, commitTs, ByteString.EMPTY);
}
public void clearAllMap() {
dataMap.clear();
regionErrMap.clear();
lockMap.clear();
txnStatusMap.clear();
}
private Errorpb.Error verifyContext(Context context) throws Exception {
@ -255,9 +319,12 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
return;
}
Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(key);
Supplier<Kvrpcpb.KeyError.Builder> errProvider = keyErrMap.remove(key);
if (errProvider != null) {
builder.setError(errProvider.get().build());
} else if (lock != null) {
builder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
} else {
ByteString value = dataMap.get(key);
builder.setValue(value);
@ -299,11 +366,17 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
kvs.entrySet()
.stream()
.map(
kv ->
Kvrpcpb.KvPair.newBuilder()
.setKey(kv.getKey().toByteString())
.setValue(kv.getValue())
.build())
kv -> {
Kvrpcpb.KvPair.Builder kvBuilder =
Kvrpcpb.KvPair.newBuilder()
.setKey(kv.getKey().toByteString())
.setValue(kv.getValue());
Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(kv.getKey());
if (lock != null) {
kvBuilder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
}
return kvBuilder.build();
})
.collect(Collectors.toList()));
}
responseObserver.onNext(builder.build());
@ -354,6 +427,96 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
}
}
@Override
public void kvCheckTxnStatus(
org.tikv.kvproto.Kvrpcpb.CheckTxnStatusRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.CheckTxnStatusResponse>
responseObserver) {
logger.info("KVMockServer.kvCheckTxnStatus");
try {
Long startTs = request.getLockTs();
Long currentTs = request.getCurrentTs();
logger.info("kvCheckTxnStatus for txn: " + startTs);
Kvrpcpb.CheckTxnStatusResponse.Builder builder = Kvrpcpb.CheckTxnStatusResponse.newBuilder();
Error e = verifyContext(request.getContext());
if (e != null) {
responseObserver.onNext(builder.setRegionError(e).build());
responseObserver.onCompleted();
return;
}
Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder> txnStatus = txnStatusMap.get(startTs);
if (txnStatus != null) {
Kvrpcpb.CheckTxnStatusResponse resp = txnStatus.get().build();
if (resp.getCommitVersion() == 0
&& resp.getLockTtl() > 0
&& TiTimestamp.extractPhysical(startTs) + resp.getLockInfo().getLockTtl()
< TiTimestamp.extractPhysical(currentTs)) {
ByteString key = resp.getLockInfo().getKey();
logger.info(
String.format(
"kvCheckTxnStatus rollback expired txn: %d, remove lock: %s",
startTs, key.toStringUtf8()));
removeLock(key);
putTxnStatus(startTs, 0L, ByteString.EMPTY);
resp = txnStatusMap.get(startTs).get().build();
}
logger.info("kvCheckTxnStatus resp: " + resp);
responseObserver.onNext(resp);
} else {
builder.setError(
Kvrpcpb.KeyError.newBuilder()
.setTxnNotFound(
Kvrpcpb.TxnNotFound.newBuilder()
.setPrimaryKey(request.getPrimaryKey())
.setStartTs(startTs)));
logger.info("kvCheckTxnStatus, TxnNotFound");
responseObserver.onNext(builder.build());
}
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("kvCheckTxnStatus error: " + e);
responseObserver.onError(Status.INTERNAL.asRuntimeException());
}
}
@Override
public void kvResolveLock(
org.tikv.kvproto.Kvrpcpb.ResolveLockRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.ResolveLockResponse> responseObserver) {
logger.info("KVMockServer.kvResolveLock");
try {
Long startTs = request.getStartVersion();
Long commitTs = request.getCommitVersion();
logger.info(
String.format(
"kvResolveLock for txn: %d, commitTs: %d, keys: %d",
startTs, commitTs, request.getKeysCount()));
Kvrpcpb.ResolveLockResponse.Builder builder = Kvrpcpb.ResolveLockResponse.newBuilder();
Error e = verifyContext(request.getContext());
if (e != null) {
responseObserver.onNext(builder.setRegionError(e).build());
responseObserver.onCompleted();
return;
}
if (request.getKeysCount() == 0) {
lockMap.entrySet().removeIf(entry -> entry.getValue().get().getLockVersion() == startTs);
} else {
for (int i = 0; i < request.getKeysCount(); i++) {
removeLock(request.getKeys(i));
}
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL.asRuntimeException());
}
}
@Override
public void coprocessor(
org.tikv.kvproto.Coprocessor.Request requestWrap,

View File

@ -0,0 +1,89 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import static org.tikv.common.GrpcUtils.encodeKey;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.tikv.common.apiversion.RequestKeyV1TxnCodec;
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Coprocessor.KeyRange;
import org.tikv.kvproto.Kvrpcpb.CommandPri;
import org.tikv.kvproto.Kvrpcpb.IsolationLevel;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Region;
public class MockRegionManager extends RegionManager {
private final Map<KeyRange, TiRegion> mockRegionMap;
private static TiRegion region(long id, KeyRange range) {
RequestKeyV1TxnCodec v1 = new RequestKeyV1TxnCodec();
TiConfiguration configuration = new TiConfiguration();
configuration.setIsolationLevel(IsolationLevel.RC);
configuration.setCommandPriority(CommandPri.Low);
Region r =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
.setId(id)
.setStartKey(encodeKey(range.getStart().toByteArray()))
.setEndKey(encodeKey(range.getEnd().toByteArray()))
.addPeers(Peer.getDefaultInstance())
.build();
List<Metapb.Store> s = ImmutableList.of(Metapb.Store.newBuilder().setId(id).build());
return new TiRegion(
configuration,
v1.decodeRegion(r),
null,
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()));
}
public MockRegionManager(List<KeyRange> ranges) {
super(null, null);
mockRegionMap =
ranges.stream().collect(Collectors.toMap(kr -> kr, kr -> region(ranges.indexOf(kr), kr)));
}
@Override
public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
for (Map.Entry<KeyRange, TiRegion> entry : mockRegionMap.entrySet()) {
KeyRange range = entry.getKey();
if (KeyRangeUtils.makeRange(range.getStart(), range.getEnd()).contains(Key.toRawKey(key))) {
TiRegion region = entry.getValue();
return Pair.create(
region, new TiStore(Metapb.Store.newBuilder().setId(region.getId()).build()));
}
}
return null;
}
}

View File

@ -39,6 +39,8 @@ public class MockServerTest extends PDMockServerTest {
public void setup() throws IOException {
super.setup();
port = GrpcUtils.getFreePort();
Metapb.Region r =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
@ -51,7 +53,7 @@ public class MockServerTest extends PDMockServerTest {
List<Metapb.Store> s =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("localhost:1234")
.setAddress(LOCAL_ADDR + ":" + port)
.setVersion("5.0.0")
.setId(13)
.build());
@ -70,6 +72,6 @@ public class MockServerTest extends PDMockServerTest {
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
port = server.start(region);
server.start(region, port);
}
}

View File

@ -74,9 +74,12 @@ public class PDClientMockTest extends PDMockServerTest {
@Test
public void testTso() throws Exception {
try (PDClient client = session.getPDClient()) {
Long current = System.currentTimeMillis();
TiTimestamp ts = client.getTimestamp(defaultBackOff());
// Test pdServer is set to generate physical == logical + 1
assertEquals(ts.getPhysical(), ts.getLogical() + 1);
// Test pdServer is set to generate physical to current, logical to 1
assertTrue(ts.getPhysical() >= current);
assertTrue(ts.getPhysical() < current + 100);
assertEquals(ts.getLogical(), 1);
}
}

View File

@ -75,8 +75,17 @@ public class PDMockServer extends PDGrpc.PDImplBase {
@Override
public StreamObserver<TsoRequest> tso(StreamObserver<TsoResponse> resp) {
return new StreamObserver<TsoRequest>() {
private int physical = 1;
private int logical = 0;
private long physical = System.currentTimeMillis();
private long logical = 0;
private void updateTso() {
logical++;
if (logical >= (1 << 18)) {
logical = 0;
physical++;
}
physical = Math.max(physical, System.currentTimeMillis());
}
@Override
public void onNext(TsoRequest value) {}
@ -86,7 +95,8 @@ public class PDMockServer extends PDGrpc.PDImplBase {
@Override
public void onCompleted() {
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical++, logical++));
updateTso();
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical, logical));
resp.onCompleted();
}
};

View File

@ -31,6 +31,7 @@ import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
@ -135,7 +136,7 @@ public class RegionManagerTest extends PDMockServerTest {
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
assertEquals(pair.second.getId(), 10);
}
@Test
@ -179,4 +180,52 @@ public class RegionManagerTest extends PDMockServerTest {
} catch (Exception ignored) {
}
}
@Test
public void getRegionStorePairByKeyWithTiFlash() {
ByteString startKey = ByteString.copyFrom(new byte[] {1});
ByteString endKey = ByteString.copyFrom(new byte[] {10});
ByteString searchKey = ByteString.copyFrom(new byte[] {5});
String testAddress = "testAddress";
long firstStoreId = 233;
long secondStoreId = 234;
int confVer = 1026;
int ver = 1027;
long regionId = 233;
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makeLearnerPeer(1, firstStoreId),
GrpcUtils.makeLearnerPeer(2, secondStoreId))));
AtomicInteger i = new AtomicInteger(0);
long[] ids = new long[] {firstStoreId, secondStoreId};
leader.addGetStoreListener(
(request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
ids[i.getAndIncrement()],
testAddress,
StoreState.Up,
GrpcUtils.makeStoreLabel("engine", "tiflash"),
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2")))));
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.second.getId(), firstStoreId);
Pair<TiRegion, TiStore> secondPair =
mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(secondPair.first.getId(), regionId);
assertEquals(secondPair.second.getId(), secondStoreId);
}
}

View File

@ -17,15 +17,16 @@
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.exception.KeyException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
@ -40,6 +41,7 @@ import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
public class RegionStoreClientTest extends MockServerTest {
private static final Logger logger = LoggerFactory.getLogger(MockServerTest.class);
private RegionStoreClient createClientV2() {
return createClient("2.1.19");
@ -49,6 +51,10 @@ public class RegionStoreClientTest extends MockServerTest {
return createClient("3.0.12");
}
private RegionStoreClient createClientV4() {
return createClient("6.1.0");
}
private RegionStoreClient createClient(String version) {
Metapb.Store meta =
Metapb.Store.newBuilder()
@ -161,30 +167,130 @@ public class RegionStoreClientTest extends MockServerTest {
@Test
public void scanTest() {
doScanTest(createClientV3());
doScanTest(createClientV4());
}
public void doScanTest(RegionStoreClient client) {
Long startTs = session.getTimestamp().getVersion();
server.put("key1", "value1");
server.put("key2", "value2");
server.put("key4", "value4");
server.put("key5", "value5");
List<Kvrpcpb.KvPair> kvs = client.scan(defaultBackOff(), ByteString.copyFromUtf8("key2"), 1);
assertEquals(3, kvs.size());
// put lock will expire in 1s
ByteString key6 = ByteString.copyFromUtf8("key6");
server.putWithLock(key6, ByteString.copyFromUtf8("value6"), key6, startTs, 100L);
server.putTxnStatus(startTs, 0L, key6);
assertTrue(server.hasLock(key6));
List<Kvrpcpb.KvPair> kvs =
client.scan(
defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion());
assertEquals(4, kvs.size());
kvs.forEach(
kv ->
assertEquals(
kv.getKey().toStringUtf8().replace("key", "value"), kv.getValue().toStringUtf8()));
assertFalse(server.hasLock(key6));
// put region error
server.putError(
"error1",
() -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance()));
try {
client.scan(defaultBackOff(), ByteString.copyFromUtf8("error1"), 1);
client.scan(
defaultBackOff(), ByteString.copyFromUtf8("error1"), session.getTimestamp().getVersion());
fail();
} catch (Exception e) {
assertTrue(true);
}
server.removeError("error1");
// put lock
Long startTs7 = session.getTimestamp().getVersion();
ByteString key7 = ByteString.copyFromUtf8("key7");
server.putWithLock(key7, ByteString.copyFromUtf8("value7"), key7, startTs7, 3000L);
server.putTxnStatus(startTs7, 0L, key7);
assertTrue(server.hasLock(key7));
try {
client.scan(
defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion());
fail();
} catch (Exception e) {
KeyException keyException = (KeyException) e.getCause();
assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock"));
}
assertTrue(server.hasLock(key7));
server.clearAllMap();
client.close();
}
@Test
public void resolveLocksTest() {
doResolveLocksTest(createClientV4());
}
public void doResolveLocksTest(RegionStoreClient client) {
ByteString primaryKey = ByteString.copyFromUtf8("primary");
server.put(primaryKey, ByteString.copyFromUtf8("value0"));
// get with committed lock
{
Long startTs = session.getTimestamp().getVersion();
Long commitTs = session.getTimestamp().getVersion();
logger.info("startTs: " + startTs);
ByteString key1 = ByteString.copyFromUtf8("key1");
ByteString value1 = ByteString.copyFromUtf8("value1");
server.putWithLock(key1, value1, primaryKey, startTs, 1L);
server.putTxnStatus(startTs, commitTs);
assertTrue(server.hasLock(key1));
ByteString expected1 = client.get(defaultBackOff(), key1, 200);
assertEquals(value1, expected1);
assertFalse(server.hasLock(key1));
}
// get with not expired lock.
{
Long startTs = session.getTimestamp().getVersion();
logger.info("startTs: " + startTs);
ByteString key2 = ByteString.copyFromUtf8("key2");
ByteString value2 = ByteString.copyFromUtf8("value2");
server.putWithLock(key2, value2, key2, startTs, 3000L);
server.putTxnStatus(startTs, 0L, key2);
assertTrue(server.hasLock(key2));
try {
client.get(defaultBackOff(), key2, session.getTimestamp().getVersion());
fail();
} catch (Exception e) {
KeyException keyException = (KeyException) e.getCause();
assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock"));
}
assertTrue(server.hasLock(key2));
}
// get with expired lock.
{
Long startTs = session.getTimestamp().getVersion();
logger.info("startTs: " + startTs);
ByteString key3 = ByteString.copyFromUtf8("key3");
ByteString value3 = ByteString.copyFromUtf8("value3");
server.putWithLock(key3, value3, key3, startTs, 100L);
server.putTxnStatus(startTs, 0L, key3);
assertTrue(server.hasLock(key3));
ByteString expected3 =
client.get(defaultBackOff(), key3, session.getTimestamp().getVersion());
assertEquals(expected3, value3);
assertFalse(server.hasLock(key3));
}
server.clearAllMap();
client.close();
}

View File

@ -17,8 +17,7 @@
package org.tikv.common.apiversion;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
@ -177,5 +176,85 @@ public class RequestKeyCodecTest {
decoded = v2.decodeRegion(region);
assertEquals(start, decoded.getStartKey());
assertEquals(ByteString.EMPTY, decoded.getEndKey());
// test region out of keyspace
{
ByteString m_123 = CodecUtils.encode(ByteString.copyFromUtf8("m_123"));
ByteString m_124 = CodecUtils.encode(ByteString.copyFromUtf8("m_124"));
ByteString infiniteEndKey_0 =
CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFrom(new byte[] {0})));
ByteString t_123 = CodecUtils.encode(ByteString.copyFromUtf8("t_123"));
ByteString y_123 = CodecUtils.encode(ByteString.copyFromUtf8("y_123"));
ByteString[][] outOfKeyspaceCases = {
{ByteString.EMPTY, CodecUtils.encode(v2.keyPrefix)}, // ["", "r000"/"x000")
{ByteString.EMPTY, m_123},
{m_123, m_124},
{m_124, CodecUtils.encode(v2.keyPrefix)},
{CodecUtils.encode(v2.infiniteEndKey), ByteString.EMPTY}, // ["r001"/"x001", "")
{CodecUtils.encode(v2.infiniteEndKey), infiniteEndKey_0},
{infiniteEndKey_0, t_123},
{y_123, ByteString.EMPTY}, // "y_123" is bigger than "infiniteEndKey" for both raw & txn.
};
for (ByteString[] testCase : outOfKeyspaceCases) {
region = Region.newBuilder().setStartKey(testCase[0]).setEndKey(testCase[1]).build();
try {
decoded = v2.decodeRegion(region);
fail(String.format("[%s,%s): %s", testCase[0], testCase[1], decoded.toString()));
} catch (Exception ignored) {
}
}
}
// case: regionStartKey == "" < keyPrefix < regionEndKey < infiniteEndKey
region =
Region.newBuilder()
.setStartKey(ByteString.EMPTY)
.setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
.build();
decoded = v2.decodeRegion(region);
assertTrue(decoded.getStartKey().isEmpty());
assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey());
// case: "" < regionStartKey < keyPrefix < regionEndKey < infiniteEndKey < ""
region =
Region.newBuilder()
.setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123")))
.setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
.build();
decoded = v2.decodeRegion(region);
assertEquals(ByteString.EMPTY, decoded.getStartKey());
assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey());
// case: "" < regionStartKey < keyPrefix < infiniteEndKey < regionEndKey < ""
region =
Region.newBuilder()
.setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123")))
.setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0"))))
.build();
decoded = v2.decodeRegion(region);
assertEquals(ByteString.EMPTY, decoded.getStartKey());
assertEquals(ByteString.EMPTY, decoded.getEndKey());
// case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey < ""
region =
Region.newBuilder()
.setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
.setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0"))))
.build();
decoded = v2.decodeRegion(region);
assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey());
assertTrue(decoded.getEndKey().isEmpty());
// case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey == ""
region =
Region.newBuilder()
.setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0"))))
.setEndKey(ByteString.EMPTY)
.build();
decoded = v2.decodeRegion(region);
assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey());
assertTrue(decoded.getEndKey().isEmpty());
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2023 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.util;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Test;
import org.tikv.common.PDMockServerTest;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
public class PairTest extends PDMockServerTest {
@Test
public void testPair() {
Metapb.Region r =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
.setId(233)
.setStartKey(ByteString.EMPTY)
.setEndKey(ByteString.EMPTY)
.addPeers(Peer.getDefaultInstance())
.build();
List<Metapb.Store> s =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress(LOCAL_ADDR + ":" + 4000)
.setVersion("5.0.0")
.setId(1)
.build());
TiRegion region =
new TiRegion(
session.getConf(),
r,
r.getPeers(0),
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()));
TiStore store = new TiStore(s.get(0));
Map<Pair<TiRegion, TiStore>, List<ByteString>> groupKeyMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
Pair<TiRegion, TiStore> pair = Pair.create(region, store);
groupKeyMap
.computeIfAbsent(pair, e -> new ArrayList<>())
.add(ByteString.copyFromUtf8("test"));
}
Pair<TiRegion, TiStore> pair = Pair.create(region, store);
assert (groupKeyMap.get(pair).size() == 10);
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.txn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.tikv.common.BytePairWrapper;
import org.tikv.common.ByteWrapper;
import org.tikv.common.exception.KeyException;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb.KvPair;
public class BatchGetTest extends TXNTest {
@Test
public void BatchGetResolveLockTest() throws Exception {
long lockTTL = 20000L;
String key1 = "batchGetResolveLockTestKey1";
String key2 = "batchGetResolveLockTestKey2";
String val1 = "val1";
String val2 = "val2";
String val1_update = "val1_update";
String val2_update = "val2_update";
// put key1 and key2
putKV(key1, val1);
putKV(key2, val2);
// run 2PC background
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try (TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(session, startTS, lockTTL)) {
byte[] primaryKey = key1.getBytes("UTF-8");
byte[] secondary = key2.getBytes("UTF-8");
// prewrite primary key
twoPhaseCommitter.prewritePrimaryKey(
ConcreteBackOffer.newCustomBackOff(5000),
primaryKey,
val1_update.getBytes("UTF-8"));
List<BytePairWrapper> pairs =
Arrays.asList(new BytePairWrapper(secondary, val2_update.getBytes("UTF-8")));
// prewrite secondary key
twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 5000);
// get commitTS
long commitTS = session.getTimestamp().getVersion();
Thread.sleep(5000);
// commit primary key
twoPhaseCommitter.commitPrimaryKey(
ConcreteBackOffer.newCustomBackOff(5000), primaryKey, commitTS);
// commit secondary key
List<ByteWrapper> keys = Arrays.asList(new ByteWrapper(secondary));
twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 5000);
} catch (Exception e) {
KeyException keyException = (KeyException) e.getCause().getCause();
assertNotSame("", keyException.getKeyErr().getCommitTsExpired().toString());
}
})
.start();
// wait 2PC get commitTS
Thread.sleep(2000);
// batch get key1 and key2
try (KVClient kvClient = session.createKVClient()) {
long version = session.getTimestamp().getVersion();
ByteString k1 = ByteString.copyFromUtf8(key1);
ByteString k2 = ByteString.copyFromUtf8(key2);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(5000);
List<KvPair> kvPairs = kvClient.batchGet(backOffer, Arrays.asList(k1, k2), version);
// Since TiKV v4.0.0 write locked key will not block read. it is supported by Min Commit
// Timestamp
assertEquals(ByteString.copyFromUtf8(val1), kvPairs.get(0).getValue());
assertEquals(ByteString.copyFromUtf8(val2), kvPairs.get(1).getValue());
System.out.println(kvPairs);
// wait 2PC finish
Thread.sleep(10000);
}
}
}

View File

@ -41,7 +41,7 @@ import org.tikv.kvproto.Kvrpcpb;
public class TXNTest extends BaseTxnKVTest {
static final int DEFAULT_TTL = 10;
private TiSession session;
public TiSession session;
RegionStoreClient.RegionStoreClientBuilder builder;
@Before

View File

@ -0,0 +1,258 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.util;
import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import gnu.trove.list.array.TLongArrayList;
import gnu.trove.map.hash.TLongObjectHashMap;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.tikv.common.MockRegionManager;
import org.tikv.common.codec.Codec.IntegerCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.key.RowKey;
import org.tikv.common.key.RowKey.DecodeResult.Status;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor.KeyRange;
public class RangeSplitterTest {
private static KeyRange keyRange(Long s, Long e) {
ByteString sKey = ByteString.EMPTY;
ByteString eKey = ByteString.EMPTY;
if (s != null) {
CodecDataOutput cdo = new CodecDataOutput();
IntegerCodec.writeLongFully(cdo, s, true);
sKey = cdo.toByteString();
}
if (e != null) {
CodecDataOutput cdo = new CodecDataOutput();
IntegerCodec.writeLongFully(cdo, e, true);
eKey = cdo.toByteString();
}
return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
}
private static KeyRange keyRangeByHandle(long tableId, Long s, Long e) {
return keyRangeByHandle(tableId, s, Status.EQUAL, e, Status.EQUAL);
}
private static KeyRange keyRangeByHandle(long tableId, Long s, Status ss, Long e, Status es) {
ByteString sKey = shiftByStatus(handleToByteString(tableId, s), ss);
ByteString eKey = shiftByStatus(handleToByteString(tableId, e), es);
return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
}
private static ByteString shiftByStatus(ByteString v, Status s) {
switch (s) {
case EQUAL:
return v;
case LESS:
return v.substring(0, v.size() - 1);
case GREATER:
return v.concat(ByteString.copyFrom(new byte[] {1, 0}));
default:
throw new IllegalArgumentException("Only EQUAL,LESS,GREATER allowed");
}
}
private static ByteString handleToByteString(long tableId, Long k) {
if (k != null) {
return RowKey.toRowKey(tableId, k).toByteString();
}
return ByteString.EMPTY;
}
@Test
public void splitRangeByRegionTest() {
MockRegionManager mgr =
new MockRegionManager(
ImmutableList.of(keyRange(null, 30L), keyRange(30L, 50L), keyRange(50L, null)));
RangeSplitter s = RangeSplitter.newSplitter(mgr);
List<RangeSplitter.RegionTask> tasks =
s.splitRangeByRegion(
ImmutableList.of(
keyRange(0L, 40L), keyRange(41L, 42L), keyRange(45L, 50L), keyRange(70L, 1000L)));
assertEquals(tasks.get(0).getRegion().getId(), 0);
assertEquals(tasks.get(0).getRanges().size(), 1);
KeyRange range = tasks.get(0).getRanges().get(0);
assertEquals(tasks.get(0).getRanges().get(0), keyRange(0L, 30L));
assertEquals(tasks.get(1).getRegion().getId(), 1);
assertEquals(tasks.get(1).getRanges().get(0), keyRange(30L, 40L));
assertEquals(tasks.get(1).getRanges().get(1), keyRange(41L, 42L));
assertEquals(tasks.get(1).getRanges().get(2), keyRange(45L, 50L));
assertEquals(tasks.get(1).getRanges().size(), 3);
assertEquals(tasks.get(2).getRegion().getId(), 2);
assertEquals(tasks.get(2).getRanges().size(), 1);
assertEquals(tasks.get(2).getRanges().get(0), keyRange(70L, 1000L));
}
@Test
public void splitAndSortHandlesByRegionTest() {
final long tableId = 1;
List<Long> handles = new ArrayList<>();
handles.add(1L);
handles.add(5L);
handles.add(4L);
handles.add(3L);
handles.add(10L);
handles.add(2L);
handles.add(100L);
handles.add(101L);
handles.add(99L);
handles.add(88L);
handles.add(-1L);
handles.add(-255L);
handles.add(-100L);
handles.add(-99L);
handles.add(-98L);
handles.add(Long.MIN_VALUE);
handles.add(8960L);
handles.add(8959L);
handles.add(19999L);
handles.add(15001L);
MockRegionManager mgr =
new MockRegionManager(
ImmutableList.of(
keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL),
keyRangeByHandle(tableId, -100L, Status.EQUAL, 10L, Status.GREATER),
keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL),
keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER),
keyRangeByHandle(tableId, 100L, Status.GREATER, 9000L, Status.LESS),
keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL),
keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL)));
RangeSplitter s = RangeSplitter.newSplitter(mgr);
List<RangeSplitter.RegionTask> tasks =
new ArrayList<>(
s.splitAndSortHandlesByRegion(
ImmutableList.of(tableId),
new TLongArrayList(handles.stream().mapToLong(t -> t).toArray())));
tasks.sort(
(l, r) -> {
Long regionIdLeft = l.getRegion().getId();
Long regionIdRight = r.getRegion().getId();
return regionIdLeft.compareTo(regionIdRight);
});
// [-INF, -100): [Long.MIN_VALUE, Long.MIN_VALUE + 1), [-255, -254)
assertEquals(tasks.get(0).getRegion().getId(), 0);
assertEquals(tasks.get(0).getRanges().size(), 2);
assertEquals(
tasks.get(0).getRanges().get(0),
keyRangeByHandle(tableId, Long.MIN_VALUE, Long.MIN_VALUE + 1));
assertEquals(tasks.get(0).getRanges().get(1), keyRangeByHandle(tableId, -255L, -254L));
// [-100, 10.x): [-100, -97), [-1, 0), [1, 6), [10, 11)
assertEquals(tasks.get(1).getRegion().getId(), 1);
assertEquals(tasks.get(1).getRanges().size(), 4);
assertEquals(tasks.get(1).getRanges().get(0), keyRangeByHandle(tableId, -100L, -97L));
assertEquals(tasks.get(1).getRanges().get(1), keyRangeByHandle(tableId, -1L, 0L));
assertEquals(tasks.get(1).getRanges().get(2), keyRangeByHandle(tableId, 1L, 6L));
assertEquals(tasks.get(1).getRanges().get(3), keyRangeByHandle(tableId, 10L, 11L));
// [10.x, 50): empty
// [50, 100.x): [88, 89) [99, 101)
assertEquals(tasks.get(2).getRegion().getId(), 3);
assertEquals(tasks.get(2).getRanges().size(), 2);
assertEquals(tasks.get(2).getRanges().get(0), keyRangeByHandle(tableId, 88L, 89L));
assertEquals(tasks.get(2).getRanges().get(1), keyRangeByHandle(tableId, 99L, 101L));
// [100.x, less than 8960): [101, 102) [8959, 8960)
assertEquals(tasks.get(3).getRegion().getId(), 4);
assertEquals(tasks.get(3).getRanges().size(), 2);
assertEquals(tasks.get(3).getRanges().get(0), keyRangeByHandle(tableId, 101L, 102L));
assertEquals(tasks.get(3).getRanges().get(1), keyRangeByHandle(tableId, 8959L, 8960L));
// [less than 8960, 16000): [9000, 9001), [15001, 15002)
assertEquals(tasks.get(4).getRegion().getId(), 5);
assertEquals(tasks.get(4).getRanges().size(), 2);
assertEquals(tasks.get(4).getRanges().get(0), keyRangeByHandle(tableId, 8960L, 8961L));
assertEquals(tasks.get(4).getRanges().get(1), keyRangeByHandle(tableId, 15001L, 15002L));
// [16000, INF): [19999, 20000)
assertEquals(tasks.get(5).getRegion().getId(), 6);
assertEquals(tasks.get(5).getRanges().size(), 1);
assertEquals(tasks.get(5).getRanges().get(0), keyRangeByHandle(tableId, 19999L, 20000L));
}
@Test
public void groupByAndSortHandlesByRegionIdTest() {
final long tableId = 1;
List<Long> handles = new ArrayList<>();
handles.add(1L);
handles.add(5L);
handles.add(4L);
handles.add(3L);
handles.add(10L);
handles.add(11L);
handles.add(12L);
handles.add(2L);
handles.add(100L);
handles.add(101L);
handles.add(99L);
handles.add(88L);
handles.add(-1L);
handles.add(-255L);
handles.add(-100L);
handles.add(-99L);
handles.add(-98L);
handles.add(Long.MIN_VALUE);
handles.add(8960L);
handles.add(8959L);
handles.add(19999L);
handles.add(15001L);
handles.add(99999999999L);
handles.add(Long.MAX_VALUE);
MockRegionManager mgr =
new MockRegionManager(
ImmutableList.of(
keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL),
keyRangeByHandle(tableId, -100L, Status.EQUAL, 10L, Status.GREATER),
keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL),
keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER),
keyRangeByHandle(tableId, 100L, Status.GREATER, 9000L, Status.LESS),
keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL),
keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL)));
TLongObjectHashMap<TLongArrayList> result = new TLongObjectHashMap<>();
RangeSplitter.newSplitter(mgr)
.groupByAndSortHandlesByRegionId(
tableId, new TLongArrayList(handles.stream().mapToLong(t -> t).toArray()))
.forEach((k, v) -> result.put(k.first.getId(), v));
assertEquals(2, result.get(0).size());
assertEquals(10, result.get(1).size());
assertEquals(2, result.get(2).size());
assertEquals(3, result.get(3).size());
assertEquals(2, result.get(4).size());
assertEquals(2, result.get(5).size());
assertEquals(3, result.get(6).size());
}
}