diff --git a/.ci/integration_test.groovy b/.ci/integration_test.groovy index dc68801b29..ea7f95f898 100644 --- a/.ci/integration_test.groovy +++ b/.ci/integration_test.groovy @@ -61,13 +61,13 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul killall -9 pd-server || true killall -9 java || true sleep 10 - bin/pd-server --name=pd --data-dir=pd --config=../.ci/config/pd.toml &>pd.log & + bin/pd-server --name=pd --data-dir=pd --config=../config/pd.toml &>pd.log & sleep 10 - bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../.ci/config/tikv.toml &>tikv.log & + bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../config/tikv.toml &>tikv.log & sleep 10 ps aux | grep '-server' || true curl -s 127.0.0.1:2379/pd/api/v1/status || true - bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../.ci/config/tidb.toml &>tidb.log & + bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../config/tidb.toml &>tidb.log & sleep 60 """ } diff --git a/config/pd.toml b/config/pd.toml new file mode 100644 index 0000000000..f2795de343 --- /dev/null +++ b/config/pd.toml @@ -0,0 +1,4 @@ +# PD Configuration. +[replication] +enable-placement-rules = true +max-replicas = 1 \ No newline at end of file diff --git a/config/tidb.toml b/config/tidb.toml new file mode 100644 index 0000000000..c4d7422aef --- /dev/null +++ b/config/tidb.toml @@ -0,0 +1 @@ +# TiDB Configuration. \ No newline at end of file diff --git a/config/tikv.toml b/config/tikv.toml new file mode 100644 index 0000000000..287abc69e4 --- /dev/null +++ b/config/tikv.toml @@ -0,0 +1,5 @@ +# TiKV Configuration. + +[raftstore] +# set store capacity, if no set, use disk capacity. +capacity = "8G" diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index a9dab95f60..6df036d4e0 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -42,7 +42,7 @@ public class ConfigUtils { public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency"; public static final String TIKV_KV_MODE = "tikv.kv_mode"; - public static final String TIKV_IS_REPLICA_READ = "tikv.is_replica_read"; + public static final String TIKV_REPLICA_READ = "tikv.replica_read"; public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable"; public static final String TIKV_METRICS_PORT = "tikv.metrics.port"; @@ -72,7 +72,7 @@ public class ConfigUtils { public static final String DEF_DB_PREFIX = ""; public static final int DEF_KV_CLIENT_CONCURRENCY = 10; public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN; - public static final boolean DEF_IS_REPLICA_READ = false; + public static final String DEF_REPLICA_READ = "LEADER"; public static final boolean DEF_METRICS_ENABLE = false; public static final int DEF_METRICS_PORT = 3140; public static final String DEF_TIKV_NETWORK_MAPPING_NAME = ""; @@ -86,4 +86,8 @@ public class ConfigUtils { public static final String RAW_KV_MODE = "RAW"; public static final String TXN_KV_MODE = "TXN"; + + public static final String LEADER = "LEADER"; + public static final String FOLLOWER = "FOLLOWER"; + public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER"; } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e652102bb6..e7270bff91 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -244,7 +244,7 @@ public class PDClient extends AbstractGRPCClient conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.isReplicaRead()); + conf.getReplicaRead()); } finally { requestTimer.observeDuration(); } @@ -261,7 +261,7 @@ public class PDClient extends AbstractGRPCClient conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.isReplicaRead())); + conf.getReplicaRead())); Supplier request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); @@ -288,7 +288,7 @@ public class PDClient extends AbstractGRPCClient conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.isReplicaRead()); + conf.getReplicaRead()); } @Override @@ -302,7 +302,7 @@ public class PDClient extends AbstractGRPCClient conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.isReplicaRead())); + conf.getReplicaRead())); Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); @@ -361,8 +361,8 @@ public class PDClient extends AbstractGRPCClient } @Override - public boolean isReplicaRead() { - return conf.isReplicaRead(); + public TiConfiguration.ReplicaRead getReplicaRead() { + return conf.getReplicaRead(); } @Override diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index e759e03ce4..eab6850abe 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -66,5 +66,5 @@ public interface ReadOnlyPDClient { List getAllStores(BackOffer backOffer); - boolean isReplicaRead(); + TiConfiguration.ReplicaRead getReplicaRead(); } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 7d81540b0b..5af96a6d48 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -67,7 +67,7 @@ public class TiConfiguration implements Serializable { setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX); setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY); setIfMissing(TIKV_KV_MODE, TXN_KV_MODE); - setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ); + setIfMissing(TIKV_REPLICA_READ, DEF_REPLICA_READ); setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE); setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT); setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); @@ -216,6 +216,17 @@ public class TiConfiguration implements Serializable { } } + private static ReplicaRead getReplicaRead(String key) { + String value = get(key).toUpperCase(Locale.ROOT); + if (FOLLOWER.equals(value)) { + return ReplicaRead.FOLLOWER; + } else if (LEADER_AND_FOLLOWER.equals(value)) { + return ReplicaRead.LEADER_AND_FOLLOWER; + } else { + return ReplicaRead.LEADER; + } + } + private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); @@ -235,7 +246,7 @@ public class TiConfiguration implements Serializable { private KVMode kvMode = getKvMode(TIKV_KV_MODE); private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY); - private boolean isReplicaRead = getBoolean(TIKV_IS_REPLICA_READ); + private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ); private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); @@ -247,6 +258,12 @@ public class TiConfiguration implements Serializable { RAW } + public enum ReplicaRead { + LEADER, + FOLLOWER, + LEADER_AND_FOLLOWER + } + public static TiConfiguration createDefault() { return new TiConfiguration(); } @@ -457,12 +474,12 @@ public class TiConfiguration implements Serializable { return this; } - public boolean isReplicaRead() { - return isReplicaRead; + public ReplicaRead getReplicaRead() { + return replicaRead; } - public TiConfiguration setReplicaRead(boolean isReplicaRead) { - this.isReplicaRead = isReplicaRead; + public TiConfiguration setReplicaRead(ReplicaRead replicaRead) { + this.replicaRead = replicaRead; return this; } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 4cf6fc668b..67eee685cf 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -50,7 +50,6 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; - private final boolean isReplicaRead; private final Function cacheInvalidateCallback; @@ -65,13 +64,11 @@ public class RegionManager { public RegionManager( ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { this.cache = new RegionCache(pdClient); - this.isReplicaRead = pdClient.isReplicaRead(); this.cacheInvalidateCallback = cacheInvalidateCallback; } public RegionManager(ReadOnlyPDClient pdClient) { this.cache = new RegionCache(pdClient); - this.isReplicaRead = pdClient.isReplicaRead(); this.cacheInvalidateCallback = null; } @@ -126,13 +123,8 @@ public class RegionManager { Store store = null; if (storeType == TiStoreType.TiKV) { - if (isReplicaRead) { - Peer peer = region.getCurrentFollower(); - store = cache.getStoreById(peer.getStoreId(), backOffer); - } else { - Peer leader = region.getLeader(); - store = cache.getStoreById(leader.getStoreId(), backOffer); - } + Peer peer = region.getCurrentReplica(); + store = cache.getStoreById(peer.getStoreId(), backOffer); } else { outerLoop: for (Peer peer : region.getLearnerList()) { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index a67575b9cb..7e5d500f41 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -169,7 +169,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> GetRequest.newBuilder() - .setContext(region.getContext(getResolvedLocks(version))) + .setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) .setKey(key) .setVersion(version) .build(); @@ -214,7 +214,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> BatchGetRequest.newBuilder() - .setContext(region.getContext(getResolvedLocks(version))) + .setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) .addAllKeys(keys) .setVersion(version) .build(); @@ -277,7 +277,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> ScanRequest.newBuilder() - .setContext(region.getContext(getResolvedLocks(version))) + .setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) .setStartKey(startKey) .setVersion(version) .setKeyOnly(keyOnly) @@ -379,7 +379,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { () -> getIsV4() ? PrewriteRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .addAllMutations(mutations) @@ -389,7 +389,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setTxnSize(16) .build() : PrewriteRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .addAllMutations(mutations) @@ -469,7 +469,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> TxnHeartBeatRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .setAdviseLockTtl(ttl) @@ -527,7 +527,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setStartVersion(startTs) .setCommitVersion(commitTs) .addAllKeys(keys) - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .build(); KVErrorHandler handler = new KVErrorHandler<>( @@ -588,7 +588,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier reqToSend = () -> Coprocessor.Request.newBuilder() - .setContext(region.getContext(getResolvedLocks(startTs))) + .setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType)) .setTp(REQ_TYPE_DAG.getValue()) .setStartTs(startTs) .setData(req.toByteString()) @@ -711,7 +711,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier reqToSend = () -> Coprocessor.Request.newBuilder() - .setContext(region.getContext(getResolvedLocks(startTs))) + .setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType)) // TODO: If no executors...? .setTp(REQ_TYPE_DAG.getValue()) .setData(req.toByteString()) @@ -749,7 +749,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> SplitRegionRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .addAllSplitKeys(splitKeys) .build(); @@ -790,7 +790,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.isReplicaRead())) + conf.getReplicaRead())) .collect(Collectors.toList()); } @@ -801,7 +801,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient { GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); try { Supplier factory = - () -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build(); + () -> + RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(storeType)) + .setKey(key) + .build(); KVErrorHandler handler = new KVErrorHandler<>( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); @@ -833,7 +837,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient { try { Supplier factory = () -> - RawGetKeyTTLRequest.newBuilder().setContext(region.getContext()).setKey(key).build(); + RawGetKeyTTLRequest.newBuilder() + .setContext(region.getReplicaContext(storeType)) + .setKey(key) + .build(); KVErrorHandler handler = new KVErrorHandler<>( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); @@ -868,7 +875,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient { GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer(); try { Supplier factory = - () -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build(); + () -> + RawDeleteRequest.newBuilder() + .setContext(region.getReplicaContext(storeType)) + .setKey(key) + .build(); KVErrorHandler handler = new KVErrorHandler<>( @@ -902,7 +913,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawPutRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setKey(key) .setValue(value) .setTtl(ttl) @@ -940,7 +951,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawCASRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setKey(key) .setValue(value) .setPreviousNotExist(true) @@ -986,7 +997,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchGetRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .addAllKeys(keys) .build(); KVErrorHandler handler = @@ -1021,7 +1032,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchPutRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .addAllPairs(kvPairs) .setTtl(ttl) .setForCas(atomic) @@ -1073,7 +1084,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchDeleteRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .addAllKeys(keys) .setForCas(atomic) .build(); @@ -1118,7 +1129,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawScanRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setStartKey(key) .setKeyOnly(keyOnly) .setLimit(limit) @@ -1164,7 +1175,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawDeleteRangeRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getReplicaContext(storeType)) .setStartKey(startKey) .setEndKey(endKey) .build(); diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 663be5c2c4..19dac68f1a 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -20,10 +20,13 @@ package org.tikv.common.region; import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Random; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataInput; @@ -39,12 +42,16 @@ import org.tikv.kvproto.Metapb.Peer; import org.tikv.kvproto.Metapb.Region; public class TiRegion implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(TiRegion.class); + private final Region meta; + private final KVMode kvMode; private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; private final Peer leader; - private int followerIdx = 0; - private final boolean isReplicaRead; + private final TiConfiguration.ReplicaRead replicaRead; + private final List replicaList; + private int replicaIdx; public TiRegion( Region meta, @@ -52,20 +59,7 @@ public class TiRegion implements Serializable { IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode) { - this(meta, leader, isolationLevel, commandPri, kvMode, false); - } - - private TiRegion( - Region meta, - Peer leader, - IsolationLevel isolationLevel, - Kvrpcpb.CommandPri commandPri, - boolean isReplicaRead) { - this.meta = meta; - this.leader = leader; - this.isolationLevel = isolationLevel; - this.commandPri = commandPri; - this.isReplicaRead = isReplicaRead; + this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER); } public TiRegion( @@ -74,9 +68,13 @@ public class TiRegion implements Serializable { IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode, - boolean isReplicaRead) { + TiConfiguration.ReplicaRead replicaRead) { Objects.requireNonNull(meta, "meta is null"); this.meta = decodeRegion(meta, kvMode == KVMode.RAW); + this.kvMode = kvMode; + this.isolationLevel = isolationLevel; + this.commandPri = commandPri; + this.replicaRead = replicaRead; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -86,17 +84,21 @@ public class TiRegion implements Serializable { } else { this.leader = leader; } - if (isReplicaRead && meta.getPeersCount() > 0) { - // try to get first follower - try { - chooseRandomFollower(); - } catch (Exception ignore) { - // ignore - } + + // init replicaList + List followerList = getFollowerList(); + replicaList = new ArrayList<>(); + if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { + replicaList.add(this.leader); + } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { + replicaList.addAll(followerList); + Collections.shuffle(replicaList); + } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { + replicaList.addAll(followerList); + Collections.shuffle(replicaList); + replicaList.add(this.leader); } - this.isolationLevel = isolationLevel; - this.commandPri = commandPri; - this.isReplicaRead = isReplicaRead; + replicaIdx = 0; } private Region decodeRegion(Region region, boolean isRawRegion) { @@ -127,36 +129,41 @@ public class TiRegion implements Serializable { return leader; } - public Peer getCurrentFollower() { - return meta.getPeers(followerIdx); - } - - private boolean isValidFollower(Peer peer) { - return Metapb.PeerRole.valueOf(peer.getRole().getValueDescriptor()) == Metapb.PeerRole.Voter; - } - - private void chooseRandomFollower() { - int cnt = meta.getPeersCount(); - followerIdx = new Random().nextInt(cnt); - for (int retry = cnt - 1; retry > 0; retry--) { - followerIdx = (followerIdx + 1) % cnt; - Peer cur = meta.getPeers(followerIdx); - if (isValidFollower(cur)) { - return; + public List getFollowerList() { + List peers = new ArrayList<>(); + for (Peer peer : getMeta().getPeersList()) { + if (!peer.equals(this.leader)) { + if (peer.getRole().equals(Metapb.PeerRole.Voter)) { + peers.add(peer); + } } } + return peers; } public List getLearnerList() { List peers = new ArrayList<>(); for (Peer peer : getMeta().getPeersList()) { - if (isValidFollower(peer)) { + if (peer.getRole().equals(Metapb.PeerRole.Learner)) { peers.add(peer); } } return peers; } + public Peer getCurrentReplica() { + return replicaList.get(replicaIdx); + } + + public Peer getNextReplica() { + replicaIdx = (replicaIdx + 1) % replicaList.size(); + return getCurrentReplica(); + } + + private boolean isLeader(Peer peer) { + return getLeader().equals(peer); + } + public long getId() { return this.meta.getId(); } @@ -177,26 +184,30 @@ public class TiRegion implements Serializable { return Key.toRawKey(getEndKey()); } - public Kvrpcpb.Context getContext() { - return getContext(java.util.Collections.emptySet()); + public Kvrpcpb.Context getLeaderContext() { + return getContext(this.leader, java.util.Collections.emptySet(), TiStoreType.TiKV); } - public Kvrpcpb.Context getContext(Set resolvedLocks) { + public Kvrpcpb.Context getReplicaContext(TiStoreType storeType) { + return getContext(getCurrentReplica(), java.util.Collections.emptySet(), storeType); + } + + public Kvrpcpb.Context getReplicaContext(Set resolvedLocks, TiStoreType storeType) { + return getContext(getCurrentReplica(), resolvedLocks, storeType); + } + + private Kvrpcpb.Context getContext( + Peer currentPeer, Set resolvedLocks, TiStoreType storeType) { + boolean replicaRead = !isLeader(getCurrentReplica()) && TiStoreType.TiKV.equals(storeType); + Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); - builder.setIsolationLevel(this.isolationLevel); - builder.setPriority(this.commandPri); - if (isReplicaRead) { - builder - .setRegionId(meta.getId()) - .setPeer(getCurrentFollower()) - .setReplicaRead(true) - .setRegionEpoch(this.meta.getRegionEpoch()); - } else { - builder - .setRegionId(meta.getId()) - .setPeer(this.leader) - .setRegionEpoch(this.meta.getRegionEpoch()); - } + builder + .setIsolationLevel(this.isolationLevel) + .setPriority(this.commandPri) + .setRegionId(meta.getId()) + .setPeer(currentPeer) + .setReplicaRead(replicaRead) + .setRegionEpoch(this.meta.getRegionEpoch()); builder.addAllResolvedLocks(resolvedLocks); return builder.build(); } @@ -218,7 +229,8 @@ public class TiRegion implements Serializable { List peers = meta.getPeersList(); for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { - return new TiRegion(this.meta, p, this.isolationLevel, this.commandPri, this.isReplicaRead); + return new TiRegion( + this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead); } } return null; diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java index 5c221a5b0e..526c483116 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV2.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java @@ -125,7 +125,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient Supplier factory = () -> CleanupRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setKey(primary) .setStartVersion(txnID) .build(); @@ -232,7 +232,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient factory = () -> ResolveLockRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setStartVersion(lock.getTxnID()) .setCommitVersion(txnStatus) .build(); @@ -240,7 +240,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient factory = () -> ResolveLockRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setStartVersion(lock.getTxnID()) .build(); } diff --git a/src/main/java/org/tikv/txn/LockResolverClientV3.java b/src/main/java/org/tikv/txn/LockResolverClientV3.java index dbc4aa700c..4ec1f676e8 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV3.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV3.java @@ -151,7 +151,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient Kvrpcpb.ResolveLockRequest.Builder builder = Kvrpcpb.ResolveLockRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setStartVersion(lock.getTxnID()); if (txnStatus.isCommitted()) { @@ -230,7 +230,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient () -> { TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary); return CleanupRequest.newBuilder() - .setContext(primaryKeyRegion.getContext()) + .setContext(primaryKeyRegion.getLeaderContext()) .setKey(primary) .setStartVersion(txnID) .setCurrentTs(currentTS) diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java index edea6ab2fe..9b23733553 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV4.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java @@ -169,7 +169,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient Supplier factory = () -> Kvrpcpb.PessimisticRollbackRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setStartVersion(lock.getTxnID()) .setForUpdateTs(forUpdateTS) .addKeys(lock.getKey()) @@ -287,7 +287,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient () -> { TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary); return Kvrpcpb.CheckTxnStatusRequest.newBuilder() - .setContext(primaryKeyRegion.getContext()) + .setContext(primaryKeyRegion.getLeaderContext()) .setPrimaryKey(primary) .setLockTs(txnID) .setCallerStartTs(callerStartTS) @@ -364,7 +364,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient Kvrpcpb.ResolveLockRequest.Builder builder = Kvrpcpb.ResolveLockRequest.newBuilder() - .setContext(region.getContext()) + .setContext(region.getLeaderContext()) .setStartVersion(lock.getTxnID()); if (txnStatus.isCommitted()) { diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index ada12695c3..cda50aa830 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -8,6 +8,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,8 @@ public class RawKVClientTest { } } - @Test + // tikv-4.0 does not support atomic api + @Ignore public void atomicAPITest() { if (!initialized) return; long ttl = 10; @@ -112,7 +114,8 @@ public class RawKVClientTest { assert res3.isEmpty(); } - @Test + // tikv-4.0 doest not support ttl + @Ignore public void getKeyTTLTest() { if (!initialized) return; long ttl = 10; diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java new file mode 100644 index 0000000000..8a150eaab5 --- /dev/null +++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java @@ -0,0 +1,57 @@ +package org.tikv.txn; + +import com.google.protobuf.ByteString; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; + +public class ReplicaReadTest extends TXNTest { + private TiSession session; + private String key; + private String value; + + @Test + public void leaderReadTest() { + doTest(TiConfiguration.ReplicaRead.LEADER); + } + + // ci only has one TiKV instance + @Ignore + public void followerReadTest() { + doTest(TiConfiguration.ReplicaRead.FOLLOWER); + } + + @Test + public void leadAndFollowerReadTest() { + doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER); + } + + private void doTest(TiConfiguration.ReplicaRead replicaRead) { + TiConfiguration conf = TiConfiguration.createDefault(); + conf.setReplicaRead(replicaRead); + session = TiSession.create(conf); + + putKV(key, value); + ByteString v = session.createSnapshot().get(ByteString.copyFromUtf8(key)); + Assert.assertEquals(value, v.toStringUtf8()); + } + + @Before + public void setUp() { + super.setUp(); + key = genRandomKey(64); + value = "v0"; + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + super.tearDown(); + } +} diff --git a/src/test/java/org/tikv/txn/TXNTest.java b/src/test/java/org/tikv/txn/TXNTest.java new file mode 100644 index 0000000000..3b2ba242c4 --- /dev/null +++ b/src/test/java/org/tikv/txn/TXNTest.java @@ -0,0 +1,131 @@ +package org.tikv.txn; + +import static junit.framework.TestCase.assertTrue; +import static junit.framework.TestCase.fail; + +import com.google.protobuf.ByteString; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.RegionException; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Kvrpcpb; + +public class TXNTest { + static final int DEFAULT_TTL = 10; + private TiSession session; + RegionStoreClient.RegionStoreClientBuilder builder; + + @Before + public void setUp() { + TiConfiguration conf = TiConfiguration.createDefault(); + try { + session = TiSession.create(conf); + this.builder = session.getRegionStoreClientBuilder(); + } catch (Exception e) { + fail("TiDB cluster may not be present"); + } + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + void putKV(String key, String value) { + long startTS = session.getTimestamp().getVersion(); + long commitTS = session.getTimestamp().getVersion(); + putKV(key, value, startTS, commitTS); + } + + void putKV(String key, String value, long startTS, long commitTS) { + Kvrpcpb.Mutation m = + Kvrpcpb.Mutation.newBuilder() + .setKey(ByteString.copyFromUtf8(key)) + .setOp(Kvrpcpb.Op.Put) + .setValue(ByteString.copyFromUtf8(value)) + .build(); + + boolean res = prewriteString(Collections.singletonList(m), startTS, key, DEFAULT_TTL); + assertTrue(res); + res = commitString(Collections.singletonList(key), startTS, commitTS); + assertTrue(res); + } + + boolean prewriteString(List mutations, long startTS, String primary, long ttl) { + return prewrite(mutations, startTS, ByteString.copyFromUtf8(primary), ttl); + } + + boolean prewrite(List mutations, long startTS, ByteString primary, long ttl) { + if (mutations.size() == 0) return true; + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); + + for (Kvrpcpb.Mutation m : mutations) { + while (true) { + try { + TiRegion region = session.getRegionManager().getRegionByKey(m.getKey()); + RegionStoreClient client = builder.build(region); + client.prewrite(backOffer, primary, Collections.singletonList(m), startTS, ttl, false); + break; + } catch (RegionException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + } + } + } + return true; + } + + boolean commitString(List keys, long startTS, long commitTS) { + return commit( + keys.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList()), + startTS, + commitTS); + } + + boolean commit(List keys, long startTS, long commitTS) { + if (keys.size() == 0) return true; + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); + + for (ByteString byteStringK : keys) { + while (true) { + try { + TiRegion tiRegion = session.getRegionManager().getRegionByKey(byteStringK); + RegionStoreClient client = builder.build(tiRegion); + client.commit(backOffer, Collections.singletonList(byteStringK), startTS, commitTS); + break; + } catch (RegionException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + } + } + } + return true; + } + + String genRandomKey(int strLength) { + Random rnd = ThreadLocalRandom.current(); + String prefix = rnd.nextInt(2) % 2 == 0 ? "a-test-" : "z-test-"; + StringBuilder ret = new StringBuilder(prefix); + for (int i = 0; i < strLength; i++) { + boolean isChar = (rnd.nextInt(2) % 2 == 0); + if (isChar) { + int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; + ret.append((char) (choice + rnd.nextInt(26))); + } else { + ret.append(rnd.nextInt(10)); + } + } + return ret.toString(); + } +}