mirror of https://github.com/tikv/client-java.git
				
				
				
			Signed-off-by: ti-srebot <ti-srebot@pingcap.com> Co-authored-by: tomato <38561029+qidi1@users.noreply.github.com>
This commit is contained in:
		
							parent
							
								
									2029478fa9
								
							
						
					
					
						commit
						c85eeae7d3
					
				|  | @ -97,6 +97,8 @@ public class CacheInvalidateEvent implements Serializable { | |||
| 
 | ||||
|   public enum CacheType implements Serializable { | ||||
|     REGION_STORE, | ||||
|     STORE, | ||||
|     REGION, | ||||
|     REQ_FAILED, | ||||
|     LEADER | ||||
|   } | ||||
|  |  | |||
|  | @ -21,10 +21,13 @@ import io.grpc.Status; | |||
| import io.grpc.StatusRuntimeException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.function.Function; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.tikv.common.codec.KeyUtils; | ||||
| import org.tikv.common.event.CacheInvalidateEvent; | ||||
| import org.tikv.common.event.CacheInvalidateEvent.CacheType; | ||||
| import org.tikv.common.exception.GrpcException; | ||||
| import org.tikv.common.exception.TiKVException; | ||||
| import org.tikv.common.region.RegionErrorReceiver; | ||||
|  | @ -43,6 +46,11 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|   private final Function<RespT, Errorpb.Error> getRegionError; | ||||
|   private final RegionManager regionManager; | ||||
|   private final RegionErrorReceiver recv; | ||||
|   private final List<Function<CacheInvalidateEvent, Void>> cacheInvalidateCallBackList; | ||||
| 
 | ||||
|   private final ExecutorService callBackThreadPool; | ||||
|   private final int INVALID_STORE_ID = 0; | ||||
|   private final int INVALID_REGION_ID = 0; | ||||
| 
 | ||||
|   public RegionErrorHandler( | ||||
|       RegionManager regionManager, | ||||
|  | @ -51,6 +59,8 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|     this.recv = recv; | ||||
|     this.regionManager = regionManager; | ||||
|     this.getRegionError = getRegionError; | ||||
|     this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList(); | ||||
|     this.callBackThreadPool = regionManager.getCallBackThreadPool(); | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|  | @ -107,6 +117,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
| 
 | ||||
|       if (!retry) { | ||||
|         this.regionManager.invalidateRegion(recv.getRegion()); | ||||
|         notifyRegionLeaderError(recv.getRegion()); | ||||
|       } | ||||
| 
 | ||||
|       backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); | ||||
|  | @ -116,15 +127,14 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|       // this error is reported from raftstore: | ||||
|       // store_id requested at the moment is inconsistent with that expected | ||||
|       // Solution:re-fetch from PD | ||||
|       long storeId = recv.getRegion().getLeader().getStoreId(); | ||||
|       long storeId = error.getStoreNotMatch().getRequestStoreId(); | ||||
|       long actualStoreId = error.getStoreNotMatch().getActualStoreId(); | ||||
|       logger.warn( | ||||
|           String.format( | ||||
|               "Store Not Match happened with region id %d, store id %d, actual store id %d", | ||||
|               recv.getRegion().getId(), storeId, actualStoreId)); | ||||
| 
 | ||||
|       this.regionManager.invalidateRegion(recv.getRegion()); | ||||
|       this.regionManager.invalidateStore(storeId); | ||||
|       // may request store which is not leader. | ||||
|       invalidateRegionStoreCache(recv.getRegion(), storeId); | ||||
|       // assume this is a low probability error, do not retry, just re-split the request by | ||||
|       // throwing it out. | ||||
|       return false; | ||||
|  | @ -143,8 +153,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|           BackOffFunction.BackOffFuncType.BoServerBusy, | ||||
|           new StatusRuntimeException( | ||||
|               Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); | ||||
|       backOffer.doBackOff( | ||||
|           BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); | ||||
|       return true; | ||||
|     } else if (error.hasStaleCommand()) { | ||||
|       // this error is reported from raftstore: | ||||
|  | @ -179,7 +187,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|     logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); | ||||
|     // For other errors, we only drop cache here. | ||||
|     // Upper level may split this task. | ||||
|     invalidateRegionStoreCache(recv.getRegion()); | ||||
|     invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId()); | ||||
|     // retry if raft proposal is dropped, it indicates the store is in the middle of transition | ||||
|     if (error.getMessage().contains("Raft ProposalDropped")) { | ||||
|       backOffer.doBackOff( | ||||
|  | @ -196,6 +204,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|   private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) { | ||||
|     if (currentRegions.size() == 0) { | ||||
|       this.regionManager.onRegionStale(recv.getRegion()); | ||||
|       notifyRegionCacheInvalidate(recv.getRegion()); | ||||
|       return false; | ||||
|     } | ||||
| 
 | ||||
|  | @ -229,6 +238,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|     } | ||||
| 
 | ||||
|     if (needInvalidateOld) { | ||||
|       notifyRegionCacheInvalidate(recv.getRegion()); | ||||
|       this.regionManager.onRegionStale(recv.getRegion()); | ||||
|     } | ||||
| 
 | ||||
|  | @ -271,8 +281,51 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { | |||
|     return recv.getRegion(); | ||||
|   } | ||||
| 
 | ||||
|   private void invalidateRegionStoreCache(TiRegion ctxRegion) { | ||||
|   private void notifyRegionRequestError( | ||||
|       TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { | ||||
|     CacheInvalidateEvent event; | ||||
|     // When store(region) id is invalid, | ||||
|     // it implies that the error was not caused by store(region) error. | ||||
|     switch (type) { | ||||
|       case REGION: | ||||
|       case LEADER: | ||||
|         event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type); | ||||
|         break; | ||||
|       case REGION_STORE: | ||||
|         event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); | ||||
|         break; | ||||
|       case REQ_FAILED: | ||||
|         event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type); | ||||
|         break; | ||||
|       default: | ||||
|         throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); | ||||
|     } | ||||
|     if (cacheInvalidateCallBackList != null) { | ||||
|       for (Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack : | ||||
|           cacheInvalidateCallBackList) { | ||||
|         callBackThreadPool.submit( | ||||
|             () -> { | ||||
|               try { | ||||
|                 cacheInvalidateCallBack.apply(event); | ||||
|               } catch (Exception e) { | ||||
|                 logger.error(String.format("CacheInvalidCallBack failed %s", e)); | ||||
|               } | ||||
|             }); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { | ||||
|     regionManager.invalidateRegion(ctxRegion); | ||||
|     regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); | ||||
|     regionManager.invalidateStore(storeId); | ||||
|     notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE); | ||||
|   } | ||||
| 
 | ||||
