mirror of https://github.com/tikv/client-java.git
				
				
				
			Signed-off-by: marsishandsome <marsishandsome@gmail.com>
This commit is contained in:
		
							parent
							
								
									1886700118
								
							
						
					
					
						commit
						b61b9cd938
					
				|  | @ -216,7 +216,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
| 
 | ||||
|   @Override | ||||
|   public boolean handleRequestError(BackOffer backOffer, Exception e) { | ||||
|     if (recv.onStoreUnreachable(backOffer.getSlowLog())) { | ||||
|     if (recv.onStoreUnreachable(backOffer)) { | ||||
|       if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) { | ||||
|         regionManager.onRequestFail(recv.getRegion()); | ||||
|         throw new GrpcException("retry is exhausted.", e); | ||||
|  |  | |||
|  | @ -89,6 +89,7 @@ public abstract class RetryPolicy<RespT> { | |||
|         } catch (Exception e) { | ||||
|           rethrowNotRecoverableException(e); | ||||
|           // Handle request call error | ||||
|           backOffer.checkTimeout(); | ||||
|           boolean retry = handler.handleRequestError(backOffer, e); | ||||
|           if (retry) { | ||||
|             GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); | ||||
|  |  | |||
|  | @ -35,9 +35,8 @@ import org.slf4j.LoggerFactory; | |||
| import org.tikv.common.AbstractGRPCClient; | ||||
| import org.tikv.common.TiConfiguration; | ||||
| import org.tikv.common.exception.GrpcException; | ||||
| import org.tikv.common.log.SlowLog; | ||||
| import org.tikv.common.log.SlowLogEmptyImpl; | ||||
| import org.tikv.common.log.SlowLogSpan; | ||||
| import org.tikv.common.util.BackOffer; | ||||
| import org.tikv.common.util.ChannelFactory; | ||||
| import org.tikv.kvproto.Kvrpcpb; | ||||
| import org.tikv.kvproto.Metapb; | ||||
|  | @ -81,10 +80,6 @@ public abstract class AbstractRegionStoreClient | |||
|     this.store = store; | ||||
|     if (this.store.getProxyStore() != null) { | ||||
|       this.timeout = conf.getForwardTimeout(); | ||||
|     } else if (!this.store.isReachable()) { | ||||
|       // cannot get Deadline or SlowLog instance here | ||||
|       // use SlowLogEmptyImpl instead to skip slow log record | ||||
|       onStoreUnreachable(SlowLogEmptyImpl.INSTANCE); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -134,22 +129,24 @@ public abstract class AbstractRegionStoreClient | |||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public boolean onStoreUnreachable(SlowLog slowLog) { | ||||
|   public boolean onStoreUnreachable(BackOffer backOffer) { | ||||
|     if (!store.isValid()) { | ||||
|       logger.warn(String.format("store [%d] has been invalid", store.getId())); | ||||
|       store = regionManager.getStoreById(store.getId()); | ||||
|       store = regionManager.getStoreById(store.getId(), backOffer); | ||||
|       updateClientStub(); | ||||
|       return true; | ||||
|     } | ||||
| 
 | ||||
|     // seek an available leader store to send request | ||||
|     Boolean result = seekLeaderStore(slowLog); | ||||
|     backOffer.checkTimeout(); | ||||
|     Boolean result = seekLeaderStore(backOffer); | ||||
|     if (result != null) { | ||||
|       return result; | ||||
|     } | ||||
|     if (conf.getEnableGrpcForward()) { | ||||
|       // seek an available proxy store to forward request | ||||
|       return seekProxyStore(slowLog); | ||||
|       backOffer.checkTimeout(); | ||||
|       return seekProxyStore(backOffer); | ||||
|     } | ||||
|     return false; | ||||
|   } | ||||
|  | @ -182,9 +179,9 @@ public abstract class AbstractRegionStoreClient | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private Boolean seekLeaderStore(SlowLog slowLog) { | ||||
|   private Boolean seekLeaderStore(BackOffer backOffer) { | ||||
|     Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); | ||||
|     SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore"); | ||||
|     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore"); | ||||
|     try { | ||||
|       List<Metapb.Peer> peers = region.getFollowerList(); | ||||
|       if (peers.isEmpty()) { | ||||
|  | @ -226,8 +223,8 @@ public abstract class AbstractRegionStoreClient | |||
|     return null; | ||||
|   } | ||||
| 
 | ||||
|   private boolean seekProxyStore(SlowLog slowLog) { | ||||
|     SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore"); | ||||
|   private boolean seekProxyStore(BackOffer backOffer) { | ||||
|     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore"); | ||||
|     Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); | ||||
|     try { | ||||
|       logger.info(String.format("try grpc forward: region[%d]", region.getId())); | ||||
|  |  | |||
|  | @ -17,13 +17,13 @@ | |||
| 
 | ||||
| package org.tikv.common.region; | ||||
| 
 | ||||
| import org.tikv.common.log.SlowLog; | ||||
| import org.tikv.common.util.BackOffer; | ||||
| 
 | ||||
| public interface RegionErrorReceiver { | ||||
|   boolean onNotLeader(TiRegion region); | ||||
| 
 | ||||
|   /// return whether we need to retry this request. | ||||
|   boolean onStoreUnreachable(SlowLog slowLog); | ||||
|   boolean onStoreUnreachable(BackOffer backOffer); | ||||
| 
 | ||||
|   TiRegion getRegion(); | ||||
| } | ||||
|  |  | |||
|  | @ -33,6 +33,7 @@ import org.tikv.common.TiConfiguration; | |||
| import org.tikv.common.exception.GrpcException; | ||||
| import org.tikv.common.exception.InvalidStoreException; | ||||
| import org.tikv.common.exception.TiClientInternalException; | ||||
| import org.tikv.common.log.SlowLogSpan; | ||||
| import org.tikv.common.util.BackOffer; | ||||
| import org.tikv.common.util.ChannelFactory; | ||||
| import org.tikv.common.util.ConcreteBackOffer; | ||||
|  | @ -100,6 +101,7 @@ public class RegionManager { | |||
| 
 | ||||
|   public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { | ||||
|     Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); | ||||
|     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey"); | ||||
|     TiRegion region = cache.getRegionByKey(key, backOffer); | ||||
|     try { | ||||
|       if (region == null) { | ||||
|  | @ -112,6 +114,7 @@ public class RegionManager { | |||
|       return null; | ||||
|     } finally { | ||||
|       requestTimer.observeDuration(); | ||||
|       slowLogSpan.end(); | ||||
|     } | ||||
| 
 | ||||
|     return region; | ||||
|  |  | |||
|  | @ -63,6 +63,7 @@ public class BackOffFunction { | |||
|     BoRegionMiss, | ||||
|     BoUpdateLeader, | ||||
|     BoServerBusy, | ||||
|     BoTxnNotFound | ||||
|     BoTxnNotFound, | ||||
|     BoCheckTimeout | ||||
|   } | ||||
| } | ||||
|  |  | |||
|  | @ -39,6 +39,10 @@ public interface BackOffer { | |||
|    * max back off time exceeded and throw an exception to the caller. | ||||
|    */ | ||||
|   void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err); | ||||
| 
 | ||||
|   /** check if deadline exceeded. */ | ||||
|   void checkTimeout(); | ||||
| 
 | ||||
|   /** | ||||
|    * canRetryAfterSleep sleeps a while base on the BackOffType and records the error message. Will | ||||
|    * stop until max back off time exceeded and throw an exception to the caller. It will return | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ import org.slf4j.Logger; | |||
| import org.slf4j.LoggerFactory; | ||||
| import org.tikv.common.TiConfiguration; | ||||
| import org.tikv.common.exception.GrpcException; | ||||
| import org.tikv.common.exception.TiKVException; | ||||
| import org.tikv.common.log.SlowLog; | ||||
| import org.tikv.common.log.SlowLogEmptyImpl; | ||||
| import org.tikv.common.log.SlowLogSpan; | ||||
|  | @ -142,6 +143,9 @@ public class ConcreteBackOffer implements BackOffer { | |||
|       case BoTxnNotFound: | ||||
|         backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); | ||||
|         break; | ||||
|       case BoCheckTimeout: | ||||
|         backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter); | ||||
|         break; | ||||
|     } | ||||
|     return backOffFunction; | ||||
|   } | ||||
|  | @ -151,6 +155,13 @@ public class ConcreteBackOffer implements BackOffer { | |||
|     doBackOffWithMaxSleep(funcType, -1, err); | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void checkTimeout() { | ||||
|     if (!canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout)) { | ||||
|       logThrowError(new TiKVException("Request Timeout")); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { | ||||
|     return canRetryAfterSleep(funcType, -1); | ||||
|  |  | |||
|  | @ -155,8 +155,7 @@ public class RawKVClientTest extends BaseRawKVTest { | |||
|     } finally { | ||||
|       long e = System.currentTimeMillis(); | ||||
|       long duration = e - s; | ||||
|       logger.info("duration = " + duration); | ||||
|       assert (duration >= 2900); | ||||
|       assertTrue(duration >= 2900); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -175,8 +174,26 @@ public class RawKVClientTest extends BaseRawKVTest { | |||
|     } finally { | ||||
|       long e = System.currentTimeMillis(); | ||||
|       long duration = e - s; | ||||
|       logger.info("duration = " + duration); | ||||
|       assert (duration <= timeout + sleep); | ||||
|       assertTrue(duration <= timeout + sleep); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   public void testBackoffTimeout() { | ||||
|     int timeout = 500; | ||||
|     int sleep = 150; | ||||
|     BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); | ||||
|     long s = System.currentTimeMillis(); | ||||
|     try { | ||||
|       while (true) { | ||||
|         Thread.sleep(sleep); | ||||
|         backOffer.checkTimeout(); | ||||
|       } | ||||
|     } catch (Exception ignored) { | ||||
|     } finally { | ||||
|       long e = System.currentTimeMillis(); | ||||
|       long duration = e - s; | ||||
|       assertTrue(duration <= timeout + sleep); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue