Batch cherry pick from release-3.1 (#356)

Signed-off-by: marsishandsome <marsishandsome@gmail.com>
Signed-off-by: birdstorm <samuelwyf@hotmail.com>
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
Liangliang Gu 2021-12-01 20:51:41 +08:00 committed by GitHub
parent e2f10aa2ab
commit 1886700118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 3007 additions and 680 deletions

View File

@ -166,6 +166,30 @@ The following includes ThreadPool related parameters, which can be passed in thr
- a PKCS#8 private key file in PEM format. e.g. /home/tidb/client-key.pem.
- default: null
#### tikv.rawkv.read_timeout_in_ms
- RawKV read timeout in milliseconds. This parameter controls the timeout of `get` `getKeyTTL`.
- default: 2000 (2 seconds)
#### tikv.rawkv.write_timeout_in_ms
- RawKV write timeout in milliseconds. This parameter controls the timeout of `put` `putAtomic` `putIfAbsent` `delete` `deleteAtomic`.
- default: 2000 (2 seconds)
#### tikv.rawkv.batch_read_timeout_in_ms
- RawKV batch read timeout in milliseconds. This parameter controls the timeout of `batchGet`.
- default: 2000 (2 seconds)
#### tikv.rawkv.batch_write_timeout_in_ms
- RawKV batch write timeout in milliseconds. This parameter controls the timeout of `batchPut` `batchDelete` `batchDeleteAtomic`.
- default: 2000 (2 seconds)
#### tikv.rawkv.scan_timeout_in_ms
- RawKV scan timeout in milliseconds. This parameter controls the timeout of `batchScan` `scan` `scanPrefix`.
- default: 10000 (10 seconds)
#### tikv.rawkv.clean_timeout_in_ms
- RawKV clean timeout in milliseconds. This parameter controls the timeout of `deleteRange` `deletePrefix`.
- default: 600000 (10 minutes)
## Metrics
Client Java supports exporting metrics to Prometheus using poll mode and viewing on Grafana. The following steps shows how to enable this function.

File diff suppressed because it is too large Load Diff

10
pom.xml
View File

@ -65,8 +65,9 @@
<protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.24.0</grpc.version>
<netty.tcnative.version>2.0.25.Final</netty.tcnative.version>
<grpc.version>1.38.0</grpc.version>
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<gson.version>2.8.5</gson.version>
<powermock.version>1.6.6</powermock.version>
<jackson.version>2.12.3</jackson.version>
<trove4j.version>3.0.1</trove4j.version>
@ -151,6 +152,11 @@
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>

View File

@ -23,6 +23,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.AbstractFutureStub;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
@ -38,14 +39,15 @@ import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
public abstract class AbstractGRPCClient<
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
BlockingStubT extends AbstractStub<BlockingStubT>,
FutureStubT extends AbstractFutureStub<FutureStubT>>
implements AutoCloseable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ChannelFactory channelFactory;
protected TiConfiguration conf;
protected long timeout;
protected BlockingStubT blockingStub;
protected StubT asyncStub;
protected FutureStubT asyncStub;
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf;
@ -57,7 +59,7 @@ public abstract class AbstractGRPCClient<
TiConfiguration conf,
ChannelFactory channelFactory,
BlockingStubT blockingStub,
StubT asyncStub) {
FutureStubT asyncStub) {
this.conf = conf;
this.timeout = conf.getTimeout();
this.channelFactory = channelFactory;
@ -88,7 +90,8 @@ public abstract class AbstractGRPCClient<
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
@ -109,14 +112,15 @@ public abstract class AbstractGRPCClient<
.create(handler)
.callWithRetry(
() -> {
StubT stub = getAsyncStub();
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
responseObserver);
return null;
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
}
@ -133,11 +137,12 @@ public abstract class AbstractGRPCClient<
.create(handler)
.callWithRetry(
() -> {
StubT stub = getAsyncStub();
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer;
}
@ -160,7 +165,8 @@ public abstract class AbstractGRPCClient<
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response;
}
@ -175,7 +181,7 @@ public abstract class AbstractGRPCClient<
protected abstract BlockingStubT getBlockingStub();
protected abstract StubT getAsyncStub();
protected abstract FutureStubT getAsyncStub();
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);

View File

@ -70,6 +70,22 @@ public class ConfigUtils {
public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds";
public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms";
public static final String TIKV_RAWKV_READ_TIMEOUT_IN_MS = "tikv.rawkv.read_timeout_in_ms";
public static final String TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = "tikv.rawkv.write_timeout_in_ms";
public static final String TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS =
"tikv.rawkv.batch_read_timeout_in_ms";
public static final String TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS =
"tikv.rawkv.batch_write_timeout_in_ms";
public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms";
public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms";
public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms";
public static final String TIKV_RAWKV_READ_SLOWLOG_IN_MS = "tikv.rawkv.read_slowlog_in_ms";
public static final String TIKV_RAWKV_WRITE_SLOWLOG_IN_MS = "tikv.rawkv.write_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS =
"tikv.rawkv.batch_read_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS =
"tikv.rawkv.batch_write_slowlog_in_ms";
public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms";
public static final String TIKV_TLS_ENABLE = "tikv.tls_enable";
public static final String TIKV_TRUST_CERT_COLLECTION = "tikv.trust_cert_collection";
@ -117,6 +133,16 @@ public class ConfigUtils {
public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300;
public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF;
public static final int DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = 10000;
public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000;
public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20;
public static final String DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "5000";
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
public static final String HIGH_COMMAND_PRIORITY = "HIGH";

View File

@ -14,15 +14,10 @@
*/
package org.tikv.common;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import java.net.InetSocketAddress;
import io.prometheus.client.hotspot.DefaultExports;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.policy.RetryPolicy;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.raw.RawKVClient;
public class MetricsServer {
private static final Logger logger = LoggerFactory.getLogger(MetricsServer.class);
@ -57,16 +52,9 @@ public class MetricsServer {
private MetricsServer(int port) {
try {
CollectorRegistry collectorRegistry = new CollectorRegistry();
collectorRegistry.register(RawKVClient.RAW_REQUEST_LATENCY);
collectorRegistry.register(RawKVClient.RAW_REQUEST_FAILURE);
collectorRegistry.register(RawKVClient.RAW_REQUEST_SUCCESS);
collectorRegistry.register(RegionStoreClient.GRPC_RAW_REQUEST_LATENCY);
collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY);
collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY);
collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY);
this.port = port;
this.server = new HTTPServer(new InetSocketAddress(port), collectorRegistry, true);
DefaultExports.initialize();
this.server = new HTTPServer(port, true);
logger.info("http server is up " + this.server.getPort());
} catch (Exception e) {
logger.error("http server not up");

View File

@ -39,6 +39,8 @@ import io.prometheus.client.Histogram;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
@ -76,7 +78,7 @@ import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub;
import org.tikv.kvproto.PDGrpc.PDFutureStub;
import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.Error;
import org.tikv.kvproto.Pdpb.ErrorType;
@ -99,7 +101,7 @@ import org.tikv.kvproto.Pdpb.Timestamp;
import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50;
@ -442,7 +444,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
timeout = conf.getTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating leader. " + leaderUrlStr, e);
return false;
}
logger.info(String.format("Switched to new leader: %s", pdClientWrapper));
@ -462,7 +463,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
timeout = conf.getForwardTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating follower. " + followerUrlStr, e);
return false;
}
logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper));
@ -624,7 +624,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
@Override
protected PDStub getAsyncStub() {
protected PDFutureStub getAsyncStub() {
if (pdClientWrapper == null) {
throw new GrpcException("PDClient may not be initialized");
}
@ -632,8 +632,11 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
private void initCluster() {
logger.info("init cluster: start");
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
List<URI> pdAddrs = new ArrayList<>(getConf().getPdAddrs());
// shuffle PD addresses so that clients call getMembers from different PD
Collections.shuffle(pdAddrs);
this.pdAddrs = pdAddrs;
this.etcdClient =
Client.builder()
@ -645,19 +648,26 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
.setDaemon(true)
.build()))
.build();
logger.info("init host mapping: start");
this.hostMapping =
Optional.ofNullable(getConf().getHostMapping())
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
logger.info("init host mapping: end");
// The first request may cost too much latency
long originTimeout = this.timeout;
this.timeout = conf.getPdFirstGetMemberTimeout();
for (URI u : pdAddrs) {
logger.info("get members with pd " + u + ": start");
resp = getMembers(u);
logger.info("get members with pd " + u + ": end");
if (resp != null) {
break;
}
logger.info("Could not get leader member with pd: " + u);
}
if (resp == null) {
logger.error("Could not get leader member with: " + pdAddrs);
}
this.timeout = originTimeout;
checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId();
@ -673,7 +683,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
String leaderUrlStr = resp.getLeader().getClientUrls(0);
leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": start");
createLeaderClientWrapper(leaderUrlStr);
logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": end");
service =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
@ -702,12 +714,13 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
tiflashReplicaService.scheduleAtFixedRate(
this::updateTiFlashReplicaStatus, 10, 10, TimeUnit.SECONDS);
}
logger.info("init cluster: finish");
}
static class PDClientWrapper {
private final String leaderInfo;
private final PDBlockingStub blockingStub;
private final PDStub asyncStub;
private final PDFutureStub asyncStub;
private final long createTime;
private final String storeAddress;
@ -718,10 +731,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
this.blockingStub =
MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header);
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header);
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header);
} else {
this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
this.asyncStub = PDGrpc.newStub(clientChannel);
this.asyncStub = PDGrpc.newFutureStub(clientChannel);
}
this.leaderInfo = leaderInfo;
this.storeAddress = storeAddress;
@ -740,7 +753,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return blockingStub;
}
PDStub getAsyncStub() {
PDFutureStub getAsyncStub() {
return asyncStub;
}

View File

@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY =
public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key PD_FORWARD_META_DATA_KEY =
public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
static {
@ -122,6 +122,14 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_GRPC_KEEPALIVE_TIMEOUT, DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT);
setIfMissing(TIKV_TLS_ENABLE, DEF_TIKV_TLS_ENABLE);
setIfMissing(TIFLASH_ENABLE, DEF_TIFLASH_ENABLE);
setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
}
public static void listAll() {
@ -168,10 +176,14 @@ public class TiConfiguration implements Serializable {
return option.get();
}
private static int getInt(String key) {
public static int getInt(String key) {
return Integer.parseInt(get(key));
}
public static Optional<Integer> getIntOption(String key) {
return getOption(key).map(Integer::parseInt);
}
private static int getInt(String key, int defaultValue) {
try {
return getOption(key).map(Integer::parseInt).orElse(defaultValue);
@ -322,6 +334,19 @@ public class TiConfiguration implements Serializable {
private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS);
private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
private int rawKVReadTimeoutInMS = getInt(TIKV_RAWKV_READ_TIMEOUT_IN_MS);
private int rawKVWriteTimeoutInMS = getInt(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
private int rawKVBatchReadTimeoutInMS = getInt(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
private Optional<Integer> rawKVReadSlowLogInMS = getIntOption(TIKV_RAWKV_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVWriteSlowLogInMS = getIntOption(TIKV_RAWKV_WRITE_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchReadSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchWriteSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS);
private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
private boolean tlsEnable = getBoolean(TIKV_TLS_ENABLE);
private String trustCertCollectionFile = getOption(TIKV_TRUST_CERT_COLLECTION).orElse(null);
@ -768,4 +793,92 @@ public class TiConfiguration implements Serializable {
public void setKeyFile(String keyFile) {
this.keyFile = keyFile;
}
public int getRawKVReadTimeoutInMS() {
return rawKVReadTimeoutInMS;
}
public void setRawKVReadTimeoutInMS(int rawKVReadTimeoutInMS) {
this.rawKVReadTimeoutInMS = rawKVReadTimeoutInMS;
}
public int getRawKVWriteTimeoutInMS() {
return rawKVWriteTimeoutInMS;
}
public void setRawKVWriteTimeoutInMS(int rawKVWriteTimeoutInMS) {
this.rawKVWriteTimeoutInMS = rawKVWriteTimeoutInMS;
}
public int getRawKVBatchReadTimeoutInMS() {
return rawKVBatchReadTimeoutInMS;
}
public void setRawKVBatchReadTimeoutInMS(int rawKVBatchReadTimeoutInMS) {
this.rawKVBatchReadTimeoutInMS = rawKVBatchReadTimeoutInMS;
}
public int getRawKVBatchWriteTimeoutInMS() {
return rawKVBatchWriteTimeoutInMS;
}
public void setRawKVBatchWriteTimeoutInMS(int rawKVBatchWriteTimeoutInMS) {
this.rawKVBatchWriteTimeoutInMS = rawKVBatchWriteTimeoutInMS;
}
public int getRawKVScanTimeoutInMS() {
return rawKVScanTimeoutInMS;
}
public void setRawKVScanTimeoutInMS(int rawKVScanTimeoutInMS) {
this.rawKVScanTimeoutInMS = rawKVScanTimeoutInMS;
}
public int getRawKVCleanTimeoutInMS() {
return rawKVCleanTimeoutInMS;
}
public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) {
this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS;
}
public Integer getRawKVReadSlowLogInMS() {
return rawKVReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVReadSlowLogInMS(Integer rawKVReadSlowLogInMS) {
this.rawKVReadSlowLogInMS = Optional.of(rawKVReadSlowLogInMS);
}
public Integer getRawKVWriteSlowLogInMS() {
return rawKVWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVWriteSlowLogInMS(Integer rawKVWriteSlowLogInMS) {
this.rawKVWriteSlowLogInMS = Optional.of(rawKVWriteSlowLogInMS);
}
public Integer getRawKVBatchReadSlowLogInMS() {
return rawKVBatchReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVBatchReadSlowLogInMS(Integer rawKVBatchReadSlowLogInMS) {
this.rawKVBatchReadSlowLogInMS = Optional.of(rawKVBatchReadSlowLogInMS);
}
public Integer getRawKVBatchWriteSlowLogInMS() {
return rawKVBatchWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVBatchWriteSlowLogInMS(Integer rawKVBatchWriteSlowLogInMS) {
this.rawKVBatchWriteSlowLogInMS = Optional.of(rawKVBatchWriteSlowLogInMS);
}
public int getRawKVScanSlowLogInMS() {
return rawKVScanSlowLogInMS;
}
public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) {
this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS;
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.exception;
public class InvalidStoreException extends TiKVException {
public InvalidStoreException(long storeId) {
super(String.format("Invalid storeId: %d", storeId));
}
}

View File

@ -41,17 +41,22 @@ import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Kvrpcpb;
public class ImporterStoreClient<RequestClass, ResponseClass>
extends AbstractGRPCClient<ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTStub>
extends AbstractGRPCClient<
ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTFutureStub>
implements StreamObserver<ResponseClass> {
private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);
private final ImportSSTGrpc.ImportSSTStub stub;
protected ImporterStoreClient(
TiConfiguration conf,
ChannelFactory channelFactory,
ImportSSTGrpc.ImportSSTBlockingStub blockingStub,
ImportSSTGrpc.ImportSSTStub asyncStub) {
ImportSSTGrpc.ImportSSTFutureStub asyncStub,
ImportSSTGrpc.ImportSSTStub stub) {
super(conf, channelFactory, blockingStub, asyncStub);
this.stub = stub;
}
private StreamObserver<RequestClass> streamObserverRequest;
@ -108,11 +113,11 @@ public class ImporterStoreClient<RequestClass, ResponseClass>
if (conf.isRawKVMode()) {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
getStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
} else {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
getStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
}
}
@ -174,10 +179,14 @@ public class ImporterStoreClient<RequestClass, ResponseClass>
}
@Override
protected ImportSSTGrpc.ImportSSTStub getAsyncStub() {
protected ImportSSTGrpc.ImportSSTFutureStub getAsyncStub() {
return asyncStub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
}
protected ImportSSTGrpc.ImportSSTStub getStub() {
return stub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void close() throws Exception {}
@ -209,10 +218,11 @@ public class ImporterStoreClient<RequestClass, ResponseClass>
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel);
ImportSSTGrpc.ImportSSTFutureStub asyncStub = ImportSSTGrpc.newFutureStub(channel);
ImportSSTGrpc.ImportSSTStub stub = ImportSSTGrpc.newStub(channel);
return new ImporterStoreClient<RequestClass, ResponseClass>(
conf, channelFactory, blockingStub, asyncStub);
conf, channelFactory, blockingStub, asyncStub, stub);
}
}
}

View File

@ -0,0 +1,26 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
public interface SlowLog {
void addProperty(String key, String value);
SlowLogSpan start(String name);
void log();
}

View File

@ -0,0 +1,35 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
public class SlowLogEmptyImpl implements SlowLog {
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();
private SlowLogEmptyImpl() {}
@Override
public void addProperty(String key, String value) {}
@Override
public SlowLogSpan start(String name) {
return SlowLogSpanEmptyImpl.INSTANCE;
}
@Override
public void log() {}
}

View File

@ -0,0 +1,93 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SlowLogImpl implements SlowLog {
private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);
private static final int MAX_SPAN_SIZE = 1024;
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
private final long startMS;
private final long slowThresholdMS;
/** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */
private final Map<String, String> properties;
public SlowLogImpl(long slowThresholdMS, Map<String, String> properties) {
this.startMS = System.currentTimeMillis();
this.slowThresholdMS = slowThresholdMS;
this.properties = new HashMap<>(properties);
}
@Override
public void addProperty(String key, String value) {
this.properties.put(key, value);
}
@Override
public synchronized SlowLogSpan start(String name) {
SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name);
if (slowLogSpans.size() < MAX_SPAN_SIZE) {
slowLogSpans.add(slowLogSpan);
}
slowLogSpan.start();
return slowLogSpan;
}
@Override
public void log() {
long currentMS = System.currentTimeMillis();
if (slowThresholdMS >= 0 && currentMS - startMS > slowThresholdMS) {
logger.warn("SlowLog:" + getSlowLogString(currentMS));
}
}
private String getSlowLogString(long currentMS) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("start", DATE_FORMAT.format(startMS));
jsonObject.addProperty("end", DATE_FORMAT.format(currentMS));
jsonObject.addProperty("duration", (currentMS - startMS) + "ms");
for (Map.Entry<String, String> entry : properties.entrySet()) {
jsonObject.addProperty(entry.getKey(), entry.getValue());
}
JsonArray jsonArray = new JsonArray();
for (SlowLogSpan slowLogSpan : slowLogSpans) {
jsonArray.add(slowLogSpan.toJsonElement());
}
jsonObject.add("spans", jsonArray);
return jsonObject.toString();
}
}

View File

@ -0,0 +1,28 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonElement;
public interface SlowLogSpan {
void start();
void end();
JsonElement toJsonElement();
}

View File

@ -0,0 +1,39 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
public class SlowLogSpanEmptyImpl implements SlowLogSpan {
public static final SlowLogSpanEmptyImpl INSTANCE = new SlowLogSpanEmptyImpl();
private SlowLogSpanEmptyImpl() {}
@Override
public void start() {}
@Override
public void end() {}
@Override
public JsonElement toJsonElement() {
return new JsonObject();
}
}

View File

@ -0,0 +1,77 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import static org.tikv.common.log.SlowLogImpl.DATE_FORMAT;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
public class SlowLogSpanImpl implements SlowLogSpan {
private final String name;
private long startMS;
private long endMS;
public SlowLogSpanImpl(String name) {
this.name = name;
this.startMS = 0;
this.endMS = 0;
}
@Override
public void start() {
this.startMS = System.currentTimeMillis();
}
@Override
public void end() {
this.endMS = System.currentTimeMillis();
}
@Override
public JsonElement toJsonElement() {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("name", name);
jsonObject.addProperty("start", getStartString());
jsonObject.addProperty("end", getEndString());
jsonObject.addProperty("duration", getDurationString());
return jsonObject;
}
private String getStartString() {
if (startMS == 0) {
return "N/A";
}
return DATE_FORMAT.format(startMS);
}
private String getEndString() {
if (endMS == 0) {
return "N/A";
}
return DATE_FORMAT.format(endMS);
}
private String getDurationString() {
if (startMS == 0 || endMS == 0) {
return "N/A";
}
return (endMS - startMS) + "ms";
}
}

View File

@ -94,8 +94,6 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) {
return regionHandler.handleRegionError(backOffer, error);
} else {
regionHandler.tryUpdateRegionStore();
}
// Key error handling logic

View File

@ -19,6 +19,7 @@ package org.tikv.common.operation;
import static org.tikv.common.pd.PDError.buildFromPdpbError;
import io.grpc.StatusRuntimeException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +75,10 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
// store id is not found
if (e instanceof StatusRuntimeException && e.getMessage().contains("invalid store ID")) {
return false;
}
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
client.updateLeaderOrforwardFollower();
return true;

View File

@ -3,17 +3,21 @@ package org.tikv.common.operation;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Metapb;
public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class);
@ -42,16 +46,10 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
Errorpb.Error error = getRegionError(resp);
if (error != null) {
return handleRegionError(backOffer, error);
} else {
tryUpdateRegionStore();
}
return false;
}
public void tryUpdateRegionStore() {
recv.tryUpdateRegionStore();
}
public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
@ -114,11 +112,9 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
// throwing it out.
return false;
} else if (error.hasEpochNotMatch()) {
// this error is reported from raftstore:
// region has outdated versionplease try later.
logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion()));
this.regionManager.onRegionStale(recv.getRegion());
return false;
logger.warn(
String.format("tikv reports `EpochNotMatch` retry later, region: %s", recv.getRegion()));
return onRegionEpochNotMatch(backOffer, error.getEpochNotMatch().getCurrentRegionsList());
} else if (error.hasServerIsBusy()) {
// this error is reported from kv:
// will occur when write pressure is high. Please try later.
@ -170,17 +166,70 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
return false;
}
// ref: https://github.com/tikv/client-go/blob/tidb-5.2/internal/locate/region_request.go#L985
// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
// It returns whether retries the request because it's possible the region epoch is ahead of
// TiKV's due to slow appling.
private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
return false;
}
// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
for (Metapb.Region meta : currentRegions) {
if (meta.getId() == recv.getRegion().getId()
&& (meta.getRegionEpoch().getConfVer() < recv.getRegion().getVerID().getConfVer()
|| meta.getRegionEpoch().getVersion() < recv.getRegion().getVerID().getVer())) {
String errorMsg =
String.format(
"region epoch is ahead of tikv, region: %s, currentRegions: %s",
recv.getRegion(), currentRegions);
logger.info(errorMsg);
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new TiKVException(errorMsg));
return true;
}
}
boolean needInvalidateOld = true;
List<TiRegion> newRegions = new ArrayList<>(currentRegions.size());
// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for (Metapb.Region meta : currentRegions) {
TiRegion region = regionManager.createRegion(meta, backOffer);
newRegions.add(region);
if (recv.getRegion().getVerID() == region.getVerID()) {
needInvalidateOld = false;
}
}
if (needInvalidateOld) {
this.regionManager.onRegionStale(recv.getRegion());
}
for (TiRegion region : newRegions) {
regionManager.insertRegionToCache(region);
}
return false;
}
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
if (recv.onStoreUnreachable(backOffer.getSlowLog())) {
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException("retry is exhausted.", e);
}
return true;
}
logger.warn("request failed because of: " + e.getMessage());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e));
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e);
}
// TiKV maybe down, so do not retry in `callWithRetry`
// should re-fetch the new leader from PD and send request to it
return false;

View File

@ -25,10 +25,10 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
public class RawScanIterator extends ScanIterator {
private final BackOffer scanBackOffer;
public RawScanIterator(
TiConfiguration conf,
@ -36,15 +36,19 @@ public class RawScanIterator extends ScanIterator {
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
BackOffer scanBackOffer) {
super(conf, builder, startKey, endKey, limit, keyOnly);
this.scanBackOffer = scanBackOffer;
}
@Override
TiRegion loadCurrentRegionToCache() throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
BackOffer backOffer = scanBackOffer;
while (true) {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
try (RegionStoreClient client = builder.build(startKey, backOffer)) {
client.setTimeout(conf.getRawKVScanTimeoutInMS());
TiRegion region = client.getRegion();
if (limit <= 0) {
currentCache = null;

View File

@ -21,6 +21,7 @@ import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.concurrent.Callable;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.operation.ErrorHandler;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
@ -33,6 +34,12 @@ public abstract class RetryPolicy<RespT> {
.help("grpc request latency.")
.labelNames("type")
.register();
public static final Histogram CALL_WITH_RETRY_DURATION =
Histogram.build()
.name("client_java_call_with_retry_duration")
.help("callWithRetry duration.")
.labelNames("type")
.register();
public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build()
.name("client_java_grpc_requests_retry_num")
@ -61,38 +68,49 @@ public abstract class RetryPolicy<RespT> {
}
}
public RespT callWithRetry(Callable<RespT> proc, String methodName) {
while (true) {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
SlowLogSpan callWithRetrySlowLogSpan =
backOffer.getSlowLog().start("callWithRetry " + methodName);
try {
while (true) {
RespT result = null;
try {
result = proc.call();
} finally {
requestTimer.observeDuration();
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName);
try {
result = proc.call();
} finally {
slowLogSpan.end();
requestTimer.observeDuration();
}
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
} else {
return result;
}
}
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
} else {
return result;
}
}
// Handle response error
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
// Handle response error
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
}
}
return result;
}
return result;
} finally {
callWithRetryTimer.observeDuration();
callWithRetrySlowLogSpan.end();
}
}

View File

@ -20,34 +20,49 @@ package org.tikv.common.region;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub>
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
implements RegionErrorReceiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
public static final Histogram SEEK_LEADER_STORE_DURATION =
Histogram.build()
.name("client_java_seek_leader_store_duration")
.help("seek leader store duration.")
.register();
public static final Histogram SEEK_PROXY_STORE_DURATION =
Histogram.build()
.name("client_java_seek_proxy_store_duration")
.help("seek proxy store duration.")
.register();
protected final RegionManager regionManager;
protected TiRegion region;
protected TiStore targetStore;
protected TiStore originStore;
private long retryForwardTimes;
private long retryLeaderTimes;
private Metapb.Peer candidateLeader;
protected TiStore store;
protected AbstractRegionStoreClient(
TiConfiguration conf,
@ -55,7 +70,7 @@ public abstract class AbstractRegionStoreClient
TiStore store,
ChannelFactory channelFactory,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
TikvGrpc.TikvFutureStub asyncStub,
RegionManager regionManager) {
super(conf, channelFactory, blockingStub, asyncStub);
checkNotNull(region, "Region is empty");
@ -63,15 +78,13 @@ public abstract class AbstractRegionStoreClient
checkArgument(region.getLeader() != null, "Leader Peer is null");
this.region = region;
this.regionManager = regionManager;
this.targetStore = store;
this.originStore = null;
this.candidateLeader = null;
this.retryForwardTimes = 0;
this.retryLeaderTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.store = store;
if (this.store.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
} else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) {
onStoreUnreachable();
} else if (!this.store.isReachable()) {
// cannot get Deadline or SlowLog instance here
// use SlowLogEmptyImpl instead to skip slow log record
onStoreUnreachable(SlowLogEmptyImpl.INSTANCE);
}
}
@ -86,7 +99,7 @@ public abstract class AbstractRegionStoreClient
}
@Override
protected TikvGrpc.TikvStub getAsyncStub() {
protected TikvGrpc.TikvFutureStub getAsyncStub() {
return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@ -110,204 +123,252 @@ public abstract class AbstractRegionStoreClient
return false;
}
// If we try one peer but find the leader has not changed, we do not need try other peers.
if (candidateLeader != null
&& region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
retryLeaderTimes = newRegion.getFollowerList().size();
originStore = null;
// If we try one peer but find the leader has not changed, we do not need to try other peers.
if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
store = null;
}
candidateLeader = null;
region = newRegion;
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
store = regionManager.getStoreById(region.getLeader().getStoreId());
updateClientStub();
return true;
}
@Override
public boolean onStoreUnreachable() {
if (!targetStore.isValid()) {
logger.warn(
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
targetStore = regionManager.getStoreById(targetStore.getId());
public boolean onStoreUnreachable(SlowLog slowLog) {
if (!store.isValid()) {
logger.warn(String.format("store [%d] has been invalid", store.getId()));
store = regionManager.getStoreById(store.getId());
updateClientStub();
return true;
}
if (targetStore.getProxyStore() == null) {
if (targetStore.isReachable()) {
return true;
}
// seek an available leader store to send request
Boolean result = seekLeaderStore(slowLog);
if (result != null) {
return result;
}
// If this store has failed to forward request too many times, we shall try other peer at first
// so that we can
// reduce the latency cost by fail requests.
if (targetStore.canForwardFirst()) {
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
if (retryOtherStoreLeader()) {
return true;
}
} else {
if (retryOtherStoreLeader()) {
return true;
}
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
return true;
if (conf.getEnableGrpcForward()) {
// seek an available proxy store to forward request
return seekProxyStore(slowLog);
}
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
}
protected Kvrpcpb.Context makeContext(TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet());
} else {
return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
}
return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
}
protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
return region.getReplicaContext(candidateLeader, resolvedLocks);
} else {
return region.getReplicaContext(resolvedLocks, storeType);
}
}
@Override
public void tryUpdateRegionStore() {
if (originStore != null) {
if (originStore.getId() == targetStore.getId()) {
logger.warn(
String.format(
"update store [%s] by proxy-store [%s]",
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
// We do not need to mark the store can-forward, because if one store has grpc forward
// successfully, it will
// create a new store object, which is can-forward.
regionManager.updateStore(originStore, targetStore);
} else {
// If we try to forward request to leader by follower failed, it means that the store of old
// leader may be
// unavailable but the new leader has not been report to PD. So we can ban this store for a
// short time to
// avoid too many request try forward rather than try other peer.
originStore.forwardFail();
}
}
if (candidateLeader != null) {
logger.warn(
String.format(
"update leader to store [%d] for region[%d]",
candidateLeader.getStoreId(), region.getId()));
this.regionManager.updateLeader(region, candidateLeader.getStoreId());
}
}
private boolean retryOtherStoreLeader() {
List<Metapb.Peer> peers = region.getFollowerList();
if (retryLeaderTimes >= peers.size()) {
return false;
}
retryLeaderTimes += 1;
boolean hasVisitedStore = false;
for (Metapb.Peer cur : peers) {
if (candidateLeader == null || hasVisitedStore) {
TiStore store = regionManager.getStoreById(cur.getStoreId());
if (store != null && store.isReachable()) {
targetStore = store;
candidateLeader = cur;
logger.warn(
String.format(
"try store [%d],peer[%d] for region[%d], which may be new leader",
targetStore.getId(), candidateLeader.getId(), region.getId()));
updateClientStub();
return true;
} else {
continue;
}
} else if (candidateLeader.getId() == cur.getId()) {
hasVisitedStore = true;
}
}
candidateLeader = null;
retryLeaderTimes = peers.size();
return false;
return region.getReplicaContext(resolvedLocks, storeType);
}
private void updateClientStub() {
String addressStr = targetStore.getStore().getAddress();
if (targetStore.getProxyStore() != null) {
addressStr = targetStore.getProxyStore().getAddress();
String addressStr = store.getStore().getAddress();
long deadline = timeout;
if (store.getProxyStore() != null) {
addressStr = store.getProxyStore().getAddress();
deadline = conf.getForwardTimeout();
}
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
if (targetStore.getProxyStore() != null) {
blockingStub =
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
if (store.getProxyStore() != null) {
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(blockingStub, header);
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
}
}
private boolean retryOtherStoreByProxyForward() {
TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
return false;
}
if (originStore == null) {
originStore = targetStore;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
private Boolean seekLeaderStore(SlowLog slowLog) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore");
try {
List<Metapb.Peer> peers = region.getFollowerList();
if (peers.isEmpty()) {
// no followers available, retry
logger.warn(String.format("no followers of region[%d] available, retry", region.getId()));
regionManager.onRequestFail(region);
return false;
}
}
targetStore = proxyStore;
retryForwardTimes += 1;
updateClientStub();
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
targetStore.getStore().getAddress(),
targetStore.getProxyStore().getAddress(),
region.getId()));
return true;
}
private TiStore switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
if (peers.isEmpty()) {
return null;
}
Metapb.Store proxyStore = targetStore.getProxyStore();
if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
}
for (Metapb.Peer peer : peers) {
if (hasVisitedStore) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (store.isReachable()) {
return targetStore.withProxy(store.getStore());
logger.info(String.format("try switch leader: region[%d]", region.getId()));
Metapb.Peer peer = switchLeaderStore();
if (peer != null) {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
if (currentLeaderStore.isReachable()) {
logger.info(
String.format(
"update leader using switchLeader logic from store[%d] to store[%d]",
region.getLeader().getStoreId(), peer.getStoreId()));
// update region cache
region = regionManager.updateLeader(region, peer.getStoreId());
// switch to leader store
store = currentLeaderStore;
updateClientStub();
return true;
}
} else if (peer.getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
} else {
// no leader found, some response does not return normally, there may be network partition.
logger.warn(
String.format(
"leader for region[%d] is not found, it is possible that network partition occurred",
region.getId()));
}
} finally {
switchLeaderDurationTimer.observeDuration();
slowLogSpan.end();
}
return null;
}
private boolean seekProxyStore(SlowLog slowLog) {
SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
TiStore storeWithProxy = switchProxyStore();
if (storeWithProxy == null) {
// no store available, retry
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
return false;
}
// use proxy store to forward requests
regionManager.updateStore(store, storeWithProxy);
store = storeWithProxy;
updateClientStub();
return true;
} finally {
grpcForwardDurationTimer.observeDuration();
slowLogSpan.end();
}
}
// first: leader peer, second: true if any responses returned with grpc error
private Metapb.Peer switchLeaderStore() {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
}
while (true) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
List<SwitchLeaderTask> unfinished = new LinkedList<>();
for (SwitchLeaderTask task : responses) {
if (!task.task.isDone()) {
unfinished.add(task);
continue;
}
try {
Kvrpcpb.RawGetResponse resp = task.task.get();
if (resp != null) {
if (!resp.hasRegionError()) {
// the peer is leader
logger.info(
String.format("rawGet response indicates peer[%d] is leader", task.peer.getId()));
return task.peer;
}
}
} catch (Exception ignored) {
}
}
if (unfinished.isEmpty()) {
return null;
}
responses = unfinished;
}
}
private TiStore switchProxyStore() {
long forwardTimeout = conf.getForwardTimeout();
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
}
while (true) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
List<ForwardCheckTask> unfinished = new LinkedList<>();
for (ForwardCheckTask task : responses) {
if (!task.task.isDone()) {
unfinished.add(task);
continue;
}
try {
// any answer will do
Kvrpcpb.RawGetResponse resp = task.task.get();
logger.info(
String.format(
"rawGetResponse indicates forward from [%s] to [%s]",
task.store.getAddress(), store.getAddress()));
return store.withProxy(task.store);
} catch (Exception ignored) {
}
}
if (unfinished.isEmpty()) {
return null;
}
responses = unfinished;
}
}
private static class SwitchLeaderTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Peer peer;
private SwitchLeaderTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Peer peer) {
this.task = task;
this.peer = peer;
}
}
private static class ForwardCheckTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Store store;
private ForwardCheckTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Store store) {
this.task = task;
this.store = store;
}
}
}

View File

@ -104,6 +104,18 @@ public class RegionCache {
}
}
public synchronized void insertRegionToCache(TiRegion region) {
try {
TiRegion oldRegion = regionCache.get(region.getId());
if (oldRegion != null) {
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
}
regionCache.put(region.getId(), region);
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
} catch (Exception ignore) {
}
}
public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
try {
if (logger.isDebugEnabled()) {

View File

@ -17,13 +17,13 @@
package org.tikv.common.region;
import org.tikv.common.log.SlowLog;
public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);
/// return whether we need to retry this request.
boolean onStoreUnreachable();
void tryUpdateRegionStore();
boolean onStoreUnreachable(SlowLog slowLog);
TiRegion getRegion();
}

View File

@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
@ -158,9 +159,6 @@ public class RegionManager {
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = getStoreById(peer.getStoreId(), backOffer);
if (store == null) {
cache.clearAll();
}
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
@ -174,19 +172,19 @@ public class RegionManager {
}
}
if (store == null) {
// clear the region cache so we may get the learner peer next time
// clear the region cache, so we may get the learner peer next time
cache.invalidateRegion(region);
}
}
if (store == null) {
throw new TiClientInternalException(
"Cannot find valid store on " + storeType + " for region " + region);
}
return Pair.create(region, store);
}
public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, null, peers, stores);
}
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
@ -194,16 +192,26 @@ public class RegionManager {
}
private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList());
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
}
public TiStore getStoreById(long id, BackOffer backOffer) {
private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
try {
TiStore store = cache.getStoreById(id);
if (store == null) {
store = new TiStore(pdClient.getStore(backOffer, id));
}
// if we did not get store info from pd, remove store from cache
if (store.getStore() == null) {
logger.warn(String.format("failed to get store %d from pd", id));
return null;
}
// if the store is already tombstone, remove store from cache
if (store.getStore().getState().equals(StoreState.Tombstone)) {
logger.warn(String.format("store %d is tombstone", id));
return null;
}
if (cache.putStore(id, store) && storeChecker != null) {
@ -219,6 +227,16 @@ public class RegionManager {
return getStoreById(id, defaultBackOff());
}
public TiStore getStoreById(long id, BackOffer backOffer) {
TiStore store = getStoreByIdWithBackOff(id, backOffer);
if (store == null) {
logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
cache.clearAll();
throw new InvalidStoreException(id);
}
return store;
}
public void onRegionStale(TiRegion region) {
cache.invalidateRegion(region);
}
@ -265,6 +283,10 @@ public class RegionManager {
cache.invalidateRegion(region);
}
public void insertRegionToCache(TiRegion region) {
cache.insertRegionToCache(region);
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}

View File

@ -48,7 +48,7 @@ import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.AbstractLockResolverClient;
import org.tikv.txn.Lock;
import org.tikv.txn.ResolveLockResult;
@ -93,7 +93,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
TiStoreType storeType,
ChannelFactory channelFactory,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
TikvFutureStub asyncStub,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
@ -124,7 +124,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub tikvAsyncStub = TikvGrpc.newStub(channel);
TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel);
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
@ -1264,7 +1264,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
ManagedChannel channel = null;
TikvBlockingStub blockingStub = null;
TikvStub asyncStub = null;
TikvFutureStub asyncStub = null;
if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) {
addressStr = store.getProxyStore().getAddress();
@ -1273,11 +1273,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newFutureStub(channel), header);
} else {
channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
asyncStub = TikvGrpc.newFutureStub(channel);
}
return new RegionStoreClient(

View File

@ -110,14 +110,9 @@ public class StoreHealthyChecker implements Runnable {
}
} else {
if (!store.isReachable()) {
logger.warn(
String.format(
"store [%s] recovers to be reachable and canforward", store.getAddress()));
logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
store.markReachable();
}
if (!store.canForwardFirst()) {
store.makrCanForward();
}
}
} else if (store.isReachable()) {
unreachableStore.add(store);

View File

@ -82,6 +82,10 @@ public class TiRegion implements Serializable {
replicaIdx = 0;
}
public TiConfiguration getConf() {
return conf;
}
public Peer getLeader() {
return leader;
}
@ -155,6 +159,10 @@ public class TiRegion implements Serializable {
return getContext(currentPeer, resolvedLocks, false);
}
public Kvrpcpb.Context getReplicaContext(Peer currentPeer) {
return getContext(currentPeer, java.util.Collections.emptySet(), false);
}
private Kvrpcpb.Context getContext(
Peer currentPeer, Set<Long> resolvedLocks, boolean replicaRead) {
@ -271,6 +279,18 @@ public class TiRegion implements Serializable {
this.ver = ver;
}
public long getId() {
return id;
}
public long getConfVer() {
return confVer;
}
public long getVer() {
return ver;
}
@Override
public boolean equals(Object other) {
if (this == other) {

View File

@ -2,25 +2,19 @@ package org.tikv.common.region;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.tikv.kvproto.Metapb;
public class TiStore implements Serializable {
private static final long MAX_FAIL_FORWARD_TIMES = 4;
private final Metapb.Store store;
private final Metapb.Store proxyStore;
private final AtomicBoolean reachable;
private final AtomicBoolean valid;
private final AtomicLong failForwardCount;
private final AtomicBoolean canForward;
public TiStore(Metapb.Store store) {
this.store = store;
this.reachable = new AtomicBoolean(true);
this.valid = new AtomicBoolean(true);
this.canForward = new AtomicBoolean(true);
this.proxyStore = null;
this.failForwardCount = new AtomicLong(0);
}
private TiStore(Metapb.Store store, Metapb.Store proxyStore) {
@ -31,9 +25,7 @@ public class TiStore implements Serializable {
this.reachable = new AtomicBoolean(true);
}
this.valid = new AtomicBoolean(true);
this.canForward = new AtomicBoolean(true);
this.proxyStore = proxyStore;
this.failForwardCount = new AtomicLong(0);
}
@java.lang.Override
@ -82,23 +74,6 @@ public class TiStore implements Serializable {
this.valid.set(false);
}
public void forwardFail() {
if (this.canForward.get()) {
if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) {
this.canForward.set(false);
}
}
}
public void makrCanForward() {
this.failForwardCount.set(0);
this.canForward.set(true);
}
public boolean canForwardFirst() {
return this.canForward.get();
}
public Metapb.Store getStore() {
return this.store;
}

View File

@ -1,7 +1,6 @@
package org.tikv.common.util;
import java.util.concurrent.ThreadLocalRandom;
import org.tikv.common.exception.GrpcException;
public class BackOffFunction {
private final int base;
@ -25,7 +24,7 @@ public class BackOffFunction {
* Do back off in exponential with optional jitters according to different back off strategies.
* See http://www.awsarchitectureblog.com/2015/03/backoff.html
*/
long doBackOff(long maxSleepMs) {
long getSleepMs(long maxSleepMs) {
long sleep = 0;
long v = expo(base, cap, attempts);
switch (strategy) {
@ -47,11 +46,6 @@ public class BackOffFunction {
sleep = maxSleepMs;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
attempts++;
lastSleep = sleep;
return lastSleep;

View File

@ -17,6 +17,8 @@
package org.tikv.common.util;
import org.tikv.common.log.SlowLog;
public interface BackOffer {
// Back off types.
int seconds = 1000;
@ -37,6 +39,12 @@ public interface BackOffer {
* max back off time exceeded and throw an exception to the caller.
*/
void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err);
/**
* canRetryAfterSleep sleeps a while base on the BackOffType and records the error message. Will
* stop until max back off time exceeded and throw an exception to the caller. It will return
* false if the total sleep time has exceed some limit condition.
*/
boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType);
/**
* BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message and
@ -56,4 +64,6 @@ public interface BackOffer {
// DecorrJitter increases the maximum jitter based on the last random value.
DecorrJitter
}
SlowLog getSlowLog();
}

View File

@ -100,7 +100,7 @@ public class ChannelFactory implements AutoCloseable {
.idleTimeout(60, TimeUnit.SECONDS);
if (sslContextBuilder == null) {
return builder.usePlaintext(true).build();
return builder.usePlaintext().build();
} else {
SslContext sslContext = null;
try {

View File

@ -19,6 +19,7 @@ import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.tikv.common.exception.TiKVException;
@ -170,10 +171,14 @@ public class ClientUtils {
ExecutorCompletionService<List<T>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
long backOff) {
try {
for (int i = 0; i < batches.size(); i++) {
List<T> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
Future<List<T>> future = completionService.poll(backOff, TimeUnit.MILLISECONDS);
if (future == null) {
throw new TiKVException("TimeOut Exceeded for current operation.");
}
List<T> task = future.get();
if (!task.isEmpty()) {
taskQueue.offer(task);
}
@ -181,8 +186,6 @@ public class ClientUtils {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
@ -192,11 +195,16 @@ public class ClientUtils {
ExecutorCompletionService<Pair<List<T>, List<U>>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
long backOff) {
try {
List<U> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
Pair<List<T>, List<U>> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
Future<Pair<List<T>, List<U>>> future =
completionService.poll(backOff, TimeUnit.MILLISECONDS);
if (future == null) {
throw new TiKVException("TimeOut Exceeded for current operation.");
}
Pair<List<T>, List<U>> task = future.get();
if (!task.first.isEmpty()) {
taskQueue.offer(task.first);
} else {
@ -207,8 +215,6 @@ public class ClientUtils {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}

View File

@ -17,14 +17,21 @@
package org.tikv.common.util;
import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
import com.google.common.base.Preconditions;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
public class ConcreteBackOffer implements BackOffer {
private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
@ -32,12 +39,26 @@ public class ConcreteBackOffer implements BackOffer {
private final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
private final List<Exception> errors;
private int totalSleep;
private final long deadline;
private final SlowLog slowLog;
private ConcreteBackOffer(int maxSleep) {
public static final Histogram BACKOFF_DURATION =
Histogram.build()
.name("client_java_backoff_duration")
.help("backoff duration.")
.labelNames("type")
.register();
private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) {
Preconditions.checkArgument(
maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0.");
Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0.");
this.maxSleep = maxSleep;
this.errors = new ArrayList<>();
this.backOffFunctionMap = new HashMap<>();
this.deadline = deadline;
this.slowLog = slowLog;
}
private ConcreteBackOffer(ConcreteBackOffer source) {
@ -45,34 +66,41 @@ public class ConcreteBackOffer implements BackOffer {
this.totalSleep = source.totalSleep;
this.errors = source.errors;
this.backOffFunctionMap = source.backOffFunctionMap;
this.deadline = source.deadline;
this.slowLog = source.slowLog;
}
public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) {
long deadline = System.currentTimeMillis() + timeoutInMs;
return new ConcreteBackOffer(0, deadline, slowLog);
}
public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
return new ConcreteBackOffer(maxSleep);
return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newScannerNextMaxBackOff() {
return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF);
return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newBatchGetMaxBackOff() {
return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF);
return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newCopNextMaxBackOff() {
return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF);
return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newGetBackOff() {
return new ConcreteBackOffer(GET_MAX_BACKOFF);
return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newRawKVBackOff() {
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF);
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newTsoBackOff() {
return new ConcreteBackOffer(TSO_MAX_BACKOFF);
return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer create(BackOffer source) {
@ -96,7 +124,11 @@ public class ConcreteBackOffer implements BackOffer {
backOffFunction = BackOffFunction.create(2000, 10000, BackOffStrategy.EqualJitter);
break;
case BoRegionMiss:
backOffFunction = BackOffFunction.create(100, 500, BackOffStrategy.NoJitter);
backOffFunction =
BackOffFunction.create(
TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
500,
BackOffStrategy.NoJitter);
break;
case BoTxnLock:
backOffFunction = BackOffFunction.create(200, 3000, BackOffStrategy.EqualJitter);
@ -105,7 +137,7 @@ public class ConcreteBackOffer implements BackOffer {
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
break;
case BoTiKVRPC:
backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter);
backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter);
break;
case BoTxnNotFound:
backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);
@ -120,32 +152,72 @@ public class ConcreteBackOffer implements BackOffer {
}
@Override
public void doBackOffWithMaxSleep(
BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
return canRetryAfterSleep(funcType, -1);
}
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
// Back off will be done here
totalSleep += backOffFunction.doBackOff(maxSleepMs);
// Back off will not be done here
long sleep = backOffFunction.getSleepMs(maxSleepMs);
totalSleep += sleep;
// Check deadline
if (deadline > 0) {
long currentMs = System.currentTimeMillis();
if (currentMs + sleep >= deadline) {
logger.warn(String.format("Deadline %d is exceeded, errors:", deadline));
return false;
}
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
} finally {
slowLogSpan.end();
backOffTimer.observeDuration();
}
if (maxSleep > 0 && totalSleep >= maxSleep) {
logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
return false;
}
return true;
}
@Override
public void doBackOffWithMaxSleep(
BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
logger.debug(
String.format(
"%s, retry later(totalSleep %dms, maxSleep %dms)",
err.getMessage(), totalSleep, maxSleep));
errors.add(err);
if (maxSleep > 0 && totalSleep >= maxSleep) {
StringBuilder errMsg =
new StringBuilder(
String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
for (int i = 0; i < errors.size(); i++) {
Exception curErr = errors.get(i);
// Print only last 3 errors for non-DEBUG log levels.
if (logger.isDebugEnabled() || i >= errors.size() - 3) {
errMsg.append("\n").append(i).append(".").append(curErr.toString());
}
}
logger.warn(errMsg.toString());
// Use the last backoff type to generate an exception
throw new GrpcException("retry is exhausted.", err);
if (!canRetryAfterSleep(funcType, maxSleepMs)) {
logThrowError(err);
}
}
private void logThrowError(Exception err) {
StringBuilder errMsg = new StringBuilder();
for (int i = 0; i < errors.size(); i++) {
Exception curErr = errors.get(i);
// Print only last 3 errors for non-DEBUG log levels.
if (logger.isDebugEnabled() || i >= errors.size() - 3) {
errMsg.append("\n").append(i).append(".").append(curErr.toString());
}
}
logger.warn(errMsg.toString());
// Use the last backoff type to generate an exception
throw new GrpcException("retry is exhausted.", err);
}
@Override
public SlowLog getSlowLog() {
return slowLog;
}
}

View File

@ -27,12 +27,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogImpl;
import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
@ -122,16 +126,27 @@ public class RawKVClient implements AutoCloseable {
public void put(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "put");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
client.rawPut(backOffer, key, value, ttl, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("Retry for put error", e);
}
}
} catch (Exception e) {
@ -139,6 +154,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -208,16 +224,27 @@ public class RawKVClient implements AutoCloseable {
String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "putIfAbsent");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("Retry for putIfAbsent error", e);
}
}
} catch (Exception e) {
@ -225,6 +252,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -246,14 +274,27 @@ public class RawKVClient implements AutoCloseable {
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchPut");
put("keySize", String.valueOf(kvPairs.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog);
try {
doSendBatchPut(defaultBackOff(), kvPairs, ttl);
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchPut(backOffer, kvPairs, ttl, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -266,16 +307,28 @@ public class RawKVClient implements AutoCloseable {
public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "get");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
Optional<ByteString> result = client.rawGet(defaultBackOff(), key);
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
Optional<ByteString> result = client.rawGet(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("Retry for get error", e);
}
}
} catch (Exception e) {
@ -283,6 +336,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -295,9 +349,20 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchGet");
put("keySize", String.valueOf(keys.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
List<KvPair> result = doSendBatchGet(backOffer, keys);
long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS();
List<KvPair> result = doSendBatchGet(backOffer, keys, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (Exception e) {
@ -305,6 +370,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -316,9 +382,20 @@ public class RawKVClient implements AutoCloseable {
public void batchDelete(List<ByteString> keys) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchDelete");
put("keySize", String.valueOf(keys.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
doSendBatchDelete(backOffer, keys);
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchDelete(backOffer, keys, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (Exception e) {
@ -326,6 +403,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -339,16 +417,27 @@ public class RawKVClient implements AutoCloseable {
public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "getKeyTTL");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
Optional<Long> result = client.rawGetKeyTTL(defaultBackOff(), key);
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
Optional<Long> result = client.rawGetKeyTTL(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("Retry for getKeyTTL error", e);
}
}
} catch (Exception e) {
@ -356,6 +445,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -405,6 +495,8 @@ public class RawKVClient implements AutoCloseable {
public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
String label = "client_raw_batch_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS();
List<Future<Pair<Integer, List<KvPair>>>> futureList = new ArrayList<>();
try {
if (ranges.isEmpty()) {
return new ArrayList<>();
@ -414,7 +506,7 @@ public class RawKVClient implements AutoCloseable {
int num = 0;
for (ScanOption scanOption : ranges) {
int i = num;
completionService.submit(() -> Pair.create(i, scan(scanOption)));
futureList.add(completionService.submit(() -> Pair.create(i, scan(scanOption))));
++num;
}
List<List<KvPair>> scanResults = new ArrayList<>();
@ -423,14 +515,16 @@ public class RawKVClient implements AutoCloseable {
}
for (int i = 0; i < num; i++) {
try {
Pair<Integer, List<KvPair>> scanResult =
completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS);
Future<Pair<Integer, List<KvPair>>> future =
completionService.poll(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (future == null) {
throw new TiKVException("TimeOut Exceeded for current operation.");
}
Pair<Integer, List<KvPair>> scanResult = future.get();
scanResults.set(scanResult.first, scanResult.second);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
@ -439,6 +533,9 @@ public class RawKVClient implements AutoCloseable {
return scanResults;
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
for (Future<Pair<Integer, List<KvPair>>> future : futureList) {
future.cancel(true);
}
throw e;
} finally {
requestTimer.observeDuration();
@ -469,9 +566,23 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(5) {
{
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
put("limit", String.valueOf(limit));
put("keyOnly", String.valueOf(keyOnly));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog);
try {
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly);
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -481,6 +592,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -529,17 +641,37 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(4) {
{
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
put("keyOnly", String.valueOf(keyOnly));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog);
try {
ByteString newStartKey = startKey;
List<KvPair> result = new ArrayList<>();
while (true) {
Iterator<KvPair> iterator =
rawScanIterator(
conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly);
conf,
clientBuilder,
newStartKey,
endKey,
conf.getScanBatchSize(),
keyOnly,
backOffer);
if (!iterator.hasNext()) {
break;
}
iterator.forEachRemaining(result::add);
startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
}
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
@ -548,6 +680,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -587,16 +720,27 @@ public class RawKVClient implements AutoCloseable {
public void delete(ByteString key) {
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "delete");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
client.rawDelete(defaultBackOff(), key, atomicForCAS);
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
client.rawDelete(backOffer, key, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("Retry for delete error", e);
}
}
} catch (Exception e) {
@ -604,6 +748,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -619,9 +764,12 @@ public class RawKVClient implements AutoCloseable {
public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
String label = "client_raw_delete_range";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE);
try {
BackOffer backOffer = defaultBackOff();
doSendDeleteRange(backOffer, startKey, endKey);
long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS();
doSendDeleteRange(backOffer, startKey, endKey, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
@ -730,10 +878,13 @@ public class RawKVClient implements AutoCloseable {
importerClient.write(sortedList.iterator());
}
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
private void doSendBatchPut(
BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, long deadline) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);
List<Future<List<Batch>>> futureList = new ArrayList<>();
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), kvPairs.keySet(), backOffer);
List<Batch> batches = new ArrayList<>();
@ -756,20 +907,28 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl));
try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) {
for (Future<List<Batch>> future : futureList) {
future.cancel(true);
}
throw e;
}
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) {
client.setTimeout(conf.getScanTimeout());
try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
client.setTimeout(conf.getRawKVBatchWriteTimeoutInMS());
client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.debug("ReSplitting ranges for BatchPutRequest");
logger.warn("ReSplitting ranges for BatchPutRequest", e);
// retry
return doSendBatchPutWithRefetchRegion(backOffer, batch);
}
@ -794,10 +953,12 @@ public class RawKVClient implements AutoCloseable {
return retryBatches;
}
private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys) {
private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long deadline) {
ExecutorCompletionService<Pair<List<Batch>, List<KvPair>>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
List<Future<Pair<List<Batch>, List<KvPair>>>> futureList = new ArrayList<>();
List<Batch> batches =
getBatches(backOffer, keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
@ -808,11 +969,20 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch));
futureList.add(
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch)));
}
try {
result.addAll(
getTasksWithOutput(
completionService, taskQueue, task, deadline - System.currentTimeMillis()));
} catch (Exception e) {
for (Future<Pair<List<Batch>, List<KvPair>>> future : futureList) {
future.cancel(true);
}
throw e;
}
result.addAll(
getTasksWithOutput(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF));
}
return result;
@ -820,14 +990,14 @@ public class RawKVClient implements AutoCloseable {
private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
return Pair.create(new ArrayList<>(), partialResult);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.debug("ReSplitting ranges for BatchGetRequest", e);
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>());
@ -839,10 +1009,12 @@ public class RawKVClient implements AutoCloseable {
backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys) {
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, long deadline) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchDeleteThreadPool);
List<Future<List<Batch>>> futureList = new ArrayList<>();
List<Batch> batches =
getBatches(backOffer, keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
@ -852,22 +1024,29 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch));
futureList.add(
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch)));
}
try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) {
for (Future<List<Batch>> future : futureList) {
future.cancel(true);
}
throw e;
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}
private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.debug("ReSplitting ranges for BatchGetRequest", e);
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
return doSendBatchDeleteWithRefetchRegion(backOffer, batch);
@ -886,10 +1065,13 @@ public class RawKVClient implements AutoCloseable {
return key2;
}
private void doSendDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
private void doSendDeleteRange(
BackOffer backOffer, ByteString startKey, ByteString endKey, long deadline) {
ExecutorCompletionService<List<DeleteRange>> completionService =
new ExecutorCompletionService<>(deleteRangeThreadPool);
List<Future<List<DeleteRange>>> futureList = new ArrayList<>();
List<TiRegion> regions = fetchRegionsFromRange(backOffer, startKey, endKey);
List<DeleteRange> ranges = new ArrayList<>();
for (int i = 0; i < regions.size(); i++) {
@ -903,9 +1085,18 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<DeleteRange> task = taskQueue.poll();
for (DeleteRange range : task) {
completionService.submit(() -> doSendDeleteRangeWithRetry(range.getBackOffer(), range));
futureList.add(
completionService.submit(
() -> doSendDeleteRangeWithRetry(range.getBackOffer(), range)));
}
try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) {
for (Future<List<DeleteRange>> future : futureList) {
future.cancel(true);
}
throw e;
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}
@ -917,7 +1108,7 @@ public class RawKVClient implements AutoCloseable {
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(range.getRegion());
logger.debug("ReSplitting ranges for BatchDeleteRangeRequest", e);
logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
// retry
return doSendDeleteRangeWithRefetchRegion(backOffer, range);
@ -970,15 +1161,12 @@ public class RawKVClient implements AutoCloseable {
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
BackOffer backOffer) {
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly);
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
}
/**
@ -1031,7 +1219,7 @@ public class RawKVClient implements AutoCloseable {
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly);
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff());
RAW_REQUEST_SUCCESS.labels(label).inc();
return iterator;
} catch (Exception e) {
@ -1110,7 +1298,13 @@ public class RawKVClient implements AutoCloseable {
this.iterator =
rawScanIterator(
conf, clientBuilder, this.startKey, this.endKey, conf.getScanBatchSize(), keyOnly);
conf,
clientBuilder,
this.startKey,
this.endKey,
conf.getScanBatchSize(),
keyOnly,
defaultBackOff());
}
@Override
@ -1123,7 +1317,14 @@ public class RawKVClient implements AutoCloseable {
}
ByteString startKey = Key.toRawKey(this.last.getKey()).next().toByteString();
this.iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly);
rawScanIterator(
conf,
clientBuilder,
startKey,
endKey,
conf.getScanBatchSize(),
keyOnly,
defaultBackOff());
this.last = null;
return this.iterator.hasNext();
}
@ -1135,4 +1336,8 @@ public class RawKVClient implements AutoCloseable {
return next;
}
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}
}

View File

@ -71,7 +71,7 @@ public interface AbstractLockResolverClient {
TiRegion region,
TiStore store,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
TikvGrpc.TikvFutureStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,

View File

@ -52,7 +52,7 @@ import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest;
import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
/** Before v3.0.5 TiDB uses the ttl on secondary lock. */
public class LockResolverClientV2 extends AbstractRegionStoreClient
@ -77,7 +77,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
TikvFutureStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager) {
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);

View File

@ -49,7 +49,7 @@ import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
/** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */
public class LockResolverClientV3 extends AbstractRegionStoreClient
@ -78,7 +78,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
TikvFutureStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,

View File

@ -47,7 +47,7 @@ import org.tikv.common.util.TsoUtils;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.exception.TxnNotFoundException;
import org.tikv.txn.exception.WriteConflictException;
@ -78,7 +78,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
TiRegion region,
TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
TikvFutureStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,

View File

@ -16,7 +16,7 @@
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
@ -153,6 +153,18 @@ public class RegionManagerTest extends PDMockServerTest {
StoreState.Tombstone,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
assertNull(mgr.getStoreById(storeId + 1));
try {
mgr.getStoreById(storeId + 1);
fail();
} catch (Exception ignored) {
}
mgr.invalidateStore(storeId);
try {
mgr.getStoreById(storeId);
fail();
} catch (Exception ignored) {
}
}
}

View File

@ -18,6 +18,10 @@ import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption;
@ -136,6 +140,46 @@ public class RawKVClientTest extends BaseRawKVTest {
"%s%02d", RandomStringUtils.randomAlphabetic(3).toUpperCase(Locale.ROOT), r.nextInt(10000));
}
@Test
public void testCustomBackOff() {
int timeout = 2000;
int sleep = 150;
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(timeout);
long s = System.currentTimeMillis();
try {
while (true) {
Thread.sleep(sleep);
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new Exception("t"));
}
} catch (Exception ignored) {
} finally {
long e = System.currentTimeMillis();
long duration = e - s;
logger.info("duration = " + duration);
assert (duration >= 2900);
}
}
@Test
public void testDeadlineBackOff() {
int timeout = 2000;
int sleep = 150;
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE);
long s = System.currentTimeMillis();
try {
while (true) {
Thread.sleep(sleep);
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new Exception("t"));
}
} catch (Exception ignored) {
} finally {
long e = System.currentTimeMillis();
long duration = e - s;
logger.info("duration = " + duration);
assert (duration <= timeout + sleep);
}
}
@Test
public void batchPutTest() {
ExecutorService executors = Executors.newFixedThreadPool(200);