From dda1029b94216ce7d97353df55ca514c1977b8e3 Mon Sep 17 00:00:00 2001 From: tomato <38561029+qidi1@users.noreply.github.com> Date: Mon, 30 Jan 2023 15:17:42 +0800 Subject: [PATCH] [close #654] Add RegionCacheInvalidCallBack (#653) * Close #654 To let the upper layers customize their own behavior when the region cache fails, add RegionCacheInvalidCallBack. Signed-off-by: qidi1 <1083369179@qq.com> * change callback to list Signed-off-by: qidi1 <1083369179@qq.com> * format code Signed-off-by: qidi1 <1083369179@qq.com> * change as comment Signed-off-by: qidi1 <1083369179@qq.com> * change to synchronized Signed-off-by: qidi1 <1083369179@qq.com> * change list to copy on write Signed-off-by: qidi1 <1083369179@qq.com> * change to multi thread Signed-off-by: qidi1 <1083369179@qq.com> * format code Signed-off-by: qidi1 <1083369179@qq.com> * add comment Signed-off-by: qidi1 <1083369179@qq.com> * change to magical num Signed-off-by: qidi1 <1083369179@qq.com> * add comment Signed-off-by: qidi1 <1083369179@qq.com> * change log level Signed-off-by: qidi1 <1083369179@qq.com> * Fmt --------- Signed-off-by: qidi1 <1083369179@qq.com> Co-authored-by: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> --- .../common/event/CacheInvalidateEvent.java | 2 + .../common/operation/RegionErrorHandler.java | 71 ++++++++-- .../org/tikv/common/region/RegionManager.java | 54 ++++++-- .../tikv/common/CacheInvalidCallBackTest.java | 130 ++++++++++++++++++ 4 files changed, 240 insertions(+), 17 deletions(-) create mode 100644 src/test/java/org/tikv/common/CacheInvalidCallBackTest.java diff --git a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java index 10d21942c9..ca7d73bac3 100644 --- a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java +++ b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java @@ -97,6 +97,8 @@ public class CacheInvalidateEvent implements Serializable { public enum CacheType 
implements Serializable { REGION_STORE, + STORE, + REGION, REQ_FAILED, LEADER } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index c30f7b6f6f..df95471ffa 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -21,10 +21,13 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.codec.KeyUtils; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.event.CacheInvalidateEvent.CacheType; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; import org.tikv.common.region.RegionErrorReceiver; @@ -43,6 +46,11 @@ public class RegionErrorHandler implements ErrorHandler { private final Function getRegionError; private final RegionManager regionManager; private final RegionErrorReceiver recv; + private final List> cacheInvalidateCallBackList; + + private final ExecutorService callBackThreadPool; + private final int INVALID_STORE_ID = 0; + private final int INVALID_REGION_ID = 0; public RegionErrorHandler( RegionManager regionManager, @@ -51,6 +59,8 @@ public class RegionErrorHandler implements ErrorHandler { this.recv = recv; this.regionManager = regionManager; this.getRegionError = getRegionError; + this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList(); + this.callBackThreadPool = regionManager.getCallBackThreadPool(); } @Override @@ -107,6 +117,7 @@ public class RegionErrorHandler implements ErrorHandler { if (!retry) { this.regionManager.invalidateRegion(recv.getRegion()); + notifyRegionLeaderError(recv.getRegion()); } backOffer.doBackOff(backOffFuncType, new 
GrpcException(error.toString())); @@ -116,15 +127,14 @@ public class RegionErrorHandler implements ErrorHandler { // this error is reported from raftstore: // store_id requested at the moment is inconsistent with that expected // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); + long storeId = error.getStoreNotMatch().getRequestStoreId(); long actualStoreId = error.getStoreNotMatch().getActualStoreId(); logger.warn( String.format( "Store Not Match happened with region id %d, store id %d, actual store id %d", recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); + // may request store which is not leader. + invalidateRegionStoreCache(recv.getRegion(), storeId); // assume this is a low probability error, do not retry, just re-split the request by // throwing it out. return false; @@ -143,8 +153,6 @@ public class RegionErrorHandler implements ErrorHandler { BackOffFunction.BackOffFuncType.BoServerBusy, new StatusRuntimeException( Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); return true; } else if (error.hasStaleCommand()) { // this error is reported from raftstore: @@ -179,7 +187,7 @@ public class RegionErrorHandler implements ErrorHandler { logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); // For other errors, we only drop cache here. // Upper level may split this task. 
- invalidateRegionStoreCache(recv.getRegion()); + invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId()); // retry if raft proposal is dropped, it indicates the store is in the middle of transition if (error.getMessage().contains("Raft ProposalDropped")) { backOffer.doBackOff( @@ -196,6 +204,7 @@ public class RegionErrorHandler implements ErrorHandler { private boolean onRegionEpochNotMatch(BackOffer backOffer, List currentRegions) { if (currentRegions.size() == 0) { this.regionManager.onRegionStale(recv.getRegion()); + notifyRegionCacheInvalidate(recv.getRegion()); return false; } @@ -229,6 +238,7 @@ public class RegionErrorHandler implements ErrorHandler { } if (needInvalidateOld) { + notifyRegionCacheInvalidate(recv.getRegion()); this.regionManager.onRegionStale(recv.getRegion()); } @@ -271,8 +281,51 @@ public class RegionErrorHandler implements ErrorHandler { return recv.getRegion(); } - private void invalidateRegionStoreCache(TiRegion ctxRegion) { + private void notifyRegionRequestError( + TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { + CacheInvalidateEvent event; + // When store(region) id is invalid, + // it implies that the error was not caused by store(region) error. 
+ switch (type) { + case REGION: + case LEADER: + event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type); + break; + case REGION_STORE: + event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); + break; + case REQ_FAILED: + event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type); + break; + default: + throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); + } + if (cacheInvalidateCallBackList != null) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { + callBackThreadPool.submit( + () -> { + try { + cacheInvalidateCallBack.apply(event); + } catch (Exception e) { + logger.error(String.format("CacheInvalidCallBack failed %s", e)); + } + }); + } + } + } + + private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + regionManager.invalidateStore(storeId); + notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE); + } + + private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.REGION); + } + + private void notifyRegionLeaderError(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER); } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 2d84f9988e..37c3d73f75 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -23,14 +23,18 @@ import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; +import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; @@ -69,10 +73,36 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; + private final CopyOnWriteArrayList> + cacheInvalidateCallbackList; + private final ExecutorService callBackThreadPool; private AtomicInteger tiflashStoreIndex = new AtomicInteger(0); public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { + this(conf, pdClient, channelFactory, 1); + } + + public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { + this(conf, pdClient, 1); + } + + public RegionManager( + TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) { + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; + this.storeChecker = null; + this.executor = null; + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); + } + + public RegionManager( + TiConfiguration conf, + ReadOnlyPDClient pdClient, + ChannelFactory channelFactory, + int callBackExecutorThreadNum) { this.cache = new RegionCache(); this.pdClient = pdClient; this.conf = conf; @@ -83,26 +113,34 @@ public class RegionManager { this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); - } - - public RegionManager(TiConfiguration 
conf, ReadOnlyPDClient pdClient) { - this.cache = new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - this.storeChecker = null; - this.executor = null; + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); } public synchronized void close() { if (this.executor != null) { this.executor.shutdownNow(); } + this.callBackThreadPool.shutdownNow(); } public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public ExecutorService getCallBackThreadPool() { + return callBackThreadPool; + } + + public List> getCacheInvalidateCallbackList() { + return cacheInvalidateCallbackList; + } + + public void addCacheInvalidateCallback( + Function cacheInvalidateCallback) { + this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); + } + public void invalidateAll() { cache.invalidateAll(); } diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java new file mode 100644 index 0000000000..5e4f0a992a --- /dev/null +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.tikv.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import org.junit.Test; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Errorpb.EpochNotMatch; +import org.tikv.kvproto.Errorpb.NotLeader; +import org.tikv.kvproto.Errorpb.StoreNotMatch; +import org.tikv.kvproto.Metapb; + +public class CacheInvalidCallBackTest extends MockServerTest { + + private RegionStoreClient createClient( + String version, Function cacheInvalidateCallBack) { + Metapb.Store meta = + Metapb.Store.newBuilder() + .setAddress(LOCAL_ADDR + ":" + port) + .setId(1) + .setState(Metapb.StoreState.Up) + .setVersion(version) + .build(); + TiStore store = new TiStore(meta); + + RegionManager manager = new RegionManager(session.getConf(), session.getPDClient()); + manager.addCacheInvalidateCallback(cacheInvalidateCallBack); + RegionStoreClientBuilder builder = + new RegionStoreClientBuilder( + session.getConf(), session.getChannelFactory(), manager, session.getPDClient()); + + return builder.build(region, store); + } + + @Test + public void testcacheInvalidCallBack() { + String version = "3.0.12"; + CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack(); + doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack); + } + + public void doRawGetTest( + RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) { + server.put("key1", "value1"); + Optional 
value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); + try { + server.putError( + "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance())); + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1")); + fail(); + } catch (Exception e) { + assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.putError( + "failure", + () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + sleep(1000); + assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.putError( + "store_not_match", + () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + sleep(1000); + assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.clearAllMap(); + client.close(); + } + + private void sleep(int time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + fail(); + } + } + + private BackOffer defaultBackOff() { + return ConcreteBackOffer.newCustomBackOff(1000); + } + + static class CacheInvalidateCallBack implements Function { + + public List cacheInvalidateEvents = new ArrayList<>(); + + @Override + public Void apply(CacheInvalidateEvent cacheInvalidateEvent) { + cacheInvalidateEvents.add(cacheInvalidateEvent); + return null; + } + } +}