mirror of https://github.com/tikv/client-java.git
change to synchronized
Signed-off-by: qidi1 <1083369179@qq.com>
This commit is contained in:
parent
454f689b3e
commit
5ef3118edf
|
|
@ -120,7 +120,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
// this error is reported from raftstore:
|
||||
// store_id requested at the moment is inconsistent with that expected
|
||||
// Solution:re-fetch from PD
|
||||
long storeId = recv.getRegion().getLeader().getStoreId();
|
||||
long storeId = error.getStoreNotMatch().getRequestStoreId();
|
||||
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
|
||||
logger.warn(
|
||||
String.format(
|
||||
|
|
@ -163,7 +163,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
// key requested is not in current region
|
||||
// should not happen here.
|
||||
ByteString invalidKey = error.getKeyNotInRegion().getKey();
|
||||
notifyRegionCacheInvalidate(recv.getRegion());
|
||||
logger.error(
|
||||
String.format(
|
||||
"Key not in region [%s] for key [%s], this error should not happen here.",
|
||||
|
|
@ -280,9 +279,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
case REGION_STORE:
|
||||
event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type);
|
||||
break;
|
||||
case STORE:
|
||||
event = new CacheInvalidateEvent(0, storeId, false, true, type);
|
||||
break;
|
||||
case REQ_FAILED:
|
||||
event = new CacheInvalidateEvent(0, 0, false, false, type);
|
||||
break;
|
||||
|
|
@ -290,6 +286,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type);
|
||||
}
|
||||
if (cacheInvalidateCallBackList != null) {
|
||||
new Thread(() -> {
|
||||
for (Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack :
|
||||
cacheInvalidateCallBackList) {
|
||||
try {
|
||||
|
|
@ -298,13 +295,14 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
logger.warn(String.format("CacheInvalidCallBack failed %s", e));
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
|
||||
private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) {
|
||||
regionManager.invalidateRegion(ctxRegion);
|
||||
regionManager.invalidateStore(storeId);
|
||||
notifyRegionRequestError(ctxRegion, storeId, CacheInvalidateEvent.CacheType.REGION);
|
||||
notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE);
|
||||
}
|
||||
|
||||
private void notifyRegionCacheInvalidate(TiRegion ctxRegion) {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
|
|||
import io.prometheus.client.Histogram;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -93,7 +94,7 @@ public class RegionManager {
|
|||
this.conf = conf;
|
||||
this.storeChecker = null;
|
||||
this.executor = null;
|
||||
this.cacheInvalidateCallbackList = new ArrayList<>();
|
||||
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
|
|
@ -106,11 +107,11 @@ public class RegionManager {
|
|||
return this.pdClient;
|
||||
}
|
||||
|
||||
public synchronized List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
|
||||
public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
|
||||
return cacheInvalidateCallbackList;
|
||||
}
|
||||
|
||||
public synchronized void addCacheInvalidateCallback(
|
||||
public void addCacheInvalidateCallback(
|
||||
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
|
||||
this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue