mirror of https://github.com/tikv/client-java.git
add kv mode KV/RAW (#9)
This commit is contained in:
parent
b11d513192
commit
eeced35f2c
|
|
@ -77,34 +77,27 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
|
||||
@Override
|
||||
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
|
||||
CodecDataOutput cdo = new CodecDataOutput();
|
||||
BytesCodec.writeBytes(cdo, key.toByteArray());
|
||||
ByteString encodedKey = cdo.toByteString();
|
||||
|
||||
Supplier<GetRegionRequest> request =
|
||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(encodedKey).build();
|
||||
Supplier<GetRegionRequest> request;
|
||||
if (conf.getKvMode().equalsIgnoreCase("RAW")) {
|
||||
request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
||||
} else {
|
||||
CodecDataOutput cdo = new CodecDataOutput();
|
||||
BytesCodec.writeBytes(cdo, key.toByteArray());
|
||||
ByteString encodedKey = cdo.toByteString();
|
||||
request =
|
||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(encodedKey).build();
|
||||
}
|
||||
|
||||
PDErrorHandler<GetRegionResponse> handler =
|
||||
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
|
||||
|
||||
GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, handler);
|
||||
return new TiRegion(
|
||||
resp.getRegion(), resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TiRegion getRegionByRawKey(BackOffer backOffer, ByteString key) {
|
||||
Supplier<GetRegionRequest> request =
|
||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
||||
PDErrorHandler<GetRegionResponse> handler =
|
||||
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
|
||||
GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, handler);
|
||||
return new TiRegion(
|
||||
resp.getRegion(),
|
||||
resp.getLeader(),
|
||||
conf.getIsolationLevel(),
|
||||
conf.getCommandPriority(),
|
||||
true);
|
||||
conf.getKvMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -116,7 +109,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
resp.getRegion(),
|
||||
resp.getLeader(),
|
||||
conf.getIsolationLevel(),
|
||||
conf.getCommandPriority()));
|
||||
conf.getCommandPriority(),
|
||||
conf.getKvMode()));
|
||||
Supplier<GetRegionRequest> request =
|
||||
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
|
||||
|
||||
|
|
@ -138,7 +132,11 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION_BY_ID, request, handler);
|
||||
// Instead of using default leader instance, explicitly set no leader to null
|
||||
return new TiRegion(
|
||||
resp.getRegion(), resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority());
|
||||
resp.getRegion(),
|
||||
resp.getLeader(),
|
||||
conf.getIsolationLevel(),
|
||||
conf.getCommandPriority(),
|
||||
conf.getKvMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -150,7 +148,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
resp.getRegion(),
|
||||
resp.getLeader(),
|
||||
conf.getIsolationLevel(),
|
||||
conf.getCommandPriority()));
|
||||
conf.getCommandPriority(),
|
||||
conf.getKvMode()));
|
||||
|
||||
Supplier<GetRegionByIDRequest> request =
|
||||
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ public class RawKVClient {
|
|||
private final RegionManager regionManager;
|
||||
|
||||
private RawKVClient(String addresses) {
|
||||
session = TiSession.create(TiConfiguration.createDefault(addresses));
|
||||
session = TiSession.create(TiConfiguration.createRawDefault(addresses));
|
||||
regionManager = session.getRegionManager();
|
||||
}
|
||||
|
||||
|
|
@ -41,7 +41,7 @@ public class RawKVClient {
|
|||
* @param value raw value
|
||||
*/
|
||||
public void put(ByteString key, ByteString value) {
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByRawKey(key);
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
|
||||
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
|
||||
client.rawPut(defaultBackOff(), key, value);
|
||||
}
|
||||
|
|
@ -54,7 +54,7 @@ public class RawKVClient {
|
|||
public void batchPut(List<Kvrpcpb.KvPair> kvPairs) {
|
||||
Map<Pair<TiRegion, Metapb.Store>, List<Kvrpcpb.KvPair>> regionMap = new HashMap<>();
|
||||
for (Kvrpcpb.KvPair kvPair : kvPairs) {
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByRawKey(kvPair.getKey());
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(kvPair.getKey());
|
||||
regionMap.computeIfAbsent(pair, t -> new ArrayList<>()).add(kvPair);
|
||||
}
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ public class RawKVClient {
|
|||
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
|
||||
*/
|
||||
public ByteString get(ByteString key) {
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByRawKey(key);
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
|
||||
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
|
||||
return client.rawGet(defaultBackOff(), key);
|
||||
}
|
||||
|
|
@ -120,14 +120,14 @@ public class RawKVClient {
|
|||
* @param key raw key to be deleted
|
||||
*/
|
||||
public void delete(ByteString key) {
|
||||
TiRegion region = regionManager.getRegionByRawKey(key);
|
||||
TiRegion region = regionManager.getRegionByKey(key);
|
||||
Kvrpcpb.Context context =
|
||||
Kvrpcpb.Context.newBuilder()
|
||||
.setRegionId(region.getId())
|
||||
.setRegionEpoch(region.getRegionEpoch())
|
||||
.setPeer(region.getLeader())
|
||||
.build();
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByRawKey(key);
|
||||
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
|
||||
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
|
||||
client.rawDelete(defaultBackOff(), key, context);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,6 @@ public interface ReadOnlyPDClient {
|
|||
*/
|
||||
TiRegion getRegionByKey(BackOffer backOffer, ByteString key);
|
||||
|
||||
TiRegion getRegionByRawKey(BackOffer backOffer, ByteString key);
|
||||
|
||||
Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key);
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ public class TiConfiguration implements Serializable {
|
|||
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC;
|
||||
private static final boolean DEF_SHOW_ROWID = false;
|
||||
private static final String DEF_DB_PREFIX = "";
|
||||
private static final String DEF_KV_MODE = "KV";
|
||||
|
||||
private int timeout = DEF_TIMEOUT;
|
||||
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
|
||||
|
|
@ -61,6 +62,7 @@ public class TiConfiguration implements Serializable {
|
|||
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
|
||||
private boolean showRowId = DEF_SHOW_ROWID;
|
||||
private String dbPrefix = DEF_DB_PREFIX;
|
||||
private String kvMode = DEF_KV_MODE;
|
||||
|
||||
public static TiConfiguration createDefault(String pdAddrsStr) {
|
||||
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
|
||||
|
|
@ -69,6 +71,14 @@ public class TiConfiguration implements Serializable {
|
|||
return conf;
|
||||
}
|
||||
|
||||
public static TiConfiguration createRawDefault(String pdAddrsStr) {
|
||||
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
|
||||
TiConfiguration conf = new TiConfiguration();
|
||||
conf.pdAddrs = strToHostAndPort(pdAddrsStr);
|
||||
conf.kvMode = "RAW";
|
||||
return conf;
|
||||
}
|
||||
|
||||
private static List<HostAndPort> strToHostAndPort(String addressStr) {
|
||||
Objects.requireNonNull(addressStr);
|
||||
String[] addrs = addressStr.split(",");
|
||||
|
|
@ -216,4 +226,12 @@ public class TiConfiguration implements Serializable {
|
|||
public void setDBPrefix(String dbPrefix) {
|
||||
this.dbPrefix = dbPrefix;
|
||||
}
|
||||
|
||||
public String getKvMode() {
|
||||
return kvMode;
|
||||
}
|
||||
|
||||
public void setKvMode(String kvMode) {
|
||||
this.kvMode = kvMode;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public class RawScanIterator extends ScanIterator {
|
|||
}
|
||||
|
||||
TiRegion loadCurrentRegionToCache() throws Exception {
|
||||
Pair<TiRegion, Metapb.Store> pair = regionCache.getRegionStorePairByRawKey(startKey);
|
||||
Pair<TiRegion, Metapb.Store> pair = regionCache.getRegionStorePairByKey(startKey);
|
||||
TiRegion region = pair.first;
|
||||
Metapb.Store store = pair.second;
|
||||
try (RegionStoreClient client = RegionStoreClient.create(region, store, session)) {
|
||||
|
|
|
|||
|
|
@ -88,27 +88,6 @@ public class RegionManager {
|
|||
return region;
|
||||
}
|
||||
|
||||
synchronized TiRegion getRegionByRawKey(ByteString key) {
|
||||
Long regionId;
|
||||
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey key[%s] -> ID[%s]", formatBytes(key), regionId));
|
||||
}
|
||||
if (regionId == null) {
|
||||
logger.debug("Key not find in keyToRegionIdCache:" + formatBytes(key));
|
||||
TiRegion region = pdClient.getRegionByRawKey(ConcreteBackOffer.newGetBackOff(), key);
|
||||
if (!putRegion(region)) {
|
||||
throw new TiClientInternalException("Invalid Region: " + region.toString());
|
||||
}
|
||||
return region;
|
||||
}
|
||||
TiRegion region = regionCache.get(regionId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
private synchronized boolean putRegion(TiRegion region) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("putRegion: " + region);
|
||||
|
|
@ -196,10 +175,6 @@ public class RegionManager {
|
|||
return cache.getRegionByKey(key);
|
||||
}
|
||||
|
||||
public TiRegion getRegionByRawKey(ByteString key) {
|
||||
return cache.getRegionByRawKey(key);
|
||||
}
|
||||
|
||||
public TiRegion getRegionById(long regionId) {
|
||||
return cache.getRegionById(regionId);
|
||||
}
|
||||
|
|
@ -217,19 +192,6 @@ public class RegionManager {
|
|||
return Pair.create(region, cache.getStoreById(storeId));
|
||||
}
|
||||
|
||||
public Pair<TiRegion, Store> getRegionStorePairByRawKey(ByteString key) {
|
||||
TiRegion region = cache.getRegionByRawKey(key);
|
||||
if (region == null) {
|
||||
throw new TiClientInternalException("Region not exist for key:" + formatBytes(key));
|
||||
}
|
||||
if (!region.isValid()) {
|
||||
throw new TiClientInternalException("Region invalid: " + region.toString());
|
||||
}
|
||||
Peer leader = region.getLeader();
|
||||
long storeId = leader.getStoreId();
|
||||
return Pair.create(region, cache.getStoreById(storeId));
|
||||
}
|
||||
|
||||
public Pair<TiRegion, Store> getRegionStorePairByRegionId(long id) {
|
||||
TiRegion region = cache.getRegionById(id);
|
||||
if (!region.isValid()) {
|
||||
|
|
|
|||
|
|
@ -41,19 +41,14 @@ public class TiRegion implements Serializable {
|
|||
private final IsolationLevel isolationLevel;
|
||||
private final Kvrpcpb.CommandPri commandPri;
|
||||
|
||||
public TiRegion(
|
||||
Region meta, Peer peer, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri) {
|
||||
this(meta, peer, isolationLevel, commandPri, false);
|
||||
}
|
||||
|
||||
public TiRegion(
|
||||
Region meta,
|
||||
Peer peer,
|
||||
IsolationLevel isolationLevel,
|
||||
Kvrpcpb.CommandPri commandPri,
|
||||
boolean isRawRegion) {
|
||||
String kvMode) {
|
||||
Objects.requireNonNull(meta, "meta is null");
|
||||
this.meta = decodeRegion(meta, isRawRegion);
|
||||
this.meta = decodeRegion(meta, kvMode.equalsIgnoreCase("RAW"));
|
||||
if (peer == null || peer.getId() == 0) {
|
||||
if (meta.getPeersCount() == 0) {
|
||||
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
|
||||
|
|
|
|||
|
|
@ -39,7 +39,8 @@ public class MockServerTest {
|
|||
.addPeers(Metapb.Peer.newBuilder().setId(11).setStoreId(13))
|
||||
.build();
|
||||
|
||||
region = new TiRegion(r, r.getPeers(0), Kvrpcpb.IsolationLevel.RC, Kvrpcpb.CommandPri.Low);
|
||||
region =
|
||||
new TiRegion(r, r.getPeers(0), Kvrpcpb.IsolationLevel.RC, Kvrpcpb.CommandPri.Low, "KV");
|
||||
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
|
||||
server = new KVMockServer();
|
||||
port = server.start(region);
|
||||
|
|
|
|||
Loading…
Reference in New Issue