Support select replica with rich meta data (#171) (#225)

* cherry pick #171 to release-3.1

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix test

Signed-off-by: birdstorm <samuelwyf@hotmail.com>

Co-authored-by: Xiaoguang Sun <sunxiaoguang@users.noreply.github.com>
Co-authored-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
ti-srebot 2021-07-01 05:38:48 +08:00 committed by GitHub
parent 8210eaf94d
commit e02e61c412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 468 additions and 230 deletions

View File

@ -185,7 +185,7 @@
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.9</version> <version>3.10</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -0,0 +1,87 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common;
import static org.tikv.common.pd.PDUtils.addrToUri;
import com.google.common.annotations.Beta;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultHostMapping implements HostMapping {
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
private final Client etcdClient;
private final String networkMappingName;
private final ConcurrentMap<String, String> hostMapping;
private final Logger logger = LoggerFactory.getLogger(DefaultHostMapping.class);
public DefaultHostMapping(Client etcdClient, String networkMappingName) {
this.etcdClient = etcdClient;
this.networkMappingName = networkMappingName;
this.hostMapping = new ConcurrentHashMap<>();
}
private ByteSequence hostToNetworkMappingKey(String host) {
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
return ByteSequence.from(path, StandardCharsets.UTF_8);
}
@Beta
private String getMappedHostFromPD(String host) {
ByteSequence hostKey = hostToNetworkMappingKey(host);
for (int i = 0; i < 5; i++) {
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
try {
GetResponse resp = future.get();
List<KeyValue> kvs = resp.getKvs();
if (kvs.size() != 1) {
break;
}
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.info("failed to get mapped Host from PD: " + host, e);
break;
} catch (Exception ignore) {
// ignore
break;
}
}
return host;
}
public URI getMappedURI(URI uri) {
if (networkMappingName.isEmpty()) {
return uri;
}
return addrToUri(
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ ":"
+ uri.getPort());
}
}

View File

@ -15,73 +15,9 @@
package org.tikv.common; package org.tikv.common;
import static org.tikv.common.pd.PDUtils.addrToUri; import java.io.Serializable;
import com.google.common.annotations.Beta;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HostMapping { public interface HostMapping extends Serializable {
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping"; URI getMappedURI(URI uri);
private final Client etcdClient;
private final String networkMappingName;
private final ConcurrentMap<String, String> hostMapping;
private final Logger logger = LoggerFactory.getLogger(HostMapping.class);
public HostMapping(Client etcdClient, String networkMappingName) {
this.etcdClient = etcdClient;
this.networkMappingName = networkMappingName;
this.hostMapping = new ConcurrentHashMap<>();
}
private ByteSequence hostToNetworkMappingKey(String host) {
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
return ByteSequence.from(path, StandardCharsets.UTF_8);
}
@Beta
private String getMappedHostFromPD(String host) {
ByteSequence hostKey = hostToNetworkMappingKey(host);
for (int i = 0; i < 5; i++) {
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
try {
GetResponse resp = future.get();
List<KeyValue> kvs = resp.getKvs();
if (kvs.size() != 1) {
break;
}
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.info("failed to get mapped Host from PD: " + host, e);
break;
} catch (Exception ignore) {
// ignore
break;
}
}
return host;
}
public URI getMappedURI(URI uri) {
if (networkMappingName.isEmpty()) {
return uri;
}
return addrToUri(
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ ":"
+ uri.getPort());
}
} }

View File

@ -36,6 +36,7 @@ import io.prometheus.client.Histogram;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -49,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.codec.KeyUtils; import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
@ -56,11 +58,12 @@ import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiTimestamp; import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.NoopHandler; import org.tikv.common.operation.NoopHandler;
import org.tikv.common.operation.PDErrorHandler; import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer; import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub; import org.tikv.kvproto.PDGrpc.PDBlockingStub;
@ -145,7 +148,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
* *
* @param region represents a region info * @param region represents a region info
*/ */
void scatterRegion(TiRegion region, BackOffer backOffer) { void scatterRegion(Metapb.Region region, BackOffer backOffer) {
Supplier<ScatterRegionRequest> request = Supplier<ScatterRegionRequest> request =
() -> () ->
ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build(); ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
@ -169,7 +172,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
* *
* @param region * @param region
*/ */
void waitScatterRegionFinish(TiRegion region, BackOffer backOffer) { void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
for (; ; ) { for (; ; ) {
GetOperatorResponse resp = getOperator(region.getId()); GetOperatorResponse resp = getOperator(region.getId());
if (resp != null) { if (resp != null) {
@ -222,7 +225,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
} }
@Override @Override
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try { try {
if (conf.getKvMode() == KVMode.TXN) { if (conf.getKvMode() == KVMode.TXN) {
@ -240,21 +243,14 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
GetRegionResponse resp = GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler); callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new TiRegion( return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaSelector());
} finally { } finally {
requestTimer.observeDuration(); requestTimer.observeDuration();
} }
} }
@Override @Override
public TiRegion getRegionByID(BackOffer backOffer, long id) { public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
Supplier<GetRegionByIDRequest> request = Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler = PDErrorHandler<GetRegionResponse> handler =
@ -262,15 +258,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
GetRegionResponse resp = GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler); callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
// Instead of using default leader instance, explicitly set no leader to null return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaSelector());
} }
private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) { private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
@ -569,12 +557,15 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
.setDaemon(true) .setDaemon(true)
.build())) .build()))
.build(); .build();
this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName()); this.hostMapping =
Optional.ofNullable(getConf().getHostMapping())
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
for (URI u : pdAddrs) { for (URI u : pdAddrs) {
resp = getMembers(u); resp = getMembers(u);
if (resp != null) { if (resp != null) {
break; break;
} }
logger.info("Could not get leader member with pd: " + u);
} }
checkNotNull(resp, "Failed to init client for PD cluster."); checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId(); long clusterId = resp.getHeader().getClusterId();
@ -668,4 +659,29 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return "[leaderInfo: " + leaderInfo + "]"; return "[leaderInfo: " + leaderInfo + "]";
} }
} }
private Metapb.Region decodeRegion(Metapb.Region region) {
final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
Metapb.Region.Builder builder =
Metapb.Region.newBuilder()
.setId(region.getId())
.setRegionEpoch(region.getRegionEpoch())
.addAllPeers(region.getPeersList());
if (region.getStartKey().isEmpty() || isRawRegion) {
builder.setStartKey(region.getStartKey());
} else {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
}
if (region.getEndKey().isEmpty() || isRawRegion) {
builder.setEndKey(region.getEndKey());
} else {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
}
return builder.build();
}
} }

