mirror of https://github.com/tikv/client-java.git
fix error in region decode when using rawKVClient API
Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
parent
e9641249d9
commit
6306440218
|
@ -94,6 +94,21 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
resp.getRegion(), resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TiRegion getRegionByRawKey(BackOffer backOffer, ByteString key) {
|
||||
Supplier<GetRegionRequest> request =
|
||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
||||
PDErrorHandler<GetRegionResponse> handler =
|
||||
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
|
||||
GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, handler);
|
||||
return new TiRegion(
|
||||
resp.getRegion(),
|
||||
resp.getLeader(),
|
||||
conf.getIsolationLevel(),
|
||||
conf.getCommandPriority(),
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key) {
|
||||
FutureObserver<TiRegion, GetRegionResponse> responseObserver =
|
||||
|
|
|
@ -39,6 +39,8 @@ public interface ReadOnlyPDClient {
|
|||
*/
|
||||
TiRegion getRegionByKey(BackOffer backOffer, ByteString key);
|
||||
|
||||
TiRegion getRegionByRawKey(BackOffer backOffer, ByteString key);
|
||||
|
||||
Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key);
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,8 +30,6 @@ import java.util.Map;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.tikv.ReadOnlyPDClient;
|
||||
import org.tikv.TiSession;
|
||||
import org.tikv.codec.Codec;
|
||||
import org.tikv.codec.CodecDataOutput;
|
||||
import org.tikv.exception.GrpcException;
|
||||
import org.tikv.exception.TiClientInternalException;
|
||||
import org.tikv.key.Key;
|
||||
|
@ -91,9 +89,24 @@ public class RegionManager {
|
|||
}
|
||||
|
||||
synchronized TiRegion getRegionByRawKey(ByteString key) {
|
||||
CodecDataOutput cdo = new CodecDataOutput();
|
||||
Codec.BytesCodec.writeBytes(cdo, key.toByteArray());
|
||||
return getRegionByKey(cdo.toByteString());
|
||||
Long regionId;
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey key[%s] -> ID[%s]", formatBytes(key), regionId));
|
||||
}
|
||||
if (regionId == null) {
|
||||
logger.debug("Key not find in keyToRegionIdCache:" + formatBytes(key));
|
||||
TiRegion region = pdClient.getRegionByRawKey(ConcreteBackOffer.newGetBackOff(), key);
|
||||
if (!putRegion(region)) {
|
||||
throw new TiClientInternalException("Invalid Region: " + region.toString());
|
||||
}
|
||||
return region;
|
||||
}
|
||||
TiRegion region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
private synchronized boolean putRegion(TiRegion region) {
|
||||
|
|
|
@ -43,8 +43,17 @@ public class TiRegion implements Serializable {
|
|||
|
||||
public TiRegion(
|
||||
Region meta, Peer peer, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri) {
|
||||
this(meta, peer, isolationLevel, commandPri, false);
|
||||
}
|
||||
|
||||
public TiRegion(
|
||||
Region meta,
|
||||
Peer peer,
|
||||
IsolationLevel isolationLevel,
|
||||
Kvrpcpb.CommandPri commandPri,
|
||||
boolean isRawRegion) {
|
||||
Objects.requireNonNull(meta, "meta is null");
|
||||
this.meta = decodeRegion(meta);
|
||||
this.meta = decodeRegion(meta, isRawRegion);
|
||||
if (peer == null || peer.getId() == 0) {
|
||||
if (meta.getPeersCount() == 0) {
|
||||
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
|
||||
|
@ -58,21 +67,21 @@ public class TiRegion implements Serializable {
|
|||
this.commandPri = commandPri;
|
||||
}
|
||||
|
||||
private Region decodeRegion(Region region) {
|
||||
private Region decodeRegion(Region region, boolean isRawRegion) {
|
||||
Region.Builder builder =
|
||||
Region.newBuilder()
|
||||
.setId(region.getId())
|
||||
.setRegionEpoch(region.getRegionEpoch())
|
||||
.addAllPeers(region.getPeersList());
|
||||
|
||||
if (region.getStartKey().isEmpty()) {
|
||||
if (region.getStartKey().isEmpty() || isRawRegion) {
|
||||
builder.setStartKey(region.getStartKey());
|
||||
} else {
|
||||
byte[] decodecStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
|
||||
builder.setStartKey(ByteString.copyFrom(decodecStartKey));
|
||||
}
|
||||
|
||||
if (region.getEndKey().isEmpty()) {
|
||||
if (region.getEndKey().isEmpty() || isRawRegion) {
|
||||
builder.setEndKey(region.getEndKey());
|
||||
} else {
|
||||
byte[] decodecEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.tikv.kvproto.Kvrpcpb;
|
|||
import org.tikv.util.FastByteComparisons;
|
||||
|
||||
public class RawKVClientTest {
|
||||
private static final String RAW_PREFIX = "raw_";
|
||||
private static final String RAW_PREFIX = "raw_\\u0001_";
|
||||
private static final int KEY_POOL_SIZE = 1000000;
|
||||
private static final int TEST_CASES = 10000;
|
||||
private static final int WORKER_CNT = 100;
|
||||
|
|
Loading…
Reference in New Issue