mirror of https://github.com/tikv/client-java.git
fix test (#235)
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
This commit is contained in:
parent
d4d1c6ac6a
commit
faf0cb435d
|
@ -51,14 +51,17 @@ public class ConfigUtils {
|
|||
public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
|
||||
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
|
||||
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";
|
||||
public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION =
|
||||
"tikv.health_check_period_duration";
|
||||
|
||||
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";
|
||||
|
||||
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
|
||||
public static final String DEF_TIMEOUT = "300ms";
|
||||
public static final String DEF_FORWARD_TIMEOUT = "600ms";
|
||||
public static final String DEF_TIMEOUT = "200ms";
|
||||
public static final String DEF_FORWARD_TIMEOUT = "300ms";
|
||||
public static final String DEF_SCAN_TIMEOUT = "20s";
|
||||
public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
|
||||
public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300;
|
||||
public static final int DEF_SCAN_BATCH_SIZE = 10240;
|
||||
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
|
||||
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
|
||||
|
|
|
@ -80,6 +80,7 @@ public class TiConfiguration implements Serializable {
|
|||
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
|
||||
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
|
||||
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
|
||||
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
|
||||
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
|
||||
}
|
||||
|
||||
|
@ -265,6 +266,7 @@ public class TiConfiguration implements Serializable {
|
|||
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
|
||||
private int metricsPort = getInt(TIKV_METRICS_PORT);
|
||||
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
|
||||
private int healthCheckPeriodDuration = getInt(TIKV_HEALTH_CHECK_PERIOD_DURATION);
|
||||
|
||||
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
|
||||
private HostMapping hostMapping = null;
|
||||
|
@ -573,6 +575,10 @@ public class TiConfiguration implements Serializable {
|
|||
return this.grpcHealthCheckTimeout;
|
||||
}
|
||||
|
||||
public long getHealthCheckPeriodDuration() {
|
||||
return this.healthCheckPeriodDuration;
|
||||
}
|
||||
|
||||
public boolean isEnableAtomicForCAS() {
|
||||
return enableAtomicForCAS;
|
||||
}
|
||||
|
|
|
@ -26,12 +26,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.catalog.Catalog;
|
||||
import org.tikv.common.event.CacheInvalidateEvent;
|
||||
import org.tikv.common.exception.TiKVException;
|
||||
import org.tikv.common.key.Key;
|
||||
import org.tikv.common.meta.TiTimestamp;
|
||||
|
@ -56,7 +54,6 @@ public class TiSession implements AutoCloseable {
|
|||
private static final Map<String, TiSession> sessionCachedMap = new HashMap<>();
|
||||
private final TiConfiguration conf;
|
||||
private final ChannelFactory channelFactory;
|
||||
private Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
|
||||
// below object creation is either heavy or making connection (pd), pending for lazy loading
|
||||
private volatile PDClient client;
|
||||
private volatile Catalog catalog;
|
||||
|
@ -182,13 +179,7 @@ public class TiSession implements AutoCloseable {
|
|||
if (res == null) {
|
||||
synchronized (this) {
|
||||
if (regionManager == null) {
|
||||
regionManager =
|
||||
new RegionManager(
|
||||
getConf(),
|
||||
getPDClient(),
|
||||
this.cacheInvalidateCallback,
|
||||
this.channelFactory,
|
||||
this.enableGrpcForward);
|
||||
regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory);
|
||||
}
|
||||
res = regionManager;
|
||||
}
|
||||
|
@ -331,15 +322,6 @@ public class TiSession implements AutoCloseable {
|
|||
return channelFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used for setting call back function to invalidate cache information
|
||||
*
|
||||
* @param callBackFunc callback function
|
||||
*/
|
||||
public void injectCallBackFunc(Function<CacheInvalidateEvent, Void> callBackFunc) {
|
||||
this.cacheInvalidateCallback = callBackFunc;
|
||||
}
|
||||
|
||||
/**
|
||||
* split region and scatter
|
||||
*
|
||||
|
|
|
@ -125,6 +125,14 @@ public abstract class AbstractRegionStoreClient
|
|||
|
||||
@Override
|
||||
public boolean onStoreUnreachable() {
|
||||
if (!targetStore.isValid()) {
|
||||
logger.warn(
|
||||
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
|
||||
targetStore = regionManager.getStoreById(targetStore.getId());
|
||||
updateClientStub();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (targetStore.getProxyStore() == null) {
|
||||
if (targetStore.isReachable()) {
|
||||
return true;
|
||||
|
@ -238,20 +246,22 @@ public abstract class AbstractRegionStoreClient
|
|||
|
||||
private void updateClientStub() {
|
||||
String addressStr = targetStore.getStore().getAddress();
|
||||
if (targetStore.getProxyStore() != null) {
|
||||
addressStr = targetStore.getProxyStore().getAddress();
|
||||
}
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
blockingStub = TikvGrpc.newBlockingStub(channel);
|
||||
asyncStub = TikvGrpc.newStub(channel);
|
||||
if (targetStore.getProxyStore() != null) {
|
||||
Metadata header = new Metadata();
|
||||
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
|
||||
blockingStub = MetadataUtils.attachHeaders(blockingStub, header);
|
||||
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean retryOtherStoreByProxyForward() {
|
||||
if (!targetStore.isValid()) {
|
||||
targetStore = regionManager.getStoreById(targetStore.getId());
|
||||
logger.warn(
|
||||
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
|
||||
return true;
|
||||
}
|
||||
|
||||
TiStore proxyStore = switchProxyStore();
|
||||
if (proxyStore == null) {
|
||||
logger.warn(
|
||||
|
@ -268,19 +278,13 @@ public abstract class AbstractRegionStoreClient
|
|||
}
|
||||
targetStore = proxyStore;
|
||||
retryForwardTimes += 1;
|
||||
updateClientStub();
|
||||
logger.warn(
|
||||
String.format(
|
||||
"forward request to store [%s] by store [%s] for region[%d]",
|
||||
targetStore.getStore().getAddress(),
|
||||
targetStore.getProxyStore().getAddress(),
|
||||
region.getId()));
|
||||
String addressStr = targetStore.getProxyStore().getAddress();
|
||||
ManagedChannel channel =
|
||||
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
|
||||
Metadata header = new Metadata();
|
||||
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
|
||||
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
|
||||
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,13 +25,11 @@ import java.util.List;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.ReadOnlyPDClient;
|
||||
import org.tikv.common.TiConfiguration;
|
||||
import org.tikv.common.event.CacheInvalidateEvent;
|
||||
import org.tikv.common.exception.GrpcException;
|
||||
import org.tikv.common.exception.TiClientInternalException;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
|
@ -59,50 +57,24 @@ public class RegionManager {
|
|||
private final ScheduledExecutorService executor;
|
||||
private final StoreHealthyChecker storeChecker;
|
||||
|
||||
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
|
||||
|
||||
// To avoid double retrieval, we used the async version of grpc
|
||||
// When rpc not returned, instead of call again, it wait for previous one done
|
||||
public RegionManager(
|
||||
TiConfiguration conf,
|
||||
ReadOnlyPDClient pdClient,
|
||||
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
|
||||
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
|
||||
this.cache = new RegionCache();
|
||||
this.pdClient = pdClient;
|
||||
this.conf = conf;
|
||||
this.cacheInvalidateCallback = cacheInvalidateCallback;
|
||||
this.executor = null;
|
||||
this.storeChecker = null;
|
||||
}
|
||||
|
||||
public RegionManager(
|
||||
TiConfiguration conf,
|
||||
ReadOnlyPDClient pdClient,
|
||||
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback,
|
||||
ChannelFactory channelFactory,
|
||||
boolean enableGrpcForward) {
|
||||
this.cache = new RegionCache();
|
||||
this.cacheInvalidateCallback = cacheInvalidateCallback;
|
||||
this.pdClient = pdClient;
|
||||
this.conf = conf;
|
||||
|
||||
if (enableGrpcForward) {
|
||||
StoreHealthyChecker storeChecker =
|
||||
new StoreHealthyChecker(channelFactory, pdClient, this.cache);
|
||||
this.storeChecker = storeChecker;
|
||||
this.executor = Executors.newScheduledThreadPool(1);
|
||||
this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS);
|
||||
} else {
|
||||
this.storeChecker = null;
|
||||
this.executor = null;
|
||||
}
|
||||
long period = conf.getHealthCheckPeriodDuration();
|
||||
StoreHealthyChecker storeChecker =
|
||||
new StoreHealthyChecker(
|
||||
channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout());
|
||||
this.storeChecker = storeChecker;
|
||||
this.executor = Executors.newScheduledThreadPool(1);
|
||||
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
|
||||
this.cache = new RegionCache();
|
||||
this.pdClient = pdClient;
|
||||
this.conf = conf;
|
||||
this.cacheInvalidateCallback = null;
|
||||
this.storeChecker = null;
|
||||
this.executor = null;
|
||||
}
|
||||
|
@ -113,10 +85,6 @@ public class RegionManager {
|
|||
}
|
||||
}
|
||||
|
||||
public Function<CacheInvalidateEvent, Void> getCacheInvalidateCallback() {
|
||||
return cacheInvalidateCallback;
|
||||
}
|
||||
|
||||
public ReadOnlyPDClient getPDClient() {
|
||||
return this.pdClient;
|
||||
}
|
||||
|
@ -234,7 +202,7 @@ public class RegionManager {
|
|||
if (store.getStore().getState().equals(StoreState.Tombstone)) {
|
||||
return null;
|
||||
}
|
||||
if (cache.putStore(id, store)) {
|
||||
if (cache.putStore(id, store) && storeChecker != null) {
|
||||
storeChecker.scheduleStoreHealthCheck(store);
|
||||
}
|
||||
return store;
|
||||
|
@ -266,7 +234,7 @@ public class RegionManager {
|
|||
}
|
||||
|
||||
public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
|
||||
if (cache.updateStore(oldStore, newStore)) {
|
||||
if (cache.updateStore(oldStore, newStore) && storeChecker != null) {
|
||||
storeChecker.scheduleStoreHealthCheck(newStore);
|
||||
}
|
||||
}
|
||||
|
@ -285,24 +253,6 @@ public class RegionManager {
|
|||
cache.invalidateRegion(region);
|
||||
}
|
||||
|
||||
/** If region has changed, return the new one and update cache. */
|
||||
public TiRegion getRegionSkipCache(TiRegion region) {
|
||||
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
|
||||
try {
|
||||
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
|
||||
pdClient.getRegionByID(backOffer, region.getId());
|
||||
if (!regionAndLeader.first.equals(region.getMeta())) {
|
||||
region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
|
||||
return cache.putRegion(region);
|
||||
} else {
|
||||
logger.warn("Cannot get region from PD for region id: " + region.getId());
|
||||
return null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateStore(long storeId) {
|
||||
cache.invalidateStore(storeId);
|
||||
}
|
||||
|
|
|
@ -19,20 +19,21 @@ import org.tikv.kvproto.Metapb;
|
|||
public class StoreHealthyChecker implements Runnable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
|
||||
private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
|
||||
private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500;
|
||||
private BlockingQueue<TiStore> taskQueue;
|
||||
private final ChannelFactory channelFactory;
|
||||
private final ReadOnlyPDClient pdClient;
|
||||
private final RegionCache cache;
|
||||
private long checkTombstoneTick;
|
||||
private long timeout;
|
||||
|
||||
public StoreHealthyChecker(
|
||||
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache) {
|
||||
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
|
||||
this.taskQueue = new LinkedBlockingQueue<>();
|
||||
this.channelFactory = channelFactory;
|
||||
this.pdClient = pdClient;
|
||||
this.cache = cache;
|
||||
this.checkTombstoneTick = 0;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public boolean scheduleStoreHealthCheck(TiStore store) {
|
||||
|
@ -64,7 +65,7 @@ public class StoreHealthyChecker implements Runnable {
|
|||
try {
|
||||
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
|
||||
HealthGrpc.HealthBlockingStub stub =
|
||||
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS);
|
||||
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
|
||||
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
|
||||
HealthCheckResponse resp = stub.check(req);
|
||||
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
|
||||
|
@ -133,7 +134,7 @@ public class StoreHealthyChecker implements Runnable {
|
|||
}
|
||||
if (!unreachableStore.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK);
|
||||
Thread.sleep(timeout);
|
||||
} catch (Exception e) {
|
||||
this.taskQueue.addAll(unreachableStore);
|
||||
return;
|
||||
|
|
|
@ -61,7 +61,7 @@ public class RegionManagerTest extends PDMockServerTest {
|
|||
int confVer = 1026;
|
||||
int ver = 1027;
|
||||
long regionId = 233;
|
||||
String testAddress = "testAddress";
|
||||
String testAddress = "127.0.0.1";
|
||||
pdServer.addGetRegionResp(
|
||||
GrpcUtils.makeGetRegionResponse(
|
||||
pdServer.getClusterId(),
|
||||
|
@ -92,11 +92,7 @@ public class RegionManagerTest extends PDMockServerTest {
|
|||
|
||||
// This will in turn invoke rpc and results in an error
|
||||
// since we set just one rpc response
|
||||
try {
|
||||
mgr.getRegionByKey(searchKeyNotExists);
|
||||
fail();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
assertNull(mgr.getRegionByKey(searchKeyNotExists));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue