Optimize grpc forward and switch leader logic (#324)

This commit is contained in:
birdstorm 2021-11-19 09:15:04 +08:00 committed by GitHub
parent b1e0c93f0c
commit b60afd0b97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 289 additions and 258 deletions

View File

@ -23,6 +23,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc; import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.AbstractFutureStub;
import io.grpc.stub.AbstractStub; import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls; import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
@ -38,14 +39,15 @@ import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ChannelFactory;
public abstract class AbstractGRPCClient< public abstract class AbstractGRPCClient<
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>> BlockingStubT extends AbstractStub<BlockingStubT>,
FutureStubT extends AbstractFutureStub<FutureStubT>>
implements AutoCloseable { implements AutoCloseable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass()); protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ChannelFactory channelFactory; protected final ChannelFactory channelFactory;
protected TiConfiguration conf; protected TiConfiguration conf;
protected long timeout; protected long timeout;
protected BlockingStubT blockingStub; protected BlockingStubT blockingStub;
protected StubT asyncStub; protected FutureStubT asyncStub;
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) { protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf; this.conf = conf;
@ -57,7 +59,7 @@ public abstract class AbstractGRPCClient<
TiConfiguration conf, TiConfiguration conf,
ChannelFactory channelFactory, ChannelFactory channelFactory,
BlockingStubT blockingStub, BlockingStubT blockingStub,
StubT asyncStub) { FutureStubT asyncStub) {
this.conf = conf; this.conf = conf;
this.timeout = conf.getTimeout(); this.timeout = conf.getTimeout();
this.channelFactory = channelFactory; this.channelFactory = channelFactory;
@ -109,7 +111,7 @@ public abstract class AbstractGRPCClient<
.create(handler) .create(handler)
.callWithRetry( .callWithRetry(
() -> { () -> {
StubT stub = getAsyncStub(); FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall( ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()), stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(), requestFactory.get(),
@ -133,7 +135,7 @@ public abstract class AbstractGRPCClient<
.create(handler) .create(handler)
.callWithRetry( .callWithRetry(
() -> { () -> {
StubT stub = getAsyncStub(); FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall( return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
}, },
@ -175,7 +177,7 @@ public abstract class AbstractGRPCClient<
protected abstract BlockingStubT getBlockingStub(); protected abstract BlockingStubT getBlockingStub();
protected abstract StubT getAsyncStub(); protected abstract FutureStubT getAsyncStub();
protected boolean checkHealth(String addressStr, HostMapping hostMapping) { protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);

View File

@ -69,7 +69,7 @@ import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub; import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub; import org.tikv.kvproto.PDGrpc.PDFutureStub;
import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.Error; import org.tikv.kvproto.Pdpb.Error;
import org.tikv.kvproto.Pdpb.ErrorType; import org.tikv.kvproto.Pdpb.ErrorType;
@ -92,7 +92,7 @@ import org.tikv.kvproto.Pdpb.Timestamp;
import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse; import org.tikv.kvproto.Pdpb.TsoResponse;
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub> public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
implements ReadOnlyPDClient { implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50; private static final long MIN_TRY_UPDATE_DURATION = 50;
@ -550,7 +550,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
} }
@Override @Override
protected PDStub getAsyncStub() { protected PDFutureStub getAsyncStub() {
if (pdClientWrapper == null) { if (pdClientWrapper == null) {
throw new GrpcException("PDClient may not be initialized"); throw new GrpcException("PDClient may not be initialized");
} }
@ -644,7 +644,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
static class PDClientWrapper { static class PDClientWrapper {
private final String leaderInfo; private final String leaderInfo;
private final PDBlockingStub blockingStub; private final PDBlockingStub blockingStub;
private final PDStub asyncStub; private final PDFutureStub asyncStub;
private final long createTime; private final long createTime;
private final String storeAddress; private final String storeAddress;
@ -655,10 +655,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString()); header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
this.blockingStub = this.blockingStub =
MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header); MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header);
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header); this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header);
} else { } else {
this.blockingStub = PDGrpc.newBlockingStub(clientChannel); this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
this.asyncStub = PDGrpc.newStub(clientChannel); this.asyncStub = PDGrpc.newFutureStub(clientChannel);
} }
this.leaderInfo = leaderInfo; this.leaderInfo = leaderInfo;
this.storeAddress = storeAddress; this.storeAddress = storeAddress;
@ -677,7 +677,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return blockingStub; return blockingStub;
} }
PDStub getAsyncStub() { PDFutureStub getAsyncStub() {
return asyncStub; return asyncStub;
} }

View File

@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class); private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY = public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key PD_FORWARD_META_DATA_KEY = public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
static { static {

View File

@ -94,8 +94,6 @@ public class KVErrorHandler<RespT> implements ErrorHandler<RespT> {
Errorpb.Error error = regionHandler.getRegionError(resp); Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) { if (error != null) {
return regionHandler.handleRegionError(backOffer, error); return regionHandler.handleRegionError(backOffer, error);
} else {
regionHandler.tryUpdateRegionStore();
} }
// Key error handling logic // Key error handling logic

View File

@ -46,16 +46,10 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
Errorpb.Error error = getRegionError(resp); Errorpb.Error error = getRegionError(resp);
if (error != null) { if (error != null) {
return handleRegionError(backOffer, error); return handleRegionError(backOffer, error);
} else {
tryUpdateRegionStore();
} }
return false; return false;
} }
public void tryUpdateRegionStore() {
recv.tryUpdateRegionStore();
}
public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) { if (error.hasNotLeader()) {
// this error is reported from raftstore: // this error is reported from raftstore:

View File

@ -20,34 +20,47 @@ package org.tikv.common.region;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils; import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient; import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.GrpcException;
import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
public abstract class AbstractRegionStoreClient public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub> extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
implements RegionErrorReceiver { implements RegionErrorReceiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
public static final Histogram SEEK_LEADER_STORE_DURATION =
Histogram.build()
.name("client_java_seek_leader_store_duration")
.help("seek leader store duration.")
.register();
public static final Histogram SEEK_PROXY_STORE_DURATION =
Histogram.build()
.name("client_java_seek_proxy_store_duration")
.help("seek proxy store duration.")
.register();
protected final RegionManager regionManager; protected final RegionManager regionManager;
protected TiRegion region; protected TiRegion region;
protected TiStore targetStore; protected TiStore store;
protected TiStore originStore;
private long retryForwardTimes;
private long retryLeaderTimes;
private Metapb.Peer candidateLeader;
protected AbstractRegionStoreClient( protected AbstractRegionStoreClient(
TiConfiguration conf, TiConfiguration conf,
@ -55,7 +68,7 @@ public abstract class AbstractRegionStoreClient
TiStore store, TiStore store,
ChannelFactory channelFactory, ChannelFactory channelFactory,
TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub, TikvGrpc.TikvFutureStub asyncStub,
RegionManager regionManager) { RegionManager regionManager) {
super(conf, channelFactory, blockingStub, asyncStub); super(conf, channelFactory, blockingStub, asyncStub);
checkNotNull(region, "Region is empty"); checkNotNull(region, "Region is empty");
@ -63,14 +76,10 @@ public abstract class AbstractRegionStoreClient
checkArgument(region.getLeader() != null, "Leader Peer is null"); checkArgument(region.getLeader() != null, "Leader Peer is null");
this.region = region; this.region = region;
this.regionManager = regionManager; this.regionManager = regionManager;
this.targetStore = store; this.store = store;
this.originStore = null; if (this.store.getProxyStore() != null) {
this.candidateLeader = null;
this.retryForwardTimes = 0;
this.retryLeaderTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout(); this.timeout = conf.getForwardTimeout();
} else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) { } else if (!this.store.isReachable()) {
onStoreUnreachable(); onStoreUnreachable();
} }
} }
@ -86,7 +95,7 @@ public abstract class AbstractRegionStoreClient
} }
@Override @Override
protected TikvGrpc.TikvStub getAsyncStub() { protected TikvGrpc.TikvFutureStub getAsyncStub() {
return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
} }
@ -110,215 +119,271 @@ public abstract class AbstractRegionStoreClient
return false; return false;
} }
// If we try one peer but find the leader has not changed, we do not need try other peers. // If we try one peer but find the leader has not changed, we do not need to try other peers.
if (candidateLeader != null if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
&& region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { store = null;
retryLeaderTimes = newRegion.getFollowerList().size();
originStore = null;
} }
candidateLeader = null;
region = newRegion; region = newRegion;
targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); store = regionManager.getStoreById(region.getLeader().getStoreId());
updateClientStub(); updateClientStub();
return true; return true;
} }
@Override @Override
public boolean onStoreUnreachable() { public boolean onStoreUnreachable() {
if (!targetStore.isValid()) { if (!store.isValid()) {
logger.warn(String.format("store [%d] has been invalid", targetStore.getId())); logger.warn(String.format("store [%d] has been invalid", store.getId()));
targetStore = regionManager.getStoreById(targetStore.getId()); store = regionManager.getStoreById(store.getId());
updateClientStub(); updateClientStub();
return true; return true;
} }
if (targetStore.getProxyStore() == null) { if (store.getProxyStore() == null && store.isReachable()) {
if (targetStore.isReachable()) { if (store.isReachable()) {
logger.info(
String.format(
"store[%d] for region[%d] is reachable, retry", store.getId(), region.getId()));
return true; return true;
} }
} }
// If this store has failed to forward request too many times, we shall try other peer at first // seek an available leader store to send request
// so that we can Boolean result = seekLeaderStore();
// reduce the latency cost by fail requests. if (result != null) {
if (targetStore.canForwardFirst()) { return result;
if (retryOtherStoreByProxyForward()) { }
return true; if (conf.getEnableGrpcForward()) {
} // seek an available proxy store to forward request
if (retryOtherStoreLeader()) { return seekProxyStore();
return true;
}
} else {
if (retryOtherStoreLeader()) {
return true;
}
if (retryOtherStoreByProxyForward()) {
return true;
}
} }
logger.warn(
String.format(
"retry time exceed for region[%d], invalid store[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false; return false;
} }
protected Kvrpcpb.Context makeContext(TiStoreType storeType) { protected Kvrpcpb.Context makeContext(TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) { return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet());
} else {
return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
}
} }
protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) { protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) { return region.getReplicaContext(resolvedLocks, storeType);
return region.getReplicaContext(candidateLeader, resolvedLocks);
} else {
return region.getReplicaContext(resolvedLocks, storeType);
}
}
@Override
public void tryUpdateRegionStore() {
if (originStore != null) {
if (originStore.getId() == targetStore.getId()) {
logger.warn(
String.format(
"update store [%s] by proxy-store [%s]",
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
// We do not need to mark the store can-forward, because if one store has grpc forward
// successfully, it will
// create a new store object, which is can-forward.
regionManager.updateStore(originStore, targetStore);
} else {
originStore.forwardFail();
}
}
if (candidateLeader != null) {
logger.warn(
String.format(
"update leader to store [%d] for region[%d]",
candidateLeader.getStoreId(), region.getId()));
this.regionManager.updateLeader(region, candidateLeader.getStoreId());
}
}
private boolean retryOtherStoreLeader() {
List<Metapb.Peer> peers = region.getFollowerList();
if (retryLeaderTimes >= peers.size()) {
return false;
}
retryLeaderTimes += 1;
boolean hasVisitedStore = false;
for (Metapb.Peer cur : peers) {
if (candidateLeader == null || hasVisitedStore) {
TiStore store = regionManager.getStoreById(cur.getStoreId());
if (store != null && store.isReachable()) {
targetStore = store;
candidateLeader = cur;
logger.warn(
String.format(
"try store [%d],peer[%d] for region[%d], which may be new leader",
targetStore.getId(), candidateLeader.getId(), region.getId()));
updateClientStub();
return true;
} else {
continue;
}
} else if (candidateLeader.getId() == cur.getId()) {
hasVisitedStore = true;
}
}
candidateLeader = null;
retryLeaderTimes = peers.size();
return false;
} }
private void updateClientStub() { private void updateClientStub() {
String addressStr = targetStore.getStore().getAddress(); String addressStr = store.getStore().getAddress();
if (targetStore.getProxyStore() != null) { long deadline = timeout;
addressStr = targetStore.getProxyStore().getAddress(); if (store.getProxyStore() != null) {
addressStr = store.getProxyStore().getAddress();
deadline = conf.getForwardTimeout();
} }
ManagedChannel channel = ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel); blockingStub =
asyncStub = TikvGrpc.newStub(channel); TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
if (targetStore.getProxyStore() != null) { asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
if (store.getProxyStore() != null) {
Metadata header = new Metadata(); Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(blockingStub, header); blockingStub = MetadataUtils.attachHeaders(blockingStub, header);
asyncStub = MetadataUtils.attachHeaders(asyncStub, header); asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
} }
} }
private boolean retryOtherStoreByProxyForward() { private Boolean seekLeaderStore() {
if (!conf.getEnableGrpcForward()) { Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
return false; try {
} List<Metapb.Peer> peers = region.getFollowerList();
if (retryForwardTimes >= region.getFollowerList().size()) { if (peers.isEmpty()) {
// If we try to forward request to leader by follower failed, it means that the store of old // no followers available, retry
// leader may be logger.warn(String.format("no followers of region[%d] available, retry", region.getId()));
// unavailable but the new leader has not been report to PD. So we can ban this store for a regionManager.onRequestFail(region);
// short time to return false;
// avoid too many request try forward rather than try other peer.
if (originStore != null) {
originStore.forwardFail();
} }
return false;
}
TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
if (originStore != null) {
originStore.forwardFail();
} else {
targetStore.forwardFail();
}
return false;
}
if (originStore == null) {
originStore = targetStore;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
}
}
targetStore = proxyStore;
retryForwardTimes += 1;
updateClientStub();
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
targetStore.getStore().getAddress(),
targetStore.getProxyStore().getAddress(),
region.getId()));
return true;
}
private TiStore switchProxyStore() { logger.info(String.format("try switch leader: region[%d]", region.getId()));
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList(); Pair<Metapb.Peer, Boolean> pair = switchLeaderStore();
if (peers.isEmpty()) { Metapb.Peer peer = pair.first;
return null; boolean exceptionEncountered = pair.second;
} if (peer == null) {
Metapb.Store proxyStore = targetStore.getProxyStore(); if (!exceptionEncountered) {
if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) { // all response returned normally, the leader is not elected, just wait until it is ready.
hasVisitedStore = true; logger.info(
} String.format(
for (Metapb.Peer peer : peers) { "leader for region[%d] is not elected, just wait until it is ready",
if (hasVisitedStore) { region.getId()));
TiStore store = regionManager.getStoreById(peer.getStoreId()); return true;
if (store.isReachable()) { } else {
return targetStore.withProxy(store.getStore()); // no leader found, some response does not return normally, there may be network
// partition.
logger.warn(
String.format(
"leader for region[%d] is not found, it is possible that network partition occurred",
region.getId()));
}
} else {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
if (currentLeaderStore.isReachable()) {
logger.info(
String.format(
"update leader using switchLeader logic from store[%d] to store[%d]",
region.getLeader().getStoreId(), peer.getStoreId()));
// update region cache
region = regionManager.updateLeader(region, peer.getStoreId());
// switch to leader store
store = currentLeaderStore;
updateClientStub();
return true;
} }
} else if (peer.getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
} }
} finally {
switchLeaderDurationTimer.observeDuration();
} }
return null; return null;
} }
private boolean seekProxyStore() {
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
TiStore storeWithProxy = switchProxyStore();
if (storeWithProxy == null) {
// no store available, retry
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
return false;
}
// use proxy store to forward requests
regionManager.updateStore(store, storeWithProxy);
store = storeWithProxy;
updateClientStub();
return true;
} finally {
grpcForwardDurationTimer.observeDuration();
}
}
// first: leader peer, second: true if any responses returned with grpc error
private Pair<Metapb.Peer, Boolean> switchLeaderStore() {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
}
boolean exceptionEncountered = false;
while (true) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
List<SwitchLeaderTask> unfinished = new LinkedList<>();
for (SwitchLeaderTask task : responses) {
if (!task.task.isDone()) {
unfinished.add(task);
continue;
}
try {
Kvrpcpb.RawGetResponse resp = task.task.get();
if (resp != null) {
if (!resp.hasRegionError()) {
// the peer is leader
logger.info(
String.format("rawGet response indicates peer[%d] is leader", task.peer.getId()));
return Pair.create(task.peer, exceptionEncountered);
}
}
} catch (Exception ignored) {
exceptionEncountered = true;
}
}
if (unfinished.isEmpty()) {
return Pair.create(null, exceptionEncountered);
}
responses = unfinished;
}
}
private TiStore switchProxyStore() {
long forwardTimeout = conf.getForwardTimeout();
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
}
while (true) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
List<ForwardCheckTask> unfinished = new LinkedList<>();
for (ForwardCheckTask task : responses) {
if (!task.task.isDone()) {
unfinished.add(task);
continue;
}
try {
// any answer will do
Kvrpcpb.RawGetResponse resp = task.task.get();
logger.info(
String.format(
"rawGetResponse indicates forward from [%s] to [%s]",
task.store.getAddress(), store.getAddress()));
return store.withProxy(task.store);
} catch (Exception ignored) {
}
}
if (unfinished.isEmpty()) {
return null;
}
responses = unfinished;
}
}
private static class SwitchLeaderTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Peer peer;
private SwitchLeaderTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Peer peer) {
this.task = task;
this.peer = peer;
}
}
private static class ForwardCheckTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Store store;
private ForwardCheckTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Store store) {
this.task = task;
this.store = store;
}
}
} }

