refactor kverrorhandler (#196)

* refactor kverrorhandler

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
Wallace 2021-06-16 22:15:49 +08:00 committed by GitHub
parent 7fe64c60e7
commit 4a401776d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 228 additions and 193 deletions

View File

@ -19,20 +19,14 @@ package org.tikv.common.operation;
import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Collections;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.KeyException;
import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb;
@ -43,16 +37,12 @@ import org.tikv.txn.ResolveLockResult;
// TODO: consider refactor to Builder mode
public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = LoggerFactory.getLogger(KVErrorHandler.class);
// if a store does not have leader currently, store id is set to 0
private static final int NO_LEADER_STORE_ID = 0;
private final Function<RespT, Errorpb.Error> getRegionError;
private final Function<RespT, Kvrpcpb.KeyError> getKeyError;
private final Function<ResolveLockResult, Object> resolveLockResultCallback;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
private final AbstractLockResolverClient lockResolverClient;
private final long callerStartTS;
private final boolean forWrite;
private final RegionErrorHandler<RespT> regionHandler;
public KVErrorHandler(
RegionManager regionManager,
@ -63,42 +53,14 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
Function<ResolveLockResult, Object> resolveLockResultCallback,
long callerStartTS,
boolean forWrite) {
this.recv = recv;
this.regionHandler = new RegionErrorHandler<>(regionManager, recv, getRegionError);
this.lockResolverClient = lockResolverClient;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
this.getKeyError = getKeyError;
this.resolveLockResultCallback = resolveLockResultCallback;
this.callerStartTS = callerStartTS;
this.forWrite = forWrite;
}
public KVErrorHandler(
RegionManager regionManager,
RegionErrorReceiver recv,
Function<RespT, Errorpb.Error> getRegionError) {
this.recv = recv;
this.lockResolverClient = null;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
this.getKeyError = resp -> null;
this.resolveLockResultCallback = resolveLock -> null;
this.callerStartTS = 0;
this.forWrite = false;
}
private Errorpb.Error getRegionError(RespT resp) {
if (getRegionError != null) {
return getRegionError.apply(resp);
}
return null;
}
private void invalidateRegionStoreCache(TiRegion ctxRegion) {
regionManager.invalidateRegion(ctxRegion);
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
}
private void resolveLock(BackOffer backOffer, Lock lock) {
if (lockResolverClient != null) {
logger.warn("resolving lock");
@ -123,159 +85,33 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override
public boolean handleResponseError(BackOffer backOffer, RespT resp) {
if (resp == null) {
String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion());
String msg =
String.format("Request Failed with unknown reason for [%s]", regionHandler.getRegion());
logger.warn(msg);
return handleRequestError(backOffer, new GrpcException(msg));
}
// Region error handling logic
Errorpb.Error error = getRegionError(resp);
Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
// peer of current request is not leader, the following might be its causes:
// 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD
// 2. leader of current region is missing, need to wait and then fetch region info from PD
long newStoreId = error.getNotLeader().getLeader().getStoreId();
boolean retry;
// update Leader here
logger.warn(
String.format(
"NotLeader Error with region id %d and store id %d, new store id %d",
recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId));
BackOffFunction.BackOffFuncType backOffFuncType;
// if there's current no leader, we do not trigger update pd cache logic
// since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail.
if (newStoreId != NO_LEADER_STORE_ID) {
// If update leader fails, we need to fetch new region info from pd,
// and re-split key range for new region. Setting retry to false will
// stop retry and enter handleCopResponse logic, which would use RegionMiss
// backOff strategy to wait, fetch new region and re-split key range.
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry =
newRegion != null
&& recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
logger.info(
String.format(
"Received zero store id, from region %d try next time",
recv.getRegion().getId()));
backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
retry = false;
}
if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
return retry;
} else if (error.hasStoreNotMatch()) {
// this error is reported from raftstore:
// store_id requested at the moment is inconsistent with that expected
// Solutionre-fetch from PD
long storeId = recv.getRegion().getLeader().getStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
} else if (error.hasEpochNotMatch()) {
// this error is reported from raftstore:
// region has outdated versionplease try later.
logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion()));
this.regionManager.onRegionStale(recv.getRegion());
return false;
} else if (error.hasServerIsBusy()) {
// this error is reported from kv:
// will occur when write pressure is high. Please try later.
logger.warn(
String.format(
"Server is busy for region [%s], reason: %s",
recv.getRegion(), error.getServerIsBusy().getReason()));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
// command outdated, please try later
logger.warn(String.format("Stale command for region [%s]", recv.getRegion()));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasRaftEntryTooLarge()) {
logger.warn(String.format("Raft too large for region [%s]", recv.getRegion()));
throw new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()));
} else if (error.hasKeyNotInRegion()) {
// this error is reported from raftstore:
// key requested is not in current region
// should not happen here.
ByteString invalidKey = error.getKeyNotInRegion().getKey();
logger.error(
String.format(
"Key not in region [%s] for key [%s], this error should not happen here.",
recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey)));
throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
}
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
invalidateRegionStoreCache(recv.getRegion());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
}
return regionHandler.handleRegionError(backOffer, error);
}
boolean retry = false;
// Key error handling logic
Kvrpcpb.KeyError keyError = getKeyError.apply(resp);
if (keyError != null) {
try {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError);
resolveLock(backOffer, lock);
retry = true;
return true;
} catch (KeyException e) {
logger.warn("Unable to handle KeyExceptions other than LockException", e);
}
}
return retry;
return false;
}
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
regionManager.onRequestFail(recv.getRegion());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e));
// TiKV maybe down, so do not retry in `callWithRetry`
// should re-fetch the new leader from PD and send request to it
return false;
return regionHandler.handleRequestError(backOffer, e);
}
}

View File

@ -0,0 +1,198 @@
package org.tikv.common.operation;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class);
// if a store does not have leader currently, store id is set to 0
private static final int NO_LEADER_STORE_ID = 0;
private final Function<RespT, Errorpb.Error> getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
public RegionErrorHandler(
RegionManager regionManager,
RegionErrorReceiver recv,
Function<RespT, Errorpb.Error> getRegionError) {
this.recv = recv;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
}
@Override
public boolean handleResponseError(BackOffer backOffer, RespT resp) {
if (resp == null) {
String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion());
logger.warn(msg);
return handleRequestError(backOffer, new GrpcException(msg));
}
// Region error handling logic
Errorpb.Error error = getRegionError(resp);
if (error != null) {
return handleRegionError(backOffer, error);
}
return false;
}
public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
// peer of current request is not leader, the following might be its causes:
// 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD
// 2. leader of current region is missing, need to wait and then fetch region info from PD
long newStoreId = error.getNotLeader().getLeader().getStoreId();
boolean retry;
// update Leader here
logger.warn(
String.format(
"NotLeader Error with region id %d and store id %d, new store id %d",
recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId));
BackOffFunction.BackOffFuncType backOffFuncType;
// if there's current no leader, we do not trigger update pd cache logic
// since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail.
if (newStoreId != NO_LEADER_STORE_ID) {
// If update leader fails, we need to fetch new region info from pd,
// and re-split key range for new region. Setting retry to false will
// stop retry and enter handleCopResponse logic, which would use RegionMiss
// backOff strategy to wait, fetch new region and re-split key range.
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry =
newRegion != null
&& recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
logger.info(
String.format(
"Received zero store id, from region %d try next time", recv.getRegion().getId()));
backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
retry = false;
}
if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
return retry;
} else if (error.hasStoreNotMatch()) {
// this error is reported from raftstore:
// store_id requested at the moment is inconsistent with that expected
// Solutionre-fetch from PD
long storeId = recv.getRegion().getLeader().getStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
} else if (error.hasEpochNotMatch()) {
// this error is reported from raftstore:
// region has outdated versionplease try later.
logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion()));
this.regionManager.onRegionStale(recv.getRegion());
return false;
} else if (error.hasServerIsBusy()) {
// this error is reported from kv:
// will occur when write pressure is high. Please try later.
logger.warn(
String.format(
"Server is busy for region [%s], reason: %s",
recv.getRegion(), error.getServerIsBusy().getReason()));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
// command outdated, please try later
logger.warn(String.format("Stale command for region [%s]", recv.getRegion()));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasRaftEntryTooLarge()) {
logger.warn(String.format("Raft too large for region [%s]", recv.getRegion()));
throw new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()));
} else if (error.hasKeyNotInRegion()) {
// this error is reported from raftstore:
// key requested is not in current region
// should not happen here.
ByteString invalidKey = error.getKeyNotInRegion().getKey();
logger.error(
String.format(
"Key not in region [%s] for key [%s], this error should not happen here.",
recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey)));
throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
}
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
invalidateRegionStoreCache(recv.getRegion());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
}
return false;
}
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
regionManager.onRequestFail(recv.getRegion());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e));
// TiKV maybe down, so do not retry in `callWithRetry`
// should re-fetch the new leader from PD and send request to it
return false;
}
public Errorpb.Error getRegionError(RespT resp) {
if (getRegionError != null) {
return getRegionError.apply(resp);
}
return null;
}
public TiRegion getRegion() {
return recv.getRegion();
}
private void invalidateRegionStoreCache(TiRegion ctxRegion) {
regionManager.invalidateRegion(ctxRegion);
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
}
}

View File

@ -38,6 +38,7 @@ import org.tikv.common.TiConfiguration;
import org.tikv.common.Version;
import org.tikv.common.exception.*;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.operation.RegionErrorHandler;
import org.tikv.common.streaming.StreamingResponse;
import org.tikv.common.util.*;
import org.tikv.kvproto.Coprocessor;
@ -806,8 +807,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.build();
KVErrorHandler<RawGetResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawGetResponse> handler =
new RegionErrorHandler<RawGetResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
return rawGetHelper(resp);
@ -841,8 +842,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.build();
KVErrorHandler<RawGetKeyTTLResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawGetKeyTTLResponse> handler =
new RegionErrorHandler<RawGetKeyTTLResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetKeyTTLResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler);
@ -881,8 +882,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setKey(key)
.build();
KVErrorHandler<RawDeleteResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawDeleteResponse> handler =
new RegionErrorHandler<RawDeleteResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
@ -919,8 +920,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setTtl(ttl)
.build();
KVErrorHandler<RawPutResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawPutResponse> handler =
new RegionErrorHandler<RawPutResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
rawPutHelper(resp);
@ -958,8 +959,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setTtl(ttl)
.build();
KVErrorHandler<RawCASResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawCASResponse> handler =
new RegionErrorHandler<RawCASResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawCASResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler);
@ -1000,8 +1001,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setContext(region.getReplicaContext(storeType))
.addAllKeys(keys)
.build();
KVErrorHandler<RawBatchGetResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawBatchGetResponse> handler =
new RegionErrorHandler<RawBatchGetResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchGetResponse resp =
callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
@ -1037,8 +1038,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setTtl(ttl)
.setForCas(atomic)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawBatchPutResponse> handler =
new RegionErrorHandler<RawBatchPutResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchPutResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
@ -1088,8 +1089,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.addAllKeys(keys)
.setForCas(atomic)
.build();
KVErrorHandler<RawBatchDeleteResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawBatchDeleteResponse> handler =
new RegionErrorHandler<RawBatchDeleteResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchDeleteResponse resp =
callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler);
@ -1135,8 +1136,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setLimit(limit)
.build();
KVErrorHandler<RawScanResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawScanResponse> handler =
new RegionErrorHandler<RawScanResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawScanResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
@ -1180,8 +1181,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setEndKey(endKey)
.build();
KVErrorHandler<RawDeleteRangeResponse> handler =
new KVErrorHandler<>(
RegionErrorHandler<RawDeleteRangeResponse> handler =
new RegionErrorHandler<RawDeleteRangeResponse>(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteRangeResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);