diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 7f1de71a40..813b98629a 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -89,9 +89,6 @@ public abstract class AbstractGRPCClient< stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); }, method.getFullMethodName()); - if (resp != null && this.conf.getEnableGrpcForward()) { - tryUpdateProxy(); - } if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -180,8 +177,6 @@ public abstract class AbstractGRPCClient< protected abstract StubT getAsyncStub(); - protected abstract void tryUpdateProxy(); - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); HealthGrpc.HealthBlockingStub stub = diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index fcd9e5ef57..8d108afe2b 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -557,9 +557,6 @@ public class PDClient extends AbstractGRPCClient return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } - @Override - protected void tryUpdateProxy() {} - private void initCluster() { GetMembersResponse resp = null; List pdAddrs = getConf().getPdAddrs(); diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index 3321b56074..dfbe24c5a6 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -94,6 +94,8 @@ public class KVErrorHandler implements ErrorHandler { Errorpb.Error error = regionHandler.getRegionError(resp); if (error != null) { return 
regionHandler.handleRegionError(backOffer, error); + } else { + regionHandler.tryUpdateRegionStore(); } // Key error handling logic diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 608f631d91..9aa678b41e 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -42,10 +42,16 @@ public class RegionErrorHandler implements ErrorHandler { Errorpb.Error error = getRegionError(resp); if (error != null) { return handleRegionError(backOffer, error); + } else { + tryUpdateRegionStore(); } return false; } + public void tryUpdateRegionStore() { + recv.tryUpdateRegionStore(); + } + public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (error.hasNotLeader()) { // this error is reported from raftstore: diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 49740c56fe..9edb94c0f4 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -22,11 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ManagedChannel; import io.grpc.Metadata; -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.MetadataUtils; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; import org.tikv.common.util.ChannelFactory; +import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; @@ -46,7 
+45,9 @@ public abstract class AbstractRegionStoreClient protected TiRegion region; protected TiStore targetStore; protected TiStore originStore; - protected long retryTimes; + private long retryForwardTimes; + private long retryLeaderTimes; + private Metapb.Peer candidateLeader; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -64,12 +65,17 @@ public abstract class AbstractRegionStoreClient this.regionManager = regionManager; this.targetStore = store; this.originStore = null; - this.retryTimes = 0; + this.candidateLeader = null; + this.retryForwardTimes = 0; + this.retryLeaderTimes = 0; if (this.targetStore.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); + } else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) { + onStoreUnreachable(); } } + @Override public TiRegion getRegion() { return region; } @@ -103,44 +109,155 @@ public abstract class AbstractRegionStoreClient if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) { return false; } + + // If we try one peer but find the leader has not changed, we do not need try other peers. 
+ if (candidateLeader != null + && region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { + retryLeaderTimes = newRegion.getFollowerList().size(); + originStore = null; + } + candidateLeader = null; region = newRegion; targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); - originStore = null; - String addressStr = targetStore.getStore().getAddress(); - ManagedChannel channel = - channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); + updateClientStub(); return true; } @Override public boolean onStoreUnreachable() { - if (!conf.getEnableGrpcForward()) { - regionManager.onRequestFail(region); - return false; - } if (targetStore.getProxyStore() == null) { - if (!targetStore.isUnreachable()) { - if (checkHealth(targetStore.getStore())) { - return true; - } + if (targetStore.isReachable()) { + return true; } - } else if (retryTimes > region.getFollowerList().size()) { + } + + // If this store has failed to forward request too many times, we shall try other peer at first + // so that we can + // reduce the latency cost by fail requests. 
+ if (targetStore.canForwardFirst()) { + if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) { + return retryOtherStoreByProxyForward(); + } + if (retryOtherStoreLeader()) { + return true; + } + } else { + if (retryOtherStoreLeader()) { + return true; + } + if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) { + return retryOtherStoreByProxyForward(); + } + return true; + } + + logger.warn( + String.format( + "retry time exceed for region[%d], invalid this region[%d]", + region.getId(), targetStore.getId())); + regionManager.onRequestFail(region); + return false; + } + + protected Kvrpcpb.Context makeContext(TiStoreType storeType) { + if (candidateLeader != null && storeType == TiStoreType.TiKV) { + return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet()); + } else { + return region.getReplicaContext(java.util.Collections.emptySet(), storeType); + } + } + + protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType storeType) { + if (candidateLeader != null && storeType == TiStoreType.TiKV) { + return region.getReplicaContext(candidateLeader, resolvedLocks); + } else { + return region.getReplicaContext(resolvedLocks, storeType); + } + } + + @Override + public void tryUpdateRegionStore() { + if (originStore != null) { + if (originStore.getId() == targetStore.getId()) { + logger.warn( + String.format( + "update store [%s] by proxy-store [%s]", + targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); + // We do not need to mark the store can-forward, because if one store has grpc forward + // successfully, it will + // create a new store object, which is can-forward. + regionManager.updateStore(originStore, targetStore); + } else { + // If we try to forward request to leader by follower failed, it means that the store of old + // leader may be + // unavailable but the new leader has not been report to PD. 
So we can ban this store for a + // short time to + // avoid too many request try forward rather than try other peer. + originStore.forwardFail(); + } + } + if (candidateLeader != null) { logger.warn( String.format( - "retry time exceed for region[%d], invalid this region[%d]", - region.getId(), targetStore.getId())); - regionManager.onRequestFail(region); + "update leader to store [%d] for region[%d]", + candidateLeader.getStoreId(), region.getId())); + this.regionManager.updateLeader(region, candidateLeader.getStoreId()); + } + } + + private boolean retryOtherStoreLeader() { + List peers = region.getFollowerList(); + if (retryLeaderTimes >= peers.size()) { return false; } + retryLeaderTimes += 1; + boolean hasVisitedStore = false; + for (Metapb.Peer cur : peers) { + if (candidateLeader == null || hasVisitedStore) { + TiStore store = regionManager.getStoreById(cur.getStoreId()); + if (store != null && store.isReachable()) { + targetStore = store; + candidateLeader = cur; + logger.warn( + String.format( + "try store [%d],peer[%d] for region[%d], which may be new leader", + targetStore.getId(), candidateLeader.getId(), region.getId())); + updateClientStub(); + return true; + } else { + continue; + } + } else if (candidateLeader.getId() == cur.getId()) { + hasVisitedStore = true; + } + } + candidateLeader = null; + retryLeaderTimes = peers.size(); + return false; + } + + private void updateClientStub() { + String addressStr = targetStore.getStore().getAddress(); + ManagedChannel channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); + blockingStub = TikvGrpc.newBlockingStub(channel); + asyncStub = TikvGrpc.newStub(channel); + } + + private boolean retryOtherStoreByProxyForward() { + if (!targetStore.isValid()) { + targetStore = regionManager.getStoreById(targetStore.getId()); + logger.warn( + String.format("store [%d] has been invalid", targetStore.getId())); + return true; + } + TiStore proxyStore = 
switchProxyStore(); if (proxyStore == null) { logger.warn( String.format( "no forward store can be selected for store [%s] and region[%d]", targetStore.getStore().getAddress(), region.getId())); - regionManager.onRequestFail(region); return false; } if (originStore == null) { @@ -150,7 +267,7 @@ public abstract class AbstractRegionStoreClient } } targetStore = proxyStore; - retryTimes += 1; + retryForwardTimes += 1; logger.warn( String.format( "forward request to store [%s] by store [%s] for region[%d]", @@ -167,58 +284,24 @@ public abstract class AbstractRegionStoreClient return true; } - @Override - protected void tryUpdateProxy() { - if (originStore != null) { - logger.warn( - String.format( - "update store [%s] by proxy-store [%s]", - targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); - regionManager.updateStore(originStore, targetStore); - } - } - - private boolean checkHealth(Metapb.Store store) { - String addressStr = store.getAddress(); - ManagedChannel channel = - channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - HealthGrpc.HealthBlockingStub stub = - HealthGrpc.newBlockingStub(channel) - .withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS); - HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); - try { - HealthCheckResponse resp = stub.check(req); - if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) { - return false; - } - } catch (Exception e) { - return false; - } - return true; - } - private TiStore switchProxyStore() { boolean hasVisitedStore = false; List peers = region.getFollowerList(); - for (int i = 0; i < peers.size() * 2; i++) { - int idx = i % peers.size(); - Metapb.Peer peer = peers.get(idx); - if (peer.getStoreId() != region.getLeader().getStoreId()) { - if (targetStore.getProxyStore() == null) { - TiStore store = regionManager.getStoreById(peer.getStoreId()); - if (checkHealth(store.getStore())) { - return 
targetStore.withProxy(store.getStore()); - } - } else { - if (peer.getStoreId() == targetStore.getProxyStore().getId()) { - hasVisitedStore = true; - } else if (hasVisitedStore) { - TiStore proxyStore = regionManager.getStoreById(peer.getStoreId()); - if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) { - return targetStore.withProxy(proxyStore.getStore()); - } - } + if (peers.isEmpty()) { + return null; + } + Metapb.Store proxyStore = targetStore.getProxyStore(); + if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) { + hasVisitedStore = true; + } + for (Metapb.Peer peer : peers) { + if (hasVisitedStore) { + TiStore store = regionManager.getStoreById(peer.getStoreId()); + if (store.isReachable()) { + return targetStore.withProxy(store.getStore()); } + } else if (peer.getStoreId() == proxyStore.getId()) { + hasVisitedStore = true; } } return null; diff --git a/src/main/java/org/tikv/common/region/RegionCache.java b/src/main/java/org/tikv/common/region/RegionCache.java new file mode 100644 index 0000000000..a52fc716f9 --- /dev/null +++ b/src/main/java/org/tikv/common/region/RegionCache.java @@ -0,0 +1,186 @@ +package org.tikv.common.region; + +import static org.tikv.common.codec.KeyUtils.formatBytesUTF8; +import static org.tikv.common.util.KeyRangeUtils.makeRange; + +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.key.Key; +import org.tikv.common.util.BackOffer; + +public class RegionCache { + private static final Logger logger = LoggerFactory.getLogger(RegionCache.class); + + private final Map regionCache; + private final Map storeCache; + private final RangeMap keyToRegionIdCache; + + public RegionCache() { + regionCache = new HashMap<>(); + 
storeCache = new HashMap<>(); + + keyToRegionIdCache = TreeRangeMap.create(); + } + + public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { + Long regionId; + if (key.isEmpty()) { + // if key is empty, it must be the start key. + regionId = keyToRegionIdCache.get(Key.toRawKey(key, true)); + } else { + regionId = keyToRegionIdCache.get(Key.toRawKey(key)); + } + if (logger.isDebugEnabled()) { + logger.debug( + String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId)); + } + + if (regionId == null) { + return null; + } + TiRegion region; + region = regionCache.get(regionId); + if (logger.isDebugEnabled()) { + logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); + } + return region; + } + + public synchronized TiRegion putRegion(TiRegion region) { + if (logger.isDebugEnabled()) { + logger.debug("putRegion: " + region); + } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion != null) { + if (oldRegion.getMeta().equals(region.getMeta())) { + return oldRegion; + } else { + invalidateRegion(oldRegion); + } + } + regionCache.put(region.getId(), region); + keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId()); + return region; + } + + @Deprecated + public synchronized TiRegion getRegionById(long regionId) { + TiRegion region = regionCache.get(regionId); + if (logger.isDebugEnabled()) { + logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); + } + return region; + } + + private synchronized TiRegion getRegionFromCache(long regionId) { + return regionCache.get(regionId); + } + + /** Removes region associated with regionId from regionCache. 
*/ + public synchronized void invalidateRegion(TiRegion region) { + try { + if (logger.isDebugEnabled()) { + logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); + } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion != null && oldRegion == region) { + keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey())); + regionCache.remove(region.getId()); + } + } catch (Exception ignore) { + } + } + + public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { + try { + if (logger.isDebugEnabled()) { + logger.debug(String.format("updateRegion ID[%s]", region.getId())); + } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion == null || !expected.getMeta().equals(oldRegion.getMeta())) { + return false; + } else { + if (oldRegion != null) { + keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey())); + } + regionCache.put(region.getId(), region); + keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId()); + return true; + } + } catch (Exception ignore) { + return false; + } + } + + public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) { + if (!newStore.isValid()) { + return false; + } + TiStore originStore = storeCache.get(oldStore.getId()); + if (originStore == oldStore) { + storeCache.put(newStore.getId(), newStore); + oldStore.markInvalid(); + return true; + } + return false; + } + + public synchronized void invalidateAllRegionForStore(TiStore store) { + TiStore oldStore = storeCache.get(store.getId()); + if (oldStore != store) { + return; + } + List regionToRemove = new ArrayList<>(); + for (TiRegion r : regionCache.values()) { + if (r.getLeader().getStoreId() == store.getId()) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r)); + } + regionToRemove.add(r); + } + } + + logger.warn(String.format("invalid store [%d]", store.getId())); + // remove 
region + for (TiRegion r : regionToRemove) { + keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey())); + regionCache.remove(r.getId()); + } + } + + public synchronized void invalidateStore(long storeId) { + TiStore store = storeCache.remove(storeId); + if (store != null) { + store.markInvalid(); + } + } + + public synchronized TiStore getStoreById(long id) { + return storeCache.get(id); + } + + public synchronized boolean putStore(long id, TiStore store) { + TiStore oldStore = storeCache.get(id); + if (oldStore != null) { + if (oldStore.equals(store)) { + return false; + } else { + oldStore.markInvalid(); + } + } + storeCache.put(id, store); + return true; + } + + public synchronized void clearAll() { + keyToRegionIdCache.clear(); + regionCache.clear(); + } +} diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 4bee1356ea..0a5bcabf03 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -23,5 +23,7 @@ public interface RegionErrorReceiver { /// return whether we need to retry this request. 
boolean onStoreUnreachable(); + void tryUpdateRegionStore(); + TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index a565b5cce4..5aad400ed8 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -18,16 +18,10 @@ package org.tikv.common.region; import static org.tikv.common.codec.KeyUtils.formatBytesUTF8; -import static org.tikv.common.util.KeyRangeUtils.makeRange; -import com.google.common.collect.RangeMap; -import com.google.common.collect.TreeRangeMap; import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +34,6 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; -import org.tikv.common.key.Key; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; @@ -52,22 +45,22 @@ import org.tikv.kvproto.Metapb.StoreState; @SuppressWarnings("UnstableApiUsage") public class RegionManager { private static final Logger logger = LoggerFactory.getLogger(RegionManager.class); + public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = + Histogram.build() + .name("client_java_get_region_by_requests_latency") + .help("getRegionByKey request latency.") + .register(); + // TODO: the region cache logic need rewrite. 
// https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; private final ReadOnlyPDClient pdClient; private final TiConfiguration conf; private final ScheduledExecutorService executor; - private final UnreachableStoreChecker storeChecker; + private final StoreHealthyChecker storeChecker; private final Function cacheInvalidateCallback; - public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = - Histogram.build() - .name("client_java_get_region_by_requests_latency") - .help("getRegionByKey request latency.") - .register(); - // To avoid double retrieval, we used the async version of grpc // When rpc not returned, instead of call again, it wait for previous one done public RegionManager( @@ -94,10 +87,11 @@ public class RegionManager { this.conf = conf; if (enableGrpcForward) { - UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); + StoreHealthyChecker storeChecker = + new StoreHealthyChecker(channelFactory, pdClient, this.cache); this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); - this.executor.scheduleAtFixedRate(storeChecker, 10, 10, TimeUnit.SECONDS); + this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS); } else { this.storeChecker = null; this.executor = null; @@ -132,13 +126,21 @@ public class RegionManager { } public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { + Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); TiRegion region = cache.getRegionByKey(key, backOffer); - if (region == null) { - logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); - Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); - region = - cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + try { + if (region == null) { + logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); + Pair regionAndLeader = 
pdClient.getRegionByKey(backOffer, key); + region = + cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + } + } catch (Exception e) { + return null; + } finally { + requestTimer.observeDuration(); } + return region; } @@ -232,7 +234,10 @@ public class RegionManager { if (store.getStore().getState().equals(StoreState.Tombstone)) { return null; } - return cache.putStore(id, store); + if (cache.putStore(id, store) && storeChecker != null) { + storeChecker.scheduleStoreHealthCheck(store); + } + return store; } catch (Exception e) { throw new GrpcException(e); } @@ -246,7 +251,10 @@ public class RegionManager { cache.invalidateRegion(region); } - public synchronized TiRegion updateLeader(TiRegion region, long storeId) { + public TiRegion updateLeader(TiRegion region, long storeId) { + if (region.getLeader().getStoreId() == storeId) { + return region; + } TiRegion newRegion = region.switchPeer(storeId); if (cache.updateRegion(region, newRegion)) { return newRegion; @@ -259,13 +267,7 @@ public class RegionManager { public synchronized void updateStore(TiStore oldStore, TiStore newStore) { if (cache.updateStore(oldStore, newStore)) { - if (newStore.isUnreachable()) { - logger.warn( - String.format( - "check health for store [%s] in background thread", - newStore.getStore().getAddress())); - this.storeChecker.scheduleStoreHealthCheck(newStore); - } + storeChecker.scheduleStoreHealthCheck(newStore); } } @@ -283,6 +285,24 @@ public class RegionManager { cache.invalidateRegion(region); } + /** If region has changed, return the new one and update cache. 
*/ + public TiRegion getRegionSkipCache(TiRegion region) { + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + try { + Pair regionAndLeader = + pdClient.getRegionByID(backOffer, region.getId()); + if (!regionAndLeader.first.equals(region.getMeta())) { + region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); + return cache.putRegion(region); + } else { + logger.warn("Cannot get region from PD for region id: " + region.getId()); + return null; + } + } catch (Exception e) { + return null; + } + } + public void invalidateStore(long storeId) { cache.invalidateStore(storeId); } @@ -290,177 +310,4 @@ public class RegionManager { public void invalidateRegion(TiRegion region) { cache.invalidateRegion(region); } - - public static class RegionCache { - private final Map regionCache; - private final Map storeCache; - private final RangeMap keyToRegionIdCache; - - public RegionCache() { - regionCache = new HashMap<>(); - storeCache = new HashMap<>(); - - keyToRegionIdCache = TreeRangeMap.create(); - } - - public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { - Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); - try { - Long regionId; - if (key.isEmpty()) { - // if key is empty, it must be the start key. 
- regionId = keyToRegionIdCache.get(Key.toRawKey(key, true)); - } else { - regionId = keyToRegionIdCache.get(Key.toRawKey(key)); - } - if (logger.isDebugEnabled()) { - logger.debug( - String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId)); - } - - if (regionId == null) { - return null; - } - TiRegion region; - region = regionCache.get(regionId); - if (logger.isDebugEnabled()) { - logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); - } - - return region; - } finally { - requestTimer.observeDuration(); - } - } - - private synchronized TiRegion putRegion(TiRegion region) { - if (logger.isDebugEnabled()) { - logger.debug("putRegion: " + region); - } - TiRegion oldRegion = regionCache.get(region.getId()); - if (oldRegion != null) { - if (oldRegion.getMeta().equals(region.getMeta())) { - return oldRegion; - } else { - invalidateRegion(oldRegion); - } - } - regionCache.put(region.getId(), region); - keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId()); - return region; - } - - @Deprecated - private synchronized TiRegion getRegionById(long regionId) { - TiRegion region = regionCache.get(regionId); - if (logger.isDebugEnabled()) { - logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); - } - return region; - } - - private synchronized TiRegion getRegionFromCache(long regionId) { - return regionCache.get(regionId); - } - - /** Removes region associated with regionId from regionCache. 
*/ - public synchronized void invalidateRegion(TiRegion region) { - try { - if (logger.isDebugEnabled()) { - logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); - } - TiRegion oldRegion = regionCache.get(region.getId()); - if (oldRegion != null && oldRegion == region) { - keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey())); - regionCache.remove(region.getId()); - } - } catch (Exception ignore) { - } - } - - public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { - try { - if (logger.isDebugEnabled()) { - logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); - } - TiRegion oldRegion = regionCache.get(region.getId()); - if (!expected.getMeta().equals(oldRegion.getMeta())) { - return false; - } else { - if (oldRegion != null) { - keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey())); - } - regionCache.put(region.getId(), region); - keyToRegionIdCache.put( - makeRange(region.getStartKey(), region.getEndKey()), region.getId()); - return true; - } - } catch (Exception ignore) { - return false; - } - } - - public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) { - TiStore originStore = storeCache.get(oldStore.getId()); - if (originStore == oldStore) { - storeCache.put(newStore.getId(), newStore); - if (oldStore != null && oldStore.isUnreachable()) { - oldStore.markReachable(); - } - if (newStore.getProxyStore() != null) { - newStore.markUnreachable(); - } - return true; - } - return false; - } - - public synchronized void invalidateAllRegionForStore(TiStore store) { - TiStore oldStore = storeCache.get(store.getId()); - if (oldStore != store) { - return; - } - List regionToRemove = new ArrayList<>(); - for (TiRegion r : regionCache.values()) { - if (r.getLeader().getStoreId() == store.getId()) { - if (logger.isDebugEnabled()) { - logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r)); - } - regionToRemove.add(r); - } 
- } - - logger.warn(String.format("invalid store [%d]", store.getId())); - // remove region - for (TiRegion r : regionToRemove) { - keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey())); - regionCache.remove(r.getId()); - } - } - - public synchronized void invalidateStore(long storeId) { - TiStore store = storeCache.remove(storeId); - if (store != null) { - store.markReachable(); - } - } - - public synchronized TiStore getStoreById(long id) { - return storeCache.get(id); - } - - public synchronized TiStore putStore(long id, TiStore store) { - TiStore oldStore = storeCache.get(id); - if (oldStore != null && oldStore.getStore().equals(store.getStore())) { - return oldStore; - } - storeCache.put(id, store); - return store; - } - - public synchronized void clearAll() { - keyToRegionIdCache.clear(); - regionCache.clear(); - } - } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 5839c14e97..19eed19531 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -171,7 +171,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> GetRequest.newBuilder() - .setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) + .setContext(makeContext(getResolvedLocks(version), this.storeType)) .setKey(key) .setVersion(version) .build(); @@ -216,7 +216,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> BatchGetRequest.newBuilder() - .setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) + .setContext(makeContext(getResolvedLocks(version), this.storeType)) .addAllKeys(keys) .setVersion(version) .build(); @@ -279,7 +279,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> ScanRequest.newBuilder() - 
.setContext(region.getReplicaContext(getResolvedLocks(version), this.storeType)) + .setContext(makeContext(getResolvedLocks(version), this.storeType)) .setStartKey(startKey) .setVersion(version) .setKeyOnly(keyOnly) @@ -381,7 +381,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { () -> getIsV4() ? PrewriteRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .addAllMutations(mutations) @@ -391,7 +391,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setTxnSize(16) .build() : PrewriteRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .addAllMutations(mutations) @@ -471,7 +471,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> TxnHeartBeatRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setStartVersion(startTs) .setPrimaryLock(primaryLock) .setAdviseLockTtl(ttl) @@ -529,7 +529,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setStartVersion(startTs) .setCommitVersion(commitTs) .addAllKeys(keys) - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .build(); KVErrorHandler handler = new KVErrorHandler<>( @@ -590,7 +590,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier reqToSend = () -> Coprocessor.Request.newBuilder() - .setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType)) + .setContext(makeContext(getResolvedLocks(startTs), this.storeType)) .setTp(REQ_TYPE_DAG.getValue()) .setStartTs(startTs) .setData(req.toByteString()) @@ -713,7 +713,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier reqToSend = () -> Coprocessor.Request.newBuilder() - 
.setContext(region.getReplicaContext(getResolvedLocks(startTs), this.storeType)) + .setContext(makeContext(getResolvedLocks(startTs), this.storeType)) // TODO: If no executors...? .setTp(REQ_TYPE_DAG.getValue()) .setData(req.toByteString()) @@ -751,7 +751,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier request = () -> SplitRegionRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .addAllSplitKeys(splitKeys) .build(); @@ -792,11 +792,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); try { Supplier factory = - () -> - RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) - .setKey(key) - .build(); + () -> RawGetRequest.newBuilder().setContext(makeContext(storeType)).setKey(key).build(); RegionErrorHandler handler = new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? 
resp.getRegionError() : null); @@ -833,7 +829,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawGetKeyTTLRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setKey(key) .build(); RegionErrorHandler handler = @@ -872,7 +868,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawDeleteRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setKey(key) .setForCas(atomicForCAS) .build(); @@ -910,7 +906,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawPutRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setKey(key) .setValue(value) .setTtl(ttl) @@ -954,7 +950,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawCASRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setKey(key) .setValue(value) .setPreviousValue(prevValue.orElse(ByteString.EMPTY)) @@ -1007,7 +1003,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchGetRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .addAllKeys(keys) .build(); RegionErrorHandler handler = @@ -1043,7 +1039,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchPutRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .addAllPairs(kvPairs) .setTtl(ttl) .setForCas(atomicForCAS) @@ -1095,7 +1091,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawBatchDeleteRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + 
.setContext(makeContext(storeType)) .addAllKeys(keys) .setForCas(atomicForCAS) .build(); @@ -1140,7 +1136,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawScanRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setStartKey(key) .setKeyOnly(keyOnly) .setLimit(limit) @@ -1186,7 +1182,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Supplier factory = () -> RawDeleteRangeRequest.newBuilder() - .setContext(region.getReplicaContext(storeType)) + .setContext(makeContext(storeType)) .setStartKey(startKey) .setEndKey(endKey) .build(); @@ -1269,7 +1265,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { TikvBlockingStub blockingStub = null; TikvStub asyncStub = null; - if (conf.getEnableGrpcForward() && store.getProxyStore() != null && store.isUnreachable()) { + if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) { addressStr = store.getProxyStore().getAddress(); channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); @@ -1278,18 +1274,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient { blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); } else { - // If the store is reachable, which is update by check-health thread, cancel proxy forward. 
- if (!store.isUnreachable()) { - if (store.getProxyStore() != null) { - logger.warn( - String.format( - "cancel request to store [%s] forward by store[%s]", - store.getStore().getAddress(), store.getProxyStore().getAddress())); - TiStore newStore = store.withProxy(null); - regionManager.updateStore(store, newStore); - store = newStore; - } - } channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java new file mode 100644 index 0000000000..352725f5fe --- /dev/null +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -0,0 +1,150 @@ +package org.tikv.common.region; + +import io.grpc.ManagedChannel; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.ReadOnlyPDClient; +import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Metapb; + +public class StoreHealthyChecker implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); + private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60; + private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500; + private BlockingQueue taskQueue; + private final ChannelFactory channelFactory; + private final ReadOnlyPDClient pdClient; + private final RegionCache cache; + private long checkTombstoneTick; + + public StoreHealthyChecker( + ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache 
cache) { + this.taskQueue = new LinkedBlockingQueue<>(); + this.channelFactory = channelFactory; + this.pdClient = pdClient; + this.cache = cache; + this.checkTombstoneTick = 0; + } + + public boolean scheduleStoreHealthCheck(TiStore store) { + if (!this.taskQueue.add(store)) { + // add queue false, mark it reachable so that it can be put again. + return false; + } + return true; + } + + private List getValidStores() { + List unhealthStore = new LinkedList<>(); + while (!this.taskQueue.isEmpty()) { + try { + TiStore store = this.taskQueue.take(); + if (!store.isValid()) { + continue; + } + unhealthStore.add(store); + } catch (Exception e) { + return unhealthStore; + } + } + return unhealthStore; + } + + private boolean checkStoreHealth(TiStore store) { + String addressStr = store.getStore().getAddress(); + try { + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + HealthGrpc.HealthBlockingStub stub = + HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + HealthCheckResponse resp = stub.check(req); + if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { + return true; + } else { + return false; + } + } catch (Exception e) { + return false; + } + } + + private boolean checkStoreTombstone(TiStore store) { + try { + Metapb.Store newStore = pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId()); + if (newStore.getState() == Metapb.StoreState.Tombstone) { + return true; + } + } catch (Exception e) { + return false; + } + return false; + } + + @Override + public void run() { + checkTombstoneTick += 1; + boolean needCheckTombstoneStore = false; + if (checkTombstoneTick >= MAX_CHECK_STORE_TOMBSTONE_TICK) { + needCheckTombstoneStore = true; + checkTombstoneTick = 0; + } + List allStores = getValidStores(); + List unreachableStore = new LinkedList<>(); + for (TiStore store : allStores) { + if 
(needCheckTombstoneStore) { + if (checkStoreTombstone(store)) { + continue; + } + } + + if (checkStoreHealth(store)) { + if (store.getProxyStore() != null) { + TiStore newStore = store.withProxy(null); + logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress())); + if (cache.putStore(newStore.getId(), newStore)) { + this.taskQueue.add(newStore); + continue; + } + } else { + if (!store.isReachable()) { + logger.warn( + String.format( + "store [%s] recovers to be reachable and canforward", store.getAddress())); + store.markReachable(); + } + if (!store.canForwardFirst()) { + store.makrCanForward(); + } + } + } else if (store.isReachable()) { + unreachableStore.add(store); + continue; + } + this.taskQueue.add(store); + } + if (!unreachableStore.isEmpty()) { + try { + Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK); + } catch (Exception e) { + this.taskQueue.addAll(unreachableStore); + return; + } + for (TiStore store : unreachableStore) { + if (!checkStoreHealth(store)) { + logger.warn(String.format("store [%s] is not reachable", store.getAddress())); + store.markUnreachable(); + } + this.taskQueue.add(store); + } + } + } +} diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index f7b736dbe2..b53557f930 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -142,20 +142,21 @@ public class TiRegion implements Serializable { } public Kvrpcpb.Context getLeaderContext() { - return getContext(this.leader, java.util.Collections.emptySet(), TiStoreType.TiKV); - } - - public Kvrpcpb.Context getReplicaContext(TiStoreType storeType) { - return getContext(getCurrentReplica(), java.util.Collections.emptySet(), storeType); + return getContext(this.leader, java.util.Collections.emptySet(), false); } public Kvrpcpb.Context getReplicaContext(Set resolvedLocks, TiStoreType storeType) { - return 
getContext(getCurrentReplica(), resolvedLocks, storeType); + Peer currentPeer = getCurrentReplica(); + boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType); + return getContext(currentPeer, resolvedLocks, replicaRead); + } + + public Kvrpcpb.Context getReplicaContext(Peer currentPeer, Set resolvedLocks) { + return getContext(currentPeer, resolvedLocks, false); } private Kvrpcpb.Context getContext( - Peer currentPeer, Set resolvedLocks, TiStoreType storeType) { - boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType); + Peer currentPeer, Set resolvedLocks, boolean replicaRead) { Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); builder diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index f6c7fc80eb..a6346f44c3 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -1,23 +1,60 @@ package org.tikv.common.region; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.tikv.kvproto.Metapb; public class TiStore { + private static long MAX_FAIL_FORWARD_TIMES = 4; private final Metapb.Store store; private final Metapb.Store proxyStore; - private AtomicBoolean unreachable; + private AtomicBoolean reachable; + private AtomicBoolean valid; + private AtomicLong failForwardCount; + private AtomicBoolean canForward; public TiStore(Metapb.Store store) { this.store = store; - this.unreachable = new AtomicBoolean(false); + this.reachable = new AtomicBoolean(true); + this.valid = new AtomicBoolean(true); + this.canForward = new AtomicBoolean(true); this.proxyStore = null; + this.failForwardCount = new AtomicLong(0); } private TiStore(Metapb.Store store, Metapb.Store proxyStore) { this.store = store; - this.unreachable = new AtomicBoolean(false); + if (proxyStore != null) { + this.reachable = new AtomicBoolean(false); + } else { 
+ this.reachable = new AtomicBoolean(true); + } + this.valid = new AtomicBoolean(true); + this.canForward = new AtomicBoolean(true); this.proxyStore = proxyStore; + this.failForwardCount = new AtomicLong(0); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof TiStore)) { + return super.equals(obj); + } + TiStore other = (TiStore) obj; + if (!this.store.equals(other.store)) { + return false; + } + + if (proxyStore == null && other.proxyStore == null) { + return true; + } + if (proxyStore != null && other.proxyStore != null) { + return proxyStore.equals(other.proxyStore); + } + return false; } public TiStore withProxy(Metapb.Store proxyStore) { @@ -25,15 +62,40 @@ public class TiStore { } public void markUnreachable() { - this.unreachable.set(true); + this.reachable.set(false); } public void markReachable() { - this.unreachable.set(false); + this.reachable.set(true); } - public boolean isUnreachable() { - return this.unreachable.get(); + public boolean isReachable() { + return this.reachable.get(); + } + + public boolean isValid() { + return this.valid.get(); + } + + public void markInvalid() { + this.valid.set(false); + } + + public void forwardFail() { + if (this.canForward.get()) { + if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) { + this.canForward.set(false); + } + } + } + + public void makrCanForward() { + this.failForwardCount.set(0); + this.canForward.set(true); + } + + public boolean canForwardFirst() { + return this.canForward.get(); } public Metapb.Store getStore() { diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java deleted file mode 100644 index 11ea49b639..0000000000 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.tikv.common.region; - -import io.grpc.ManagedChannel; -import 
io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.tikv.common.ReadOnlyPDClient; -import org.tikv.common.util.ChannelFactory; -import org.tikv.common.util.ConcreteBackOffer; -import org.tikv.kvproto.Metapb; - -public class UnreachableStoreChecker implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(UnreachableStoreChecker.class); - private ConcurrentHashMap stores; - private BlockingQueue taskQueue; - private final ChannelFactory channelFactory; - private final ReadOnlyPDClient pdClient; - - public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) { - this.stores = new ConcurrentHashMap(); - this.taskQueue = new LinkedBlockingQueue<>(); - this.channelFactory = channelFactory; - this.pdClient = pdClient; - } - - public void scheduleStoreHealthCheck(TiStore store) { - TiStore oldStore = this.stores.get(Long.valueOf(store.getId())); - if (oldStore == store) { - return; - } - this.stores.put(Long.valueOf(store.getId()), store); - if (!this.taskQueue.add(store)) { - // add queue false, mark it reachable so that it can be put again. 
- store.markReachable(); - } - } - - private List getUnhealthStore() { - List unhealthStore = new LinkedList<>(); - while (!this.taskQueue.isEmpty()) { - try { - TiStore store = this.taskQueue.take(); - unhealthStore.add(store); - } catch (Exception e) { - return unhealthStore; - } - } - return unhealthStore; - } - - @Override - public void run() { - List unhealthStore = getUnhealthStore(); - for (TiStore store : unhealthStore) { - if (!store.isUnreachable()) { - continue; - } - String addressStr = store.getStore().getAddress(); - ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); - HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel); - HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); - try { - HealthCheckResponse resp = stub.check(req); - if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { - store.markReachable(); - logger.warn( - String.format("store [%s] recovers to be reachable", store.getStore().getAddress())); - - this.stores.remove(Long.valueOf(store.getId())); - continue; - } - Metapb.Store newStore = - pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId()); - if (newStore.getState() == Metapb.StoreState.Tombstone) { - continue; - } - this.taskQueue.add(store); - } catch (Exception e) { - this.taskQueue.add(store); - } - } - } -} diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index b248d233d8..a7af12b85c 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -102,7 +102,7 @@ public class ConcreteBackOffer implements BackOffer { backOffFunction = BackOffFunction.create(200, 3000, BackOffStrategy.EqualJitter); break; case BoPDRPC: - backOffFunction = BackOffFunction.create(500, 3000, BackOffStrategy.EqualJitter); + backOffFunction = BackOffFunction.create(100, 600, 
BackOffStrategy.EqualJitter); break; case BoTiKVRPC: backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter);