refactor follower read (#126) (#209)

* cherry pick #126 to release-3.1

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix conflict

Signed-off-by: birdstorm <samuelwyf@hotmail.com>

Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>
Co-authored-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
ti-srebot 2021-06-23 15:02:51 +08:00 committed by GitHub
parent 7f468277c3
commit 031745b41b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 356 additions and 126 deletions

View File

@ -64,13 +64,13 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul
killall -9 pd-server || true
killall -9 java || true
sleep 10
bin/pd-server --name=pd --data-dir=pd --config=../.ci/config/pd.toml &>pd.log &
bin/pd-server --name=pd --data-dir=pd --config=../config/pd.toml &>pd.log &
sleep 10
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../.ci/config/tikv.toml &>tikv.log &
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../config/tikv.toml &>tikv.log &
sleep 10
ps aux | grep '-server' || true
curl -s 127.0.0.1:2379/pd/api/v1/status || true
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../.ci/config/tidb.toml &>tidb.log &
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../config/tidb.toml &>tidb.log &
sleep 60
"""
}

4
config/pd.toml Normal file
View File

@ -0,0 +1,4 @@
# PD Configuration.
[replication]
enable-placement-rules = true
max-replicas = 1

1
config/tidb.toml Normal file
View File

@ -0,0 +1 @@
# TiDB Configuration.

5
config/tikv.toml Normal file
View File

@ -0,0 +1,5 @@
# TiKV Configuration.
[raftstore]
# Set the store capacity; if not set, the disk capacity is used.
capacity = "8G"

View File

@ -42,7 +42,7 @@ public class ConfigUtils {
public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency";
public static final String TIKV_KV_MODE = "tikv.kv_mode";
public static final String TIKV_IS_REPLICA_READ = "tikv.is_replica_read";
public static final String TIKV_REPLICA_READ = "tikv.replica_read";
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
@ -72,7 +72,7 @@ public class ConfigUtils {
public static final String DEF_DB_PREFIX = "";
public static final int DEF_KV_CLIENT_CONCURRENCY = 10;
public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN;
public static final boolean DEF_IS_REPLICA_READ = false;
public static final String DEF_REPLICA_READ = "LEADER";
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
@ -86,4 +86,8 @@ public class ConfigUtils {
public static final String RAW_KV_MODE = "RAW";
public static final String TXN_KV_MODE = "TXN";
public static final String LEADER = "LEADER";
public static final String FOLLOWER = "FOLLOWER";
public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER";
}

View File

@ -244,7 +244,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
conf.getReplicaRead());
} finally {
requestTimer.observeDuration();
}
@ -261,7 +261,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead()));
conf.getReplicaRead()));
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
@ -288,7 +288,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
conf.getReplicaRead());
}
@Override
@ -302,7 +302,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead()));
conf.getReplicaRead()));
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
@ -361,8 +361,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
@Override
public boolean isReplicaRead() {
return conf.isReplicaRead();
public TiConfiguration.ReplicaRead getReplicaRead() {
return conf.getReplicaRead();
}
@Override

View File

@ -66,5 +66,5 @@ public interface ReadOnlyPDClient {
List<Store> getAllStores(BackOffer backOffer);
boolean isReplicaRead();
TiConfiguration.ReplicaRead getReplicaRead();
}

View File

@ -67,7 +67,7 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY);
setIfMissing(TIKV_KV_MODE, TXN_KV_MODE);
setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ);
setIfMissing(TIKV_REPLICA_READ, DEF_REPLICA_READ);
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
@ -216,6 +216,17 @@ public class TiConfiguration implements Serializable {
}
}
/**
 * Parses the replica-read strategy from the configuration value stored under {@code key}.
 * Matching is case-insensitive; any unrecognized value falls back to leader read.
 */
private static ReplicaRead getReplicaRead(String key) {
  // Locale.ROOT keeps the upper-casing locale-independent.
  String value = get(key).toUpperCase(Locale.ROOT);
  switch (value) {
    case FOLLOWER:
      return ReplicaRead.FOLLOWER;
    case LEADER_AND_FOLLOWER:
      return ReplicaRead.LEADER_AND_FOLLOWER;
    default:
      // Includes "LEADER" and any unknown setting.
      return ReplicaRead.LEADER;
  }
}
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
@ -235,7 +246,7 @@ public class TiConfiguration implements Serializable {
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private boolean isReplicaRead = getBoolean(TIKV_IS_REPLICA_READ);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
@ -247,6 +258,12 @@ public class TiConfiguration implements Serializable {
RAW
}
/** Strategy for choosing which replica of a region serves read requests. */
public enum ReplicaRead {
  // Always read from the leader peer (the default).
  LEADER,
  // Read from follower peers only.
  FOLLOWER,
  // Read from shuffled followers first, with the leader as the final candidate.
  LEADER_AND_FOLLOWER
}
public static TiConfiguration createDefault() {
return new TiConfiguration();
}
@ -457,12 +474,12 @@ public class TiConfiguration implements Serializable {
return this;
}
public boolean isReplicaRead() {
return isReplicaRead;
public ReplicaRead getReplicaRead() {
return replicaRead;
}
public TiConfiguration setReplicaRead(boolean isReplicaRead) {
this.isReplicaRead = isReplicaRead;
public TiConfiguration setReplicaRead(ReplicaRead replicaRead) {
this.replicaRead = replicaRead;
return this;
}

View File

@ -50,7 +50,6 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
private final boolean isReplicaRead;
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
@ -65,13 +64,11 @@ public class RegionManager {
public RegionManager(
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = cacheInvalidateCallback;
}
public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = null;
}
@ -126,19 +123,8 @@ public class RegionManager {
Store store = null;
if (storeType == TiStoreType.TiKV) {
if (isReplicaRead) {
Peer peer = region.getCurrentFollower();
store = cache.getStoreById(peer.getStoreId(), backOffer);
if (store == null) {
cache.invalidateRegion(region);
}
} else {
Peer leader = region.getLeader();
store = cache.getStoreById(leader.getStoreId(), backOffer);
if (store == null) {
cache.clearAll();
}
}
Peer peer = region.getCurrentReplica();
store = cache.getStoreById(peer.getStoreId(), backOffer);
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {

View File

@ -169,7 +169,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<GetRequest> factory =
() ->
GetRequest.newBuilder()
.setContext(region.getContext(getResolvedLocks(version)))
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
.setKey(key)
.setVersion(version)
.build();
@ -214,7 +214,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<BatchGetRequest> request =
() ->
BatchGetRequest.newBuilder()
.setContext(region.getContext(getResolvedLocks(version)))
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
.addAllKeys(keys)
.setVersion(version)
.build();
@ -277,7 +277,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<ScanRequest> request =
() ->
ScanRequest.newBuilder()
.setContext(region.getContext(getResolvedLocks(version)))
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
.setStartKey(startKey)
.setVersion(version)
.setKeyOnly(keyOnly)
@ -379,7 +379,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
() ->
getIsV4()
? PrewriteRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setStartVersion(startTs)
.setPrimaryLock(primaryLock)
.addAllMutations(mutations)
@ -389,7 +389,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setTxnSize(16)
.build()
: PrewriteRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setStartVersion(startTs)
.setPrimaryLock(primaryLock)
.addAllMutations(mutations)
@ -469,7 +469,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<TxnHeartBeatRequest> factory =
() ->
TxnHeartBeatRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setStartVersion(startTs)
.setPrimaryLock(primaryLock)
.setAdviseLockTtl(ttl)
@ -527,7 +527,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setStartVersion(startTs)
.setCommitVersion(commitTs)
.addAllKeys(keys)
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.build();
KVErrorHandler<CommitResponse> handler =
new KVErrorHandler<>(
@ -588,7 +588,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<Coprocessor.Request> reqToSend =
() ->
Coprocessor.Request.newBuilder()
.setContext(region.getContext(getResolvedLocks(startTs)))
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
.setTp(REQ_TYPE_DAG.getValue())
.setStartTs(startTs)
.setData(req.toByteString())
@ -711,7 +711,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<Coprocessor.Request> reqToSend =
() ->
Coprocessor.Request.newBuilder()
.setContext(region.getContext(getResolvedLocks(startTs)))
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
// TODO: If no executors...?
.setTp(REQ_TYPE_DAG.getValue())
.setData(req.toByteString())
@ -749,7 +749,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<SplitRegionRequest> request =
() ->
SplitRegionRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.addAllSplitKeys(splitKeys)
.build();
@ -790,7 +790,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead()))
conf.getReplicaRead()))
.collect(Collectors.toList());
}
@ -801,7 +801,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
try {
Supplier<RawGetRequest> factory =
() -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
() ->
RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.build();
KVErrorHandler<RawGetResponse> handler =
new KVErrorHandler<>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
@ -833,7 +837,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
try {
Supplier<RawGetKeyTTLRequest> factory =
() ->
RawGetKeyTTLRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
RawGetKeyTTLRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.build();
KVErrorHandler<RawGetKeyTTLResponse> handler =
new KVErrorHandler<>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
@ -868,7 +875,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
try {
Supplier<RawDeleteRequest> factory =
() -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
() ->
RawDeleteRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.build();
KVErrorHandler<RawDeleteResponse> handler =
new KVErrorHandler<>(
@ -902,7 +913,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawPutRequest> factory =
() ->
RawPutRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setValue(value)
.setTtl(ttl)
@ -940,7 +951,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawCASRequest> factory =
() ->
RawCASRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setValue(value)
.setPreviousNotExist(true)
@ -986,7 +997,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawBatchGetRequest> factory =
() ->
RawBatchGetRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.addAllKeys(keys)
.build();
KVErrorHandler<RawBatchGetResponse> handler =
@ -1021,7 +1032,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawBatchPutRequest> factory =
() ->
RawBatchPutRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.addAllPairs(kvPairs)
.setTtl(ttl)
.setForCas(atomic)
@ -1073,7 +1084,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawBatchDeleteRequest> factory =
() ->
RawBatchDeleteRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.addAllKeys(keys)
.setForCas(atomic)
.build();
@ -1118,7 +1129,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawScanRequest> factory =
() ->
RawScanRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setStartKey(key)
.setKeyOnly(keyOnly)
.setLimit(limit)
@ -1164,7 +1175,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Supplier<RawDeleteRangeRequest> factory =
() ->
RawDeleteRangeRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getReplicaContext(storeType))
.setStartKey(startKey)
.setEndKey(endKey)
.build();

View File

@ -20,10 +20,13 @@ package org.tikv.common.region;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
@ -39,12 +42,16 @@ import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Region;
public class TiRegion implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);
private final Region meta;
private final KVMode kvMode;
private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri;
private final Peer leader;
private int followerIdx = 0;
private final boolean isReplicaRead;
private final TiConfiguration.ReplicaRead replicaRead;
private final List<Peer> replicaList;
private int replicaIdx;
public TiRegion(
Region meta,
@ -52,20 +59,7 @@ public class TiRegion implements Serializable {
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode) {
this(meta, leader, isolationLevel, commandPri, kvMode, false);
}
private TiRegion(
Region meta,
Peer leader,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
boolean isReplicaRead) {
this.meta = meta;
this.leader = leader;
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.isReplicaRead = isReplicaRead;
this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER);
}
public TiRegion(
@ -74,9 +68,13 @@ public class TiRegion implements Serializable {
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode,
boolean isReplicaRead) {
TiConfiguration.ReplicaRead replicaRead) {
Objects.requireNonNull(meta, "meta is null");
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
this.kvMode = kvMode;
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.replicaRead = replicaRead;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
@ -86,17 +84,21 @@ public class TiRegion implements Serializable {
} else {
this.leader = leader;
}
if (isReplicaRead && meta.getPeersCount() > 0) {
// try to get first follower
try {
chooseRandomFollower();
} catch (Exception ignore) {
// ignore
}
// init replicaList
List<Peer> followerList = getFollowerList();
replicaList = new ArrayList<>();
if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) {
replicaList.add(this.leader);
} else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) {
replicaList.addAll(followerList);
Collections.shuffle(replicaList);
} else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) {
replicaList.addAll(followerList);
Collections.shuffle(replicaList);
replicaList.add(this.leader);
}
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.isReplicaRead = isReplicaRead;
replicaIdx = 0;
}
private Region decodeRegion(Region region, boolean isRawRegion) {
@ -127,36 +129,41 @@ public class TiRegion implements Serializable {
return leader;
}
public Peer getCurrentFollower() {
return meta.getPeers(followerIdx);
}
private boolean isValidFollower(Peer peer) {
return Metapb.PeerRole.valueOf(peer.getRole().getValueDescriptor()) == Metapb.PeerRole.Voter;
}
private void chooseRandomFollower() {
int cnt = meta.getPeersCount();
followerIdx = new Random().nextInt(cnt);
for (int retry = cnt - 1; retry > 0; retry--) {
followerIdx = (followerIdx + 1) % cnt;
Peer cur = meta.getPeers(followerIdx);
if (isValidFollower(cur)) {
return;
/** Returns every voting peer of this region except the current leader. */
public List<Peer> getFollowerList() {
  List<Peer> followers = new ArrayList<>();
  for (Peer peer : getMeta().getPeersList()) {
    boolean isLeaderPeer = peer.equals(this.leader);
    // Only voters can serve follower reads; learners are excluded here.
    if (!isLeaderPeer && peer.getRole().equals(Metapb.PeerRole.Voter)) {
      followers.add(peer);
    }
  }
  return followers;
}
public List<Peer> getLearnerList() {
List<Peer> peers = new ArrayList<>();
for (Peer peer : getMeta().getPeersList()) {
if (isValidFollower(peer)) {
if (peer.getRole().equals(Metapb.PeerRole.Learner)) {
peers.add(peer);
}
}
return peers;
}
/** Returns the replica currently targeted by read requests. */
public Peer getCurrentReplica() {
  return this.replicaList.get(this.replicaIdx);
}
/** Advances to the next candidate replica (round-robin, wrapping) and returns it. */
public Peer getNextReplica() {
  int candidateCount = this.replicaList.size();
  this.replicaIdx = (this.replicaIdx + 1) % candidateCount;
  return getCurrentReplica();
}
/** A peer is the leader exactly when it equals this region's leader peer. */
private boolean isLeader(Peer peer) {
  Peer leaderPeer = getLeader();
  return leaderPeer.equals(peer);
}
public long getId() {
return this.meta.getId();
}
@ -177,26 +184,30 @@ public class TiRegion implements Serializable {
return Key.toRawKey(getEndKey());
}
public Kvrpcpb.Context getContext() {
return getContext(java.util.Collections.emptySet());
public Kvrpcpb.Context getLeaderContext() {
return getContext(this.leader, java.util.Collections.emptySet(), TiStoreType.TiKV);
}
public Kvrpcpb.Context getContext(Set<Long> resolvedLocks) {
public Kvrpcpb.Context getReplicaContext(TiStoreType storeType) {
return getContext(getCurrentReplica(), java.util.Collections.emptySet(), storeType);
}
/**
 * Builds a request context targeting the currently selected replica, carrying the
 * caller's set of already-resolved lock versions.
 */
public Kvrpcpb.Context getReplicaContext(Set<Long> resolvedLocks, TiStoreType storeType) {
  return getContext(getCurrentReplica(), resolvedLocks, storeType);
}
/**
 * Builds the Kvrpcpb request context for the given peer.
 *
 * <p>Fixes: (1) the pasted diff left both the removed-side {@code if (isReplicaRead)}
 * builder block and the new chained builder in the body — only the new form is kept;
 * (2) the replica-read flag was computed from {@code getCurrentReplica()} instead of
 * the {@code currentPeer} argument, which would mark a leader-targeted context
 * (from {@code getLeaderContext()}) as a replica read whenever the current replica
 * happened to be a follower. It now derives from the peer actually placed in the
 * context.
 *
 * @param currentPeer peer the request will be sent to
 * @param resolvedLocks lock versions already resolved by the caller
 * @param storeType target store type; replica read only applies to TiKV stores
 */
private Kvrpcpb.Context getContext(
    Peer currentPeer, Set<Long> resolvedLocks, TiStoreType storeType) {
  // Replica read is only meaningful when the target peer is not the leader
  // and the request goes to a TiKV store (not TiFlash).
  boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType);

  Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
  builder
      .setIsolationLevel(this.isolationLevel)
      .setPriority(this.commandPri)
      .setRegionId(meta.getId())
      .setPeer(currentPeer)
      .setReplicaRead(replicaRead)
      .setRegionEpoch(this.meta.getRegionEpoch());
  builder.addAllResolvedLocks(resolvedLocks);
  return builder.build();
}
@ -218,7 +229,8 @@ public class TiRegion implements Serializable {
List<Peer> peers = meta.getPeersList();
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
return new TiRegion(this.meta, p, this.isolationLevel, this.commandPri, this.isReplicaRead);
return new TiRegion(
this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead);
}
}
return null;

View File

@ -125,7 +125,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
Supplier<CleanupRequest> factory =
() ->
CleanupRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setKey(primary)
.setStartVersion(txnID)
.build();
@ -232,7 +232,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
factory =
() ->
ResolveLockRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setStartVersion(lock.getTxnID())
.setCommitVersion(txnStatus)
.build();
@ -240,7 +240,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
factory =
() ->
ResolveLockRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setStartVersion(lock.getTxnID())
.build();
}

View File

@ -151,7 +151,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
Kvrpcpb.ResolveLockRequest.Builder builder =
Kvrpcpb.ResolveLockRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setStartVersion(lock.getTxnID());
if (txnStatus.isCommitted()) {
@ -230,7 +230,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
() -> {
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
return CleanupRequest.newBuilder()
.setContext(primaryKeyRegion.getContext())
.setContext(primaryKeyRegion.getLeaderContext())
.setKey(primary)
.setStartVersion(txnID)
.setCurrentTs(currentTS)

View File

@ -169,7 +169,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
Supplier<Kvrpcpb.PessimisticRollbackRequest> factory =
() ->
Kvrpcpb.PessimisticRollbackRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setStartVersion(lock.getTxnID())
.setForUpdateTs(forUpdateTS)
.addKeys(lock.getKey())
@ -287,7 +287,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
() -> {
TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
return Kvrpcpb.CheckTxnStatusRequest.newBuilder()
.setContext(primaryKeyRegion.getContext())
.setContext(primaryKeyRegion.getLeaderContext())
.setPrimaryKey(primary)
.setLockTs(txnID)
.setCallerStartTs(callerStartTS)
@ -364,7 +364,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
Kvrpcpb.ResolveLockRequest.Builder builder =
Kvrpcpb.ResolveLockRequest.newBuilder()
.setContext(region.getContext())
.setContext(region.getLeaderContext())
.setStartVersion(lock.getTxnID());
if (txnStatus.isCommitted()) {

View File

@ -94,6 +94,7 @@ public class RawKVClientTest {
}
}
// tikv-4.0 does not support atomic api
@Ignore
public void atomicAPITest() {
if (!initialized) return;
@ -115,6 +116,7 @@ public class RawKVClientTest {
assertTrue(res3.isEmpty());
}
// tikv-4.0 does not support ttl
@Ignore
public void getKeyTTLTest() {
if (!initialized) return;

View File

@ -0,0 +1,57 @@
package org.tikv.txn;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
/** End-to-end tests for the configurable replica-read strategies. */
public class ReplicaReadTest extends TXNTest {
  private TiSession session;
  private String key;
  private String value;

  @Before
  public void setUp() {
    super.setUp();
    key = genRandomKey(64);
    value = "v0";
  }

  @Test
  public void leaderReadTest() {
    doTest(TiConfiguration.ReplicaRead.LEADER);
  }

  // ci only has one TiKV instance
  @Ignore
  public void followerReadTest() {
    doTest(TiConfiguration.ReplicaRead.FOLLOWER);
  }

  @Test
  public void leadAndFollowerReadTest() {
    doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER);
  }

  // Writes a key through the base-class helper, then reads it back via a new
  // session configured with the given replica-read strategy.
  private void doTest(TiConfiguration.ReplicaRead replicaRead) {
    TiConfiguration conf = TiConfiguration.createDefault();
    conf.setReplicaRead(replicaRead);
    session = TiSession.create(conf);

    putKV(key, value);
    ByteString actual = session.createSnapshot().get(ByteString.copyFromUtf8(key));
    Assert.assertEquals(value, actual.toStringUtf8());
  }

  @After
  public void tearDown() throws Exception {
    if (session != null) {
      session.close();
    }
    super.tearDown();
  }
}

View File

@ -0,0 +1,131 @@
package org.tikv.txn;
import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.fail;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.RegionException;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
/**
 * Base class for transaction tests: opens a TiSession against a local cluster and
 * provides raw prewrite/commit helpers plus a random-key generator.
 *
 * <p>Cleanups: {@code size() == 0} replaced with {@code isEmpty()}, the redundant
 * {@code nextInt(2) % 2} pattern replaced with {@code nextBoolean()}, and the magic
 * ASCII bases 65/97 replaced with {@code 'A'}/{@code 'a'} char literals.
 */
public class TXNTest {
  static final int DEFAULT_TTL = 10;
  private TiSession session;
  RegionStoreClient.RegionStoreClientBuilder builder;

  @Before
  public void setUp() {
    TiConfiguration conf = TiConfiguration.createDefault();
    try {
      session = TiSession.create(conf);
      this.builder = session.getRegionStoreClientBuilder();
    } catch (Exception e) {
      // Tests in subclasses require a running cluster; fail fast with a clear message.
      fail("TiDB cluster may not be present");
    }
  }

  @After
  public void tearDown() throws Exception {
    if (session != null) {
      session.close();
    }
  }

  /** Writes key/value using two freshly allocated timestamps as start/commit TS. */
  void putKV(String key, String value) {
    long startTS = session.getTimestamp().getVersion();
    long commitTS = session.getTimestamp().getVersion();
    putKV(key, value, startTS, commitTS);
  }

  /** Writes key/value via an explicit prewrite + commit with the given timestamps. */
  void putKV(String key, String value, long startTS, long commitTS) {
    Kvrpcpb.Mutation m =
        Kvrpcpb.Mutation.newBuilder()
            .setKey(ByteString.copyFromUtf8(key))
            .setOp(Kvrpcpb.Op.Put)
            .setValue(ByteString.copyFromUtf8(value))
            .build();

    boolean res = prewriteString(Collections.singletonList(m), startTS, key, DEFAULT_TTL);
    assertTrue(res);
    res = commitString(Collections.singletonList(key), startTS, commitTS);
    assertTrue(res);
  }

  boolean prewriteString(List<Kvrpcpb.Mutation> mutations, long startTS, String primary, long ttl) {
    return prewrite(mutations, startTS, ByteString.copyFromUtf8(primary), ttl);
  }

  /**
   * Prewrites each mutation against the region currently owning its key, retrying
   * with backoff on region errors. Returns true (failures surface as exceptions).
   */
  boolean prewrite(List<Kvrpcpb.Mutation> mutations, long startTS, ByteString primary, long ttl) {
    if (mutations.isEmpty()) return true;
    BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
    for (Kvrpcpb.Mutation m : mutations) {
      while (true) {
        try {
          TiRegion region = session.getRegionManager().getRegionByKey(m.getKey());
          RegionStoreClient client = builder.build(region);
          client.prewrite(backOffer, primary, Collections.singletonList(m), startTS, ttl, false);
          break;
        } catch (RegionException e) {
          // Region moved/split; back off and re-resolve the owning region.
          backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
        }
      }
    }
    return true;
  }

  boolean commitString(List<String> keys, long startTS, long commitTS) {
    return commit(
        keys.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList()),
        startTS,
        commitTS);
  }

  /**
   * Commits each key against the region currently owning it, retrying with backoff
   * on region errors. Returns true (failures surface as exceptions).
   */
  boolean commit(List<ByteString> keys, long startTS, long commitTS) {
    if (keys.isEmpty()) return true;
    BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
    for (ByteString byteStringK : keys) {
      while (true) {
        try {
          TiRegion tiRegion = session.getRegionManager().getRegionByKey(byteStringK);
          RegionStoreClient client = builder.build(tiRegion);
          client.commit(backOffer, Collections.singletonList(byteStringK), startTS, commitTS);
          break;
        } catch (RegionException e) {
          backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
        }
      }
    }
    return true;
  }

  /**
   * Builds a random alphanumeric key of the given length, prefixed with either
   * "a-test-" or "z-test-" so test keys spread across the key space.
   */
  String genRandomKey(int strLength) {
    Random rnd = ThreadLocalRandom.current();
    String prefix = rnd.nextBoolean() ? "a-test-" : "z-test-";
    StringBuilder ret = new StringBuilder(prefix);
    for (int i = 0; i < strLength; i++) {
      if (rnd.nextBoolean()) {
        int base = rnd.nextBoolean() ? 'A' : 'a';
        ret.append((char) (base + rnd.nextInt(26)));
      } else {
        ret.append(rnd.nextInt(10));
      }
    }
    return ret.toString();
  }
}