Forward request by store (#223)

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
Wallace 2021-07-01 01:35:21 +08:00 committed by GitHub
parent 630e1513e9
commit 01898542c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 178 additions and 80 deletions

View File

@ -89,6 +89,9 @@ public abstract class AbstractGRPCClient<
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
if (resp != null && this.conf.getEnableGrpcForward()) {
tryUpdateProxy();
}
if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
@ -177,6 +180,8 @@ public abstract class AbstractGRPCClient<
protected abstract StubT getAsyncStub();
protected abstract void tryUpdateProxy();
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =

View File

@ -20,6 +20,7 @@ import org.tikv.kvproto.Kvrpcpb;
public class ConfigUtils {
public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses";
public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms";
public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size";
public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size";
@ -54,7 +55,8 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_TIMEOUT = "150ms";
public static final String DEF_FORWARD_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;

View File

@ -325,7 +325,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private GetMembersResponse getMembers(URI uri) {
try {
ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
PDGrpc.PDBlockingStub stub =
PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
GetMembersRequest request =
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
GetMembersResponse resp = stub.getMembers(request);
@ -335,7 +336,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
return resp;
} catch (Exception e) {
logger.warn("failed to get member from pd server.", e);
logger.debug("failed to get member from pd server.", e);
}
return null;
}
@ -361,6 +362,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
pdClientWrapper =
new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
timeout = conf.getTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating leader. " + leaderUrlStr, e);
return false;
@ -380,6 +382,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
// create new Leader
ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping);
pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
timeout = conf.getForwardTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating follower. " + followerUrlStr, e);
return false;
@ -411,6 +414,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
continue;
}
logger.info(String.format("can not switch to new leader, try follower forward"));
List<Pdpb.Member> members = resp.getMembersList();
boolean hasReachNextMember = false;
@ -431,6 +435,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
continue;
}
if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) {
logger.warn(
String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr));
return;
}
}
@ -464,8 +470,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return;
}
}
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");
if (pdClientWrapper == null) {
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");
}
}
private synchronized void tryUpdateMembers(List<URI> members) {
@ -541,6 +549,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
protected void tryUpdateProxy() {}
private void initCluster() {
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
@ -558,6 +569,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
this.hostMapping =
Optional.ofNullable(getConf().getHostMapping())
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
// The first request may cost too much latency
long originTimeout = this.timeout;
this.timeout = 2000;
for (URI u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
@ -565,6 +579,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
logger.info("Could not get leader member with pd: " + u);
}
this.timeout = originTimeout;
checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId();
header = RequestHeader.newBuilder().setClusterId(clusterId).build();
@ -654,7 +669,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
@Override
public String toString() {
return "[leaderInfo: " + leaderInfo + "]";
return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]";
}
}

View File

@ -54,6 +54,7 @@ public class TiConfiguration implements Serializable {
private static void loadFromDefaultProperties() {
setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES);
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE);
@ -237,6 +238,7 @@ public class TiConfiguration implements Serializable {
}
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
private List<URI> pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES);
@ -334,6 +336,15 @@ public class TiConfiguration implements Serializable {
return this;
}
public long getForwardTimeout() {
return forwardTimeout;
}
public TiConfiguration setForwardTimeout(long timeout) {
this.forwardTimeout = timeout;
return this;
}
public long getScanTimeout() {
return scanTimeout;
}

View File

@ -79,6 +79,9 @@ public class TiSession implements AutoCloseable {
this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
this.metricsServer = MetricsServer.getInstance(conf);
if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available");
}
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

View File

