[close #654] Add RegionCacheInvalidCallBack (#653)

* Close #654 To let the upper layers customize their own behavior when the region cache fails, Add RegionCacheInvalidCallBack.

Signed-off-by: qidi1 <1083369179@qq.com>

* change callback to list

Signed-off-by: qidi1 <1083369179@qq.com>

* format code

Signed-off-by: qidi1 <1083369179@qq.com>

* change as comment

Signed-off-by: qidi1 <1083369179@qq.com>

* change to synchronized

Signed-off-by: qidi1 <1083369179@qq.com>

* change list to copy on write

Signed-off-by: qidi1 <1083369179@qq.com>

* change to muti thread

Signed-off-by: qidi1 <1083369179@qq.com>

* format code

Signed-off-by: qidi1 <1083369179@qq.com>

* add comment

Signed-off-by: qidi1 <1083369179@qq.com>

* change to magical num

Signed-off-by: qidi1 <1083369179@qq.com>

* add comment

Signed-off-by: qidi1 <1083369179@qq.com>

* change log levle

Signed-off-by: qidi1 <1083369179@qq.com>

* Fmt

---------

Signed-off-by: qidi1 <1083369179@qq.com>
Co-authored-by: shi yuhang <52435083+shiyuhang0@users.noreply.github.com>
This commit is contained in:
tomato 2023-01-30 15:17:42 +08:00 committed by GitHub
parent d278e3ad19
commit dda1029b94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 240 additions and 17 deletions

View File

@ -97,6 +97,8 @@ public class CacheInvalidateEvent implements Serializable {
public enum CacheType implements Serializable {
REGION_STORE,
STORE,
REGION,
REQ_FAILED,
LEADER
}

View File

@ -21,10 +21,13 @@ import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.event.CacheInvalidateEvent.CacheType;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
@ -43,6 +46,11 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private final Function<RespT, Errorpb.Error> getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
private final List<Function<CacheInvalidateEvent, Void>> cacheInvalidateCallBackList;
private final ExecutorService callBackThreadPool;
private final int INVALID_STORE_ID = 0;
private final int INVALID_REGION_ID = 0;
public RegionErrorHandler(
RegionManager regionManager,
@ -51,6 +59,8 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
this.recv = recv;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList();
this.callBackThreadPool = regionManager.getCallBackThreadPool();
}
@Override
@ -107,6 +117,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
notifyRegionLeaderError(recv.getRegion());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
@ -116,15 +127,14 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
// this error is reported from raftstore:
// store_id requested at the moment is inconsistent with that expected
// Solutionre-fetch from PD
long storeId = recv.getRegion().getLeader().getStoreId();
long storeId = error.getStoreNotMatch().getRequestStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// may request store which is not leader.
invalidateRegionStoreCache(recv.getRegion(), storeId);
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
@ -143,8 +153,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
@ -179,7 +187,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
invalidateRegionStoreCache(recv.getRegion());
invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
@ -196,6 +204,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
notifyRegionCacheInvalidate(recv.getRegion());
return false;
}
@ -229,6 +238,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
}
if (needInvalidateOld) {
notifyRegionCacheInvalidate(recv.getRegion());
this.regionManager.onRegionStale(recv.getRegion());
}
@ -271,8 +281,51 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
return recv.getRegion();
}
private void invalidateRegionStoreCache(TiRegion ctxRegion) {
private void notifyRegionRequestError(
TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) {
CacheInvalidateEvent event;
// When store(region) id is invalid,
// it implies that the error was not caused by store(region) error.
switch (type) {
case REGION:
case LEADER:
event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type);
break;
case REGION_STORE:
event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type);
break;
case REQ_FAILED:
event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type);
break;
default:
throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type);
}
if (cacheInvalidateCallBackList != null) {
for (Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack :
cacheInvalidateCallBackList) {
callBackThreadPool.submit(
() -> {
try {
cacheInvalidateCallBack.apply(event);
} catch (Exception e) {
logger.error(String.format("CacheInvalidCallBack failed %s", e));
}
});
}
}
}
private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) {
regionManager.invalidateRegion(ctxRegion);
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
regionManager.invalidateStore(storeId);
notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE);
}
private void notifyRegionCacheInvalidate(TiRegion ctxRegion) {
notifyRegionRequestError(ctxRegion, 0, CacheType.REGION);
}
private void notifyRegionLeaderError(TiRegion ctxRegion) {
notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER);
}
}

View File

@ -23,14 +23,18 @@ import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
@ -69,10 +73,36 @@ public class RegionManager {
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;
private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>>
cacheInvalidateCallbackList;
private final ExecutorService callBackThreadPool;
private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
this(conf, pdClient, channelFactory, 1);
}
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this(conf, pdClient, 1);
}
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.storeChecker = null;
this.executor = null;
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}
public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient,
ChannelFactory channelFactory,
int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
@ -83,26 +113,34 @@ public class RegionManager {
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
}
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.storeChecker = null;
this.executor = null;
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}
public synchronized void close() {
if (this.executor != null) {
this.executor.shutdownNow();
}
this.callBackThreadPool.shutdownNow();
}
public ReadOnlyPDClient getPDClient() {
return this.pdClient;
}
public ExecutorService getCallBackThreadPool() {
return callBackThreadPool;
}
public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
return cacheInvalidateCallbackList;
}
public void addCacheInvalidateCallback(
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
}
public void invalidateAll() {
cache.invalidateAll();
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.junit.Test;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Errorpb.EpochNotMatch;
import org.tikv.kvproto.Errorpb.NotLeader;
import org.tikv.kvproto.Errorpb.StoreNotMatch;
import org.tikv.kvproto.Metapb;
public class CacheInvalidCallBackTest extends MockServerTest {
private RegionStoreClient createClient(
String version, Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack) {
Metapb.Store meta =
Metapb.Store.newBuilder()
.setAddress(LOCAL_ADDR + ":" + port)
.setId(1)
.setState(Metapb.StoreState.Up)
.setVersion(version)
.build();
TiStore store = new TiStore(meta);
RegionManager manager = new RegionManager(session.getConf(), session.getPDClient());
manager.addCacheInvalidateCallback(cacheInvalidateCallBack);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(
session.getConf(), session.getChannelFactory(), manager, session.getPDClient());
return builder.build(region, store);
}
@Test
public void testcacheInvalidCallBack() {
String version = "3.0.12";
CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack();
doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack);
}
public void doRawGetTest(
RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) {
server.put("key1", "value1");
Optional<ByteString> value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value.get());
try {
server.putError(
"error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance()));
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1"));
fail();
} catch (Exception e) {
assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.putError(
"failure",
() -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance()));
try {
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
fail();
} catch (Exception e) {
sleep(1000);
assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.putError(
"store_not_match",
() -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance()));
try {
client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure"));
fail();
} catch (Exception e) {
sleep(1000);
assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size());
}
server.clearAllMap();
client.close();
}
private void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
fail();
}
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(1000);
}
static class CacheInvalidateCallBack implements Function<CacheInvalidateEvent, Void> {
public List<CacheInvalidateEvent> cacheInvalidateEvents = new ArrayList<>();
@Override
public Void apply(CacheInvalidateEvent cacheInvalidateEvent) {
cacheInvalidateEvents.add(cacheInvalidateEvent);
return null;
}
}
}