diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 6b3e511512..d18954e635 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -216,7 +216,7 @@ public class RegionErrorHandler implements ErrorHandler { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - if (recv.onStoreUnreachable(backOffer.getSlowLog())) { + if (recv.onStoreUnreachable(backOffer)) { if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) { regionManager.onRequestFail(recv.getRegion()); throw new GrpcException("retry is exhausted.", e); diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index a7db3837a9..ce1e37b51e 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -89,6 +89,7 @@ public abstract class RetryPolicy { } catch (Exception e) { rethrowNotRecoverableException(e); // Handle request call error + backOffer.checkTimeout(); boolean retry = handler.handleRequestError(backOffer, e); if (retry) { GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index e34a0ff352..f58a926723 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -35,9 +35,8 @@ import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; -import org.tikv.common.log.SlowLog; -import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.log.SlowLogSpan; +import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; @@ -81,10 +80,6 @@ public abstract class AbstractRegionStoreClient this.store = store; if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); - } else if (!this.store.isReachable()) { - // cannot get Deadline or SlowLog instance here - // use SlowLogEmptyImpl instead to skip slow log record - onStoreUnreachable(SlowLogEmptyImpl.INSTANCE); } } @@ -134,22 +129,24 @@ public abstract class AbstractRegionStoreClient } @Override - public boolean onStoreUnreachable(SlowLog slowLog) { + public boolean onStoreUnreachable(BackOffer backOffer) { if (!store.isValid()) { logger.warn(String.format("store [%d] has been invalid", store.getId())); - store = regionManager.getStoreById(store.getId()); + store = regionManager.getStoreById(store.getId(), backOffer); updateClientStub(); return true; } // seek an available leader store to send request - Boolean result = seekLeaderStore(slowLog); + backOffer.checkTimeout(); + Boolean result = seekLeaderStore(backOffer); if (result != null) { return result; } if (conf.getEnableGrpcForward()) { // seek an available proxy store to forward request - return seekProxyStore(slowLog); + backOffer.checkTimeout(); + return seekProxyStore(backOffer); } return false; } @@ -182,9 +179,9 @@ public abstract class AbstractRegionStoreClient } } - private Boolean seekLeaderStore(SlowLog slowLog) { + private Boolean seekLeaderStore(BackOffer backOffer) { Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); - SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore"); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore"); try { List peers = region.getFollowerList(); if (peers.isEmpty()) { @@ -226,8 +223,8 @@ public abstract class AbstractRegionStoreClient return null; } - private boolean seekProxyStore(SlowLog slowLog) { - SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore"); + private boolean seekProxyStore(BackOffer backOffer) { + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore"); Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 307538d4a8..e0a3dce930 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,13 +17,13 @@ package org.tikv.common.region; -import org.tikv.common.log.SlowLog; +import org.tikv.common.util.BackOffer; public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. - boolean onStoreUnreachable(SlowLog slowLog); + boolean onStoreUnreachable(BackOffer backOffer); TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 8bb4f8186c..bcce5f6e94 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -33,6 +33,7 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; @@ -100,6 +101,7 @@ public class RegionManager { public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey"); TiRegion region = cache.getRegionByKey(key, backOffer); try { if (region == null) { @@ -112,6 +114,7 @@ public class RegionManager { return null; } finally { requestTimer.observeDuration(); + slowLogSpan.end(); } return region; diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java index b80e9e8ec5..bc080e7355 100644 --- a/src/main/java/org/tikv/common/util/BackOffFunction.java +++ b/src/main/java/org/tikv/common/util/BackOffFunction.java @@ -63,6 +63,7 @@ public class BackOffFunction { BoRegionMiss, BoUpdateLeader, BoServerBusy, - BoTxnNotFound + BoTxnNotFound, + BoCheckTimeout } } diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index 77b0518326..c64529c407 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -39,6 +39,10 @@ public interface BackOffer { * max back off time exceeded and throw an exception to the caller. */ void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err); + + /** check if deadline exceeded. */ + void checkTimeout(); + /** * canRetryAfterSleep sleeps a while base on the BackOffType and records the error message. Will * stop until max back off time exceeded and throw an exception to the caller. It will return diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 89c79c3078..355c9ff602 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.exception.TiKVException; import org.tikv.common.log.SlowLog; import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.log.SlowLogSpan; @@ -142,6 +143,9 @@ public class ConcreteBackOffer implements BackOffer { case BoTxnNotFound: backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); break; + case BoCheckTimeout: + backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter); + break; } return backOffFunction; } @@ -151,6 +155,13 @@ public class ConcreteBackOffer implements BackOffer { doBackOffWithMaxSleep(funcType, -1, err); } + @Override + public void checkTimeout() { + if (!canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout)) { + logThrowError(new TiKVException("Request Timeout")); + } + } + @Override public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { return canRetryAfterSleep(funcType, -1); diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 2caa837a73..9e8884494c 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -155,8 +155,7 @@ public class RawKVClientTest extends BaseRawKVTest { } finally { long e = System.currentTimeMillis(); long duration = e - s; - logger.info("duration = " + duration); - assert (duration >= 2900); + assertTrue(duration >= 2900); } } @@ -175,8 +174,26 @@ public class RawKVClientTest extends BaseRawKVTest { } finally { long e = System.currentTimeMillis(); long duration = e - s; - logger.info("duration = " + duration); - assert (duration <= timeout + sleep); + assertTrue(duration <= timeout + sleep); + } + } + + @Test + public void testBackoffTimeout() { + int timeout = 500; + int sleep = 150; + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); + long s = System.currentTimeMillis(); + try { + while (true) { + Thread.sleep(sleep); + backOffer.checkTimeout(); + } + } catch (Exception ignored) { + } finally { + long e = System.currentTimeMillis(); + long duration = e - s; + assertTrue(duration <= timeout + sleep); } }