View File

@ -18,8 +18,9 @@ package org.tikv.common;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.util.List; import java.util.List;
import org.tikv.common.meta.TiTimestamp; import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer; import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.Store;
/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */ /** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
@ -37,7 +38,7 @@ public interface ReadOnlyPDClient {
* @param key key in bytes for locating a region * @param key key in bytes for locating a region
* @return the region whose startKey and endKey range covers the given key * @return the region whose startKey and endKey range covers the given key
*/ */
TiRegion getRegionByKey(BackOffer backOffer, ByteString key); Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key);
/** /**
* Get Region by Region Id * Get Region by Region Id
@ -45,7 +46,7 @@ public interface ReadOnlyPDClient {
* @param id Region Id * @param id Region Id
* @return the region corresponding to the given Id * @return the region corresponding to the given Id
*/ */
TiRegion getRegionByID(BackOffer backOffer, long id); Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id);
HostMapping getHostMapping(); HostMapping getHostMapping();

View File

@ -264,6 +264,7 @@ public class TiConfiguration implements Serializable {
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT); private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;
public enum KVMode { public enum KVMode {
TXN, TXN,
@ -542,6 +543,14 @@ public class TiConfiguration implements Serializable {
return this.networkMappingName; return this.networkMappingName;
} }
public HostMapping getHostMapping() {
return hostMapping;
}
public void setHostMapping(HostMapping mapping) {
this.hostMapping = mapping;
}
public boolean getEnableGrpcForward() { public boolean getEnableGrpcForward() {
return this.enableGrpcForward; return this.enableGrpcForward;
} }

View File

@ -41,6 +41,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore; import org.tikv.common.region.TiStore;
import org.tikv.common.util.*; import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient; import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient; import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient; import org.tikv.txn.TxnKVClient;
@ -180,6 +181,7 @@ public class TiSession implements AutoCloseable {
if (regionManager == null) { if (regionManager == null) {
regionManager = regionManager =
new RegionManager( new RegionManager(
getConf(),
getPDClient(), getPDClient(),
this.cacheInvalidateCallback, this.cacheInvalidateCallback,
this.channelFactory, this.channelFactory,
@ -349,7 +351,7 @@ public class TiSession implements AutoCloseable {
long startMS = System.currentTimeMillis(); long startMS = System.currentTimeMillis();
// split region // split region
List<TiRegion> newRegions = List<Metapb.Region> newRegions =
splitRegion( splitRegion(
splitKeys splitKeys
.stream() .stream()
@ -358,7 +360,7 @@ public class TiSession implements AutoCloseable {
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS)); ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
// scatter region // scatter region
for (TiRegion newRegion : newRegions) { for (Metapb.Region newRegion : newRegions) {
try { try {
getPDClient() getPDClient()
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS)); .scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS));
@ -371,7 +373,7 @@ public class TiSession implements AutoCloseable {
if (scatterWaitMS > 0) { if (scatterWaitMS > 0) {
logger.info("start to wait scatter region finish"); logger.info("start to wait scatter region finish");
long scatterRegionStartMS = System.currentTimeMillis(); long scatterRegionStartMS = System.currentTimeMillis();
for (TiRegion newRegion : newRegions) { for (Metapb.Region newRegion : newRegions) {
long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis(); long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis();
if (remainMS <= 0) { if (remainMS <= 0) {
logger.warn("wait scatter region timeout"); logger.warn("wait scatter region timeout");
@ -388,8 +390,8 @@ public class TiSession implements AutoCloseable {
logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000); logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
} }
private List<TiRegion> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) { private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
List<TiRegion> regions = new ArrayList<>(); List<Metapb.Region> regions = new ArrayList<>();
Map<TiRegion, List<ByteString>> groupKeys = Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(regionManager, splitKeys, backOffer); groupKeysByRegion(regionManager, splitKeys, backOffer);
@ -411,7 +413,7 @@ public class TiSession implements AutoCloseable {
"split key equal to region start key or end key. Region splitting is not needed."); "split key equal to region start key or end key. Region splitting is not needed.");
} else { } else {
logger.info("start to split region id={}, split size={}", region.getId(), splits.size()); logger.info("start to split region id={}, split size={}", region.getId(), splits.size());
List<TiRegion> newRegions; List<Metapb.Region> newRegions;
try { try {
newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits); newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
} catch (final TiKVException e) { } catch (final TiKVException e) {

View File

@ -32,9 +32,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiClientInternalException;
@ -67,19 +69,22 @@ public class RegionManager {
// To avoid double retrieval, we used the async version of grpc // To avoid double retrieval, we used the async version of grpc
// When rpc not returned, instead of call again, it wait for previous one done // When rpc not returned, instead of call again, it wait for previous one done
public RegionManager( public RegionManager(
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) { TiConfiguration conf,
this.cache = new RegionCache(pdClient); ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback; this.cacheInvalidateCallback = cacheInvalidateCallback;
this.executor = null; this.executor = null;
this.storeChecker = null; this.storeChecker = null;
} }
public RegionManager( public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient, ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback,
ChannelFactory channelFactory, ChannelFactory channelFactory,
boolean enableGrpcForward) { boolean enableGrpcForward) {
this.cache = new RegionCache(pdClient); this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback; this.cacheInvalidateCallback = cacheInvalidateCallback;
if (enableGrpcForward) { if (enableGrpcForward) {
UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
@ -92,8 +97,8 @@ public class RegionManager {
} }
} }
public RegionManager(ReadOnlyPDClient pdClient) { public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient); this.cache = new RegionCache(conf, pdClient);
this.cacheInvalidateCallback = null; this.cacheInvalidateCallback = null;
this.storeChecker = null; this.storeChecker = null;
this.executor = null; this.executor = null;
@ -252,12 +257,14 @@ public class RegionManager {
private final Map<Long, TiStore> storeCache; private final Map<Long, TiStore> storeCache;
private final RangeMap<Key, Long> keyToRegionIdCache; private final RangeMap<Key, Long> keyToRegionIdCache;
private final ReadOnlyPDClient pdClient; private final ReadOnlyPDClient pdClient;
private final TiConfiguration conf;
public RegionCache(ReadOnlyPDClient pdClient) { public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) {
regionCache = new HashMap<>(); regionCache = new HashMap<>();
storeCache = new HashMap<>(); storeCache = new HashMap<>();
keyToRegionIdCache = TreeRangeMap.create(); keyToRegionIdCache = TreeRangeMap.create();
this.conf = conf;
this.pdClient = pdClient; this.pdClient = pdClient;
} }
@ -278,7 +285,9 @@ public class RegionManager {
if (regionId == null) { if (regionId == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
TiRegion region = pdClient.getRegionByKey(backOffer, key); Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
pdClient.getRegionByKey(backOffer, key);
TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) { if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString()); throw new TiClientInternalException("Invalid Region: " + region.toString());
} }
@ -312,7 +321,9 @@ public class RegionManager {
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
} }
if (region == null) { if (region == null) {
region = pdClient.getRegionByID(backOffer, regionId); Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
pdClient.getRegionByID(backOffer, regionId);
region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) { if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString()); throw new TiClientInternalException("Invalid Region: " + region.toString());
} }
@ -400,6 +411,19 @@ public class RegionManager {
} }
} }
private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
}
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores, null);
}
public synchronized void clearAll() { public synchronized void clearAll() {
keyToRegionIdCache.clear(); keyToRegionIdCache.clear();
regionCache.clear(); regionCache.clear();

View File

@ -31,7 +31,6 @@ import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram; import io.prometheus.client.Histogram;
import java.util.*; import java.util.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.PDClient; import org.tikv.common.PDClient;
@ -46,6 +45,7 @@ import org.tikv.common.util.*;
import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb.*; import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub; import org.tikv.kvproto.TikvGrpc.TikvStub;
@ -747,7 +747,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param splitKeys is the split points for a specific region. * @param splitKeys is the split points for a specific region.
* @return a split region info. * @return a split region info.
*/ */
public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) { public List<Metapb.Region> splitRegion(Iterable<ByteString> splitKeys) {
Supplier<SplitRegionRequest> request = Supplier<SplitRegionRequest> request =
() -> () ->
SplitRegionRequest.newBuilder() SplitRegionRequest.newBuilder()
@ -782,19 +782,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
region.getId(), resp.getRegionError().toString())); region.getId(), resp.getRegionError().toString()));
} }
return resp.getRegionsList() return resp.getRegionsList();
.stream()
.map(
region ->
new TiRegion(
region,
null,
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaSelector()))
.collect(Collectors.toList());
} }
// APIs for Raw Scan/Put/Get/Delete // APIs for Raw Scan/Put/Get/Delete

View File

@ -23,11 +23,10 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.TiConfiguration;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.KeyUtils; import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key; import org.tikv.common.key.Key;
@ -44,29 +43,31 @@ public class TiRegion implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiRegion.class); private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);
private final Region meta; private final Region meta;
private final KVMode kvMode;
private final IsolationLevel isolationLevel; private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri; private final Kvrpcpb.CommandPri commandPri;
private final TiConfiguration conf;
private final Peer leader; private final Peer leader;
private final ReplicaSelector replicaSelector; private final ReplicaSelector replicaSelector;
private final List<Peer> replicaList; private final List<Peer> replicaList;
private final TiStore proxyStore; private final TiStore proxyStore;
private int replicaIdx; private int replicaIdx;
private final List<Peer> peers;
private final List<TiStore> stores;
public TiRegion( public TiRegion(
TiConfiguration conf,
Region meta, Region meta,
Peer leader, Peer leader,
TiStore proxyStore, List<Peer> peers,
IsolationLevel isolationLevel, List<TiStore> stores,
Kvrpcpb.CommandPri commandPri, TiStore proxyStore) {
KVMode kvMode, this.conf = Objects.requireNonNull(conf, "conf is null");
ReplicaSelector replicaSelector) { this.meta = Objects.requireNonNull(meta, "meta is null");
Objects.requireNonNull(meta, "meta is null"); this.isolationLevel = conf.getIsolationLevel();
this.meta = decodeRegion(meta, kvMode == KVMode.RAW); this.commandPri = conf.getCommandPriority();
this.kvMode = kvMode; this.peers = peers;
this.isolationLevel = isolationLevel; this.stores = stores;
this.commandPri = commandPri; this.replicaSelector = conf.getReplicaSelector();
this.replicaSelector = replicaSelector;
this.proxyStore = proxyStore; this.proxyStore = proxyStore;
if (leader == null || leader.getId() == 0) { if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) { if (meta.getPeersCount() == 0) {
@ -79,34 +80,15 @@ public class TiRegion implements Serializable {
} }
// init replicaList // init replicaList
replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); replicaList =
replicaSelector
.select(new org.tikv.common.replica.Region(meta, this.leader, peers, stores))
.stream()
.map(org.tikv.common.replica.Store::getPeer)
.collect(Collectors.toList());
replicaIdx = 0; replicaIdx = 0;
} }
private Region decodeRegion(Region region, boolean isRawRegion) {
Region.Builder builder =
Region.newBuilder()
.setId(region.getId())
.setRegionEpoch(region.getRegionEpoch())
.addAllPeers(region.getPeersList());
if (region.getStartKey().isEmpty() || isRawRegion) {
builder.setStartKey(region.getStartKey());
} else {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
}
if (region.getEndKey().isEmpty() || isRawRegion) {
builder.setEndKey(region.getEndKey());
} else {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
}
return builder.build();
}
public Peer getLeader() { public Peer getLeader() {
return leader; return leader;
} }
@ -180,7 +162,7 @@ public class TiRegion implements Serializable {
private Kvrpcpb.Context getContext( private Kvrpcpb.Context getContext(
Peer currentPeer, Set<Long> resolvedLocks, TiStoreType storeType) { Peer currentPeer, Set<Long> resolvedLocks, TiStoreType storeType) {
boolean replicaRead = !isLeader(getCurrentReplica()) && TiStoreType.TiKV.equals(storeType); boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType);
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
builder builder
@ -215,28 +197,14 @@ public class TiRegion implements Serializable {
List<Peer> peers = meta.getPeersList(); List<Peer> peers = meta.getPeersList();
for (Peer p : peers) { for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) { if (p.getStoreId() == leaderStoreID) {
return new TiRegion( return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore);
this.meta,
p,
this.proxyStore,
this.isolationLevel,
this.commandPri,
this.kvMode,
this.replicaSelector);
} }
} }
return null; return null;
} }
public TiRegion switchProxyStore(TiStore store) { public TiRegion switchProxyStore(TiStore store) {
return new TiRegion( return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store);
this.meta,
this.leader,
store,
this.isolationLevel,
this.commandPri,
this.kvMode,
this.replicaSelector);
} }
public boolean isMoreThan(ByteString key) { public boolean isMoreThan(ByteString key) {

View File

@ -18,13 +18,18 @@ package org.tikv.common.replica;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.tikv.kvproto.Metapb;
public class FollowerReplicaSelector implements ReplicaSelector { public class FollowerReplicaSelector implements ReplicaSelector {
@Override @Override
public List<Metapb.Peer> select( public List<Store> select(Region region) {
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) { Store[] stores = region.getStores();
List<Metapb.Peer> list = new ArrayList<>(followers); Store leader = region.getLeader();
List<Store> list = new ArrayList<>(stores.length);
for (Store store : stores) {
if (!store.isLearner() && !leader.equals(store)) {
list.add(store);
}
}
Collections.shuffle(list); Collections.shuffle(list);
return list; return list;
} }

View File

@ -18,13 +18,18 @@ package org.tikv.common.replica;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.tikv.kvproto.Metapb;
public class LeaderFollowerReplicaSelector implements ReplicaSelector { public class LeaderFollowerReplicaSelector implements ReplicaSelector {
@Override @Override
public List<Metapb.Peer> select( public List<Store> select(Region region) {
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) { Store[] stores = region.getStores();
List<Metapb.Peer> list = new ArrayList<>(followers); Store leader = region.getLeader();
List<Store> list = new ArrayList<>(stores.length);
for (Store store : stores) {
if (!store.isLearner() && !leader.equals(store)) {
list.add(store);
}
}
Collections.shuffle(list); Collections.shuffle(list);
list.add(leader); list.add(leader);
return list; return list;

View File

@ -17,14 +17,12 @@ package org.tikv.common.replica;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.tikv.kvproto.Metapb;
public class LeaderReplicaSelector implements ReplicaSelector { public class LeaderReplicaSelector implements ReplicaSelector {
@Override @Override
public List<Metapb.Peer> select( public List<Store> select(Region region) {
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) { List<Store> list = new ArrayList<>(1);
List<Metapb.Peer> list = new ArrayList<>(); list.add(region.getLeader());
list.add(leader);
return list; return list;
} }
} }

View File

@ -0,0 +1,57 @@
package org.tikv.common.replica;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Iterator;
import java.util.List;
import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
public class Region {
private final Metapb.Region region;
private final Store[] stores;
private Store leaderStore;
public Region(
final Metapb.Region region,
final Metapb.Peer leader,
final List<Metapb.Peer> peers,
final List<TiStore> stores) {
this.region = region;
this.stores = new Store[stores.size()];
Iterator<Metapb.Peer> peer = peers.iterator();
Iterator<TiStore> store = stores.iterator();
for (int idx = 0; idx < peers.size(); idx++) {
Metapb.Peer currentPeer = peer.next();
boolean isLeader = currentPeer.equals(leader);
this.stores[idx] = new Store(currentPeer, store.next().getStore(), isLeader);
if (isLeader) {
leaderStore = this.stores[idx];
}
}
}
public Store[] getStores() {
return stores;
}
public Store getLeader() {
return leaderStore;
}
public long getId() {
return region.getId();
}
public byte[] getStartKey() {
return region.getStartKey().toByteArray();
}
public byte[] getEndKey() {
return region.getEndKey().toByteArray();
}
public String toString() {
return toStringHelper(this).add("region", region).add("stores", stores).toString();
}
}

View File

@ -17,13 +17,11 @@ package org.tikv.common.replica;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import org.tikv.kvproto.Metapb;
public interface ReplicaSelector extends Serializable { public interface ReplicaSelector extends Serializable {
public static final ReplicaSelector LEADER = new LeaderReplicaSelector(); ReplicaSelector LEADER = new LeaderReplicaSelector();
public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector(); ReplicaSelector FOLLOWER = new FollowerReplicaSelector();
public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector(); ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector();
List<Metapb.Peer> select( List<Store> select(Region region);
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners);
} }

View File

@ -0,0 +1,111 @@
package org.tikv.common.replica;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.List;
import org.tikv.kvproto.Metapb;
public class Store {
public static class Label {
private final org.tikv.kvproto.Metapb.StoreLabel label;
Label(org.tikv.kvproto.Metapb.StoreLabel label) {
this.label = label;
}
public String getKey() {
return label.getKey();
}
public String getValue() {
return label.getValue();
}
}
public enum State {
Unknown,
Up,
Offline,
Tombstone
}
private static final Label[] EMPTY_LABELS = new Label[0];
private Label[] labels;
private final Metapb.Peer peer;
private final Metapb.Store store;
private final boolean isLeader;
Store(
final org.tikv.kvproto.Metapb.Peer peer,
final org.tikv.kvproto.Metapb.Store store,
boolean isLeader) {
this.peer = peer;
this.store = store;
this.isLeader = isLeader;
}
public Metapb.Peer getPeer() {
return peer;
}
public Label[] getLabels() {
if (labels == null) {
List<Metapb.StoreLabel> labelList = store.getLabelsList();
if (labelList.isEmpty()) {
labels = EMPTY_LABELS;
} else {
labels = labelList.stream().map(Label::new).toArray(Label[]::new);
}
}
return labels;
}
public boolean isLearner() {
return peer.getRole() == Metapb.PeerRole.Learner;
}
public boolean isLeader() {
return isLeader;
}
public boolean isFollower() {
return peer.getRole() == Metapb.PeerRole.Voter && !isLeader;
}
public long getId() {
return store.getId();
}
public String getAddress() {
return store.getAddress();
}
public String getVersion() {
return store.getVersion();
}
public State getState() {
switch (store.getState()) {
case Up:
return State.Up;
case Offline:
return State.Offline;
case Tombstone:
return State.Tombstone;
default:
return State.Unknown;
}
}
public boolean equals(Object o) {
if (!(o instanceof Store)) {
return false;
}
Store other = (Store) o;
return this.peer.equals(other.peer);
}
public String toString() {
return toStringHelper(this).add("peer", peer).add("store", store).toString();
}
}

View File

@ -1,11 +1,13 @@
package org.tikv.common; package org.tikv.common;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Before; import org.junit.Before;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiRegion;
import org.tikv.common.replica.ReplicaSelector; import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb;
@ -28,16 +30,26 @@ public class MockServerTest extends PDMockServerTest {
.addPeers(Metapb.Peer.newBuilder().setId(11).setStoreId(13)) .addPeers(Metapb.Peer.newBuilder().setId(11).setStoreId(13))
.build(); .build();
List<Metapb.Store> s =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("localhost:1234")
.setVersion("5.0.0")
.setId(13)
.build());
region = region =
new TiRegion( new TiRegion(
session.getConf(),
r, r,
r.getPeers(0), r.getPeers(0),
null, r.getPeersList(),
session.getConf().getIsolationLevel(), s.stream().map(TiStore::new).collect(Collectors.toList()),
session.getConf().getCommandPriority(), null);
KVMode.TXN,
ReplicaSelector.LEADER);
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer(); server = new KVMockServer();
port = server.start(region); port = server.start(region);
} }

View File

@ -23,9 +23,9 @@ import java.util.concurrent.*;
import org.junit.Test; import org.junit.Test;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.meta.TiTimestamp; import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer; import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState; import org.tikv.kvproto.Metapb.StoreState;
@ -85,13 +85,16 @@ public class PDClientTest extends PDMockServerTest {
GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20)))); GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) { try (PDClient client = session.getPDClient()) {
TiRegion r = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); Pair<Metapb.Region, Metapb.Peer> rl =
client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
Metapb.Region r = rl.first;
Metapb.Peer l = rl.second;
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getConfVer(), confVer);
assertEquals(r.getRegionEpoch().getVersion(), ver); assertEquals(r.getRegionEpoch().getVersion(), ver);
assertEquals(r.getLeader().getId(), 1); assertEquals(l.getId(), 1);
assertEquals(r.getLeader().getStoreId(), 10); assertEquals(l.getStoreId(), 10);
} }
} }
@ -113,13 +116,15 @@ public class PDClientTest extends PDMockServerTest {
GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20)))); GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) { try (PDClient client = session.getPDClient()) {
TiRegion r = client.getRegionByID(defaultBackOff(), 0); Pair<Metapb.Region, Metapb.Peer> rl = client.getRegionByID(defaultBackOff(), 0);
Metapb.Region r = rl.first;
Metapb.Peer l = rl.second;
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getConfVer(), confVer);
assertEquals(r.getRegionEpoch().getVersion(), ver); assertEquals(r.getRegionEpoch().getVersion(), ver);
assertEquals(r.getLeader().getId(), 1); assertEquals(l.getId(), 1);
assertEquals(r.getLeader().getStoreId(), 10); assertEquals(l.getStoreId(), 10);
} }
} }

