Support grpc forward (#198)

* support grpc forward for tikv client

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
Wallace 2021-06-18 18:25:58 +08:00 committed by GitHub
parent 8308d796e7
commit 5815678c56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 480 additions and 141 deletions

View File

@ -128,6 +128,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>

View File

@ -149,7 +149,11 @@ public class CDCClient implements AutoCloseable {
for (final TiRegion region : regions) {
if (overlapWithRegion(region)) {
final String address =
session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress();
session
.getRegionManager()
.getStoreById(region.getLeader().getStoreId())
.getStore()
.getAddress();
final ManagedChannel channel =
session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping());
try {

View File

@ -48,10 +48,13 @@ public class ConfigUtils {
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
@ -76,6 +79,7 @@ public class ConfigUtils {
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";

View File

@ -241,6 +241,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@ -258,6 +259,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@ -285,6 +287,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@ -299,6 +302,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),

View File

@ -17,6 +17,7 @@ package org.tikv.common;
import static org.tikv.common.ConfigUtils.*;
import io.grpc.Metadata;
import java.io.Serializable;
import java.net.URI;
import java.util.*;
@ -32,6 +33,8 @@ public class TiConfiguration implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
static {
loadFromSystemProperties();
@ -72,6 +75,8 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
}
public static void listAll() {
@ -245,6 +250,7 @@ public class TiConfiguration implements Serializable {
private boolean showRowId = getBoolean(TIKV_SHOW_ROWID);
private String dbPrefix = get(TIKV_DB_PREFIX);
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
private boolean enableGrpcForward = getBoolean(TIKV_ENABLE_GRPC_FORWARD);
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
@ -253,6 +259,7 @@ public class TiConfiguration implements Serializable {
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
@ -532,4 +539,12 @@ public class TiConfiguration implements Serializable {
public String getNetworkMappingName() {
return this.networkMappingName;
}
public boolean getEnableGrpcForward() {
return this.enableGrpcForward;
}
public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}
}

View File

@ -43,8 +43,8 @@ import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
@ -71,6 +71,7 @@ public class TiSession implements AutoCloseable {
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
private HTTPServer server;
@ -80,6 +81,7 @@ public class TiSession implements AutoCloseable {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
if (conf.isMetricsEnable()) {
try {
this.collectorRegistry = new CollectorRegistry();
@ -199,7 +201,12 @@ public class TiSession implements AutoCloseable {
if (res == null) {
synchronized (this) {
if (regionManager == null) {
regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback);
regionManager =
new RegionManager(
getPDClient(),
this.cacheInvalidateCallback,
this.channelFactory,
this.enableGrpcForward);
}
res = regionManager;
}
@ -415,10 +422,10 @@ public class TiSession implements AutoCloseable {
groupKeysByRegion(regionManager, splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
List<ByteString> splits =
entry
.getValue()

View File

@ -73,9 +73,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry =
newRegion != null
&& recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion);
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
@ -107,7 +105,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
@ -169,7 +166,11 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
regionManager.onRequestFail(recv.getRegion());
if (recv.onStoreUnreachable()) {
return true;
} else {
regionManager.onRequestFail(recv.getRegion());
}
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,

View File

@ -27,11 +27,11 @@ import org.tikv.common.key.Key;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
public class ConcreteScanIterator extends ScanIterator {
private final long version;
@ -82,10 +82,10 @@ public class ConcreteScanIterator extends ScanIterator {
private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
logger.warn(String.format("resolve current key error %s", current.getError().toString()));
Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);

View File

@ -32,12 +32,12 @@ import org.tikv.common.meta.TiDAGRequest.PushDownType;
import org.tikv.common.operation.SchemaInfer;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Metapb;
public abstract class DAGIterator<T>
extends org.tikv.common.operation.iterator.CoprocessorIterator<T> {
@ -204,7 +204,7 @@ public abstract class DAGIterator<T>
}
List<Coprocessor.KeyRange> ranges = task.getRanges();
TiRegion region = task.getRegion();
Metapb.Store store = task.getStore();
TiStore store = task.getStore();
try {
RegionStoreClient client =
@ -245,7 +245,7 @@ public abstract class DAGIterator<T>
private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask regionTask) {
List<Coprocessor.KeyRange> ranges = regionTask.getRanges();
TiRegion region = regionTask.getRegion();
Metapb.Store store = regionTask.getStore();
TiStore store = regionTask.getStore();
RegionStoreClient client;
try {

View File

@ -21,6 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.MetadataUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
@ -35,10 +41,12 @@ public abstract class AbstractRegionStoreClient
protected final RegionManager regionManager;
protected TiRegion region;
protected TiStore targetStore;
protected AbstractRegionStoreClient(
TiConfiguration conf,
TiRegion region,
TiStore store,
ChannelFactory channelFactory,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
@ -49,6 +57,7 @@ public abstract class AbstractRegionStoreClient
checkArgument(region.getLeader() != null, "Leader Peer is null");
this.region = region;
this.regionManager = regionManager;
this.targetStore = store;
}
public TiRegion getRegion() {
@ -71,13 +80,13 @@ public abstract class AbstractRegionStoreClient
/**
* onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed
*
* @param newStore the new store presented by NotLeader Error
* @param newRegion the new region presented by NotLeader Error
* @return false when re-split is needed.
*/
@Override
public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) {
public boolean onNotLeader(TiRegion newRegion) {
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newStore.getId());
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
}
// When switch leader fails or the region changed its region epoch,
// it would be necessary to re-split task's key range for new region.
@ -85,7 +94,8 @@ public abstract class AbstractRegionStoreClient
return false;
}
region = newRegion;
String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
@ -94,20 +104,89 @@ public abstract class AbstractRegionStoreClient
}
@Override
public void onStoreNotMatch(Metapb.Store store) {
String addressStr = store.getAddress();
public boolean onStoreUnreachable() {
if (!conf.getEnableGrpcForward()) {
return false;
}
if (region.getProxyStore() != null) {
TiStore store = region.getProxyStore();
if (!checkHealth(store) && store.markUnreachable()) {
this.regionManager.scheduleHealthCheckJob(store);
}
} else {
if (!targetStore.isUnreachable()) {
if (checkHealth(targetStore)) {
return true;
} else {
if (targetStore.markUnreachable()) {
this.regionManager.scheduleHealthCheckJob(targetStore);
}
}
}
}
TiRegion proxyRegion = switchProxyStore();
if (proxyRegion == null) {
return false;
}
regionManager.updateRegion(region, proxyRegion);
region = proxyRegion;
String addressStr = region.getProxyStore().getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
if (region.getLeader().getStoreId() != store.getId()) {
logger.warn(
"store_not_match may occur? "
+ region
+ ", original store = "
+ store.getId()
+ " address = "
+ addressStr);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
return true;
}
private boolean checkHealth(TiStore store) {
if (store.getStore() == null) {
return false;
}
String addressStr = store.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
HealthGrpc.HealthBlockingStub stub =
HealthGrpc.newBlockingStub(channel)
.withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS);
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
try {
HealthCheckResponse resp = stub.check(req);
if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
return false;
}
} catch (Exception e) {
return false;
}
return true;
}
private TiRegion switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
for (int i = 0; i < peers.size() * 2; i++) {
int idx = i % peers.size();
Metapb.Peer peer = peers.get(idx);
if (peer.getStoreId() != region.getLeader().getStoreId()) {
if (region.getProxyStore() == null) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (checkHealth(store)) {
return region.switchProxyStore(store);
}
} else {
TiStore proxyStore = region.getProxyStore();
if (peer.getStoreId() == proxyStore.getStore().getId()) {
hasVisitedStore = true;
} else if (hasVisitedStore) {
proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) {
return region.switchProxyStore(proxyStore);
}
}
}
}
}
return null;
}
}

View File

@ -17,12 +17,11 @@
package org.tikv.common.region;
import org.tikv.kvproto.Metapb.Store;
public interface RegionErrorReceiver {
boolean onNotLeader(Store store, TiRegion region);
boolean onNotLeader(TiRegion region);
void onStoreNotMatch(Store store);
/// return whether we need to retry this request.
boolean onStoreUnreachable();
TiRegion getRegion();
}

View File

@ -28,6 +28,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,11 +40,11 @@ import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
@SuppressWarnings("UnstableApiUsage")
@ -50,6 +53,8 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
private final ScheduledExecutorService executor;
private final UnreachableStoreChecker storeChecker;
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
@ -65,11 +70,33 @@ public class RegionManager {
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.executor = null;
this.storeChecker = null;
}
public RegionManager(
ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback,
ChannelFactory channelFactory,
boolean enableGrpcForward) {
this.cache = new RegionCache(pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback;
if (enableGrpcForward) {
UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, 5, 5, TimeUnit.SECONDS);
} else {
this.storeChecker = null;
this.executor = null;
}
}
public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.cacheInvalidateCallback = null;
this.storeChecker = null;
this.executor = null;
}
public Function<CacheInvalidateEvent, Void> getCacheInvalidateCallback() {
@ -99,19 +126,19 @@ public class RegionManager {
return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId);
}
public Pair<TiRegion, Store> getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
return getRegionStorePairByKey(key, TiStoreType.TiKV, backOffer);
}
public Pair<TiRegion, Store> getRegionStorePairByKey(ByteString key) {
public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key) {
return getRegionStorePairByKey(key, TiStoreType.TiKV);
}
public Pair<TiRegion, Store> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
return getRegionStorePairByKey(key, storeType, ConcreteBackOffer.newGetBackOff());
}
public Pair<TiRegion, Store> getRegionStorePairByKey(
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
ByteString key, TiStoreType storeType, BackOffer backOffer) {
TiRegion region = cache.getRegionByKey(key, backOffer);
if (region == null) {
@ -121,7 +148,7 @@ public class RegionManager {
throw new TiClientInternalException("Region invalid: " + region.toString());
}
Store store = null;
TiStore store = null;
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = cache.getStoreById(peer.getStoreId(), backOffer);
@ -131,8 +158,8 @@ public class RegionManager {
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
Store s = getStoreById(peer.getStoreId(), backOffer);
for (Metapb.StoreLabel label : s.getLabelsList()) {
TiStore s = getStoreById(peer.getStoreId(), backOffer);
for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
store = s;
@ -154,11 +181,11 @@ public class RegionManager {
return Pair.create(region, store);
}
public Store getStoreById(long id) {
public TiStore getStoreById(long id) {
return getStoreById(id, ConcreteBackOffer.newGetBackOff());
}
public Store getStoreById(long id, BackOffer backOffer) {
public TiStore getStoreById(long id, BackOffer backOffer) {
return cache.getStoreById(id, backOffer);
}
@ -184,6 +211,10 @@ public class RegionManager {
return null;
}
public boolean updateRegion(TiRegion oldRegion, TiRegion region) {
return cache.updateRegion(oldRegion, region);
}
/**
* Clears all cache when a TiKV server does not respond
*
@ -194,8 +225,10 @@ public class RegionManager {
}
private void onRequestFail(TiRegion region, long storeId) {
cache.invalidateRegion(region);
cache.invalidateAllRegionForStore(storeId);
if (this.storeChecker != null) {
cache.invalidateRegion(region);
cache.invalidateAllRegionForStore(storeId);
}
}
public void invalidateStore(long storeId) {
@ -206,9 +239,13 @@ public class RegionManager {
cache.invalidateRegion(region);
}
public void scheduleHealthCheckJob(TiStore store) {
this.storeChecker.scheduleStoreHealthCheck(store);
}
public static class RegionCache {
private final Map<Long, TiRegion> regionCache;
private final Map<Long, Store> storeCache;
private final Map<Long, TiStore> storeCache;
private final RangeMap<Key, Long> keyToRegionIdCache;
private final ReadOnlyPDClient pdClient;
@ -298,6 +335,26 @@ public class RegionManager {
}
}
public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
try {
if (logger.isDebugEnabled()) {
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
}
TiRegion oldRegion = regionCache.get(region.getId());
if (expected != oldRegion) {
return false;
} else {
if (oldRegion != null) {
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
}
putRegion(region);
return true;
}
} catch (Exception ignore) {
return false;
}
}
public synchronized void invalidateAllRegionForStore(long storeId) {
List<TiRegion> regionToRemove = new ArrayList<>();
for (TiRegion r : regionCache.values()) {
@ -317,16 +374,19 @@ public class RegionManager {
}
public synchronized void invalidateStore(long storeId) {
storeCache.remove(storeId);
TiStore store = storeCache.remove(storeId);
if (store != null) {
store.markReachable();
}
}
public synchronized Store getStoreById(long id, BackOffer backOffer) {
public synchronized TiStore getStoreById(long id, BackOffer backOffer) {
try {
Store store = storeCache.get(id);
TiStore store = storeCache.get(id);
if (store == null) {
store = pdClient.getStore(backOffer, id);
store = new TiStore(pdClient.getStore(backOffer, id));
}
if (store.getState().equals(StoreState.Tombstone)) {
if (store.getStore().getState().equals(StoreState.Tombstone)) {
return null;
}
storeCache.put(id, store);

View File

@ -26,6 +26,8 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.SelectResponse;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.function.Supplier;
@ -44,7 +46,6 @@ import org.tikv.common.util.*;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
@ -88,7 +89,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
private RegionStoreClient(
TiConfiguration conf,
TiRegion region,
String storeVersion,
TiStore store,
TiStoreType storeType,
ChannelFactory channelFactory,
TikvBlockingStub blockingStub,
@ -96,15 +97,15 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
this.storeType = storeType;
if (this.storeType == TiStoreType.TiKV) {
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
storeVersion,
conf,
region,
store,
this.blockingStub,
this.asyncStub,
channelFactory,
@ -113,10 +114,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
clientBuilder);
} else {
Store tikvStore =
TiStore tikvStore =
regionManager.getRegionStorePairByKey(region.getStartKey(), TiStoreType.TiKV).second;
String addressStr = tikvStore.getAddress();
String addressStr = tikvStore.getStore().getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
@ -127,9 +128,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
tikvStore.getVersion(),
conf,
region,
tikvStore,
tikvBlockingStub,
tikvAsyncStub,
channelFactory,
@ -788,6 +789,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
new TiRegion(
region,
null,
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@ -1260,25 +1262,48 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this.pdClient = pdClient;
}
public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeType)
public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType storeType)
throws GrpcException {
Objects.requireNonNull(region, "region is null");
Objects.requireNonNull(store, "store is null");
Objects.requireNonNull(storeType, "storeType is null");
String addressStr = store.getAddress();
String addressStr = store.getStore().getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
ManagedChannel channel = null;
TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub asyncStub = TikvGrpc.newStub(channel);
TikvBlockingStub blockingStub = null;
TikvStub asyncStub = null;
if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) {
addressStr = region.getProxyStore().getStore().getAddress();
channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
} else {
// If the store is reachable, which is update by check-health thread
if (!store.isUnreachable()) {
if (region.getProxyStore() != null) {
TiRegion newRegion = region.switchProxyStore(null);
if (regionManager.updateRegion(region, newRegion)) {
region = newRegion;
}
}
}
channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
}
return new RegionStoreClient(
conf,
region,
store.getVersion(),
store,
storeType,
channelFactory,
blockingStub,
@ -1288,7 +1313,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
this);
}
public synchronized RegionStoreClient build(TiRegion region, Store store) throws GrpcException {
public synchronized RegionStoreClient build(TiRegion region, TiStore store)
throws GrpcException {
return build(region, store, TiStoreType.TiKV);
}
@ -1298,12 +1324,12 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
throws GrpcException {
Pair<TiRegion, Store> pair = regionManager.getRegionStorePairByKey(key, storeType);
Pair<TiRegion, TiStore> pair = regionManager.getRegionStorePairByKey(key, storeType);
return build(pair.first, pair.second, storeType);
}
public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
Store store = regionManager.getStoreById(region.getLeader().getStoreId());
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId());
return build(region, store, TiStoreType.TiKV);
}

View File

@ -50,11 +50,13 @@ public class TiRegion implements Serializable {
private final Peer leader;
private final ReplicaSelector replicaSelector;
private final List<Peer> replicaList;
private final TiStore proxyStore;
private int replicaIdx;
public TiRegion(
Region meta,
Peer leader,
TiStore proxyStore,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode,
@ -65,6 +67,7 @@ public class TiRegion implements Serializable {
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.replicaSelector = replicaSelector;
this.proxyStore = proxyStore;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
@ -197,6 +200,10 @@ public class TiRegion implements Serializable {
meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion());
}
public TiStore getProxyStore() {
return proxyStore;
}
/**
* switches current peer to the one on specific store. It return false if no peer matches the
* storeID.
@ -209,12 +216,29 @@ public class TiRegion implements Serializable {
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
return new TiRegion(
this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector);
this.meta,
p,
this.proxyStore,
this.isolationLevel,
this.commandPri,
this.kvMode,
this.replicaSelector);
}
}
return null;
}
public TiRegion switchProxyStore(TiStore store) {
return new TiRegion(
this.meta,
this.leader,
store,
this.isolationLevel,
this.commandPri,
this.kvMode,
this.replicaSelector);
}
public boolean isMoreThan(ByteString key) {
return FastByteComparisons.compareTo(
meta.getStartKey().toByteArray(),

View File

@ -0,0 +1,34 @@
package org.tikv.common.region;
import java.util.concurrent.atomic.AtomicBoolean;
import org.tikv.kvproto.Metapb;
public class TiStore {
private final Metapb.Store store;
private AtomicBoolean unreachable;
public TiStore(Metapb.Store store) {
this.store = store;
this.unreachable = new AtomicBoolean(false);
}
public boolean markUnreachable() {
return this.unreachable.compareAndSet(false, true);
}
public void markReachable() {
this.unreachable.set(false);
}
public boolean isUnreachable() {
return this.unreachable.get();
}
public Metapb.Store getStore() {
return this.store;
}
public long getId() {
return this.store.getId();
}
}

View File

@ -0,0 +1,77 @@
package org.tikv.common.region;
import io.grpc.ManagedChannel;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
public class UnreachableStoreChecker implements Runnable {
private ConcurrentHashMap<Long, TiStore> stores;
private BlockingQueue<TiStore> taskQueue;
private final ChannelFactory channelFactory;
private final ReadOnlyPDClient pdClient;
public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) {
this.stores = new ConcurrentHashMap();
this.taskQueue = new LinkedBlockingQueue<>();
this.channelFactory = channelFactory;
this.pdClient = pdClient;
}
public void scheduleStoreHealthCheck(TiStore store) {
TiStore oldStore = this.stores.get(Long.valueOf(store.getId()));
if (oldStore == store) {
return;
}
this.stores.put(Long.valueOf(store.getId()), store);
if (!this.taskQueue.add(store)) {
// add queue false, mark it reachable so that it can be put again.
store.markReachable();
}
}
private List<TiStore> getUnhealthStore() {
List<TiStore> unhealthStore = new LinkedList<>();
while (!this.taskQueue.isEmpty()) {
try {
TiStore store = this.taskQueue.take();
unhealthStore.add(store);
} catch (Exception e) {
return unhealthStore;
}
}
return unhealthStore;
}
@Override
public void run() {
List<TiStore> unhealthStore = getUnhealthStore();
for (TiStore store : unhealthStore) {
if (!store.isUnreachable()) {
continue;
}
String addressStr = store.getStore().getAddress();
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel);
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
try {
HealthCheckResponse resp = stub.check(req);
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
store.markReachable();
this.stores.remove(Long.valueOf(store.getId()));
continue;
}
this.taskQueue.add(store);
} catch (Exception e) {
this.taskQueue.add(store);
}
}
}
}

View File

@ -29,9 +29,9 @@ import org.tikv.common.key.RowKey;
import org.tikv.common.pd.PDUtils;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.kvproto.Coprocessor.KeyRange;
import org.tikv.kvproto.Metapb;
public class RangeSplitter {
private final RegionManager regionManager;
@ -51,12 +51,11 @@ public class RangeSplitter {
* @param handles Handle list
* @return <Region, HandleList> map
*/
public Map<Pair<TiRegion, Metapb.Store>, TLongArrayList> groupByAndSortHandlesByRegionId(
public Map<Pair<TiRegion, TiStore>, TLongArrayList> groupByAndSortHandlesByRegionId(
long tableId, TLongArrayList handles) {
TLongObjectHashMap<TLongArrayList> regionHandles = new TLongObjectHashMap<>();
TLongObjectHashMap<Pair<TiRegion, Metapb.Store>> idToRegionStorePair =
new TLongObjectHashMap<>();
Map<Pair<TiRegion, Metapb.Store>, TLongArrayList> result = new HashMap<>();
TLongObjectHashMap<Pair<TiRegion, TiStore>> idToRegionStorePair = new TLongObjectHashMap<>();
Map<Pair<TiRegion, TiStore>, TLongArrayList> result = new HashMap<>();
handles.sort();
byte[] endKey = null;
@ -71,7 +70,7 @@ public class RangeSplitter {
regionHandles.put(curRegion.getId(), handlesInCurRegion);
handlesInCurRegion = new TLongArrayList();
}
Pair<TiRegion, Metapb.Store> regionStorePair =
Pair<TiRegion, TiStore> regionStorePair =
regionManager.getRegionStorePairByKey(ByteString.copyFrom(key.getBytes()));
curRegion = regionStorePair.first;
idToRegionStorePair.put(curRegion.getId(), regionStorePair);
@ -84,7 +83,7 @@ public class RangeSplitter {
}
regionHandles.forEachEntry(
(k, v) -> {
Pair<TiRegion, Metapb.Store> regionStorePair = idToRegionStorePair.get(k);
Pair<TiRegion, TiStore> regionStorePair = idToRegionStorePair.get(k);
result.put(regionStorePair, v);
return true;
});
@ -110,7 +109,7 @@ public class RangeSplitter {
// Max value for current index handle range
ImmutableList.Builder<RegionTask> regionTasks = ImmutableList.builder();
Map<Pair<TiRegion, Metapb.Store>, TLongArrayList> regionHandlesMap =
Map<Pair<TiRegion, TiStore>, TLongArrayList> regionHandlesMap =
groupByAndSortHandlesByRegionId(tableId, handles);
regionHandlesMap.forEach((k, v) -> createTask(0, v.size(), tableId, v, k, regionTasks));
@ -123,7 +122,7 @@ public class RangeSplitter {
int endPos,
long tableId,
TLongArrayList handles,
Pair<TiRegion, Metapb.Store> regionStorePair,
Pair<TiRegion, TiStore> regionStorePair,
ImmutableList.Builder<RegionTask> regionTasks) {
List<KeyRange> newKeyRanges = new ArrayList<>(endPos - startPos + 1);
long startHandle = handles.get(startPos);
@ -163,10 +162,10 @@ public class RangeSplitter {
int i = 0;
KeyRange range = keyRanges.get(i++);
Map<Long, List<KeyRange>> idToRange = new HashMap<>(); // region id to keyRange list
Map<Long, Pair<TiRegion, Metapb.Store>> idToRegion = new HashMap<>();
Map<Long, Pair<TiRegion, TiStore>> idToRegion = new HashMap<>();
while (true) {
Pair<TiRegion, Metapb.Store> regionStorePair =
Pair<TiRegion, TiStore> regionStorePair =
regionManager.getRegionStorePairByKey(range.getStart(), storeType);
if (regionStorePair == null) {
@ -203,7 +202,7 @@ public class RangeSplitter {
ImmutableList.Builder<RegionTask> resultBuilder = ImmutableList.builder();
idToRange.forEach(
(k, v) -> {
Pair<TiRegion, Metapb.Store> regionStorePair = idToRegion.get(k);
Pair<TiRegion, TiStore> regionStorePair = idToRegion.get(k);
resultBuilder.add(new RegionTask(regionStorePair.first, regionStorePair.second, v));
});
return resultBuilder.build();
@ -221,24 +220,23 @@ public class RangeSplitter {
public static class RegionTask implements Serializable {
private final TiRegion region;
private final Metapb.Store store;
private final TiStore store;
private final List<KeyRange> ranges;
private final String host;
RegionTask(TiRegion region, Metapb.Store store, List<KeyRange> ranges) {
RegionTask(TiRegion region, TiStore store, List<KeyRange> ranges) {
this.region = region;
this.store = store;
this.ranges = ranges;
String host = null;
try {
host = PDUtils.addrToUri(store.getAddress()).getHost();
host = PDUtils.addrToUri(store.getStore().getAddress()).getHost();
} catch (Exception ignored) {
}
this.host = host;
}
public static RegionTask newInstance(
TiRegion region, Metapb.Store store, List<KeyRange> ranges) {
public static RegionTask newInstance(TiRegion region, TiStore store, List<KeyRange> ranges) {
return new RegionTask(region, store, ranges);
}
@ -246,7 +244,7 @@ public class RangeSplitter {
return region;
}
public Metapb.Store getStore() {
public TiStore getStore() {
return store;
}

View File

@ -26,6 +26,7 @@ import org.tikv.common.exception.KeyException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
@ -66,22 +67,23 @@ public interface AbstractLockResolverClient {
}
static AbstractLockResolverClient getInstance(
String storeVersion,
TiConfiguration conf,
TiRegion region,
TiStore store,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) {
if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V3) < 0) {
return new LockResolverClientV2(
conf, region, blockingStub, asyncStub, channelFactory, regionManager);
} else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) {
conf, region, store, blockingStub, asyncStub, channelFactory, regionManager);
} else if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V4) < 0) {
return new LockResolverClientV3(
conf,
region,
store,
blockingStub,
asyncStub,
channelFactory,
@ -92,6 +94,7 @@ public interface AbstractLockResolverClient {
return new LockResolverClientV4(
conf,
region,
store,
blockingStub,
asyncStub,
channelFactory,

View File

@ -42,6 +42,7 @@ import org.tikv.common.region.AbstractRegionStoreClient;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.TsoUtils;
@ -74,11 +75,12 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
public LockResolverClientV2(
TiConfiguration conf,
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager) {
super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();

View File

@ -39,10 +39,7 @@ import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.region.AbstractRegionStoreClient;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.*;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
@ -79,13 +76,14 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
public LockResolverClientV3(
TiConfiguration conf,
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();

View File

@ -39,10 +39,7 @@ import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.region.AbstractRegionStoreClient;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.*;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
@ -79,13 +76,14 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
public LockResolverClientV4(
TiConfiguration conf,
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();

View File

@ -30,11 +30,11 @@ import org.tikv.common.exception.TiBatchWriteException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.ClientRPCResult;
/**
@ -105,9 +105,9 @@ public class TTLManager {
}
private void sendTxnHeartBeat(BackOffer bo, long ttl) {
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(primaryLock);
Pair<TiRegion, TiStore> pair = regionManager.getRegionStorePairByKey(primaryLock);
TiRegion tiRegion = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
ClientRPCResult result = kvClient.txnHeartBeat(bo, primaryLock, startTS, ttl, tiRegion, store);
@ -121,7 +121,7 @@ public class TTLManager {
new GrpcException(
String.format("sendTxnHeartBeat failed, regionId=%s", tiRegion.getId()),
result.getException()));
this.regionManager.invalidateStore(store.getId());
this.regionManager.invalidateStore(store.getStore().getId());
this.regionManager.invalidateRegion(tiRegion);
// re-split keys and commit again.
sendTxnHeartBeat(bo, ttl);

View File

@ -38,13 +38,13 @@ import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiBatchWriteException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.Op;
import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.BatchKeys;
import org.tikv.txn.type.ClientRPCResult;
import org.tikv.txn.type.GroupKeyResult;
@ -150,9 +150,9 @@ public class TwoPhaseCommitter {
private void doPrewritePrimaryKeyWithRetry(BackOffer backOffer, ByteString key, ByteString value)
throws TiBatchWriteException {
Pair<TiRegion, Metapb.Store> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
TiRegion tiRegion = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
Kvrpcpb.Mutation mutation;
if (!value.isEmpty()) {
@ -205,9 +205,9 @@ public class TwoPhaseCommitter {
private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, long commitTs)
throws TiBatchWriteException {
Pair<TiRegion, Metapb.Store> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
TiRegion tiRegion = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
ByteString[] keys = new ByteString[] {key};
// send rpc request to tikv server
@ -339,11 +339,11 @@ public class TwoPhaseCommitter {
// groups keys by region
GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
List<BatchKeys> batchKeyList = new LinkedList<>();
Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
Map<Pair<TiRegion, TiStore>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
for (Map.Entry<Pair<TiRegion, Metapb.Store>, List<ByteString>> entry : groupKeyMap.entrySet()) {
for (Map.Entry<Pair<TiRegion, TiStore>, List<ByteString>> entry : groupKeyMap.entrySet()) {
TiRegion tiRegion = entry.getKey().first;
Metapb.Store store = entry.getKey().second;
TiStore store = entry.getKey().second;
this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), true, mutations);
}
@ -454,7 +454,7 @@ public class TwoPhaseCommitter {
private void appendBatchBySize(
List<BatchKeys> batchKeyList,
TiRegion tiRegion,
Metapb.Store store,
TiStore store,
List<ByteString> keys,
boolean sizeIncludeValue,
Map<ByteString, Kvrpcpb.Mutation> mutations) {
@ -575,11 +575,11 @@ public class TwoPhaseCommitter {
// groups keys by region
GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
List<BatchKeys> batchKeyList = new ArrayList<>();
Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
Map<Pair<TiRegion, TiStore>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
for (Map.Entry<Pair<TiRegion, Metapb.Store>, List<ByteString>> entry : groupKeyMap.entrySet()) {
for (Map.Entry<Pair<TiRegion, TiStore>, List<ByteString>> entry : groupKeyMap.entrySet()) {
TiRegion tiRegion = entry.getKey().first;
Metapb.Store store = entry.getKey().second;
TiStore store = entry.getKey().second;
this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), false, null);
}
@ -619,13 +619,12 @@ public class TwoPhaseCommitter {
private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer)
throws TiBatchWriteException {
Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> groups = new HashMap<>();
Map<Pair<TiRegion, TiStore>, List<ByteString>> groups = new HashMap<>();
int index = 0;
try {
for (; index < size; index++) {
ByteString key = keys[index];
Pair<TiRegion, Metapb.Store> pair =
this.regionManager.getRegionStorePairByKey(key, backOffer);
Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
if (pair != null) {
groups.computeIfAbsent(pair, e -> new ArrayList<>()).add(key);
}

View File

@ -33,11 +33,11 @@ import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.ClientRPCResult;
/** KV client of transaction APIs for GET/PUT/DELETE/SCAN */
@ -94,7 +94,7 @@ public class TxnKVClient implements AutoCloseable {
long lockTTL,
long startTs,
TiRegion tiRegion,
Metapb.Store store) {
TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);
@ -116,7 +116,7 @@ public class TxnKVClient implements AutoCloseable {
long startTs,
long ttl,
TiRegion tiRegion,
Metapb.Store store) {
TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);
@ -148,7 +148,7 @@ public class TxnKVClient implements AutoCloseable {
long startTs,
long commitTs,
TiRegion tiRegion,
Metapb.Store store) {
TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);

View File

@ -19,16 +19,15 @@ import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Metapb;
import org.tikv.common.region.TiStore;
public class BatchKeys {
private final TiRegion region;
private final Metapb.Store store;
private final TiStore store;
private List<ByteString> keys;
private final int sizeInBytes;
public BatchKeys(
TiRegion region, Metapb.Store store, List<ByteString> keysInput, int sizeInBytes) {
public BatchKeys(TiRegion region, TiStore store, List<ByteString> keysInput, int sizeInBytes) {
this.region = region;
this.store = store;
this.keys = new ArrayList<>();
@ -48,7 +47,7 @@ public class BatchKeys {
return region;
}
public Metapb.Store getStore() {
public TiStore getStore() {
return store;
}

View File

@ -20,22 +20,22 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
public class GroupKeyResult {
private Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> groupsResult;
private Map<Pair<TiRegion, TiStore>, List<ByteString>> groupsResult;
public GroupKeyResult() {
this.groupsResult = new HashMap<>();
}
public Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> getGroupsResult() {
public Map<Pair<TiRegion, TiStore>, List<ByteString>> getGroupsResult() {
return groupsResult;
}
public void setGroupsResult(Map<Pair<TiRegion, Metapb.Store>, List<ByteString>> groupsResult) {
public void setGroupsResult(Map<Pair<TiRegion, TiStore>, List<ByteString>> groupsResult) {
this.groupsResult = groupsResult;
}
}

View File

@ -32,6 +32,7 @@ public class MockServerTest extends PDMockServerTest {
new TiRegion(
r,
r.getPeers(0),
null,
session.getConf().getIsolationLevel(),
session.getConf().getCommandPriority(),
KVMode.TXN,

View File

@ -26,10 +26,10 @@ import org.junit.Test;
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
public class RegionManagerTest extends PDMockServerTest {
@ -115,7 +115,7 @@ public class RegionManagerTest extends PDMockServerTest {
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
Pair<TiRegion, Store> pair = mgr.getRegionStorePairByKey(searchKey);
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
}
@ -133,8 +133,8 @@ public class RegionManagerTest extends PDMockServerTest {
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
Store store = mgr.getStoreById(storeId);
assertEquals(store.getId(), storeId);
TiStore store = mgr.getStoreById(storeId);
assertEquals(store.getStore().getId(), storeId);
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(

View File

@ -25,6 +25,7 @@ import org.junit.Test;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
@ -41,13 +42,14 @@ public class RegionStoreClientTest extends MockServerTest {
}
private RegionStoreClient createClient(String version) {
Metapb.Store store =
Metapb.Store meta =
Metapb.Store.newBuilder()
.setAddress(LOCAL_ADDR + ":" + port)
.setId(1)
.setState(Metapb.StoreState.Up)
.setVersion(version)
.build();
TiStore store = new TiStore(meta);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(