@ -169,8 +169,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
return true;
} else {
regionManager.onRequestFail(recv.getRegion());
}
backOffer.doBackOff(

View File

@ -28,6 +28,8 @@ import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.MetadataUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
@ -38,10 +40,13 @@ import org.tikv.kvproto.TikvGrpc;
public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub>
implements RegionErrorReceiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
protected final RegionManager regionManager;
protected TiRegion region;
protected TiStore targetStore;
protected TiStore originStore;
protected long retryTimes;
protected AbstractRegionStoreClient(
TiConfiguration conf,
@ -58,6 +63,11 @@ public abstract class AbstractRegionStoreClient
this.region = region;
this.regionManager = regionManager;
this.targetStore = store;
this.originStore = null;
this.retryTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
}
}
public TiRegion getRegion() {
@ -106,26 +116,46 @@ public abstract class AbstractRegionStoreClient
@Override
public boolean onStoreUnreachable() {
if (!conf.getEnableGrpcForward()) {
regionManager.onRequestFail(region);
return false;
}
if (region.getProxyStore() == null) {
if (targetStore.getProxyStore() == null) {
if (!targetStore.isUnreachable()) {
if (checkHealth(targetStore)) {
if (checkHealth(targetStore.getStore())) {
return true;
} else {
if (targetStore.markUnreachable()) {
this.regionManager.scheduleHealthCheckJob(targetStore);
}
}
}
}
TiRegion proxyRegion = switchProxyStore();
if (proxyRegion == null) {
} else if (retryTimes > region.getFollowerList().size()) {
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region and store[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
}
regionManager.updateRegion(region, proxyRegion);
region = proxyRegion;
String addressStr = region.getProxyStore().getStore().getAddress();
TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
return false;
}
if (originStore == null) {
originStore = targetStore;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
}
}
targetStore = proxyStore;
retryTimes += 1;
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
targetStore.getStore().getAddress(),
targetStore.getProxyStore().getAddress(),
region.getId()));
String addressStr = targetStore.getProxyStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
Metadata header = new Metadata();
@ -135,11 +165,15 @@ public abstract class AbstractRegionStoreClient
return true;
}
private boolean checkHealth(TiStore store) {
if (store.getStore() == null) {
return false;
@Override
protected void tryUpdateProxy() {
if (originStore != null) {
regionManager.updateStore(originStore, targetStore);
}
String addressStr = store.getStore().getAddress();
}
private boolean checkHealth(Metapb.Store store) {
String addressStr = store.getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
HealthGrpc.HealthBlockingStub stub =
@ -157,26 +191,25 @@ public abstract class AbstractRegionStoreClient
return true;
}
private TiRegion switchProxyStore() {
private TiStore switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
for (int i = 0; i < peers.size() * 2; i++) {
int idx = i % peers.size();
Metapb.Peer peer = peers.get(idx);
if (peer.getStoreId() != region.getLeader().getStoreId()) {
if (region.getProxyStore() == null) {
if (targetStore.getProxyStore() == null) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (checkHealth(store)) {
return region.switchProxyStore(store);
if (checkHealth(store.getStore())) {
return targetStore.withProxy(store.getStore());
}
} else {
TiStore proxyStore = region.getProxyStore();
if (peer.getStoreId() == proxyStore.getStore().getId()) {
if (peer.getStoreId() == targetStore.getProxyStore().getId()) {
hasVisitedStore = true;
} else if (hasVisitedStore) {
proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) {
return region.switchProxyStore(proxyStore);
TiStore proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) {
return targetStore.withProxy(proxyStore.getStore());
}
}
}

View File

@ -215,8 +215,16 @@ public class RegionManager {
return null;
}
public synchronized boolean updateRegion(TiRegion oldRegion, TiRegion region) {
return cache.updateRegion(oldRegion, region);
public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
if (cache.updateStore(oldStore, newStore)) {
if (newStore.isUnreachable()) {
logger.warn(
String.format(
"check health for store [%s] in background thread",
newStore.getStore().getAddress()));
this.storeChecker.scheduleStoreHealthCheck(newStore);
}
}
}
/** Clears all cache when some unexpected error occurs. */
@ -229,15 +237,8 @@ public class RegionManager {
*
* @param region region
*/
public void onRequestFail(TiRegion region) {
onRequestFail(region, region.getLeader().getStoreId());
}
private void onRequestFail(TiRegion region, long storeId) {
if (this.storeChecker != null) {
cache.invalidateRegion(region);
cache.invalidateAllRegionForStore(storeId);
}
public synchronized void onRequestFail(TiRegion region) {
cache.invalidateRegion(region);
}
public void invalidateStore(long storeId) {
@ -248,10 +249,6 @@ public class RegionManager {
cache.invalidateRegion(region);
}
public void scheduleHealthCheckJob(TiStore store) {
this.storeChecker.scheduleStoreHealthCheck(store);
}
public static class RegionCache {
private final Map<Long, TiRegion> regionCache;
private final Map<Long, TiStore> storeCache;
@ -370,10 +367,29 @@ public class RegionManager {
}
}
public synchronized void invalidateAllRegionForStore(long storeId) {
public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) {
TiStore originStore = storeCache.get(oldStore.getId());
if (originStore == oldStore) {
storeCache.put(newStore.getId(), newStore);
if (oldStore != null && oldStore.isUnreachable()) {
oldStore.markReachable();
}
if (newStore.getProxyStore() != null) {
newStore.markUnreachable();
}
return true;
}
return false;
}
public synchronized void invalidateAllRegionForStore(TiStore store) {
TiStore oldStore = storeCache.get(store.getId());
if (oldStore != store) {
return;
}
List<TiRegion> regionToRemove = new ArrayList<>();
for (TiRegion r : regionCache.values()) {
if (r.getLeader().getStoreId() == storeId) {
if (r.getLeader().getStoreId() == store.getId()) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r));
}
@ -381,6 +397,7 @@ public class RegionManager {
}
}
logger.warn(String.format("invalid store [%d]", store.getId()));
// remove region
for (TiRegion r : regionToRemove) {
keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey()));
@ -421,7 +438,7 @@ public class RegionManager {
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores, null);
return new TiRegion(conf, region, leader, peers, stores);
}
public synchronized void clearAll() {

View File

@ -1269,8 +1269,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
TikvBlockingStub blockingStub = null;
TikvStub asyncStub = null;
if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) {
addressStr = region.getProxyStore().getStore().getAddress();
if (conf.getEnableGrpcForward() && store.getProxyStore() != null && store.isUnreachable()) {
addressStr = store.getProxyStore().getAddress();
channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
Metadata header = new Metadata();
@ -1278,13 +1278,16 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
} else {
// If the store is reachable, which is update by check-health thread
// If the store is reachable, which is update by check-health thread, cancel proxy forward.
if (!store.isUnreachable()) {
if (region.getProxyStore() != null) {
TiRegion newRegion = region.switchProxyStore(null);
if (regionManager.updateRegion(region, newRegion)) {
region = newRegion;
}
if (store.getProxyStore() != null) {
logger.warn(
String.format(
"cancel request to store [%s] forward by store[%s]",
store.getStore().getAddress(), store.getProxyStore().getAddress()));
TiStore newStore = store.withProxy(null);
regionManager.updateStore(store, newStore);
store = newStore;
}
}
channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());

View File

@ -49,18 +49,12 @@ public class TiRegion implements Serializable {
private final Peer leader;
private final ReplicaSelector replicaSelector;
private final List<Peer> replicaList;
private final TiStore proxyStore;
private int replicaIdx;
private final List<Peer> peers;
private final List<TiStore> stores;
public TiRegion(
TiConfiguration conf,
Region meta,
Peer leader,
List<Peer> peers,
List<TiStore> stores,
TiStore proxyStore) {
TiConfiguration conf, Region meta, Peer leader, List<Peer> peers, List<TiStore> stores) {
this.conf = Objects.requireNonNull(conf, "conf is null");
this.meta = Objects.requireNonNull(meta, "meta is null");
this.isolationLevel = conf.getIsolationLevel();
@ -68,7 +62,6 @@ public class TiRegion implements Serializable {
this.peers = peers;
this.stores = stores;
this.replicaSelector = conf.getReplicaSelector();
this.proxyStore = proxyStore;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
@ -182,10 +175,6 @@ public class TiRegion implements Serializable {
meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion());
}
public TiStore getProxyStore() {
return proxyStore;
}
/**
* switches current peer to the one on specific store. It return false if no peer matches the
* storeID.
@ -197,16 +186,12 @@ public class TiRegion implements Serializable {
List<Peer> peers = meta.getPeersList();
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore);
return new TiRegion(this.conf, this.meta, p, peers, this.stores);
}
}
return null;
}
public TiRegion switchProxyStore(TiStore store) {
return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store);
}
public boolean isMoreThan(ByteString key) {
return FastByteComparisons.compareTo(
meta.getStartKey().toByteArray(),

View File

@ -5,15 +5,27 @@ import org.tikv.kvproto.Metapb;
public class TiStore {
private final Metapb.Store store;
private final Metapb.Store proxyStore;
private AtomicBoolean unreachable;
public TiStore(Metapb.Store store) {
this.store = store;
this.unreachable = new AtomicBoolean(false);
this.proxyStore = null;
}
public boolean markUnreachable() {
return this.unreachable.compareAndSet(false, true);
private TiStore(Metapb.Store store, Metapb.Store proxyStore) {
this.store = store;
this.unreachable = new AtomicBoolean(false);
this.proxyStore = proxyStore;
}
public TiStore withProxy(Metapb.Store proxyStore) {
return new TiStore(this.store, proxyStore);
}
public void markUnreachable() {
this.unreachable.set(true);
}
public void markReachable() {
@ -28,6 +40,14 @@ public class TiStore {
return this.store;
}
public String getAddress() {
return this.store.getAddress();
}
public Metapb.Store getProxyStore() {
return this.proxyStore;
}
public long getId() {
return this.store.getId();
}

View File

@ -9,12 +9,15 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;
public class UnreachableStoreChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(UnreachableStoreChecker.class);
private ConcurrentHashMap<Long, TiStore> stores;
private BlockingQueue<TiStore> taskQueue;
private final ChannelFactory channelFactory;
@ -67,6 +70,9 @@ public class UnreachableStoreChecker implements Runnable {
HealthCheckResponse resp = stub.check(req);
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
store.markReachable();
logger.warn(
String.format("store [%s] recovers to be reachable", store.getStore().getAddress()));
this.stores.remove(Long.valueOf(store.getId()));
continue;
}

View File

@ -672,6 +672,7 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) {
client.setTimeout(conf.getScanTimeout());
client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {

View File

@ -44,8 +44,7 @@ public class MockServerTest extends PDMockServerTest {
r,
r.getPeers(0),
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()),
null);
s.stream().map(TiStore::new).collect(Collectors.toList()));
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());