From 4a401776d16a28e470dda622db61e5cd35f21daf Mon Sep 17 00:00:00 2001 From: Wallace Date: Wed, 16 Jun 2021 22:15:49 +0800 Subject: [PATCH] refactor kverrorhandler (#196) * refactor kverrorhandler Signed-off-by: Little-Wallace --- .../tikv/common/operation/KVErrorHandler.java | 182 +--------------- .../common/operation/RegionErrorHandler.java | 198 ++++++++++++++++++ .../tikv/common/region/RegionStoreClient.java | 41 ++-- 3 files changed, 228 insertions(+), 193 deletions(-) create mode 100644 src/main/java/org/tikv/common/operation/RegionErrorHandler.java diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index 55bbdef44e..3321b56074 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -19,20 +19,14 @@ package org.tikv.common.operation; import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast; -import com.google.protobuf.ByteString; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.KeyException; import org.tikv.common.region.RegionErrorReceiver; import org.tikv.common.region.RegionManager; -import org.tikv.common.region.TiRegion; -import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb; @@ -43,16 +37,12 @@ import org.tikv.txn.ResolveLockResult; // TODO: consider refactor to Builder mode public class KVErrorHandler implements ErrorHandler { private static final Logger logger = LoggerFactory.getLogger(KVErrorHandler.class); - // if a store does not have leader currently, store id is set to 0 - private static final int NO_LEADER_STORE_ID = 0; - private final Function getRegionError; private final Function getKeyError; private final Function resolveLockResultCallback; - private final RegionManager regionManager; - private final RegionErrorReceiver recv; private final AbstractLockResolverClient lockResolverClient; private final long callerStartTS; private final boolean forWrite; + private final RegionErrorHandler regionHandler; public KVErrorHandler( RegionManager regionManager, @@ -63,42 +53,14 @@ public class KVErrorHandler implements ErrorHandler { Function resolveLockResultCallback, long callerStartTS, boolean forWrite) { - this.recv = recv; + this.regionHandler = new RegionErrorHandler<>(regionManager, recv, getRegionError); this.lockResolverClient = lockResolverClient; - this.regionManager = regionManager; - this.getRegionError = getRegionError; this.getKeyError = getKeyError; this.resolveLockResultCallback = resolveLockResultCallback; this.callerStartTS = callerStartTS; this.forWrite = forWrite; } - public KVErrorHandler( - RegionManager regionManager, - RegionErrorReceiver recv, - Function getRegionError) { - this.recv = recv; - this.lockResolverClient = null; - this.regionManager = regionManager; - this.getRegionError = getRegionError; - this.getKeyError = resp -> null; - this.resolveLockResultCallback = resolveLock -> null; - this.callerStartTS = 0; - this.forWrite = false; - } - - private Errorpb.Error getRegionError(RespT resp) { - if (getRegionError != null) { - return getRegionError.apply(resp); - } - return null; - } - - private void invalidateRegionStoreCache(TiRegion ctxRegion) { - regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); - } - private void resolveLock(BackOffer backOffer, Lock lock) { if (lockResolverClient != null) { logger.warn("resolving lock"); @@ -123,159 +85,33 @@ public class KVErrorHandler implements ErrorHandler { @Override public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { - String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); + String msg = + String.format("Request Failed with unknown reason for [%s]", regionHandler.getRegion()); logger.warn(msg); return handleRequestError(backOffer, new GrpcException(msg)); } - // Region error handling logic - Errorpb.Error error = getRegionError(resp); + Errorpb.Error error = regionHandler.getRegionError(resp); if (error != null) { - if (error.hasNotLeader()) { - // this error is reported from raftstore: - // peer of current request is not leader, the following might be its causes: - // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD - // 2. leader of current region is missing, need to wait and then fetch region info from PD - long newStoreId = error.getNotLeader().getLeader().getStoreId(); - boolean retry; - - // update Leader here - logger.warn( - String.format( - "NotLeader Error with region id %d and store id %d, new store id %d", - recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); - - BackOffFunction.BackOffFuncType backOffFuncType; - // if there's current no leader, we do not trigger update pd cache logic - // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. - if (newStoreId != NO_LEADER_STORE_ID) { - // If update leader fails, we need to fetch new region info from pd, - // and re-split key range for new region. Setting retry to false will - // stop retry and enter handleCopResponse logic, which would use RegionMiss - // backOff strategy to wait, fetch new region and re-split key range. - // onNotLeader is only needed when updateLeader succeeds, thus switch - // to a new store address. - TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = - newRegion != null - && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; - } else { - logger.info( - String.format( - "Received zero store id, from region %d try next time", - recv.getRegion().getId())); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; - retry = false; - } - - if (!retry) { - this.regionManager.invalidateRegion(recv.getRegion()); - } - - backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); - - return retry; - } else if (error.hasStoreNotMatch()) { - // this error is reported from raftstore: - // store_id requested at the moment is inconsistent with that expected - // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); - long actualStoreId = error.getStoreNotMatch().getActualStoreId(); - logger.warn( - String.format( - "Store Not Match happened with region id %d, store id %d, actual store id %d", - recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); - // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); - // assume this is a low probability error, do not retry, just re-split the request by - // throwing it out. - return false; - } else if (error.hasEpochNotMatch()) { - // this error is reported from raftstore: - // region has outdated version,please try later. - logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); - this.regionManager.onRegionStale(recv.getRegion()); - return false; - } else if (error.hasServerIsBusy()) { - // this error is reported from kv: - // will occur when write pressure is high. Please try later. - logger.warn( - String.format( - "Server is busy for region [%s], reason: %s", - recv.getRegion(), error.getServerIsBusy().getReason())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoServerBusy, - new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasStaleCommand()) { - // this error is reported from raftstore: - // command outdated, please try later - logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasRaftEntryTooLarge()) { - logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); - throw new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); - } else if (error.hasKeyNotInRegion()) { - // this error is reported from raftstore: - // key requested is not in current region - // should not happen here. - ByteString invalidKey = error.getKeyNotInRegion().getKey(); - logger.error( - String.format( - "Key not in region [%s] for key [%s], this error should not happen here.", - recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); - throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); - } - - logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); - // For other errors, we only drop cache here. - // Upper level may split this task. - invalidateRegionStoreCache(recv.getRegion()); - // retry if raft proposal is dropped, it indicates the store is in the middle of transition - if (error.getMessage().contains("Raft ProposalDropped")) { - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } + return regionHandler.handleRegionError(backOffer, error); } - boolean retry = false; - // Key error handling logic Kvrpcpb.KeyError keyError = getKeyError.apply(resp); if (keyError != null) { try { Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError); resolveLock(backOffer, lock); - retry = true; + return true; } catch (KeyException e) { logger.warn("Unable to handle KeyExceptions other than LockException", e); } } - return retry; + return false; } @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - regionManager.onRequestFail(recv.getRegion()); - - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoTiKVRPC, - new GrpcException( - "send tikv request error: " + e.getMessage() + ", try next peer later", e)); - // TiKV maybe down, so do not retry in `callWithRetry` - // should re-fetch the new leader from PD and send request to it - return false; + return regionHandler.handleRequestError(backOffer, e); } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java new file mode 100644 index 0000000000..4137854e75 --- /dev/null +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -0,0 +1,198 @@ +package org.tikv.common.operation; + +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.codec.KeyUtils; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.region.RegionErrorReceiver; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.kvproto.Errorpb; + +public class RegionErrorHandler implements ErrorHandler { + private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class); + // if a store does not have leader currently, store id is set to 0 + private static final int NO_LEADER_STORE_ID = 0; + private final Function getRegionError; + private final RegionManager regionManager; + private final RegionErrorReceiver recv; + + public RegionErrorHandler( + RegionManager regionManager, + RegionErrorReceiver recv, + Function getRegionError) { + this.recv = recv; + this.regionManager = regionManager; + this.getRegionError = getRegionError; + } + + @Override + public boolean handleResponseError(BackOffer backOffer, RespT resp) { + if (resp == null) { + String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); + logger.warn(msg); + return handleRequestError(backOffer, new GrpcException(msg)); + } + // Region error handling logic + Errorpb.Error error = getRegionError(resp); + if (error != null) { + return handleRegionError(backOffer, error); + } + return false; + } + + public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { + if (error.hasNotLeader()) { + // this error is reported from raftstore: + // peer of current request is not leader, the following might be its causes: + // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD + // 2. leader of current region is missing, need to wait and then fetch region info from PD + long newStoreId = error.getNotLeader().getLeader().getStoreId(); + boolean retry; + + // update Leader here + logger.warn( + String.format( + "NotLeader Error with region id %d and store id %d, new store id %d", + recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); + + BackOffFunction.BackOffFuncType backOffFuncType; + // if there's current no leader, we do not trigger update pd cache logic + // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. + if (newStoreId != NO_LEADER_STORE_ID) { + // If update leader fails, we need to fetch new region info from pd, + // and re-split key range for new region. Setting retry to false will + // stop retry and enter handleCopResponse logic, which would use RegionMiss + // backOff strategy to wait, fetch new region and re-split key range. + // onNotLeader is only needed when updateLeader succeeds, thus switch + // to a new store address. + TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); + retry = + newRegion != null + && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; + } else { + logger.info( + String.format( + "Received zero store id, from region %d try next time", recv.getRegion().getId())); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; + retry = false; + } + + if (!retry) { + this.regionManager.invalidateRegion(recv.getRegion()); + } + + backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); + + return retry; + } else if (error.hasStoreNotMatch()) { + // this error is reported from raftstore: + // store_id requested at the moment is inconsistent with that expected + // Solution:re-fetch from PD + long storeId = recv.getRegion().getLeader().getStoreId(); + long actualStoreId = error.getStoreNotMatch().getActualStoreId(); + logger.warn( + String.format( + "Store Not Match happened with region id %d, store id %d, actual store id %d", + recv.getRegion().getId(), storeId, actualStoreId)); + + this.regionManager.invalidateRegion(recv.getRegion()); + this.regionManager.invalidateStore(storeId); + // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); + // assume this is a low probability error, do not retry, just re-split the request by + // throwing it out. + return false; + } else if (error.hasEpochNotMatch()) { + // this error is reported from raftstore: + // region has outdated version,please try later. + logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); + this.regionManager.onRegionStale(recv.getRegion()); + return false; + } else if (error.hasServerIsBusy()) { + // this error is reported from kv: + // will occur when write pressure is high. Please try later. + logger.warn( + String.format( + "Server is busy for region [%s], reason: %s", + recv.getRegion(), error.getServerIsBusy().getReason())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoServerBusy, + new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasStaleCommand()) { + // this error is reported from raftstore: + // command outdated, please try later + logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasRaftEntryTooLarge()) { + logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); + throw new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); + } else if (error.hasKeyNotInRegion()) { + // this error is reported from raftstore: + // key requested is not in current region + // should not happen here. + ByteString invalidKey = error.getKeyNotInRegion().getKey(); + logger.error( + String.format( + "Key not in region [%s] for key [%s], this error should not happen here.", + recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); + throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); + } + + logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); + // For other errors, we only drop cache here. + // Upper level may split this task. + invalidateRegionStoreCache(recv.getRegion()); + // retry if raft proposal is dropped, it indicates the store is in the middle of transition + if (error.getMessage().contains("Raft ProposalDropped")) { + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } + return false; + } + + @Override + public boolean handleRequestError(BackOffer backOffer, Exception e) { + regionManager.onRequestFail(recv.getRegion()); + + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoTiKVRPC, + new GrpcException( + "send tikv request error: " + e.getMessage() + ", try next peer later", e)); + // TiKV maybe down, so do not retry in `callWithRetry` + // should re-fetch the new leader from PD and send request to it + return false; + } + + public Errorpb.Error getRegionError(RespT resp) { + if (getRegionError != null) { + return getRegionError.apply(resp); + } + return null; + } + + public TiRegion getRegion() { + return recv.getRegion(); + } + + private void invalidateRegionStoreCache(TiRegion ctxRegion) { + regionManager.invalidateRegion(ctxRegion); + regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + } +} diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 6ea51aa9cb..e6f37a9949 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -38,6 +38,7 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.Version; import org.tikv.common.exception.*; import org.tikv.common.operation.KVErrorHandler; +import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.streaming.StreamingResponse; import org.tikv.common.util.*; import org.tikv.kvproto.Coprocessor; @@ -806,8 +807,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setContext(region.getReplicaContext(storeType)) .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler); return rawGetHelper(resp); @@ -841,8 +842,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setContext(region.getReplicaContext(storeType)) .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawGetKeyTTLResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler); @@ -881,8 +882,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawDeleteResponse resp = callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler); @@ -919,8 +920,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setTtl(ttl) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler); rawPutHelper(resp); @@ -958,8 +959,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setTtl(ttl) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawCASResponse resp = callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler); @@ -1000,8 +1001,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setContext(region.getReplicaContext(storeType)) .addAllKeys(keys) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchGetResponse resp = callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler); @@ -1037,8 +1038,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setTtl(ttl) .setForCas(atomic) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler); @@ -1088,8 +1089,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .addAllKeys(keys) .setForCas(atomic) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchDeleteResponse resp = callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler); @@ -1135,8 +1136,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setLimit(limit) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawScanResponse resp = callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler); @@ -1180,8 +1181,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient { .setEndKey(endKey) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawDeleteRangeResponse resp = callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);