mirror of https://github.com/tikv/client-java.git
parent
3163793311
commit
774ee50189
|
@ -37,8 +37,10 @@ import org.tikv.common.operation.ErrorHandler;
|
|||
import org.tikv.common.policy.RetryMaxMs.Builder;
|
||||
import org.tikv.common.policy.RetryPolicy;
|
||||
import org.tikv.common.streaming.StreamingResponse;
|
||||
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.common.util.ConcreteBackOffer;
|
||||
|
||||
public abstract class AbstractGRPCClient<
|
||||
BlockingStubT extends AbstractStub<BlockingStubT>,
|
||||
|
@ -179,19 +181,29 @@ public abstract class AbstractGRPCClient<
|
|||
|
||||
protected abstract FutureStubT getAsyncStub();
|
||||
|
||||
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
try {
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
|
||||
return false;
|
||||
private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
|
||||
while (true) {
|
||||
try {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
HealthGrpc.newBlockingStub(channel)
|
||||
.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
|
||||
} catch (Exception e) {
|
||||
logger.warn("check health failed.", e);
|
||||
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
|
||||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2));
|
||||
try {
|
||||
return doCheckHealth(backOffer, addressStr, hostMapping);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ public class ConfigUtils {
|
|||
public static final String TIKV_GRPC_KEEPALIVE_TIME = "tikv.grpc.keepalive_time";
|
||||
public static final String TIKV_GRPC_KEEPALIVE_TIMEOUT = "tikv.grpc.keepalive_timeout";
|
||||
public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout";
|
||||
public static final String TIKV_CONN_RECYCLE_TIME = "tikv.conn.recycle_time";
|
||||
|
||||
public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size";
|
||||
public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency";
|
||||
|
@ -93,6 +94,7 @@ public class ConfigUtils {
|
|||
public static final String TIKV_RAWKV_SERVER_SLOWLOG_FACTOR = "tikv.rawkv.server_slowlog_factor";
|
||||
|
||||
public static final String TIKV_TLS_ENABLE = "tikv.tls_enable";
|
||||
public static final String TIKV_TLS_RELOAD_INTERVAL = "tikv.tls.reload_interval";
|
||||
public static final String TIKV_TRUST_CERT_COLLECTION = "tikv.trust_cert_collection";
|
||||
public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain";
|
||||
public static final String TIKV_KEY_FILE = "tikv.key_file";
|
||||
|
@ -130,6 +132,8 @@ public class ConfigUtils {
|
|||
public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300;
|
||||
public static final int DEF_SCAN_BATCH_SIZE = 10240;
|
||||
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
|
||||
public static final String DEF_TIKV_CONN_RECYCLE_TIME = "60s";
|
||||
public static final String DEF_TIKV_TLS_RELOAD_INTERVAL = "10s";
|
||||
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
|
||||
public static final int DEF_REGION_SCAN_DOWNGRADE_THRESHOLD = 10000000;
|
||||
// if keyRange size per request exceeds this limit, the request might be too large to be accepted
|
||||
|
|
|
@ -435,23 +435,34 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
return pdClientWrapper;
|
||||
}
|
||||
|
||||
private GetMembersResponse getMembers(URI uri) {
|
||||
try {
|
||||
ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
|
||||
PDGrpc.PDBlockingStub stub =
|
||||
PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
|
||||
GetMembersRequest request =
|
||||
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
|
||||
GetMembersResponse resp = stub.getMembers(request);
|
||||
// check if the response contains a valid leader
|
||||
if (resp != null && resp.getLeader().getMemberId() == 0) {
|
||||
return null;
|
||||
private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
|
||||
while (true) {
|
||||
try {
|
||||
ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
|
||||
PDGrpc.PDBlockingStub stub =
|
||||
PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
|
||||
GetMembersRequest request =
|
||||
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
|
||||
GetMembersResponse resp = stub.getMembers(request);
|
||||
// check if the response contains a valid leader
|
||||
if (resp != null && resp.getLeader().getMemberId() == 0) {
|
||||
return null;
|
||||
}
|
||||
return resp;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to get member from pd server.", e);
|
||||
backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
|
||||
}
|
||||
return resp;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to get member from pd server.", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private GetMembersResponse getMembers(URI uri) {
|
||||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
|
||||
try {
|
||||
return doGetMembers(backOffer, uri);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// return whether the leader has changed to target address `leaderUrlStr`.
|
||||
|
@ -463,7 +474,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
return true;
|
||||
}
|
||||
}
|
||||
// If leader has transfered to another member, we can create another leaderwrapper.
|
||||
// If leader has transferred to another member, we can create another leaderWrapper.
|
||||
}
|
||||
// switch leader
|
||||
return createLeaderClientWrapper(leaderUrlStr);
|
||||
|
@ -502,7 +513,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
return true;
|
||||
}
|
||||
|
||||
public synchronized void updateLeaderOrforwardFollower() {
|
||||
public synchronized void updateLeaderOrForwardFollower() {
|
||||
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
|
||||
return;
|
||||
}
|
||||
|
@ -520,7 +531,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
|
||||
|
||||
// if leader is switched, just return.
|
||||
if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
|
||||
if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) {
|
||||
lastUpdateLeaderTime = System.currentTimeMillis();
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -17,14 +17,157 @@
|
|||
|
||||
package org.tikv.common;
|
||||
|
||||
import static org.tikv.common.ConfigUtils.*;
|
||||
import static org.tikv.common.ConfigUtils.DEF_BATCH_DELETE_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_BATCH_GET_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_BATCH_PUT_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_BATCH_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_CHECK_HEALTH_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_DB_PREFIX;
|
||||
import static org.tikv.common.ConfigUtils.DEF_DELETE_RANGE_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_FORWARD_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_GRPC_FORWARD_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_HEALTH_CHECK_PERIOD_DURATION;
|
||||
import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_KV_CLIENT_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_MAX_FRAME_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_METRICS_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_METRICS_PORT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_PD_ADDRESSES;
|
||||
import static org.tikv.common.ConfigUtils.DEF_REPLICA_READ;
|
||||
import static org.tikv.common.ConfigUtils.DEF_SCAN_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_SCAN_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_SHOW_ROWID;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TABLE_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIFLASH_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_BO_REGION_MISS_BASE_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_CONN_RECYCLE_TIME;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_ENABLE_ATOMIC_FOR_CAS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_IDLE_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_INGEST_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIME;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_WARM_UP_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_NETWORK_MAPPING_NAME;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_SCAN_REGIONS_LIMIT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_SCATTER_WAIT_SECONDS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_TLS_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_TLS_RELOAD_INTERVAL;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_USE_JKS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIKV_WARM_UP_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS;
|
||||
import static org.tikv.common.ConfigUtils.FOLLOWER;
|
||||
import static org.tikv.common.ConfigUtils.HIGH_COMMAND_PRIORITY;
|
||||
import static org.tikv.common.ConfigUtils.LEADER_AND_FOLLOWER;
|
||||
import static org.tikv.common.ConfigUtils.LOW_COMMAND_PRIORITY;
|
||||
import static org.tikv.common.ConfigUtils.NORMAL_COMMAND_PRIORITY;
|
||||
import static org.tikv.common.ConfigUtils.RAW_KV_MODE;
|
||||
import static org.tikv.common.ConfigUtils.READ_COMMITTED_ISOLATION_LEVEL;
|
||||
import static org.tikv.common.ConfigUtils.SNAPSHOT_ISOLATION_LEVEL;
|
||||
import static org.tikv.common.ConfigUtils.TIFLASH_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_BATCH_GET_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_CONN_RECYCLE_TIME;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_DB_PREFIX;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_DELETE_RANGE_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_ENABLE_ATOMIC_FOR_CAS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_ENABLE_GRPC_FORWARD;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_FORWARD_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_HEALTH_CHECK_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_IDLE_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_INGEST_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIME;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_MAX_FRAME_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_GRPC_WARM_UP_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_HEALTH_CHECK_PERIOD_DURATION;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_BYTES;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_BATCH_SIZE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PASSWORD;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PATH;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PASSWORD;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PATH;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_KEY_CERT_CHAIN;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_KEY_FILE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_KV_CLIENT_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_KV_MODE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_METRICS_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_METRICS_PORT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_NETWORK_MAPPING_NAME;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_PD_ADDRESSES;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_PD_FIRST_GET_MEMBER_TIMEOUT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SERVER_SLOWLOG_FACTOR;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_SLOWLOG_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_TIMEOUT_IN_MS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_REPLICA_READ;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_REQUEST_COMMAND_PRIORITY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_REQUEST_ISOLATION_LEVEL;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_SCAN_REGIONS_LIMIT;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_SCATTER_WAIT_SECONDS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_SHOW_ROWID;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_TABLE_SCAN_CONCURRENCY;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_TLS_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_TLS_RELOAD_INTERVAL;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_TRUST_CERT_COLLECTION;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_USE_JKS;
|
||||
import static org.tikv.common.ConfigUtils.TIKV_WARM_UP_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.TXN_KV_MODE;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ENABLE;
|
||||
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.Serializable;
|
||||
import java.net.URI;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -93,6 +236,8 @@ public class TiConfiguration implements Serializable {
|
|||
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
|
||||
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
|
||||
setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE);
|
||||
setIfMissing(TIKV_CONN_RECYCLE_TIME, DEF_TIKV_CONN_RECYCLE_TIME);
|
||||
setIfMissing(TIKV_TLS_RELOAD_INTERVAL, DEF_TIKV_TLS_RELOAD_INTERVAL);
|
||||
setIfMissing(TIKV_INDEX_SCAN_BATCH_SIZE, DEF_INDEX_SCAN_BATCH_SIZE);
|
||||
setIfMissing(TIKV_INDEX_SCAN_CONCURRENCY, DEF_INDEX_SCAN_CONCURRENCY);
|
||||
setIfMissing(TIKV_TABLE_SCAN_CONCURRENCY, DEF_TABLE_SCAN_CONCURRENCY);
|
||||
|
@ -154,7 +299,7 @@ public class TiConfiguration implements Serializable {
|
|||
}
|
||||
|
||||
public static void listAll() {
|
||||
logger.info("static configurations are:" + new ArrayList<>(settings.entrySet()).toString());
|
||||
logger.info("static configurations are:" + new ArrayList<>(settings.entrySet()));
|
||||
}
|
||||
|
||||
private static void set(String key, String value) {
|
||||
|
@ -318,6 +463,7 @@ public class TiConfiguration implements Serializable {
|
|||
private long pdFirstGetMemberTimeout = getTimeAsMs(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT);
|
||||
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
|
||||
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
|
||||
private long connRecycleTime = getTimeAsSeconds(TIKV_CONN_RECYCLE_TIME);
|
||||
private List<URI> pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES);
|
||||
private int indexScanBatchSize = getInt(TIKV_INDEX_SCAN_BATCH_SIZE);
|
||||
private int indexScanConcurrency = getInt(TIKV_INDEX_SCAN_CONCURRENCY);
|
||||
|
@ -372,6 +518,8 @@ public class TiConfiguration implements Serializable {
|
|||
private double rawKVServerSlowLogFactor = getDouble(TIKV_RAWKV_SERVER_SLOWLOG_FACTOR, 0.5);
|
||||
|
||||
private boolean tlsEnable = getBoolean(TIKV_TLS_ENABLE);
|
||||
private long certReloadInterval = getTimeAsSeconds(TIKV_TLS_RELOAD_INTERVAL);
|
||||
|
||||
private String trustCertCollectionFile = getOption(TIKV_TRUST_CERT_COLLECTION).orElse(null);
|
||||
private String keyCertChainFile = getOption(TIKV_KEY_CERT_CHAIN).orElse(null);
|
||||
private String keyFile = getOption(TIKV_KEY_FILE).orElse(null);
|
||||
|
@ -382,7 +530,7 @@ public class TiConfiguration implements Serializable {
|
|||
private String jksTrustPath = getOption(TIKV_JKS_TRUST_PATH).orElse(null);
|
||||
private String jksTrustPassword = getOption(TIKV_JKS_TRUST_PASSWORD).orElse(null);
|
||||
|
||||
private boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE);
|
||||
private final boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE);
|
||||
private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE);
|
||||
|
||||
private boolean isTest = false;
|
||||
|
@ -538,6 +686,15 @@ public class TiConfiguration implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public long getConnRecycleTimeInSeconds() {
|
||||
return connRecycleTime;
|
||||
}
|
||||
|
||||
public TiConfiguration setConnRecycleTimeInSeconds(int connRecycleTime) {
|
||||
this.connRecycleTime = connRecycleTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getIndexScanBatchSize() {
|
||||
return indexScanBatchSize;
|
||||
}
|
||||
|
@ -848,6 +1005,15 @@ public class TiConfiguration implements Serializable {
|
|||
return tlsEnable;
|
||||
}
|
||||
|
||||
public long getCertReloadIntervalInSeconds() {
|
||||
return certReloadInterval;
|
||||
}
|
||||
|
||||
public TiConfiguration setCertReloadIntervalInSeconds(long interval) {
|
||||
this.certReloadInterval = interval;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTlsEnable(boolean tlsEnable) {
|
||||
this.tlsEnable = tlsEnable;
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TiSession implements AutoCloseable {
|
|||
private volatile ExecutorService batchScanThreadPool;
|
||||
private volatile ExecutorService deleteRangeThreadPool;
|
||||
private volatile RegionManager regionManager;
|
||||
private volatile boolean enableGrpcForward;
|
||||
private final boolean enableGrpcForward;
|
||||
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
|
||||
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
|
||||
private volatile boolean isClosed = false;
|
||||
|
@ -126,6 +126,8 @@ public class TiSession implements AutoCloseable {
|
|||
conf.getKeepaliveTime(),
|
||||
conf.getKeepaliveTimeout(),
|
||||
conf.getIdleTimeout(),
|
||||
conf.getConnRecycleTimeInSeconds(),
|
||||
conf.getCertReloadIntervalInSeconds(),
|
||||
conf.getJksKeyPath(),
|
||||
conf.getJksKeyPassword(),
|
||||
conf.getJksTrustPath(),
|
||||
|
@ -137,6 +139,8 @@ public class TiSession implements AutoCloseable {
|
|||
conf.getKeepaliveTime(),
|
||||
conf.getKeepaliveTimeout(),
|
||||
conf.getIdleTimeout(),
|
||||
conf.getConnRecycleTimeInSeconds(),
|
||||
conf.getCertReloadIntervalInSeconds(),
|
||||
conf.getTrustCertCollectionFile(),
|
||||
conf.getKeyCertChainFile(),
|
||||
conf.getKeyFile());
|
||||
|
|
|
@ -59,7 +59,7 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
case PD_ERROR:
|
||||
backOffer.doBackOff(
|
||||
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
|
||||
client.updateLeaderOrforwardFollower();
|
||||
client.updateLeaderOrForwardFollower();
|
||||
return true;
|
||||
case REGION_PEER_NOT_ELECTED:
|
||||
logger.debug(error.getMessage());
|
||||
|
@ -80,7 +80,7 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
return false;
|
||||
}
|
||||
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
|
||||
client.updateLeaderOrforwardFollower();
|
||||
client.updateLeaderOrForwardFollower();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ public class StoreHealthyChecker implements Runnable {
|
|||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.info("fail to check tombstone stores", e);
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -80,6 +80,7 @@ public class BackOffFunction {
|
|||
BoUpdateLeader,
|
||||
BoServerBusy,
|
||||
BoTxnNotFound,
|
||||
BoCheckTimeout
|
||||
BoCheckTimeout,
|
||||
BoCheckHealth
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.tikv.common.util;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.netty.GrpcSslContexts;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
|
@ -27,9 +28,15 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.net.URI;
|
||||
import java.security.KeyStore;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
|
@ -40,43 +47,91 @@ import org.tikv.common.pd.PDUtils;
|
|||
|
||||
public class ChannelFactory implements AutoCloseable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ChannelFactory.class);
|
||||
private static final String PUB_KEY_INFRA = "PKIX";
|
||||
|
||||
// After `connRecycleTime` seconds elapses, the old channels will be forced to shut down,
|
||||
// to avoid using the old context all the time including potential channel leak.
|
||||
private final long connRecycleTime;
|
||||
private final int maxFrameSize;
|
||||
private final int keepaliveTime;
|
||||
private final int keepaliveTimeout;
|
||||
private final int idleTimeout;
|
||||
private final ConcurrentHashMap<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
|
||||
private final CertContext certContext;
|
||||
private final CertWatcher certWatcher;
|
||||
|
||||
@VisibleForTesting
|
||||
public final ConcurrentHashMap<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
|
||||
|
||||
private final AtomicReference<SslContextBuilder> sslContextBuilder = new AtomicReference<>();
|
||||
private static final String PUB_KEY_INFRA = "PKIX";
|
||||
|
||||
private abstract static class CertContext {
|
||||
protected abstract boolean isModified();
|
||||
private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
protected abstract SslContextBuilder createSslContextBuilder();
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
public SslContextBuilder reload() {
|
||||
if (isModified()) {
|
||||
logger.info("reload ssl context");
|
||||
return createSslContextBuilder();
|
||||
@VisibleForTesting
|
||||
public static class CertWatcher implements AutoCloseable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(CertWatcher.class);
|
||||
private final List<File> targets;
|
||||
private final List<Long> lastReload = new ArrayList<>();
|
||||
private final ScheduledExecutorService executorService =
|
||||
Executors.newSingleThreadScheduledExecutor();
|
||||
private final Runnable onChange;
|
||||
|
||||
public CertWatcher(long pollInterval, List<File> targets, Runnable onChange) {
|
||||
this.targets = targets;
|
||||
this.onChange = onChange;
|
||||
|
||||
for (File ignored : targets) {
|
||||
lastReload.add(0L);
|
||||
}
|
||||
return null;
|
||||
|
||||
executorService.scheduleAtFixedRate(
|
||||
this::tryReload, pollInterval, pollInterval, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void tryReload() {
|
||||
if (needReload()) {
|
||||
onChange.run();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean needReload() {
|
||||
boolean needReload = false;
|
||||
// Check all the modification of the `targets`.
|
||||
// If one of them changed, means to need reload.
|
||||
for (int i = 0; i < targets.size(); i++) {
|
||||
try {
|
||||
long lastModified = targets.get(i).lastModified();
|
||||
if (lastModified != lastReload.get(i)) {
|
||||
lastReload.set(i, lastModified);
|
||||
logger.warn("detected ssl context changes: {}", targets.get(i));
|
||||
needReload = true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("fail to check the status of ssl context files", e);
|
||||
}
|
||||
}
|
||||
return needReload;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static class JksContext extends CertContext {
|
||||
private long keyLastModified;
|
||||
private long trustLastModified;
|
||||
@VisibleForTesting
|
||||
public abstract static class CertContext {
|
||||
public abstract SslContextBuilder createSslContextBuilder();
|
||||
}
|
||||
|
||||
public static class JksContext extends CertContext {
|
||||
private final String keyPath;
|
||||
private final String keyPassword;
|
||||
private final String trustPath;
|
||||
private final String trustPassword;
|
||||
|
||||
public JksContext(String keyPath, String keyPassword, String trustPath, String trustPassword) {
|
||||
this.keyLastModified = 0;
|
||||
this.trustLastModified = 0;
|
||||
|
||||
this.keyPath = keyPath;
|
||||
this.keyPassword = keyPassword;
|
||||
this.trustPath = trustPath;
|
||||
|
@ -84,22 +139,7 @@ public class ChannelFactory implements AutoCloseable {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected synchronized boolean isModified() {
|
||||
long a = new File(keyPath).lastModified();
|
||||
long b = new File(trustPath).lastModified();
|
||||
|
||||
boolean changed = this.keyLastModified != a || this.trustLastModified != b;
|
||||
|
||||
if (changed) {
|
||||
this.keyLastModified = a;
|
||||
this.trustLastModified = b;
|
||||
}
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SslContextBuilder createSslContextBuilder() {
|
||||
public SslContextBuilder createSslContextBuilder() {
|
||||
SslContextBuilder builder = GrpcSslContexts.forClient();
|
||||
try {
|
||||
if (keyPath != null && keyPassword != null) {
|
||||
|
@ -119,50 +159,26 @@ public class ChannelFactory implements AutoCloseable {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("JKS SSL context builder failed!", e);
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
||||
private static class OpenSslContext extends CertContext {
|
||||
private long trustLastModified;
|
||||
private long chainLastModified;
|
||||
private long keyLastModified;
|
||||
|
||||
@VisibleForTesting
|
||||
public static class OpenSslContext extends CertContext {
|
||||
private final String trustPath;
|
||||
private final String chainPath;
|
||||
private final String keyPath;
|
||||
|
||||
public OpenSslContext(String trustPath, String chainPath, String keyPath) {
|
||||
this.trustLastModified = 0;
|
||||
this.chainLastModified = 0;
|
||||
this.keyLastModified = 0;
|
||||
|
||||
this.trustPath = trustPath;
|
||||
this.chainPath = chainPath;
|
||||
this.keyPath = keyPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized boolean isModified() {
|
||||
long a = new File(trustPath).lastModified();
|
||||
long b = new File(chainPath).lastModified();
|
||||
long c = new File(keyPath).lastModified();
|
||||
|
||||
boolean changed =
|
||||
this.trustLastModified != a || this.chainLastModified != b || this.keyLastModified != c;
|
||||
|
||||
if (changed) {
|
||||
this.trustLastModified = a;
|
||||
this.chainLastModified = b;
|
||||
this.keyLastModified = c;
|
||||
}
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SslContextBuilder createSslContextBuilder() {
|
||||
public SslContextBuilder createSslContextBuilder() {
|
||||
SslContextBuilder builder = GrpcSslContexts.forClient();
|
||||
if (trustPath != null) {
|
||||
builder.trustManager(new File(trustPath));
|
||||
|
@ -180,7 +196,9 @@ public class ChannelFactory implements AutoCloseable {
|
|||
this.keepaliveTime = keepaliveTime;
|
||||
this.keepaliveTimeout = keepaliveTimeout;
|
||||
this.idleTimeout = idleTimeout;
|
||||
this.certWatcher = null;
|
||||
this.certContext = null;
|
||||
this.connRecycleTime = 0;
|
||||
}
|
||||
|
||||
public ChannelFactory(
|
||||
|
@ -188,6 +206,8 @@ public class ChannelFactory implements AutoCloseable {
|
|||
int keepaliveTime,
|
||||
int keepaliveTimeout,
|
||||
int idleTimeout,
|
||||
long connRecycleTime,
|
||||
long certReloadInterval,
|
||||
String trustCertCollectionFilePath,
|
||||
String keyCertChainFilePath,
|
||||
String keyFilePath) {
|
||||
|
@ -195,8 +215,22 @@ public class ChannelFactory implements AutoCloseable {
|
|||
this.keepaliveTime = keepaliveTime;
|
||||
this.keepaliveTimeout = keepaliveTimeout;
|
||||
this.idleTimeout = idleTimeout;
|
||||
this.connRecycleTime = connRecycleTime;
|
||||
this.certContext =
|
||||
new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath);
|
||||
|
||||
File trustCert = new File(trustCertCollectionFilePath);
|
||||
File keyCert = new File(keyCertChainFilePath);
|
||||
File key = new File(keyFilePath);
|
||||
|
||||
if (certReloadInterval > 0) {
|
||||
onCertChange();
|
||||
this.certWatcher =
|
||||
new CertWatcher(
|
||||
certReloadInterval, ImmutableList.of(trustCert, keyCert, key), this::onCertChange);
|
||||
} else {
|
||||
this.certWatcher = null;
|
||||
}
|
||||
}
|
||||
|
||||
public ChannelFactory(
|
||||
|
@ -204,6 +238,8 @@ public class ChannelFactory implements AutoCloseable {
|
|||
int keepaliveTime,
|
||||
int keepaliveTimeout,
|
||||
int idleTimeout,
|
||||
long connRecycleTime,
|
||||
long certReloadInterval,
|
||||
String jksKeyPath,
|
||||
String jksKeyPassword,
|
||||
String jksTrustPath,
|
||||
|
@ -212,66 +248,99 @@ public class ChannelFactory implements AutoCloseable {
|
|||
this.keepaliveTime = keepaliveTime;
|
||||
this.keepaliveTimeout = keepaliveTimeout;
|
||||
this.idleTimeout = idleTimeout;
|
||||
this.connRecycleTime = connRecycleTime;
|
||||
this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword);
|
||||
|
||||
File jksKey = new File(jksKeyPath);
|
||||
File jksTrust = new File(jksTrustPath);
|
||||
if (certReloadInterval > 0) {
|
||||
onCertChange();
|
||||
this.certWatcher =
|
||||
new CertWatcher(
|
||||
certReloadInterval, ImmutableList.of(jksKey, jksTrust), this::onCertChange);
|
||||
} else {
|
||||
this.certWatcher = null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean reloadSslContext() {
|
||||
private void onCertChange() {
|
||||
try {
|
||||
SslContextBuilder newBuilder = certContext.createSslContextBuilder();
|
||||
lock.writeLock().lock();
|
||||
sslContextBuilder.set(newBuilder);
|
||||
|
||||
List<ManagedChannel> pending = new ArrayList<>(connPool.values());
|
||||
recycler.schedule(() -> cleanExpiredConn(pending), connRecycleTime, TimeUnit.SECONDS);
|
||||
|
||||
connPool.clear();
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public ManagedChannel getChannel(String address, HostMapping mapping) {
|
||||
if (certContext != null) {
|
||||
SslContextBuilder newBuilder = certContext.reload();
|
||||
if (newBuilder != null) {
|
||||
sslContextBuilder.set(newBuilder);
|
||||
return true;
|
||||
try {
|
||||
lock.readLock().lock();
|
||||
return connPool.computeIfAbsent(
|
||||
address, key -> createChannel(sslContextBuilder.get(), address, mapping));
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return connPool.computeIfAbsent(address, key -> createChannel(null, address, mapping));
|
||||
}
|
||||
|
||||
public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) {
|
||||
if (reloadSslContext()) {
|
||||
logger.info("invalidate connection pool");
|
||||
connPool.clear();
|
||||
private ManagedChannel createChannel(
|
||||
SslContextBuilder sslContextBuilder, String address, HostMapping mapping) {
|
||||
URI uri, mapped;
|
||||
try {
|
||||
uri = PDUtils.addrToUri(address);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("failed to form address " + address, e);
|
||||
}
|
||||
try {
|
||||
mapped = mapping.getMappedURI(uri);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("failed to get mapped address " + uri, e);
|
||||
}
|
||||
|
||||
return connPool.computeIfAbsent(
|
||||
addressStr,
|
||||
key -> {
|
||||
URI address;
|
||||
URI mappedAddr;
|
||||
try {
|
||||
address = PDUtils.addrToUri(key);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("failed to form address " + key, e);
|
||||
}
|
||||
try {
|
||||
mappedAddr = hostMapping.getMappedURI(address);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("failed to get mapped address " + address, e);
|
||||
}
|
||||
// Channel should be lazy without actual connection until first call
|
||||
// So a coarse grain lock is ok here
|
||||
NettyChannelBuilder builder =
|
||||
NettyChannelBuilder.forAddress(mapped.getHost(), mapped.getPort())
|
||||
.maxInboundMessageSize(maxFrameSize)
|
||||
.keepAliveTime(keepaliveTime, TimeUnit.SECONDS)
|
||||
.keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS)
|
||||
.keepAliveWithoutCalls(true)
|
||||
.idleTimeout(idleTimeout, TimeUnit.SECONDS);
|
||||
|
||||
// Channel should be lazy without actual connection until first call
|
||||
// So a coarse grain lock is ok here
|
||||
NettyChannelBuilder builder =
|
||||
NettyChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort())
|
||||
.maxInboundMessageSize(maxFrameSize)
|
||||
.keepAliveTime(keepaliveTime, TimeUnit.SECONDS)
|
||||
.keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS)
|
||||
.keepAliveWithoutCalls(true)
|
||||
.idleTimeout(idleTimeout, TimeUnit.SECONDS);
|
||||
if (sslContextBuilder == null) {
|
||||
return builder.usePlaintext().build();
|
||||
} else {
|
||||
SslContext sslContext;
|
||||
try {
|
||||
sslContext = sslContextBuilder.build();
|
||||
} catch (SSLException e) {
|
||||
logger.error("create ssl context failed!", e);
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
return builder.sslContext(sslContext).build();
|
||||
}
|
||||
}
|
||||
|
||||
if (certContext == null) {
|
||||
return builder.usePlaintext().build();
|
||||
} else {
|
||||
SslContext sslContext;
|
||||
try {
|
||||
sslContext = sslContextBuilder.get().build();
|
||||
} catch (SSLException e) {
|
||||
logger.error("create ssl context failed!", e);
|
||||
return null;
|
||||
}
|
||||
return builder.sslContext(sslContext).build();
|
||||
}
|
||||
});
|
||||
private void cleanExpiredConn(List<ManagedChannel> pending) {
|
||||
for (ManagedChannel channel : pending) {
|
||||
logger.info("cleaning expire channels");
|
||||
channel.shutdownNow();
|
||||
while (!channel.isShutdown()) {
|
||||
try {
|
||||
channel.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
logger.warn("recycle channels timeout:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@ -279,5 +348,10 @@ public class ChannelFactory implements AutoCloseable {
|
|||
ch.shutdown();
|
||||
}
|
||||
connPool.clear();
|
||||
|
||||
if (certContext != null) {
|
||||
recycler.shutdown();
|
||||
certWatcher.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,6 +171,9 @@ public class ConcreteBackOffer implements BackOffer {
|
|||
case BoCheckTimeout:
|
||||
backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
|
||||
break;
|
||||
case BoCheckHealth:
|
||||
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
|
||||
break;
|
||||
}
|
||||
return backOffFunction;
|
||||
}
|
||||
|
|
|
@ -17,27 +17,92 @@
|
|||
|
||||
package org.tikv.common;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.grpc.ManagedChannel;
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.junit.Test;
|
||||
import org.tikv.common.util.ChannelFactory;
|
||||
import org.tikv.common.util.ChannelFactory.CertWatcher;
|
||||
|
||||
public class ChannelFactoryTest {
|
||||
private final AtomicLong ts = new AtomicLong(System.currentTimeMillis());
|
||||
private final String tlsPath = "src/test/resources/tls/";
|
||||
private final String caPath = tlsPath + "ca.crt";
|
||||
private final String clientCertPath = tlsPath + "client.crt";
|
||||
private final String clientKeyPath = tlsPath + "client.pem";
|
||||
|
||||
private ChannelFactory createFactory() {
|
||||
int v = 1024;
|
||||
return new ChannelFactory(v, v, v, v, 5, 10, caPath, clientCertPath, clientKeyPath);
|
||||
}
|
||||
|
||||
private void touchCert() {
|
||||
ts.addAndGet(100_000_000);
|
||||
assertTrue(new File(caPath).setLastModified(ts.get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTlsReload() {
|
||||
final int v = 1024;
|
||||
String tlsPath = "src/test/resources/tls/";
|
||||
String caPath = tlsPath + "ca.crt";
|
||||
String clientCertPath = tlsPath + "client.crt";
|
||||
String clientKeyPath = tlsPath + "client.pem";
|
||||
ChannelFactory factory = new ChannelFactory(v, v, v, v, caPath, clientCertPath, clientKeyPath);
|
||||
HostMapping mapping = uri -> uri;
|
||||
public void testCertWatcher() throws InterruptedException {
|
||||
AtomicBoolean changed = new AtomicBoolean(false);
|
||||
File a = new File(caPath);
|
||||
File b = new File(clientCertPath);
|
||||
File c = new File(clientKeyPath);
|
||||
new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true));
|
||||
Thread.sleep(5000);
|
||||
assertTrue(changed.get());
|
||||
}
|
||||
|
||||
factory.getChannel("127.0.0.1:2379", mapping);
|
||||
@Test
|
||||
public void testMultiThreadTlsReload() throws InterruptedException {
|
||||
ChannelFactory factory = createFactory();
|
||||
HostMapping hostMapping = uri -> uri;
|
||||
|
||||
assertTrue(new File(clientKeyPath).setLastModified(System.currentTimeMillis()));
|
||||
int taskCount = Runtime.getRuntime().availableProcessors() * 2;
|
||||
List<Thread> tasks = new ArrayList<>(taskCount);
|
||||
for (int i = 0; i < taskCount; i++) {
|
||||
Thread t =
|
||||
new Thread(
|
||||
() -> {
|
||||
for (int j = 0; j < 100; j++) {
|
||||
String addr = "127.0.0.1:237" + (j % 2 == 0 ? 9 : 8);
|
||||
ManagedChannel c = factory.getChannel(addr, hostMapping);
|
||||
assertNotNull(c);
|
||||
c.shutdownNow();
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignore) {
|
||||
}
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
tasks.add(t);
|
||||
}
|
||||
Thread reactor =
|
||||
new Thread(
|
||||
() -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
touchCert();
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignore) {
|
||||
}
|
||||
}
|
||||
});
|
||||
reactor.start();
|
||||
|
||||
assertTrue(factory.reloadSslContext());
|
||||
for (Thread t : tasks) {
|
||||
t.join();
|
||||
}
|
||||
reactor.join();
|
||||
|
||||
factory.close();
|
||||
assertTrue(factory.connPool.isEmpty());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.tikv.common;
|
|||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Arrays;
|
||||
import org.tikv.common.codec.Codec.BytesCodec;
|
||||
import org.tikv.common.codec.CodecDataOutput;
|
||||
|
@ -108,4 +109,13 @@ public class GrpcUtils {
|
|||
.setStore(store)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static int getFreePort() {
|
||||
while (true) {
|
||||
try (ServerSocket s = new ServerSocket(0)) {
|
||||
return s.getLocalPort();
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -394,7 +394,6 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
|
|||
}
|
||||
|
||||
private static class HealCheck extends HealthImplBase {
|
||||
|
||||
@Override
|
||||
public void check(
|
||||
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.tikv.kvproto.Metapb.Store;
|
|||
import org.tikv.kvproto.Metapb.StoreState;
|
||||
|
||||
public class PDClientMockTest extends PDMockServerTest {
|
||||
|
||||
private static final String LOCAL_ADDR_IPV6 = "[::1]";
|
||||
public static final String HTTP = "http://";
|
||||
|
||||
|
|
|
@ -20,6 +20,10 @@ package org.tikv.common;
|
|||
import io.grpc.Server;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.health.v1.HealthCheckRequest;
|
||||
import io.grpc.health.v1.HealthCheckResponse;
|
||||
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
|
||||
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
|
@ -130,10 +134,21 @@ public class PDMockServer extends PDGrpc.PDImplBase {
|
|||
start(clusterId, port);
|
||||
}
|
||||
|
||||
private static class HealCheck extends HealthImplBase {
|
||||
@Override
|
||||
public void check(
|
||||
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
|
||||
responseObserver.onNext(
|
||||
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
public void start(long clusterId, int port) throws IOException {
|
||||
this.clusterId = clusterId;
|
||||
this.port = port;
|
||||
server = ServerBuilder.forPort(port).addService(this).build().start();
|
||||
server =
|
||||
ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start();
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop));
|
||||
}
|
||||
|
|
|
@ -18,14 +18,12 @@
|
|||
package org.tikv.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
public abstract class PDMockServerTest {
|
||||
|
||||
protected static final String LOCAL_ADDR = "127.0.0.1";
|
||||
static final long CLUSTER_ID = 1024;
|
||||
protected TiSession session;
|
||||
|
@ -40,9 +38,7 @@ public abstract class PDMockServerTest {
|
|||
void setup(String addr) throws IOException {
|
||||
int[] ports = new int[3];
|
||||
for (int i = 0; i < ports.length; i++) {
|
||||
try (ServerSocket s = new ServerSocket(0)) {
|
||||
ports[i] = s.getLocalPort();
|
||||
}
|
||||
ports[i] = GrpcUtils.getFreePort();
|
||||
}
|
||||
|
||||
for (int i = 0; i < ports.length; i++) {
|
||||
|
|
|
@ -84,6 +84,16 @@ public class TiConfigurationTest {
|
|||
assertFalse(conf.isJksEnable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void certReloadTest() {
|
||||
TiConfiguration conf = TiConfiguration.createDefault();
|
||||
conf.setCertReloadIntervalInSeconds(10);
|
||||
conf.setConnRecycleTimeInSeconds(10);
|
||||
|
||||
assertEquals(10, conf.getCertReloadIntervalInSeconds());
|
||||
assertEquals(10, conf.getConnRecycleTimeInSeconds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void slowLogDefaultValueTest() {
|
||||
TiConfiguration conf = TiConfiguration.createRawDefault();
|
||||
|
|
Loading…
Reference in New Issue