Fix data race in context (#22)

This commit is contained in:
birdstorm 2018-12-28 10:51:07 +08:00 committed by GitHub
parent 45d8f78727
commit 24719494ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 296 additions and 148 deletions

View File

@ -16,6 +16,8 @@
package org.tikv.common;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.tikv.common.operation.PDErrorHandler.getRegionResponseErrorExtractor;
import static org.tikv.common.pd.PDError.buildFromPdpbError;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
@ -59,7 +61,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
Supplier<TsoRequest> request = () -> tsoReq;
PDErrorHandler<TsoResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);
TsoResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_TSO, request, handler);
Timestamp timestamp = resp.getTimestamp();
@ -80,7 +84,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, handler);
return new TiRegion(
@ -106,7 +110,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
callAsyncWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, responseObserver, handler);
return responseObserver.getFuture();
@ -117,7 +121,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION_BY_ID, request, handler);
@ -145,7 +149,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
callAsyncWithRetry(
backOffer, PDGrpc.METHOD_GET_REGION_BY_ID, request, responseObserver, handler);
@ -157,7 +161,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
Supplier<GetStoreRequest> request =
() -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
PDErrorHandler<GetStoreResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);
GetStoreResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_STORE, request, handler);
return resp.getStore();
@ -171,7 +177,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
Supplier<GetStoreRequest> request =
() -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
PDErrorHandler<GetStoreResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);
callAsyncWithRetry(backOffer, PDGrpc.METHOD_GET_STORE, request, responseObserver, handler);
return responseObserver.getFuture();

View File

