mirror of https://github.com/tikv/client-java.git
Signed-off-by: ti-srebot <ti-srebot@pingcap.com> Signed-off-by: marsishandsome <marsishandsome@gmail.com> Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>
This commit is contained in:
parent
2a04b91be7
commit
d7fa01814b
|
|
@ -92,21 +92,22 @@ public class TiSession implements AutoCloseable {
|
|||
}
|
||||
|
||||
private synchronized void warmUp() {
|
||||
long warmUpStartTime = System.currentTimeMillis();
|
||||
long warmUpStartTime = System.nanoTime();
|
||||
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
|
||||
|
||||
try {
|
||||
this.client = getPDClient();
|
||||
this.regionManager = getRegionManager();
|
||||
List<Metapb.Store> stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff());
|
||||
List<Metapb.Store> stores = this.client.getAllStores(backOffer);
|
||||
// warm up store cache
|
||||
for (Metapb.Store store : stores) {
|
||||
this.regionManager.updateStore(
|
||||
null,
|
||||
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
|
||||
null, new TiStore(this.client.getStore(backOffer, store.getId())));
|
||||
}
|
||||
ByteString startKey = ByteString.EMPTY;
|
||||
|
||||
do {
|
||||
TiRegion region = regionManager.getRegionByKey(startKey);
|
||||
TiRegion region = regionManager.getRegionByKey(startKey, backOffer);
|
||||
startKey = region.getEndKey();
|
||||
} while (!startKey.isEmpty());
|
||||
|
||||
|
|
@ -121,7 +122,8 @@ public class TiSession implements AutoCloseable {
|
|||
logger.info("warm up fails, ignored ", e);
|
||||
} finally {
|
||||
logger.info(
|
||||
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
|
||||
String.format(
|
||||
"warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
|
|||
// onNotLeader is only needed when updateLeader succeeds, thus switch
|
||||
// to a new store address.
|
||||
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
|
||||
retry = newRegion != null && recv.onNotLeader(newRegion);
|
||||
retry = newRegion != null && recv.onNotLeader(newRegion, backOffer);
|
||||
|
||||
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ public abstract class AbstractRegionStoreClient
|
|||
* @return false when re-split is needed.
|
||||
*/
|
||||
@Override
|
||||
public boolean onNotLeader(TiRegion newRegion) {
|
||||
public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
|
||||
}
|
||||
|
|
@ -123,7 +123,7 @@ public abstract class AbstractRegionStoreClient
|
|||
store = null;
|
||||
}
|
||||
region = newRegion;
|
||||
store = regionManager.getStoreById(region.getLeader().getStoreId());
|
||||
store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
|
||||
updateClientStub();
|
||||
return true;
|
||||
}
|
||||
|
|
@ -193,10 +193,10 @@ public abstract class AbstractRegionStoreClient
|
|||
|
||||
logger.info(String.format("try switch leader: region[%d]", region.getId()));
|
||||
|
||||
Metapb.Peer peer = switchLeaderStore();
|
||||
Metapb.Peer peer = switchLeaderStore(backOffer);
|
||||
if (peer != null) {
|
||||
// we found a leader
|
||||
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
|
||||
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
|
||||
if (currentLeaderStore.isReachable()) {
|
||||
logger.info(
|
||||
String.format(
|
||||
|
|
@ -232,7 +232,7 @@ public abstract class AbstractRegionStoreClient
|
|||
try {
|
||||
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
|
||||
// when current leader cannot be reached
|
||||
TiStore storeWithProxy = switchProxyStore();
|
||||
TiStore storeWithProxy = switchProxyStore(backOffer);
|
||||
if (storeWithProxy == null) {
|
||||
// no store available, retry
|
||||
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
|
||||
|
|
@ -250,11 +250,11 @@ public abstract class AbstractRegionStoreClient
|
|||
}
|
||||
|
||||
// first: leader peer, second: true if any responses returned with grpc error
|
||||
private Metapb.Peer switchLeaderStore() {
|
||||
private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
|
||||
List<SwitchLeaderTask> responses = new LinkedList<>();
|
||||
for (Metapb.Peer peer : region.getFollowerList()) {
|
||||
ByteString key = region.getStartKey();
|
||||
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
|
||||
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(
|
||||
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
|
||||
|
|
@ -300,12 +300,12 @@ public abstract class AbstractRegionStoreClient
|
|||
}
|
||||
}
|
||||
|
||||
private TiStore switchProxyStore() {
|
||||
private TiStore switchProxyStore(BackOffer backOffer) {
|
||||
long forwardTimeout = conf.getForwardTimeout();
|
||||
List<ForwardCheckTask> responses = new LinkedList<>();
|
||||
for (Metapb.Peer peer : region.getFollowerList()) {
|
||||
ByteString key = region.getStartKey();
|
||||
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
|
||||
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(
|
||||
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ package org.tikv.common.region;
|
|||
import org.tikv.common.util.BackOffer;
|
||||
|
||||
public interface RegionErrorReceiver {
|
||||
boolean onNotLeader(TiRegion region);
|
||||
boolean onNotLeader(TiRegion region, BackOffer backOffer);
|
||||
|
||||
/// return whether we need to retry this request.
|
||||
boolean onStoreUnreachable(BackOffer backOffer);
|
||||
|
|
|
|||
|
|
@ -274,7 +274,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
|||
boolean forWrite = false;
|
||||
while (true) {
|
||||
// we should refresh region
|
||||
region = regionManager.getRegionByKey(startKey);
|
||||
region = regionManager.getRegionByKey(startKey, backOffer);
|
||||
|
||||
Supplier<ScanRequest> request =
|
||||
() ->
|
||||
|
|
|
|||
|
|
@ -168,8 +168,6 @@ public class ConcreteBackOffer implements BackOffer {
|
|||
}
|
||||
|
||||
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
|
||||
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
|
||||
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
|
||||
BackOffFunction backOffFunction =
|
||||
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
|
||||
|
||||
|
|
@ -185,6 +183,8 @@ public class ConcreteBackOffer implements BackOffer {
|
|||
}
|
||||
}
|
||||
|
||||
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
|
||||
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
|
||||
try {
|
||||
Thread.sleep(sleep);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue