From b60afd0b97ea110ab3aac97727976aece443bd3d Mon Sep 17 00:00:00 2001 From: birdstorm Date: Fri, 19 Nov 2021 09:15:04 +0800 Subject: [PATCH] Optimize grpc forward and switch leader logic (#324) --- .../org/tikv/common/AbstractGRPCClient.java | 14 +- src/main/java/org/tikv/common/PDClient.java | 14 +- .../java/org/tikv/common/TiConfiguration.java | 4 +- .../tikv/common/operation/KVErrorHandler.java | 2 - .../common/operation/RegionErrorHandler.java | 6 - .../region/AbstractRegionStoreClient.java | 433 ++++++++++-------- .../common/region/RegionErrorReceiver.java | 2 - .../tikv/common/region/RegionStoreClient.java | 12 +- .../common/region/StoreHealthyChecker.java | 11 +- .../java/org/tikv/common/region/TiRegion.java | 4 + .../java/org/tikv/common/region/TiStore.java | 29 +- .../tikv/common/util/ConcreteBackOffer.java | 2 +- .../tikv/txn/AbstractLockResolverClient.java | 2 +- .../org/tikv/txn/LockResolverClientV2.java | 4 +- .../org/tikv/txn/LockResolverClientV3.java | 4 +- .../org/tikv/txn/LockResolverClientV4.java | 4 +- 16 files changed, 289 insertions(+), 258 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 813b98629a..dd886eb070 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -23,6 +23,7 @@ import io.grpc.MethodDescriptor; import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.AbstractFutureStub; import io.grpc.stub.AbstractStub; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; @@ -38,14 +39,15 @@ import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; public abstract class AbstractGRPCClient< - BlockingStubT extends AbstractStub, StubT extends AbstractStub> + BlockingStubT extends AbstractStub, + FutureStubT extends AbstractFutureStub> implements AutoCloseable { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); protected final ChannelFactory channelFactory; protected TiConfiguration conf; protected long timeout; protected BlockingStubT blockingStub; - protected StubT asyncStub; + protected FutureStubT asyncStub; protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) { this.conf = conf; @@ -57,7 +59,7 @@ public abstract class AbstractGRPCClient< TiConfiguration conf, ChannelFactory channelFactory, BlockingStubT blockingStub, - StubT asyncStub) { + FutureStubT asyncStub) { this.conf = conf; this.timeout = conf.getTimeout(); this.channelFactory = channelFactory; @@ -109,7 +111,7 @@ public abstract class AbstractGRPCClient< .create(handler) .callWithRetry( () -> { - StubT stub = getAsyncStub(); + FutureStubT stub = getAsyncStub(); ClientCalls.asyncUnaryCall( stub.getChannel().newCall(method, stub.getCallOptions()), requestFactory.get(), @@ -133,7 +135,7 @@ public abstract class AbstractGRPCClient< .create(handler) .callWithRetry( () -> { - StubT stub = getAsyncStub(); + FutureStubT stub = getAsyncStub(); return asyncBidiStreamingCall( stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); }, @@ -175,7 +177,7 @@ public abstract class AbstractGRPCClient< protected abstract BlockingStubT getBlockingStub(); - protected abstract StubT getAsyncStub(); + protected abstract FutureStubT getAsyncStub(); protected boolean checkHealth(String addressStr, HostMapping hostMapping) { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 24125f17d6..59e16a5c5a 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -69,7 +69,7 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc.PDBlockingStub; -import org.tikv.kvproto.PDGrpc.PDStub; +import org.tikv.kvproto.PDGrpc.PDFutureStub; import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb.Error; import org.tikv.kvproto.Pdpb.ErrorType; @@ -92,7 +92,7 @@ import org.tikv.kvproto.Pdpb.Timestamp; import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; -public class PDClient extends AbstractGRPCClient +public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; private static final long MIN_TRY_UPDATE_DURATION = 50; @@ -550,7 +550,7 @@ public class PDClient extends AbstractGRPCClient } @Override - protected PDStub getAsyncStub() { + protected PDFutureStub getAsyncStub() { if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } @@ -644,7 +644,7 @@ public class PDClient extends AbstractGRPCClient static class PDClientWrapper { private final String leaderInfo; private final PDBlockingStub blockingStub; - private final PDStub asyncStub; + private final PDFutureStub asyncStub; private final long createTime; private final String storeAddress; @@ -655,10 +655,10 @@ public class PDClient extends AbstractGRPCClient header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString()); this.blockingStub = MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header); - this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header); + this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header); } else { this.blockingStub = PDGrpc.newBlockingStub(clientChannel); - this.asyncStub = PDGrpc.newStub(clientChannel); + this.asyncStub = PDGrpc.newFutureStub(clientChannel); } this.leaderInfo = leaderInfo; this.storeAddress = storeAddress; @@ -677,7 +677,7 @@ public class PDClient extends AbstractGRPCClient return blockingStub; } - PDStub getAsyncStub() { + PDFutureStub getAsyncStub() { return asyncStub; } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 0d2a65580d..614cfbdeaa 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable { private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class); private static final ConcurrentHashMap settings = new ConcurrentHashMap<>(); - public static final Metadata.Key FORWARD_META_DATA_KEY = + public static final Metadata.Key FORWARD_META_DATA_KEY = Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); - public static final Metadata.Key PD_FORWARD_META_DATA_KEY = + public static final Metadata.Key PD_FORWARD_META_DATA_KEY = Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); static { diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index dfbe24c5a6..3321b56074 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -94,8 +94,6 @@ public class KVErrorHandler implements ErrorHandler { Errorpb.Error error = regionHandler.getRegionError(resp); if (error != null) { return regionHandler.handleRegionError(backOffer, error); - } else { - regionHandler.tryUpdateRegionStore(); } // Key error handling logic diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index cec220440c..230ceedaa3 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -46,16 +46,10 @@ public class RegionErrorHandler implements ErrorHandler { Errorpb.Error error = getRegionError(resp); if (error != null) { return handleRegionError(backOffer, error); - } else { - tryUpdateRegionStore(); } return false; } - public void tryUpdateRegionStore() { - recv.tryUpdateRegionStore(); - } - public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (error.hasNotLeader()) { // this error is reported from raftstore: diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 57551f1e2c..96e620279a 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -20,34 +20,47 @@ package org.tikv.common.region; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.stub.MetadataUtils; +import io.prometheus.client.Histogram; +import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; public abstract class AbstractRegionStoreClient - extends AbstractGRPCClient + extends AbstractGRPCClient implements RegionErrorReceiver { private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); + public static final Histogram SEEK_LEADER_STORE_DURATION = + Histogram.build() + .name("client_java_seek_leader_store_duration") + .help("seek leader store duration.") + .register(); + + public static final Histogram SEEK_PROXY_STORE_DURATION = + Histogram.build() + .name("client_java_seek_proxy_store_duration") + .help("seek proxy store duration.") + .register(); + protected final RegionManager regionManager; protected TiRegion region; - protected TiStore targetStore; - protected TiStore originStore; - private long retryForwardTimes; - private long retryLeaderTimes; - private Metapb.Peer candidateLeader; + protected TiStore store; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -55,7 +68,7 @@ public abstract class AbstractRegionStoreClient TiStore store, ChannelFactory channelFactory, TikvGrpc.TikvBlockingStub blockingStub, - TikvGrpc.TikvStub asyncStub, + TikvGrpc.TikvFutureStub asyncStub, RegionManager regionManager) { super(conf, channelFactory, blockingStub, asyncStub); checkNotNull(region, "Region is empty"); @@ -63,14 +76,10 @@ public abstract class AbstractRegionStoreClient checkArgument(region.getLeader() != null, "Leader Peer is null"); this.region = region; this.regionManager = regionManager; - this.targetStore = store; - this.originStore = null; - this.candidateLeader = null; - this.retryForwardTimes = 0; - this.retryLeaderTimes = 0; - if (this.targetStore.getProxyStore() != null) { + this.store = store; + if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); - } else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) { + } else if (!this.store.isReachable()) { onStoreUnreachable(); } } @@ -86,7 +95,7 @@ public abstract class AbstractRegionStoreClient } @Override - protected TikvGrpc.TikvStub getAsyncStub() { + protected TikvGrpc.TikvFutureStub getAsyncStub() { return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } @@ -110,215 +119,271 @@ public abstract class AbstractRegionStoreClient return false; } - // If we try one peer but find the leader has not changed, we do not need try other peers. - if (candidateLeader != null - && region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { - retryLeaderTimes = newRegion.getFollowerList().size(); - originStore = null; + // If we try one peer but find the leader has not changed, we do not need to try other peers. + if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { + store = null; } - candidateLeader = null; region = newRegion; - targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId()); updateClientStub(); return true; } @Override public boolean onStoreUnreachable() { - if (!targetStore.isValid()) { - logger.warn(String.format("store [%d] has been invalid", targetStore.getId())); - targetStore = regionManager.getStoreById(targetStore.getId()); + if (!store.isValid()) { + logger.warn(String.format("store [%d] has been invalid", store.getId())); + store = regionManager.getStoreById(store.getId()); updateClientStub(); return true; } - if (targetStore.getProxyStore() == null) { - if (targetStore.isReachable()) { + if (store.getProxyStore() == null && store.isReachable()) { + if (store.isReachable()) { + logger.info( + String.format( + "store[%d] for region[%d] is reachable, retry", store.getId(), region.getId())); return true; } } - // If this store has failed to forward request too many times, we shall try other peer at first - // so that we can - // reduce the latency cost by fail requests. - if (targetStore.canForwardFirst()) { - if (retryOtherStoreByProxyForward()) { - return true; - } - if (retryOtherStoreLeader()) { - return true; - } - } else { - if (retryOtherStoreLeader()) { - return true; - } - if (retryOtherStoreByProxyForward()) { - return true; - } + // seek an available leader store to send request + Boolean result = seekLeaderStore(); + if (result != null) { + return result; + } + if (conf.getEnableGrpcForward()) { + // seek an available proxy store to forward request + return seekProxyStore(); } - logger.warn( - String.format( - "retry time exceed for region[%d], invalid store[%d]", - region.getId(), targetStore.getId())); - regionManager.onRequestFail(region); return false; } protected Kvrpcpb.Context makeContext(TiStoreType storeType) { - if (candidateLeader != null && storeType == TiStoreType.TiKV) { - return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet()); - } else { - return region.getReplicaContext(java.util.Collections.emptySet(), storeType); - } + return region.getReplicaContext(java.util.Collections.emptySet(), storeType); } protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType storeType) { - if (candidateLeader != null && storeType == TiStoreType.TiKV) { - return region.getReplicaContext(candidateLeader, resolvedLocks); - } else { - return region.getReplicaContext(resolvedLocks, storeType); - } - } - - @Override - public void tryUpdateRegionStore() { - if (originStore != null) { - if (originStore.getId() == targetStore.getId()) { - logger.warn( - String.format( - "update store [%s] by proxy-store [%s]", - targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); - // We do not need to mark the store can-forward, because if one store has grpc forward - // successfully, it will - // create a new store object, which is can-forward. - regionManager.updateStore(originStore, targetStore); - } else { - originStore.forwardFail(); - } - } - if (candidateLeader != null) { - logger.warn( - String.format( - "update leader to store [%d] for region[%d]", - candidateLeader.getStoreId(), region.getId())); - this.regionManager.updateLeader(region, candidateLeader.getStoreId()); - } - } - - private boolean retryOtherStoreLeader() { - List peers = region.getFollowerList(); - if (retryLeaderTimes >= peers.size()) { - return false; - } - retryLeaderTimes += 1; - boolean hasVisitedStore = false; - for (Metapb.Peer cur : peers) { - if (candidateLeader == null || hasVisitedStore) { - TiStore store = regionManager.getStoreById(cur.getStoreId()); - if (store != null && store.isReachable()) { - targetStore = store; - candidateLeader = cur; - logger.warn( - String.format( - "try store [%d],peer[%d] for region[%d], which may be new leader", - targetStore.getId(), candidateLeader.getId(), region.getId())); - updateClientStub(); - return true; - } else { - continue; - } - } else if (candidateLeader.getId() == cur.getId()) { - hasVisitedStore = true; - } - } - candidateLeader = null; - retryLeaderTimes = peers.size(); - return false; + return region.getReplicaContext(resolvedLocks, storeType); } private void updateClientStub() { - String addressStr = targetStore.getStore().getAddress(); - if (targetStore.getProxyStore() != null) { - addressStr = targetStore.getProxyStore().getAddress(); + String addressStr = store.getStore().getAddress(); + long deadline = timeout; + if (store.getProxyStore() != null) { + addressStr = store.getProxyStore().getAddress(); + deadline = conf.getForwardTimeout(); } ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); - if (targetStore.getProxyStore() != null) { + blockingStub = + TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + if (store.getProxyStore() != null) { Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); blockingStub = MetadataUtils.attachHeaders(blockingStub, header); asyncStub = MetadataUtils.attachHeaders(asyncStub, header); } } - private boolean retryOtherStoreByProxyForward() { - if (!conf.getEnableGrpcForward()) { - return false; - } - if (retryForwardTimes >= region.getFollowerList().size()) { - // If we try to forward request to leader by follower failed, it means that the store of old - // leader may be - // unavailable but the new leader has not been report to PD. So we can ban this store for a - // short time to - // avoid too many request try forward rather than try other peer. - if (originStore != null) { - originStore.forwardFail(); + private Boolean seekLeaderStore() { + Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); + try { + List peers = region.getFollowerList(); + if (peers.isEmpty()) { + // no followers available, retry + logger.warn(String.format("no followers of region[%d] available, retry", region.getId())); + regionManager.onRequestFail(region); + return false; } - return false; - } - TiStore proxyStore = switchProxyStore(); - if (proxyStore == null) { - logger.warn( - String.format( - "no forward store can be selected for store [%s] and region[%d]", - targetStore.getStore().getAddress(), region.getId())); - if (originStore != null) { - originStore.forwardFail(); - } else { - targetStore.forwardFail(); - } - return false; - } - if (originStore == null) { - originStore = targetStore; - if (this.targetStore.getProxyStore() != null) { - this.timeout = conf.getForwardTimeout(); - } - } - targetStore = proxyStore; - retryForwardTimes += 1; - updateClientStub(); - logger.warn( - String.format( - "forward request to store [%s] by store [%s] for region[%d]", - targetStore.getStore().getAddress(), - targetStore.getProxyStore().getAddress(), - region.getId())); - return true; - } - private TiStore switchProxyStore() { - boolean hasVisitedStore = false; - List peers = region.getFollowerList(); - if (peers.isEmpty()) { - return null; - } - Metapb.Store proxyStore = targetStore.getProxyStore(); - if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) { - hasVisitedStore = true; - } - for (Metapb.Peer peer : peers) { - if (hasVisitedStore) { - TiStore store = regionManager.getStoreById(peer.getStoreId()); - if (store.isReachable()) { - return targetStore.withProxy(store.getStore()); + logger.info(String.format("try switch leader: region[%d]", region.getId())); + + Pair pair = switchLeaderStore(); + Metapb.Peer peer = pair.first; + boolean exceptionEncountered = pair.second; + if (peer == null) { + if (!exceptionEncountered) { + // all response returned normally, the leader is not elected, just wait until it is ready. + logger.info( + String.format( + "leader for region[%d] is not elected, just wait until it is ready", + region.getId())); + return true; + } else { + // no leader found, some response does not return normally, there may be network + // partition. + logger.warn( + String.format( + "leader for region[%d] is not found, it is possible that network partition occurred", + region.getId())); + } + } else { + // we found a leader + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + if (currentLeaderStore.isReachable()) { + logger.info( + String.format( + "update leader using switchLeader logic from store[%d] to store[%d]", + region.getLeader().getStoreId(), peer.getStoreId())); + // update region cache + region = regionManager.updateLeader(region, peer.getStoreId()); + // switch to leader store + store = currentLeaderStore; + updateClientStub(); + return true; } - } else if (peer.getStoreId() == proxyStore.getId()) { - hasVisitedStore = true; } + } finally { + switchLeaderDurationTimer.observeDuration(); } return null; } + + private boolean seekProxyStore() { + Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); + try { + logger.info(String.format("try grpc forward: region[%d]", region.getId())); + // when current leader cannot be reached + TiStore storeWithProxy = switchProxyStore(); + if (storeWithProxy == null) { + // no store available, retry + logger.warn(String.format("No store available, retry: region[%d]", region.getId())); + return false; + } + // use proxy store to forward requests + regionManager.updateStore(store, storeWithProxy); + store = storeWithProxy; + updateClientStub(); + return true; + } finally { + grpcForwardDurationTimer.observeDuration(); + } + } + + // first: leader peer, second: true if any responses returned with grpc error + private Pair switchLeaderStore() { + List responses = new LinkedList<>(); + for (Metapb.Peer peer : region.getFollowerList()) { + ByteString key = region.getStartKey(); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = stub.rawGet(rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); + } + boolean exceptionEncountered = false; + while (true) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + throw new GrpcException(e); + } + List unfinished = new LinkedList<>(); + for (SwitchLeaderTask task : responses) { + if (!task.task.isDone()) { + unfinished.add(task); + continue; + } + try { + Kvrpcpb.RawGetResponse resp = task.task.get(); + if (resp != null) { + if (!resp.hasRegionError()) { + // the peer is leader + logger.info( + String.format("rawGet response indicates peer[%d] is leader", task.peer.getId())); + return Pair.create(task.peer, exceptionEncountered); + } + } + } catch (Exception ignored) { + exceptionEncountered = true; + } + } + if (unfinished.isEmpty()) { + return Pair.create(null, exceptionEncountered); + } + responses = unfinished; + } + } + + private TiStore switchProxyStore() { + long forwardTimeout = conf.getForwardTimeout(); + List responses = new LinkedList<>(); + for (Metapb.Peer peer : region.getFollowerList()) { + ByteString key = region.getStartKey(); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); + } + while (true) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + throw new GrpcException(e); + } + List unfinished = new LinkedList<>(); + for (ForwardCheckTask task : responses) { + if (!task.task.isDone()) { + unfinished.add(task); + continue; + } + try { + // any answer will do + Kvrpcpb.RawGetResponse resp = task.task.get(); + logger.info( + String.format( + "rawGetResponse indicates forward from [%s] to [%s]", + task.store.getAddress(), store.getAddress())); + return store.withProxy(task.store); + } catch (Exception ignored) { + } + } + if (unfinished.isEmpty()) { + return null; + } + responses = unfinished; + } + } + + private static class SwitchLeaderTask { + private final ListenableFuture task; + private final Metapb.Peer peer; + + private SwitchLeaderTask(ListenableFuture task, Metapb.Peer peer) { + this.task = task; + this.peer = peer; + } + } + + private static class ForwardCheckTask { + private final ListenableFuture task; + private final Metapb.Store store; + + private ForwardCheckTask(ListenableFuture task, Metapb.Store store) { + this.task = task; + this.store = store; + } + } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 0a5bcabf03..4bee1356ea 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -23,7 +23,5 @@ public interface RegionErrorReceiver { /// return whether we need to retry this request. boolean onStoreUnreachable(); - void tryUpdateRegionStore(); - TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index c747d9216c..c31ea5345a 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -48,7 +48,7 @@ import org.tikv.kvproto.Kvrpcpb.*; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; import org.tikv.txn.AbstractLockResolverClient; import org.tikv.txn.Lock; import org.tikv.txn.ResolveLockResult; @@ -93,7 +93,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { TiStoreType storeType, ChannelFactory channelFactory, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { @@ -124,7 +124,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel); - TikvStub tikvAsyncStub = TikvGrpc.newStub(channel); + TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel); this.lockResolverClient = AbstractLockResolverClient.getInstance( @@ -1246,7 +1246,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { ManagedChannel channel = null; TikvBlockingStub blockingStub = null; - TikvStub asyncStub = null; + TikvFutureStub asyncStub = null; if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) { addressStr = store.getProxyStore().getAddress(); @@ -1255,11 +1255,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient { Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); - asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); + asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newFutureStub(channel), header); } else { channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); + asyncStub = TikvGrpc.newFutureStub(channel); } return new RegionStoreClient( diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index fbc75cb534..546c08a969 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -19,12 +19,12 @@ import org.tikv.kvproto.Metapb; public class StoreHealthyChecker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60; - private BlockingQueue taskQueue; + private final BlockingQueue taskQueue; private final ChannelFactory channelFactory; private final ReadOnlyPDClient pdClient; private final RegionCache cache; private long checkTombstoneTick; - private long timeout; + private final long timeout; public StoreHealthyChecker( ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) { @@ -117,14 +117,9 @@ public class StoreHealthyChecker implements Runnable { } } else { if (!store.isReachable()) { - logger.warn( - String.format( - "store [%s] recovers to be reachable and canforward", store.getAddress())); + logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress())); store.markReachable(); } - if (!store.canForwardFirst()) { - store.makrCanForward(); - } } } else if (store.isReachable()) { unreachableStore.add(store); diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 0bb934e425..c02124126d 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -159,6 +159,10 @@ public class TiRegion implements Serializable { return getContext(currentPeer, resolvedLocks, false); } + public Kvrpcpb.Context getReplicaContext(Peer currentPeer) { + return getContext(currentPeer, java.util.Collections.emptySet(), false); + } + private Kvrpcpb.Context getContext( Peer currentPeer, Set resolvedLocks, boolean replicaRead) { diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index a6346f44c3..dde79975d9 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -1,25 +1,19 @@ package org.tikv.common.region; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.tikv.kvproto.Metapb; public class TiStore { - private static long MAX_FAIL_FORWARD_TIMES = 4; private final Metapb.Store store; private final Metapb.Store proxyStore; - private AtomicBoolean reachable; - private AtomicBoolean valid; - private AtomicLong failForwardCount; - private AtomicBoolean canForward; + private final AtomicBoolean reachable; + private final AtomicBoolean valid; public TiStore(Metapb.Store store) { this.store = store; this.reachable = new AtomicBoolean(true); this.valid = new AtomicBoolean(true); - this.canForward = new AtomicBoolean(true); this.proxyStore = null; - this.failForwardCount = new AtomicLong(0); } private TiStore(Metapb.Store store, Metapb.Store proxyStore) { @@ -30,9 +24,7 @@ public class TiStore { this.reachable = new AtomicBoolean(true); } this.valid = new AtomicBoolean(true); - this.canForward = new AtomicBoolean(true); this.proxyStore = proxyStore; - this.failForwardCount = new AtomicLong(0); } @java.lang.Override @@ -81,23 +73,6 @@ public class TiStore { this.valid.set(false); } - public void forwardFail() { - if (this.canForward.get()) { - if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) { - this.canForward.set(false); - } - } - } - - public void makrCanForward() { - this.failForwardCount.set(0); - this.canForward.set(true); - } - - public boolean canForwardFirst() { - return this.canForward.get(); - } - public Metapb.Store getStore() { return this.store; } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 173109a1cb..141bb5ed76 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -131,7 +131,7 @@ public class ConcreteBackOffer implements BackOffer { backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); break; case BoTiKVRPC: - backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter); + backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter); break; case BoTxnNotFound: backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index 020068e272..7b4ed1c212 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -71,7 +71,7 @@ public interface AbstractLockResolverClient { TiRegion region, TiStore store, TikvGrpc.TikvBlockingStub blockingStub, - TikvGrpc.TikvStub asyncStub, + TikvGrpc.TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java index 3df5966abd..2003309040 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV2.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java @@ -52,7 +52,7 @@ import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest; import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; /** Before v3.0.5 TiDB uses the ttl on secondary lock. */ public class LockResolverClientV2 extends AbstractRegionStoreClient @@ -77,7 +77,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager) { super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); diff --git a/src/main/java/org/tikv/txn/LockResolverClientV3.java b/src/main/java/org/tikv/txn/LockResolverClientV3.java index 0b8d3c89a8..b15fa78437 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV3.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV3.java @@ -49,7 +49,7 @@ import org.tikv.kvproto.Kvrpcpb.CleanupRequest; import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; /** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */ public class LockResolverClientV3 extends AbstractRegionStoreClient @@ -78,7 +78,7 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java index 07a5552f0f..6acc51313c 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV4.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java @@ -47,7 +47,7 @@ import org.tikv.common.util.TsoUtils; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; import org.tikv.txn.exception.TxnNotFoundException; import org.tikv.txn.exception.WriteConflictException; @@ -78,7 +78,7 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient,