mirror of https://github.com/tikv/client-java.git

commit bee21d4f66 (parent c9507c2fa7)

    Fix store not match (#141)

    Signed-off-by: birdstorm <samuelwyf@hotmail.com>
@@ -151,7 +151,7 @@ public class KVClient implements AutoCloseable {
         return client.batchGet(backOffer, batch.keys, version);
       } catch (final TiKVException e) {
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
-        clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
+        clientBuilder.getRegionManager().invalidateRegion(batch.region);
         logger.warn("ReSplitting ranges for BatchGetRequest", e);
 
         // retry
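Note: the hunk above is one instance of the client's retry pattern: on a TiKVException, back off, drop the cached region (now by object rather than by id), log, and loop. A minimal self-contained sketch of that loop; TransientException, RegionCache, and the backoff budget are stand-ins for illustration, not client-java classes:

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.function.Supplier;

    public class BackoffRetrySketch {
      // Stand-in for TiKVException.
      static class TransientException extends RuntimeException {}

      // Stand-in for the region cache; only invalidation matters here.
      interface RegionCache {
        void invalidate(Object region);
      }

      // Retry `call` until it succeeds or the backoff budget is exhausted,
      // invalidating the cached region before every retry.
      static <T> T callWithRetry(Supplier<T> call, RegionCache cache, Object region)
          throws InterruptedException {
        long sleepMs = 50;
        for (int attempt = 0; ; attempt++) {
          try {
            return call.get();
          } catch (TransientException e) {
            if (attempt >= 5) {
              throw e; // budget exhausted, surface the error
            }
            cache.invalidate(region); // stale routing info is the usual culprit
            Thread.sleep(sleepMs + ThreadLocalRandom.current().nextLong(sleepMs));
            sleepMs = Math.min(sleepMs * 2, 2_000); // exponential backoff, capped
          }
        }
      }
    }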
@@ -437,7 +437,7 @@ public class TiSession implements AutoCloseable {
       } catch (final TiKVException e) {
         // retry
         logger.warn("ReSplitting ranges for splitRegion", e);
-        clientBuilder.getRegionManager().invalidateRegion(region.getId());
+        clientBuilder.getRegionManager().invalidateRegion(region);
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
         newRegions = splitRegion(splits, backOffer);
       }
@@ -51,7 +51,6 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
   private final RegionManager regionManager;
   private final RegionErrorReceiver recv;
   private final AbstractLockResolverClient lockResolverClient;
-  private final TiRegion ctxRegion;
   private final long callerStartTS;
   private final boolean forWrite;
 
@@ -59,13 +58,11 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
       RegionManager regionManager,
       RegionErrorReceiver recv,
       AbstractLockResolverClient lockResolverClient,
-      TiRegion ctxRegion,
       Function<RespT, Errorpb.Error> getRegionError,
       Function<RespT, Kvrpcpb.KeyError> getKeyError,
       Function<ResolveLockResult, Object> resolveLockResultCallback,
       long callerStartTS,
       boolean forWrite) {
-    this.ctxRegion = ctxRegion;
     this.recv = recv;
     this.lockResolverClient = lockResolverClient;
     this.regionManager = regionManager;
@@ -79,9 +76,7 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
   public KVErrorHandler(
       RegionManager regionManager,
       RegionErrorReceiver recv,
-      TiRegion ctxRegion,
       Function<RespT, Errorpb.Error> getRegionError) {
-    this.ctxRegion = ctxRegion;
     this.recv = recv;
     this.lockResolverClient = null;
     this.regionManager = regionManager;
@@ -100,7 +95,7 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
   }
 
   private void invalidateRegionStoreCache(TiRegion ctxRegion) {
-    regionManager.invalidateRegion(ctxRegion.getId());
+    regionManager.invalidateRegion(ctxRegion);
     regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
   }
 
@@ -129,7 +124,8 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
   public boolean handleResponseError(BackOffer backOffer, RespT resp) {
     if (resp == null) {
       String msg =
-          String.format("Request Failed with unknown reason for region region [%s]", ctxRegion);
+          String.format(
+              "Request Failed with unknown reason for region region [%s]", recv.getRegion());
       logger.warn(msg);
       return handleRequestError(backOffer, new GrpcException(msg));
     }
@@ -143,37 +139,40 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
         // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD
         // 2. leader of current region is missing, need to wait and then fetch region info from PD
         long newStoreId = error.getNotLeader().getLeader().getStoreId();
-        boolean retry = true;
+        boolean retry;
 
         // update Leader here
         logger.warn(
             String.format(
                 "NotLeader Error with region id %d and store id %d, new store id %d",
-                ctxRegion.getId(), ctxRegion.getLeader().getStoreId(), newStoreId));
+                recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId));
 
         BackOffFunction.BackOffFuncType backOffFuncType;
         // if there's current no leader, we do not trigger update pd cache logic
         // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail.
         if (newStoreId != NO_LEADER_STORE_ID) {
-          if (!this.regionManager.updateLeader(ctxRegion.getId(), newStoreId)
-              || !recv.onNotLeader(this.regionManager.getStoreById(newStoreId))) {
-            // If update leader fails, we need to fetch new region info from pd,
-            // and re-split key range for new region. Setting retry to false will
-            // stop retry and enter handleCopResponse logic, which would use RegionMiss
-            // backOff strategy to wait, fetch new region and re-split key range.
-            // onNotLeader is only needed when updateLeader succeeds, thus switch
-            // to a new store address.
-            retry = false;
-          }
+          // If update leader fails, we need to fetch new region info from pd,
+          // and re-split key range for new region. Setting retry to false will
+          // stop retry and enter handleCopResponse logic, which would use RegionMiss
+          // backOff strategy to wait, fetch new region and re-split key range.
+          // onNotLeader is only needed when updateLeader succeeds, thus switch
+          // to a new store address.
+          TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
+          retry =
+              newRegion != null
                  && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
 
           backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
         } else {
           logger.info(
               String.format(
-                  "Received zero store id, from region %d try next time", ctxRegion.getId()));
-          backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
+                  "Received zero store id, from region %d try next time",
+                  recv.getRegion().getId()));
 
-          this.regionManager.invalidateRegion(ctxRegion.getId());
+          this.regionManager.invalidateRegion(recv.getRegion());
+
+          backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
+          retry = false;
         }
 
         backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
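Note: the reworked NotLeader branch replaces the old mutate-and-return-boolean style with an updateLeader that returns either the region carrying the new leader or null. A compressed sketch of the resulting decision; the types below are stand-ins, not the real signatures beyond what the diff itself shows:

    // Sketch: how the new NotLeader flow decides between retrying on the new
    // leader and falling back to a RegionMiss backoff plus re-split.
    final class NotLeaderFlowSketch {
      static final long NO_LEADER_STORE_ID = 0; // assumption: zero means "no leader yet"

      interface Manager {
        Object updateLeader(Object region, long newStoreId); // null = switch failed

        void invalidateRegion(Object region);
      }

      interface Receiver {
        boolean onNotLeader(long newStoreId, Object newRegion); // false = re-split needed
      }

      static boolean handle(Manager mgr, Receiver recv, Object region, long newStoreId) {
        if (newStoreId == NO_LEADER_STORE_ID) {
          mgr.invalidateRegion(region); // nothing to switch to; wait and refetch from PD
          return false;
        }
        Object newRegion = mgr.updateLeader(region, newStoreId);
        // Retry only if both the cache switch and the client-side stub switch worked.
        return newRegion != null && recv.onNotLeader(newStoreId, newRegion);
      }
    }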
@@ -183,20 +182,24 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
         // this error is reported from raftstore:
         // store_id requested at the moment is inconsistent with that expected
         // Solution:re-fetch from PD
-        long storeId = ctxRegion.getLeader().getStoreId();
+        long storeId = recv.getRegion().getLeader().getStoreId();
+        long actualStoreId = error.getStoreNotMatch().getActualStoreId();
         logger.warn(
             String.format(
-                "Store Not Match happened with region id %d, store id %d",
-                ctxRegion.getId(), storeId));
+                "Store Not Match happened with region id %d, store id %d, actual store id %d",
+                recv.getRegion().getId(), storeId, actualStoreId));
 
+        this.regionManager.invalidateRegion(recv.getRegion());
         this.regionManager.invalidateStore(storeId);
-        recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
-        return true;
+        // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
+        // assume this is a low probability error, do not retry, just re-split the request by
+        // throwing it out.
+        return false;
       } else if (error.hasEpochNotMatch()) {
         // this error is reported from raftstore:
         // region has outdated version,please try later.
-        logger.warn(String.format("Stale Epoch encountered for region [%s]", ctxRegion));
-        this.regionManager.onRegionStale(ctxRegion.getId());
+        logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion()));
+        this.regionManager.onRegionStale(recv.getRegion());
         return false;
       } else if (error.hasServerIsBusy()) {
         // this error is reported from kv:
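Note: this is the hunk that gives the commit its title. On StoreNotMatch the handler used to re-resolve the store and retry (return true); it now drops both the cached region and the cached store and returns false, which tells the caller to re-split the request and route it afresh. A sketch of that boolean contract, with illustrative names only:

    // Sketch of the handleResponseError contract used throughout this class:
    // true means "safe to retry the same request as-is",
    // false means "state was invalidated; re-split and re-route".
    interface ResponseErrorHandlerSketch<R> {
      boolean handleResponseError(R response);

      static void dispatch(
          ResponseErrorHandlerSketch<String> h, String resp, Runnable retry, Runnable resplit) {
        if (h.handleResponseError(resp)) {
          retry.run(); // e.g. NotLeader with a successful leader switch
        } else {
          resplit.run(); // e.g. StoreNotMatch or EpochNotMatch after this commit
        }
      }
    }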
@@ -204,19 +207,23 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
         logger.warn(
             String.format(
                 "Server is busy for region [%s], reason: %s",
-                ctxRegion, error.getServerIsBusy().getReason()));
+                recv.getRegion(), error.getServerIsBusy().getReason()));
+        backOffer.doBackOff(
+            BackOffFunction.BackOffFuncType.BoServerBusy,
+            new StatusRuntimeException(
+                Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
         backOffer.doBackOff(
             BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
         return true;
       } else if (error.hasStaleCommand()) {
         // this error is reported from raftstore:
         // command outdated, please try later
-        logger.warn(String.format("Stale command for region [%s]", ctxRegion));
+        logger.warn(String.format("Stale command for region [%s]", recv.getRegion()));
         backOffer.doBackOff(
             BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
         return true;
       } else if (error.hasRaftEntryTooLarge()) {
-        logger.warn(String.format("Raft too large for region [%s]", ctxRegion));
+        logger.warn(String.format("Raft too large for region [%s]", recv.getRegion()));
         throw new StatusRuntimeException(
             Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()));
       } else if (error.hasKeyNotInRegion()) {
@@ -227,16 +234,18 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
         logger.error(
             String.format(
                 "Key not in region [%s] for key [%s], this error should not happen here.",
-                ctxRegion, KeyUtils.formatBytesUTF8(invalidKey)));
+                recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey)));
         throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
       }
 
-      logger.warn(String.format("Unknown error %s for region [%s]", error, ctxRegion));
+      logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
       // For other errors, we only drop cache here.
       // Upper level may split this task.
-      invalidateRegionStoreCache(ctxRegion);
-      if (error.getMessage().contains("Raft Proposal Dropped")) {
+      invalidateRegionStoreCache(recv.getRegion());
+      // retry if raft proposal is dropped, it indicates the store is in the middle of transition
+      if (error.getMessage().contains("Raft ProposalDropped")) {
         backOffer.doBackOff(
             BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
         return true;
       }
     }
@@ -259,7 +268,7 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
 
   @Override
   public boolean handleRequestError(BackOffer backOffer, Exception e) {
-    regionManager.onRequestFail(ctxRegion);
+    regionManager.onRequestFail(recv.getRegion());
 
     backOffer.doBackOff(
         BackOffFunction.BackOffFuncType.BoTiKVRPC,
@@ -75,18 +75,17 @@ public abstract class AbstractRegionStoreClient
    * @return false when re-split is needed.
    */
   @Override
-  public boolean onNotLeader(Metapb.Store newStore) {
+  public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) {
     if (logger.isDebugEnabled()) {
       logger.debug(region + ", new leader = " + newStore.getId());
     }
-    TiRegion cachedRegion = regionManager.getRegionByKey(region.getStartKey());
-    // When switch leader fails or the region changed its key range,
+    // When switch leader fails or the region changed its region epoch,
     // it would be necessary to re-split task's key range for new region.
-    if (!region.getStartKey().equals(cachedRegion.getStartKey())
-        || !region.getEndKey().equals(cachedRegion.getEndKey())) {
+    if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
+      regionManager.invalidateRegion(newRegion);
       return false;
     }
-    region = cachedRegion;
+    region = newRegion;
     String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
     ManagedChannel channel = channelFactory.getChannel(addressStr);
     blockingStub = TikvGrpc.newBlockingStub(channel);
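Note: instead of inferring staleness by comparing key ranges against a freshly fetched cache entry, onNotLeader now compares region epochs between the cached region and the one reported with the new leader. A minimal sketch of why the epoch check subsumes the range check; Epoch below is a stand-in for Metapb.RegionEpoch:

    // Sketch: a region epoch bumps `version` on split/merge (key-range changes)
    // and `confVer` on membership changes, so equal epochs imply an unchanged
    // key range and peer set -- one comparison instead of two range checks.
    final class EpochSketch {
      static final class Epoch {
        final long confVer;
        final long version;

        Epoch(long confVer, long version) {
          this.confVer = confVer;
          this.version = version;
        }

        boolean sameAs(Epoch other) {
          return confVer == other.confVer && version == other.version;
        }
      }

      public static void main(String[] args) {
        Epoch cached = new Epoch(5, 7);
        Epoch fromPd = new Epoch(5, 8); // a split happened: version bumped
        System.out.println(cached.sameAs(fromPd)); // false -> invalidate and re-split
      }
    }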
@@ -100,8 +99,8 @@ public abstract class AbstractRegionStoreClient
     ManagedChannel channel = channelFactory.getChannel(addressStr);
     blockingStub = TikvGrpc.newBlockingStub(channel);
     asyncStub = TikvGrpc.newStub(channel);
-    if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) {
-      logger.debug(
+    if (region.getLeader().getStoreId() != store.getId()) {
+      logger.warn(
           "store_not_match may occur? "
               + region
               + ", original store = "
@@ -20,7 +20,9 @@ package org.tikv.common.region;
 import org.tikv.kvproto.Metapb.Store;
 
 public interface RegionErrorReceiver {
-  boolean onNotLeader(Store store);
+  boolean onNotLeader(Store store, TiRegion region);
 
   void onStoreNotMatch(Store store);
+
+  TiRegion getRegion();
 }
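Note: with getRegion() on the interface, the error handler no longer snapshots a TiRegion at construction time; it always asks the receiver for its current region. A hypothetical implementer shows the shape (all names below are illustrative, not client-java types):

    // Sketch of an implementer: the receiver owns the mutable "current region"
    // and the handler reads it through getRegion() on every error, so a leader
    // switch performed in onNotLeader is visible to later error handling.
    final class ReceiverSketch {
      interface Store {}

      interface Region {}

      interface ErrorReceiver {
        boolean onNotLeader(Store store, Region region);

        void onStoreNotMatch(Store store);

        Region getRegion();
      }

      static final class Client implements ErrorReceiver {
        private Region region; // updated in place as errors are handled

        Client(Region initial) {
          this.region = initial;
        }

        @Override
        public boolean onNotLeader(Store store, Region newRegion) {
          this.region = newRegion; // adopt the region that carries the new leader
          return true;
        }

        @Override
        public void onStoreNotMatch(Store store) {
          // the real client reconnects its stub to the store's address here
        }

        @Override
        public Region getRegion() {
          return region;
        }
      }
    }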
@@ -143,7 +143,7 @@ public class RegionManager {
     }
     if (store == null) {
       // clear the region cache so we may get the learner peer next time
-      cache.invalidateRegion(region.getId());
+      cache.invalidateRegion(region);
     }
   }
 
@@ -163,23 +163,26 @@ public class RegionManager {
     return cache.getStoreById(id, backOffer);
   }
 
-  public void onRegionStale(long regionId) {
-    cache.invalidateRegion(regionId);
+  public void onRegionStale(TiRegion region) {
+    cache.invalidateRegion(region);
   }
 
-  public boolean updateLeader(long regionId, long storeId) {
-    TiRegion r = cache.regionCache.get(regionId);
+  public synchronized TiRegion updateLeader(TiRegion region, long storeId) {
+    TiRegion r = cache.getRegionFromCache(region.getId());
     if (r != null) {
-      if (!r.switchPeer(storeId)) {
-        // failed to switch leader, possibly region is outdated, we need to drop region cache from
-        // regionCache
-        logger.warn("Cannot find peer when updating leader (" + regionId + "," + storeId + ")");
-        // drop region cache using verId
-        cache.invalidateRegion(regionId);
-        return false;
+      if (r.getLeader().getStoreId() == storeId) {
+        return r;
       }
+      TiRegion newRegion = r.switchPeer(storeId);
+      if (newRegion != null) {
+        cache.putRegion(newRegion);
+        return newRegion;
+      }
+      // failed to switch leader, possibly region is outdated, we need to drop region cache from
+      // regionCache
+      logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")");
     }
-    return true;
+    return null;
   }
 
   /**
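Note: updateLeader is now synchronized and copy-on-write. Instead of mutating the cached TiRegion's leader in place, it builds a new immutable region via switchPeer, publishes it with putRegion, and returns it (or null on failure). A generic sketch of the pattern; Cache and Region here are stand-ins:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: copy-on-write leader update. Readers holding an old Region object
    // keep a consistent snapshot; the cache atomically flips to the new one.
    final class CopyOnWriteLeaderSketch {
      static final class Region {
        final long id;
        final long leaderStoreId;

        Region(long id, long leaderStoreId) {
          this.id = id;
          this.leaderStoreId = leaderStoreId;
        }

        Region withLeader(long storeId) { // analogue of the new switchPeer
          return new Region(id, storeId);
        }
      }

      private final Map<Long, Region> cache = new ConcurrentHashMap<>();

      synchronized Region updateLeader(Region region, long storeId) {
        Region r = cache.get(region.id);
        if (r == null) {
          return null; // not cached; caller must refetch from PD
        }
        if (r.leaderStoreId == storeId) {
          return r; // already pointing at the requested leader
        }
        Region updated = r.withLeader(storeId);
        cache.put(r.id, updated); // publish the new snapshot
        return updated;
      }
    }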
@@ -188,11 +191,11 @@ public class RegionManager {
    * @param region region
    */
   public void onRequestFail(TiRegion region) {
-    onRequestFail(region.getId(), region.getLeader().getStoreId());
+    onRequestFail(region, region.getLeader().getStoreId());
   }
 
-  public void onRequestFail(long regionId, long storeId) {
-    cache.invalidateRegion(regionId);
+  private void onRequestFail(TiRegion region, long storeId) {
+    cache.invalidateRegion(region);
     cache.invalidateAllRegionForStore(storeId);
   }
 
@@ -200,8 +203,8 @@ public class RegionManager {
     cache.invalidateStore(storeId);
   }
 
-  public void invalidateRegion(long regionId) {
-    cache.invalidateRegion(regionId);
+  public void invalidateRegion(TiRegion region) {
+    cache.invalidateRegion(region);
   }
 
   public static class RegionCache {
@@ -277,17 +280,22 @@ public class RegionManager {
       return region;
     }
 
+    private synchronized TiRegion getRegionFromCache(long regionId) {
+      return regionCache.get(regionId);
+    }
+
     /** Removes region associated with regionId from regionCache. */
-    public synchronized void invalidateRegion(long regionId) {
+    public synchronized void invalidateRegion(TiRegion region) {
       try {
         if (logger.isDebugEnabled()) {
-          logger.debug(String.format("invalidateRegion ID[%s]", regionId));
+          logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
         }
-        TiRegion region = regionCache.get(regionId);
-        keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey()));
+        TiRegion oldRegion = regionCache.get(region.getId());
+        if (oldRegion != null && oldRegion == region) {
+          keyToRegionIdCache.remove(makeRange(region.getStartKey(), region.getEndKey()));
+          regionCache.remove(region.getId());
+        }
       } catch (Exception ignore) {
-      } finally {
-        regionCache.remove(regionId);
       }
     }
 
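Note: the cache now invalidates by object identity rather than by id. The entry is removed only if the caller's TiRegion is still the one in the map, which closes a race where a thread holding a stale region would otherwise evict the fresher entry another thread had just installed. A sketch of the guard, with stand-in types:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: invalidate-by-identity. Removing `regionCache.get(id)` blindly can
    // evict an entry that was refreshed after the caller took its reference;
    // the `oldRegion == region` check makes the eviction conditional.
    final class IdentityInvalidateSketch {
      private final Map<Long, Object> regionCache = new ConcurrentHashMap<>();

      synchronized void invalidate(long id, Object region) {
        Object oldRegion = regionCache.get(id);
        if (oldRegion != null && oldRegion == region) { // same object, not just same id
          regionCache.remove(id);
        }
        // else: someone already replaced the entry; keep the fresher one
      }
    }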
@@ -179,7 +179,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
@@ -224,7 +223,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> null,
             resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
@@ -291,7 +289,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> null,
             resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
@@ -406,7 +403,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> null,
             resolveLockResult -> null,
@@ -483,7 +479,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -539,7 +534,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -607,7 +601,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> null,
             resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
@@ -730,7 +723,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             lockResolverClient,
-            region,
             StreamingResponse::getFirstRegionError, // TODO: handle all errors in streaming response
             resp -> null,
             resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
@@ -766,7 +758,6 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
             regionManager,
             this,
             null,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> null,
             resolveLockResult -> null,
@@ -813,10 +804,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
           () -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
       KVErrorHandler<RawGetResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
       return rawGetHelper(resp);
     } finally {
@@ -848,10 +836,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
           RawGetKeyTTLRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
       KVErrorHandler<RawGetKeyTTLResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawGetKeyTTLResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler);
       return rawGetKeyTTLHelper(resp);
@@ -887,10 +872,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
 
       KVErrorHandler<RawDeleteResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawDeleteResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
       rawDeleteHelper(resp, region);
@@ -928,10 +910,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
 
       KVErrorHandler<RawPutResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
       rawPutHelper(resp);
     } finally {
@@ -970,10 +949,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
 
       KVErrorHandler<RawCASResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawCASResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawCompareAndSetMethod(), factory, handler);
       return rawPutIfAbsentHelper(resp);
@@ -1015,10 +991,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
               .build();
       KVErrorHandler<RawBatchGetResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawBatchGetResponse resp =
           callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
       return handleRawBatchGet(resp);
@@ -1055,10 +1028,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
               .build();
       KVErrorHandler<RawBatchPutResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawBatchPutResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
       handleRawBatchPut(resp);
@@ -1106,10 +1076,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
               .build();
       KVErrorHandler<RawBatchDeleteResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawBatchDeleteResponse resp =
           callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler);
       handleRawBatchDelete(resp);
@@ -1156,10 +1123,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
 
       KVErrorHandler<RawScanResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawScanResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
       return rawScanHelper(resp);
@@ -1204,10 +1168,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
 
       KVErrorHandler<RawDeleteRangeResponse> handler =
           new KVErrorHandler<>(
-              regionManager,
-              this,
-              region,
-              resp -> resp.hasRegionError() ? resp.getRegionError() : null);
+              regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
       RawDeleteRangeResponse resp =
           callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);
       rawDeleteRangeHelper(resp);
@@ -42,7 +42,7 @@ public class TiRegion implements Serializable {
   private final Region meta;
   private final IsolationLevel isolationLevel;
   private final Kvrpcpb.CommandPri commandPri;
-  private Peer leader;
+  private final Peer leader;
   private int followerIdx = 0;
   private final boolean isReplicaRead;
 
@@ -55,6 +55,19 @@ public class TiRegion implements Serializable {
     this(meta, leader, isolationLevel, commandPri, kvMode, false);
   }
 
+  private TiRegion(
+      Region meta,
+      Peer leader,
+      IsolationLevel isolationLevel,
+      Kvrpcpb.CommandPri commandPri,
+      boolean isReplicaRead) {
+    this.meta = meta;
+    this.leader = leader;
+    this.isolationLevel = isolationLevel;
+    this.commandPri = commandPri;
+    this.isReplicaRead = isReplicaRead;
+  }
+
   public TiRegion(
       Region meta,
       Peer leader,
@@ -199,17 +212,16 @@ public class TiRegion implements Serializable {
    * storeID.
    *
    * @param leaderStoreID is leader peer id.
-   * @return false if no peers matches the store id.
+   * @return null if no peers matches the store id.
    */
-  boolean switchPeer(long leaderStoreID) {
+  public TiRegion switchPeer(long leaderStoreID) {
     List<Peer> peers = meta.getPeersList();
     for (Peer p : peers) {
       if (p.getStoreId() == leaderStoreID) {
-        this.leader = p;
-        return true;
+        return new TiRegion(this.meta, p, this.isolationLevel, this.commandPri, this.isReplicaRead);
       }
     }
-    return false;
+    return null;
   }
 
   public boolean isMoreThan(ByteString key) {
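Note: making leader final and having switchPeer return a fresh TiRegion turns the region into an immutable value object, which is what makes the synchronized updateLeader above safe to publish without further locking. The essence, as a generic sketch with stand-in types:

    // Sketch: an immutable value with a "wither" instead of a setter. Threads
    // can share instances freely; a leader change produces a new object.
    final class ImmutableRegionSketch {
      static final class Peer {
        final long storeId;

        Peer(long storeId) {
          this.storeId = storeId;
        }
      }

      static final class Region {
        private final Peer[] peers;
        private final Peer leader; // final: never mutated after construction

        Region(Peer[] peers, Peer leader) {
          this.peers = peers;
          this.leader = leader;
        }

        // Analogue of the new TiRegion.switchPeer: null when no peer matches.
        Region switchPeer(long leaderStoreId) {
          for (Peer p : peers) {
            if (p.storeId == leaderStoreId) {
              return new Region(peers, p);
            }
          }
          return null;
        }
      }
    }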
@@ -684,7 +684,7 @@ public class RawKVClient implements AutoCloseable {
         return Pair.create(new ArrayList<>(), partialResult);
       } catch (final TiKVException e) {
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
-        clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
+        clientBuilder.getRegionManager().invalidateRegion(batch.region);
         logger.warn("ReSplitting ranges for BatchGetRequest", e);
 
         // retry
@@ -726,7 +726,7 @@ public class RawKVClient implements AutoCloseable {
         return new ArrayList<>();
       } catch (final TiKVException e) {
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
-        clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
+        clientBuilder.getRegionManager().invalidateRegion(batch.region);
         logger.warn("ReSplitting ranges for BatchGetRequest", e);
 
         // retry
@@ -777,7 +777,7 @@ public class RawKVClient implements AutoCloseable {
         return new ArrayList<>();
       } catch (final TiKVException e) {
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
-        clientBuilder.getRegionManager().invalidateRegion(range.getRegion().getId());
+        clientBuilder.getRegionManager().invalidateRegion(range.getRegion());
        logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
 
         // retry
@@ -160,7 +160,7 @@ public class KVClient implements AutoCloseable {
         return client.batchGet(backOffer, batch.keys, version);
       } catch (final TiKVException e) {
         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
-        clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
+        clientBuilder.getRegionManager().invalidateRegion(batch.region);
         logger.warn("ReSplitting ranges for BatchGetRequest", e);
 
         // retry
@@ -134,7 +134,6 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
             regionManager,
             this,
             this,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -251,7 +250,6 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
             regionManager,
             this,
             this,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -171,7 +171,6 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
             regionManager,
             this,
             this,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -248,7 +247,6 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
             regionManager,
             primaryKeyRegionStoreClient,
             primaryKeyRegionStoreClient.lockResolverClient,
-            primaryKeyRegion,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -180,7 +180,6 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
             regionManager,
             this,
             this,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.getErrorsCount() > 0 ? resp.getErrorsList().get(0) : null,
             resolveLockResult -> null,
@@ -306,7 +305,6 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
             regionManager,
             primaryKeyRegionStoreClient,
             primaryKeyRegionStoreClient.lockResolverClient,
-            primaryKeyRegion,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -386,7 +384,6 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
             regionManager,
             this,
             this,
-            region,
             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
             resp -> resp.hasError() ? resp.getError() : null,
             resolveLockResult -> null,
@@ -122,7 +122,7 @@ public class TTLManager {
                   String.format("sendTxnHeartBeat failed, regionId=%s", tiRegion.getId()),
                   result.getException()));
       this.regionManager.invalidateStore(store.getId());
-      this.regionManager.invalidateRegion(tiRegion.getId());
+      this.regionManager.invalidateRegion(tiRegion);
       // re-split keys and commit again.
       sendTxnHeartBeat(bo, ttl);
     } catch (GrpcException e) {