mirror of https://github.com/tikv/client-java.git
Try other peers when the current leader of this region is not available. (#232)
* Automatically switch to another leader peer instead of asking PD. Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
parent
48b104f196
commit
d4d1c6ac6a
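In essence, when the current leader's store stops responding, the client now probes the region's follower peers directly and retries against the first reachable one as a candidate leader, instead of immediately invalidating the region and asking PD. A minimal, self-contained sketch of that idea, using simplified stand-in types rather than the client's real TiRegion/Metapb classes (the actual logic lives in AbstractRegionStoreClient.retryOtherStoreLeader below):

import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for Metapb.Peer: only the store id matters for this sketch.
final class PeerStub {
  final long storeId;
  PeerStub(long storeId) { this.storeId = storeId; }
}

final class LeaderSwitchSketch {
  // Return the first follower whose store still answers, so the caller can retry
  // the request against it as a possible new leader before falling back to PD.
  static Optional<PeerStub> pickCandidateLeader(List<PeerStub> followers, Predicate<PeerStub> reachable) {
    for (PeerStub peer : followers) {
      if (reachable.test(peer)) {
        return Optional.of(peer);
      }
    }
    return Optional.empty(); // no reachable follower: invalidate the region and ask PD
  }

  public static void main(String[] args) {
    List<PeerStub> followers = List.of(new PeerStub(2), new PeerStub(3));
    // Pretend store 2 is down and store 3 is still up.
    Optional<PeerStub> candidate = pickCandidateLeader(followers, p -> p.storeId == 3);
    System.out.println(candidate.map(p -> "retry against store " + p.storeId).orElse("ask PD"));
  }
}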
@ -89,9 +89,6 @@ public abstract class AbstractGRPCClient<
|
|||
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
|
||||
},
|
||||
method.getFullMethodName());
|
||||
if (resp != null && this.conf.getEnableGrpcForward()) {
|
||||
tryUpdateProxy();
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
|
||||
|
|
@ -180,8 +177,6 @@ public abstract class AbstractGRPCClient<
|
|||
|
||||
protected abstract StubT getAsyncStub();
|
||||
|
||||
protected abstract void tryUpdateProxy();
|
||||
|
||||
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
|
|
|
|||
|
|
@ -557,9 +557,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tryUpdateProxy() {}
|
||||
|
||||
private void initCluster() {
|
||||
GetMembersResponse resp = null;
|
||||
List<URI> pdAddrs = getConf().getPdAddrs();
|
||||
|
|
|
|||
|
|
@ -94,6 +94,8 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
Errorpb.Error error = regionHandler.getRegionError(resp);
|
||||
if (error != null) {
|
||||
return regionHandler.handleRegionError(backOffer, error);
|
||||
} else {
|
||||
regionHandler.tryUpdateRegionStore();
|
||||
}
|
||||
|
||||
// Key error handling logic
|
||||
|
|
|
|||
|
|
@ -42,10 +42,16 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
Errorpb.Error error = getRegionError(resp);
|
||||
if (error != null) {
|
||||
return handleRegionError(backOffer, error);
|
||||
} else {
|
||||
tryUpdateRegionStore();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void tryUpdateRegionStore() {
|
||||
recv.tryUpdateRegionStore();
|
||||
}
|
||||
|
||||
public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
|
||||
if (error.hasNotLeader()) {
|
||||
// this error is reported from raftstore:
|
||||
|
|
|
|||
|
|
@ -22,11 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.health.v1.HealthCheckRequest;
|
||||
import io.grpc.health.v1.HealthCheckResponse;
|
||||
import io.grpc.health.v1.HealthGrpc;
|
||||
import io.grpc.stub.MetadataUtils;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -34,6 +32,7 @@ import org.tikv.common.AbstractGRPCClient;
|
|||
import org.tikv.common.TiConfiguration;
|
||||
import org.tikv.common.exception.GrpcException;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.kvproto.Kvrpcpb;
|
||||
import org.tikv.kvproto.Metapb;
|
||||
import org.tikv.kvproto.TikvGrpc;
|
||||
|
||||
|
|
@ -46,7 +45,9 @@ public abstract class AbstractRegionStoreClient
|
|||
protected TiRegion region;
|
||||
protected TiStore targetStore;
|
||||
protected TiStore originStore;
|
||||
protected long retryTimes;
|
||||
private long retryForwardTimes;
|
||||
private long retryLeaderTimes;
|
||||
private Metapb.Peer candidateLeader;
|
||||
|
||||
protected AbstractRegionStoreClient(
|
||||
TiConfiguration conf,
|
||||
|
|
@ -64,12 +65,17 @@ public abstract class AbstractRegionStoreClient
|
|||
this.regionManager = regionManager;
|
||||
this.targetStore = store;
|
||||
this.originStore = null;
|
||||
this.retryTimes = 0;
|
||||
this.candidateLeader = null;
|
||||
this.retryForwardTimes = 0;
|
||||
this.retryLeaderTimes = 0;
|
||||
if (this.targetStore.getProxyStore() != null) {
|
||||
this.timeout = conf.getForwardTimeout();
|
||||
} else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) {
|
||||
onStoreUnreachable();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TiRegion getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
|
@ -103,44 +109,155 @@ public abstract class AbstractRegionStoreClient
|
|||
if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we tried another peer but the leader has not changed, there is no need to try the remaining peers.
|
||||
if (candidateLeader != null
|
||||
&& region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
|
||||
retryLeaderTimes = newRegion.getFollowerList().size();
|
||||
originStore = null;
|
||||
}
|
||||
candidateLeader = null;
|
||||
region = newRegion;
|
||||
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
|
||||
originStore = null;
|
||||
String addressStr = targetStore.getStore().getAddress();
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
blockingStub = TikvGrpc.newBlockingStub(channel);
|
||||
asyncStub = TikvGrpc.newStub(channel);
|
||||
updateClientStub();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onStoreUnreachable() {
|
||||
if (!conf.getEnableGrpcForward()) {
|
||||
regionManager.onRequestFail(region);
|
||||
return false;
|
||||
}
|
||||
if (targetStore.getProxyStore() == null) {
|
||||
if (!targetStore.isUnreachable()) {
|
||||
if (checkHealth(targetStore.getStore())) {
|
||||
return true;
|
||||
}
|
||||
if (targetStore.isReachable()) {
|
||||
return true;
|
||||
}
|
||||
} else if (retryTimes > region.getFollowerList().size()) {
|
||||
}
|
||||
|
||||
// If this store has failed to forward requests too many times, try other peers first
// so that we can reduce the latency caused by failed requests.
|
||||
if (targetStore.canForwardFirst()) {
|
||||
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
|
||||
return retryOtherStoreByProxyForward();
|
||||
}
|
||||
if (retryOtherStoreLeader()) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (retryOtherStoreLeader()) {
|
||||
return true;
|
||||
}
|
||||
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
|
||||
return retryOtherStoreByProxyForward();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
logger.warn(
|
||||
String.format(
|
||||
"retry time exceed for region[%d], invalid this region[%d]",
|
||||
region.getId(), targetStore.getId()));
|
||||
regionManager.onRequestFail(region);
|
||||
return false;
|
||||
}
|
||||
|
||||
protected Kvrpcpb.Context makeContext(TiStoreType storeType) {
|
||||
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
|
||||
return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet());
|
||||
} else {
|
||||
return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
|
||||
}
|
||||
}
|
||||
|
||||
protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) {
|
||||
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
|
||||
return region.getReplicaContext(candidateLeader, resolvedLocks);
|
||||
} else {
|
||||
return region.getReplicaContext(resolvedLocks, storeType);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tryUpdateRegionStore() {
|
||||
if (originStore != null) {
|
||||
if (originStore.getId() == targetStore.getId()) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"update store [%s] by proxy-store [%s]",
|
||||
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
|
||||
// We do not need to mark the store as can-forward, because once a store has forwarded a gRPC
// request successfully, a new store object is created that is already can-forward.
|
||||
regionManager.updateStore(originStore, targetStore);
|
||||
} else {
|
||||
// If forwarding a request to the leader through a follower failed, the old leader's store may
// be unavailable while the new leader has not yet been reported to PD. Ban this store for a
// short time so that we try other peers instead of issuing too many forwarded requests.
|
||||
originStore.forwardFail();
|
||||
}
|
||||
}
|
||||
if (candidateLeader != null) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"retry time exceed for region[%d], invalid this region[%d]",
|
||||
region.getId(), targetStore.getId()));
|
||||
regionManager.onRequestFail(region);
|
||||
"update leader to store [%d] for region[%d]",
|
||||
candidateLeader.getStoreId(), region.getId()));
|
||||
this.regionManager.updateLeader(region, candidateLeader.getStoreId());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean retryOtherStoreLeader() {
|
||||
List<Metapb.Peer> peers = region.getFollowerList();
|
||||
if (retryLeaderTimes >= peers.size()) {
|
||||
return false;
|
||||
}
|
||||
retryLeaderTimes += 1;
|
||||
boolean hasVisitedStore = false;
|
||||
for (Metapb.Peer cur : peers) {
|
||||
if (candidateLeader == null || hasVisitedStore) {
|
||||
TiStore store = regionManager.getStoreById(cur.getStoreId());
|
||||
if (store != null && store.isReachable()) {
|
||||
targetStore = store;
|
||||
candidateLeader = cur;
|
||||
logger.warn(
|
||||
String.format(
|
||||
"try store [%d],peer[%d] for region[%d], which may be new leader",
|
||||
targetStore.getId(), candidateLeader.getId(), region.getId()));
|
||||
updateClientStub();
|
||||
return true;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else if (candidateLeader.getId() == cur.getId()) {
|
||||
hasVisitedStore = true;
|
||||
}
|
||||
}
|
||||
candidateLeader = null;
|
||||
retryLeaderTimes = peers.size();
|
||||
return false;
|
||||
}
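The loop above does not always start from the head of the follower list: it remembers the previously tried candidate and resumes from the peer after it, so repeated failures walk through all followers exactly once. A rough, self-contained illustration of that resume-after-last-candidate scan (reachability checks omitted, names are hypothetical):

import java.util.List;

final class NextCandidateSketch {
  // Index of the next follower to try: the entry after the previously tried store id,
  // the first entry if nothing was tried yet, or -1 once the list is exhausted.
  static int nextCandidate(List<Long> followerStoreIds, Long previouslyTried) {
    boolean seenPrevious = (previouslyTried == null);
    for (int i = 0; i < followerStoreIds.size(); i++) {
      if (seenPrevious) {
        return i;
      }
      if (followerStoreIds.get(i).equals(previouslyTried)) {
        seenPrevious = true;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    List<Long> followers = List.of(2L, 3L, 4L);
    System.out.println(nextCandidate(followers, null)); // 0: start with the first follower
    System.out.println(nextCandidate(followers, 3L));   // 2: continue after store 3
    System.out.println(nextCandidate(followers, 4L));   // -1: every follower has been tried
  }
}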
|
||||
|
||||
private void updateClientStub() {
|
||||
String addressStr = targetStore.getStore().getAddress();
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
blockingStub = TikvGrpc.newBlockingStub(channel);
|
||||
asyncStub = TikvGrpc.newStub(channel);
|
||||
}
|
||||
|
||||
private boolean retryOtherStoreByProxyForward() {
|
||||
if (!targetStore.isValid()) {
|
||||
targetStore = regionManager.getStoreById(targetStore.getId());
|
||||
logger.warn(
|
||||
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
|
||||
return true;
|
||||
}
|
||||
|
||||
TiStore proxyStore = switchProxyStore();
|
||||
if (proxyStore == null) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"no forward store can be selected for store [%s] and region[%d]",
|
||||
targetStore.getStore().getAddress(), region.getId()));
|
||||
regionManager.onRequestFail(region);
|
||||
return false;
|
||||
}
|
||||
if (originStore == null) {
|
||||
|
|
@ -150,7 +267,7 @@ public abstract class AbstractRegionStoreClient
|
|||
}
|
||||
}
|
||||
targetStore = proxyStore;
|
||||
retryTimes += 1;
|
||||
retryForwardTimes += 1;
|
||||
logger.warn(
|
||||
String.format(
|
||||
"forward request to store [%s] by store [%s] for region[%d]",
|
||||
|
|
@ -167,58 +284,24 @@ public abstract class AbstractRegionStoreClient
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tryUpdateProxy() {
|
||||
if (originStore != null) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"update store [%s] by proxy-store [%s]",
|
||||
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
|
||||
regionManager.updateStore(originStore, targetStore);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkHealth(Metapb.Store store) {
|
||||
String addressStr = store.getAddress();
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
HealthGrpc.newBlockingStub(channel)
|
||||
.withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
try {
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private TiStore switchProxyStore() {
|
||||
boolean hasVisitedStore = false;
|
||||
List<Metapb.Peer> peers = region.getFollowerList();
|
||||
for (int i = 0; i < peers.size() * 2; i++) {
|
||||
int idx = i % peers.size();
|
||||
Metapb.Peer peer = peers.get(idx);
|
||||
if (peer.getStoreId() != region.getLeader().getStoreId()) {
|
||||
if (targetStore.getProxyStore() == null) {
|
||||
TiStore store = regionManager.getStoreById(peer.getStoreId());
|
||||
if (checkHealth(store.getStore())) {
|
||||
return targetStore.withProxy(store.getStore());
|
||||
}
|
||||
} else {
|
||||
if (peer.getStoreId() == targetStore.getProxyStore().getId()) {
|
||||
hasVisitedStore = true;
|
||||
} else if (hasVisitedStore) {
|
||||
TiStore proxyStore = regionManager.getStoreById(peer.getStoreId());
|
||||
if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) {
|
||||
return targetStore.withProxy(proxyStore.getStore());
|
||||
}
|
||||
}
|
||||
if (peers.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Metapb.Store proxyStore = targetStore.getProxyStore();
|
||||
if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) {
|
||||
hasVisitedStore = true;
|
||||
}
|
||||
for (Metapb.Peer peer : peers) {
|
||||
if (hasVisitedStore) {
|
||||
TiStore store = regionManager.getStoreById(peer.getStoreId());
|
||||
if (store.isReachable()) {
|
||||
return targetStore.withProxy(store.getStore());
|
||||
}
|
||||
} else if (peer.getStoreId() == proxyStore.getId()) {
|
||||
hasVisitedStore = true;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,186 @@
|
|||
package org.tikv.common.region;
|
||||
|
||||
import static org.tikv.common.codec.KeyUtils.formatBytesUTF8;
|
||||
import static org.tikv.common.util.KeyRangeUtils.makeRange;
|
||||
|
||||
import com.google.common.collect.RangeMap;
|
||||
import com.google.common.collect.TreeRangeMap;
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.key.Key;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
|
||||
public class RegionCache {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RegionCache.class);
|
||||
|
||||
private final Map<Long, TiRegion> regionCache;
|
||||
private final Map<Long, TiStore> storeCache;
|
||||
private final RangeMap<Key, Long> keyToRegionIdCache;
|
||||
|
||||
public RegionCache() {
|
||||
regionCache = new HashMap<>();
|
||||
storeCache = new HashMap<>();
|
||||
|
||||
keyToRegionIdCache = TreeRangeMap.create();
|
||||
}
|
||||
|
||||
public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
|
||||
Long regionId;
|
||||
if (key.isEmpty()) {
|
||||
// if key is empty, it must be the start key.
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key, true));
|
||||
} else {
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId));
|
||||
}
|
||||
|
||||
if (regionId == null) {
|
||||
return null;
|
||||
}
|
||||
TiRegion region;
|
||||
region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
public synchronized TiRegion putRegion(TiRegion region) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("putRegion: " + region);
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (oldRegion != null) {
|
||||
if (oldRegion.getMeta().equals(region.getMeta())) {
|
||||
return oldRegion;
|
||||
} else {
|
||||
invalidateRegion(oldRegion);
|
||||
}
|
||||
}
|
||||
regionCache.put(region.getId(), region);
|
||||
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
|
||||
return region;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public synchronized TiRegion getRegionById(long regionId) {
|
||||
TiRegion region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
private synchronized TiRegion getRegionFromCache(long regionId) {
|
||||
return regionCache.get(regionId);
|
||||
}
|
||||
|
||||
/** Removes region associated with regionId from regionCache. */
|
||||
public synchronized void invalidateRegion(TiRegion region) {
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (oldRegion != null && oldRegion == region) {
|
||||
keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey()));
|
||||
regionCache.remove(region.getId());
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (!expected.getMeta().equals(oldRegion.getMeta())) {
|
||||
return false;
|
||||
} else {
|
||||
if (oldRegion != null) {
|
||||
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
|
||||
}
|
||||
regionCache.put(region.getId(), region);
|
||||
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
|
||||
return true;
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) {
|
||||
if (!newStore.isValid()) {
|
||||
return false;
|
||||
}
|
||||
TiStore originStore = storeCache.get(oldStore.getId());
|
||||
if (originStore == oldStore) {
|
||||
storeCache.put(newStore.getId(), newStore);
|
||||
oldStore.markInvalid();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public synchronized void invalidateAllRegionForStore(TiStore store) {
|
||||
TiStore oldStore = storeCache.get(store.getId());
|
||||
if (oldStore != store) {
|
||||
return;
|
||||
}
|
||||
List<TiRegion> regionToRemove = new ArrayList<>();
|
||||
for (TiRegion r : regionCache.values()) {
|
||||
if (r.getLeader().getStoreId() == store.getId()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r));
|
||||
}
|
||||
regionToRemove.add(r);
|
||||
}
|
||||
}
|
||||
|
||||
logger.warn(String.format("invalid store [%d]", store.getId()));
|
||||
// remove region
|
||||
for (TiRegion r : regionToRemove) {
|
||||
keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey()));
|
||||
regionCache.remove(r.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void invalidateStore(long storeId) {
|
||||
TiStore store = storeCache.remove(storeId);
|
||||
if (store != null) {
|
||||
store.markInvalid();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized TiStore getStoreById(long id) {
|
||||
return storeCache.get(id);
|
||||
}
|
||||
|
||||
public synchronized boolean putStore(long id, TiStore store) {
|
||||
TiStore oldStore = storeCache.get(id);
|
||||
if (oldStore != null) {
|
||||
if (oldStore.equals(store)) {
|
||||
return false;
|
||||
} else {
|
||||
oldStore.markInvalid();
|
||||
}
|
||||
}
|
||||
storeCache.put(id, store);
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized void clearAll() {
|
||||
keyToRegionIdCache.clear();
|
||||
regionCache.clear();
|
||||
}
|
||||
}
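The new RegionCache pairs two plain hash maps (region id to TiRegion, store id to TiStore) with a Guava RangeMap from key range to region id, which is what turns getRegionByKey into a single range lookup. A small sketch of that lookup structure, using String keys instead of the client's Key/ByteString types:

import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;

final class RangeLookupSketch {
  public static void main(String[] args) {
    RangeMap<String, Long> keyToRegionId = TreeRangeMap.create();
    // Two adjacent regions: ["a", "m") owned by region 1, ["m", "z") owned by region 2.
    keyToRegionId.put(Range.closedOpen("a", "m"), 1L);
    keyToRegionId.put(Range.closedOpen("m", "z"), 2L);

    System.out.println(keyToRegionId.get("hello")); // 1
    System.out.println(keyToRegionId.get("tikv"));  // 2
    System.out.println(keyToRegionId.get("zz"));    // null: key not covered by any cached region

    // Invalidation removes the whole range, just like invalidateRegion() above.
    keyToRegionId.remove(Range.closedOpen("a", "m"));
    System.out.println(keyToRegionId.get("hello")); // null
  }
}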
|
||||
|
|
@ -23,5 +23,7 @@ public interface RegionErrorReceiver {
|
|||
/// return whether we need to retry this request.
|
||||
boolean onStoreUnreachable();
|
||||
|
||||
void tryUpdateRegionStore();
|
||||
|
||||
TiRegion getRegion();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,16 +18,10 @@
|
|||
package org.tikv.common.region;
|
||||
|
||||
import static org.tikv.common.codec.KeyUtils.formatBytesUTF8;
|
||||
import static org.tikv.common.util.KeyRangeUtils.makeRange;
|
||||
|
||||
import com.google.common.collect.RangeMap;
|
||||
import com.google.common.collect.TreeRangeMap;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.prometheus.client.Histogram;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -40,7 +34,6 @@ import org.tikv.common.TiConfiguration;
|
|||
import org.tikv.common.event.CacheInvalidateEvent;
|
||||
import org.tikv.common.exception.GrpcException;
|
||||
import org.tikv.common.exception.TiClientInternalException;
|
||||
import org.tikv.common.key.Key;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.common.util.ConcreteBackOffer;
|
||||
|
|
@ -52,22 +45,22 @@ import org.tikv.kvproto.Metapb.StoreState;
|
|||
@SuppressWarnings("UnstableApiUsage")
|
||||
public class RegionManager {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
|
||||
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
|
||||
Histogram.build()
|
||||
.name("client_java_get_region_by_requests_latency")
|
||||
.help("getRegionByKey request latency.")
|
||||
.register();
|
||||
|
||||
// TODO: the region cache logic need rewrite.
|
||||
// https://github.com/pingcap/tispark/issues/1170
|
||||
private final RegionCache cache;
|
||||
private final ReadOnlyPDClient pdClient;
|
||||
private final TiConfiguration conf;
|
||||
private final ScheduledExecutorService executor;
|
||||
private final UnreachableStoreChecker storeChecker;
|
||||
private final StoreHealthyChecker storeChecker;
|
||||
|
||||
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
|
||||
|
||||
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
|
||||
Histogram.build()
|
||||
.name("client_java_get_region_by_requests_latency")
|
||||
.help("getRegionByKey request latency.")
|
||||
.register();
|
||||
|
||||
// To avoid double retrieval, we use the async version of gRPC:
// when an RPC has not yet returned, we wait for the previous call to finish instead of calling again.
|
||||
public RegionManager(
|
||||
|
|
@ -94,10 +87,11 @@ public class RegionManager {
|
|||
this.conf = conf;
|
||||
|
||||
if (enableGrpcForward) {
|
||||
UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
|
||||
StoreHealthyChecker storeChecker =
|
||||
new StoreHealthyChecker(channelFactory, pdClient, this.cache);
|
||||
this.storeChecker = storeChecker;
|
||||
this.executor = Executors.newScheduledThreadPool(1);
|
||||
this.executor.scheduleAtFixedRate(storeChecker, 10, 10, TimeUnit.SECONDS);
|
||||
this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS);
|
||||
} else {
|
||||
this.storeChecker = null;
|
||||
this.executor = null;
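With forwarding enabled, the store checker now runs every second instead of every ten seconds, on a single-threaded scheduler. The scheduling itself is plain java.util.concurrent; a minimal sketch of the pattern with a stand-in task:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PeriodicCheckerSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    Runnable storeChecker = () -> System.out.println("checking store health...");

    // Initial delay of 1 second, then another run every second after that.
    executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS);

    Thread.sleep(3500); // let the demo fire a few times
    executor.shutdownNow();
  }
}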
|
||||
|
|
@ -132,13 +126,21 @@ public class RegionManager {
|
|||
}
|
||||
|
||||
public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
|
||||
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
|
||||
TiRegion region = cache.getRegionByKey(key, backOffer);
|
||||
if (region == null) {
|
||||
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
|
||||
Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
|
||||
region =
|
||||
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
|
||||
try {
|
||||
if (region == null) {
|
||||
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
|
||||
Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
|
||||
region =
|
||||
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
} finally {
|
||||
requestTimer.observeDuration();
|
||||
}
|
||||
|
||||
return region;
|
||||
}
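getRegionByKey now wraps the whole lookup, cache hit or PD round trip, in a Prometheus histogram timer and stops the timer in a finally block, so latency is recorded whether the call succeeds or fails. The pattern in isolation, assuming the io.prometheus simpleclient dependency already used by this client:

import io.prometheus.client.Histogram;

final class TimedLookupSketch {
  static final Histogram LOOKUP_LATENCY =
      Histogram.build()
          .name("example_lookup_latency")
          .help("Latency of an example lookup, in seconds.")
          .register();

  static String lookup(String key) {
    Histogram.Timer timer = LOOKUP_LATENCY.startTimer();
    try {
      // ... cache lookup, possibly followed by a remote call ...
      return "value-for-" + key;
    } finally {
      timer.observeDuration(); // recorded on success and on failure alike
    }
  }

  public static void main(String[] args) {
    System.out.println(lookup("demo-key"));
  }
}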
|
||||
|
||||
|
|
@ -232,7 +234,10 @@ public class RegionManager {
|
|||
if (store.getStore().getState().equals(StoreState.Tombstone)) {
|
||||
return null;
|
||||
}
|
||||
return cache.putStore(id, store);
|
||||
if (cache.putStore(id, store)) {
|
||||
storeChecker.scheduleStoreHealthCheck(store);
|
||||
}
|
||||
return store;
|
||||
} catch (Exception e) {
|
||||
throw new GrpcException(e);
|
||||
}
|
||||
|
|
@ -246,7 +251,10 @@ public class RegionManager {
|
|||
cache.invalidateRegion(region);
|
||||
}
|
||||
|
||||
public synchronized TiRegion updateLeader(TiRegion region, long storeId) {
|
||||
public TiRegion updateLeader(TiRegion region, long storeId) {
|
||||
if (region.getLeader().getStoreId() == storeId) {
|
||||
return region;
|
||||
}
|
||||
TiRegion newRegion = region.switchPeer(storeId);
|
||||
if (cache.updateRegion(region, newRegion)) {
|
||||
return newRegion;
|
||||
|
|
@ -259,13 +267,7 @@ public class RegionManager {
|
|||
|
||||
public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
|
||||
if (cache.updateStore(oldStore, newStore)) {
|
||||
if (newStore.isUnreachable()) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"check health for store [%s] in background thread",
|
||||
newStore.getStore().getAddress()));
|
||||
this.storeChecker.scheduleStoreHealthCheck(newStore);
|
||||
}
|
||||
storeChecker.scheduleStoreHealthCheck(newStore);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -283,6 +285,24 @@ public class RegionManager {
|
|||
cache.invalidateRegion(region);
|
||||
}
|
||||
|
||||
/** If region has changed, return the new one and update cache. */
|
||||
public TiRegion getRegionSkipCache(TiRegion region) {
|
||||
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
|
||||
try {
|
||||
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
|
||||
pdClient.getRegionByID(backOffer, region.getId());
|
||||
if (!regionAndLeader.first.equals(region.getMeta())) {
|
||||
region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
|
||||
return cache.putRegion(region);
|
||||
} else {
|
||||
logger.warn("Cannot get region from PD for region id: " + region.getId());
|
||||
return null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateStore(long storeId) {
|
||||
cache.invalidateStore(storeId);
|
||||
}
|
||||
|
|
@ -290,177 +310,4 @@ public class RegionManager {
|
|||
public void invalidateRegion(TiRegion region) {
|
||||
cache.invalidateRegion(region);
|
||||
}
|
||||
|
||||
public static class RegionCache {
|
||||
private final Map<Long, TiRegion> regionCache;
|
||||
private final Map<Long, TiStore> storeCache;
|
||||
private final RangeMap<Key, Long> keyToRegionIdCache;
|
||||
|
||||
public RegionCache() {
|
||||
regionCache = new HashMap<>();
|
||||
storeCache = new HashMap<>();
|
||||
|
||||
keyToRegionIdCache = TreeRangeMap.create();
|
||||
}
|
||||
|
||||
public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
|
||||
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
|
||||
try {
|
||||
Long regionId;
|
||||
if (key.isEmpty()) {
|
||||
// if key is empty, it must be the start key.
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key, true));
|
||||
} else {
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId));
|
||||
}
|
||||
|
||||
if (regionId == null) {
|
||||
return null;
|
||||
}
|
||||
TiRegion region;
|
||||
region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
|
||||
return region;
|
||||
} finally {
|
||||
requestTimer.observeDuration();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized TiRegion putRegion(TiRegion region) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("putRegion: " + region);
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (oldRegion != null) {
|
||||
if (oldRegion.getMeta().equals(region.getMeta())) {
|
||||
return oldRegion;
|
||||
} else {
|
||||
invalidateRegion(oldRegion);
|
||||
}
|
||||
}
|
||||
regionCache.put(region.getId(), region);
|
||||
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
|
||||
return region;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private synchronized TiRegion getRegionById(long regionId) {
|
||||
TiRegion region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
private synchronized TiRegion getRegionFromCache(long regionId) {
|
||||
return regionCache.get(regionId);
|
||||
}
|
||||
|
||||
/** Removes region associated with regionId from regionCache. */
|
||||
public synchronized void invalidateRegion(TiRegion region) {
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (oldRegion != null && oldRegion == region) {
|
||||
keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey()));
|
||||
regionCache.remove(region.getId());
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
|
||||
}
|
||||
TiRegion oldRegion = regionCache.get(region.getId());
|
||||
if (!expected.getMeta().equals(oldRegion.getMeta())) {
|
||||
return false;
|
||||
} else {
|
||||
if (oldRegion != null) {
|
||||
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
|
||||
}
|
||||
regionCache.put(region.getId(), region);
|
||||
keyToRegionIdCache.put(
|
||||
makeRange(region.getStartKey(), region.getEndKey()), region.getId());
|
||||
return true;
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) {
|
||||
TiStore originStore = storeCache.get(oldStore.getId());
|
||||
if (originStore == oldStore) {
|
||||
storeCache.put(newStore.getId(), newStore);
|
||||
if (oldStore != null && oldStore.isUnreachable()) {
|
||||
oldStore.markReachable();
|
||||
}
|
||||
if (newStore.getProxyStore() != null) {
|
||||
newStore.markUnreachable();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public synchronized void invalidateAllRegionForStore(TiStore store) {
|
||||
TiStore oldStore = storeCache.get(store.getId());
|
||||
if (oldStore != store) {
|
||||
return;
|
||||
}
|
||||
List<TiRegion> regionToRemove = new ArrayList<>();
|
||||
for (TiRegion r : regionCache.values()) {
|
||||
if (r.getLeader().getStoreId() == store.getId()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r));
|
||||
}
|
||||
regionToRemove.add(r);
|
||||
}
|
||||
}
|
||||
|
||||
logger.warn(String.format("invalid store [%d]", store.getId()));
|
||||
// remove region
|
||||
for (TiRegion r : regionToRemove) {
|
||||
keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey()));
|
||||
regionCache.remove(r.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void invalidateStore(long storeId) {
|
||||
TiStore store = storeCache.remove(storeId);
|
||||
if (store != null) {
|
||||
store.markReachable();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized TiStore getStoreById(long id) {
|
||||
return storeCache.get(id);
|
||||
}
|
||||
|
||||
public synchronized TiStore putStore(long id, TiStore store) {
|
||||
TiStore oldStore = storeCache.get(id);
|
||||
if (oldStore != null && oldStore.getStore().equals(store.getStore())) {
|
||||
return oldStore;
|
||||
}
|
||||
storeCache.put(id, store);
|
||||
return store;
|
||||
}
|
||||
|
||||
public synchronized void clearAll() {
|
||||
keyToRegionIdCache.clear();
|
||||
regionCache.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<GetRequest> factory =
|
||||
() ->
|
||||
GetRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||
.setContext(makeContext(getResolvedLocks(version), this.storeType))
|
||||
.setKey(key)
|
||||
.setVersion(version)
|
||||
.build();
|
||||
|
|
@ -216,7 +216,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<BatchGetRequest> request =
|
||||
() ->
|
||||
BatchGetRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||
.setContext(makeContext(getResolvedLocks(version), this.storeType))
|
||||
.addAllKeys(keys)
|
||||
.setVersion(version)
|
||||
.build();
|
||||
|
|
@ -279,7 +279,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<ScanRequest> request =
|
||||
() ->
|
||||
ScanRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType))
|
||||
.setContext(makeContext(getResolvedLocks(version), this.storeType))
|
||||
.setStartKey(startKey)
|
||||
.setVersion(version)
|
||||
.setKeyOnly(keyOnly)
|
||||
|
|
@ -381,7 +381,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
() ->
|
||||
getIsV4()
|
||||
? PrewriteRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setStartVersion(startTs)
|
||||
.setPrimaryLock(primaryLock)
|
||||
.addAllMutations(mutations)
|
||||
|
|
@ -391,7 +391,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
.setTxnSize(16)
|
||||
.build()
|
||||
: PrewriteRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setStartVersion(startTs)
|
||||
.setPrimaryLock(primaryLock)
|
||||
.addAllMutations(mutations)
|
||||
|
|
@ -471,7 +471,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<TxnHeartBeatRequest> factory =
|
||||
() ->
|
||||
TxnHeartBeatRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setStartVersion(startTs)
|
||||
.setPrimaryLock(primaryLock)
|
||||
.setAdviseLockTtl(ttl)
|
||||
|
|
@ -529,7 +529,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
.setStartVersion(startTs)
|
||||
.setCommitVersion(commitTs)
|
||||
.addAllKeys(keys)
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.build();
|
||||
KVErrorHandler<CommitResponse> handler =
|
||||
new KVErrorHandler<>(
|
||||
|
|
@ -590,7 +590,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<Coprocessor.Request> reqToSend =
|
||||
() ->
|
||||
Coprocessor.Request.newBuilder()
|
||||
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
|
||||
.setContext(makeContext(getResolvedLocks(startTs), this.storeType))
|
||||
.setTp(REQ_TYPE_DAG.getValue())
|
||||
.setStartTs(startTs)
|
||||
.setData(req.toByteString())
|
||||
|
|
@ -713,7 +713,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<Coprocessor.Request> reqToSend =
|
||||
() ->
|
||||
Coprocessor.Request.newBuilder()
|
||||
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType))
|
||||
.setContext(makeContext(getResolvedLocks(startTs), this.storeType))
|
||||
// TODO: If no executors...?
|
||||
.setTp(REQ_TYPE_DAG.getValue())
|
||||
.setData(req.toByteString())
|
||||
|
|
@ -751,7 +751,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<SplitRegionRequest> request =
|
||||
() ->
|
||||
SplitRegionRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.addAllSplitKeys(splitKeys)
|
||||
.build();
|
||||
|
||||
|
|
@ -792,11 +792,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
|
||||
try {
|
||||
Supplier<RawGetRequest> factory =
|
||||
() ->
|
||||
RawGetRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setKey(key)
|
||||
.build();
|
||||
() -> RawGetRequest.newBuilder().setContext(makeContext(storeType)).setKey(key).build();
|
||||
RegionErrorHandler<RawGetResponse> handler =
|
||||
new RegionErrorHandler<RawGetResponse>(
|
||||
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
|
||||
|
|
@ -833,7 +829,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawGetKeyTTLRequest> factory =
|
||||
() ->
|
||||
RawGetKeyTTLRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setKey(key)
|
||||
.build();
|
||||
RegionErrorHandler<RawGetKeyTTLResponse> handler =
|
||||
|
|
@ -872,7 +868,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawDeleteRequest> factory =
|
||||
() ->
|
||||
RawDeleteRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setKey(key)
|
||||
.setForCas(atomicForCAS)
|
||||
.build();
|
||||
|
|
@ -910,7 +906,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawPutRequest> factory =
|
||||
() ->
|
||||
RawPutRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setKey(key)
|
||||
.setValue(value)
|
||||
.setTtl(ttl)
|
||||
|
|
@ -954,7 +950,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawCASRequest> factory =
|
||||
() ->
|
||||
RawCASRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setKey(key)
|
||||
.setValue(value)
|
||||
.setPreviousValue(prevValue.orElse(ByteString.EMPTY))
|
||||
|
|
@ -1007,7 +1003,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawBatchGetRequest> factory =
|
||||
() ->
|
||||
RawBatchGetRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.addAllKeys(keys)
|
||||
.build();
|
||||
RegionErrorHandler<RawBatchGetResponse> handler =
|
||||
|
|
@ -1043,7 +1039,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawBatchPutRequest> factory =
|
||||
() ->
|
||||
RawBatchPutRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.addAllPairs(kvPairs)
|
||||
.setTtl(ttl)
|
||||
.setForCas(atomicForCAS)
|
||||
|
|
@ -1095,7 +1091,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawBatchDeleteRequest> factory =
|
||||
() ->
|
||||
RawBatchDeleteRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.addAllKeys(keys)
|
||||
.setForCas(atomicForCAS)
|
||||
.build();
|
||||
|
|
@ -1140,7 +1136,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawScanRequest> factory =
|
||||
() ->
|
||||
RawScanRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setStartKey(key)
|
||||
.setKeyOnly(keyOnly)
|
||||
.setLimit(limit)
|
||||
|
|
@ -1186,7 +1182,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
Supplier<RawDeleteRangeRequest> factory =
|
||||
() ->
|
||||
RawDeleteRangeRequest.newBuilder()
|
||||
.setContext(region.getReplicaContext(storeType))
|
||||
.setContext(makeContext(storeType))
|
||||
.setStartKey(startKey)
|
||||
.setEndKey(endKey)
|
||||
.build();
|
||||
|
|
@ -1269,7 +1265,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
TikvBlockingStub blockingStub = null;
|
||||
TikvStub asyncStub = null;
|
||||
|
||||
if (conf.getEnableGrpcForward() && store.getProxyStore() != null && store.isUnreachable()) {
|
||||
if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) {
|
||||
addressStr = store.getProxyStore().getAddress();
|
||||
channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
|
|
@ -1278,18 +1274,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
|
||||
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
|
||||
} else {
|
||||
// If the store is reachable again (as updated by the health-check thread), cancel proxy forwarding.
|
||||
if (!store.isUnreachable()) {
|
||||
if (store.getProxyStore() != null) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"cancel request to store [%s] forward by store[%s]",
|
||||
store.getStore().getAddress(), store.getProxyStore().getAddress()));
|
||||
TiStore newStore = store.withProxy(null);
|
||||
regionManager.updateStore(store, newStore);
|
||||
store = newStore;
|
||||
}
|
||||
}
|
||||
channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
|
||||
blockingStub = TikvGrpc.newBlockingStub(channel);
|
||||
asyncStub = TikvGrpc.newStub(channel);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,150 @@
|
|||
package org.tikv.common.region;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.health.v1.HealthCheckRequest;
|
||||
import io.grpc.health.v1.HealthCheckResponse;
|
||||
import io.grpc.health.v1.HealthGrpc;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.ReadOnlyPDClient;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.common.util.ConcreteBackOffer;
|
||||
import org.tikv.kvproto.Metapb;
|
||||
|
||||
public class StoreHealthyChecker implements Runnable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
|
||||
private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
|
||||
private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500;
|
||||
private BlockingQueue<TiStore> taskQueue;
|
||||
private final ChannelFactory channelFactory;
|
||||
private final ReadOnlyPDClient pdClient;
|
||||
private final RegionCache cache;
|
||||
private long checkTombstoneTick;
|
||||
|
||||
public StoreHealthyChecker(
|
||||
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache) {
|
||||
this.taskQueue = new LinkedBlockingQueue<>();
|
||||
this.channelFactory = channelFactory;
|
||||
this.pdClient = pdClient;
|
||||
this.cache = cache;
|
||||
this.checkTombstoneTick = 0;
|
||||
}
|
||||
|
||||
public boolean scheduleStoreHealthCheck(TiStore store) {
|
||||
if (!this.taskQueue.add(store)) {
|
||||
// Adding to the queue failed; return false so the caller can schedule this store again later.
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private List<TiStore> getValidStores() {
|
||||
List<TiStore> unhealthStore = new LinkedList<>();
|
||||
while (!this.taskQueue.isEmpty()) {
|
||||
try {
|
||||
TiStore store = this.taskQueue.take();
|
||||
if (!store.isValid()) {
|
||||
continue;
|
||||
}
|
||||
unhealthStore.add(store);
|
||||
} catch (Exception e) {
|
||||
return unhealthStore;
|
||||
}
|
||||
}
|
||||
return unhealthStore;
|
||||
}
|
||||
|
||||
private boolean checkStoreHealth(TiStore store) {
|
||||
String addressStr = store.getStore().getAddress();
|
||||
try {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkStoreTombstone(TiStore store) {
|
||||
try {
|
||||
Metapb.Store newStore = pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId());
|
||||
if (newStore.getState() == Metapb.StoreState.Tombstone) {
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkTombstoneTick += 1;
|
||||
boolean needCheckTombstoneStore = false;
|
||||
if (checkTombstoneTick >= MAX_CHECK_STORE_TOMBSTONE_TICK) {
|
||||
needCheckTombstoneStore = true;
|
||||
checkTombstoneTick = 0;
|
||||
}
|
||||
List<TiStore> allStores = getValidStores();
|
||||
List<TiStore> unreachableStore = new LinkedList<>();
|
||||
for (TiStore store : allStores) {
|
||||
if (needCheckTombstoneStore) {
|
||||
if (checkStoreTombstone(store)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (checkStoreHealth(store)) {
|
||||
if (store.getProxyStore() != null) {
|
||||
TiStore newStore = store.withProxy(null);
|
||||
logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
|
||||
if (cache.putStore(newStore.getId(), newStore)) {
|
||||
this.taskQueue.add(newStore);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (!store.isReachable()) {
|
||||
logger.warn(
|
||||
String.format(
|
||||
"store [%s] recovers to be reachable and canforward", store.getAddress()));
|
||||
store.markReachable();
|
||||
}
|
||||
if (!store.canForwardFirst()) {
|
||||
store.makrCanForward();
|
||||
}
|
||||
}
|
||||
} else if (store.isReachable()) {
|
||||
unreachableStore.add(store);
|
||||
continue;
|
||||
}
|
||||
this.taskQueue.add(store);
|
||||
}
|
||||
if (!unreachableStore.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK);
|
||||
} catch (Exception e) {
|
||||
this.taskQueue.addAll(unreachableStore);
|
||||
return;
|
||||
}
|
||||
for (TiStore store : unreachableStore) {
|
||||
if (!checkStoreHealth(store)) {
|
||||
logger.warn(String.format("store [%s] is not reachable", store.getAddress()));
|
||||
store.markUnreachable();
|
||||
}
|
||||
this.taskQueue.add(store);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
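Note that run() does not mark a store unreachable after a single failed probe: failing stores are collected, the thread sleeps for SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK, and only a second consecutive failure flips the flag. A compact sketch of that double-check pattern, with the probe abstracted into a BooleanSupplier (hypothetical names, not the client's API):

import java.util.function.BooleanSupplier;

final class DoubleCheckSketch {
  // True means the target should be treated as unreachable: the probe has to fail,
  // then fail again after a short pause, before we give up on it.
  static boolean confirmUnreachable(BooleanSupplier probe, long pauseMillis) throws InterruptedException {
    if (probe.getAsBoolean()) {
      return false; // first probe succeeded, nothing to do
    }
    Thread.sleep(pauseMillis); // give a transient hiccup a chance to clear
    return !probe.getAsBoolean(); // unreachable only if the second probe also fails
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(confirmUnreachable(() -> false, 500)); // true: both probes failed
    System.out.println(confirmUnreachable(() -> true, 500));  // false: target is healthy
  }
}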
|
||||
|
|
@ -142,20 +142,21 @@ public class TiRegion implements Serializable {
|
|||
}
|
||||
|
||||
public Kvrpcpb.Context getLeaderContext() {
|
||||
return getContext(this.leader, java.util.Collections.emptySet(), TiStoreType.TiKV);
|
||||
}
|
||||
|
||||
public Kvrpcpb.Context getReplicaContext(TiStoreType storeType) {
|
||||
return getContext(getCurrentReplica(), java.util.Collections.emptySet(), storeType);
|
||||
return getContext(this.leader, java.util.Collections.emptySet(), false);
|
||||
}
|
||||
|
||||
public Kvrpcpb.Context getReplicaContext(Set<Long> resolvedLocks, TiStoreType storeType) {
|
||||
return getContext(getCurrentReplica(), resolvedLocks, storeType);
|
||||
Peer currentPeer = getCurrentReplica();
|
||||
boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType);
|
||||
return getContext(currentPeer, resolvedLocks, replicaRead);
|
||||
}
|
||||
|
||||
public Kvrpcpb.Context getReplicaContext(Peer currentPeer, Set<Long> resolvedLocks) {
|
||||
return getContext(currentPeer, resolvedLocks, false);
|
||||
}
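The refactor passes the replica-read decision into getContext as a boolean: a request is a replica read only when the chosen peer is not the leader and the target store is TiKV (the flag is never set for TiFlash). A tiny stand-alone illustration of that rule, with a simplified store-type enum:

final class ReplicaReadRuleSketch {
  enum StoreType { TIKV, TIFLASH }

  // Replica read applies only when reading from a non-leader peer of a TiKV store.
  static boolean isReplicaRead(boolean peerIsLeader, StoreType storeType) {
    return !peerIsLeader && storeType == StoreType.TIKV;
  }

  public static void main(String[] args) {
    System.out.println(isReplicaRead(false, StoreType.TIKV));    // true
    System.out.println(isReplicaRead(true, StoreType.TIKV));     // false: reads from the leader
    System.out.println(isReplicaRead(false, StoreType.TIFLASH)); // false: flag is never set for TiFlash
  }
}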
|
||||
|
||||
private Kvrpcpb.Context getContext(
|
||||
Peer currentPeer, Set<Long> resolvedLocks, TiStoreType storeType) {
|
||||
boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType);
|
||||
Peer currentPeer, Set<Long> resolvedLocks, boolean replicaRead) {
|
||||
|
||||
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
|
||||
builder
|
||||
|
|
|
|||
|
|
@ -1,23 +1,60 @@
|
|||
package org.tikv.common.region;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.tikv.kvproto.Metapb;
|
||||
|
||||
public class TiStore {
|
||||
private static long MAX_FAIL_FORWARD_TIMES = 4;
|
||||
private final Metapb.Store store;
|
||||
private final Metapb.Store proxyStore;
|
||||
private AtomicBoolean unreachable;
|
||||
private AtomicBoolean reachable;
|
||||
private AtomicBoolean valid;
|
||||
private AtomicLong failForwardCount;
|
||||
private AtomicBoolean canForward;
|
||||
|
||||
public TiStore(Metapb.Store store) {
|
||||
this.store = store;
|
||||
this.unreachable = new AtomicBoolean(false);
|
||||
this.reachable = new AtomicBoolean(true);
|
||||
this.valid = new AtomicBoolean(true);
|
||||
this.canForward = new AtomicBoolean(true);
|
||||
this.proxyStore = null;
|
||||
this.failForwardCount = new AtomicLong(0);
|
||||
}
|
||||
|
||||
private TiStore(Metapb.Store store, Metapb.Store proxyStore) {
|
||||
this.store = store;
|
||||
this.unreachable = new AtomicBoolean(false);
|
||||
if (proxyStore != null) {
|
||||
this.reachable = new AtomicBoolean(false);
|
||||
} else {
|
||||
this.reachable = new AtomicBoolean(true);
|
||||
}
|
||||
this.valid = new AtomicBoolean(true);
|
||||
this.canForward = new AtomicBoolean(true);
|
||||
this.proxyStore = proxyStore;
|
||||
this.failForwardCount = new AtomicLong(0);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
public boolean equals(final java.lang.Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
}
|
||||
if (!(obj instanceof TiStore)) {
|
||||
return super.equals(obj);
|
||||
}
|
||||
TiStore other = (TiStore) obj;
|
||||
if (!this.store.equals(other.store)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (proxyStore == null && other.proxyStore == null) {
|
||||
return true;
|
||||
}
|
||||
if (proxyStore != null && other.proxyStore != null) {
|
||||
return proxyStore.equals(other.proxyStore);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public TiStore withProxy(Metapb.Store proxyStore) {
|
||||
|
|
@ -25,15 +62,40 @@ public class TiStore {
|
|||
}
|
||||
|
||||
public void markUnreachable() {
|
||||
this.unreachable.set(true);
|
||||
this.reachable.set(false);
|
||||
}
|
||||
|
||||
public void markReachable() {
|
||||
this.unreachable.set(false);
|
||||
this.reachable.set(true);
|
||||
}
|
||||
|
||||
public boolean isUnreachable() {
|
||||
return this.unreachable.get();
|
||||
public boolean isReachable() {
|
||||
return this.reachable.get();
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return this.valid.get();
|
||||
}
|
||||
|
||||
public void markInvalid() {
|
||||
this.valid.set(false);
|
||||
}
|
||||
|
||||
public void forwardFail() {
|
||||
if (this.canForward.get()) {
|
||||
if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) {
|
||||
this.canForward.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void makrCanForward() {
|
||||
this.failForwardCount.set(0);
|
||||
this.canForward.set(true);
|
||||
}
|
||||
|
||||
public boolean canForwardFirst() {
|
||||
return this.canForward.get();
|
||||
}
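TiStore now carries a small failure gate: each failed forward bumps an AtomicLong, and once it reaches MAX_FAIL_FORWARD_TIMES (4) the store stops being offered as a forward-first target until the health checker resets it. The same mechanism in isolation, under hypothetical names:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

final class ForwardGateSketch {
  private static final long MAX_FAILURES = 4;
  private final AtomicLong failures = new AtomicLong(0);
  private final AtomicBoolean canForward = new AtomicBoolean(true);

  // Called after a forwarded request fails; trips the gate after too many failures.
  void onForwardFailure() {
    if (canForward.get() && failures.incrementAndGet() >= MAX_FAILURES) {
      canForward.set(false);
    }
  }

  // Called by the health checker once the store looks healthy again.
  void reset() {
    failures.set(0);
    canForward.set(true);
  }

  boolean canForwardFirst() {
    return canForward.get();
  }

  public static void main(String[] args) {
    ForwardGateSketch gate = new ForwardGateSketch();
    for (int i = 0; i < 4; i++) gate.onForwardFailure();
    System.out.println(gate.canForwardFirst()); // false: forward-first is disabled
    gate.reset();
    System.out.println(gate.canForwardFirst()); // true
  }
}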
|
||||
|
||||
public Metapb.Store getStore() {
|
||||
|
|
|
|||
|
|
@ -1,90 +0,0 @@
|
|||
package org.tikv.common.region;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.health.v1.HealthCheckRequest;
|
||||
import io.grpc.health.v1.HealthCheckResponse;
|
||||
import io.grpc.health.v1.HealthGrpc;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.ReadOnlyPDClient;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.common.util.ConcreteBackOffer;
|
||||
import org.tikv.kvproto.Metapb;
|
||||
|
||||
public class UnreachableStoreChecker implements Runnable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(UnreachableStoreChecker.class);
|
||||
private ConcurrentHashMap<Long, TiStore> stores;
|
||||
private BlockingQueue<TiStore> taskQueue;
|
||||
private final ChannelFactory channelFactory;
|
||||
private final ReadOnlyPDClient pdClient;
|
||||
|
||||
public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) {
|
||||
this.stores = new ConcurrentHashMap();
|
||||
this.taskQueue = new LinkedBlockingQueue<>();
|
||||
this.channelFactory = channelFactory;
|
||||
this.pdClient = pdClient;
|
||||
}
|
||||
|
||||
public void scheduleStoreHealthCheck(TiStore store) {
|
||||
TiStore oldStore = this.stores.get(Long.valueOf(store.getId()));
|
||||
if (oldStore == store) {
|
||||
return;
|
||||
}
|
||||
this.stores.put(Long.valueOf(store.getId()), store);
|
||||
if (!this.taskQueue.add(store)) {
|
||||
// add queue false, mark it reachable so that it can be put again.
|
||||
store.markReachable();
|
||||
}
|
||||
}
|
||||
|
||||
private List<TiStore> getUnhealthStore() {
|
||||
List<TiStore> unhealthStore = new LinkedList<>();
|
||||
while (!this.taskQueue.isEmpty()) {
|
||||
try {
|
||||
TiStore store = this.taskQueue.take();
|
||||
unhealthStore.add(store);
|
||||
} catch (Exception e) {
|
||||
return unhealthStore;
|
||||
}
|
||||
}
|
||||
return unhealthStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
List<TiStore> unhealthStore = getUnhealthStore();
|
||||
for (TiStore store : unhealthStore) {
|
||||
if (!store.isUnreachable()) {
|
||||
continue;
|
||||
}
|
||||
String addressStr = store.getStore().getAddress();
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
|
||||
HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
try {
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
|
||||
store.markReachable();
|
||||
logger.warn(
|
||||
String.format("store [%s] recovers to be reachable", store.getStore().getAddress()));
|
||||
|
||||
this.stores.remove(Long.valueOf(store.getId()));
|
||||
continue;
|
||||
}
|
||||
Metapb.Store newStore =
|
||||
pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId());
|
||||
if (newStore.getState() == Metapb.StoreState.Tombstone) {
|
||||
continue;
|
||||
}
|
||||
this.taskQueue.add(store);
|
||||
} catch (Exception e) {
|
||||
this.taskQueue.add(store);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -102,7 +102,7 @@ public class ConcreteBackOffer implements BackOffer {
|
|||
backOffFunction = BackOffFunction.create(200, 3000, BackOffStrategy.EqualJitter);
|
||||
break;
|
||||
case BoPDRPC:
|
||||
backOffFunction = BackOffFunction.create(500, 3000, BackOffStrategy.EqualJitter);
|
||||
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
|
||||
break;
|
||||
case BoTiKVRPC:
|
||||
backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter);
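The PD RPC backoff is tightened from a 500 ms base with a 3 s cap to a 100 ms base with a 600 ms cap, still using the equal-jitter strategy. For reference, equal jitter is conventionally computed as half of the capped exponential delay plus a random amount up to that half; the sketch below uses that textbook formula and is not necessarily the exact arithmetic inside BackOffFunction:

import java.util.concurrent.ThreadLocalRandom;

final class EqualJitterBackoffSketch {
  // Capped exponential backoff with equal jitter, returning a sleep time in milliseconds.
  static long backoffMillis(long baseMillis, long capMillis, int attempt) {
    long exp = Math.min(capMillis, baseMillis * (1L << Math.min(attempt, 20)));
    long half = exp / 2;
    return half + ThreadLocalRandom.current().nextLong(half + 1);
  }

  public static void main(String[] args) {
    // New BoPDRPC parameters from this change: base 100 ms, cap 600 ms.
    for (int attempt = 0; attempt < 5; attempt++) {
      System.out.printf("attempt %d -> sleep %d ms%n", attempt, backoffMillis(100, 600, attempt));
    }
  }
}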
|
||||
|
|
|
|||