mirror of https://github.com/tikv/client-java.git
refactor follower read (#126)
Signed-off-by: marsishandsome <marsishandsome@gmail.com>
This commit is contained in:
parent
f24dce7a14
commit
e84f2f819e
|
|
@ -61,13 +61,13 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul
|
||||||
killall -9 pd-server || true
|
killall -9 pd-server || true
|
||||||
killall -9 java || true
|
killall -9 java || true
|
||||||
sleep 10
|
sleep 10
|
||||||
bin/pd-server --name=pd --data-dir=pd --config=../.ci/config/pd.toml &>pd.log &
|
bin/pd-server --name=pd --data-dir=pd --config=../config/pd.toml &>pd.log &
|
||||||
sleep 10
|
sleep 10
|
||||||
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../.ci/config/tikv.toml &>tikv.log &
|
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../config/tikv.toml &>tikv.log &
|
||||||
sleep 10
|
sleep 10
|
||||||
ps aux | grep '-server' || true
|
ps aux | grep '-server' || true
|
||||||
curl -s 127.0.0.1:2379/pd/api/v1/status || true
|
curl -s 127.0.0.1:2379/pd/api/v1/status || true
|
||||||
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../.ci/config/tidb.toml &>tidb.log &
|
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../config/tidb.toml &>tidb.log &
|
||||||
sleep 60
|
sleep 60
|
||||||
"""
|
"""
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
# PD Configuration.
|
||||||
|
[replication]
|
||||||
|
enable-placement-rules = true
|
||||||
|
max-replicas = 1
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
# TiDB Configuration.
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
# TiKV Configuration.
|
||||||
|
|
||||||
|
[raftstore]
|
||||||
|
# set store capacity, if no set, use disk capacity.
|
||||||
|
capacity = "8G"
|
||||||
|
|
@ -42,7 +42,7 @@ public class ConfigUtils {
|
||||||
public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency";
|
public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency";
|
||||||
|
|
||||||
public static final String TIKV_KV_MODE = "tikv.kv_mode";
|
public static final String TIKV_KV_MODE = "tikv.kv_mode";
|
||||||
public static final String TIKV_IS_REPLICA_READ = "tikv.is_replica_read";
|
public static final String TIKV_REPLICA_READ = "tikv.replica_read";
|
||||||
|
|
||||||
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
|
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
|
||||||
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
|
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
|
||||||
|
|
@ -72,7 +72,7 @@ public class ConfigUtils {
|
||||||
public static final String DEF_DB_PREFIX = "";
|
public static final String DEF_DB_PREFIX = "";
|
||||||
public static final int DEF_KV_CLIENT_CONCURRENCY = 10;
|
public static final int DEF_KV_CLIENT_CONCURRENCY = 10;
|
||||||
public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN;
|
public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN;
|
||||||
public static final boolean DEF_IS_REPLICA_READ = false;
|
public static final String DEF_REPLICA_READ = "LEADER";
|
||||||
public static final boolean DEF_METRICS_ENABLE = false;
|
public static final boolean DEF_METRICS_ENABLE = false;
|
||||||
public static final int DEF_METRICS_PORT = 3140;
|
public static final int DEF_METRICS_PORT = 3140;
|
||||||
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
|
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
|
||||||
|
|
@ -86,4 +86,8 @@ public class ConfigUtils {
|
||||||
|
|
||||||
public static final String RAW_KV_MODE = "RAW";
|
public static final String RAW_KV_MODE = "RAW";
|
||||||
public static final String TXN_KV_MODE = "TXN";
|
public static final String TXN_KV_MODE = "TXN";
|
||||||
|
|
||||||
|
public static final String LEADER = "LEADER";
|
||||||
|
public static final String FOLLOWER = "FOLLOWER";
|
||||||
|
public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER";
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -244,7 +244,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
||||||
conf.getIsolationLevel(),
|
conf.getIsolationLevel(),
|
||||||
conf.getCommandPriority(),
|
conf.getCommandPriority(),
|
||||||
conf.getKvMode(),
|
conf.getKvMode(),
|
||||||
conf.isReplicaRead());
|
conf.getReplicaRead());
|
||||||
} finally {
|
} finally {
|
||||||
requestTimer.observeDuration();
|
requestTimer.observeDuration();
|
||||||
}
|
}
|
||||||
|
|
@ -261,7 +261,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
||||||
conf.getIsolationLevel(),
|
conf.getIsolationLevel(),
|
||||||
conf.getCommandPriority(),
|
conf.getCommandPriority(),
|
||||||
conf.getKvMode(),
|
conf.getKvMode(),
|
||||||
conf.isReplicaRead()));
|
conf.getReplicaRead()));
|
||||||
Supplier<GetRegionRequest> request =
|
Supplier<GetRegionRequest> request =
|
||||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
||||||
|
|
||||||
|
|
@ -288,7 +288,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
||||||
conf.getIsolationLevel(),
|
conf.getIsolationLevel(),
|
||||||
conf.getCommandPriority(),
|
conf.getCommandPriority(),
|
||||||
conf.getKvMode(),
|
conf.getKvMode(),
|
||||||
conf.isReplicaRead());
|
conf.getReplicaRead());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -302,7 +302,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
||||||
conf.getIsolationLevel(),
|
conf.getIsolationLevel(),
|
||||||
conf.getCommandPriority(),
|
conf.getCommandPriority(),
|
||||||
conf.getKvMode(),
|
conf.getKvMode(),
|
||||||
conf.isReplicaRead()));
|
conf.getReplicaRead()));
|
||||||
|
|
||||||
Supplier<GetRegionByIDRequest> request =
|
Supplier<GetRegionByIDRequest> request =
|
||||||
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
|
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
|
||||||
|
|
@ -361,8 +361,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isReplicaRead() {
|
public TiConfiguration.ReplicaRead getReplicaRead() {
|
||||||
return conf.isReplicaRead();
|
return conf.getReplicaRead();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -66,5 +66,5 @@ public interface ReadOnlyPDClient {
|
||||||
|
|
||||||
List<Store> getAllStores(BackOffer backOffer);
|
List<Store> getAllStores(BackOffer backOffer);
|
||||||
|
|
||||||
boolean isReplicaRead();
|
TiConfiguration.ReplicaRead getReplicaRead();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ public class TiConfiguration implements Serializable {
|
||||||
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
|
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
|
||||||
setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY);
|
setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY);
|
||||||
setIfMissing(TIKV_KV_MODE, TXN_KV_MODE);
|
setIfMissing(TIKV_KV_MODE, TXN_KV_MODE);
|
||||||
setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ);
|
setIfMissing(TIKV_REPLICA_READ, DEF_REPLICA_READ);
|
||||||
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
|
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
|
||||||
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
|
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
|
||||||
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
|
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
|
||||||
|
|
@ -216,6 +216,17 @@ public class TiConfiguration implements Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ReplicaRead getReplicaRead(String key) {
|
||||||
|
String value = get(key).toUpperCase(Locale.ROOT);
|
||||||
|
if (FOLLOWER.equals(value)) {
|
||||||
|
return ReplicaRead.FOLLOWER;
|
||||||
|
} else if (LEADER_AND_FOLLOWER.equals(value)) {
|
||||||
|
return ReplicaRead.LEADER_AND_FOLLOWER;
|
||||||
|
} else {
|
||||||
|
return ReplicaRead.LEADER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
|
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
|
||||||
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
|
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
|
||||||
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
|
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
|
||||||
|
|
@ -235,7 +246,7 @@ public class TiConfiguration implements Serializable {
|
||||||
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
|
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
|
||||||
|
|
||||||
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
|
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
|
||||||
private boolean isReplicaRead = getBoolean(TIKV_IS_REPLICA_READ);
|
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
|
||||||
|
|
||||||
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
|
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
|
||||||
private int metricsPort = getInt(TIKV_METRICS_PORT);
|
private int metricsPort = getInt(TIKV_METRICS_PORT);
|
||||||
|
|
@ -247,6 +258,12 @@ public class TiConfiguration implements Serializable {
|
||||||
RAW
|
RAW
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public enum ReplicaRead {
|
||||||
|
LEADER,
|
||||||
|
FOLLOWER,
|
||||||
|
LEADER_AND_FOLLOWER
|
||||||
|
}
|
||||||
|
|
||||||
public static TiConfiguration createDefault() {
|
public static TiConfiguration createDefault() {
|
||||||
return new TiConfiguration();
|
return new TiConfiguration();
|
||||||
}
|
}
|
||||||
|
|
@ -457,12 +474,12 @@ public class TiConfiguration implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isReplicaRead() {
|
public ReplicaRead getReplicaRead() {
|
||||||
return isReplicaRead;
|
return replicaRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TiConfiguration setReplicaRead(boolean isReplicaRead) {
|
public TiConfiguration setReplicaRead(ReplicaRead replicaRead) {
|
||||||
this.isReplicaRead = isReplicaRead;
|
this.replicaRead = replicaRead;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,6 @@ public class RegionManager {
|
||||||
// TODO: the region cache logic need rewrite.
|
// TODO: the region cache logic need rewrite.
|
||||||
// https://github.com/pingcap/tispark/issues/1170
|
// https://github.com/pingcap/tispark/issues/1170
|
||||||
private final RegionCache cache;
|
private final RegionCache cache;
|
||||||
private final boolean isReplicaRead;
|
|
||||||
|
|
||||||
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
|
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
|
||||||
|
|
||||||
|
|
@ -65,13 +64,11 @@ public class RegionManager {
|
||||||
public RegionManager(
|
public RegionManager(
|
||||||
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
|
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
|
||||||
this.cache = new RegionCache(pdClient);
|
this.cache = new RegionCache(pdClient);
|
||||||
this.isReplicaRead = pdClient.isReplicaRead();
|
|
||||||
this.cacheInvalidateCallback = cacheInvalidateCallback;
|
this.cacheInvalidateCallback = cacheInvalidateCallback;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionManager(ReadOnlyPDClient pdClient) {
|
public RegionManager(ReadOnlyPDClient pdClient) {
|
||||||
this.cache = new RegionCache(pdClient);
|
this.cache = new RegionCache(pdClient);
|
||||||
this.isReplicaRead = pdClient.isReplicaRead();
|
|
||||||
this.cacheInvalidateCallback = null;
|
this.cacheInvalidateCallback = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -126,13 +123,8 @@ public class RegionManager {
|
||||||
|
|
||||||
Store store = null;
|
Store store = null;
|
||||||
if (storeType == TiStoreType.TiKV) {
|
if (storeType == TiStoreType.TiKV) {
|
||||||
if (isReplicaRead) {
|
Peer peer = region.getCurrentReplica();
|
||||||
Peer peer = region.getCurrentFollower();
|
store = cache.getStoreById(peer.getStoreId(), backOffer);
|
||||||
store = cache.getStoreById(peer.getStoreId(), backOffer);
|
|
||||||
} else {
|
|
||||||
Peer leader = region.getLeader();
|
|
||||||
store = cache.getStoreById(leader.getStoreId(), backOffer);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
outerLoop:
|
outerLoop:
|
||||||
for (Peer peer : region.getLearnerList()) {
|
for (Peer peer : region.getLearnerList()) {
|
||||||
|
|
|
||||||
|
|
@ -169,7 +169,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<GetRequest> factory =
|
Supplier<GetRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
GetRequest.newBuilder()
|
GetRequest.newBuilder()
|
||||||
.setContext(region.getContext(getResolvedLocks(version)))
|
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||||
.setKey(key)
|
.setKey(key)
|
||||||
.setVersion(version)
|
.setVersion(version)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -214,7 +214,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<BatchGetRequest> request =
|
Supplier<BatchGetRequest> request =
|
||||||
() ->
|
() ->
|
||||||
BatchGetRequest.newBuilder()
|
BatchGetRequest.newBuilder()
|
||||||
.setContext(region.getContext(getResolvedLocks(version)))
|
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||||
.addAllKeys(keys)
|
.addAllKeys(keys)
|
||||||
.setVersion(version)
|
.setVersion(version)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -277,7 +277,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<ScanRequest> request =
|
Supplier<ScanRequest> request =
|
||||||
() ->
|
() ->
|
||||||
ScanRequest.newBuilder()
|
ScanRequest.newBuilder()
|
||||||
.setContext(region.getContext(getResolvedLocks(version)))
|
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||||
.setStartKey(startKey)
|
.setStartKey(startKey)
|
||||||
.setVersion(version)
|
.setVersion(version)
|
||||||
.setKeyOnly(keyOnly)
|
.setKeyOnly(keyOnly)
|
||||||
|
|
@ -379,7 +379,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
() ->
|
() ->
|
||||||
getIsV4()
|
getIsV4()
|
||||||
? PrewriteRequest.newBuilder()
|
? PrewriteRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setStartVersion(startTs)
|
.setStartVersion(startTs)
|
||||||
.setPrimaryLock(primaryLock)
|
.setPrimaryLock(primaryLock)
|
||||||
.addAllMutations(mutations)
|
.addAllMutations(mutations)
|
||||||
|
|
@ -389,7 +389,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
.setTxnSize(16)
|
.setTxnSize(16)
|
||||||
.build()
|
.build()
|
||||||
: PrewriteRequest.newBuilder()
|
: PrewriteRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setStartVersion(startTs)
|
.setStartVersion(startTs)
|
||||||
.setPrimaryLock(primaryLock)
|
.setPrimaryLock(primaryLock)
|
||||||
.addAllMutations(mutations)
|
.addAllMutations(mutations)
|
||||||
|
|
@ -469,7 +469,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<TxnHeartBeatRequest> factory =
|
Supplier<TxnHeartBeatRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
TxnHeartBeatRequest.newBuilder()
|
TxnHeartBeatRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setStartVersion(startTs)
|
.setStartVersion(startTs)
|
||||||
.setPrimaryLock(primaryLock)
|
.setPrimaryLock(primaryLock)
|
||||||
.setAdviseLockTtl(ttl)
|
.setAdviseLockTtl(ttl)
|
||||||
|
|
@ -527,7 +527,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
.setStartVersion(startTs)
|
.setStartVersion(startTs)
|
||||||
.setCommitVersion(commitTs)
|
.setCommitVersion(commitTs)
|
||||||
.addAllKeys(keys)
|
.addAllKeys(keys)
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.build();
|
.build();
|
||||||
KVErrorHandler<CommitResponse> handler =
|
KVErrorHandler<CommitResponse> handler =
|
||||||
new KVErrorHandler<>(
|
new KVErrorHandler<>(
|
||||||
|
|
@ -588,7 +588,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<Coprocessor.Request> reqToSend =
|
Supplier<Coprocessor.Request> reqToSend =
|
||||||
() ->
|
() ->
|
||||||
Coprocessor.Request.newBuilder()
|
Coprocessor.Request.newBuilder()
|
||||||
.setContext(region.getContext(getResolvedLocks(startTs)))
|
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
|
||||||
.setTp(REQ_TYPE_DAG.getValue())
|
.setTp(REQ_TYPE_DAG.getValue())
|
||||||
.setStartTs(startTs)
|
.setStartTs(startTs)
|
||||||
.setData(req.toByteString())
|
.setData(req.toByteString())
|
||||||
|
|
@ -711,7 +711,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<Coprocessor.Request> reqToSend =
|
Supplier<Coprocessor.Request> reqToSend =
|
||||||
() ->
|
() ->
|
||||||
Coprocessor.Request.newBuilder()
|
Coprocessor.Request.newBuilder()
|
||||||
.setContext(region.getContext(getResolvedLocks(startTs)))
|
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
|
||||||
// TODO: If no executors...?
|
// TODO: If no executors...?
|
||||||
.setTp(REQ_TYPE_DAG.getValue())
|
.setTp(REQ_TYPE_DAG.getValue())
|
||||||
.setData(req.toByteString())
|
.setData(req.toByteString())
|
||||||
|
|
@ -749,7 +749,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<SplitRegionRequest> request =
|
Supplier<SplitRegionRequest> request =
|
||||||
() ->
|
() ->
|
||||||
SplitRegionRequest.newBuilder()
|
SplitRegionRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.addAllSplitKeys(splitKeys)
|
.addAllSplitKeys(splitKeys)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
@ -790,7 +790,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
conf.getIsolationLevel(),
|
conf.getIsolationLevel(),
|
||||||
conf.getCommandPriority(),
|
conf.getCommandPriority(),
|
||||||
conf.getKvMode(),
|
conf.getKvMode(),
|
||||||
conf.isReplicaRead()))
|
conf.getReplicaRead()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -801,7 +801,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
|
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
|
||||||
try {
|
try {
|
||||||
Supplier<RawGetRequest> factory =
|
Supplier<RawGetRequest> factory =
|
||||||
() -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
|
() ->
|
||||||
|
RawGetRequest.newBuilder()
|
||||||
|
.setContext(region.getReplicaContext(storeType))
|
||||||
|
.setKey(key)
|
||||||
|
.build();
|
||||||
KVErrorHandler<RawGetResponse> handler =
|
KVErrorHandler<RawGetResponse> handler =
|
||||||
new KVErrorHandler<>(
|
new KVErrorHandler<>(
|
||||||
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
|
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
|
||||||
|
|
@ -833,7 +837,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
try {
|
try {
|
||||||
Supplier<RawGetKeyTTLRequest> factory =
|
Supplier<RawGetKeyTTLRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawGetKeyTTLRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
|
RawGetKeyTTLRequest.newBuilder()
|
||||||
|
.setContext(region.getReplicaContext(storeType))
|
||||||
|
.setKey(key)
|
||||||
|
.build();
|
||||||
KVErrorHandler<RawGetKeyTTLResponse> handler =
|
KVErrorHandler<RawGetKeyTTLResponse> handler =
|
||||||
new KVErrorHandler<>(
|
new KVErrorHandler<>(
|
||||||
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
|
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
|
||||||
|
|
@ -868,7 +875,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
|
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
|
||||||
try {
|
try {
|
||||||
Supplier<RawDeleteRequest> factory =
|
Supplier<RawDeleteRequest> factory =
|
||||||
() -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
|
() ->
|
||||||
|
RawDeleteRequest.newBuilder()
|
||||||
|
.setContext(region.getReplicaContext(storeType))
|
||||||
|
.setKey(key)
|
||||||
|
.build();
|
||||||
|
|
||||||
KVErrorHandler<RawDeleteResponse> handler =
|
KVErrorHandler<RawDeleteResponse> handler =
|
||||||
new KVErrorHandler<>(
|
new KVErrorHandler<>(
|
||||||
|
|
@ -902,7 +913,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawPutRequest> factory =
|
Supplier<RawPutRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawPutRequest.newBuilder()
|
RawPutRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setKey(key)
|
.setKey(key)
|
||||||
.setValue(value)
|
.setValue(value)
|
||||||
.setTtl(ttl)
|
.setTtl(ttl)
|
||||||
|
|
@ -940,7 +951,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawCASRequest> factory =
|
Supplier<RawCASRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawCASRequest.newBuilder()
|
RawCASRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setKey(key)
|
.setKey(key)
|
||||||
.setValue(value)
|
.setValue(value)
|
||||||
.setPreviousNotExist(true)
|
.setPreviousNotExist(true)
|
||||||
|
|
@ -986,7 +997,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawBatchGetRequest> factory =
|
Supplier<RawBatchGetRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawBatchGetRequest.newBuilder()
|
RawBatchGetRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.addAllKeys(keys)
|
.addAllKeys(keys)
|
||||||
.build();
|
.build();
|
||||||
KVErrorHandler<RawBatchGetResponse> handler =
|
KVErrorHandler<RawBatchGetResponse> handler =
|
||||||
|
|
@ -1021,7 +1032,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawBatchPutRequest> factory =
|
Supplier<RawBatchPutRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawBatchPutRequest.newBuilder()
|
RawBatchPutRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.addAllPairs(kvPairs)
|
.addAllPairs(kvPairs)
|
||||||
.setTtl(ttl)
|
.setTtl(ttl)
|
||||||
.setForCas(atomic)
|
.setForCas(atomic)
|
||||||
|
|
@ -1073,7 +1084,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawBatchDeleteRequest> factory =
|
Supplier<RawBatchDeleteRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawBatchDeleteRequest.newBuilder()
|
RawBatchDeleteRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.addAllKeys(keys)
|
.addAllKeys(keys)
|
||||||
.setForCas(atomic)
|
.setForCas(atomic)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -1118,7 +1129,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawScanRequest> factory =
|
Supplier<RawScanRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawScanRequest.newBuilder()
|
RawScanRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setStartKey(key)
|
.setStartKey(key)
|
||||||
.setKeyOnly(keyOnly)
|
.setKeyOnly(keyOnly)
|
||||||
.setLimit(limit)
|
.setLimit(limit)
|
||||||
|
|
@ -1164,7 +1175,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Supplier<RawDeleteRangeRequest> factory =
|
Supplier<RawDeleteRangeRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
RawDeleteRangeRequest.newBuilder()
|
RawDeleteRangeRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getReplicaContext(storeType))
|
||||||
.setStartKey(startKey)
|
.setStartKey(startKey)
|
||||||
.setEndKey(endKey)
|
.setEndKey(endKey)
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
|
|
@ -20,10 +20,13 @@ package org.tikv.common.region;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.tikv.common.TiConfiguration;
|
||||||
import org.tikv.common.TiConfiguration.KVMode;
|
import org.tikv.common.TiConfiguration.KVMode;
|
||||||
import org.tikv.common.codec.Codec.BytesCodec;
|
import org.tikv.common.codec.Codec.BytesCodec;
|
||||||
import org.tikv.common.codec.CodecDataInput;
|
import org.tikv.common.codec.CodecDataInput;
|
||||||
|
|
@ -39,12 +42,16 @@ import org.tikv.kvproto.Metapb.Peer;
|
||||||
import org.tikv.kvproto.Metapb.Region;
|
import org.tikv.kvproto.Metapb.Region;
|
||||||
|
|
||||||
public class TiRegion implements Serializable {
|
public class TiRegion implements Serializable {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);
|
||||||
|
|
||||||
private final Region meta;
|
private final Region meta;
|
||||||
|
private final KVMode kvMode;
|
||||||
private final IsolationLevel isolationLevel;
|
private final IsolationLevel isolationLevel;
|
||||||
private final Kvrpcpb.CommandPri commandPri;
|
private final Kvrpcpb.CommandPri commandPri;
|
||||||
private final Peer leader;
|
private final Peer leader;
|
||||||
private int followerIdx = 0;
|
private final TiConfiguration.ReplicaRead replicaRead;
|
||||||
private final boolean isReplicaRead;
|
private final List<Peer> replicaList;
|
||||||
|
private int replicaIdx;
|
||||||
|
|
||||||
public TiRegion(
|
public TiRegion(
|
||||||
Region meta,
|
Region meta,
|
||||||
|
|
@ -52,20 +59,7 @@ public class TiRegion implements Serializable {
|
||||||
IsolationLevel isolationLevel,
|
IsolationLevel isolationLevel,
|
||||||
Kvrpcpb.CommandPri commandPri,
|
Kvrpcpb.CommandPri commandPri,
|
||||||
KVMode kvMode) {
|
KVMode kvMode) {
|
||||||
this(meta, leader, isolationLevel, commandPri, kvMode, false);
|
this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER);
|
||||||
}
|
|
||||||
|
|
||||||
private TiRegion(
|
|
||||||
Region meta,
|
|
||||||
Peer leader,
|
|
||||||
IsolationLevel isolationLevel,
|
|
||||||
Kvrpcpb.CommandPri commandPri,
|
|
||||||
boolean isReplicaRead) {
|
|
||||||
this.meta = meta;
|
|
||||||
this.leader = leader;
|
|
||||||
this.isolationLevel = isolationLevel;
|
|
||||||
this.commandPri = commandPri;
|
|
||||||
this.isReplicaRead = isReplicaRead;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TiRegion(
|
public TiRegion(
|
||||||
|
|
@ -74,9 +68,13 @@ public class TiRegion implements Serializable {
|
||||||
IsolationLevel isolationLevel,
|
IsolationLevel isolationLevel,
|
||||||
Kvrpcpb.CommandPri commandPri,
|
Kvrpcpb.CommandPri commandPri,
|
||||||
KVMode kvMode,
|
KVMode kvMode,
|
||||||
boolean isReplicaRead) {
|
TiConfiguration.ReplicaRead replicaRead) {
|
||||||
Objects.requireNonNull(meta, "meta is null");
|
Objects.requireNonNull(meta, "meta is null");
|
||||||
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
|
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
|
||||||
|
this.kvMode = kvMode;
|
||||||
|
this.isolationLevel = isolationLevel;
|
||||||
|
this.commandPri = commandPri;
|
||||||
|
this.replicaRead = replicaRead;
|
||||||
if (leader == null || leader.getId() == 0) {
|
if (leader == null || leader.getId() == 0) {
|
||||||
if (meta.getPeersCount() == 0) {
|
if (meta.getPeersCount() == 0) {
|
||||||
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
|
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
|
||||||
|
|
@ -86,17 +84,21 @@ public class TiRegion implements Serializable {
|
||||||
} else {
|
} else {
|
||||||
this.leader = leader;
|
this.leader = leader;
|
||||||
}
|
}
|
||||||
if (isReplicaRead && meta.getPeersCount() > 0) {
|
|
||||||
// try to get first follower
|
// init replicaList
|
||||||
try {
|
List<Peer> followerList = getFollowerList();
|
||||||
chooseRandomFollower();
|
replicaList = new ArrayList<>();
|
||||||
} catch (Exception ignore) {
|
if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) {
|
||||||
// ignore
|
replicaList.add(this.leader);
|
||||||
}
|
} else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) {
|
||||||
|
replicaList.addAll(followerList);
|
||||||
|
Collections.shuffle(replicaList);
|
||||||
|
} else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) {
|
||||||
|
replicaList.addAll(followerList);
|
||||||
|
Collections.shuffle(replicaList);
|
||||||
|
replicaList.add(this.leader);
|
||||||
}
|
}
|
||||||
this.isolationLevel = isolationLevel;
|
replicaIdx = 0;
|
||||||
this.commandPri = commandPri;
|
|
||||||
this.isReplicaRead = isReplicaRead;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Region decodeRegion(Region region, boolean isRawRegion) {
|
private Region decodeRegion(Region region, boolean isRawRegion) {
|
||||||
|
|
@ -127,36 +129,41 @@ public class TiRegion implements Serializable {
|
||||||
return leader;
|
return leader;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Peer getCurrentFollower() {
|
public List<Peer> getFollowerList() {
|
||||||
return meta.getPeers(followerIdx);
|
List<Peer> peers = new ArrayList<>();
|
||||||
}
|
for (Peer peer : getMeta().getPeersList()) {
|
||||||
|
if (!peer.equals(this.leader)) {
|
||||||
private boolean isValidFollower(Peer peer) {
|
if (peer.getRole().equals(Metapb.PeerRole.Voter)) {
|
||||||
return Metapb.PeerRole.valueOf(peer.getRole().getValueDescriptor()) == Metapb.PeerRole.Voter;
|
peers.add(peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void chooseRandomFollower() {
|
|
||||||
int cnt = meta.getPeersCount();
|
|
||||||
followerIdx = new Random().nextInt(cnt);
|
|
||||||
for (int retry = cnt - 1; retry > 0; retry--) {
|
|
||||||
followerIdx = (followerIdx + 1) % cnt;
|
|
||||||
Peer cur = meta.getPeers(followerIdx);
|
|
||||||
if (isValidFollower(cur)) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Peer> getLearnerList() {
|
public List<Peer> getLearnerList() {
|
||||||
List<Peer> peers = new ArrayList<>();
|
List<Peer> peers = new ArrayList<>();
|
||||||
for (Peer peer : getMeta().getPeersList()) {
|
for (Peer peer : getMeta().getPeersList()) {
|
||||||
if (isValidFollower(peer)) {
|
if (peer.getRole().equals(Metapb.PeerRole.Learner)) {
|
||||||
peers.add(peer);
|
peers.add(peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return peers;
|
return peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Peer getCurrentReplica() {
|
||||||
|
return replicaList.get(replicaIdx);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Peer getNextReplica() {
|
||||||
|
replicaIdx = (replicaIdx + 1) % replicaList.size();
|
||||||
|
return getCurrentReplica();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isLeader(Peer peer) {
|
||||||
|
return getLeader().equals(peer);
|
||||||
|
}
|
||||||
|
|
||||||
public long getId() {
|
public long getId() {
|
||||||
return this.meta.getId();
|
return this.meta.getId();
|
||||||
}
|
}
|
||||||
|
|
@ -177,26 +184,30 @@ public class TiRegion implements Serializable {
|
||||||
return Key.toRawKey(getEndKey());
|
return Key.toRawKey(getEndKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Kvrpcpb.Context getContext() {
|
public Kvrpcpb.Context getLeaderContext() {
|
||||||
return getContext(java.util.Collections.emptySet());
|
return getContext(this.leader, java.util.Collections.emptySet(), TiStoreType.TiKV);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Kvrpcpb.Context getContext(Set<Long> resolvedLocks) {
|
public Kvrpcpb.Context getReplicaContext(TiStoreType storeType) {
|
||||||
|
return getContext(getCurrentReplica(), java.util.Collections.emptySet(), storeType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Kvrpcpb.Context getReplicaContext(Set<Long> resolvedLocks, TiStoreType storeType) {
|
||||||
|
return getContext(getCurrentReplica(), resolvedLocks, storeType);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Kvrpcpb.Context getContext(
|
||||||
|
Peer currentPeer, Set<Long> resolvedLocks, TiStoreType storeType) {
|
||||||
|
boolean replicaRead = !isLeader(getCurrentReplica()) && TiStoreType.TiKV.equals(storeType);
|
||||||
|
|
||||||
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
|
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
|
||||||
builder.setIsolationLevel(this.isolationLevel);
|
builder
|
||||||
builder.setPriority(this.commandPri);
|
.setIsolationLevel(this.isolationLevel)
|
||||||
if (isReplicaRead) {
|
.setPriority(this.commandPri)
|
||||||
builder
|
.setRegionId(meta.getId())
|
||||||
.setRegionId(meta.getId())
|
.setPeer(currentPeer)
|
||||||
.setPeer(getCurrentFollower())
|
.setReplicaRead(replicaRead)
|
||||||
.setReplicaRead(true)
|
.setRegionEpoch(this.meta.getRegionEpoch());
|
||||||
.setRegionEpoch(this.meta.getRegionEpoch());
|
|
||||||
} else {
|
|
||||||
builder
|
|
||||||
.setRegionId(meta.getId())
|
|
||||||
.setPeer(this.leader)
|
|
||||||
.setRegionEpoch(this.meta.getRegionEpoch());
|
|
||||||
}
|
|
||||||
builder.addAllResolvedLocks(resolvedLocks);
|
builder.addAllResolvedLocks(resolvedLocks);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
@ -218,7 +229,8 @@ public class TiRegion implements Serializable {
|
||||||
List<Peer> peers = meta.getPeersList();
|
List<Peer> peers = meta.getPeersList();
|
||||||
for (Peer p : peers) {
|
for (Peer p : peers) {
|
||||||
if (p.getStoreId() == leaderStoreID) {
|
if (p.getStoreId() == leaderStoreID) {
|
||||||
return new TiRegion(this.meta, p, this.isolationLevel, this.commandPri, this.isReplicaRead);
|
return new TiRegion(
|
||||||
|
this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
||||||
|
|
@ -125,7 +125,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
|
||||||
Supplier<CleanupRequest> factory =
|
Supplier<CleanupRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
CleanupRequest.newBuilder()
|
CleanupRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setKey(primary)
|
.setKey(primary)
|
||||||
.setStartVersion(txnID)
|
.setStartVersion(txnID)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -232,7 +232,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
|
||||||
factory =
|
factory =
|
||||||
() ->
|
() ->
|
||||||
ResolveLockRequest.newBuilder()
|
ResolveLockRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setStartVersion(lock.getTxnID())
|
.setStartVersion(lock.getTxnID())
|
||||||
.setCommitVersion(txnStatus)
|
.setCommitVersion(txnStatus)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -240,7 +240,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
|
||||||
factory =
|
factory =
|
||||||
() ->
|
() ->
|
||||||
ResolveLockRequest.newBuilder()
|
ResolveLockRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setStartVersion(lock.getTxnID())
|
.setStartVersion(lock.getTxnID())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
|
||||||
|
|
||||||
Kvrpcpb.ResolveLockRequest.Builder builder =
|
Kvrpcpb.ResolveLockRequest.Builder builder =
|
||||||
Kvrpcpb.ResolveLockRequest.newBuilder()
|
Kvrpcpb.ResolveLockRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setStartVersion(lock.getTxnID());
|
.setStartVersion(lock.getTxnID());
|
||||||
|
|
||||||
if (txnStatus.isCommitted()) {
|
if (txnStatus.isCommitted()) {
|
||||||
|
|
@ -230,7 +230,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
|
||||||
() -> {
|
() -> {
|
||||||
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
|
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
|
||||||
return CleanupRequest.newBuilder()
|
return CleanupRequest.newBuilder()
|
||||||
.setContext(primaryKeyRegion.getContext())
|
.setContext(primaryKeyRegion.getLeaderContext())
|
||||||
.setKey(primary)
|
.setKey(primary)
|
||||||
.setStartVersion(txnID)
|
.setStartVersion(txnID)
|
||||||
.setCurrentTs(currentTS)
|
.setCurrentTs(currentTS)
|
||||||
|
|
|
||||||
|
|
@ -169,7 +169,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
|
||||||
Supplier<Kvrpcpb.PessimisticRollbackRequest> factory =
|
Supplier<Kvrpcpb.PessimisticRollbackRequest> factory =
|
||||||
() ->
|
() ->
|
||||||
Kvrpcpb.PessimisticRollbackRequest.newBuilder()
|
Kvrpcpb.PessimisticRollbackRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setStartVersion(lock.getTxnID())
|
.setStartVersion(lock.getTxnID())
|
||||||
.setForUpdateTs(forUpdateTS)
|
.setForUpdateTs(forUpdateTS)
|
||||||
.addKeys(lock.getKey())
|
.addKeys(lock.getKey())
|
||||||
|
|
@ -287,7 +287,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
|
||||||
() -> {
|
() -> {
|
||||||
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
|
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
|
||||||
return Kvrpcpb.CheckTxnStatusRequest.newBuilder()
|
return Kvrpcpb.CheckTxnStatusRequest.newBuilder()
|
||||||
.setContext(primaryKeyRegion.getContext())
|
.setContext(primaryKeyRegion.getLeaderContext())
|
||||||
.setPrimaryKey(primary)
|
.setPrimaryKey(primary)
|
||||||
.setLockTs(txnID)
|
.setLockTs(txnID)
|
||||||
.setCallerStartTs(callerStartTS)
|
.setCallerStartTs(callerStartTS)
|
||||||
|
|
@ -364,7 +364,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
|
||||||
|
|
||||||
Kvrpcpb.ResolveLockRequest.Builder builder =
|
Kvrpcpb.ResolveLockRequest.Builder builder =
|
||||||
Kvrpcpb.ResolveLockRequest.newBuilder()
|
Kvrpcpb.ResolveLockRequest.newBuilder()
|
||||||
.setContext(region.getContext())
|
.setContext(region.getLeaderContext())
|
||||||
.setStartVersion(lock.getTxnID());
|
.setStartVersion(lock.getTxnID());
|
||||||
|
|
||||||
if (txnStatus.isCommitted()) {
|
if (txnStatus.isCommitted()) {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
@ -91,7 +92,8 @@ public class RawKVClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
// tikv-4.0 does not support atomic api
|
||||||
|
@Ignore
|
||||||
public void atomicAPITest() {
|
public void atomicAPITest() {
|
||||||
if (!initialized) return;
|
if (!initialized) return;
|
||||||
long ttl = 10;
|
long ttl = 10;
|
||||||
|
|
@ -112,7 +114,8 @@ public class RawKVClientTest {
|
||||||
assert res3.isEmpty();
|
assert res3.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
// tikv-4.0 doest not support ttl
|
||||||
|
@Ignore
|
||||||
public void getKeyTTLTest() {
|
public void getKeyTTLTest() {
|
||||||
if (!initialized) return;
|
if (!initialized) return;
|
||||||
long ttl = 10;
|
long ttl = 10;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,57 @@
|
||||||
|
package org.tikv.txn;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.tikv.common.TiConfiguration;
|
||||||
|
import org.tikv.common.TiSession;
|
||||||
|
|
||||||
|
public class ReplicaReadTest extends TXNTest {
|
||||||
|
private TiSession session;
|
||||||
|
private String key;
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void leaderReadTest() {
|
||||||
|
doTest(TiConfiguration.ReplicaRead.LEADER);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ci only has one TiKV instance
|
||||||
|
@Ignore
|
||||||
|
public void followerReadTest() {
|
||||||
|
doTest(TiConfiguration.ReplicaRead.FOLLOWER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void leadAndFollowerReadTest() {
|
||||||
|
doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doTest(TiConfiguration.ReplicaRead replicaRead) {
|
||||||
|
TiConfiguration conf = TiConfiguration.createDefault();
|
||||||
|
conf.setReplicaRead(replicaRead);
|
||||||
|
session = TiSession.create(conf);
|
||||||
|
|
||||||
|
putKV(key, value);
|
||||||
|
ByteString v = session.createSnapshot().get(ByteString.copyFromUtf8(key));
|
||||||
|
Assert.assertEquals(value, v.toStringUtf8());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
super.setUp();
|
||||||
|
key = genRandomKey(64);
|
||||||
|
value = "v0";
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (session != null) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,131 @@
|
||||||
|
package org.tikv.txn;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static junit.framework.TestCase.fail;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.tikv.common.TiConfiguration;
|
||||||
|
import org.tikv.common.TiSession;
|
||||||
|
import org.tikv.common.exception.RegionException;
|
||||||
|
import org.tikv.common.region.RegionStoreClient;
|
||||||
|
import org.tikv.common.region.TiRegion;
|
||||||
|
import org.tikv.common.util.BackOffFunction;
|
||||||
|
import org.tikv.common.util.BackOffer;
|
||||||
|
import org.tikv.common.util.ConcreteBackOffer;
|
||||||
|
import org.tikv.kvproto.Kvrpcpb;
|
||||||
|
|
||||||
|
public class TXNTest {
|
||||||
|
static final int DEFAULT_TTL = 10;
|
||||||
|
private TiSession session;
|
||||||
|
RegionStoreClient.RegionStoreClientBuilder builder;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
TiConfiguration conf = TiConfiguration.createDefault();
|
||||||
|
try {
|
||||||
|
session = TiSession.create(conf);
|
||||||
|
this.builder = session.getRegionStoreClientBuilder();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("TiDB cluster may not be present");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (session != null) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void putKV(String key, String value) {
|
||||||
|
long startTS = session.getTimestamp().getVersion();
|
||||||
|
long commitTS = session.getTimestamp().getVersion();
|
||||||
|
putKV(key, value, startTS, commitTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
void putKV(String key, String value, long startTS, long commitTS) {
|
||||||
|
Kvrpcpb.Mutation m =
|
||||||
|
Kvrpcpb.Mutation.newBuilder()
|
||||||
|
.setKey(ByteString.copyFromUtf8(key))
|
||||||
|
.setOp(Kvrpcpb.Op.Put)
|
||||||
|
.setValue(ByteString.copyFromUtf8(value))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
boolean res = prewriteString(Collections.singletonList(m), startTS, key, DEFAULT_TTL);
|
||||||
|
assertTrue(res);
|
||||||
|
res = commitString(Collections.singletonList(key), startTS, commitTS);
|
||||||
|
assertTrue(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean prewriteString(List<Kvrpcpb.Mutation> mutations, long startTS, String primary, long ttl) {
|
||||||
|
return prewrite(mutations, startTS, ByteString.copyFromUtf8(primary), ttl);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean prewrite(List<Kvrpcpb.Mutation> mutations, long startTS, ByteString primary, long ttl) {
|
||||||
|
if (mutations.size() == 0) return true;
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
|
||||||
|
|
||||||
|
for (Kvrpcpb.Mutation m : mutations) {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
TiRegion region = session.getRegionManager().getRegionByKey(m.getKey());
|
||||||
|
RegionStoreClient client = builder.build(region);
|
||||||
|
client.prewrite(backOffer, primary, Collections.singletonList(m), startTS, ttl, false);
|
||||||
|
break;
|
||||||
|
} catch (RegionException e) {
|
||||||
|
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean commitString(List<String> keys, long startTS, long commitTS) {
|
||||||
|
return commit(
|
||||||
|
keys.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList()),
|
||||||
|
startTS,
|
||||||
|
commitTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean commit(List<ByteString> keys, long startTS, long commitTS) {
|
||||||
|
if (keys.size() == 0) return true;
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
|
||||||
|
|
||||||
|
for (ByteString byteStringK : keys) {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
TiRegion tiRegion = session.getRegionManager().getRegionByKey(byteStringK);
|
||||||
|
RegionStoreClient client = builder.build(tiRegion);
|
||||||
|
client.commit(backOffer, Collections.singletonList(byteStringK), startTS, commitTS);
|
||||||
|
break;
|
||||||
|
} catch (RegionException e) {
|
||||||
|
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
String genRandomKey(int strLength) {
|
||||||
|
Random rnd = ThreadLocalRandom.current();
|
||||||
|
String prefix = rnd.nextInt(2) % 2 == 0 ? "a-test-" : "z-test-";
|
||||||
|
StringBuilder ret = new StringBuilder(prefix);
|
||||||
|
for (int i = 0; i < strLength; i++) {
|
||||||
|
boolean isChar = (rnd.nextInt(2) % 2 == 0);
|
||||||
|
if (isChar) {
|
||||||
|
int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97;
|
||||||
|
ret.append((char) (choice + rnd.nextInt(26)));
|
||||||
|
} else {
|
||||||
|
ret.append(rnd.nextInt(10));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue