mirror of https://github.com/tikv/client-java.git
pd-client grpc forward (#203)
* forward pd leader

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
parent c7dd5310df
commit d29cf0079c
@@ -18,10 +18,15 @@ package org.tikv.common;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;

import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -171,4 +176,20 @@ public abstract class AbstractGRPCClient<
  protected abstract BlockingStubT getBlockingStub();

  protected abstract StubT getAsyncStub();

  protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
    ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
    HealthGrpc.HealthBlockingStub stub =
        HealthGrpc.newBlockingStub(channel).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
    HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
    try {
      HealthCheckResponse resp = stub.check(req);
      if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
        return false;
      }
    } catch (Exception e) {
      return false;
    }
    return true;
  }
}
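The checkHealth helper above uses the standard gRPC health-checking service (grpc.health.v1). A minimal, self-contained sketch of the same probe, assuming a plaintext channel, an illustrative 500 ms deadline, and a placeholder address (none of which come from this commit):

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import java.util.concurrent.TimeUnit;

public class HealthProbe {
  public static boolean isServing(String address) {
    // Illustrative channel setup; the real client obtains channels from ChannelFactory.
    ManagedChannel channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
    try {
      HealthCheckResponse resp =
          HealthGrpc.newBlockingStub(channel)
              .withDeadlineAfter(500, TimeUnit.MILLISECONDS) // assumed deadline, not getTimeout()
              .check(HealthCheckRequest.newBuilder().build());
      return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
    } catch (Exception e) {
      return false; // any RPC failure counts as unhealthy
    } finally {
      channel.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(isServing("127.0.0.1:2379")); // placeholder PD address
  }
}

In the client itself the channel comes from ChannelFactory and the deadline from getTimeout(), as the hunk above shows.
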
@@ -30,6 +30,8 @@ import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.net.URI;
import java.nio.charset.StandardCharsets;

@@ -39,10 +41,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode;

@@ -59,11 +61,11 @@ import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.FutureObserver;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub;
import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.Error;
import org.tikv.kvproto.Pdpb.ErrorType;
import org.tikv.kvproto.Pdpb.GetAllStoresRequest;

@@ -91,7 +93,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
  private final Logger logger = LoggerFactory.getLogger(PDClient.class);
  private RequestHeader header;
  private TsoRequest tsoReq;
  private volatile LeaderWrapper leaderWrapper;
  private volatile PDClientWrapper pdClientWrapper;
  private ScheduledExecutorService service;
  private ScheduledExecutorService tiflashReplicaService;
  private List<URI> pdAddrs;
@@ -251,29 +253,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
    }
  }

  @Override
  public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key) {
    FutureObserver<TiRegion, GetRegionResponse> responseObserver =
        new FutureObserver<>(
            resp ->
                new TiRegion(
                    resp.getRegion(),
                    resp.getLeader(),
                    null,
                    conf.getIsolationLevel(),
                    conf.getCommandPriority(),
                    conf.getKvMode(),
                    conf.getReplicaSelector()));
    Supplier<GetRegionRequest> request =
        () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();

    PDErrorHandler<GetRegionResponse> handler =
        new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

    callAsyncWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, responseObserver, handler);
    return responseObserver.getFuture();
  }

  @Override
  public TiRegion getRegionByID(BackOffer backOffer, long id) {
    Supplier<GetRegionByIDRequest> request =

@@ -294,30 +273,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
            conf.getReplicaSelector());
  }

  @Override
  public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
    FutureObserver<TiRegion, GetRegionResponse> responseObserver =
        new FutureObserver<>(
            resp ->
                new TiRegion(
                    resp.getRegion(),
                    resp.getLeader(),
                    null,
                    conf.getIsolationLevel(),
                    conf.getCommandPriority(),
                    conf.getKvMode(),
                    conf.getReplicaSelector()));

    Supplier<GetRegionByIDRequest> request =
        () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
    PDErrorHandler<GetRegionResponse> handler =
        new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

    callAsyncWithRetry(
        backOffer, PDGrpc.getGetRegionByIDMethod(), request, responseObserver, handler);
    return responseObserver.getFuture();
  }

  private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
    return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
  }
@@ -338,20 +293,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
        .getStore();
  }

  @Override
  public Future<Store> getStoreAsync(BackOffer backOffer, long storeId) {
    FutureObserver<Store, GetStoreResponse> responseObserver =
        new FutureObserver<>(GetStoreResponse::getStore);

    callAsyncWithRetry(
        backOffer,
        PDGrpc.getGetStoreMethod(),
        buildGetStoreReq(storeId),
        responseObserver,
        buildPDErrorHandler());
    return responseObserver.getFuture();
  }

  @Override
  public List<Store> getAllStores(BackOffer backOffer) {
    return callWithRetry(

@@ -389,8 +330,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
  }

  @VisibleForTesting
  LeaderWrapper getLeaderWrapper() {
    return leaderWrapper;
  PDClientWrapper getPdClientWrapper() {
    return pdClientWrapper;
  }

  private GetMembersResponse getMembers(URI uri) {
@@ -411,50 +352,127 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
    return null;
  }

  synchronized boolean switchLeader(List<String> leaderURLs) {
    if (leaderURLs.isEmpty()) return false;
    String leaderUrlStr = leaderURLs.get(0);
    // TODO: Why not strip protocol info on server side since grpc does not need it
    if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) {
      return true;
  // return whether the leader has changed to target address `leaderUrlStr`.
  synchronized boolean trySwitchLeader(String leaderUrlStr) {
    if (pdClientWrapper != null) {
      if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) {
        // The message to leader is not forwarded by follower.
        if (leaderUrlStr.equals(pdClientWrapper.getStoreAddress())) {
          return true;
        }
      }
      // If leader has transfered to another member, we can create another leaderwrapper.
    }
    // switch leader
    return createLeaderWrapper(leaderUrlStr);
    return createLeaderClientWrapper(leaderUrlStr);
  }

  private synchronized boolean createLeaderWrapper(String leaderUrlStr) {
  private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) {
    try {
      URI newLeader = addrToUri(leaderUrlStr);
      leaderUrlStr = uriToAddr(newLeader);
      if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) {
        return true;
      }

      // create new Leader
      ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
      leaderWrapper =
          new LeaderWrapper(
              leaderUrlStr,
              PDGrpc.newBlockingStub(clientChannel),
              PDGrpc.newStub(clientChannel),
              System.nanoTime());
      pdClientWrapper =
          new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
    } catch (IllegalArgumentException e) {
      logger.error("Error updating leader. " + leaderUrlStr, e);
      return false;
    }
    logger.info(String.format("Switched to new leader: %s", leaderWrapper));
    logger.info(String.format("Switched to new leader: %s", pdClientWrapper));
    return true;
  }

  public void updateLeader() {
  synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) {
    // TODO: Why not strip protocol info on server side since grpc does not need it

    try {
      if (!checkHealth(followerUrlStr, hostMapping)) {
        return false;
      }

      // create new Leader
      ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping);
      pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
    } catch (IllegalArgumentException e) {
      logger.error("Error updating follower. " + followerUrlStr, e);
      return false;
    }
    logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper));
    return true;
  }

  public synchronized void updateLeaderOrforwardFollower() {
    for (URI url : this.pdAddrs) {
      // since resp is null, we need update leader's address by walking through all pd server.
      GetMembersResponse resp = getMembers(url);
      if (resp == null) {
        continue;
      }
      if (resp.getLeader().getClientUrlsList().isEmpty()) {
        continue;
      }

      String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
      leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));

      // if leader is switched, just return.
      if (switchLeader(resp.getLeader().getClientUrlsList())) {
      if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
        return;
      }

      if (!conf.getEnableGrpcForward()) {
        continue;
      }

      List<Pdpb.Member> members = resp.getMembersList();

      boolean hasReachNextMember = false;
      // If we have not used follower forward, try the first follower.
      if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(leaderUrlStr)) {
        hasReachNextMember = true;
      }

      for (int i = 0; i < members.size() * 2; i++) {
        Pdpb.Member member = members.get(i % members.size());
        if (member.getMemberId() == resp.getLeader().getMemberId()) {
          continue;
        }
        String followerUrlStr = member.getClientUrlsList().get(0);
        followerUrlStr = uriToAddr(addrToUri(followerUrlStr));
        if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(followerUrlStr)) {
          hasReachNextMember = true;
          continue;
        }
        if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) {
          return;
        }
      }
    }
    if (pdClientWrapper == null) {
      throw new TiClientInternalException(
          "already tried all address on file, but not leader found yet.");
    }
  }

  public void tryUpdateLeader() {
    for (URI url : this.pdAddrs) {
      // since resp is null, we need update leader's address by walking through all pd server.
      GetMembersResponse resp = getMembers(url);
      if (resp == null) {
        continue;
      }
      List<URI> urls =
          resp.getMembersList()
              .stream()
              .map(mem -> addrToUri(mem.getClientUrls(0)))
              .collect(Collectors.toList());
      String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
      leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));

      // If leader is not change but becomes available, we can cancel follower forward.
      if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
        if (!urls.equals(this.pdAddrs)) {
          tryUpdateMembers(urls);
        }
        return;
      }
    }
@@ -462,6 +480,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
        "already tried all address on file, but not leader found yet.");
  }

  private synchronized void tryUpdateMembers(List<URI> members) {
    this.pdAddrs = members;
  }

  public void updateTiFlashReplicaStatus() {
    ByteSequence prefix =
        ByteSequence.from(TIFLASH_TABLE_SYNC_PROGRESS_PATH, StandardCharsets.UTF_8);

@@ -517,18 +539,18 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
  @Override
  protected PDBlockingStub getBlockingStub() {
    if (leaderWrapper == null) {
    if (pdClientWrapper == null) {
      throw new GrpcException("PDClient may not be initialized");
    }
    return leaderWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
    return pdClientWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
  }

  @Override
  protected PDStub getAsyncStub() {
    if (leaderWrapper == null) {
    if (pdClientWrapper == null) {
      throw new GrpcException("PDClient may not be initialized");
    }
    return leaderWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
    return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
  }

  private void initCluster() {
@@ -557,7 +579,16 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
    header = RequestHeader.newBuilder().setClusterId(clusterId).build();
    tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build();
    this.tiflashReplicaMap = new ConcurrentHashMap<>();
    createLeaderWrapper(resp.getLeader().getClientUrls(0));
    this.pdAddrs =
        resp.getMembersList()
            .stream()
            .map(mem -> addrToUri(mem.getClientUrls(0)))
            .collect(Collectors.toList());
    logger.info("init cluster with address: " + this.pdAddrs);

    String leaderUrlStr = resp.getLeader().getClientUrls(0);
    leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
    createLeaderClientWrapper(leaderUrlStr);
    service =
        Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()

@@ -568,14 +599,14 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
        () -> {
          // Wrap this with a try catch block in case schedule update fails
          try {
            updateLeader();
            tryUpdateLeader();
          } catch (Exception e) {
            logger.warn("Update leader failed", e);
          }
        },
        1,
        1,
        TimeUnit.MINUTES);
        10,
        10,
        TimeUnit.SECONDS);
    tiflashReplicaService =
        Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
@@ -586,20 +617,27 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
        this::updateTiFlashReplicaStatus, 10, 10, TimeUnit.SECONDS);
  }

  static class LeaderWrapper {
  static class PDClientWrapper {
    private final String leaderInfo;
    private final PDBlockingStub blockingStub;
    private final PDStub asyncStub;
    private final long createTime;
    private final String storeAddress;

    LeaderWrapper(
        String leaderInfo,
        PDGrpc.PDBlockingStub blockingStub,
        PDGrpc.PDStub asyncStub,
        long createTime) {
    PDClientWrapper(
        String leaderInfo, String storeAddress, ManagedChannel clientChannel, long createTime) {
      if (!storeAddress.equals(leaderInfo)) {
        Metadata header = new Metadata();
        header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
        this.blockingStub =
            MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header);
        this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header);
      } else {
        this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
        this.asyncStub = PDGrpc.newStub(clientChannel);
      }
      this.leaderInfo = leaderInfo;
      this.blockingStub = blockingStub;
      this.asyncStub = asyncStub;
      this.storeAddress = storeAddress;
      this.createTime = createTime;
    }

@@ -607,6 +645,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
      return leaderInfo;
    }

    String getStoreAddress() {
      return storeAddress;
    }

    PDBlockingStub getBlockingStub() {
      return blockingStub;
    }
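The new PDClientWrapper above is the core of follower forwarding: when the wrapper's channel points at a follower rather than the leader (storeAddress != leaderInfo), it attaches the leader's address under the pd-forwarded-host metadata key so the follower can proxy the request. A self-contained sketch of the same idea, with placeholder addresses, a plaintext channel, and a locally defined key (the real client uses TiConfiguration.PD_FORWARD_META_DATA_KEY and channels from ChannelFactory):

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import org.tikv.kvproto.PDGrpc;

public class ForwardedPdStub {
  // Same key name the diff registers in TiConfiguration; defined locally here for illustration.
  private static final Metadata.Key<String> PD_FORWARD_KEY =
      Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);

  public static PDGrpc.PDBlockingStub viaFollower(String followerAddr, String leaderUrl) {
    // Channel to a reachable follower; addresses are placeholders, not from the commit.
    ManagedChannel channel = ManagedChannelBuilder.forTarget(followerAddr).usePlaintext().build();
    Metadata header = new Metadata();
    header.put(PD_FORWARD_KEY, leaderUrl); // the follower proxies the call to this leader
    return MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(channel), header);
  }
}

Calls made on the returned stub reach the follower; PD's gRPC forwarding is expected to relay them to the address carried in the header, which is why createFollowerClientWrapper only needs a healthy follower channel.
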
@@ -17,7 +17,6 @@ package org.tikv.common;

import com.google.protobuf.ByteString;
import java.util.List;
import java.util.concurrent.Future;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;

@@ -40,8 +39,6 @@ public interface ReadOnlyPDClient {
   */
  TiRegion getRegionByKey(BackOffer backOffer, ByteString key);

  Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key);

  /**
   * Get Region by Region Id
   *

@@ -50,8 +47,6 @@ public interface ReadOnlyPDClient {
   */
  TiRegion getRegionByID(BackOffer backOffer, long id);

  Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id);

  HostMapping getHostMapping();

  /**

@@ -62,8 +57,6 @@ public interface ReadOnlyPDClient {
   */
  Store getStore(BackOffer backOffer, long storeId);

  Future<Store> getStoreAsync(BackOffer backOffer, long storeId);

  List<Store> getAllStores(BackOffer backOffer);

  TiConfiguration.ReplicaRead getReplicaRead();
@@ -35,6 +35,8 @@ public class TiConfiguration implements Serializable {
  private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
  public static final Metadata.Key FORWARD_META_DATA_KEY =
      Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
  public static final Metadata.Key PD_FORWARD_META_DATA_KEY =
      Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);

  static {
    loadFromSystemProperties();
@@ -92,6 +92,7 @@ public class TiSession implements AutoCloseable {
    this.collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY);
    this.collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY);
    this.collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY);
    this.enableGrpcForward = conf.getEnableGrpcForward();
    this.server =
        new HTTPServer(
            new InetSocketAddress(conf.getMetricsPort()), this.collectorRegistry, true);
@@ -48,7 +48,9 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
  @Override
  public boolean handleResponseError(BackOffer backOffer, RespT resp) {
    if (resp == null) {
      return false;
      String msg = String.format("PD Request Failed with unknown reason");
      logger.warn(msg);
      return handleRequestError(backOffer, new GrpcException(msg));
    }
    PDError error = getError.apply(resp);
    if (error != null) {

@@ -56,7 +58,7 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
        case PD_ERROR:
          backOffer.doBackOff(
              BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
          client.updateLeader();
          client.updateLeaderOrforwardFollower();
          return true;
        case REGION_PEER_NOT_ELECTED:
          logger.debug(error.getMessage());

@@ -73,7 +75,7 @@ public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
  @Override
  public boolean handleRequestError(BackOffer backOffer, Exception e) {
    backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
    client.updateLeader();
    client.updateLeaderOrforwardFollower();
    return true;
  }
}
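Both handler paths now follow the same shape: back off, refresh the PD endpoint via updateLeaderOrforwardFollower() (switch to a healthy leader or fall back to a forwarding follower), and return true so the caller retries. A compact, self-contained sketch of that retry shape, with made-up names and backoff limits; the real client drives this through BackOffer/BackOffFunction and the ErrorHandler interface rather than a sleep loop:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class PdRetryLoop {
  interface EndpointRefresher {
    void refresh(); // e.g. PDClient.updateLeaderOrforwardFollower() in the real client
  }

  public static <T> T callWithRetry(Callable<T> call, EndpointRefresher refresher, int maxAttempts)
      throws Exception {
    long backoffMs = 100; // illustrative starting backoff
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        if (attempt >= maxAttempts) {
          throw e; // give up after the configured number of attempts
        }
        TimeUnit.MILLISECONDS.sleep(backoffMs);
        backoffMs = Math.min(backoffMs * 2, 2_000); // exponential backoff, capped
        refresher.refresh(); // pick a healthy leader or a forwarding follower before retrying
      }
    }
  }
}
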
@@ -148,6 +148,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
              String.format(
                  "Key not in region [%s] for key [%s], this error should not happen here.",
                  recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey)));
      regionManager.clearRegionCache();
      throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
    }

@@ -108,12 +108,7 @@ public abstract class AbstractRegionStoreClient
    if (!conf.getEnableGrpcForward()) {
      return false;
    }
    if (region.getProxyStore() != null) {
      TiStore store = region.getProxyStore();
      if (!checkHealth(store) && store.markUnreachable()) {
        this.regionManager.scheduleHealthCheckJob(store);
      }
    } else {
    if (region.getProxyStore() == null) {
      if (!targetStore.isUnreachable()) {
        if (checkHealth(targetStore)) {
          return true;
@@ -85,7 +85,7 @@ public class RegionManager {
      UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
      this.storeChecker = storeChecker;
      this.executor = Executors.newScheduledThreadPool(1);
      this.executor.scheduleAtFixedRate(storeChecker, 5, 5, TimeUnit.SECONDS);
      this.executor.scheduleAtFixedRate(storeChecker, 10, 10, TimeUnit.SECONDS);
    } else {
      this.storeChecker = null;
      this.executor = null;

@@ -194,27 +194,25 @@ public class RegionManager {
  }

  public synchronized TiRegion updateLeader(TiRegion region, long storeId) {
    TiRegion r = cache.getRegionFromCache(region.getId());
    if (r != null) {
      if (r.getLeader().getStoreId() == storeId) {
        return r;
      }
      TiRegion newRegion = r.switchPeer(storeId);
      if (newRegion != null) {
        cache.putRegion(newRegion);
        return newRegion;
      }
      // failed to switch leader, possibly region is outdated, we need to drop region cache from
      // regionCache
      logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")");
    TiRegion newRegion = region.switchPeer(storeId);
    if (cache.updateRegion(region, newRegion)) {
      return newRegion;
    }
    // failed to switch leader, possibly region is outdated, we need to drop region cache from
    // regionCache
    logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")");
    return null;
  }

  public boolean updateRegion(TiRegion oldRegion, TiRegion region) {
  public synchronized boolean updateRegion(TiRegion oldRegion, TiRegion region) {
    return cache.updateRegion(oldRegion, region);
  }

  /** Clears all cache when some unexpected error occurs. */
  public void clearRegionCache() {
    cache.clearAll();
  }

  /**
   * Clears all cache when a TiKV server does not respond
   *
@@ -11,6 +11,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;

public class UnreachableStoreChecker implements Runnable {
  private ConcurrentHashMap<Long, TiStore> stores;

@@ -68,6 +70,11 @@ public class UnreachableStoreChecker implements Runnable {
        this.stores.remove(Long.valueOf(store.getId()));
        continue;
      }
      Metapb.Store newStore =
          pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId());
      if (newStore.getState() == Metapb.StoreState.Tombstone) {
        continue;
      }
      this.taskQueue.add(store);
    } catch (Exception e) {
      this.taskQueue.add(store);
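Together with the RegionManager change above (the checker now runs every 10 seconds instead of 5), UnreachableStoreChecker keeps re-probing stores that were marked unreachable and re-queues the ones that still fail, dropping tombstoned stores along the way. A simplified, self-contained sketch of that pattern; the queue of plain address strings, the pluggable health probe, and the interval are assumptions for illustration (the earlier HealthProbe sketch can serve as the probe):

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class UnreachableAddressChecker implements Runnable {
  private final LinkedBlockingQueue<String> unreachable = new LinkedBlockingQueue<>();
  private final Predicate<String> healthProbe;

  public UnreachableAddressChecker(Predicate<String> healthProbe) {
    this.healthProbe = healthProbe;
  }

  public void markUnreachable(String address) {
    unreachable.add(address);
  }

  @Override
  public void run() {
    int pending = unreachable.size();
    for (int i = 0; i < pending; i++) {
      String address = unreachable.poll();
      if (address == null) {
        break;
      }
      if (!healthProbe.test(address)) {
        unreachable.add(address); // still down, probe again on the next round
      }
    }
  }

  public static void main(String[] args) {
    // HealthProbe.isServing is the hypothetical probe sketched after the AbstractGRPCClient hunk.
    UnreachableAddressChecker checker = new UnreachableAddressChecker(HealthProbe::isServing);
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleAtFixedRate(checker, 10, 10, TimeUnit.SECONDS); // mirrors the 10-second interval in the diff
  }
}
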
@@ -700,7 +700,7 @@ public class RawKVClient implements AutoCloseable {
    } catch (final TiKVException e) {
      // TODO: any elegant way to re-split the ranges if fails?
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      logger.warn("ReSplitting ranges for BatchPutRequest");
      logger.debug("ReSplitting ranges for BatchPutRequest");
      // retry
      return doSendBatchPutWithRefetchRegion(backOffer, batch);
    }

@@ -758,7 +758,7 @@ public class RawKVClient implements AutoCloseable {
    } catch (final TiKVException e) {
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
      logger.warn("ReSplitting ranges for BatchGetRequest", e);
      logger.debug("ReSplitting ranges for BatchGetRequest", e);

      // retry
      return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>());

@@ -799,7 +799,7 @@ public class RawKVClient implements AutoCloseable {
    } catch (final TiKVException e) {
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
      logger.warn("ReSplitting ranges for BatchGetRequest", e);
      logger.debug("ReSplitting ranges for BatchGetRequest", e);

      // retry
      return doSendBatchDeleteWithRefetchRegion(backOffer, batch);

@@ -849,7 +849,7 @@ public class RawKVClient implements AutoCloseable {
    } catch (final TiKVException e) {
      backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
      clientBuilder.getRegionManager().invalidateRegion(range.getRegion());
      logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
      logger.debug("ReSplitting ranges for BatchDeleteRangeRequest", e);

      // retry
      return doSendDeleteRangeWithRefetchRegion(backOffer, range);
@@ -18,7 +18,6 @@ package org.tikv.common;
import static org.junit.Assert.*;
import static org.tikv.common.GrpcUtils.encodeKey;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.concurrent.*;
import org.junit.Test;

@@ -37,7 +36,7 @@ public class PDClientTest extends PDMockServerTest {
  @Test
  public void testCreate() throws Exception {
    try (PDClient client = session.getPDClient()) {
      assertEquals(client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + pdServer.port);
      assertEquals(client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + pdServer.port);
      assertEquals(client.getHeader().getClusterId(), CLUSTER_ID);
    }
  }

@@ -45,17 +44,16 @@ public class PDClientTest extends PDMockServerTest {
  @Test
  public void testSwitchLeader() throws Exception {
    try (PDClient client = session.getPDClient()) {
      client.switchLeader(ImmutableList.of("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)));
      client.trySwitchLeader("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1));
      assertEquals(
          client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + (pdServer.port + 1));
          client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + (pdServer.port + 1));
    }
    tearDown();
    setUp(LOCAL_ADDR_IPV6);
    try (PDClient client = session.getPDClient()) {
      client.switchLeader(
          ImmutableList.of("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)));
      client.trySwitchLeader("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2));
      assertEquals(
          client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2));
          client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2));
    }
  }

@@ -95,33 +93,6 @@ public class PDClientTest extends PDMockServerTest {
    }
  }

  @Test
  public void testGetRegionByKeyAsync() throws Exception {
    byte[] startKey = new byte[] {1, 0, 2, 4};
    byte[] endKey = new byte[] {1, 0, 2, 5};
    int confVer = 1026;
    int ver = 1027;
    pdServer.addGetRegionResp(
        GrpcUtils.makeGetRegionResponse(
            pdServer.getClusterId(),
            GrpcUtils.makeRegion(
                1,
                encodeKey(startKey),
                encodeKey(endKey),
                GrpcUtils.makeRegionEpoch(confVer, ver),
                GrpcUtils.makePeer(1, 10),
                GrpcUtils.makePeer(2, 20))));
    try (PDClient client = session.getPDClient()) {
      TiRegion r = client.getRegionByKeyAsync(defaultBackOff(), ByteString.EMPTY).get();
      assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
      assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
      assertEquals(r.getRegionEpoch().getConfVer(), confVer);
      assertEquals(r.getRegionEpoch().getVersion(), ver);
      assertEquals(r.getLeader().getId(), 1);
      assertEquals(r.getLeader().getStoreId(), 10);
    }
  }

  @Test
  public void testGetRegionById() throws Exception {
    byte[] startKey = new byte[] {1, 0, 2, 4};

@@ -150,33 +121,6 @@ public class PDClientTest extends PDMockServerTest {
    }
  }

  @Test
  public void testGetRegionByIdAsync() throws Exception {
    byte[] startKey = new byte[] {1, 0, 2, 4};
    byte[] endKey = new byte[] {1, 0, 2, 5};
    int confVer = 1026;
    int ver = 1027;
    pdServer.addGetRegionByIDResp(
        GrpcUtils.makeGetRegionResponse(
            pdServer.getClusterId(),
            GrpcUtils.makeRegion(
                1,
                encodeKey(startKey),
                encodeKey(endKey),
                GrpcUtils.makeRegionEpoch(confVer, ver),
                GrpcUtils.makePeer(1, 10),
                GrpcUtils.makePeer(2, 20))));
    try (PDClient client = session.getPDClient()) {
      TiRegion r = client.getRegionByIDAsync(defaultBackOff(), 0).get();
      assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
      assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
      assertEquals(r.getRegionEpoch().getConfVer(), confVer);
      assertEquals(r.getRegionEpoch().getVersion(), ver);
      assertEquals(r.getLeader().getId(), 1);
      assertEquals(r.getLeader().getStoreId(), 10);
    }
  }

  @Test
  public void testGetStore() throws Exception {
    long storeId = 1;

@@ -208,38 +152,6 @@ public class PDClientTest extends PDMockServerTest {
    }
  }

  @Test
  public void testGetStoreAsync() throws Exception {
    long storeId = 1;
    String testAddress = "testAddress";
    pdServer.addGetStoreResp(
        GrpcUtils.makeGetStoreResponse(
            pdServer.getClusterId(),
            GrpcUtils.makeStore(
                storeId,
                testAddress,
                Metapb.StoreState.Up,
                GrpcUtils.makeStoreLabel("k1", "v1"),
                GrpcUtils.makeStoreLabel("k2", "v2"))));
    try (PDClient client = session.getPDClient()) {
      Store r = client.getStoreAsync(defaultBackOff(), 0).get();
      assertEquals(r.getId(), storeId);
      assertEquals(r.getAddress(), testAddress);
      assertEquals(r.getState(), Metapb.StoreState.Up);
      assertEquals(r.getLabels(0).getKey(), "k1");
      assertEquals(r.getLabels(1).getKey(), "k2");
      assertEquals(r.getLabels(0).getValue(), "v1");
      assertEquals(r.getLabels(1).getValue(), "v2");

      pdServer.addGetStoreResp(
          GrpcUtils.makeGetStoreResponse(
              pdServer.getClusterId(),
              GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone)));
      assertEquals(
          StoreState.Tombstone, client.getStoreAsync(defaultBackOff(), 0).get().getState());
    }
  }

  private BackOffer defaultBackOff() {
    return ConcreteBackOffer.newCustomBackOff(1000);
  }