diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 1ffe00c5d4..5938d56a39 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -55,10 +55,10 @@ public class ConfigUtils { public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; - public static final String DEF_TIMEOUT = "150ms"; + public static final String DEF_TIMEOUT = "300ms"; public static final String DEF_FORWARD_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; - public static final int DEF_CHECK_HEALTH_TIMEOUT = 40; + public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; public static final int DEF_SCAN_BATCH_SIZE = 10240; public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9de9d857b3..fcd9e5ef57 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -93,6 +93,7 @@ import org.tikv.kvproto.Pdpb.TsoResponse; public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; + private static final long MIN_TRY_UPDATE_DURATION = 50; private final Logger logger = LoggerFactory.getLogger(PDClient.class); private RequestHeader header; private TsoRequest tsoReq; @@ -103,6 +104,7 @@ public class PDClient extends AbstractGRPCClient private Client etcdClient; private ConcurrentMap tiflashReplicaMap; private HostMapping hostMapping; + private long lastUpdateLeaderTime; public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = Histogram.build() @@ -392,6 +394,9 @@ public class PDClient extends AbstractGRPCClient } public synchronized void updateLeaderOrforwardFollower() { + if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { + return; + } for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(url); @@ -407,6 +412,7 @@ public class PDClient extends AbstractGRPCClient // if leader is switched, just return. if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -441,6 +447,7 @@ public class PDClient extends AbstractGRPCClient } } } + lastUpdateLeaderTime = System.currentTimeMillis(); if (pdClientWrapper == null) { throw new TiClientInternalException( "already tried all address on file, but not leader found yet."); @@ -470,6 +477,7 @@ public class PDClient extends AbstractGRPCClient return; } } + lastUpdateLeaderTime = System.currentTimeMillis(); if (pdClientWrapper == null) { throw new TiClientInternalException( "already tried all address on file, but not leader found yet."); diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 32b0a3cf2e..608f631d91 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -36,7 +36,6 @@ public class RegionErrorHandler implements ErrorHandler { public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); - logger.warn(msg); return handleRequestError(backOffer, new GrpcException(msg)); } // Region error handling logic @@ -171,6 +170,7 @@ public class RegionErrorHandler implements ErrorHandler { return true; } + logger.warn("request failed because of: " + e.getMessage()); backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoTiKVRPC, new GrpcException( diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 4c78f66a30..27d8558c7b 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -79,6 +79,8 @@ public abstract class RetryPolicy { if (retry) { GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); continue; + } else { + return result; } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 4748a51070..49740c56fe 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -105,6 +105,7 @@ public abstract class AbstractRegionStoreClient } region = newRegion; targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); + originStore = null; String addressStr = targetStore.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); @@ -128,7 +129,7 @@ public abstract class AbstractRegionStoreClient } else if (retryTimes > region.getFollowerList().size()) { logger.warn( String.format( - "retry time exceed for region[%d], invalid this region and store[%d]", + "retry time exceed for region[%d], invalid this region[%d]", region.getId(), targetStore.getId())); regionManager.onRequestFail(region); return false; @@ -139,6 +140,7 @@ public abstract class AbstractRegionStoreClient String.format( "no forward store can be selected for store [%s] and region[%d]", targetStore.getStore().getAddress(), region.getId())); + regionManager.onRequestFail(region); return false; } if (originStore == null) { @@ -168,6 +170,10 @@ public abstract class AbstractRegionStoreClient @Override protected void tryUpdateProxy() { if (originStore != null) { + logger.warn( + String.format( + "update store [%s] by proxy-store [%s]", + targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); regionManager.updateStore(originStore, targetStore); } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 662b8460e5..a565b5cce4 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -55,6 +55,8 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; + private final ReadOnlyPDClient pdClient; + private final TiConfiguration conf; private final ScheduledExecutorService executor; private final UnreachableStoreChecker storeChecker; @@ -72,7 +74,9 @@ public class RegionManager { TiConfiguration conf, ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; this.cacheInvalidateCallback = cacheInvalidateCallback; this.executor = null; this.storeChecker = null; @@ -84,8 +88,11 @@ public class RegionManager { Function cacheInvalidateCallback, ChannelFactory channelFactory, boolean enableGrpcForward) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); this.cacheInvalidateCallback = cacheInvalidateCallback; + this.pdClient = pdClient; + this.conf = conf; + if (enableGrpcForward) { UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); this.storeChecker = storeChecker; @@ -98,7 +105,9 @@ public class RegionManager { } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; this.cacheInvalidateCallback = null; this.storeChecker = null; this.executor = null; @@ -115,7 +124,7 @@ public class RegionManager { } public ReadOnlyPDClient getPDClient() { - return this.cache.pdClient; + return this.pdClient; } public TiRegion getRegionByKey(ByteString key) { @@ -123,7 +132,14 @@ public class RegionManager { } public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { - return cache.getRegionByKey(key, backOffer); + TiRegion region = cache.getRegionByKey(key, backOffer); + if (region == null) { + logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); + Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); + region = + cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + } + return region; } @Deprecated @@ -134,7 +150,15 @@ public class RegionManager { // Consider region A, B. After merge of (A, B) -> A, region ID B does not exist. // This request is unrecoverable. public TiRegion getRegionById(long regionId) { - return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId); + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + TiRegion region = cache.getRegionById(regionId); + if (region == null) { + Pair regionAndLeader = + pdClient.getRegionByID(backOffer, regionId); + region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); + return cache.putRegion(region); + } + return region; } public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) { @@ -151,10 +175,7 @@ public class RegionManager { public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { - TiRegion region = cache.getRegionByKey(key, backOffer); - if (region == null) { - throw new TiClientInternalException("Region not exist for key:" + formatBytesUTF8(key)); - } + TiRegion region = getRegionByKey(key, backOffer); if (!region.isValid()) { throw new TiClientInternalException("Region invalid: " + region.toString()); } @@ -162,7 +183,7 @@ public class RegionManager { TiStore store = null; if (storeType == TiStoreType.TiKV) { Peer peer = region.getCurrentReplica(); - store = cache.getStoreById(peer.getStoreId(), backOffer); + store = getStoreById(peer.getStoreId(), backOffer); if (store == null) { cache.clearAll(); } @@ -192,12 +213,33 @@ public class RegionManager { return Pair.create(region, store); } - public TiStore getStoreById(long id) { - return getStoreById(id, ConcreteBackOffer.newGetBackOff()); + private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { + List peers = region.getPeersList(); + List stores = getRegionStore(peers, backOffer); + return new TiRegion(conf, region, leader, peers, stores); + } + + private List getRegionStore(List peers, BackOffer backOffer) { + return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList()); } public TiStore getStoreById(long id, BackOffer backOffer) { - return cache.getStoreById(id, backOffer); + try { + TiStore store = cache.getStoreById(id); + if (store == null) { + store = new TiStore(pdClient.getStore(backOffer, id)); + } + if (store.getStore().getState().equals(StoreState.Tombstone)) { + return null; + } + return cache.putStore(id, store); + } catch (Exception e) { + throw new GrpcException(e); + } + } + + public TiStore getStoreById(long id) { + return getStoreById(id, ConcreteBackOffer.newGetBackOff()); } public void onRegionStale(TiRegion region) { @@ -253,16 +295,12 @@ public class RegionManager { private final Map regionCache; private final Map storeCache; private final RangeMap keyToRegionIdCache; - private final ReadOnlyPDClient pdClient; - private final TiConfiguration conf; - public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) { + public RegionCache() { regionCache = new HashMap<>(); storeCache = new HashMap<>(); keyToRegionIdCache = TreeRangeMap.create(); - this.conf = conf; - this.pdClient = pdClient; } public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { @@ -281,14 +319,7 @@ public class RegionManager { } if (regionId == null) { - logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); - Pair regionAndLeader = - pdClient.getRegionByKey(backOffer, key); - TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); - if (!putRegion(region)) { - throw new TiClientInternalException("Invalid Region: " + region.toString()); - } - return region; + return null; } TiRegion region; region = regionCache.get(regionId); @@ -302,29 +333,29 @@ public class RegionManager { } } - private synchronized boolean putRegion(TiRegion region) { + private synchronized TiRegion putRegion(TiRegion region) { if (logger.isDebugEnabled()) { logger.debug("putRegion: " + region); } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion != null) { + if (oldRegion.getMeta().equals(region.getMeta())) { + return oldRegion; + } else { + invalidateRegion(oldRegion); + } + } regionCache.put(region.getId(), region); keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId()); - return true; + return region; } @Deprecated - private synchronized TiRegion getRegionById(BackOffer backOffer, long regionId) { + private synchronized TiRegion getRegionById(long regionId) { TiRegion region = regionCache.get(regionId); if (logger.isDebugEnabled()) { logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); } - if (region == null) { - Pair regionAndLeader = - pdClient.getRegionByID(backOffer, regionId); - region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); - if (!putRegion(region)) { - throw new TiClientInternalException("Invalid Region: " + region.toString()); - } - } return region; } @@ -353,13 +384,15 @@ public class RegionManager { logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); } TiRegion oldRegion = regionCache.get(region.getId()); - if (expected != oldRegion) { + if (!expected.getMeta().equals(oldRegion.getMeta())) { return false; } else { if (oldRegion != null) { keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey())); } - putRegion(region); + regionCache.put(region.getId(), region); + keyToRegionIdCache.put( + makeRange(region.getStartKey(), region.getEndKey()), region.getId()); return true; } } catch (Exception ignore) { @@ -412,33 +445,17 @@ public class RegionManager { } } - public synchronized TiStore getStoreById(long id, BackOffer backOffer) { - try { - TiStore store = storeCache.get(id); - if (store == null) { - store = new TiStore(pdClient.getStore(backOffer, id)); - } - if (store.getStore().getState().equals(StoreState.Tombstone)) { - return null; - } - storeCache.put(id, store); - return store; - } catch (Exception e) { - throw new GrpcException(e); + public synchronized TiStore getStoreById(long id) { + return storeCache.get(id); + } + + public synchronized TiStore putStore(long id, TiStore store) { + TiStore oldStore = storeCache.get(id); + if (oldStore != null && oldStore.getStore().equals(store.getStore())) { + return oldStore; } - } - - private List getRegionStore(List peers, BackOffer backOffer) { - return peers - .stream() - .map(p -> getStoreById(p.getStoreId(), backOffer)) - .collect(Collectors.toList()); - } - - private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores); + storeCache.put(id, store); + return store; } public synchronized void clearAll() { diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index eff368368d..b248d233d8 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -105,7 +105,7 @@ public class ConcreteBackOffer implements BackOffer { backOffFunction = BackOffFunction.create(500, 3000, BackOffStrategy.EqualJitter); break; case BoTiKVRPC: - backOffFunction = BackOffFunction.create(100, 2000, BackOffStrategy.EqualJitter); + backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter); break; case BoTxnNotFound: backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);