Refactor Region Cache to reduce lock (#228)

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
Wallace 2021-07-07 11:37:51 +08:00 committed by GitHub
parent 01898542c5
commit 48b104f196
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 104 additions and 71 deletions

View File

@ -55,10 +55,10 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "150ms";
public static final String DEF_TIMEOUT = "300ms";
public static final String DEF_FORWARD_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;

View File

@ -93,6 +93,7 @@ import org.tikv.kvproto.Pdpb.TsoResponse;
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50;
private final Logger logger = LoggerFactory.getLogger(PDClient.class);
private RequestHeader header;
private TsoRequest tsoReq;
@ -103,6 +104,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private Client etcdClient;
private ConcurrentMap<Long, Double> tiflashReplicaMap;
private HostMapping hostMapping;
private long lastUpdateLeaderTime;
public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
Histogram.build()
@ -392,6 +394,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
public synchronized void updateLeaderOrforwardFollower() {
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
for (URI url : this.pdAddrs) {
// since resp is null, we need update leader's address by walking through all pd server.
GetMembersResponse resp = getMembers(url);
@ -407,6 +412,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
// if leader is switched, just return.
if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
lastUpdateLeaderTime = System.currentTimeMillis();
return;
}
@ -441,6 +447,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
}
}
lastUpdateLeaderTime = System.currentTimeMillis();
if (pdClientWrapper == null) {
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");
@ -470,6 +477,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return;
}
}
lastUpdateLeaderTime = System.currentTimeMillis();
if (pdClientWrapper == null) {
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");

View File

@ -36,7 +36,6 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
public boolean handleResponseError(BackOffer backOffer, RespT resp) {
if (resp == null) {
String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion());
logger.warn(msg);
return handleRequestError(backOffer, new GrpcException(msg));
}
// Region error handling logic
@ -171,6 +170,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
return true;
}
logger.warn("request failed because of: " + e.getMessage());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
new GrpcException(

View File

@ -79,6 +79,8 @@ public abstract class RetryPolicy<RespT> {
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
} else {
return result;
}
}

View File

@ -105,6 +105,7 @@ public abstract class AbstractRegionStoreClient
}
region = newRegion;
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
originStore = null;
String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
@ -128,7 +129,7 @@ public abstract class AbstractRegionStoreClient
} else if (retryTimes > region.getFollowerList().size()) {
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region and store[%d]",
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
@ -139,6 +140,7 @@ public abstract class AbstractRegionStoreClient
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
regionManager.onRequestFail(region);
return false;
}
if (originStore == null) {
@ -168,6 +170,10 @@ public abstract class AbstractRegionStoreClient
@Override
protected void tryUpdateProxy() {
if (originStore != null) {
logger.warn(
String.format(
"update store [%s] by proxy-store [%s]",
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
regionManager.updateStore(originStore, targetStore);
}
}

View File

@ -55,6 +55,8 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
private final ReadOnlyPDClient pdClient;
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final UnreachableStoreChecker storeChecker;
@ -72,7 +74,9 @@ public class RegionManager {
TiConfiguration conf,
ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(conf, pdClient);
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.executor = null;
this.storeChecker = null;
@ -84,8 +88,11 @@ public class RegionManager {
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback,
ChannelFactory channelFactory,
boolean enableGrpcForward) {
this.cache = new RegionCache(conf, pdClient);
this.cache = new RegionCache();
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.pdClient = pdClient;
this.conf = conf;
if (enableGrpcForward) {
UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
this.storeChecker = storeChecker;
@ -98,7 +105,9 @@ public class RegionManager {
}
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(conf, pdClient);
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.cacheInvalidateCallback = null;
this.storeChecker = null;
this.executor = null;
@ -115,7 +124,7 @@ public class RegionManager {
}
public ReadOnlyPDClient getPDClient() {
return this.cache.pdClient;
return this.pdClient;
}
public TiRegion getRegionByKey(ByteString key) {
@ -123,7 +132,14 @@ public class RegionManager {
}
public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
return cache.getRegionByKey(key, backOffer);
TiRegion region = cache.getRegionByKey(key, backOffer);
if (region == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
region =
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
}
return region;
}
@Deprecated
@ -134,7 +150,15 @@ public class RegionManager {
// Consider region A, B. After merge of (A, B) -> A, region ID B does not exist.
// This request is unrecoverable.
public TiRegion getRegionById(long regionId) {
return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId);
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
TiRegion region = cache.getRegionById(regionId);
if (region == null) {
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
pdClient.getRegionByID(backOffer, regionId);
region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
return cache.putRegion(region);
}
return region;
}
public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
@ -151,10 +175,7 @@ public class RegionManager {
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
ByteString key, TiStoreType storeType, BackOffer backOffer) {
TiRegion region = cache.getRegionByKey(key, backOffer);
if (region == null) {
throw new TiClientInternalException("Region not exist for key:" + formatBytesUTF8(key));
}
TiRegion region = getRegionByKey(key, backOffer);
if (!region.isValid()) {
throw new TiClientInternalException("Region invalid: " + region.toString());
}
@ -162,7 +183,7 @@ public class RegionManager {
TiStore store = null;
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = cache.getStoreById(peer.getStoreId(), backOffer);
store = getStoreById(peer.getStoreId(), backOffer);
if (store == null) {
cache.clearAll();
}
@ -192,12 +213,33 @@ public class RegionManager {
return Pair.create(region, store);
}
public TiStore getStoreById(long id) {
return getStoreById(id, ConcreteBackOffer.newGetBackOff());
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores);
}
private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList());
}
public TiStore getStoreById(long id, BackOffer backOffer) {
return cache.getStoreById(id, backOffer);
try {
TiStore store = cache.getStoreById(id);
if (store == null) {
store = new TiStore(pdClient.getStore(backOffer, id));
}
if (store.getStore().getState().equals(StoreState.Tombstone)) {
return null;
}
return cache.putStore(id, store);
} catch (Exception e) {
throw new GrpcException(e);
}
}
public TiStore getStoreById(long id) {
return getStoreById(id, ConcreteBackOffer.newGetBackOff());
}
public void onRegionStale(TiRegion region) {
@ -253,16 +295,12 @@ public class RegionManager {
private final Map<Long, TiRegion> regionCache;
private final Map<Long, TiStore> storeCache;
private final RangeMap<Key, Long> keyToRegionIdCache;
private final ReadOnlyPDClient pdClient;
private final TiConfiguration conf;
public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) {
public RegionCache() {
regionCache = new HashMap<>();
storeCache = new HashMap<>();
keyToRegionIdCache = TreeRangeMap.create();
this.conf = conf;
this.pdClient = pdClient;
}
public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
@ -281,14 +319,7 @@ public class RegionManager {
}
if (regionId == null) {
logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
pdClient.getRegionByKey(backOffer, key);
TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
}
return region;
return null;
}
TiRegion region;
region = regionCache.get(regionId);
@ -302,29 +333,29 @@ public class RegionManager {
}
}
private synchronized boolean putRegion(TiRegion region) {
private synchronized TiRegion putRegion(TiRegion region) {
if (logger.isDebugEnabled()) {
logger.debug("putRegion: " + region);
}
TiRegion oldRegion = regionCache.get(region.getId());
if (oldRegion != null) {
if (oldRegion.getMeta().equals(region.getMeta())) {
return oldRegion;
} else {
invalidateRegion(oldRegion);
}
}
regionCache.put(region.getId(), region);
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
return true;
return region;
}
@Deprecated
private synchronized TiRegion getRegionById(BackOffer backOffer, long regionId) {
private synchronized TiRegion getRegionById(long regionId) {
TiRegion region = regionCache.get(regionId);
if (logger.isDebugEnabled()) {
logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region));
}
if (region == null) {
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
pdClient.getRegionByID(backOffer, regionId);
region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
if (!putRegion(region)) {
throw new TiClientInternalException("Invalid Region: " + region.toString());
}
}
return region;
}
@ -353,13 +384,15 @@ public class RegionManager {
logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
}
TiRegion oldRegion = regionCache.get(region.getId());
if (expected != oldRegion) {
if (!expected.getMeta().equals(oldRegion.getMeta())) {
return false;
} else {
if (oldRegion != null) {
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
}
putRegion(region);
regionCache.put(region.getId(), region);
keyToRegionIdCache.put(
makeRange(region.getStartKey(), region.getEndKey()), region.getId());
return true;
}
} catch (Exception ignore) {
@ -412,33 +445,17 @@ public class RegionManager {
}
}
public synchronized TiStore getStoreById(long id, BackOffer backOffer) {
try {
TiStore store = storeCache.get(id);
if (store == null) {
store = new TiStore(pdClient.getStore(backOffer, id));
}
if (store.getStore().getState().equals(StoreState.Tombstone)) {
return null;
}
storeCache.put(id, store);
return store;
} catch (Exception e) {
throw new GrpcException(e);
public synchronized TiStore getStoreById(long id) {
return storeCache.get(id);
}
public synchronized TiStore putStore(long id, TiStore store) {
TiStore oldStore = storeCache.get(id);
if (oldStore != null && oldStore.getStore().equals(store.getStore())) {
return oldStore;
}
}
private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
}
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores);
storeCache.put(id, store);
return store;
}
public synchronized void clearAll() {

View File

@ -105,7 +105,7 @@ public class ConcreteBackOffer implements BackOffer {
backOffFunction = BackOffFunction.create(500, 3000, BackOffStrategy.EqualJitter);
break;
case BoTiKVRPC:
backOffFunction = BackOffFunction.create(100, 2000, BackOffStrategy.EqualJitter);
backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter);
break;
case BoTxnNotFound:
backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);