View File

@ -23,7 +23,5 @@ public interface RegionErrorReceiver {
/// return whether we need to retry this request. /// return whether we need to retry this request.
boolean onStoreUnreachable(); boolean onStoreUnreachable();
void tryUpdateRegionStore();
TiRegion getRegion(); TiRegion getRegion();
} }

View File

@ -48,7 +48,7 @@ import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub; import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.AbstractLockResolverClient; import org.tikv.txn.AbstractLockResolverClient;
import org.tikv.txn.Lock; import org.tikv.txn.Lock;
import org.tikv.txn.ResolveLockResult; import org.tikv.txn.ResolveLockResult;
@ -93,7 +93,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
TiStoreType storeType, TiStoreType storeType,
ChannelFactory channelFactory, ChannelFactory channelFactory,
TikvBlockingStub blockingStub, TikvBlockingStub blockingStub,
TikvStub asyncStub, TikvFutureStub asyncStub,
RegionManager regionManager, RegionManager regionManager,
PDClient pdClient, PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) { RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
@ -124,7 +124,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel); TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub tikvAsyncStub = TikvGrpc.newStub(channel); TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel);
this.lockResolverClient = this.lockResolverClient =
AbstractLockResolverClient.getInstance( AbstractLockResolverClient.getInstance(
@ -1246,7 +1246,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
ManagedChannel channel = null; ManagedChannel channel = null;
TikvBlockingStub blockingStub = null; TikvBlockingStub blockingStub = null;
TikvStub asyncStub = null; TikvFutureStub asyncStub = null;
if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) { if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) {
addressStr = store.getProxyStore().getAddress(); addressStr = store.getProxyStore().getAddress();
@ -1255,11 +1255,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
Metadata header = new Metadata(); Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newFutureStub(channel), header);
} else { } else {
channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel); blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel); asyncStub = TikvGrpc.newFutureStub(channel);
} }
return new RegionStoreClient( return new RegionStoreClient(

View File

@ -19,12 +19,12 @@ import org.tikv.kvproto.Metapb;
public class StoreHealthyChecker implements Runnable { public class StoreHealthyChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60; private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
private BlockingQueue<TiStore> taskQueue; private final BlockingQueue<TiStore> taskQueue;
private final ChannelFactory channelFactory; private final ChannelFactory channelFactory;
private final ReadOnlyPDClient pdClient; private final ReadOnlyPDClient pdClient;
private final RegionCache cache; private final RegionCache cache;
private long checkTombstoneTick; private long checkTombstoneTick;
private long timeout; private final long timeout;
public StoreHealthyChecker( public StoreHealthyChecker(
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) { ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
@ -117,14 +117,9 @@ public class StoreHealthyChecker implements Runnable {
} }
} else { } else {
if (!store.isReachable()) { if (!store.isReachable()) {
logger.warn( logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
String.format(
"store [%s] recovers to be reachable and canforward", store.getAddress()));
store.markReachable(); store.markReachable();
} }
if (!store.canForwardFirst()) {
store.makrCanForward();
}
} }
} else if (store.isReachable()) { } else if (store.isReachable()) {
unreachableStore.add(store); unreachableStore.add(store);

View File

@ -159,6 +159,10 @@ public class TiRegion implements Serializable {
return getContext(currentPeer, resolvedLocks, false); return getContext(currentPeer, resolvedLocks, false);
} }
public Kvrpcpb.Context getReplicaContext(Peer currentPeer) {
return getContext(currentPeer, java.util.Collections.emptySet(), false);
}
private Kvrpcpb.Context getContext( private Kvrpcpb.Context getContext(
Peer currentPeer, Set<Long> resolvedLocks, boolean replicaRead) { Peer currentPeer, Set<Long> resolvedLocks, boolean replicaRead) {

View File

@ -1,25 +1,19 @@
package org.tikv.common.region; package org.tikv.common.region;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
public class TiStore { public class TiStore {
private static long MAX_FAIL_FORWARD_TIMES = 4;
private final Metapb.Store store; private final Metapb.Store store;
private final Metapb.Store proxyStore; private final Metapb.Store proxyStore;
private AtomicBoolean reachable; private final AtomicBoolean reachable;
private AtomicBoolean valid; private final AtomicBoolean valid;
private AtomicLong failForwardCount;
private AtomicBoolean canForward;
public TiStore(Metapb.Store store) { public TiStore(Metapb.Store store) {
this.store = store; this.store = store;
this.reachable = new AtomicBoolean(true); this.reachable = new AtomicBoolean(true);
this.valid = new AtomicBoolean(true); this.valid = new AtomicBoolean(true);
this.canForward = new AtomicBoolean(true);
this.proxyStore = null; this.proxyStore = null;
this.failForwardCount = new AtomicLong(0);
} }
private TiStore(Metapb.Store store, Metapb.Store proxyStore) { private TiStore(Metapb.Store store, Metapb.Store proxyStore) {
@ -30,9 +24,7 @@ public class TiStore {
this.reachable = new AtomicBoolean(true); this.reachable = new AtomicBoolean(true);
} }
this.valid = new AtomicBoolean(true); this.valid = new AtomicBoolean(true);
this.canForward = new AtomicBoolean(true);
this.proxyStore = proxyStore; this.proxyStore = proxyStore;
this.failForwardCount = new AtomicLong(0);
} }
@java.lang.Override @java.lang.Override
@ -81,23 +73,6 @@ public class TiStore {
this.valid.set(false); this.valid.set(false);
} }
public void forwardFail() {
if (this.canForward.get()) {
if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) {
this.canForward.set(false);
}
}
}
public void makrCanForward() {
this.failForwardCount.set(0);
this.canForward.set(true);
}
public boolean canForwardFirst() {
return this.canForward.get();
}
public Metapb.Store getStore() { public Metapb.Store getStore() {
return this.store; return this.store;
} }

View File

@ -131,7 +131,7 @@ public class ConcreteBackOffer implements BackOffer {
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
break; break;
case BoTiKVRPC: case BoTiKVRPC:
backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter); backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter);
break; break;
case BoTxnNotFound: case BoTxnNotFound:
backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);

