diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index e073de658c..a06401eb0b 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -120,7 +120,7 @@ public class RegionErrorHandler implements ErrorHandler { // this error is reported from raftstore: // store_id requested at the moment is inconsistent with that expected // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); + long storeId = error.getStoreNotMatch().getRequestStoreId(); long actualStoreId = error.getStoreNotMatch().getActualStoreId(); logger.warn( String.format( @@ -163,7 +163,6 @@ public class RegionErrorHandler implements ErrorHandler { // key requested is not in current region // should not happen here. ByteString invalidKey = error.getKeyNotInRegion().getKey(); - notifyRegionCacheInvalidate(recv.getRegion()); logger.error( String.format( "Key not in region [%s] for key [%s], this error should not happen here.", @@ -280,9 +279,6 @@ public class RegionErrorHandler implements ErrorHandler { case REGION_STORE: event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); break; - case STORE: - event = new CacheInvalidateEvent(0, storeId, false, true, type); - break; case REQ_FAILED: event = new CacheInvalidateEvent(0, 0, false, false, type); break; @@ -290,6 +286,7 @@ public class RegionErrorHandler implements ErrorHandler { throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); } if (cacheInvalidateCallBackList != null) { + new Thread(() -> { for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { try { @@ -298,13 +295,14 @@ public class RegionErrorHandler implements ErrorHandler { logger.warn(String.format("CacheInvalidCallBack failed %s", e)); } } + }).start(); } } private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); regionManager.invalidateStore(storeId); - notifyRegionRequestError(ctxRegion, storeId, CacheInvalidateEvent.CacheType.REGION); + notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE); } private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 331d96ad47..a6d7e7fbf7 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public class RegionManager { this.conf = conf; this.storeChecker = null; this.executor = null; - this.cacheInvalidateCallbackList = new ArrayList<>(); + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); } public synchronized void close() { @@ -106,11 +107,11 @@ public class RegionManager { return this.pdClient; } - public synchronized List> getCacheInvalidateCallbackList() { + public List> getCacheInvalidateCallbackList() { return cacheInvalidateCallbackList; } - public synchronized void addCacheInvalidateCallback( + public void addCacheInvalidateCallback( Function cacheInvalidateCallback) { this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); }