diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 616bcfbaeb..e23ae9be9d 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -198,8 +198,7 @@ public abstract class AbstractGRPCClient< } } - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2)); + protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { try { return doCheckHealth(backOffer, addressStr, hostMapping); } catch (Exception e) { diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e253053480..384741d3d5 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -498,11 +498,11 @@ public class PDClient extends AbstractGRPCClient return true; } - synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) { + synchronized boolean createFollowerClientWrapper(BackOffer backOffer, String followerUrlStr, String leaderUrls) { // TODO: Why not strip protocol info on server side since grpc does not need it try { - if (!checkHealth(followerUrlStr, hostMapping)) { + if (!checkHealth(backOffer, followerUrlStr, hostMapping)) { return false; } @@ -535,7 +535,7 @@ public class PDClient extends AbstractGRPCClient leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -562,7 +562,7 @@ public class PDClient extends AbstractGRPCClient hasReachNextMember = true; continue; } - if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + if (hasReachNextMember && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { logger.warn( String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; @@ -592,7 +592,7 @@ public class PDClient extends AbstractGRPCClient leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(defaultBackOffer(), leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 08018d0d49..edd3a19f44 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -20,6 +20,7 @@ package org.tikv.common; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import org.junit.After; import org.junit.Before; import org.tikv.common.TiConfiguration.ApiVersion; @@ -51,6 +52,16 @@ public abstract class PDMockServerTest { session = TiSession.create(conf); } + void updateConf(Function update) throws Exception { + if (session == null) { + throw new IllegalStateException("Cluster is not initialized"); + } + + session.close(); + + session = TiSession.create(update.apply(session.getConf())); + } + void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java new file mode 100644 index 0000000000..0ebb91d6a2 --- /dev/null +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tikv.raw.RawKVClient; + +public class TimeoutTest extends MockThreeStoresTest { + @Before + public void init() throws Exception { + updateConf(conf -> { + conf.setEnableAtomicForCAS(true); + conf.setTimeout(150); + conf.setForwardTimeout(200); + conf.setRawKVReadTimeoutInMS(400); + conf.setRawKVWriteTimeoutInMS(400); + conf.setRawKVBatchReadTimeoutInMS(400); + conf.setRawKVBatchWriteTimeoutInMS(400); + conf.setRawKVWriteSlowLogInMS(50); + conf.setRawKVReadSlowLogInMS(50); + conf.setRawKVBatchReadSlowLogInMS(50); + conf.setRawKVBatchWriteSlowLogInMS(50); + return conf; + }); + } + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testTimeoutInTime() { + try (RawKVClient client = createClient()) { + pdServers.get(0).stop(); + long start = System.currentTimeMillis(); + client.get(ByteString.copyFromUtf8("key")); + long end = System.currentTimeMillis(); + Assert.assertTrue(end - start < 500); + } + } +}