From d29cf0079c27d5443341f1969c39393b738562aa Mon Sep 17 00:00:00 2001 From: Wallace Date: Tue, 22 Jun 2021 18:22:42 +0800 Subject: [PATCH] pd-client grpc forward (#203) * forward pd leader Signed-off-by: Little-Wallace --- .../org/tikv/common/AbstractGRPCClient.java | 21 ++ src/main/java/org/tikv/common/PDClient.java | 254 ++++++++++-------- .../org/tikv/common/ReadOnlyPDClient.java | 7 - .../java/org/tikv/common/TiConfiguration.java | 2 + src/main/java/org/tikv/common/TiSession.java | 1 + .../tikv/common/operation/PDErrorHandler.java | 8 +- .../common/operation/RegionErrorHandler.java | 1 + .../region/AbstractRegionStoreClient.java | 7 +- .../org/tikv/common/region/RegionManager.java | 28 +- .../region/UnreachableStoreChecker.java | 7 + src/main/java/org/tikv/raw/RawKVClient.java | 8 +- .../java/org/tikv/common/PDClientTest.java | 98 +------ 12 files changed, 208 insertions(+), 234 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index ebfca0d387..813b98629a 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -18,10 +18,15 @@ package org.tikv.common; import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.AbstractStub; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,4 +176,20 @@ public abstract class AbstractGRPCClient< protected abstract BlockingStubT getBlockingStub(); protected abstract StubT getAsyncStub(); + + protected boolean checkHealth(String addressStr, HostMapping hostMapping) { + ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); + HealthGrpc.HealthBlockingStub stub = + HealthGrpc.newBlockingStub(channel).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + try { + HealthCheckResponse resp = stub.check(req); + if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) { + return false; + } + } catch (Exception e) { + return false; + } + return true; + } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 20693547d3..bfcecc137b 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -30,6 +30,8 @@ import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.options.GetOption; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -39,10 +41,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration.KVMode; @@ -59,11 +61,11 @@ import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; -import org.tikv.common.util.FutureObserver; import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc.PDBlockingStub; import org.tikv.kvproto.PDGrpc.PDStub; +import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb.Error; import org.tikv.kvproto.Pdpb.ErrorType; import org.tikv.kvproto.Pdpb.GetAllStoresRequest; @@ -91,7 +93,7 @@ public class PDClient extends AbstractGRPCClient private final Logger logger = LoggerFactory.getLogger(PDClient.class); private RequestHeader header; private TsoRequest tsoReq; - private volatile LeaderWrapper leaderWrapper; + private volatile PDClientWrapper pdClientWrapper; private ScheduledExecutorService service; private ScheduledExecutorService tiflashReplicaService; private List pdAddrs; @@ -251,29 +253,6 @@ public class PDClient extends AbstractGRPCClient } } - @Override - public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) { - FutureObserver responseObserver = - new FutureObserver<>( - resp -> - new TiRegion( - resp.getRegion(), - resp.getLeader(), - null, - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector())); - Supplier request = - () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); - - PDErrorHandler handler = - new PDErrorHandler<>(getRegionResponseErrorExtractor, this); - - callAsyncWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, responseObserver, handler); - return responseObserver.getFuture(); - } - @Override public TiRegion getRegionByID(BackOffer backOffer, long id) { Supplier request = @@ -294,30 +273,6 @@ public class PDClient extends AbstractGRPCClient conf.getReplicaSelector()); } - @Override - public Future getRegionByIDAsync(BackOffer backOffer, long id) { - FutureObserver responseObserver = - new FutureObserver<>( - resp -> - new TiRegion( - resp.getRegion(), - resp.getLeader(), - null, - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector())); - - Supplier request = - () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); - PDErrorHandler handler = - new PDErrorHandler<>(getRegionResponseErrorExtractor, this); - - callAsyncWithRetry( - backOffer, PDGrpc.getGetRegionByIDMethod(), request, responseObserver, handler); - return responseObserver.getFuture(); - } - private Supplier buildGetStoreReq(long storeId) { return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build(); } @@ -338,20 +293,6 @@ public class PDClient extends AbstractGRPCClient .getStore(); } - @Override - public Future getStoreAsync(BackOffer backOffer, long storeId) { - FutureObserver responseObserver = - new FutureObserver<>(GetStoreResponse::getStore); - - callAsyncWithRetry( - backOffer, - PDGrpc.getGetStoreMethod(), - buildGetStoreReq(storeId), - responseObserver, - buildPDErrorHandler()); - return responseObserver.getFuture(); - } - @Override public List getAllStores(BackOffer backOffer) { return callWithRetry( @@ -389,8 +330,8 @@ public class PDClient extends AbstractGRPCClient } @VisibleForTesting - LeaderWrapper getLeaderWrapper() { - return leaderWrapper; + PDClientWrapper getPdClientWrapper() { + return pdClientWrapper; } private GetMembersResponse getMembers(URI uri) { @@ -411,50 +352,127 @@ public class PDClient extends AbstractGRPCClient return null; } - synchronized boolean switchLeader(List leaderURLs) { - if (leaderURLs.isEmpty()) return false; - String leaderUrlStr = leaderURLs.get(0); - // TODO: Why not strip protocol info on server side since grpc does not need it - if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) { - return true; + // return whether the leader has changed to target address `leaderUrlStr`. + synchronized boolean trySwitchLeader(String leaderUrlStr) { + if (pdClientWrapper != null) { + if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) { + // The message to leader is not forwarded by follower. + if (leaderUrlStr.equals(pdClientWrapper.getStoreAddress())) { + return true; + } + } + // If leader has transfered to another member, we can create another leaderwrapper. } // switch leader - return createLeaderWrapper(leaderUrlStr); + return createLeaderClientWrapper(leaderUrlStr); } - private synchronized boolean createLeaderWrapper(String leaderUrlStr) { + private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { try { - URI newLeader = addrToUri(leaderUrlStr); - leaderUrlStr = uriToAddr(newLeader); - if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) { - return true; - } - // create new Leader ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping); - leaderWrapper = - new LeaderWrapper( - leaderUrlStr, - PDGrpc.newBlockingStub(clientChannel), - PDGrpc.newStub(clientChannel), - System.nanoTime()); + pdClientWrapper = + new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime()); } catch (IllegalArgumentException e) { logger.error("Error updating leader. " + leaderUrlStr, e); return false; } - logger.info(String.format("Switched to new leader: %s", leaderWrapper)); + logger.info(String.format("Switched to new leader: %s", pdClientWrapper)); return true; } - public void updateLeader() { + synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) { + // TODO: Why not strip protocol info on server side since grpc does not need it + + try { + if (!checkHealth(followerUrlStr, hostMapping)) { + return false; + } + + // create new Leader + ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping); + pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime()); + } catch (IllegalArgumentException e) { + logger.error("Error updating follower. " + followerUrlStr, e); + return false; + } + logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper)); + return true; + } + + public synchronized void updateLeaderOrforwardFollower() { for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(url); if (resp == null) { continue; } + if (resp.getLeader().getClientUrlsList().isEmpty()) { + continue; + } + + String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0); + leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); + // if leader is switched, just return. - if (switchLeader(resp.getLeader().getClientUrlsList())) { + if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + return; + } + + if (!conf.getEnableGrpcForward()) { + continue; + } + + List members = resp.getMembersList(); + + boolean hasReachNextMember = false; + // If we have not used follower forward, try the first follower. + if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(leaderUrlStr)) { + hasReachNextMember = true; + } + + for (int i = 0; i < members.size() * 2; i++) { + Pdpb.Member member = members.get(i % members.size()); + if (member.getMemberId() == resp.getLeader().getMemberId()) { + continue; + } + String followerUrlStr = member.getClientUrlsList().get(0); + followerUrlStr = uriToAddr(addrToUri(followerUrlStr)); + if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(followerUrlStr)) { + hasReachNextMember = true; + continue; + } + if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + return; + } + } + } + if (pdClientWrapper == null) { + throw new TiClientInternalException( + "already tried all address on file, but not leader found yet."); + } + } + + public void tryUpdateLeader() { + for (URI url : this.pdAddrs) { + // since resp is null, we need update leader's address by walking through all pd server. + GetMembersResponse resp = getMembers(url); + if (resp == null) { + continue; + } + List urls = + resp.getMembersList() + .stream() + .map(mem -> addrToUri(mem.getClientUrls(0))) + .collect(Collectors.toList()); + String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0); + leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); + + // If leader is not change but becomes available, we can cancel follower forward. + if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (!urls.equals(this.pdAddrs)) { + tryUpdateMembers(urls); + } return; } } @@ -462,6 +480,10 @@ public class PDClient extends AbstractGRPCClient "already tried all address on file, but not leader found yet."); } + private synchronized void tryUpdateMembers(List members) { + this.pdAddrs = members; + } + public void updateTiFlashReplicaStatus() { ByteSequence prefix = ByteSequence.from(TIFLASH_TABLE_SYNC_PROGRESS_PATH, StandardCharsets.UTF_8); @@ -517,18 +539,18 @@ public class PDClient extends AbstractGRPCClient @Override protected PDBlockingStub getBlockingStub() { - if (leaderWrapper == null) { + if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } - return leaderWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + return pdClientWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } @Override protected PDStub getAsyncStub() { - if (leaderWrapper == null) { + if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } - return leaderWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } private void initCluster() { @@ -557,7 +579,16 @@ public class PDClient extends AbstractGRPCClient header = RequestHeader.newBuilder().setClusterId(clusterId).build(); tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build(); this.tiflashReplicaMap = new ConcurrentHashMap<>(); - createLeaderWrapper(resp.getLeader().getClientUrls(0)); + this.pdAddrs = + resp.getMembersList() + .stream() + .map(mem -> addrToUri(mem.getClientUrls(0))) + .collect(Collectors.toList()); + logger.info("init cluster with address: " + this.pdAddrs); + + String leaderUrlStr = resp.getLeader().getClientUrls(0); + leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); + createLeaderClientWrapper(leaderUrlStr); service = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -568,14 +599,14 @@ public class PDClient extends AbstractGRPCClient () -> { // Wrap this with a try catch block in case schedule update fails try { - updateLeader(); + tryUpdateLeader(); } catch (Exception e) { logger.warn("Update leader failed", e); } }, - 1, - 1, - TimeUnit.MINUTES); + 10, + 10, + TimeUnit.SECONDS); tiflashReplicaService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -586,20 +617,27 @@ public class PDClient extends AbstractGRPCClient this::updateTiFlashReplicaStatus, 10, 10, TimeUnit.SECONDS); } - static class LeaderWrapper { + static class PDClientWrapper { private final String leaderInfo; private final PDBlockingStub blockingStub; private final PDStub asyncStub; private final long createTime; + private final String storeAddress; - LeaderWrapper( - String leaderInfo, - PDGrpc.PDBlockingStub blockingStub, - PDGrpc.PDStub asyncStub, - long createTime) { + PDClientWrapper( + String leaderInfo, String storeAddress, ManagedChannel clientChannel, long createTime) { + if (!storeAddress.equals(leaderInfo)) { + Metadata header = new Metadata(); + header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString()); + this.blockingStub = + MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header); + this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header); + } else { + this.blockingStub = PDGrpc.newBlockingStub(clientChannel); + this.asyncStub = PDGrpc.newStub(clientChannel); + } this.leaderInfo = leaderInfo; - this.blockingStub = blockingStub; - this.asyncStub = asyncStub; + this.storeAddress = storeAddress; this.createTime = createTime; } @@ -607,6 +645,10 @@ public class PDClient extends AbstractGRPCClient return leaderInfo; } + String getStoreAddress() { + return storeAddress; + } + PDBlockingStub getBlockingStub() { return blockingStub; } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index eab6850abe..373209900e 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -17,7 +17,6 @@ package org.tikv.common; import com.google.protobuf.ByteString; import java.util.List; -import java.util.concurrent.Future; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffer; @@ -40,8 +39,6 @@ public interface ReadOnlyPDClient { */ TiRegion getRegionByKey(BackOffer backOffer, ByteString key); - Future getRegionByKeyAsync(BackOffer backOffer, ByteString key); - /** * Get Region by Region Id * @@ -50,8 +47,6 @@ public interface ReadOnlyPDClient { */ TiRegion getRegionByID(BackOffer backOffer, long id); - Future getRegionByIDAsync(BackOffer backOffer, long id); - HostMapping getHostMapping(); /** @@ -62,8 +57,6 @@ public interface ReadOnlyPDClient { */ Store getStore(BackOffer backOffer, long storeId); - Future getStoreAsync(BackOffer backOffer, long storeId); - List getAllStores(BackOffer backOffer); TiConfiguration.ReplicaRead getReplicaRead(); diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index c38fd44bbc..cffafa53b4 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -35,6 +35,8 @@ public class TiConfiguration implements Serializable { private static final ConcurrentHashMap settings = new ConcurrentHashMap<>(); public static final Metadata.Key FORWARD_META_DATA_KEY = Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); + public static final Metadata.Key PD_FORWARD_META_DATA_KEY = + Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); static { loadFromSystemProperties(); diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 749db0dc6c..c647f7731c 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -92,6 +92,7 @@ public class TiSession implements AutoCloseable { this.collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY); this.collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY); this.collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY); + this.enableGrpcForward = conf.getEnableGrpcForward(); this.server = new HTTPServer( new InetSocketAddress(conf.getMetricsPort()), this.collectorRegistry, true); diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 7c79a3d6d8..0d280da5ee 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -48,7 +48,9 @@ public class PDErrorHandler implements ErrorHandler { @Override public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { - return false; + String msg = String.format("PD Request Failed with unknown reason"); + logger.warn(msg); + return handleRequestError(backOffer, new GrpcException(msg)); } PDError error = getError.apply(resp); if (error != null) { @@ -56,7 +58,7 @@ public class PDErrorHandler implements ErrorHandler { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeader(); + client.updateLeaderOrforwardFollower(); return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -73,7 +75,7 @@ public class PDErrorHandler implements ErrorHandler { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeader(); + client.updateLeaderOrforwardFollower(); return true; } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index c783c4f3b3..10744ee8d2 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -148,6 +148,7 @@ public class RegionErrorHandler implements ErrorHandler { String.format( "Key not in region [%s] for key [%s], this error should not happen here.", recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); + regionManager.clearRegionCache(); throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index a2d90ce47a..2b28b3f087 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -108,12 +108,7 @@ public abstract class AbstractRegionStoreClient if (!conf.getEnableGrpcForward()) { return false; } - if (region.getProxyStore() != null) { - TiStore store = region.getProxyStore(); - if (!checkHealth(store) && store.markUnreachable()) { - this.regionManager.scheduleHealthCheckJob(store); - } - } else { + if (region.getProxyStore() == null) { if (!targetStore.isUnreachable()) { if (checkHealth(targetStore)) { return true; diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 128509408c..f2432aa908 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -85,7 +85,7 @@ public class RegionManager { UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); - this.executor.scheduleAtFixedRate(storeChecker, 5, 5, TimeUnit.SECONDS); + this.executor.scheduleAtFixedRate(storeChecker, 10, 10, TimeUnit.SECONDS); } else { this.storeChecker = null; this.executor = null; @@ -194,27 +194,25 @@ public class RegionManager { } public synchronized TiRegion updateLeader(TiRegion region, long storeId) { - TiRegion r = cache.getRegionFromCache(region.getId()); - if (r != null) { - if (r.getLeader().getStoreId() == storeId) { - return r; - } - TiRegion newRegion = r.switchPeer(storeId); - if (newRegion != null) { - cache.putRegion(newRegion); - return newRegion; - } - // failed to switch leader, possibly region is outdated, we need to drop region cache from - // regionCache - logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")"); + TiRegion newRegion = region.switchPeer(storeId); + if (cache.updateRegion(region, newRegion)) { + return newRegion; } + // failed to switch leader, possibly region is outdated, we need to drop region cache from + // regionCache + logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")"); return null; } - public boolean updateRegion(TiRegion oldRegion, TiRegion region) { + public synchronized boolean updateRegion(TiRegion oldRegion, TiRegion region) { return cache.updateRegion(oldRegion, region); } + /** Clears all cache when some unexpected error occurs. */ + public void clearRegionCache() { + cache.clearAll(); + } + /** * Clears all cache when a TiKV server does not respond * diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index 2c948b8371..c13d2ba0c1 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -11,6 +11,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Metapb; public class UnreachableStoreChecker implements Runnable { private ConcurrentHashMap stores; @@ -68,6 +70,11 @@ public class UnreachableStoreChecker implements Runnable { this.stores.remove(Long.valueOf(store.getId())); continue; } + Metapb.Store newStore = + pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId()); + if (newStore.getState() == Metapb.StoreState.Tombstone) { + continue; + } this.taskQueue.add(store); } catch (Exception e) { this.taskQueue.add(store); diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index cd37bf7159..974df90e65 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -700,7 +700,7 @@ public class RawKVClient implements AutoCloseable { } catch (final TiKVException e) { // TODO: any elegant way to re-split the ranges if fails? backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); - logger.warn("ReSplitting ranges for BatchPutRequest"); + logger.debug("ReSplitting ranges for BatchPutRequest"); // retry return doSendBatchPutWithRefetchRegion(backOffer, batch); } @@ -758,7 +758,7 @@ public class RawKVClient implements AutoCloseable { } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); - logger.warn("ReSplitting ranges for BatchGetRequest", e); + logger.debug("ReSplitting ranges for BatchGetRequest", e); // retry return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>()); @@ -799,7 +799,7 @@ public class RawKVClient implements AutoCloseable { } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); - logger.warn("ReSplitting ranges for BatchGetRequest", e); + logger.debug("ReSplitting ranges for BatchGetRequest", e); // retry return doSendBatchDeleteWithRefetchRegion(backOffer, batch); @@ -849,7 +849,7 @@ public class RawKVClient implements AutoCloseable { } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(range.getRegion()); - logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e); + logger.debug("ReSplitting ranges for BatchDeleteRangeRequest", e); // retry return doSendDeleteRangeWithRefetchRegion(backOffer, range); diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 14514601de..78dcb43875 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -18,7 +18,6 @@ package org.tikv.common; import static org.junit.Assert.*; import static org.tikv.common.GrpcUtils.encodeKey; -import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.util.concurrent.*; import org.junit.Test; @@ -37,7 +36,7 @@ public class PDClientTest extends PDMockServerTest { @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + pdServer.port); + assertEquals(client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + pdServer.port); assertEquals(client.getHeader().getClusterId(), CLUSTER_ID); } } @@ -45,17 +44,16 @@ public class PDClientTest extends PDMockServerTest { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.switchLeader(ImmutableList.of("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1))); + client.trySwitchLeader("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); assertEquals( - client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + (pdServer.port + 1)); } tearDown(); setUp(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.switchLeader( - ImmutableList.of("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2))); + client.trySwitchLeader("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); assertEquals( - client.getLeaderWrapper().getLeaderInfo(), LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); } } @@ -95,33 +93,6 @@ public class PDClientTest extends PDMockServerTest { } } - @Test - public void testGetRegionByKeyAsync() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; - int confVer = 1026; - int ver = 1027; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - encodeKey(startKey), - encodeKey(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByKeyAsync(defaultBackOff(), ByteString.EMPTY).get(); - assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); - assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); - assertEquals(r.getRegionEpoch().getConfVer(), confVer); - assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); - } - } - @Test public void testGetRegionById() throws Exception { byte[] startKey = new byte[] {1, 0, 2, 4}; @@ -150,33 +121,6 @@ public class PDClientTest extends PDMockServerTest { } } - @Test - public void testGetRegionByIdAsync() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; - int confVer = 1026; - int ver = 1027; - pdServer.addGetRegionByIDResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - encodeKey(startKey), - encodeKey(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByIDAsync(defaultBackOff(), 0).get(); - assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); - assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); - assertEquals(r.getRegionEpoch().getConfVer(), confVer); - assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); - } - } - @Test public void testGetStore() throws Exception { long storeId = 1; @@ -208,38 +152,6 @@ public class PDClientTest extends PDMockServerTest { } } - @Test - public void testGetStoreAsync() throws Exception { - long storeId = 1; - String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - try (PDClient client = session.getPDClient()) { - Store r = client.getStoreAsync(defaultBackOff(), 0).get(); - assertEquals(r.getId(), storeId); - assertEquals(r.getAddress(), testAddress); - assertEquals(r.getState(), Metapb.StoreState.Up); - assertEquals(r.getLabels(0).getKey(), "k1"); - assertEquals(r.getLabels(1).getKey(), "k2"); - assertEquals(r.getLabels(0).getValue(), "v1"); - assertEquals(r.getLabels(1).getValue(), "v2"); - - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); - assertEquals( - StoreState.Tombstone, client.getStoreAsync(defaultBackOff(), 0).get().getState()); - } - } - private BackOffer defaultBackOff() { return ConcreteBackOffer.newCustomBackOff(1000); }