View File

@ -61,6 +61,7 @@ public class RegionManagerTest extends PDMockServerTest {
int confVer = 1026; int confVer = 1026;
int ver = 1027; int ver = 1027;
long regionId = 233; long regionId = 233;
String testAddress = "testAddress";
pdServer.addGetRegionResp( pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse( GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(), pdServer.getClusterId(),
@ -71,6 +72,18 @@ public class RegionManagerTest extends PDMockServerTest {
GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20)))); GrpcUtils.makePeer(2, 20))));
for (long id : new long[] {10, 20}) {
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
id,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
}
TiRegion region = mgr.getRegionByKey(startKey); TiRegion region = mgr.getRegionByKey(startKey);
assertEquals(region.getId(), regionId); assertEquals(region.getId(), regionId);
@ -106,15 +119,18 @@ public class RegionManagerTest extends PDMockServerTest {
GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(storeId, 10), GrpcUtils.makePeer(storeId, 10),
GrpcUtils.makePeer(storeId + 1, 20)))); GrpcUtils.makePeer(storeId + 1, 20))));
pdServer.addGetStoreResp( for (long id : new long[] {10, 20}) {
GrpcUtils.makeGetStoreResponse( pdServer.addGetStoreResp(
pdServer.getClusterId(), GrpcUtils.makeGetStoreResponse(
GrpcUtils.makeStore( pdServer.getClusterId(),
storeId, GrpcUtils.makeStore(
testAddress, id,
Metapb.StoreState.Up, testAddress,
GrpcUtils.makeStoreLabel("k1", "v1"), Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k2", "v2")))); GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
}
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey); Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId); assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId); assertEquals(pair.first.getId(), storeId);

View File

@ -54,7 +54,7 @@ public class RegionStoreClientTest extends MockServerTest {
new RegionStoreClientBuilder( new RegionStoreClientBuilder(
session.getConf(), session.getConf(),
session.getChannelFactory(), session.getChannelFactory(),
new RegionManager(session.getPDClient()), new RegionManager(session.getConf(), session.getPDClient()),
session.getPDClient()); session.getPDClient());
return builder.build(region, store); return builder.build(region, store);

View File

@ -10,8 +10,9 @@ import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession; import org.tikv.common.TiSession;
import org.tikv.common.replica.Region;
import org.tikv.common.replica.ReplicaSelector; import org.tikv.common.replica.ReplicaSelector;
import org.tikv.kvproto.Metapb; import org.tikv.common.replica.Store;
public class ReplicaReadTest extends TXNTest { public class ReplicaReadTest extends TXNTest {
private TiSession session; private TiSession session;
@ -41,12 +42,11 @@ public class ReplicaReadTest extends TXNTest {
conf.setReplicaSelector( conf.setReplicaSelector(
new ReplicaSelector() { new ReplicaSelector() {
@Override @Override
public List<Metapb.Peer> select( public List<Store> select(Region region) {
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) { List<Store> list = new ArrayList<>();
List<Metapb.Peer> list = new ArrayList<>(); for (Store store : region.getStores()) {
list.addAll(followers); list.add(store);
list.addAll(learners); }
list.add(leader);
return list; return list;
} }
}); });