|   private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { | ||||
|     notifyRegionRequestError(ctxRegion, 0, CacheType.REGION); | ||||
|   } | ||||
| 
 | ||||
|   private void notifyRegionLeaderError(TiRegion ctxRegion) { | ||||
|     notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER); | ||||
|   } | ||||
| } | ||||
|  |  | |||
|  | @ -23,14 +23,18 @@ import com.google.protobuf.ByteString; | |||
| import io.prometheus.client.Histogram; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.concurrent.CopyOnWriteArrayList; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Executors; | ||||
| import java.util.concurrent.ScheduledExecutorService; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||
| import java.util.function.Function; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.tikv.common.ReadOnlyPDClient; | ||||
| import org.tikv.common.TiConfiguration; | ||||
| import org.tikv.common.event.CacheInvalidateEvent; | ||||
| import org.tikv.common.exception.GrpcException; | ||||
| import org.tikv.common.exception.InvalidStoreException; | ||||
| import org.tikv.common.exception.TiClientInternalException; | ||||
|  | @ -69,10 +73,36 @@ public class RegionManager { | |||
|   private final TiConfiguration conf; | ||||
|   private final ScheduledExecutorService executor; | ||||
|   private final StoreHealthyChecker storeChecker; | ||||
|   private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>> | ||||
|       cacheInvalidateCallbackList; | ||||
|   private final ExecutorService callBackThreadPool; | ||||
|   private AtomicInteger tiflashStoreIndex = new AtomicInteger(0); | ||||
| 
 | ||||
|   public RegionManager( | ||||
|       TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { | ||||
|     this(conf, pdClient, channelFactory, 1); | ||||
|   } | ||||
| 
 | ||||
|   public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { | ||||
|     this(conf, pdClient, 1); | ||||
|   } | ||||
| 
 | ||||
|   public RegionManager( | ||||
|       TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) { | ||||
|     this.cache = new RegionCache(); | ||||
|     this.pdClient = pdClient; | ||||
|     this.conf = conf; | ||||
|     this.storeChecker = null; | ||||
|     this.executor = null; | ||||
|     this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); | ||||
|     this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); | ||||
|   } | ||||
| 
 | ||||
|   public RegionManager( | ||||
|       TiConfiguration conf, | ||||
|       ReadOnlyPDClient pdClient, | ||||
|       ChannelFactory channelFactory, | ||||
|       int callBackExecutorThreadNum) { | ||||
|     this.cache = new RegionCache(); | ||||
|     this.pdClient = pdClient; | ||||
|     this.conf = conf; | ||||
|  | @ -83,26 +113,34 @@ public class RegionManager { | |||
|     this.storeChecker = storeChecker; | ||||
|     this.executor = Executors.newScheduledThreadPool(1); | ||||
|     this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); | ||||
|   } | ||||
| 
 | ||||
|   public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { | ||||
|     this.cache = new RegionCache(); | ||||
|     this.pdClient = pdClient; | ||||
|     this.conf = conf; | ||||
|     this.storeChecker = null; | ||||
|     this.executor = null; | ||||
|     this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); | ||||
|     this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); | ||||
|   } | ||||
| 
 | ||||
|   public synchronized void close() { | ||||
|     if (this.executor != null) { | ||||
|       this.executor.shutdownNow(); | ||||
|     } | ||||
|     this.callBackThreadPool.shutdownNow(); | ||||
|   } | ||||
| 
 | ||||
|   public ReadOnlyPDClient getPDClient() { | ||||
|     return this.pdClient; | ||||
|   } | ||||
| 
 | ||||
