Optimize and Fix minor bugs on master (#139)

This commit is contained in:
birdstorm 2021-03-11 21:25:40 +08:00 committed by GitHub
parent af7326c96c
commit 56331e2f98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1436 additions and 601 deletions

27
pom.xml
View File

@ -189,6 +189,29 @@
<version>3.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Hotspot JVM metrics-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Exposition HTTPServer-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Pushgateway exposition-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.10.0</version>
</dependency>
</dependencies>
<build>
<resources>
@ -411,6 +434,10 @@
<pattern>io.opencensus</pattern>
<shadedPattern>org.tikv.shade.io.opencensus</shadedPattern>
</relocation>
<relocation>
<pattern>io.prometheus</pattern>
<shadedPattern>org.tikv.shade.io.prometheus</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>

View File

@ -38,11 +38,13 @@ public abstract class AbstractGRPCClient<
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ChannelFactory channelFactory;
protected TiConfiguration conf;
protected long timeout;
protected BlockingStubT blockingStub;
protected StubT asyncStub;
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf;
this.timeout = conf.getTimeout();
this.channelFactory = channelFactory;
}
@ -52,6 +54,7 @@ public abstract class AbstractGRPCClient<
BlockingStubT blockingStub,
StubT asyncStub) {
this.conf = conf;
this.timeout = conf.getTimeout();
this.channelFactory = channelFactory;
this.blockingStub = blockingStub;
this.asyncStub = asyncStub;
@ -157,6 +160,14 @@ public abstract class AbstractGRPCClient<
return response;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public long getTimeout() {
return this.timeout;
}
protected abstract BlockingStubT getBlockingStub();
protected abstract StubT getAsyncStub();

View File

@ -0,0 +1,83 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common;
import org.tikv.kvproto.Kvrpcpb;
public class ConfigUtils {
public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses";
public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms";
public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size";
public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size";
public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size";
public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency";
public static final String TIKV_TABLE_SCAN_CONCURRENCY = "tikv.table.scan_concurrency";
public static final String TIKV_BATCH_GET_CONCURRENCY = "tikv.batch_get_concurrency";
public static final String TIKV_BATCH_PUT_CONCURRENCY = "tikv.batch_put_concurrency";
public static final String TIKV_BATCH_SCAN_CONCURRENCY = "tikv.batch_scan_concurrency";
public static final String TIKV_DELETE_RANGE_CONCURRENCY = "tikv.delete_range_concurrency";
public static final String TIKV_REQUEST_COMMAND_PRIORITY = "tikv.request.command.priority";
public static final String TIKV_REQUEST_ISOLATION_LEVEL = "tikv.request.isolation.level";
public static final String TIKV_SHOW_ROWID = "tikv.show_rowid";
public static final String TIKV_DB_PREFIX = "tikv.db_prefix";
public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency";
public static final String TIKV_KV_MODE = "tikv.kv_mode";
public static final String TIKV_IS_REPLICA_READ = "tikv.is_replica_read";
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
public static final int DEF_REGION_SCAN_DOWNGRADE_THRESHOLD = 10000000;
// if keyRange size per request exceeds this limit, the request might be too large to be accepted
// by TiKV(maximum request size accepted by TiKV is around 1MB)
public static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000;
public static final int DEF_INDEX_SCAN_CONCURRENCY = 5;
public static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
public static final int DEF_BATCH_GET_CONCURRENCY = 20;
public static final int DEF_BATCH_PUT_CONCURRENCY = 20;
public static final int DEF_BATCH_SCAN_CONCURRENCY = 5;
public static final int DEF_DELETE_RANGE_CONCURRENCY = 20;
public static final Kvrpcpb.CommandPri DEF_COMMAND_PRIORITY = Kvrpcpb.CommandPri.Low;
public static final Kvrpcpb.IsolationLevel DEF_ISOLATION_LEVEL = Kvrpcpb.IsolationLevel.SI;
public static final boolean DEF_SHOW_ROWID = false;
public static final String DEF_DB_PREFIX = "";
public static final int DEF_KV_CLIENT_CONCURRENCY = 10;
public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN;
public static final boolean DEF_IS_REPLICA_READ = false;
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
public static final String HIGH_COMMAND_PRIORITY = "HIGH";
public static final String SNAPSHOT_ISOLATION_LEVEL = "SI";
public static final String READ_COMMITTED_ISOLATION_LEVEL = "RC";
public static final String RAW_KV_MODE = "RAW";
public static final String TXN_KV_MODE = "TXN";
}

View File

@ -17,8 +17,7 @@
package org.tikv.common;
import static org.tikv.common.util.ClientUtils.appendBatches;
import static org.tikv.common.util.ClientUtils.getKvPairs;
import static org.tikv.common.util.ClientUtils.*;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
@ -28,7 +27,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.exception.GrpcException;
@ -45,6 +43,7 @@ import org.tikv.kvproto.Kvrpcpb.KvPair;
public class KVClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
private static final int MAX_BATCH_LIMIT = 1024;
private static final int BATCH_GET_SIZE = 16 * 1024;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
@ -129,11 +128,12 @@ public class KVClient implements AutoCloseable {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
for (Batch batch : batches) {
@ -170,11 +170,13 @@ public class KVClient implements AutoCloseable {
private List<KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
ArrayList<KvPair> results = new ArrayList<>();
@ -186,17 +188,6 @@ public class KVClient implements AutoCloseable {
return results;
}
/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their region
*/
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}
private Iterator<KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,

View File

@ -28,6 +28,7 @@ import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.grpc.ManagedChannel;
import io.prometheus.client.Histogram;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
@ -96,6 +97,12 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private Client etcdClient;
private ConcurrentMap<Long, Double> tiflashReplicaMap;
public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
Histogram.build()
.name("client_java_pd_get_region_by_requests_latency")
.help("pd getRegionByKey request latency.")
.register();
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
super(conf, channelFactory);
initCluster();
@ -208,28 +215,33 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
@Override
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
if (conf.getKvMode() == KVMode.TXN) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
}
ByteString queryKey = key;
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(queryKey).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
} finally {
requestTimer.observeDuration();
}
ByteString queryKey = key;
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(queryKey).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
}
@Override
@ -498,9 +510,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
if (leaderWrapper == null) {
throw new GrpcException("PDClient may not be initialized");
}
return leaderWrapper
.getBlockingStub()
.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
return leaderWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -508,9 +518,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
if (leaderWrapper == null) {
throw new GrpcException("PDClient may not be initialized");
}
return leaderWrapper
.getAsyncStub()
.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
return leaderWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
private void initCluster() {

View File

@ -15,79 +15,226 @@
package org.tikv.common;
import static org.tikv.common.ConfigUtils.*;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.pd.PDUtils;
import org.tikv.kvproto.Kvrpcpb.CommandPri;
import org.tikv.kvproto.Kvrpcpb.IsolationLevel;
public class TiConfiguration implements Serializable {
private static final int DEF_TIMEOUT = 10;
private static final TimeUnit DEF_TIMEOUT_UNIT = TimeUnit.MINUTES;
private static final int DEF_SCAN_BATCH_SIZE = 100;
private static final boolean DEF_IGNORE_TRUNCATE = true;
private static final boolean DEF_TRUNCATE_AS_WARNING = false;
private static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
private static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
private static final int DEF_REGION_SCAN_DOWNGRADE_THRESHOLD = 10000000;
// if keyRange size per request exceeds this limit, the request might be too large to be accepted
// by TiKV(maximum request size accepted by TiKV is around 1MB)
private static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000;
private static final int DEF_INDEX_SCAN_CONCURRENCY = 5;
private static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
private static final int DEF_BATCH_GET_CONCURRENCY = 20;
private static final int DEF_BATCH_PUT_CONCURRENCY = 20;
private static final int DEF_BATCH_SCAN_CONCURRENCY = 5;
private static final int DEF_DELETE_RANGE_CONCURRENCY = 20;
private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low;
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.SI;
private static final boolean DEF_SHOW_ROWID = false;
private static final String DEF_DB_PREFIX = "";
private static final boolean DEF_WRITE_ENABLE = true;
private static final boolean DEF_WRITE_ALLOW_SPARK_SQL = false;
private static final boolean DEF_WRITE_WITHOUT_LOCK_TABLE = false;
private static final int DEF_TIKV_REGION_SPLIT_SIZE_IN_MB = 96;
private static final int DEF_PARTITION_PER_SPLIT = 1;
private static final int DEF_KV_CLIENT_CONCURRENCY = 10;
private static final KVMode DEF_KV_MODE = KVMode.TXN;
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
private static final boolean DEF_IS_REPLICA_READ = false;
private int timeout = DEF_TIMEOUT;
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
private boolean ignoreTruncate = DEF_IGNORE_TRUNCATE;
private boolean truncateAsWarning = DEF_TRUNCATE_AS_WARNING;
private int maxFrameSize = DEF_MAX_FRAME_SIZE;
private List<URI> pdAddrs = new ArrayList<>();
private int indexScanBatchSize = DEF_INDEX_SCAN_BATCH_SIZE;
private int downgradeThreshold = DEF_REGION_SCAN_DOWNGRADE_THRESHOLD;
private int indexScanConcurrency = DEF_INDEX_SCAN_CONCURRENCY;
private int tableScanConcurrency = DEF_TABLE_SCAN_CONCURRENCY;
private int batchGetConcurrency = DEF_BATCH_GET_CONCURRENCY;
private int batchPutConcurrency = DEF_BATCH_PUT_CONCURRENCY;
private int batchScanConcurrency = DEF_BATCH_SCAN_CONCURRENCY;
private int deleteRangeConcurrency = DEF_DELETE_RANGE_CONCURRENCY;
private CommandPri commandPriority = DEF_COMMAND_PRIORITY;
private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL;
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
private boolean showRowId = DEF_SHOW_ROWID;
private String dbPrefix = DEF_DB_PREFIX;
private KVMode kvMode = DEF_KV_MODE;
private int rawClientConcurrency = DEF_RAW_CLIENT_CONCURRENCY;
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
private boolean writeAllowSparkSQL = DEF_WRITE_ALLOW_SPARK_SQL;
private boolean writeEnable = DEF_WRITE_ENABLE;
private boolean writeWithoutLockTable = DEF_WRITE_WITHOUT_LOCK_TABLE;
private int tikvRegionSplitSizeInMB = DEF_TIKV_REGION_SPLIT_SIZE_IN_MB;
private int partitionPerSplit = DEF_PARTITION_PER_SPLIT;
static {
loadFromSystemProperties();
loadFromDefaultProperties();
}
private int kvClientConcurrency = DEF_KV_CLIENT_CONCURRENCY;
private boolean isReplicaRead = DEF_IS_REPLICA_READ;
private static void loadFromSystemProperties() {
for (Map.Entry<String, String> prop : Utils.getSystemProperties().entrySet()) {
if (prop.getKey().startsWith("tikv.")) {
set(prop.getKey(), prop.getValue());
}
}
}
private static void loadFromDefaultProperties() {
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE);
setIfMissing(TIKV_INDEX_SCAN_BATCH_SIZE, DEF_INDEX_SCAN_BATCH_SIZE);
setIfMissing(TIKV_INDEX_SCAN_CONCURRENCY, DEF_INDEX_SCAN_CONCURRENCY);
setIfMissing(TIKV_TABLE_SCAN_CONCURRENCY, DEF_TABLE_SCAN_CONCURRENCY);
setIfMissing(TIKV_BATCH_GET_CONCURRENCY, DEF_BATCH_GET_CONCURRENCY);
setIfMissing(TIKV_BATCH_PUT_CONCURRENCY, DEF_BATCH_PUT_CONCURRENCY);
setIfMissing(TIKV_BATCH_SCAN_CONCURRENCY, DEF_BATCH_SCAN_CONCURRENCY);
setIfMissing(TIKV_DELETE_RANGE_CONCURRENCY, DEF_DELETE_RANGE_CONCURRENCY);
setIfMissing(TIKV_REQUEST_COMMAND_PRIORITY, LOW_COMMAND_PRIORITY);
setIfMissing(TIKV_REQUEST_ISOLATION_LEVEL, SNAPSHOT_ISOLATION_LEVEL);
setIfMissing(TIKV_REQUEST_ISOLATION_LEVEL, SNAPSHOT_ISOLATION_LEVEL);
setIfMissing(TIKV_SHOW_ROWID, DEF_SHOW_ROWID);
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY);
setIfMissing(TIKV_KV_MODE, TXN_KV_MODE);
setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ);
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
}
public static void listAll() {
logger.info(new ArrayList<>(settings.entrySet()).toString());
}
private static void set(String key, String value) {
if (key == null) {
throw new NullPointerException("null key");
}
if (value == null) {
throw new NullPointerException("null value for " + key);
}
settings.put(key, value);
}
private static void setIfMissing(String key, int value) {
setIfMissing(key, String.valueOf(value));
}
private static void setIfMissing(String key, boolean value) {
setIfMissing(key, String.valueOf(value));
}
private static void setIfMissing(String key, String value) {
if (key == null) {
throw new NullPointerException("null key");
}
if (value == null) {
throw new NullPointerException("null value for " + key);
}
settings.putIfAbsent(key, value);
}
private static Optional<String> getOption(String key) {
return Optional.ofNullable(settings.get(key));
}
private static String get(String key) {
Optional<String> option = getOption(key);
if (!option.isPresent()) {
throw new NoSuchElementException(key);
}
return option.get();
}
private static int getInt(String key) {
return Integer.parseInt(get(key));
}
private static int getInt(String key, int defaultValue) {
try {
return getOption(key).map(Integer::parseInt).orElse(defaultValue);
} catch (NumberFormatException e) {
return defaultValue;
}
}
private static long getLong(String key) {
return Long.parseLong(get(key));
}
private static long getLong(String key, long defaultValue) {
try {
return getOption(key).map(Long::parseLong).orElse(defaultValue);
} catch (NumberFormatException e) {
return defaultValue;
}
}
private static double getDouble(String key) {
return Double.parseDouble(get(key));
}
private static double getDouble(String key, double defaultValue) {
try {
return getOption(key).map(Double::parseDouble).orElse(defaultValue);
} catch (NumberFormatException e) {
return defaultValue;
}
}
private static boolean getBoolean(String key) {
return Boolean.parseBoolean(get(key));
}
private static boolean getBoolean(String key, boolean defaultValue) {
try {
return getOption(key).map(Boolean::parseBoolean).orElse(defaultValue);
} catch (NumberFormatException e) {
return defaultValue;
}
}
private static Long getTimeAsMs(String key) {
return Utils.timeStringAsMs(get(key));
}
private static Long getTimeAsSeconds(String key) {
return Utils.timeStringAsSec(get(key));
}
private static List<URI> getPdAddrs(String key) {
Optional<String> pdAddrs = getOption(key);
if (pdAddrs.isPresent()) {
return strToURI(pdAddrs.get());
} else {
return new ArrayList<>();
}
}
private static CommandPri getCommandPri(String key) {
String priority = get(key).toUpperCase(Locale.ROOT);
switch (priority) {
case NORMAL_COMMAND_PRIORITY:
return CommandPri.Normal;
case LOW_COMMAND_PRIORITY:
return CommandPri.Low;
case HIGH_COMMAND_PRIORITY:
return CommandPri.High;
default:
return CommandPri.UNRECOGNIZED;
}
}
private static IsolationLevel getIsolationLevel(String key) {
String isolationLevel = get(key).toUpperCase(Locale.ROOT);
switch (isolationLevel) {
case READ_COMMITTED_ISOLATION_LEVEL:
return IsolationLevel.RC;
case SNAPSHOT_ISOLATION_LEVEL:
return IsolationLevel.SI;
default:
return IsolationLevel.UNRECOGNIZED;
}
}
private static KVMode getKvMode(String key) {
if (get(key).toUpperCase(Locale.ROOT).equals(RAW_KV_MODE)) {
return KVMode.RAW;
} else {
return KVMode.TXN;
}
}
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
private List<URI> pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES);
private int indexScanBatchSize = getInt(TIKV_INDEX_SCAN_BATCH_SIZE);
private int indexScanConcurrency = getInt(TIKV_INDEX_SCAN_CONCURRENCY);
private int tableScanConcurrency = getInt(TIKV_TABLE_SCAN_CONCURRENCY);
private int batchGetConcurrency = getInt(TIKV_BATCH_GET_CONCURRENCY);
private int batchPutConcurrency = getInt(TIKV_BATCH_PUT_CONCURRENCY);
private int batchScanConcurrency = getInt(TIKV_BATCH_SCAN_CONCURRENCY);
private int deleteRangeConcurrency = getInt(TIKV_DELETE_RANGE_CONCURRENCY);
private CommandPri commandPriority = getCommandPri(TIKV_REQUEST_COMMAND_PRIORITY);
private IsolationLevel isolationLevel = getIsolationLevel(TIKV_REQUEST_ISOLATION_LEVEL);
private boolean showRowId = getBoolean(TIKV_SHOW_ROWID);
private String dbPrefix = get(TIKV_DB_PREFIX);
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private boolean isReplicaRead = getBoolean(TIKV_IS_REPLICA_READ);
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
public enum KVMode {
TXN,
@ -129,21 +276,21 @@ public class TiConfiguration implements Serializable {
return sb.toString();
}
public int getTimeout() {
public long getTimeout() {
return timeout;
}
public TiConfiguration setTimeout(int timeout) {
public TiConfiguration setTimeout(long timeout) {
this.timeout = timeout;
return this;
}
public TimeUnit getTimeoutUnit() {
return timeoutUnit;
public long getScanTimeout() {
return scanTimeout;
}
public TiConfiguration setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = timeoutUnit;
public TiConfiguration setScanTimeout(long scanTimeout) {
this.scanTimeout = scanTimeout;
return this;
}
@ -159,24 +306,6 @@ public class TiConfiguration implements Serializable {
return DEF_SCAN_BATCH_SIZE;
}
boolean isIgnoreTruncate() {
return ignoreTruncate;
}
public TiConfiguration setIgnoreTruncate(boolean ignoreTruncate) {
this.ignoreTruncate = ignoreTruncate;
return this;
}
boolean isTruncateAsWarning() {
return truncateAsWarning;
}
public TiConfiguration setTruncateAsWarning(boolean truncateAsWarning) {
this.truncateAsWarning = truncateAsWarning;
return this;
}
public int getMaxFrameSize() {
return maxFrameSize;
}
@ -190,178 +319,143 @@ public class TiConfiguration implements Serializable {
return indexScanBatchSize;
}
public void setIndexScanBatchSize(int indexScanBatchSize) {
public TiConfiguration setIndexScanBatchSize(int indexScanBatchSize) {
this.indexScanBatchSize = indexScanBatchSize;
return this;
}
public int getIndexScanConcurrency() {
return indexScanConcurrency;
}
public void setIndexScanConcurrency(int indexScanConcurrency) {
public TiConfiguration setIndexScanConcurrency(int indexScanConcurrency) {
this.indexScanConcurrency = indexScanConcurrency;
return this;
}
public int getTableScanConcurrency() {
return tableScanConcurrency;
}
public void setTableScanConcurrency(int tableScanConcurrency) {
public TiConfiguration setTableScanConcurrency(int tableScanConcurrency) {
this.tableScanConcurrency = tableScanConcurrency;
return this;
}
public int getBatchGetConcurrency() {
return batchGetConcurrency;
}
public void setBatchGetConcurrency(int batchGetConcurrency) {
public TiConfiguration setBatchGetConcurrency(int batchGetConcurrency) {
this.batchGetConcurrency = batchGetConcurrency;
return this;
}
public int getBatchPutConcurrency() {
return batchPutConcurrency;
}
public void setBatchPutConcurrency(int batchPutConcurrency) {
public TiConfiguration setBatchPutConcurrency(int batchPutConcurrency) {
this.batchPutConcurrency = batchPutConcurrency;
return this;
}
public int getBatchScanConcurrency() {
return batchScanConcurrency;
}
public void setBatchScanConcurrency(int batchScanConcurrency) {
public TiConfiguration setBatchScanConcurrency(int batchScanConcurrency) {
this.batchScanConcurrency = batchScanConcurrency;
return this;
}
public int getDeleteRangeConcurrency() {
return deleteRangeConcurrency;
}
public void setDeleteRangeConcurrency(int deleteRangeConcurrency) {
public TiConfiguration setDeleteRangeConcurrency(int deleteRangeConcurrency) {
this.deleteRangeConcurrency = deleteRangeConcurrency;
return this;
}
public CommandPri getCommandPriority() {
return commandPriority;
}
public void setCommandPriority(CommandPri commandPriority) {
public TiConfiguration setCommandPriority(CommandPri commandPriority) {
this.commandPriority = commandPriority;
return this;
}
public IsolationLevel getIsolationLevel() {
return isolationLevel;
}
public void setIsolationLevel(IsolationLevel isolationLevel) {
public TiConfiguration setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}
public int getMaxRequestKeyRangeSize() {
return maxRequestKeyRangeSize;
}
public void setMaxRequestKeyRangeSize(int maxRequestKeyRangeSize) {
if (maxRequestKeyRangeSize <= 0) {
throw new IllegalArgumentException("Key range size cannot be less than 1");
}
this.maxRequestKeyRangeSize = maxRequestKeyRangeSize;
}
public void setShowRowId(boolean flag) {
this.showRowId = flag;
return this;
}
public boolean ifShowRowId() {
return showRowId;
}
public TiConfiguration setShowRowId(boolean flag) {
this.showRowId = flag;
return this;
}
public String getDBPrefix() {
return dbPrefix;
}
public void setDBPrefix(String dbPrefix) {
public TiConfiguration setDBPrefix(String dbPrefix) {
this.dbPrefix = dbPrefix;
}
public boolean isWriteEnable() {
return writeEnable;
}
public void setWriteEnable(boolean writeEnable) {
this.writeEnable = writeEnable;
}
public boolean isWriteWithoutLockTable() {
return writeWithoutLockTable;
}
public void setWriteWithoutLockTable(boolean writeWithoutLockTable) {
this.writeWithoutLockTable = writeWithoutLockTable;
}
public boolean isWriteAllowSparkSQL() {
return writeAllowSparkSQL;
}
public void setWriteAllowSparkSQL(boolean writeAllowSparkSQL) {
this.writeAllowSparkSQL = writeAllowSparkSQL;
}
public int getTikvRegionSplitSizeInMB() {
return tikvRegionSplitSizeInMB;
}
public void setTikvRegionSplitSizeInMB(int tikvRegionSplitSizeInMB) {
this.tikvRegionSplitSizeInMB = tikvRegionSplitSizeInMB;
}
public int getDowngradeThreshold() {
return downgradeThreshold;
}
public void setDowngradeThreshold(int downgradeThreshold) {
this.downgradeThreshold = downgradeThreshold;
}
public int getPartitionPerSplit() {
return partitionPerSplit;
}
public void setPartitionPerSplit(int partitionPerSplit) {
this.partitionPerSplit = partitionPerSplit;
return this;
}
public KVMode getKvMode() {
return kvMode;
}
public void setKvMode(String kvMode) {
public TiConfiguration setKvMode(String kvMode) {
this.kvMode = KVMode.valueOf(kvMode);
}
public int getRawClientConcurrency() {
return rawClientConcurrency;
}
public void setRawClientConcurrency(int rawClientConcurrency) {
this.rawClientConcurrency = rawClientConcurrency;
return this;
}
public int getKvClientConcurrency() {
return kvClientConcurrency;
}
public void setKvClientConcurrency(int kvClientConcurrency) {
public TiConfiguration setKvClientConcurrency(int kvClientConcurrency) {
this.kvClientConcurrency = kvClientConcurrency;
return this;
}
public boolean isReplicaRead() {
return isReplicaRead;
}
public void setReplicaRead(boolean isReplicaRead) {
public TiConfiguration setReplicaRead(boolean isReplicaRead) {
this.isReplicaRead = isReplicaRead;
return this;
}
public boolean isMetricsEnable() {
return metricsEnable;
}
public TiConfiguration setMetricsEnable(boolean metricsEnable) {
this.metricsEnable = metricsEnable;
return this;
}
public int getMetricsPort() {
return metricsPort;
}
public TiConfiguration setMetricsPort(int metricsPort) {
this.metricsPort = metricsPort;
return this;
}
}

View File

@ -15,9 +15,14 @@
package org.tikv.common;
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -33,6 +38,7 @@ import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.policy.RetryPolicy;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
@ -66,11 +72,33 @@ public class TiSession implements AutoCloseable {
private volatile RegionManager regionManager;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
private HTTPServer server;
private CollectorRegistry collectorRegistry;
public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.client = PDClient.createRaw(conf, channelFactory);
if (conf.isMetricsEnable()) {
try {
this.collectorRegistry = new CollectorRegistry();
this.collectorRegistry.register(RawKVClient.RAW_REQUEST_LATENCY);
this.collectorRegistry.register(RawKVClient.RAW_REQUEST_FAILURE);
this.collectorRegistry.register(RawKVClient.RAW_REQUEST_SUCCESS);
this.collectorRegistry.register(RegionStoreClient.GRPC_RAW_REQUEST_LATENCY);
this.collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY);
this.collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY);
this.collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY);
this.server =
new HTTPServer(
new InetSocketAddress(conf.getMetricsPort()), this.collectorRegistry, true);
logger.info("http server is up " + this.server.getPort());
} catch (Exception e) {
logger.error("http server not up");
throw new RuntimeException(e);
}
}
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}
@VisibleForTesting
@ -221,7 +249,10 @@ public class TiSession implements AutoCloseable {
batchPutThreadPool =
Executors.newFixedThreadPool(
conf.getBatchPutConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
new ThreadFactoryBuilder()
.setNameFormat("batchPut-thread-%d")
.setDaemon(true)
.build());
}
res = batchPutThreadPool;
}
@ -237,7 +268,10 @@ public class TiSession implements AutoCloseable {
batchGetThreadPool =
Executors.newFixedThreadPool(
conf.getBatchGetConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
new ThreadFactoryBuilder()
.setNameFormat("batchGet-thread-%d")
.setDaemon(true)
.build());
}
res = batchGetThreadPool;
}
@ -253,7 +287,10 @@ public class TiSession implements AutoCloseable {
batchScanThreadPool =
Executors.newFixedThreadPool(
conf.getBatchScanConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
new ThreadFactoryBuilder()
.setNameFormat("batchScan-thread-%d")
.setDaemon(true)
.build());
}
res = batchScanThreadPool;
}
@ -269,7 +306,10 @@ public class TiSession implements AutoCloseable {
deleteRangeThreadPool =
Executors.newFixedThreadPool(
conf.getDeleteRangeConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
new ThreadFactoryBuilder()
.setNameFormat("deleteRange-thread-%d")
.setDaemon(true)
.build());
}
res = deleteRangeThreadPool;
}
@ -282,6 +322,10 @@ public class TiSession implements AutoCloseable {
return channelFactory;
}
public CollectorRegistry getCollectorRegistry() {
return collectorRegistry;
}
/**
* This is used for setting call back function to invalidate cache information
*
@ -347,7 +391,8 @@ public class TiSession implements AutoCloseable {
private List<TiRegion> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
List<TiRegion> regions = new ArrayList<>();
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(splitKeys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(regionManager, splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
Pair<TiRegion, Metapb.Store> pair =
@ -385,11 +430,6 @@ public class TiSession implements AutoCloseable {
return regions;
}
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}
@Override
public synchronized void close() throws Exception {
if (isClosed) {
@ -397,6 +437,11 @@ public class TiSession implements AutoCloseable {
return;
}
if (server != null) {
server.stop();
logger.info("Metrics server on " + server.getPort() + " is stopped");
}
isClosed = true;
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
@ -408,10 +453,17 @@ public class TiSession implements AutoCloseable {
if (indexScanThreadPool != null) {
indexScanThreadPool.shutdownNow();
}
if (regionManager != null) {
if (logger.isDebugEnabled()) {
logger.debug("region cache miss rate: " + getRegionManager().cacheMiss());
}
if (batchGetThreadPool != null) {
batchGetThreadPool.shutdownNow();
}
if (batchPutThreadPool != null) {
batchPutThreadPool.shutdownNow();
}
if (batchScanThreadPool != null) {
batchScanThreadPool.shutdownNow();
}
if (deleteRangeThreadPool != null) {
deleteRangeThreadPool.shutdownNow();
}
if (client != null) {
getPDClient().close();

View File

@ -0,0 +1,91 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common;
import com.google.common.collect.ImmutableMap;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Utils {
// org.apache.spark.network.util.JavaUtils
private static final ImmutableMap<String, TimeUnit> timeSuffixes =
ImmutableMap.<String, TimeUnit>builder()
.put("us", TimeUnit.MICROSECONDS)
.put("ms", TimeUnit.MILLISECONDS)
.put("s", TimeUnit.SECONDS)
.put("m", TimeUnit.MINUTES)
.put("min", TimeUnit.MINUTES)
.put("h", TimeUnit.HOURS)
.put("d", TimeUnit.DAYS)
.build();
public static ConcurrentHashMap<String, String> getSystemProperties() {
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
System.getProperties()
.stringPropertyNames()
.forEach(key -> map.put(key, System.getProperty(key)));
return map;
}
public static void setSystemProperties(ConcurrentHashMap<String, String> settings) {
Properties prop = new Properties();
settings.forEach(prop::setProperty);
System.setProperties(prop);
}
// org.apache.spark.network.util.JavaUtils
public static long timeStringAs(String str, TimeUnit unit) {
String lower = str.toLowerCase(Locale.ROOT).trim();
try {
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
if (!m.matches()) {
throw new NumberFormatException("Failed to parse time string: " + str);
}
long val = Long.parseLong(m.group(1));
String suffix = m.group(2);
// Check for invalid suffixes
if (suffix != null && !timeSuffixes.containsKey(suffix)) {
throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
}
// If suffix is valid use that, otherwise none was provided and use the default passed
return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
} catch (NumberFormatException e) {
String timeError =
"Time must be specified as seconds (s), "
+ "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
+ "E.g. 50s, 100ms, or 250us.";
throw new NumberFormatException(timeError + "\n" + e.getMessage());
}
}
public static long timeStringAsMs(String str) {
return timeStringAs(str, TimeUnit.MILLISECONDS);
}
public static long timeStringAsSec(String str) {
return timeStringAs(str, TimeUnit.SECONDS);
}
}

View File

@ -172,6 +172,8 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
String.format(
"Received zero store id, from region %d try next time", ctxRegion.getId()));
backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
this.regionManager.invalidateRegion(ctxRegion.getId());
}
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));

View File

@ -72,6 +72,7 @@ public class ConcreteScanIterator extends ScanIterator {
TiRegion loadCurrentRegionToCache() throws GrpcException {
TiRegion region;
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
region = client.getRegion();
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);

View File

@ -44,6 +44,7 @@ public class RawScanIterator extends ScanIterator {
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
while (true) {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
TiRegion region = client.getRegion();
if (limit <= 0) {
currentCache = null;

View File

@ -25,7 +25,7 @@ public class RetryMaxMs<T> extends RetryPolicy<T> {
}
public static class Builder<T> implements RetryPolicy.Builder<T> {
private BackOffer backOffer;
private final BackOffer backOffer;
public Builder(BackOffer backOffer) {
this.backOffer = backOffer;

View File

@ -17,6 +17,7 @@ package org.tikv.common.policy;
import com.google.common.collect.ImmutableSet;
import io.grpc.Status;
import io.prometheus.client.Histogram;
import java.util.concurrent.Callable;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.operation.ErrorHandler;
@ -25,6 +26,11 @@ import org.tikv.common.util.ConcreteBackOffer;
public abstract class RetryPolicy<RespT> {
BackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff();
public static final Histogram GRPC_SINGLE_REQUEST_LATENCY =
Histogram.build()
.name("client_java_grpc_single_requests_latency")
.help("grpc request latency.")
.register();
// handles PD and TiKV's error.
private ErrorHandler<RespT> handler;
@ -51,7 +57,13 @@ public abstract class RetryPolicy<RespT> {
while (true) {
RespT result = null;
try {
result = proc.call();
// add single request duration histogram
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.startTimer();
try {
result = proc.call();
} finally {
requestTimer.observeDuration();
}
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
@ -65,6 +77,7 @@ public abstract class RetryPolicy<RespT> {
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
// add retry counter
continue;
}
}

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ManagedChannel;
import java.util.concurrent.TimeUnit;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
@ -56,12 +57,12 @@ public abstract class AbstractRegionStoreClient
@Override
protected TikvGrpc.TikvBlockingStub getBlockingStub() {
return blockingStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
protected TikvGrpc.TikvStub getAsyncStub() {
return asyncStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override

View File

@ -23,6 +23,7 @@ import static org.tikv.common.util.KeyRangeUtils.makeRange;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -53,6 +54,12 @@ public class RegionManager {
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
Histogram.build()
.name("client_java_get_region_by_requests_latency")
.help("getRegionByKey request latency.")
.register();
// To avoid double retrieval, we used the async version of grpc
// When rpc not returned, instead of call again, it wait for previous one done
public RegionManager(
@ -197,18 +204,11 @@ public class RegionManager {
cache.invalidateRegion(regionId);
}
public double cacheMiss() {
logger.debug("cache miss: " + cache.miss + " total: " + cache.total);
return cache.miss * 1.0 / cache.total;
}
public static class RegionCache {
private final Map<Long, TiRegion> regionCache;
private final Map<Long, Store> storeCache;
private final RangeMap<Key, Long> keyToRegionIdCache;
private final ReadOnlyPDClient pdClient;
private int total = 0;
private int miss = 0;
public RegionCache(ReadOnlyPDClient pdClient) {
regionCache = new HashMap<>();
@ -219,30 +219,38 @@ public class RegionManager {
}
public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Long regionId;
++total;
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
if (logger.isDebugEnabled()) {
logger.debug(
String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId));
}
if (regionId == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
++miss;
TiRegion region = pdClient.getRegionByKey(backOffer, key);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
Long regionId;
if (key.isEmpty()) {
// if key is empty, it must be the start key.
regionId = keyToRegionIdCache.get(Key.toRawKey(key, true));
} else {
regionId = keyToRegionIdCache.get(Key.toRawKey(key));
}
if (logger.isDebugEnabled()) {
logger.debug(
String.format("getRegionByKey key[%s] -> ID[%s]", formatBytesUTF8(key), regionId));
}
return region;
}
TiRegion region;
region = regionCache.get(regionId);
if (logger.isDebugEnabled()) {
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
}
return region;
if (regionId == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
TiRegion region = pdClient.getRegionByKey(backOffer, key);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
}
return region;
}
TiRegion region;
region = regionCache.get(regionId);
if (logger.isDebugEnabled()) {
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
}
return region;
} finally {
requestTimer.observeDuration();
}
}
private synchronized boolean putRegion(TiRegion region) {

View File

@ -26,6 +26,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.SelectResponse;
import io.grpc.ManagedChannel;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -69,6 +70,13 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
private final PDClient pdClient;
private Boolean isV4 = null;
public static final Histogram GRPC_RAW_REQUEST_LATENCY =
Histogram.build()
.name("client_java_grpc_raw_requests_latency")
.help("grpc raw request latency.")
.labelNames("type")
.register();
private synchronized Boolean getIsV4() {
if (isV4 == null) {
isV4 = StoreVersion.minTiKVVersion(Version.RESOLVE_LOCK_V4, pdClient);
@ -798,16 +806,22 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
// APIs for Raw Scan/Put/Get/Delete
public ByteString rawGet(BackOffer backOffer, ByteString key) {
Supplier<RawGetRequest> factory =
() -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawGetResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
return rawGetHelper(resp);
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
try {
Supplier<RawGetRequest> factory =
() -> RawGetRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawGetResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
return rawGetHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
private ByteString rawGetHelper(RawGetResponse resp) {
@ -825,19 +839,64 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
return resp.getValue();
}
public void rawDelete(BackOffer backOffer, ByteString key) {
Supplier<RawDeleteRequest> factory =
() -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer();
try {
Supplier<RawGetKeyTTLRequest> factory =
() ->
RawGetKeyTTLRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawGetKeyTTLResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawGetKeyTTLResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler);
return rawGetKeyTTLHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
KVErrorHandler<RawDeleteResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
rawDeleteHelper(resp, region);
private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
}
String error = resp.getError();
if (!error.isEmpty()) {
throw new KeyException(resp.getError());
}
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (resp.getNotFound()) {
return null;
}
return resp.getTtl();
}
public void rawDelete(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
try {
Supplier<RawDeleteRequest> factory =
() -> RawDeleteRequest.newBuilder().setContext(region.getContext()).setKey(key).build();
KVErrorHandler<RawDeleteResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
rawDeleteHelper(resp, region);
} finally {
requestTimer.observeDuration();
}
}
private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
@ -855,23 +914,29 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long ttl) {
Supplier<RawPutRequest> factory =
() ->
RawPutRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setValue(value)
.setTtl(ttl)
.build();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer();
try {
Supplier<RawPutRequest> factory =
() ->
RawPutRequest.newBuilder()
.setContext(region.getContext())
.setKey(key)
.setValue(value)
.setTtl(ttl)
.build();
KVErrorHandler<RawPutResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
rawPutHelper(resp);
KVErrorHandler<RawPutResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
rawPutHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
private void rawPutHelper(RawPutResponse resp) {
@ -889,24 +954,30 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
if (keys.isEmpty()) {
return new ArrayList<>();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_get").startTimer();
try {
if (keys.isEmpty()) {
return new ArrayList<>();
}
Supplier<RawBatchGetRequest> factory =
() ->
RawBatchGetRequest.newBuilder()
.setContext(region.getContext())
.addAllKeys(keys)
.build();
KVErrorHandler<RawBatchGetResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchGetResponse resp =
callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
return handleRawBatchGet(resp);
} finally {
requestTimer.observeDuration();
}
Supplier<RawBatchGetRequest> factory =
() ->
RawBatchGetRequest.newBuilder()
.setContext(region.getContext())
.addAllKeys(keys)
.build();
KVErrorHandler<RawBatchGetResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchGetResponse resp =
callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
return handleRawBatchGet(resp);
}
private List<KvPair> handleRawBatchGet(RawBatchGetResponse resp) {
@ -921,25 +992,31 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl) {
if (kvPairs.isEmpty()) {
return;
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer();
try {
if (kvPairs.isEmpty()) {
return;
}
Supplier<RawBatchPutRequest> factory =
() ->
RawBatchPutRequest.newBuilder()
.setContext(region.getContext())
.addAllPairs(kvPairs)
.setTtl(ttl)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchPutResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
handleRawBatchPut(resp);
} finally {
requestTimer.observeDuration();
}
Supplier<RawBatchPutRequest> factory =
() ->
RawBatchPutRequest.newBuilder()
.setContext(region.getContext())
.addAllPairs(kvPairs)
.setTtl(ttl)
.build();
KVErrorHandler<RawBatchPutResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawBatchPutResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
handleRawBatchPut(resp);
}
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl) {
@ -971,23 +1048,30 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @return KvPair list
*/
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
Supplier<RawScanRequest> factory =
() ->
RawScanRequest.newBuilder()
.setContext(region.getContext())
.setStartKey(key)
.setKeyOnly(keyOnly)
.setLimit(limit)
.build();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan").startTimer();
try {
Supplier<RawScanRequest> factory =
() ->
RawScanRequest.newBuilder()
.setContext(region.getContext())
.setStartKey(key)
.setKeyOnly(keyOnly)
.setLimit(limit)
.build();
KVErrorHandler<RawScanResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawScanResponse resp = callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
return rawScanHelper(resp);
KVErrorHandler<RawScanResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawScanResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
return rawScanHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
@ -1013,23 +1097,29 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param endKey endKey
*/
public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
Supplier<RawDeleteRangeRequest> factory =
() ->
RawDeleteRangeRequest.newBuilder()
.setContext(region.getContext())
.setStartKey(startKey)
.setEndKey(endKey)
.build();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete_range").startTimer();
try {
Supplier<RawDeleteRangeRequest> factory =
() ->
RawDeleteRangeRequest.newBuilder()
.setContext(region.getContext())
.setStartKey(startKey)
.setEndKey(endKey)
.build();
KVErrorHandler<RawDeleteRangeResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteRangeResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);
rawDeleteRangeHelper(resp);
KVErrorHandler<RawDeleteRangeResponse> handler =
new KVErrorHandler<>(
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawDeleteRangeResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);
rawDeleteRangeHelper(resp);
} finally {
requestTimer.observeDuration();
}
}
private void rawDeleteRangeHelper(RawDeleteRangeResponse resp) {

View File

@ -16,14 +16,13 @@
package org.tikv.common.util;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Kvrpcpb;
@ -37,14 +36,20 @@ public class ClientUtils {
* @param batchMaxSizeInBytes batch max limit
*/
public static void appendBatches(
List<Batch> batches, TiRegion region, List<ByteString> keys, int batchMaxSizeInBytes) {
List<Batch> batches,
TiRegion region,
List<ByteString> keys,
int batchMaxSizeInBytes,
int batchLimit) {
if (keys == null) {
return;
}
int len = keys.size();
for (int start = 0, end; start < len; start = end) {
int size = 0;
for (end = start; end < len && size < batchMaxSizeInBytes; end++) {
for (end = start;
end < len && size < batchMaxSizeInBytes && end - start < batchLimit;
end++) {
size += keys.get(end).size();
}
Batch batch = new Batch(region, keys.subList(start, end));
@ -66,14 +71,17 @@ public class ClientUtils {
TiRegion region,
List<ByteString> keys,
List<ByteString> values,
int batchMaxSizeInBytes) {
int batchMaxSizeInBytes,
int batchLimit) {
if (keys == null) {
return;
}
int len = keys.size();
for (int start = 0, end; start < len; start = end) {
int size = 0;
for (end = start; end < len && size < batchMaxSizeInBytes; end++) {
for (end = start;
end < len && size < batchMaxSizeInBytes && end - start < batchLimit;
end++) {
size += keys.get(end).size();
size += values.get(end).size();
}
@ -82,6 +90,38 @@ public class ClientUtils {
}
}
public static Map<TiRegion, List<ByteString>> groupKeysByRegion(
RegionManager regionManager, Set<ByteString> keys, BackOffer backoffer) {
return groupKeysByRegion(regionManager, new ArrayList<>(keys), backoffer, true);
}
public static Map<TiRegion, List<ByteString>> groupKeysByRegion(
RegionManager regionManager, List<ByteString> keys, BackOffer backoffer) {
return groupKeysByRegion(regionManager, new ArrayList<>(keys), backoffer, false);
}
/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their region
*/
public static Map<TiRegion, List<ByteString>> groupKeysByRegion(
RegionManager regionManager, List<ByteString> keys, BackOffer backoffer, boolean sorted) {
Map<TiRegion, List<ByteString>> groups = new HashMap<>();
if (!sorted) {
keys.sort((k1, k2) -> FastByteComparisons.compareTo(k1.toByteArray(), k2.toByteArray()));
}
TiRegion lastRegion = null;
for (ByteString key : keys) {
if (lastRegion == null || !lastRegion.contains(key)) {
lastRegion = regionManager.getRegionByKey(key, backoffer);
}
groups.computeIfAbsent(lastRegion, k -> new ArrayList<>()).add(key);
}
return groups;
}
public static List<Kvrpcpb.KvPair> getKvPairs(
ExecutorCompletionService<List<Kvrpcpb.KvPair>> completionService,
List<Batch> batches,
@ -102,14 +142,14 @@ public class ClientUtils {
}
}
public static void getTasks(
ExecutorCompletionService<List<Batch>> completionService,
Queue<List<Batch>> taskQueue,
List<Batch> batches,
public static <T> void getTasks(
ExecutorCompletionService<List<T>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
try {
for (int i = 0; i < batches.size(); i++) {
List<Batch> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
List<T> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
if (!task.isEmpty()) {
taskQueue.offer(task);
}
@ -123,4 +163,30 @@ public class ClientUtils {
throw new TiKVException("Execution exception met.", e);
}
}
public static <T, U> List<U> getTasksWithOutput(
ExecutorCompletionService<Pair<List<T>, List<U>>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
try {
List<U> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
Pair<List<T>, List<U>> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
if (!task.first.isEmpty()) {
taskQueue.offer(task.first);
} else {
result.addAll(task.second);
}
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import com.google.protobuf.ByteString;
import org.tikv.common.region.TiRegion;
public class DeleteRange {
private final TiRegion region;
private final ByteString startKey;
private final ByteString endKey;
public DeleteRange(TiRegion region, ByteString startKey, ByteString endKey) {
this.region = region;
this.startKey = startKey;
this.endKey = endKey;
}
public TiRegion getRegion() {
return region;
}
public ByteString getStartKey() {
return startKey;
}
public ByteString getEndKey() {
return endKey;
}
@Override
public String toString() {
return KeyRangeUtils.makeRange(getStartKey(), getEndKey()).toString();
}
}

View File

@ -18,6 +18,8 @@ package org.tikv.raw;
import static org.tikv.common.util.ClientUtils.*;
import com.google.protobuf.ByteString;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@ -25,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.operation.iterator.RawScanIterator;
@ -44,16 +45,35 @@ public class RawKVClient implements AutoCloseable {
private final ExecutorService deleteRangeThreadPool;
private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
private static final int MAX_RETRY_LIMIT = 3;
// https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
private static final int MAX_RAW_SCAN_LIMIT = 10240;
private static final int MAX_RAW_BATCH_LIMIT = 1024;
private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB
private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K
private static final int RAW_BATCH_SCAN_SIZE = 16;
private static final int RAW_BATCH_PAIR_COUNT = 512;
private static final TiKVException ERR_RETRY_LIMIT_EXCEEDED =
new GrpcException("retry is exhausted. retry exceeds " + MAX_RETRY_LIMIT + "attempts");
public static final Histogram RAW_REQUEST_LATENCY =
Histogram.build()
.name("client_java_raw_requests_latency")
.help("client raw request latency.")
.labelNames("type")
.register();
public static final Counter RAW_REQUEST_SUCCESS =
Counter.build()
.name("client_java_raw_requests_success")
.help("client raw request success.")
.labelNames("type")
.register();
public static final Counter RAW_REQUEST_FAILURE =
Counter.build()
.name("client_java_raw_requests_failure")
.help("client raw request failure.")
.labelNames("type")
.register();
private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");
@ -89,17 +109,26 @@ public class RawKVClient implements AutoCloseable {
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
public void put(ByteString key, ByteString value, long ttl) {
BackOffer backOffer = defaultBackOff();
for (int i = 0; i < MAX_RETRY_LIMIT; i++) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value, ttl);
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
throw ERR_RETRY_LIMIT_EXCEEDED;
}
/**
@ -118,7 +147,17 @@ public class RawKVClient implements AutoCloseable {
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
private void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
@ -138,16 +177,26 @@ public class RawKVClient implements AutoCloseable {
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key) {
BackOffer backOffer = defaultBackOff();
for (int i = 0; i < MAX_RETRY_LIMIT; i++) {
RegionStoreClient client = clientBuilder.build(key);
try {
return client.rawGet(defaultBackOff(), key);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawGet(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
throw ERR_RETRY_LIMIT_EXCEEDED;
}
/**
@ -157,41 +206,92 @@ public class RawKVClient implements AutoCloseable {
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public List<KvPair> batchGet(List<ByteString> keys) {
BackOffer backOffer = defaultBackOff();
return doSendBatchGet(backOffer, keys);
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
List<KvPair> result = doSendBatchGet(backOffer, keys);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
/**
* Get the TTL of a raw key from TiKV if key exists
*
* @param key raw key
* @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
* exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
*/
public Long getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Long result = client.rawGetKeyTTL(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
if (ranges.isEmpty()) {
return new ArrayList<>();
}
ExecutorCompletionService<Pair<Integer, List<KvPair>>> completionService =
new ExecutorCompletionService<>(batchScanThreadPool);
int num = 0;
for (ScanOption scanOption : ranges) {
int i = num;
completionService.submit(() -> Pair.create(i, scan(scanOption)));
++num;
}
List<List<KvPair>> scanResults = new ArrayList<>();
for (int i = 0; i < num; i++) {
scanResults.add(new ArrayList<>());
}
for (int i = 0; i < num; i++) {
try {
Pair<Integer, List<KvPair>> scanResult =
completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
scanResults.set(scanResult.first, scanResult.second);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
String label = "client_raw_batch_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
if (ranges.isEmpty()) {
return new ArrayList<>();
}
ExecutorCompletionService<Pair<Integer, List<KvPair>>> completionService =
new ExecutorCompletionService<>(batchScanThreadPool);
int num = 0;
for (ScanOption scanOption : ranges) {
int i = num;
completionService.submit(() -> Pair.create(i, scan(scanOption)));
++num;
}
List<List<KvPair>> scanResults = new ArrayList<>();
for (int i = 0; i < num; i++) {
scanResults.add(new ArrayList<>());
}
for (int i = 0; i < num; i++) {
try {
Pair<Integer, List<KvPair>> scanResult =
completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
scanResults.set(scanResult.first, scanResult.second);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}
RAW_REQUEST_SUCCESS.labels(label).inc();
return scanResults;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
return scanResults;
}
/**
@ -216,11 +316,21 @@ public class RawKVClient implements AutoCloseable {
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
/**
@ -266,17 +376,28 @@ public class RawKVClient implements AutoCloseable {
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
List<KvPair> result = new ArrayList<>();
while (true) {
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly);
if (!iterator.hasNext()) {
break;
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
List<KvPair> result = new ArrayList<>();
while (true) {
Iterator<KvPair> iterator =
rawScanIterator(
conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly);
if (!iterator.hasNext()) {
break;
}
iterator.forEachRemaining(result::add);
startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
}
iterator.forEachRemaining(result::add);
startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
return result;
}
private List<KvPair> scan(ScanOption scanOption) {
@ -313,15 +434,25 @@ public class RawKVClient implements AutoCloseable {
* @param key raw key to be deleted
*/
public void delete(ByteString key) {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawDelete(defaultBackOff(), key);
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawDelete(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
@ -335,8 +466,18 @@ public class RawKVClient implements AutoCloseable {
* @param endKey raw start key to be deleted
*/
public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
BackOffer backOffer = defaultBackOff();
doSendDeleteRange(backOffer, startKey, endKey);
String label = "client_raw_delete_range";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
doSendDeleteRange(backOffer, startKey, endKey);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
/**
@ -356,7 +497,8 @@ public class RawKVClient implements AutoCloseable {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(kvPairs.keySet());
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), kvPairs.keySet(), backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
@ -365,7 +507,8 @@ public class RawKVClient implements AutoCloseable {
entry.getKey(),
entry.getValue(),
entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE);
RAW_BATCH_PUT_SIZE,
MAX_RAW_BATCH_LIMIT);
}
Queue<List<Batch>> taskQueue = new LinkedList<>();
taskQueue.offer(batches);
@ -382,28 +525,21 @@ public class RawKVClient implements AutoCloseable {
}
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
TiRegion oldRegion = batch.region;
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
if (oldRegion.equals(currentRegion)) {
try (RegionStoreClient client = clientBuilder.build(batch.region); ) {
client.rawBatchPut(backOffer, batch, ttl);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// retry
return doSendBatchPutWithRefetchRegion(backOffer, batch, ttl);
}
} else {
return doSendBatchPutWithRefetchRegion(backOffer, batch, ttl);
try (RegionStoreClient client = clientBuilder.build(batch.region)) {
client.rawBatchPut(backOffer, batch, ttl);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// retry
return doSendBatchPutWithRefetchRegion(backOffer, batch);
}
}
private List<Batch> doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch, long ttl) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
private List<Batch> doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
@ -412,21 +548,24 @@ public class RawKVClient implements AutoCloseable {
entry.getKey(),
entry.getValue(),
entry.getValue().stream().map(batch.map::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE);
RAW_BATCH_PUT_SIZE,
MAX_RAW_BATCH_LIMIT);
}
return retryBatches;
}
private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys) {
ExecutorCompletionService<List<KvPair>> completionService =
ExecutorCompletionService<Pair<List<Batch>, List<KvPair>>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE);
appendBatches(
batches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT);
}
for (Batch batch : batches) {
@ -434,46 +573,51 @@ public class RawKVClient implements AutoCloseable {
completionService.submit(() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch));
}
return getKvPairs(completionService, batches, BackOffer.RAWKV_MAX_BACKOFF);
Queue<List<Batch>> taskQueue = new LinkedList<>();
List<KvPair> result = new ArrayList<>();
taskQueue.offer(batches);
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch));
}
result.addAll(
getTasksWithOutput(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF));
}
return result;
}
private List<KvPair> doSendBatchGetInBatchesWithRetry(BackOffer backOffer, Batch batch) {
TiRegion oldRegion = batch.region;
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.region);
try {
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.keys);
return Pair.create(new ArrayList<>(), partialResult);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.region);
try {
return client.rawBatchGet(backOffer, batch.keys);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
return doSendBatchGetWithRefetchRegion(backOffer, batch);
}
} else {
return doSendBatchGetWithRefetchRegion(backOffer, batch);
// retry
return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>());
}
}
private List<KvPair> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE);
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT);
}
ArrayList<KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {
// recursive calls
List<KvPair> batchResult = doSendBatchGetInBatchesWithRetry(backOffer, retryBatch);
results.addAll(batchResult);
}
return results;
return retryBatches;
}
private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteString key2) {
@ -484,85 +628,57 @@ public class RawKVClient implements AutoCloseable {
}
private void doSendDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
ExecutorCompletionService<Object> completionService =
ExecutorCompletionService<List<DeleteRange>> completionService =
new ExecutorCompletionService<>(deleteRangeThreadPool);
List<TiRegion> regions = fetchRegionsFromRange(startKey, endKey);
for (int i = 0; i < regions.size(); i++) {
TiRegion region = regions.get(i);
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
ByteString start = calcKeyByCondition(i == 0, startKey, region.getStartKey());
ByteString end = calcKeyByCondition(i == regions.size() - 1, endKey, region.getEndKey());
completionService.submit(
() -> doSendDeleteRangeWithRetry(singleBatchBackOffer, region, start, end));
}
for (int i = 0; i < regions.size(); i++) {
try {
completionService.take().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}
}
private Object doSendDeleteRangeWithRetry(
BackOffer backOffer, TiRegion region, ByteString startKey, ByteString endKey) {
TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(region.getStartKey());
if (region.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(region);
try {
client.rawDeleteRange(backOffer, startKey, endKey);
return null;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(region.getId());
logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
// retry
return doSendDeleteRangeWithRefetchRegion(backOffer, startKey, endKey);
}
} else {
return doSendDeleteRangeWithRefetchRegion(backOffer, startKey, endKey);
}
}
private Object doSendDeleteRangeWithRefetchRegion(
BackOffer backOffer, ByteString startKey, ByteString endKey) {
List<TiRegion> regions = fetchRegionsFromRange(startKey, endKey);
List<TiRegion> regions = fetchRegionsFromRange(backOffer, startKey, endKey);
List<DeleteRange> ranges = new ArrayList<>();
for (int i = 0; i < regions.size(); i++) {
TiRegion region = regions.get(i);
ByteString start = calcKeyByCondition(i == 0, startKey, region.getStartKey());
ByteString end = calcKeyByCondition(i == regions.size() - 1, endKey, region.getEndKey());
doSendDeleteRangeWithRetry(backOffer, region, start, end);
ranges.add(new DeleteRange(region, start, end));
}
return null;
}
/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their region
*/
private Map<TiRegion, List<ByteString>> groupKeysByRegion(Set<ByteString> keys) {
Map<TiRegion, List<ByteString>> groups = new HashMap<>();
TiRegion lastRegion = null;
for (ByteString key : keys) {
if (lastRegion == null || !lastRegion.contains(key)) {
lastRegion = clientBuilder.getRegionManager().getRegionByKey(key);
Queue<List<DeleteRange>> taskQueue = new LinkedList<>();
taskQueue.offer(ranges);
while (!taskQueue.isEmpty()) {
List<DeleteRange> task = taskQueue.poll();
for (DeleteRange range : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(() -> doSendDeleteRangeWithRetry(singleBatchBackOffer, range));
}
groups.computeIfAbsent(lastRegion, k -> new ArrayList<>()).add(key);
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
return groups;
}
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
private List<DeleteRange> doSendDeleteRangeWithRetry(BackOffer backOffer, DeleteRange range) {
try (RegionStoreClient client = clientBuilder.build(range.getRegion())) {
client.setTimeout(conf.getScanTimeout());
client.rawDeleteRange(backOffer, range.getStartKey(), range.getEndKey());
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(range.getRegion().getId());
logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
// retry
return doSendDeleteRangeWithRefetchRegion(backOffer, range);
}
}
private List<DeleteRange> doSendDeleteRangeWithRefetchRegion(
BackOffer backOffer, DeleteRange range) {
List<TiRegion> regions =
fetchRegionsFromRange(backOffer, range.getStartKey(), range.getEndKey());
List<DeleteRange> retryRanges = new ArrayList<>();
for (int i = 0; i < regions.size(); i++) {
TiRegion region = regions.get(i);
ByteString start = calcKeyByCondition(i == 0, range.getStartKey(), region.getStartKey());
ByteString end =
calcKeyByCondition(i == regions.size() - 1, range.getEndKey(), region.getEndKey());
retryRanges.add(new DeleteRange(region, start, end));
}
return retryRanges;
}
private static Map<ByteString, ByteString> mapKeysToValues(
@ -574,12 +690,16 @@ public class RawKVClient implements AutoCloseable {
return map;
}
private List<TiRegion> fetchRegionsFromRange(ByteString startKey, ByteString endKey) {
private List<TiRegion> fetchRegionsFromRange(
BackOffer backOffer, ByteString startKey, ByteString endKey) {
List<TiRegion> regions = new ArrayList<>();
while (FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray()) < 0) {
TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(startKey);
while (startKey.isEmpty() || Key.toRawKey(startKey).compareTo(Key.toRawKey(endKey)) < 0) {
TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(startKey, backOffer);
regions.add(currentRegion);
startKey = currentRegion.getEndKey();
if (currentRegion.getEndKey().isEmpty()) {
break;
}
}
return regions;
}

View File

@ -15,8 +15,7 @@
package org.tikv.txn;
import static org.tikv.common.util.ClientUtils.appendBatches;
import static org.tikv.common.util.ClientUtils.getKvPairs;
import static org.tikv.common.util.ClientUtils.*;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@ -24,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
@ -42,6 +40,7 @@ import org.tikv.kvproto.Kvrpcpb;
public class KVClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
private static final int MAX_BATCH_LIMIT = 1024;
private static final int BATCH_GET_SIZE = 16 * 1024;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
@ -137,11 +136,12 @@ public class KVClient implements AutoCloseable {
ExecutorCompletionService<List<Kvrpcpb.KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keys, backOffer);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
for (Batch batch : batches) {
@ -178,11 +178,13 @@ public class KVClient implements AutoCloseable {
private List<Kvrpcpb.KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
appendBatches(
retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE, MAX_BATCH_LIMIT);
}
ArrayList<Kvrpcpb.KvPair> results = new ArrayList<>();
@ -195,17 +197,6 @@ public class KVClient implements AutoCloseable {
return results;
}
/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their region
*/
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}
private Iterator<Kvrpcpb.KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,

View File

@ -17,12 +17,16 @@ package org.tikv.common;
import static org.junit.Assert.*;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import com.google.protobuf.ByteString;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
@ -38,6 +42,16 @@ public class RegionManagerTest extends PDMockServerTest {
mgr = session.getRegionManager();
}
@Test
public void testRegionBorder() {
RangeMap<Key, Long> map = TreeRangeMap.create();
map.put(KeyRangeUtils.makeRange(ByteString.EMPTY, ByteString.copyFromUtf8("abc")), 1L);
map.put(KeyRangeUtils.makeRange(ByteString.copyFromUtf8("abc"), ByteString.EMPTY), 2L);
assert map.get(Key.toRawKey(ByteString.EMPTY)) == null;
assert map.get(Key.toRawKey(ByteString.EMPTY, true)) == 1L;
}
@Test
public void getRegionByKey() throws Exception {
ByteString startKey = ByteString.copyFrom(new byte[] {1});

View File

@ -92,6 +92,118 @@ public class RawKVClientTest {
}
}
@Test
public void getKeyTTLTest() {
if (!initialized) return;
long ttl = 10;
ByteString key = ByteString.copyFromUtf8("key_ttl");
ByteString value = ByteString.copyFromUtf8("value");
client.put(key, value, ttl);
for (int i = 0; i < 9; i++) {
Long t = client.getKeyTTL(key);
logger.info("current ttl of key is " + t);
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
}
Long t = client.getKeyTTL(key);
if (t == null) {
logger.info("key outdated.");
} else {
logger.info("key not outdated: " + t);
}
}
private ByteString generateBatchPutKey(String envId, String type, String id) {
return ByteString.copyFromUtf8(
String.format("indexInfo_:_{%s}_:_{%s}_:_{%s}", envId, type, id));
}
private ByteString generateBatchPutValue() {
return ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(290));
}
private String generateEnvId() {
return String.format(
"%s%02d", RandomStringUtils.randomAlphabetic(2).toLowerCase(Locale.ROOT), r.nextInt(100));
}
private String generateType() {
return String.format(
"%s%02d", RandomStringUtils.randomAlphabetic(3).toUpperCase(Locale.ROOT), r.nextInt(10000));
}
@Test
public void batchPutTest() {
if (!initialized) return;
ExecutorService executors = Executors.newFixedThreadPool(200);
ExecutorCompletionService<Object> completionService =
new ExecutorCompletionService<>(executors);
long dataCnt = 1000L;
long keysPerBatch = 1000;
long workerCnt = dataCnt / keysPerBatch;
List<String> envIdPool = new ArrayList<>();
int envIdPoolSize = 10000;
for (int i = 0; i < envIdPoolSize; i++) {
envIdPool.add(generateEnvId());
}
List<String> typePool = new ArrayList<>();
int typePoolSize = 10000;
for (int i = 0; i < typePoolSize; i++) {
typePool.add(generateType());
}
List<ByteString> valuePool = new ArrayList<>();
int valuePoolSize = 10000;
for (int i = 0; i < valuePoolSize; i++) {
valuePool.add(generateBatchPutValue());
}
for (long i = 0; i < workerCnt; i++) {
completionService.submit(
() -> {
String envId = envIdPool.get(r.nextInt(envIdPoolSize));
String type = typePool.get(r.nextInt(typePoolSize));
String prefix =
String.format(
"%d%09d%09d", r.nextInt(10), r.nextInt(1000000000), r.nextInt(1000000000));
Map<ByteString, ByteString> map = new HashMap<>();
RawKVClient rawKVClient = session.createRawClient();
for (int j = 0; j < keysPerBatch; j++) {
String id = String.format("%s%04d", prefix, j);
map.put(
generateBatchPutKey(envId, type, id), valuePool.get(r.nextInt(valuePoolSize)));
}
rawKVClient.batchPut(map);
return null;
});
}
logger.info("start");
try {
for (int i = 0; i < workerCnt; i++) {
completionService.take().get(1, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Current thread interrupted. Test fail.");
} catch (TimeoutException e) {
logger.info("TimeOut Exceeded for current test. " + 1 + "s");
} catch (ExecutionException e) {
logger.info("Execution exception met. Test fail.");
}
logger.info("done");
}
@Test
public void deleteRangeTest() {
if (!initialized) return;
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);
}
@Test
public void simpleTest() {
if (!initialized) return;
@ -186,8 +298,6 @@ public class RawKVClientTest {
rawDeleteTest(deleteCases, benchmark);
}
prepare();
// TODO: check whether cluster supports ttl
// long ttl = 10;
// rawTTLTest(10, ttl, benchmark);
@ -582,6 +692,7 @@ public class RawKVClientTest {
ByteString key = randomKeys.get(i), value = values.get(r.nextInt(KEY_POOL_SIZE));
data.put(key, value);
checkPutTTL(key, value, ttl);
checkGetKeyTTL(key, ttl);
}
try {
Thread.sleep(ttl * 1000);
@ -591,6 +702,7 @@ public class RawKVClientTest {
for (int i = 0; i < cases; i++) {
ByteString key = randomKeys.get(i);
checkGetTTLTimeOut(key);
checkGetKeyTTLTimeOut(key);
}
}
}
@ -677,10 +789,21 @@ public class RawKVClientTest {
assert client.get(key).equals(value);
}
private void checkGetKeyTTL(ByteString key, long ttl) {
Long t = client.getKeyTTL(key);
assert t != null;
assert t <= ttl && t > 0;
}
private void checkGetTTLTimeOut(ByteString key) {
assert client.get(key).isEmpty();
}
private void checkGetKeyTTLTimeOut(ByteString key) {
Long t = client.getKeyTTL(key);
assert t == null;
}
private void checkEmpty(ByteString key) {
assert client.get(key).isEmpty();
}