View File

@ -71,7 +71,7 @@ public interface AbstractLockResolverClient {
TiRegion region, TiRegion region,
TiStore store, TiStore store,
TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub, TikvGrpc.TikvFutureStub asyncStub,
ChannelFactory channelFactory, ChannelFactory channelFactory,
RegionManager regionManager, RegionManager regionManager,
PDClient pdClient, PDClient pdClient,

View File

@ -52,7 +52,7 @@ import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest;
import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub; import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
/** Before v3.0.5 TiDB uses the ttl on secondary lock. */ /** Before v3.0.5 TiDB uses the ttl on secondary lock. */
public class LockResolverClientV2 extends AbstractRegionStoreClient public class LockResolverClientV2 extends AbstractRegionStoreClient
@ -77,7 +77,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
TiRegion region, TiRegion region,
TiStore store, TiStore store,
TikvBlockingStub blockingStub, TikvBlockingStub blockingStub,
TikvStub asyncStub, TikvFutureStub asyncStub,
ChannelFactory channelFactory, ChannelFactory channelFactory,
RegionManager regionManager) { RegionManager regionManager) {
super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);

View File

@ -49,7 +49,7 @@ import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub; import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
/** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */ /** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */
public class LockResolverClientV3 extends AbstractRegionStoreClient public class LockResolverClientV3 extends AbstractRegionStoreClient
@ -78,7 +78,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
TiRegion region, TiRegion region,
TiStore store, TiStore store,
TikvBlockingStub blockingStub, TikvBlockingStub blockingStub,
TikvStub asyncStub, TikvFutureStub asyncStub,
ChannelFactory channelFactory, ChannelFactory channelFactory,
RegionManager regionManager, RegionManager regionManager,
PDClient pdClient, PDClient pdClient,

View File

@ -47,7 +47,7 @@ import org.tikv.common.util.TsoUtils;
import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub; import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.exception.TxnNotFoundException; import org.tikv.txn.exception.TxnNotFoundException;
import org.tikv.txn.exception.WriteConflictException; import org.tikv.txn.exception.WriteConflictException;
@ -78,7 +78,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
TiRegion region, TiRegion region,
TiStore store, TiStore store,
TikvBlockingStub blockingStub, TikvBlockingStub blockingStub,
TikvStub asyncStub, TikvFutureStub asyncStub,
ChannelFactory channelFactory, ChannelFactory channelFactory,
RegionManager regionManager, RegionManager regionManager,
PDClient pdClient, PDClient pdClient,