@ -96,13 +96,13 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
// if there's current no leader, we do not trigger update pd cache logic
// since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail.
if (newStoreId != NO_LEADER_STORE_ID) {
if (!this.regionManager.updateLeader(ctxRegion.getId(), newStoreId)
if (!this.regionManager.checkAndDropLeader(ctxRegion.getId(), newStoreId)
|| !recv.onNotLeader(this.regionManager.getStoreById(newStoreId))) {
// If update leader fails, we need to fetch new region info from pd,
// and re-split key range for new region. Setting retry to false will
// stop retry and enter handleCopResponse logic, which would use RegionMiss
// backOff strategy to wait, fetch new region and re-split key range.
// onNotLeader is only needed when updateLeader succeeds, thus switch
// onNotLeader is only needed when checkAndDropLeader succeeds, thus switch
// to a new store address.
retry = false;
}
@ -181,7 +181,7 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
regionManager.onRequestFail(ctxRegion.getId(), ctxRegion.getLeader().getStoreId());
regionManager.onRequestFail(ctxRegion);
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,

View File

@ -17,20 +17,30 @@
package org.tikv.common.operation;
import static org.tikv.common.pd.PDError.buildFromPdpbError;
import java.util.function.Function;
import org.apache.log4j.Logger;
import org.tikv.common.PDClient;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.pd.PDError;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Pdpb;
public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = Logger.getLogger(PDErrorHandler.class);
private final Function<RespT, Pdpb.Error> getError;
private final Function<RespT, PDError> getError;
private final PDClient client;
public PDErrorHandler(Function<RespT, Pdpb.Error> errorExtractor, PDClient client) {
public static final Function<Pdpb.GetRegionResponse, PDError> getRegionResponseErrorExtractor =
r ->
r.getHeader().hasError()
? buildFromPdpbError(r.getHeader().getError())
: r.getRegion().getId() == 0 ? PDError.RegionPeerNotElected.DEFAULT_INSTANCE : null;
public PDErrorHandler(Function<RespT, PDError> errorExtractor, PDClient client) {
this.getError = errorExtractor;
this.client = client;
}
@ -40,12 +50,22 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
if (resp == null) {
return false;
}
Pdpb.Error error = getError.apply(resp);
PDError error = getError.apply(resp);
if (error != null) {
switch (error.getErrorType()) {
case PD_ERROR:
client.updateLeader();
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
return true;
case REGION_PEER_NOT_ELECTED:
logger.info(error.getMessage());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
return true;
default:
throw new TiClientInternalException("Unknown error type encountered: " + error);
}
}
return false;
}

View File

@ -0,0 +1,104 @@
/*
*
* Copyright 2018 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.pd;
import org.tikv.kvproto.Pdpb;
public final class PDError {
private final Pdpb.Error error;
private final ErrorType errorType;
public enum ErrorType {
PD_ERROR,
REGION_PEER_NOT_ELECTED
}
private PDError(Pdpb.Error error) {
this.error = error;
this.errorType = ErrorType.PD_ERROR;
}
private PDError(Pdpb.Error error, ErrorType errorType) {
this.error = error;
this.errorType = errorType;
}
public static PDError buildFromPdpbError(Pdpb.Error error) {
return new PDError(error);
}
public static Builder newBuilder() {
return new Builder();
}
public static Builder newBuilder(Pdpb.Error error) {
return new Builder(error);
}
public Pdpb.Error getError() {
return error;
}
public ErrorType getErrorType() {
return errorType;
}
public String getMessage() {
return getError().getMessage();
}
@Override
public String toString() {
return "\nErrorType: " + errorType + "\nError: " + error;
}
public static final class RegionPeerNotElected {
private static final String ERROR_MESSAGE = "Region Peer not elected. Please try later";
private static final Pdpb.Error DEFAULT_ERROR =
Pdpb.Error.newBuilder().setMessage(ERROR_MESSAGE).build();
private static final ErrorType ERROR_TYPE = ErrorType.REGION_PEER_NOT_ELECTED;
public static final PDError DEFAULT_INSTANCE =
PDError.newBuilder(DEFAULT_ERROR).setErrorType(ERROR_TYPE).build();
}
public static final class Builder {
private Pdpb.Error error_;
private ErrorType errorType_ = ErrorType.PD_ERROR;
public Builder() {}
public Builder(Pdpb.Error error) {
this.error_ = error;
}
public Builder setError(Pdpb.Error error) {
this.error_ = error;
return this;
}
public Builder setErrorType(ErrorType errorType) {
this.errorType_ = errorType;
return this;
}
public PDError build() {
return new PDError(error_, errorType_);
}
}
}

View File

@ -73,7 +73,7 @@ public class RegionManager {
}
if (regionId == null) {
logger.debug("Key not find in keyToRegionIdCache:" + formatBytes(key));
logger.debug("Key not found in keyToRegionIdCache:" + formatBytes(key));
TiRegion region = pdClient.getRegionByKey(ConcreteBackOffer.newGetBackOff(), key);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
@ -210,15 +210,16 @@ public class RegionManager {
cache.invalidateRegion(regionId);
}
public boolean updateLeader(long regionId, long storeId) {
public boolean checkAndDropLeader(long regionId, long storeId) {
TiRegion r = cache.regionCache.get(regionId);
if (r != null) {
if (!r.switchPeer(storeId)) {
TiRegion r2 = r.withNewLeader(storeId);
// drop region cache using verId
cache.invalidateRegion(regionId);
if (r2.getLeader().getStoreId() != storeId) {
// failed to switch leader, possibly region is outdated, we need to drop region cache from
// regionCache
logger.warn("Cannot find peer when updating leader (" + regionId + "," + storeId + ")");
// drop region cache using verId
cache.invalidateRegion(regionId);
return false;
}
}
@ -228,12 +229,11 @@ public class RegionManager {
/**
* Clears all cache when a TiKV server does not respond
*
* @param regionId region's id
* @param storeId TiKV store's id
* @param region region
*/
public void onRequestFail(long regionId, long storeId) {
cache.invalidateRegion(regionId);
cache.invalidateAllRegionForStore(storeId);
public void onRequestFail(TiRegion region) {
cache.invalidateRegion(region.getId());
cache.invalidateAllRegionForStore(region.getLeader().getStoreId());
}
public void invalidateStore(long storeId) {

View File

@ -67,10 +67,14 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
while (true) {
// we should refresh region
region = regionManager.getRegionByKey(key);
Context context = region.getContext();
Supplier<GetRequest> factory =
() -> GetRequest.newBuilder().setContext(context).setKey(key).setVersion(version).build();
() ->
GetRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setVersion(version)
.build();
KVErrorHandler<GetResponse> handler =
new KVErrorHandler<>(
@ -81,14 +85,21 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
GetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_GET, factory, handler);
if (getHelper(backOffer, resp)) {
return resp.getValue();
}
}
}
private boolean getHelper(BackOffer backOffer, GetResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("GetResponse failed without a cause");
}
if (resp.hasRegionError()) {
backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
continue;
return false;
}
if (resp.hasError()) {
@ -101,17 +112,14 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
backOffer.doBackOff(
BoTxnLockFast, new KeyException((resp.getError().getLocked().toString())));
}
continue;
return false;
} else {
// retry or abort
// this should trigger Spark to retry the txn
throw new KeyException(resp.getError());
}
}
return resp.getValue();
}
return true;
}
// TODO: batch get should consider key range split
@ -169,11 +177,10 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
Context context = region.getContext();
Supplier<ScanRequest> request =
() ->
ScanRequest.newBuilder()
.setContext(context)
.setContext(region.getContext())
.setStartKey(startKey)
.setVersion(version)
.setKeyOnly(keyOnly)
@ -187,10 +194,6 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
ScanResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_SCAN, request, handler);
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
throw new TiClientInternalException("ScanResponse failed without a cause");
}
return scanHelper(resp, backOffer);
}
@ -199,6 +202,11 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
// which we shoule retry not throw
// exception
private List<KvPair> scanHelper(ScanResponse resp, BackOffer bo) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("ScanResponse failed without a cause");
}
List<Lock> locks = new ArrayList<>();
for (KvPair pair : resp.getPairsList()) {
@ -212,7 +220,7 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
}
}
if (locks.size() > 0) {
if (!locks.isEmpty()) {
boolean ok = lockResolverClient.resolveLocks(bo, locks);
if (!ok) {
// if not resolve all locks, we wait and retry
@ -235,9 +243,8 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
// APIs for Raw Scan/Put/Get/Delete
public ByteString rawGet(BackOffer backOffer, ByteString key) {
Context context = region.getContext();
Supplier<RawGetRequest> factory =
() -> RawGetRequest.newBuilder().setContext(context).setKey(key).build();
() -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawGetResponse> handler =
new KVErrorHandler<>(
regionManager,
@ -245,12 +252,12 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_RAW_GET, factory, handler);
return rawGetHelper(resp, context);
return rawGetHelper(resp);
}
private ByteString rawGetHelper(RawGetResponse resp, Context context) {
private ByteString rawGetHelper(RawGetResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
}
String error = resp.getError();
@ -263,9 +270,9 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
return resp.getValue();
}
public void rawDelete(BackOffer backOffer, ByteString key, Context context) {
public void rawDelete(BackOffer backOffer, ByteString key) {
Supplier<RawDeleteRequest> factory =
() -> RawDeleteRequest.newBuilder().setContext(context).setKey(key).build();
() -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawDeleteResponse> handler =
new KVErrorHandler<>(
@ -274,12 +281,12 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_RAW_DELETE, factory, handler);
rawDeleteHelper(resp, context);
rawDeleteHelper(resp, region);
}
private void rawDeleteHelper(RawDeleteResponse resp, Context context) {
private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawDeleteResponse failed without a cause");
}
String error = resp.getError();
@ -292,9 +299,13 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
}
public void rawPut(BackOffer backOffer, ByteString key, ByteString value) {
Context context = region.getContext();
Supplier<RawPutRequest> factory =
() -> RawPutRequest.newBuilder().setContext(context).setKey(key).setValue(value).build();
() ->
RawPutRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setValue(value)
.build();
KVErrorHandler<RawPutResponse> handler =
new KVErrorHandler<>(
@ -303,12 +314,12 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_RAW_PUT, factory, handler);
rawPutHelper(resp, context);
rawPutHelper(resp);
}
private void rawPutHelper(RawPutResponse resp, Context context) {
private void rawPutHelper(RawPutResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawPutResponse failed without a cause");
}
String error = resp.getError();
@ -324,9 +335,12 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
if (kvPairs.isEmpty()) {
return;
}
Context context = region.getContext();
Supplier<RawBatchPutRequest> factory =
() -> RawBatchPutRequest.newBuilder().setContext(context).addAllPairs(kvPairs).build();
() ->
RawBatchPutRequest.newBuilder()
.setContext(region.getContext())
.addAllPairs(kvPairs)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
regionManager,
@ -335,12 +349,12 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchPutResponse resp =
callWithRetry(backOffer, TikvGrpc.METHOD_RAW_BATCH_PUT, factory, handler);
handleRawBatchPut(resp, context);
handleRawBatchPut(resp);
}
private void handleRawBatchPut(RawBatchPutResponse resp, Context context) {
private void handleRawBatchPut(RawBatchPutResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
}
if (resp.hasRegionError()) {
@ -358,11 +372,10 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
* @return KvPair list
*/
private List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
Context context = region.getContext();
Supplier<RawScanRequest> factory =
() ->
RawScanRequest.newBuilder()
.setContext(context)
.setContext(region.getContext())
.setStartKey(key)
.setKeyOnly(keyOnly)
.setLimit(limit)
@ -375,7 +388,7 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawScanResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_RAW_SCAN, factory, handler);
return rawScanHelper(resp, context);
return rawScanHelper(resp);
}
public List<KvPair> rawScan(BackOffer backOffer, ByteString key) {
@ -386,9 +399,9 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
return rawScan(backOffer, key, limit, false);
}
private List<KvPair> rawScanHelper(RawScanResponse resp, Context context) {
private List<KvPair> rawScanHelper(RawScanResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(context.getRegionId(), context.getPeer().getStoreId());
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawScanResponse failed without a cause");
}
if (resp.hasRegionError()) {
@ -449,15 +462,15 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newStore.getId());
}
TiRegion cachedRegion = getSession().getRegionManager().getRegionById(region.getId());
TiRegion cachedRegion = regionManager.getRegionById(region.getId());
// When switch leader fails or the region changed its key range,
// it would be necessary to re-split task's key range for new region.
if (!region.switchPeer(newStore.getId())
|| !region.getStartKey().equals(cachedRegion.getStartKey())
if (!region.getStartKey().equals(cachedRegion.getStartKey())
|| !region.getEndKey().equals(cachedRegion.getEndKey())) {
return false;
}
String addressStr = newStore.getAddress();
region = cachedRegion;
String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);

View File

@ -19,10 +19,8 @@ package org.tikv.common.region;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.KeyUtils;
@ -36,10 +34,10 @@ import org.tikv.kvproto.Metapb.Region;
public class TiRegion implements Serializable {
private final Region meta;
private final Set<Long> unreachableStores;
private Peer peer;
private final Peer peer;
private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri;
private Kvrpcpb.Context cachedContext;
public TiRegion(
Region meta,
@ -57,11 +55,22 @@ public class TiRegion implements Serializable {
} else {
this.peer = peer;
}
this.unreachableStores = new HashSet<>();
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
}
private TiRegion(
Region meta, Peer peer, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri) {
this.meta = meta;
this.peer = peer;
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
}
private TiRegion withNewLeader(Peer p) {
return new TiRegion(this.meta, p, this.isolationLevel, this.commandPri);
}
private Region decodeRegion(Region region, boolean isRawRegion) {
Region.Builder builder =
Region.newBuilder()
@ -103,11 +112,18 @@ public class TiRegion implements Serializable {
}
public Kvrpcpb.Context getContext() {
if (cachedContext == null) {
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
builder.setIsolationLevel(this.isolationLevel);
builder.setPriority(this.commandPri);
builder.setRegionId(meta.getId()).setPeer(this.peer).setRegionEpoch(this.meta.getRegionEpoch());
return builder.build();
builder
.setRegionId(this.meta.getId())
.setPeer(this.peer)
.setRegionEpoch(this.meta.getRegionEpoch());
cachedContext = builder.build();
return cachedContext;
}
return cachedContext;
}
public class RegionVerID {
@ -133,17 +149,16 @@ public class TiRegion implements Serializable {
* storeID.
*
* @param leaderStoreID is leader peer id.
* @return false if no peers matches the store id.
* @return a copy of current region with new leader store id
*/
public boolean switchPeer(long leaderStoreID) {
public TiRegion withNewLeader(long leaderStoreID) {
List<Peer> peers = meta.getPeersList();
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
this.peer = p;
return true;
return withNewLeader(p);
}
}
return false;
return this;
}
public boolean contains(ByteString key) {

View File

@ -103,10 +103,10 @@ public class RawKVClient implements AutoCloseable {
}
private void batchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs) {
Map<Long, List<ByteString>> groupKeys = groupKeysByRegion(kvPairs.keySet());
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(kvPairs.keySet());
List<Batch> batches = new ArrayList<>();
for (Map.Entry<Long, List<ByteString>> entry : groupKeys.entrySet()) {
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
batches,
entry.getKey(),
@ -172,17 +172,10 @@ public class RawKVClient implements AutoCloseable {
public void delete(ByteString key) {
BackOffer backOffer = defaultBackOff();
while (true) {
TiRegion region = regionManager.getRegionByKey(key);
Kvrpcpb.Context context =
Kvrpcpb.Context.newBuilder()
.setRegionId(region.getId())
.setRegionEpoch(region.getRegionEpoch())
.setPeer(region.getLeader())
.build();
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
try {
client.rawDelete(defaultBackOff(), key, context);
client.rawDelete(defaultBackOff(), key);
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -190,13 +183,14 @@ public class RawKVClient implements AutoCloseable {
}
}
private class Batch {
private long regionId;
private List<ByteString> keys;
private List<ByteString> values;
/** A Batch containing the region, a list of keys and/or values to send */
private final class Batch {
private final TiRegion region;
private final List<ByteString> keys;
private final List<ByteString> values;
public Batch(long regionId, List<ByteString> keys, List<ByteString> values) {
this.regionId = regionId;
public Batch(TiRegion region, List<ByteString> keys, List<ByteString> values) {
this.region = region;
this.keys = keys;
this.values = values;
}
@ -206,14 +200,14 @@ public class RawKVClient implements AutoCloseable {
* Append batch to list and split them according to batch limit
*
* @param batches a grouped batch
* @param regionId region ID
* @param region region
* @param keys keys
* @param values values
* @param limit batch max limit
*/
private void appendBatches(
List<Batch> batches,
long regionId,
TiRegion region,
List<ByteString> keys,
List<ByteString> values,
int limit) {
@ -221,7 +215,7 @@ public class RawKVClient implements AutoCloseable {
List<ByteString> tmpValues = new ArrayList<>();
for (int i = 0; i < keys.size(); i++) {
if (i >= limit) {
batches.add(new Batch(regionId, tmpKeys, tmpValues));
batches.add(new Batch(region, tmpKeys, tmpValues));
tmpKeys.clear();
tmpValues.clear();
}
@ -229,7 +223,7 @@ public class RawKVClient implements AutoCloseable {
tmpValues.add(values.get(i));
}
if (!tmpKeys.isEmpty()) {
batches.add(new Batch(regionId, tmpKeys, tmpValues));
batches.add(new Batch(region, tmpKeys, tmpValues));
}
}
@ -237,16 +231,16 @@ public class RawKVClient implements AutoCloseable {
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their regionId
* @return a mapping of keys and their region
*/
private Map<Long, List<ByteString>> groupKeysByRegion(Set<ByteString> keys) {
Map<Long, List<ByteString>> groups = new HashMap<>();
private Map<TiRegion, List<ByteString>> groupKeysByRegion(Set<ByteString> keys) {
Map<TiRegion, List<ByteString>> groups = new HashMap<>();
TiRegion lastRegion = null;
for (ByteString key : keys) {
if (lastRegion == null || !lastRegion.contains(key)) {
lastRegion = regionManager.getRegionByKey(key);
}
groups.computeIfAbsent(lastRegion.getId(), k -> new ArrayList<>()).add(key);
groups.computeIfAbsent(lastRegion, k -> new ArrayList<>()).add(key);
}
return groups;
}
@ -271,9 +265,11 @@ public class RawKVClient implements AutoCloseable {
completionService.submit(
() -> {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
Pair<TiRegion, Metapb.Store> pair =
regionManager.getRegionStorePairByRegionId(batch.regionId);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
RegionStoreClient client =
RegionStoreClient.create(
batch.region,
regionManager.getStoreById(batch.region.getLeader().getStoreId()),
session);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
kvPairs.add(

View File

@ -276,12 +276,13 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
TiRegion cachedRegion = getSession().getRegionManager().getRegionById(region.getId());
// When switch leader fails or the region changed its key range,
// it would be necessary to re-split task's key range for new region.
if (!region.switchPeer(newStore.getId())
|| !region.getStartKey().equals(cachedRegion.getStartKey())
if (!region.getStartKey().equals(cachedRegion.getStartKey())
|| !region.getEndKey().equals(cachedRegion.getEndKey())) {
return false;
}
String addressStr = newStore.getAddress();
region = cachedRegion;
String addressStr =
getSession().getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
@ -294,14 +295,5 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
ManagedChannel channel = getSession().getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) {
logger.debug(
"store_not_match may occur? "
+ region
+ ", original store = "
+ store.getId()
+ " address = "
+ addressStr);
}
}
}