mirror of https://github.com/tikv/client-java.git
Co-authored-by: iosmanthus <dengliming@pingcap.com>
This commit is contained in:
parent
9a127af613
commit
6c0f6adeaa
|
|
@ -182,6 +182,8 @@ public abstract class AbstractGRPCClient<
|
|||
|
||||
private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
|
||||
while (true) {
|
||||
backOffer.checkTimeout();
|
||||
|
||||
try {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
|
|
@ -198,10 +200,6 @@ public abstract class AbstractGRPCClient<
|
|||
}
|
||||
|
||||
protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
|
||||
try {
|
||||
return doCheckHealth(backOffer, addressStr, hostMapping);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return doCheckHealth(backOffer, addressStr, hostMapping);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,9 +50,12 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
|
|
@ -122,6 +125,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
private ConcurrentMap<Long, Double> tiflashReplicaMap;
|
||||
private HostMapping hostMapping;
|
||||
private long lastUpdateLeaderTime;
|
||||
private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
|
||||
private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();
|
||||
|
||||
public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
|
||||
HistogramUtils.buildDuration()
|
||||
|
|
@ -426,6 +431,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
if (channelFactory != null) {
|
||||
channelFactory.close();
|
||||
}
|
||||
|
||||
updateLeaderService.shutdownNow();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
@ -462,11 +469,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
}
|
||||
|
||||
private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
|
||||
try {
|
||||
return doGetMembers(backOffer, uri);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
return doGetMembers(backOffer, uri);
|
||||
}
|
||||
|
||||
// return whether the leader has changed to target address `leaderUrlStr`.
|
||||
|
|
@ -518,7 +521,26 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
return true;
|
||||
}
|
||||
|
||||
public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
|
||||
public void tryUpdateLeaderOrForwardFollower() {
|
||||
if (updateLeaderNotify.compareAndSet(false, true)) {
|
||||
try {
|
||||
BackOffer backOffer = defaultBackOffer();
|
||||
updateLeaderService.submit(
|
||||
() -> {
|
||||
try {
|
||||
updateLeaderOrForwardFollower(backOffer);
|
||||
} finally {
|
||||
updateLeaderNotify.set(false);
|
||||
}
|
||||
});
|
||||
} catch (RejectedExecutionException e) {
|
||||
logger.error("PDClient is shutdown", e);
|
||||
updateLeaderNotify.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
|
||||
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.tikv.common.PDClient;
|
||||
import org.tikv.common.exception.GrpcException;
|
||||
import org.tikv.common.exception.TiClientInternalException;
|
||||
import org.tikv.common.log.SlowLogSpan;
|
||||
import org.tikv.common.pd.PDError;
|
||||
import org.tikv.common.util.BackOffFunction;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
|
|
@ -59,7 +60,12 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
case PD_ERROR:
|
||||
backOffer.doBackOff(
|
||||
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
|
||||
client.updateLeaderOrForwardFollower(backOffer);
|
||||
SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
|
||||
try {
|
||||
client.tryUpdateLeaderOrForwardFollower();
|
||||
} finally {
|
||||
tryUpdateLeaderSpan.end();
|
||||
}
|
||||
return true;
|
||||
case REGION_PEER_NOT_ELECTED:
|
||||
logger.debug(error.getMessage());
|
||||
|
|
@ -80,7 +86,12 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
return false;
|
||||
}
|
||||
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
|
||||
client.updateLeaderOrForwardFollower(backOffer);
|
||||
SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
|
||||
try {
|
||||
client.tryUpdateLeaderOrForwardFollower();
|
||||
} finally {
|
||||
updateLeaderSpan.end();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ public class RegionManager {
|
|||
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
|
||||
ByteString key, TiStoreType storeType, BackOffer backOffer) {
|
||||
TiRegion region = getRegionByKey(key, backOffer);
|
||||
if (!region.isValid()) {
|
||||
if (region == null || !region.isValid()) {
|
||||
throw new TiClientInternalException("Region invalid: " + region);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1408,38 +1408,34 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
this);
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(TiRegion region, TiStore store)
|
||||
throws GrpcException {
|
||||
public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
|
||||
return build(region, store, TiStoreType.TiKV);
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(ByteString key) throws GrpcException {
|
||||
public RegionStoreClient build(ByteString key) throws GrpcException {
|
||||
return build(key, TiStoreType.TiKV);
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer)
|
||||
throws GrpcException {
|
||||
public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
|
||||
return build(key, TiStoreType.TiKV, backOffer);
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
|
||||
throws GrpcException {
|
||||
public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
|
||||
return build(key, storeType, defaultBackOff());
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(
|
||||
ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException {
|
||||
public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
|
||||
throws GrpcException {
|
||||
Pair<TiRegion, TiStore> pair =
|
||||
regionManager.getRegionStorePairByKey(key, storeType, backOffer);
|
||||
return build(pair.first, pair.second, storeType);
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
|
||||
public RegionStoreClient build(TiRegion region) throws GrpcException {
|
||||
return build(region, defaultBackOff());
|
||||
}
|
||||
|
||||
public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer)
|
||||
throws GrpcException {
|
||||
public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
|
||||
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
|
||||
return build(region, store, TiStoreType.TiKV);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,18 +88,16 @@ public class PDClientV2MockTest extends PDMockServerTest {
|
|||
String start = "getRegionById";
|
||||
String end = "getRegionByIdEnd";
|
||||
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end));
|
||||
try (PDClient client = createClient()) {
|
||||
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
|
||||
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
|
||||
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
|
||||
}
|
||||
PDClient client = createClient();
|
||||
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
|
||||
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
|
||||
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
|
||||
|
||||
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, ""));
|
||||
try (PDClient client = createClient()) {
|
||||
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
|
||||
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
|
||||
Assert.assertEquals("", r.getEndKey().toStringUtf8());
|
||||
}
|
||||
|
||||
r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
|
||||
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
|
||||
Assert.assertEquals("", r.getEndKey().toStringUtf8());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -113,15 +111,14 @@ public class PDClientV2MockTest extends PDMockServerTest {
|
|||
.addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build())
|
||||
.build());
|
||||
|
||||
try (PDClient client = createClient()) {
|
||||
List<Region> regions =
|
||||
client.scanRegions(
|
||||
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
|
||||
PDClient client = createClient();
|
||||
List<Region> regions =
|
||||
client.scanRegions(
|
||||
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
|
||||
|
||||
for (Region r : regions) {
|
||||
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
|
||||
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
|
||||
}
|
||||
for (Region r : regions) {
|
||||
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
|
||||
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,9 +52,12 @@ public class TimeoutTest extends MockThreeStoresTest {
|
|||
try (RawKVClient client = createClient()) {
|
||||
pdServers.get(0).stop();
|
||||
long start = System.currentTimeMillis();
|
||||
client.get(ByteString.copyFromUtf8("key"));
|
||||
try {
|
||||
client.get(ByteString.copyFromUtf8("key"));
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L);
|
||||
Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue