check timeout during SeekLeader and SeekProxy (#352)

This commit is contained in:
Liangliang Gu 2021-12-02 00:16:10 +08:00 committed by GitHub
parent 7e2856949e
commit 6ec1d70628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 56 additions and 22 deletions

View File

@ -217,7 +217,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override @Override
public boolean handleRequestError(BackOffer backOffer, Exception e) { public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable(backOffer.getSlowLog())) { if (recv.onStoreUnreachable(backOffer)) {
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) { if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion()); regionManager.onRequestFail(recv.getRegion());
throw new GrpcException("retry is exhausted.", e); throw new GrpcException("retry is exhausted.", e);

View File

@ -89,6 +89,7 @@ public abstract class RetryPolicy<RespT> {
} catch (Exception e) { } catch (Exception e) {
rethrowNotRecoverableException(e); rethrowNotRecoverableException(e);
// Handle request call error // Handle request call error
backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e); boolean retry = handler.handleRequestError(backOffer, e);
if (retry) { if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();

View File

@ -35,9 +35,8 @@ import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient; import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan; import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
@ -81,10 +80,6 @@ public abstract class AbstractRegionStoreClient
this.store = store; this.store = store;
if (this.store.getProxyStore() != null) { if (this.store.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout(); this.timeout = conf.getForwardTimeout();
} else if (!this.store.isReachable()) {
// cannot get Deadline or SlowLog instance here
// use SlowLogEmptyImpl instead to skip slow log record
onStoreUnreachable(SlowLogEmptyImpl.INSTANCE);
} }
} }
@ -134,22 +129,24 @@ public abstract class AbstractRegionStoreClient
} }
@Override @Override
public boolean onStoreUnreachable(SlowLog slowLog) { public boolean onStoreUnreachable(BackOffer backOffer) {
if (!store.isValid()) { if (!store.isValid()) {
logger.warn(String.format("store [%d] has been invalid", store.getId())); logger.warn(String.format("store [%d] has been invalid", store.getId()));
store = regionManager.getStoreById(store.getId()); store = regionManager.getStoreById(store.getId(), backOffer);
updateClientStub(); updateClientStub();
return true; return true;
} }
// seek an available leader store to send request // seek an available leader store to send request
Boolean result = seekLeaderStore(slowLog); backOffer.checkTimeout();
Boolean result = seekLeaderStore(backOffer);
if (result != null) { if (result != null) {
return result; return result;
} }
if (conf.getEnableGrpcForward()) { if (conf.getEnableGrpcForward()) {
// seek an available proxy store to forward request // seek an available proxy store to forward request
return seekProxyStore(slowLog); backOffer.checkTimeout();
return seekProxyStore(backOffer);
} }
return false; return false;
} }
@ -182,9 +179,9 @@ public abstract class AbstractRegionStoreClient
} }
} }
private Boolean seekLeaderStore(SlowLog slowLog) { private Boolean seekLeaderStore(BackOffer backOffer) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
try { try {
List<Metapb.Peer> peers = region.getFollowerList(); List<Metapb.Peer> peers = region.getFollowerList();
if (peers.isEmpty()) { if (peers.isEmpty()) {
@ -226,8 +223,8 @@ public abstract class AbstractRegionStoreClient
return null; return null;
} }
private boolean seekProxyStore(SlowLog slowLog) { private boolean seekProxyStore(BackOffer backOffer) {
SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
try { try {
logger.info(String.format("try grpc forward: region[%d]", region.getId())); logger.info(String.format("try grpc forward: region[%d]", region.getId()));

View File

@ -17,13 +17,13 @@
package org.tikv.common.region; package org.tikv.common.region;
import org.tikv.common.log.SlowLog; import org.tikv.common.util.BackOffer;
public interface RegionErrorReceiver { public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region); boolean onNotLeader(TiRegion region);
/// return whether we need to retry this request. /// return whether we need to retry this request.
boolean onStoreUnreachable(SlowLog slowLog); boolean onStoreUnreachable(BackOffer backOffer);
TiRegion getRegion(); TiRegion getRegion();
} }

View File

@ -33,6 +33,7 @@ import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.BackOffer; import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.ConcreteBackOffer;
@ -96,6 +97,7 @@ public class RegionManager {
public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
TiRegion region = cache.getRegionByKey(key, backOffer); TiRegion region = cache.getRegionByKey(key, backOffer);
try { try {
if (region == null) { if (region == null) {
@ -106,6 +108,7 @@ public class RegionManager {
} }
} finally { } finally {
requestTimer.observeDuration(); requestTimer.observeDuration();
slowLogSpan.end();
} }
return region; return region;

View File

@ -63,6 +63,7 @@ public class BackOffFunction {
BoRegionMiss, BoRegionMiss,
BoUpdateLeader, BoUpdateLeader,
BoServerBusy, BoServerBusy,
BoTxnNotFound BoTxnNotFound,
BoCheckTimeout
} }
} }

View File

@ -35,6 +35,10 @@ public interface BackOffer {
* max back off time exceeded and throw an exception to the caller. * max back off time exceeded and throw an exception to the caller.
*/ */
void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err); void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err);
/**
 * Checks whether the deadline has been exceeded.
 *
 * <p>NOTE(review): this declaration does not state how an exceeded deadline is reported to the
 * caller. The ConcreteBackOffer implementation hands a TiKVException("Request Timeout") to
 * logThrowError when canRetryAfterSleep(BoCheckTimeout) returns false; presumably that raises
 * the exception, but confirm before relying on it.
 */
void checkTimeout();
/** /**
* canRetryAfterSleep sleeps a while based on the BackOffType and records the error message. Will * canRetryAfterSleep sleeps a while based on the BackOffType and records the error message. Will
* stop until max back off time exceeded and throw an exception to the caller. It will return * stop until max back off time exceeded and throw an exception to the caller. It will return

View File

@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.log.SlowLog; import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan; import org.tikv.common.log.SlowLogSpan;
@ -142,6 +143,9 @@ public class ConcreteBackOffer implements BackOffer {
case BoTxnNotFound: case BoTxnNotFound:
backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);
break; break;
case BoCheckTimeout:
backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
break;
} }
return backOffFunction; return backOffFunction;
} }
@ -151,6 +155,13 @@ public class ConcreteBackOffer implements BackOffer {
doBackOffWithMaxSleep(funcType, -1, err); doBackOffWithMaxSleep(funcType, -1, err);
} }
/**
 * Checks whether the deadline has been exceeded.
 *
 * <p>Probes {@code canRetryAfterSleep} with the {@code BoCheckTimeout} back-off type. When the
 * probe reports that no retry is allowed, a {@code TiKVException("Request Timeout")} is handed
 * to {@code logThrowError}.
 *
 * <p>NOTE(review): {@code logThrowError} is defined outside this view; presumably it logs and
 * then throws the given exception. Confirm.
 */
@Override
public void checkTimeout() {
  // BoCheckTimeout is created via BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
  // presumably this means the probe itself does not sleep (TODO confirm).
  final boolean retryAllowed =
      canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout);
  if (retryAllowed) {
    return;
  }
  logThrowError(new TiKVException("Request Timeout"));
}
@Override @Override
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
return canRetryAfterSleep(funcType, -1); return canRetryAfterSleep(funcType, -1);

View File

@ -179,8 +179,7 @@ public class RawKVClientTest {
} finally { } finally {
long e = System.currentTimeMillis(); long e = System.currentTimeMillis();
long duration = e - s; long duration = e - s;
logger.info("duration = " + duration); assertTrue(duration >= 2900);
assert (duration >= 2900);
} }
} }
@ -199,8 +198,26 @@ public class RawKVClientTest {
} finally { } finally {
long e = System.currentTimeMillis(); long e = System.currentTimeMillis();
long duration = e - s; long duration = e - s;
logger.info("duration = " + duration); assertTrue(duration <= timeout + sleep);
assert (duration <= timeout + sleep); }
}
@Test
public void testBackoffTimeout() {
int timeout = 500;
int sleep = 150;
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE);
long s = System.currentTimeMillis();
try {
while (true) {
Thread.sleep(sleep);
backOffer.checkTimeout();
}
} catch (Exception ignored) {
} finally {
long e = System.currentTimeMillis();
long duration = e - s;
assertTrue(duration <= timeout + sleep);
} }
} }