diff --git a/pom.xml b/pom.xml
index da8009f7c0..1d9b1397fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
org.apache.commons
commons-lang3
- 3.9
+ 3.10
test
diff --git a/src/main/java/org/tikv/common/DefaultHostMapping.java b/src/main/java/org/tikv/common/DefaultHostMapping.java
new file mode 100644
index 0000000000..2e53fff37f
--- /dev/null
+++ b/src/main/java/org/tikv/common/DefaultHostMapping.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.common;
+
+import static org.tikv.common.pd.PDUtils.addrToUri;
+
+import com.google.common.annotations.Beta;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.kv.GetResponse;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultHostMapping implements HostMapping {
+ private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
+ private final Client etcdClient;
+ private final String networkMappingName;
+ private final ConcurrentMap hostMapping;
+ private final Logger logger = LoggerFactory.getLogger(DefaultHostMapping.class);
+
+ public DefaultHostMapping(Client etcdClient, String networkMappingName) {
+ this.etcdClient = etcdClient;
+ this.networkMappingName = networkMappingName;
+ this.hostMapping = new ConcurrentHashMap<>();
+ }
+
+ private ByteSequence hostToNetworkMappingKey(String host) {
+ String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
+ return ByteSequence.from(path, StandardCharsets.UTF_8);
+ }
+
+ @Beta
+ private String getMappedHostFromPD(String host) {
+ ByteSequence hostKey = hostToNetworkMappingKey(host);
+ for (int i = 0; i < 5; i++) {
+ CompletableFuture future = etcdClient.getKVClient().get(hostKey);
+ try {
+ GetResponse resp = future.get();
+ List kvs = resp.getKvs();
+ if (kvs.size() != 1) {
+ break;
+ }
+ return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ logger.info("failed to get mapped Host from PD: " + host, e);
+ break;
+ } catch (Exception ignore) {
+ // ignore
+ break;
+ }
+ }
+ return host;
+ }
+
+ public URI getMappedURI(URI uri) {
+ if (networkMappingName.isEmpty()) {
+ return uri;
+ }
+ return addrToUri(
+ hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ + ":"
+ + uri.getPort());
+ }
+}
diff --git a/src/main/java/org/tikv/common/HostMapping.java b/src/main/java/org/tikv/common/HostMapping.java
index 08ae8048c7..e13eda8ff7 100644
--- a/src/main/java/org/tikv/common/HostMapping.java
+++ b/src/main/java/org/tikv/common/HostMapping.java
@@ -15,73 +15,9 @@
package org.tikv.common;
-import static org.tikv.common.pd.PDUtils.addrToUri;
-
-import com.google.common.annotations.Beta;
-import io.etcd.jetcd.ByteSequence;
-import io.etcd.jetcd.Client;
-import io.etcd.jetcd.KeyValue;
-import io.etcd.jetcd.kv.GetResponse;
+import java.io.Serializable;
import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class HostMapping {
- private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
- private final Client etcdClient;
- private final String networkMappingName;
- private final ConcurrentMap hostMapping;
- private final Logger logger = LoggerFactory.getLogger(HostMapping.class);
-
- public HostMapping(Client etcdClient, String networkMappingName) {
- this.etcdClient = etcdClient;
- this.networkMappingName = networkMappingName;
- this.hostMapping = new ConcurrentHashMap<>();
- }
-
- private ByteSequence hostToNetworkMappingKey(String host) {
- String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
- return ByteSequence.from(path, StandardCharsets.UTF_8);
- }
-
- @Beta
- private String getMappedHostFromPD(String host) {
- ByteSequence hostKey = hostToNetworkMappingKey(host);
- for (int i = 0; i < 5; i++) {
- CompletableFuture future = etcdClient.getKVClient().get(hostKey);
- try {
- GetResponse resp = future.get();
- List kvs = resp.getKvs();
- if (kvs.size() != 1) {
- break;
- }
- return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- logger.info("failed to get mapped Host from PD: " + host, e);
- break;
- } catch (Exception ignore) {
- // ignore
- break;
- }
- }
- return host;
- }
-
- public URI getMappedURI(URI uri) {
- if (networkMappingName.isEmpty()) {
- return uri;
- }
- return addrToUri(
- hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
- + ":"
- + uri.getPort());
- }
+public interface HostMapping extends Serializable {
+ URI getMappedURI(URI uri);
}
diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java
index 3cd86ca2f5..47be732774 100644
--- a/src/main/java/org/tikv/common/PDClient.java
+++ b/src/main/java/org/tikv/common/PDClient.java
@@ -36,6 +36,7 @@ import io.prometheus.client.Histogram;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
+import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
@@ -56,11 +58,12 @@ import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.operation.PDErrorHandler;
-import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
+import org.tikv.common.util.Pair;
+import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
@@ -145,7 +148,7 @@ public class PDClient extends AbstractGRPCClient
*
* @param region represents a region info
*/
- void scatterRegion(TiRegion region, BackOffer backOffer) {
+ void scatterRegion(Metapb.Region region, BackOffer backOffer) {
Supplier request =
() ->
ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
@@ -169,7 +172,7 @@ public class PDClient extends AbstractGRPCClient
*
* @param region
*/
- void waitScatterRegionFinish(TiRegion region, BackOffer backOffer) {
+ void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
for (; ; ) {
GetOperatorResponse resp = getOperator(region.getId());
if (resp != null) {
@@ -222,7 +225,7 @@ public class PDClient extends AbstractGRPCClient
}
@Override
- public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
+ public Pair getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
@@ -240,21 +243,14 @@ public class PDClient extends AbstractGRPCClient
GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
- return new TiRegion(
- resp.getRegion(),
- resp.getLeader(),
- null,
- conf.getIsolationLevel(),
- conf.getCommandPriority(),
- conf.getKvMode(),
- conf.getReplicaSelector());
+ return new Pair(decodeRegion(resp.getRegion()), resp.getLeader());
} finally {
requestTimer.observeDuration();
}
}
@Override
- public TiRegion getRegionByID(BackOffer backOffer, long id) {
+ public Pair getRegionByID(BackOffer backOffer, long id) {
Supplier request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler handler =
@@ -262,15 +258,7 @@ public class PDClient extends AbstractGRPCClient
GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
- // Instead of using default leader instance, explicitly set no leader to null
- return new TiRegion(
- resp.getRegion(),
- resp.getLeader(),
- null,
- conf.getIsolationLevel(),
- conf.getCommandPriority(),
- conf.getKvMode(),
- conf.getReplicaSelector());
+ return new Pair(decodeRegion(resp.getRegion()), resp.getLeader());
}
private Supplier buildGetStoreReq(long storeId) {
@@ -569,12 +557,15 @@ public class PDClient extends AbstractGRPCClient
.setDaemon(true)
.build()))
.build();
- this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName());
+ this.hostMapping =
+ Optional.ofNullable(getConf().getHostMapping())
+ .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
for (URI u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
break;
}
+ logger.info("Could not get leader member with pd: " + u);
}
checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId();
@@ -668,4 +659,29 @@ public class PDClient extends AbstractGRPCClient
return "[leaderInfo: " + leaderInfo + "]";
}
}
+
+ private Metapb.Region decodeRegion(Metapb.Region region) {
+ final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
+ Metapb.Region.Builder builder =
+ Metapb.Region.newBuilder()
+ .setId(region.getId())
+ .setRegionEpoch(region.getRegionEpoch())
+ .addAllPeers(region.getPeersList());
+
+ if (region.getStartKey().isEmpty() || isRawRegion) {
+ builder.setStartKey(region.getStartKey());
+ } else {
+ byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
+ builder.setStartKey(ByteString.copyFrom(decodedStartKey));
+ }
+
+ if (region.getEndKey().isEmpty() || isRawRegion) {
+ builder.setEndKey(region.getEndKey());
+ } else {
+ byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
+ builder.setEndKey(ByteString.copyFrom(decodedEndKey));
+ }
+
+ return builder.build();
+ }
}
diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java
index 373209900e..63518b9377 100644
--- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java
+++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java
@@ -18,8 +18,9 @@ package org.tikv.common;
import com.google.protobuf.ByteString;
import java.util.List;
import org.tikv.common.meta.TiTimestamp;
-import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.Pair;
+import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
@@ -37,7 +38,7 @@ public interface ReadOnlyPDClient {
* @param key key in bytes for locating a region
* @return the region whose startKey and endKey range covers the given key
*/
- TiRegion getRegionByKey(BackOffer backOffer, ByteString key);
+ Pair getRegionByKey(BackOffer backOffer, ByteString key);
/**
* Get Region by Region Id
@@ -45,7 +46,7 @@ public interface ReadOnlyPDClient {
* @param id Region Id
* @return the region corresponding to the given Id
*/
- TiRegion getRegionByID(BackOffer backOffer, long id);
+ Pair getRegionByID(BackOffer backOffer, long id);
HostMapping getHostMapping();
diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java
index cffafa53b4..a4a6a0226f 100644
--- a/src/main/java/org/tikv/common/TiConfiguration.java
+++ b/src/main/java/org/tikv/common/TiConfiguration.java
@@ -264,6 +264,7 @@ public class TiConfiguration implements Serializable {
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
+ private HostMapping hostMapping = null;
public enum KVMode {
TXN,
@@ -542,6 +543,14 @@ public class TiConfiguration implements Serializable {
return this.networkMappingName;
}
+ public HostMapping getHostMapping() {
+ return hostMapping;
+ }
+
+ public void setHostMapping(HostMapping mapping) {
+ this.hostMapping = mapping;
+ }
+
public boolean getEnableGrpcForward() {
return this.enableGrpcForward;
}
diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java
index dc50b6317a..f495c2bc44 100644
--- a/src/main/java/org/tikv/common/TiSession.java
+++ b/src/main/java/org/tikv/common/TiSession.java
@@ -41,6 +41,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
+import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
@@ -180,6 +181,7 @@ public class TiSession implements AutoCloseable {
if (regionManager == null) {
regionManager =
new RegionManager(
+ getConf(),
getPDClient(),
this.cacheInvalidateCallback,
this.channelFactory,
@@ -349,7 +351,7 @@ public class TiSession implements AutoCloseable {
long startMS = System.currentTimeMillis();
// split region
- List newRegions =
+ List newRegions =
splitRegion(
splitKeys
.stream()
@@ -358,7 +360,7 @@ public class TiSession implements AutoCloseable {
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
// scatter region
- for (TiRegion newRegion : newRegions) {
+ for (Metapb.Region newRegion : newRegions) {
try {
getPDClient()
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS));
@@ -371,7 +373,7 @@ public class TiSession implements AutoCloseable {
if (scatterWaitMS > 0) {
logger.info("start to wait scatter region finish");
long scatterRegionStartMS = System.currentTimeMillis();
- for (TiRegion newRegion : newRegions) {
+ for (Metapb.Region newRegion : newRegions) {
long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis();
if (remainMS <= 0) {
logger.warn("wait scatter region timeout");
@@ -388,8 +390,8 @@ public class TiSession implements AutoCloseable {
logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
}
- private List splitRegion(List splitKeys, BackOffer backOffer) {
- List regions = new ArrayList<>();
+ private List splitRegion(List splitKeys, BackOffer backOffer) {
+ List regions = new ArrayList<>();
Map> groupKeys =
groupKeysByRegion(regionManager, splitKeys, backOffer);
@@ -411,7 +413,7 @@ public class TiSession implements AutoCloseable {
"split key equal to region start key or end key. Region splitting is not needed.");
} else {
logger.info("start to split region id={}, split size={}", region.getId(), splits.size());
- List newRegions;
+ List newRegions;
try {
newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
} catch (final TiKVException e) {
diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java
index 5bf7d86066..e4d769911e 100644
--- a/src/main/java/org/tikv/common/region/RegionManager.java
+++ b/src/main/java/org/tikv/common/region/RegionManager.java
@@ -32,9 +32,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
+import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
@@ -67,19 +69,22 @@ public class RegionManager {
// To avoid double retrieval, we used the async version of grpc
// When rpc not returned, instead of call again, it wait for previous one done
public RegionManager(
- ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) {
- this.cache = new RegionCache(pdClient);
+ TiConfiguration conf,
+ ReadOnlyPDClient pdClient,
+ Function cacheInvalidateCallback) {
+ this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.executor = null;
this.storeChecker = null;
}
public RegionManager(
+ TiConfiguration conf,
ReadOnlyPDClient pdClient,
Function cacheInvalidateCallback,
ChannelFactory channelFactory,
boolean enableGrpcForward) {
- this.cache = new RegionCache(pdClient);
+ this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback;
if (enableGrpcForward) {
UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
@@ -92,8 +97,8 @@ public class RegionManager {
}
}
- public RegionManager(ReadOnlyPDClient pdClient) {
- this.cache = new RegionCache(pdClient);
+ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
+ this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = null;
this.storeChecker = null;
this.executor = null;
@@ -252,12 +257,14 @@ public class RegionManager {
private final Map storeCache;
private final RangeMap keyToRegionIdCache;
private final ReadOnlyPDClient pdClient;
+ private final TiConfiguration conf;
- public RegionCache(ReadOnlyPDClient pdClient) {
+ public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) {
regionCache = new HashMap<>();
storeCache = new HashMap<>();
keyToRegionIdCache = TreeRangeMap.create();
+ this.conf = conf;
this.pdClient = pdClient;
}
@@ -278,7 +285,9 @@ public class RegionManager {
if (regionId == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
- TiRegion region = pdClient.getRegionByKey(backOffer, key);
+ Pair regionAndLeader =
+ pdClient.getRegionByKey(backOffer, key);
+ TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
}
@@ -312,7 +321,9 @@ public class RegionManager {
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
}
if (region == null) {
- region = pdClient.getRegionByID(backOffer, regionId);
+ Pair regionAndLeader =
+ pdClient.getRegionByID(backOffer, regionId);
+ region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
}
@@ -400,6 +411,19 @@ public class RegionManager {
}
}
+ private List getRegionStore(List peers, BackOffer backOffer) {
+ return peers
+ .stream()
+ .map(p -> getStoreById(p.getStoreId(), backOffer))
+ .collect(Collectors.toList());
+ }
+
+ private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
+ List peers = region.getPeersList();
+ List stores = getRegionStore(peers, backOffer);
+ return new TiRegion(conf, region, leader, peers, stores, null);
+ }
+
public synchronized void clearAll() {
keyToRegionIdCache.clear();
regionCache.clear();
diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java
index a02c0e86ed..f6af0684a3 100644
--- a/src/main/java/org/tikv/common/region/RegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java
@@ -31,7 +31,6 @@ import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.PDClient;
@@ -46,6 +45,7 @@ import org.tikv.common.util.*;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb.*;
+import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
@@ -747,7 +747,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param splitKeys is the split points for a specific region.
* @return a split region info.
*/
- public List splitRegion(Iterable splitKeys) {
+ public List splitRegion(Iterable splitKeys) {
Supplier request =
() ->
SplitRegionRequest.newBuilder()
@@ -782,19 +782,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
region.getId(), resp.getRegionError().toString()));
}
- return resp.getRegionsList()
- .stream()
- .map(
- region ->
- new TiRegion(
- region,
- null,
- null,
- conf.getIsolationLevel(),
- conf.getCommandPriority(),
- conf.getKvMode(),
- conf.getReplicaSelector()))
- .collect(Collectors.toList());
+ return resp.getRegionsList();
}
// APIs for Raw Scan/Put/Get/Delete
diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java
index 1e8a8c9532..2fbb980612 100644
--- a/src/main/java/org/tikv/common/region/TiRegion.java
+++ b/src/main/java/org/tikv/common/region/TiRegion.java
@@ -23,11 +23,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.tikv.common.TiConfiguration.KVMode;
-import org.tikv.common.codec.Codec.BytesCodec;
-import org.tikv.common.codec.CodecDataInput;
+import org.tikv.common.TiConfiguration;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
@@ -44,29 +43,31 @@ public class TiRegion implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);
private final Region meta;
- private final KVMode kvMode;
private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri;
+ private final TiConfiguration conf;
private final Peer leader;
private final ReplicaSelector replicaSelector;
private final List replicaList;
private final TiStore proxyStore;
private int replicaIdx;
+ private final List peers;
+ private final List stores;
public TiRegion(
+ TiConfiguration conf,
Region meta,
Peer leader,
- TiStore proxyStore,
- IsolationLevel isolationLevel,
- Kvrpcpb.CommandPri commandPri,
- KVMode kvMode,
- ReplicaSelector replicaSelector) {
- Objects.requireNonNull(meta, "meta is null");
- this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
- this.kvMode = kvMode;
- this.isolationLevel = isolationLevel;
- this.commandPri = commandPri;
- this.replicaSelector = replicaSelector;
+ List peers,
+ List stores,
+ TiStore proxyStore) {
+ this.conf = Objects.requireNonNull(conf, "conf is null");
+ this.meta = Objects.requireNonNull(meta, "meta is null");
+ this.isolationLevel = conf.getIsolationLevel();
+ this.commandPri = conf.getCommandPriority();
+ this.peers = peers;
+ this.stores = stores;
+ this.replicaSelector = conf.getReplicaSelector();
this.proxyStore = proxyStore;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
@@ -79,34 +80,15 @@ public class TiRegion implements Serializable {
}
// init replicaList
- replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList());
+ replicaList =
+ replicaSelector
+ .select(new org.tikv.common.replica.Region(meta, this.leader, peers, stores))
+ .stream()
+ .map(org.tikv.common.replica.Store::getPeer)
+ .collect(Collectors.toList());
replicaIdx = 0;
}
- private Region decodeRegion(Region region, boolean isRawRegion) {
- Region.Builder builder =
- Region.newBuilder()
- .setId(region.getId())
- .setRegionEpoch(region.getRegionEpoch())
- .addAllPeers(region.getPeersList());
-
- if (region.getStartKey().isEmpty() || isRawRegion) {
- builder.setStartKey(region.getStartKey());
- } else {
- byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
- builder.setStartKey(ByteString.copyFrom(decodedStartKey));
- }
-
- if (region.getEndKey().isEmpty() || isRawRegion) {
- builder.setEndKey(region.getEndKey());
- } else {
- byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
- builder.setEndKey(ByteString.copyFrom(decodedEndKey));
- }
-
- return builder.build();
- }
-
public Peer getLeader() {
return leader;
}
@@ -180,7 +162,7 @@ public class TiRegion implements Serializable {
private Kvrpcpb.Context getContext(
Peer currentPeer, Set resolvedLocks, TiStoreType storeType) {
- boolean replicaRead = !isLeader(getCurrentReplica()) && TiStoreType.TiKV.equals(storeType);
+ boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType);
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
builder
@@ -215,28 +197,14 @@ public class TiRegion implements Serializable {
List peers = meta.getPeersList();
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
- return new TiRegion(
- this.meta,
- p,
- this.proxyStore,
- this.isolationLevel,
- this.commandPri,
- this.kvMode,
- this.replicaSelector);
+ return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore);
}
}
return null;
}
public TiRegion switchProxyStore(TiStore store) {
- return new TiRegion(
- this.meta,
- this.leader,
- store,
- this.isolationLevel,
- this.commandPri,
- this.kvMode,
- this.replicaSelector);
+ return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store);
}
public boolean isMoreThan(ByteString key) {
diff --git a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java
index 64aa5cdfe5..c0c5c12542 100644
--- a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java
+++ b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java
@@ -18,13 +18,18 @@ package org.tikv.common.replica;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.tikv.kvproto.Metapb;
public class FollowerReplicaSelector implements ReplicaSelector {
@Override
- public List select(
- Metapb.Peer leader, List followers, List learners) {
- List list = new ArrayList<>(followers);
+ public List select(Region region) {
+ Store[] stores = region.getStores();
+ Store leader = region.getLeader();
+ List list = new ArrayList<>(stores.length);
+ for (Store store : stores) {
+ if (!store.isLearner() && !leader.equals(store)) {
+ list.add(store);
+ }
+ }
Collections.shuffle(list);
return list;
}
diff --git a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java
index 52845d11ae..94b1aee2c8 100644
--- a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java
+++ b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java
@@ -18,13 +18,18 @@ package org.tikv.common.replica;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.tikv.kvproto.Metapb;
public class LeaderFollowerReplicaSelector implements ReplicaSelector {
@Override
- public List select(
- Metapb.Peer leader, List followers, List learners) {
- List list = new ArrayList<>(followers);
+ public List select(Region region) {
+ Store[] stores = region.getStores();
+ Store leader = region.getLeader();
+ List list = new ArrayList<>(stores.length);
+ for (Store store : stores) {
+ if (!store.isLearner() && !leader.equals(store)) {
+ list.add(store);
+ }
+ }
Collections.shuffle(list);
list.add(leader);
return list;
diff --git a/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java
index 0dc2bfc3a5..e654e62178 100644
--- a/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java
+++ b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java
@@ -17,14 +17,12 @@ package org.tikv.common.replica;
import java.util.ArrayList;
import java.util.List;
-import org.tikv.kvproto.Metapb;
public class LeaderReplicaSelector implements ReplicaSelector {
@Override
- public List select(
- Metapb.Peer leader, List followers, List learners) {
- List list = new ArrayList<>();
- list.add(leader);
+ public List select(Region region) {
+ List list = new ArrayList<>(1);
+ list.add(region.getLeader());
return list;
}
}
diff --git a/src/main/java/org/tikv/common/replica/Region.java b/src/main/java/org/tikv/common/replica/Region.java
new file mode 100644
index 0000000000..b87c0552a6
--- /dev/null
+++ b/src/main/java/org/tikv/common/replica/Region.java
@@ -0,0 +1,57 @@
+package org.tikv.common.replica;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.Iterator;
+import java.util.List;
+import org.tikv.common.region.TiStore;
+import org.tikv.kvproto.Metapb;
+
+public class Region {
+ private final Metapb.Region region;
+ private final Store[] stores;
+ private Store leaderStore;
+
+ public Region(
+ final Metapb.Region region,
+ final Metapb.Peer leader,
+ final List peers,
+ final List stores) {
+ this.region = region;
+ this.stores = new Store[stores.size()];
+ Iterator peer = peers.iterator();
+ Iterator store = stores.iterator();
+ for (int idx = 0; idx < peers.size(); idx++) {
+ Metapb.Peer currentPeer = peer.next();
+ boolean isLeader = currentPeer.equals(leader);
+ this.stores[idx] = new Store(currentPeer, store.next().getStore(), isLeader);
+ if (isLeader) {
+ leaderStore = this.stores[idx];
+ }
+ }
+ }
+
+ public Store[] getStores() {
+ return stores;
+ }
+
+ public Store getLeader() {
+ return leaderStore;
+ }
+
+ public long getId() {
+ return region.getId();
+ }
+
+ public byte[] getStartKey() {
+ return region.getStartKey().toByteArray();
+ }
+
+ public byte[] getEndKey() {
+ return region.getEndKey().toByteArray();
+ }
+
+ public String toString() {
+ return toStringHelper(this).add("region", region).add("stores", stores).toString();
+ }
+}
diff --git a/src/main/java/org/tikv/common/replica/ReplicaSelector.java b/src/main/java/org/tikv/common/replica/ReplicaSelector.java
index 144a6956df..ad4d6609b1 100644
--- a/src/main/java/org/tikv/common/replica/ReplicaSelector.java
+++ b/src/main/java/org/tikv/common/replica/ReplicaSelector.java
@@ -17,13 +17,11 @@ package org.tikv.common.replica;
import java.io.Serializable;
import java.util.List;
-import org.tikv.kvproto.Metapb;
public interface ReplicaSelector extends Serializable {
- public static final ReplicaSelector LEADER = new LeaderReplicaSelector();
- public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector();
- public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector();
+ ReplicaSelector LEADER = new LeaderReplicaSelector();
+ ReplicaSelector FOLLOWER = new FollowerReplicaSelector();
+ ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector();
- List select(
- Metapb.Peer leader, List followers, List learners);
+ List select(Region region);
}
diff --git a/src/main/java/org/tikv/common/replica/Store.java b/src/main/java/org/tikv/common/replica/Store.java
new file mode 100644
index 0000000000..6881d4be91
--- /dev/null
+++ b/src/main/java/org/tikv/common/replica/Store.java
@@ -0,0 +1,111 @@
+package org.tikv.common.replica;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.List;
+import org.tikv.kvproto.Metapb;
+
+public class Store {
+ public static class Label {
+ private final org.tikv.kvproto.Metapb.StoreLabel label;
+
+ Label(org.tikv.kvproto.Metapb.StoreLabel label) {
+ this.label = label;
+ }
+
+ public String getKey() {
+ return label.getKey();
+ }
+
+ public String getValue() {
+ return label.getValue();
+ }
+ }
+
+ public enum State {
+ Unknown,
+ Up,
+ Offline,
+ Tombstone
+ }
+
+ private static final Label[] EMPTY_LABELS = new Label[0];
+ private Label[] labels;
+ private final Metapb.Peer peer;
+ private final Metapb.Store store;
+ private final boolean isLeader;
+
+ Store(
+ final org.tikv.kvproto.Metapb.Peer peer,
+ final org.tikv.kvproto.Metapb.Store store,
+ boolean isLeader) {
+ this.peer = peer;
+ this.store = store;
+ this.isLeader = isLeader;
+ }
+
+ public Metapb.Peer getPeer() {
+ return peer;
+ }
+
+ public Label[] getLabels() {
+ if (labels == null) {
+ List labelList = store.getLabelsList();
+ if (labelList.isEmpty()) {
+ labels = EMPTY_LABELS;
+ } else {
+ labels = labelList.stream().map(Label::new).toArray(Label[]::new);
+ }
+ }
+ return labels;
+ }
+
+ public boolean isLearner() {
+ return peer.getRole() == Metapb.PeerRole.Learner;
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ public boolean isFollower() {
+ return peer.getRole() == Metapb.PeerRole.Voter && !isLeader;
+ }
+
+ public long getId() {
+ return store.getId();
+ }
+
+ public String getAddress() {
+ return store.getAddress();
+ }
+
+ public String getVersion() {
+ return store.getVersion();
+ }
+
+ public State getState() {
+ switch (store.getState()) {
+ case Up:
+ return State.Up;
+ case Offline:
+ return State.Offline;
+ case Tombstone:
+ return State.Tombstone;
+ default:
+ return State.Unknown;
+ }
+ }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof Store)) {
+ return false;
+ }
+ Store other = (Store) o;
+ return this.peer.equals(other.peer);
+ }
+
+ public String toString() {
+ return toStringHelper(this).add("peer", peer).add("store", store).toString();
+ }
+}
diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java
index a69e10ba38..d3fd36f1fe 100644
--- a/src/test/java/org/tikv/common/MockServerTest.java
+++ b/src/test/java/org/tikv/common/MockServerTest.java
@@ -1,11 +1,13 @@
package org.tikv.common;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
import org.junit.Before;
-import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.region.TiRegion;
-import org.tikv.common.replica.ReplicaSelector;
+import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
@@ -28,16 +30,26 @@ public class MockServerTest extends PDMockServerTest {
.addPeers(Metapb.Peer.newBuilder().setId(11).setStoreId(13))
.build();
+ List s =
+ ImmutableList.of(
+ Metapb.Store.newBuilder()
+ .setAddress("localhost:1234")
+ .setVersion("5.0.0")
+ .setId(13)
+ .build());
+
region =
new TiRegion(
+ session.getConf(),
r,
r.getPeers(0),
- null,
- session.getConf().getIsolationLevel(),
- session.getConf().getCommandPriority(),
- KVMode.TXN,
- ReplicaSelector.LEADER);
+ r.getPeersList(),
+ s.stream().map(TiStore::new).collect(Collectors.toList()),
+ null);
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
+ for (Metapb.Store store : s) {
+ pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
+ }
server = new KVMockServer();
port = server.start(region);
}
diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java
index ec76b5936c..bd49dce39e 100644
--- a/src/test/java/org/tikv/common/PDClientTest.java
+++ b/src/test/java/org/tikv/common/PDClientTest.java
@@ -23,9 +23,9 @@ import java.util.concurrent.*;
import org.junit.Test;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.meta.TiTimestamp;
-import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
+import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
@@ -85,13 +85,16 @@ public class PDClientTest extends PDMockServerTest {
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
- TiRegion r = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
+ Pair rl =
+ client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
+ Metapb.Region r = rl.first;
+ Metapb.Peer l = rl.second;
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
assertEquals(r.getRegionEpoch().getConfVer(), confVer);
assertEquals(r.getRegionEpoch().getVersion(), ver);
- assertEquals(r.getLeader().getId(), 1);
- assertEquals(r.getLeader().getStoreId(), 10);
+ assertEquals(l.getId(), 1);
+ assertEquals(l.getStoreId(), 10);
}
}
@@ -113,13 +116,15 @@ public class PDClientTest extends PDMockServerTest {
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
- TiRegion r = client.getRegionByID(defaultBackOff(), 0);
+ Pair rl = client.getRegionByID(defaultBackOff(), 0);
+ Metapb.Region r = rl.first;
+ Metapb.Peer l = rl.second;
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
assertEquals(r.getRegionEpoch().getConfVer(), confVer);
assertEquals(r.getRegionEpoch().getVersion(), ver);
- assertEquals(r.getLeader().getId(), 1);
- assertEquals(r.getLeader().getStoreId(), 10);
+ assertEquals(l.getId(), 1);
+ assertEquals(l.getStoreId(), 10);
}
}
diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java
index 112d5cb0ec..003830c3eb 100644
--- a/src/test/java/org/tikv/common/RegionManagerTest.java
+++ b/src/test/java/org/tikv/common/RegionManagerTest.java
@@ -61,6 +61,7 @@ public class RegionManagerTest extends PDMockServerTest {
int confVer = 1026;
int ver = 1027;
long regionId = 233;
+ String testAddress = "testAddress";
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
@@ -71,6 +72,18 @@ public class RegionManagerTest extends PDMockServerTest {
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
+ for (long id : new long[] {10, 20}) {
+ pdServer.addGetStoreResp(
+ GrpcUtils.makeGetStoreResponse(
+ pdServer.getClusterId(),
+ GrpcUtils.makeStore(
+ id,
+ testAddress,
+ Metapb.StoreState.Up,
+ GrpcUtils.makeStoreLabel("k1", "v1"),
+ GrpcUtils.makeStoreLabel("k2", "v2"))));
+ }
+
TiRegion region = mgr.getRegionByKey(startKey);
assertEquals(region.getId(), regionId);
@@ -106,15 +119,18 @@ public class RegionManagerTest extends PDMockServerTest {
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(storeId, 10),
GrpcUtils.makePeer(storeId + 1, 20))));
- pdServer.addGetStoreResp(
- GrpcUtils.makeGetStoreResponse(
- pdServer.getClusterId(),
- GrpcUtils.makeStore(
- storeId,
- testAddress,
- Metapb.StoreState.Up,
- GrpcUtils.makeStoreLabel("k1", "v1"),
- GrpcUtils.makeStoreLabel("k2", "v2"))));
+ for (long id : new long[] {10, 20}) {
+ pdServer.addGetStoreResp(
+ GrpcUtils.makeGetStoreResponse(
+ pdServer.getClusterId(),
+ GrpcUtils.makeStore(
+ id,
+ testAddress,
+ Metapb.StoreState.Up,
+ GrpcUtils.makeStoreLabel("k1", "v1"),
+ GrpcUtils.makeStoreLabel("k2", "v2"))));
+ }
+
Pair pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java
index ec3fe88ccf..76d006990b 100644
--- a/src/test/java/org/tikv/common/RegionStoreClientTest.java
+++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java
@@ -54,7 +54,7 @@ public class RegionStoreClientTest extends MockServerTest {
new RegionStoreClientBuilder(
session.getConf(),
session.getChannelFactory(),
- new RegionManager(session.getPDClient()),
+ new RegionManager(session.getConf(), session.getPDClient()),
session.getPDClient());
return builder.build(region, store);
diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java
index 1baaea2cc6..acfa2e97ba 100644
--- a/src/test/java/org/tikv/txn/ReplicaReadTest.java
+++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java
@@ -10,8 +10,9 @@ import org.junit.Ignore;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
+import org.tikv.common.replica.Region;
import org.tikv.common.replica.ReplicaSelector;
-import org.tikv.kvproto.Metapb;
+import org.tikv.common.replica.Store;
public class ReplicaReadTest extends TXNTest {
private TiSession session;
@@ -41,12 +42,11 @@ public class ReplicaReadTest extends TXNTest {
conf.setReplicaSelector(
new ReplicaSelector() {
@Override
- public List select(
- Metapb.Peer leader, List followers, List learners) {
- List list = new ArrayList<>();
- list.addAll(followers);
- list.addAll(learners);
- list.add(leader);
+ public List select(Region region) {
+ List list = new ArrayList<>();
+ for (Store store : region.getStores()) {
+ list.add(store);
+ }
return list;
}
});