|   public ExecutorService getCallBackThreadPool() { | ||||
|     return callBackThreadPool; | ||||
|   } | ||||
| 
 | ||||
|   public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() { | ||||
|     return cacheInvalidateCallbackList; | ||||
|   } | ||||
| 
 | ||||
|   public void addCacheInvalidateCallback( | ||||
|       Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) { | ||||
|     this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); | ||||
|   } | ||||
| 
 | ||||
|   public void invalidateAll() { | ||||
|     cache.invalidateAll(); | ||||
|   } | ||||
|  |  | |||
|  | @ -0,0 +1,130 @@ | |||
| /* | ||||
|  * Copyright 2022 TiKV Project Authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  * http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| package org.tikv.common; | ||||
| 
 | ||||
| import static org.junit.Assert.assertEquals; | ||||
| import static org.junit.Assert.fail; | ||||
| 
 | ||||
| import com.google.protobuf.ByteString; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.function.Function; | ||||
| import org.junit.Test; | ||||
| import org.tikv.common.event.CacheInvalidateEvent; | ||||
| import org.tikv.common.region.RegionManager; | ||||
| import org.tikv.common.region.RegionStoreClient; | ||||
| import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; | ||||
| import org.tikv.common.region.TiStore; | ||||
| import org.tikv.common.util.BackOffer; | ||||
| import org.tikv.common.util.ConcreteBackOffer; | ||||
| import org.tikv.kvproto.Errorpb; | ||||
| import org.tikv.kvproto.Errorpb.EpochNotMatch; | ||||
| import org.tikv.kvproto.Errorpb.NotLeader; | ||||
| import org.tikv.kvproto.Errorpb.StoreNotMatch; | ||||
| import org.tikv.kvproto.Metapb; | ||||
| 
 | ||||
| public class CacheInvalidCallBackTest extends MockServerTest { | ||||
| 
 | ||||
|   private RegionStoreClient createClient( | ||||
|       String version, Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack) { | ||||
|     Metapb.Store meta = | ||||
|         Metapb.Store.newBuilder() | ||||
|             .setAddress(LOCAL_ADDR + ":" + port) | ||||
|             .setId(1) | ||||
|             .setState(Metapb.StoreState.Up) | ||||
|             .setVersion(version) | ||||
|             .build(); | ||||
|     TiStore store = new TiStore(meta); | ||||
| 
 | ||||
|     RegionManager manager = new RegionManager(session.getConf(), session.getPDClient()); | ||||
|     manager.addCacheInvalidateCallback(cacheInvalidateCallBack); | ||||
|     RegionStoreClientBuilder builder = | ||||
|         new RegionStoreClientBuilder( | ||||
|             session.getConf(), session.getChannelFactory(), manager, session.getPDClient()); | ||||
| 
 | ||||
|     return builder.build(region, store); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   public void testcacheInvalidCallBack() { | ||||
|     String version = "3.0.12"; | ||||
|     CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack(); | ||||
|     doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack); | ||||
|   } | ||||
| 
 | ||||
|   public void doRawGetTest( | ||||
|       RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) { | ||||
|     server.put("key1", "value1"); | ||||
|     Optional<ByteString> value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); | ||||
|     assertEquals(ByteString.copyFromUtf8("value1"), value.get()); | ||||
|     try { | ||||
|       server.putError( | ||||
|           "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance())); | ||||
|       client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1")); | ||||
|       fail(); | ||||
|     } catch (Exception e) { | ||||
|       assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size()); | ||||
|     } | ||||
|     server.putError( | ||||
|         "failure", | ||||
|         () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); | ||||
|     try { | ||||
|       client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); | ||||
|       fail(); | ||||
|     } catch (Exception e) { | ||||
|       sleep(1000); | ||||
|       assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size()); | ||||
|     } | ||||
|     server.putError( | ||||
|         "store_not_match", | ||||
|         () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance())); | ||||
|     try { | ||||
|       client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); | ||||
|       fail(); | ||||
|     } catch (Exception e) { | ||||
|       sleep(1000); | ||||
|       assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size()); | ||||
|     } | ||||
|     server.clearAllMap(); | ||||
|     client.close(); | ||||
|   } | ||||
| 
 | ||||
|   private void sleep(int time) { | ||||
|     try { | ||||
|       Thread.sleep(time); | ||||
|     } catch (InterruptedException e) { | ||||
|       fail(); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private BackOffer defaultBackOff() { | ||||
|     return ConcreteBackOffer.newCustomBackOff(1000); | ||||
|   } | ||||
| 
 | ||||
|   static class CacheInvalidateCallBack implements Function<CacheInvalidateEvent, Void> { | ||||
| 
 | ||||
|     public List<CacheInvalidateEvent> cacheInvalidateEvents = new ArrayList<>(); | ||||
| 
 | ||||
|     @Override | ||||
|     public Void apply(CacheInvalidateEvent cacheInvalidateEvent) { | ||||
|       cacheInvalidateEvents.add(cacheInvalidateEvent); | ||||
|       return null; | ||||
|     } | ||||
|   } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue