diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index caa3cc99b1..616bcfbaeb 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -37,8 +37,10 @@ import org.tikv.common.operation.ErrorHandler; import org.tikv.common.policy.RetryMaxMs.Builder; import org.tikv.common.policy.RetryPolicy; import org.tikv.common.streaming.StreamingResponse; +import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; public abstract class AbstractGRPCClient< BlockingStubT extends AbstractStub, @@ -179,19 +181,29 @@ public abstract class AbstractGRPCClient< protected abstract FutureStubT getAsyncStub(); - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { - ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); - HealthGrpc.HealthBlockingStub stub = - HealthGrpc.newBlockingStub(channel).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); - try { - HealthCheckResponse resp = stub.check(req); - if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) { - return false; + private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { + while (true) { + try { + ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); + HealthGrpc.HealthBlockingStub stub = + HealthGrpc.newBlockingStub(channel) + .withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + HealthCheckResponse resp = stub.check(req); + return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING; + } catch (Exception e) { + logger.warn("check health failed.", e); + backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e); } + } + } + + protected boolean checkHealth(String addressStr, HostMapping hostMapping) { + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2)); + try { + return doCheckHealth(backOffer, addressStr, hostMapping); } catch (Exception e) { return false; } - return true; } } diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 600a81d109..dd5d1d28bb 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -36,6 +36,7 @@ public class ConfigUtils { public static final String TIKV_GRPC_KEEPALIVE_TIME = "tikv.grpc.keepalive_time"; public static final String TIKV_GRPC_KEEPALIVE_TIMEOUT = "tikv.grpc.keepalive_timeout"; public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout"; + public static final String TIKV_CONN_RECYCLE_TIME = "tikv.conn.recycle_time"; public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size"; public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency"; @@ -93,6 +94,7 @@ public class ConfigUtils { public static final String TIKV_RAWKV_SERVER_SLOWLOG_FACTOR = "tikv.rawkv.server_slowlog_factor"; public static final String TIKV_TLS_ENABLE = "tikv.tls_enable"; + public static final String TIKV_TLS_RELOAD_INTERVAL = "tikv.tls.reload_interval"; public static final String TIKV_TRUST_CERT_COLLECTION = "tikv.trust_cert_collection"; public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain"; public static final String TIKV_KEY_FILE = "tikv.key_file"; @@ -130,6 +132,8 @@ public class ConfigUtils { public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300; public static final int DEF_SCAN_BATCH_SIZE = 10240; public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB + public static final String DEF_TIKV_CONN_RECYCLE_TIME = "60s"; + public static final String DEF_TIKV_TLS_RELOAD_INTERVAL = "10s"; public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000; public static final int DEF_REGION_SCAN_DOWNGRADE_THRESHOLD = 10000000; // if keyRange size per request exceeds this limit, the request might be too large to be accepted diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 80633fe2ca..c6263b2c9f 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -435,23 +435,34 @@ public class PDClient extends AbstractGRPCClient return pdClientWrapper; } - private GetMembersResponse getMembers(URI uri) { - try { - ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); - PDGrpc.PDBlockingStub stub = - PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - GetMembersRequest request = - GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); - GetMembersResponse resp = stub.getMembers(request); - // check if the response contains a valid leader - if (resp != null && resp.getLeader().getMemberId() == 0) { - return null; + private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { + while (true) { + try { + ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); + PDGrpc.PDBlockingStub stub = + PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + GetMembersRequest request = + GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); + GetMembersResponse resp = stub.getMembers(request); + // check if the response contains a valid leader + if (resp != null && resp.getLeader().getMemberId() == 0) { + return null; + } + return resp; + } catch (Exception e) { + logger.warn("failed to get member from pd server.", e); + backOffer.doBackOff(BackOffFuncType.BoPDRPC, e); } - return resp; - } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); } - return null; + } + + private GetMembersResponse getMembers(URI uri) { + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + try { + return doGetMembers(backOffer, uri); + } catch (Exception e) { + return null; + } } // return whether the leader has changed to target address `leaderUrlStr`. @@ -463,7 +474,7 @@ public class PDClient extends AbstractGRPCClient return true; } } - // If leader has transfered to another member, we can create another leaderwrapper. + // If leader has transferred to another member, we can create another leaderWrapper. } // switch leader return createLeaderClientWrapper(leaderUrlStr); @@ -502,7 +513,7 @@ public class PDClient extends AbstractGRPCClient return true; } - public synchronized void updateLeaderOrforwardFollower() { + public synchronized void updateLeaderOrForwardFollower() { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } @@ -520,7 +531,7 @@ public class PDClient extends AbstractGRPCClient leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index bc64a7ce4b..cae0ed40c5 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -17,14 +17,157 @@ package org.tikv.common; -import static org.tikv.common.ConfigUtils.*; +import static org.tikv.common.ConfigUtils.DEF_BATCH_DELETE_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_BATCH_GET_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_BATCH_PUT_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_BATCH_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_CHECK_HEALTH_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_DB_PREFIX; +import static org.tikv.common.ConfigUtils.DEF_DELETE_RANGE_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_FORWARD_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_GRPC_FORWARD_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_HEALTH_CHECK_PERIOD_DURATION; +import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_KV_CLIENT_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_MAX_FRAME_SIZE; +import static org.tikv.common.ConfigUtils.DEF_METRICS_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_METRICS_PORT; +import static org.tikv.common.ConfigUtils.DEF_PD_ADDRESSES; +import static org.tikv.common.ConfigUtils.DEF_REPLICA_READ; +import static org.tikv.common.ConfigUtils.DEF_SCAN_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.DEF_SCAN_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_SHOW_ROWID; +import static org.tikv.common.ConfigUtils.DEF_TABLE_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.DEF_TIFLASH_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_TIKV_BO_REGION_MISS_BASE_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_CONN_RECYCLE_TIME; +import static org.tikv.common.ConfigUtils.DEF_TIKV_ENABLE_ATOMIC_FOR_CAS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_IDLE_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_INGEST_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIME; +import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_WARM_UP_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES; +import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.DEF_TIKV_NETWORK_MAPPING_NAME; +import static org.tikv.common.ConfigUtils.DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_SCAN_REGIONS_LIMIT; +import static org.tikv.common.ConfigUtils.DEF_TIKV_SCATTER_WAIT_SECONDS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_TLS_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_TIKV_TLS_RELOAD_INTERVAL; +import static org.tikv.common.ConfigUtils.DEF_TIKV_USE_JKS; +import static org.tikv.common.ConfigUtils.DEF_TIKV_WARM_UP_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_TIMEOUT; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS; +import static org.tikv.common.ConfigUtils.FOLLOWER; +import static org.tikv.common.ConfigUtils.HIGH_COMMAND_PRIORITY; +import static org.tikv.common.ConfigUtils.LEADER_AND_FOLLOWER; +import static org.tikv.common.ConfigUtils.LOW_COMMAND_PRIORITY; +import static org.tikv.common.ConfigUtils.NORMAL_COMMAND_PRIORITY; +import static org.tikv.common.ConfigUtils.RAW_KV_MODE; +import static org.tikv.common.ConfigUtils.READ_COMMITTED_ISOLATION_LEVEL; +import static org.tikv.common.ConfigUtils.SNAPSHOT_ISOLATION_LEVEL; +import static org.tikv.common.ConfigUtils.TIFLASH_ENABLE; +import static org.tikv.common.ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_BATCH_GET_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_CONN_RECYCLE_TIME; +import static org.tikv.common.ConfigUtils.TIKV_DB_PREFIX; +import static org.tikv.common.ConfigUtils.TIKV_DELETE_RANGE_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_ENABLE_ATOMIC_FOR_CAS; +import static org.tikv.common.ConfigUtils.TIKV_ENABLE_GRPC_FORWARD; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_FORWARD_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_HEALTH_CHECK_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_IDLE_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_INGEST_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIME; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_MAX_FRAME_SIZE; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_GRPC_WARM_UP_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_HEALTH_CHECK_PERIOD_DURATION; +import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_BYTES; +import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_BATCH_SIZE; +import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PASSWORD; +import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PATH; +import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PASSWORD; +import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PATH; +import static org.tikv.common.ConfigUtils.TIKV_KEY_CERT_CHAIN; +import static org.tikv.common.ConfigUtils.TIKV_KEY_FILE; +import static org.tikv.common.ConfigUtils.TIKV_KV_CLIENT_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_KV_MODE; +import static org.tikv.common.ConfigUtils.TIKV_METRICS_ENABLE; +import static org.tikv.common.ConfigUtils.TIKV_METRICS_PORT; +import static org.tikv.common.ConfigUtils.TIKV_NETWORK_MAPPING_NAME; +import static org.tikv.common.ConfigUtils.TIKV_PD_ADDRESSES; +import static org.tikv.common.ConfigUtils.TIKV_PD_FIRST_GET_MEMBER_TIMEOUT; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SERVER_SLOWLOG_FACTOR; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_SLOWLOG_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_TIMEOUT_IN_MS; +import static org.tikv.common.ConfigUtils.TIKV_REPLICA_READ; +import static org.tikv.common.ConfigUtils.TIKV_REQUEST_COMMAND_PRIORITY; +import static org.tikv.common.ConfigUtils.TIKV_REQUEST_ISOLATION_LEVEL; +import static org.tikv.common.ConfigUtils.TIKV_SCAN_REGIONS_LIMIT; +import static org.tikv.common.ConfigUtils.TIKV_SCATTER_WAIT_SECONDS; +import static org.tikv.common.ConfigUtils.TIKV_SHOW_ROWID; +import static org.tikv.common.ConfigUtils.TIKV_TABLE_SCAN_CONCURRENCY; +import static org.tikv.common.ConfigUtils.TIKV_TLS_ENABLE; +import static org.tikv.common.ConfigUtils.TIKV_TLS_RELOAD_INTERVAL; +import static org.tikv.common.ConfigUtils.TIKV_TRUST_CERT_COLLECTION; +import static org.tikv.common.ConfigUtils.TIKV_USE_JKS; +import static org.tikv.common.ConfigUtils.TIKV_WARM_UP_ENABLE; +import static org.tikv.common.ConfigUtils.TXN_KV_MODE; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ENABLE; +import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS; import io.grpc.Metadata; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.net.URI; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +236,8 @@ public class TiConfiguration implements Serializable { setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE); + setIfMissing(TIKV_CONN_RECYCLE_TIME, DEF_TIKV_CONN_RECYCLE_TIME); + setIfMissing(TIKV_TLS_RELOAD_INTERVAL, DEF_TIKV_TLS_RELOAD_INTERVAL); setIfMissing(TIKV_INDEX_SCAN_BATCH_SIZE, DEF_INDEX_SCAN_BATCH_SIZE); setIfMissing(TIKV_INDEX_SCAN_CONCURRENCY, DEF_INDEX_SCAN_CONCURRENCY); setIfMissing(TIKV_TABLE_SCAN_CONCURRENCY, DEF_TABLE_SCAN_CONCURRENCY); @@ -154,7 +299,7 @@ public class TiConfiguration implements Serializable { } public static void listAll() { - logger.info("static configurations are:" + new ArrayList<>(settings.entrySet()).toString()); + logger.info("static configurations are:" + new ArrayList<>(settings.entrySet())); } private static void set(String key, String value) { @@ -318,6 +463,7 @@ public class TiConfiguration implements Serializable { private long pdFirstGetMemberTimeout = getTimeAsMs(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); + private long connRecycleTime = getTimeAsSeconds(TIKV_CONN_RECYCLE_TIME); private List pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES); private int indexScanBatchSize = getInt(TIKV_INDEX_SCAN_BATCH_SIZE); private int indexScanConcurrency = getInt(TIKV_INDEX_SCAN_CONCURRENCY); @@ -372,6 +518,8 @@ public class TiConfiguration implements Serializable { private double rawKVServerSlowLogFactor = getDouble(TIKV_RAWKV_SERVER_SLOWLOG_FACTOR, 0.5); private boolean tlsEnable = getBoolean(TIKV_TLS_ENABLE); + private long certReloadInterval = getTimeAsSeconds(TIKV_TLS_RELOAD_INTERVAL); + private String trustCertCollectionFile = getOption(TIKV_TRUST_CERT_COLLECTION).orElse(null); private String keyCertChainFile = getOption(TIKV_KEY_CERT_CHAIN).orElse(null); private String keyFile = getOption(TIKV_KEY_FILE).orElse(null); @@ -382,7 +530,7 @@ public class TiConfiguration implements Serializable { private String jksTrustPath = getOption(TIKV_JKS_TRUST_PATH).orElse(null); private String jksTrustPassword = getOption(TIKV_JKS_TRUST_PASSWORD).orElse(null); - private boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE); + private final boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE); private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE); private boolean isTest = false; @@ -538,6 +686,15 @@ public class TiConfiguration implements Serializable { return this; } + public long getConnRecycleTimeInSeconds() { + return connRecycleTime; + } + + public TiConfiguration setConnRecycleTimeInSeconds(int connRecycleTime) { + this.connRecycleTime = connRecycleTime; + return this; + } + public int getIndexScanBatchSize() { return indexScanBatchSize; } @@ -848,6 +1005,15 @@ public class TiConfiguration implements Serializable { return tlsEnable; } + public long getCertReloadIntervalInSeconds() { + return certReloadInterval; + } + + public TiConfiguration setCertReloadIntervalInSeconds(long interval) { + this.certReloadInterval = interval; + return this; + } + public void setTlsEnable(boolean tlsEnable) { this.tlsEnable = tlsEnable; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 61aa245646..845e0ea44b 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -83,7 +83,7 @@ public class TiSession implements AutoCloseable { private volatile ExecutorService batchScanThreadPool; private volatile ExecutorService deleteRangeThreadPool; private volatile RegionManager regionManager; - private volatile boolean enableGrpcForward; + private final boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder; private volatile boolean isClosed = false; @@ -126,6 +126,8 @@ public class TiSession implements AutoCloseable { conf.getKeepaliveTime(), conf.getKeepaliveTimeout(), conf.getIdleTimeout(), + conf.getConnRecycleTimeInSeconds(), + conf.getCertReloadIntervalInSeconds(), conf.getJksKeyPath(), conf.getJksKeyPassword(), conf.getJksTrustPath(), @@ -137,6 +139,8 @@ public class TiSession implements AutoCloseable { conf.getKeepaliveTime(), conf.getKeepaliveTimeout(), conf.getIdleTimeout(), + conf.getConnRecycleTimeInSeconds(), + conf.getCertReloadIntervalInSeconds(), conf.getTrustCertCollectionFile(), conf.getKeyCertChainFile(), conf.getKeyFile()); diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 7feac9bb2f..df40378950 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -59,7 +59,7 @@ public class PDErrorHandler implements ErrorHandler { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrforwardFollower(); + client.updateLeaderOrForwardFollower(); return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +80,7 @@ public class PDErrorHandler implements ErrorHandler { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrforwardFollower(); + client.updateLeaderOrForwardFollower(); return true; } } diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index c22186299e..8d305649c4 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -96,6 +96,7 @@ public class StoreHealthyChecker implements Runnable { return true; } } catch (Exception e) { + logger.info("fail to check tombstone stores", e); return false; } return false; diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java index 72435dca35..3eb07a725a 100644 --- a/src/main/java/org/tikv/common/util/BackOffFunction.java +++ b/src/main/java/org/tikv/common/util/BackOffFunction.java @@ -80,6 +80,7 @@ public class BackOffFunction { BoUpdateLeader, BoServerBusy, BoTxnNotFound, - BoCheckTimeout + BoCheckTimeout, + BoCheckHealth } } diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index f91719352f..28d53e9348 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -18,6 +18,7 @@ package org.tikv.common.util; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import io.grpc.ManagedChannel; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; @@ -27,9 +28,15 @@ import java.io.File; import java.io.FileInputStream; import java.net.URI; import java.security.KeyStore; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLException; import javax.net.ssl.TrustManagerFactory; @@ -40,43 +47,91 @@ import org.tikv.common.pd.PDUtils; public class ChannelFactory implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ChannelFactory.class); + private static final String PUB_KEY_INFRA = "PKIX"; + // After `connRecycleTime` seconds elapses, the old channels will be forced to shut down, + // to avoid using the old context all the time including potential channel leak. + private final long connRecycleTime; private final int maxFrameSize; private final int keepaliveTime; private final int keepaliveTimeout; private final int idleTimeout; - private final ConcurrentHashMap connPool = new ConcurrentHashMap<>(); private final CertContext certContext; + private final CertWatcher certWatcher; + + @VisibleForTesting + public final ConcurrentHashMap connPool = new ConcurrentHashMap<>(); + private final AtomicReference sslContextBuilder = new AtomicReference<>(); - private static final String PUB_KEY_INFRA = "PKIX"; - private abstract static class CertContext { - protected abstract boolean isModified(); + private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor(); - protected abstract SslContextBuilder createSslContextBuilder(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); - public SslContextBuilder reload() { - if (isModified()) { - logger.info("reload ssl context"); - return createSslContextBuilder(); + @VisibleForTesting + public static class CertWatcher implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(CertWatcher.class); + private final List targets; + private final List lastReload = new ArrayList<>(); + private final ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(); + private final Runnable onChange; + + public CertWatcher(long pollInterval, List targets, Runnable onChange) { + this.targets = targets; + this.onChange = onChange; + + for (File ignored : targets) { + lastReload.add(0L); } - return null; + + executorService.scheduleAtFixedRate( + this::tryReload, pollInterval, pollInterval, TimeUnit.SECONDS); + } + + private void tryReload() { + if (needReload()) { + onChange.run(); + } + } + + private boolean needReload() { + boolean needReload = false; + // Check all the modification of the `targets`. + // If one of them changed, means to need reload. + for (int i = 0; i < targets.size(); i++) { + try { + long lastModified = targets.get(i).lastModified(); + if (lastModified != lastReload.get(i)) { + lastReload.set(i, lastModified); + logger.warn("detected ssl context changes: {}", targets.get(i)); + needReload = true; + } + } catch (Exception e) { + logger.error("fail to check the status of ssl context files", e); + } + } + return needReload; + } + + @Override + public void close() { + executorService.shutdown(); } } - private static class JksContext extends CertContext { - private long keyLastModified; - private long trustLastModified; + @VisibleForTesting + public abstract static class CertContext { + public abstract SslContextBuilder createSslContextBuilder(); + } + public static class JksContext extends CertContext { private final String keyPath; private final String keyPassword; private final String trustPath; private final String trustPassword; public JksContext(String keyPath, String keyPassword, String trustPath, String trustPassword) { - this.keyLastModified = 0; - this.trustLastModified = 0; - this.keyPath = keyPath; this.keyPassword = keyPassword; this.trustPath = trustPath; @@ -84,22 +139,7 @@ public class ChannelFactory implements AutoCloseable { } @Override - protected synchronized boolean isModified() { - long a = new File(keyPath).lastModified(); - long b = new File(trustPath).lastModified(); - - boolean changed = this.keyLastModified != a || this.trustLastModified != b; - - if (changed) { - this.keyLastModified = a; - this.trustLastModified = b; - } - - return changed; - } - - @Override - protected SslContextBuilder createSslContextBuilder() { + public SslContextBuilder createSslContextBuilder() { SslContextBuilder builder = GrpcSslContexts.forClient(); try { if (keyPath != null && keyPassword != null) { @@ -119,50 +159,26 @@ public class ChannelFactory implements AutoCloseable { } } catch (Exception e) { logger.error("JKS SSL context builder failed!", e); + throw new IllegalArgumentException(e); } return builder; } } - private static class OpenSslContext extends CertContext { - private long trustLastModified; - private long chainLastModified; - private long keyLastModified; - + @VisibleForTesting + public static class OpenSslContext extends CertContext { private final String trustPath; private final String chainPath; private final String keyPath; public OpenSslContext(String trustPath, String chainPath, String keyPath) { - this.trustLastModified = 0; - this.chainLastModified = 0; - this.keyLastModified = 0; - this.trustPath = trustPath; this.chainPath = chainPath; this.keyPath = keyPath; } @Override - protected synchronized boolean isModified() { - long a = new File(trustPath).lastModified(); - long b = new File(chainPath).lastModified(); - long c = new File(keyPath).lastModified(); - - boolean changed = - this.trustLastModified != a || this.chainLastModified != b || this.keyLastModified != c; - - if (changed) { - this.trustLastModified = a; - this.chainLastModified = b; - this.keyLastModified = c; - } - - return changed; - } - - @Override - protected SslContextBuilder createSslContextBuilder() { + public SslContextBuilder createSslContextBuilder() { SslContextBuilder builder = GrpcSslContexts.forClient(); if (trustPath != null) { builder.trustManager(new File(trustPath)); @@ -180,7 +196,9 @@ public class ChannelFactory implements AutoCloseable { this.keepaliveTime = keepaliveTime; this.keepaliveTimeout = keepaliveTimeout; this.idleTimeout = idleTimeout; + this.certWatcher = null; this.certContext = null; + this.connRecycleTime = 0; } public ChannelFactory( @@ -188,6 +206,8 @@ public class ChannelFactory implements AutoCloseable { int keepaliveTime, int keepaliveTimeout, int idleTimeout, + long connRecycleTime, + long certReloadInterval, String trustCertCollectionFilePath, String keyCertChainFilePath, String keyFilePath) { @@ -195,8 +215,22 @@ public class ChannelFactory implements AutoCloseable { this.keepaliveTime = keepaliveTime; this.keepaliveTimeout = keepaliveTimeout; this.idleTimeout = idleTimeout; + this.connRecycleTime = connRecycleTime; this.certContext = new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath); + + File trustCert = new File(trustCertCollectionFilePath); + File keyCert = new File(keyCertChainFilePath); + File key = new File(keyFilePath); + + if (certReloadInterval > 0) { + onCertChange(); + this.certWatcher = + new CertWatcher( + certReloadInterval, ImmutableList.of(trustCert, keyCert, key), this::onCertChange); + } else { + this.certWatcher = null; + } } public ChannelFactory( @@ -204,6 +238,8 @@ public class ChannelFactory implements AutoCloseable { int keepaliveTime, int keepaliveTimeout, int idleTimeout, + long connRecycleTime, + long certReloadInterval, String jksKeyPath, String jksKeyPassword, String jksTrustPath, @@ -212,66 +248,99 @@ public class ChannelFactory implements AutoCloseable { this.keepaliveTime = keepaliveTime; this.keepaliveTimeout = keepaliveTimeout; this.idleTimeout = idleTimeout; + this.connRecycleTime = connRecycleTime; this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword); + + File jksKey = new File(jksKeyPath); + File jksTrust = new File(jksTrustPath); + if (certReloadInterval > 0) { + onCertChange(); + this.certWatcher = + new CertWatcher( + certReloadInterval, ImmutableList.of(jksKey, jksTrust), this::onCertChange); + } else { + this.certWatcher = null; + } } - @VisibleForTesting - public boolean reloadSslContext() { + private void onCertChange() { + try { + SslContextBuilder newBuilder = certContext.createSslContextBuilder(); + lock.writeLock().lock(); + sslContextBuilder.set(newBuilder); + + List pending = new ArrayList<>(connPool.values()); + recycler.schedule(() -> cleanExpiredConn(pending), connRecycleTime, TimeUnit.SECONDS); + + connPool.clear(); + } finally { + lock.writeLock().unlock(); + } + } + + public ManagedChannel getChannel(String address, HostMapping mapping) { if (certContext != null) { - SslContextBuilder newBuilder = certContext.reload(); - if (newBuilder != null) { - sslContextBuilder.set(newBuilder); - return true; + try { + lock.readLock().lock(); + return connPool.computeIfAbsent( + address, key -> createChannel(sslContextBuilder.get(), address, mapping)); + } finally { + lock.readLock().unlock(); } } - return false; + return connPool.computeIfAbsent(address, key -> createChannel(null, address, mapping)); } - public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { - if (reloadSslContext()) { - logger.info("invalidate connection pool"); - connPool.clear(); + private ManagedChannel createChannel( + SslContextBuilder sslContextBuilder, String address, HostMapping mapping) { + URI uri, mapped; + try { + uri = PDUtils.addrToUri(address); + } catch (Exception e) { + throw new IllegalArgumentException("failed to form address " + address, e); + } + try { + mapped = mapping.getMappedURI(uri); + } catch (Exception e) { + throw new IllegalArgumentException("failed to get mapped address " + uri, e); } - return connPool.computeIfAbsent( - addressStr, - key -> { - URI address; - URI mappedAddr; - try { - address = PDUtils.addrToUri(key); - } catch (Exception e) { - throw new IllegalArgumentException("failed to form address " + key, e); - } - try { - mappedAddr = hostMapping.getMappedURI(address); - } catch (Exception e) { - throw new IllegalArgumentException("failed to get mapped address " + address, e); - } + // Channel should be lazy without actual connection until first call + // So a coarse grain lock is ok here + NettyChannelBuilder builder = + NettyChannelBuilder.forAddress(mapped.getHost(), mapped.getPort()) + .maxInboundMessageSize(maxFrameSize) + .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) + .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .idleTimeout(idleTimeout, TimeUnit.SECONDS); - // Channel should be lazy without actual connection until first call - // So a coarse grain lock is ok here - NettyChannelBuilder builder = - NettyChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort()) - .maxInboundMessageSize(maxFrameSize) - .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) - .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true) - .idleTimeout(idleTimeout, TimeUnit.SECONDS); + if (sslContextBuilder == null) { + return builder.usePlaintext().build(); + } else { + SslContext sslContext; + try { + sslContext = sslContextBuilder.build(); + } catch (SSLException e) { + logger.error("create ssl context failed!", e); + throw new IllegalArgumentException(e); + } + return builder.sslContext(sslContext).build(); + } + } - if (certContext == null) { - return builder.usePlaintext().build(); - } else { - SslContext sslContext; - try { - sslContext = sslContextBuilder.get().build(); - } catch (SSLException e) { - logger.error("create ssl context failed!", e); - return null; - } - return builder.sslContext(sslContext).build(); - } - }); + private void cleanExpiredConn(List pending) { + for (ManagedChannel channel : pending) { + logger.info("cleaning expire channels"); + channel.shutdownNow(); + while (!channel.isShutdown()) { + try { + channel.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn("recycle channels timeout:", e); + } + } + } } public void close() { @@ -279,5 +348,10 @@ public class ChannelFactory implements AutoCloseable { ch.shutdown(); } connPool.clear(); + + if (certContext != null) { + recycler.shutdown(); + certWatcher.close(); + } } } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index ac00d087fe..cef280567c 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -171,6 +171,9 @@ public class ConcreteBackOffer implements BackOffer { case BoCheckTimeout: backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter); break; + case BoCheckHealth: + backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); + break; } return backOffFunction; } diff --git a/src/test/java/org/tikv/common/ChannelFactoryTest.java b/src/test/java/org/tikv/common/ChannelFactoryTest.java index 6296bf5a30..131943be74 100644 --- a/src/test/java/org/tikv/common/ChannelFactoryTest.java +++ b/src/test/java/org/tikv/common/ChannelFactoryTest.java @@ -17,27 +17,92 @@ package org.tikv.common; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableList; +import io.grpc.ManagedChannel; import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ChannelFactory.CertWatcher; public class ChannelFactoryTest { + private final AtomicLong ts = new AtomicLong(System.currentTimeMillis()); + private final String tlsPath = "src/test/resources/tls/"; + private final String caPath = tlsPath + "ca.crt"; + private final String clientCertPath = tlsPath + "client.crt"; + private final String clientKeyPath = tlsPath + "client.pem"; + + private ChannelFactory createFactory() { + int v = 1024; + return new ChannelFactory(v, v, v, v, 5, 10, caPath, clientCertPath, clientKeyPath); + } + + private void touchCert() { + ts.addAndGet(100_000_000); + assertTrue(new File(caPath).setLastModified(ts.get())); + } + @Test - public void testTlsReload() { - final int v = 1024; - String tlsPath = "src/test/resources/tls/"; - String caPath = tlsPath + "ca.crt"; - String clientCertPath = tlsPath + "client.crt"; - String clientKeyPath = tlsPath + "client.pem"; - ChannelFactory factory = new ChannelFactory(v, v, v, v, caPath, clientCertPath, clientKeyPath); - HostMapping mapping = uri -> uri; + public void testCertWatcher() throws InterruptedException { + AtomicBoolean changed = new AtomicBoolean(false); + File a = new File(caPath); + File b = new File(clientCertPath); + File c = new File(clientKeyPath); + new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true)); + Thread.sleep(5000); + assertTrue(changed.get()); + } - factory.getChannel("127.0.0.1:2379", mapping); + @Test + public void testMultiThreadTlsReload() throws InterruptedException { + ChannelFactory factory = createFactory(); + HostMapping hostMapping = uri -> uri; - assertTrue(new File(clientKeyPath).setLastModified(System.currentTimeMillis())); + int taskCount = Runtime.getRuntime().availableProcessors() * 2; + List tasks = new ArrayList<>(taskCount); + for (int i = 0; i < taskCount; i++) { + Thread t = + new Thread( + () -> { + for (int j = 0; j < 100; j++) { + String addr = "127.0.0.1:237" + (j % 2 == 0 ? 9 : 8); + ManagedChannel c = factory.getChannel(addr, hostMapping); + assertNotNull(c); + c.shutdownNow(); + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + } + }); + t.start(); + tasks.add(t); + } + Thread reactor = + new Thread( + () -> { + for (int i = 0; i < 100; i++) { + touchCert(); + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + } + }); + reactor.start(); - assertTrue(factory.reloadSslContext()); + for (Thread t : tasks) { + t.join(); + } + reactor.join(); + + factory.close(); + assertTrue(factory.connPool.isEmpty()); } } diff --git a/src/test/java/org/tikv/common/GrpcUtils.java b/src/test/java/org/tikv/common/GrpcUtils.java index 3c051e3ad1..e6793f01f1 100644 --- a/src/test/java/org/tikv/common/GrpcUtils.java +++ b/src/test/java/org/tikv/common/GrpcUtils.java @@ -19,6 +19,7 @@ package org.tikv.common; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; +import java.net.ServerSocket; import java.util.Arrays; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataOutput; @@ -108,4 +109,13 @@ public class GrpcUtils { .setStore(store) .build(); } + + public static int getFreePort() { + while (true) { + try (ServerSocket s = new ServerSocket(0)) { + return s.getLocalPort(); + } catch (Exception ignore) { + } + } + } } diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 412d9dcfb4..18de79819e 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -394,7 +394,6 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { } private static class HealCheck extends HealthImplBase { - @Override public void check( HealthCheckRequest request, StreamObserver responseObserver) { diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 10e0901d34..a8074d9457 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -40,7 +40,6 @@ import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.StoreState; public class PDClientMockTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 96c4970af9..7980e0703f 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -20,6 +20,10 @@ package org.tikv.common; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc.HealthImplBase; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; @@ -130,10 +134,21 @@ public class PDMockServer extends PDGrpc.PDImplBase { start(clusterId, port); } + private static class HealCheck extends HealthImplBase { + @Override + public void check( + HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + public void start(long clusterId, int port) throws IOException { this.clusterId = clusterId; this.port = port; - server = ServerBuilder.forPort(port).addService(this).build().start(); + server = + ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop)); } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index e61a10cce9..81821885b0 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -18,14 +18,12 @@ package org.tikv.common; import java.io.IOException; -import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; import org.junit.After; import org.junit.Before; public abstract class PDMockServerTest { - protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; protected TiSession session; @@ -40,9 +38,7 @@ public abstract class PDMockServerTest { void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { - try (ServerSocket s = new ServerSocket(0)) { - ports[i] = s.getLocalPort(); - } + ports[i] = GrpcUtils.getFreePort(); } for (int i = 0; i < ports.length; i++) { diff --git a/src/test/java/org/tikv/common/TiConfigurationTest.java b/src/test/java/org/tikv/common/TiConfigurationTest.java index f4968c9b1b..a6d19c7a48 100644 --- a/src/test/java/org/tikv/common/TiConfigurationTest.java +++ b/src/test/java/org/tikv/common/TiConfigurationTest.java @@ -84,6 +84,16 @@ public class TiConfigurationTest { assertFalse(conf.isJksEnable()); } + @Test + public void certReloadTest() { + TiConfiguration conf = TiConfiguration.createDefault(); + conf.setCertReloadIntervalInSeconds(10); + conf.setConnRecycleTimeInSeconds(10); + + assertEquals(10, conf.getCertReloadIntervalInSeconds()); + assertEquals(10, conf.getConnRecycleTimeInSeconds()); + } + @Test public void slowLogDefaultValueTest() { TiConfiguration conf = TiConfiguration.createRawDefault();