diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index a9c910fcb4..1dc6c321fc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -277,19 +277,27 @@ public abstract class AbstractRegionStoreClient List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(peer)) - .setKey(key) - .build(); - ListenableFuture task = stub.rawGet(rawGetRequest); - responses.add(new SwitchLeaderTask(task, peer)); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = stub.rawGet(rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { @@ -328,22 +336,31 @@ public abstract class AbstractRegionStoreClient List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(region.getLeader())) - .setKey(key) - .build(); - ListenableFuture task = - MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); - responses.add(new ForwardCheckTask(task, peerStore.getStore())); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel) + .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(region.getLeader())) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 35803c0d59..75a23f30a8 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; @@ -47,6 +46,7 @@ import org.tikv.kvproto.Pdpb; @SuppressWarnings("UnstableApiUsage") public class RegionManager { + private static final Logger logger = LoggerFactory.getLogger(RegionManager.class); public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() @@ -205,22 +205,23 @@ public class RegionManager { } public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, null, peers, stores); + return createRegion(region, null, backOffer); } private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores); - } - - private List getRegionStore(List peers, BackOffer backOffer) { - return peers - .stream() - .map(p -> getStoreById(p.getStoreId(), backOffer)) - .collect(Collectors.toList()); + List peers = new ArrayList<>(); + List stores = new ArrayList<>(); + for (Metapb.Peer peer : region.getPeersList()) { + try { + stores.add(getStoreById(peer.getStoreId(), backOffer)); + peers.add(peer); + } catch (Exception e) { + logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString()); + } + } + Metapb.Region newRegion = + Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build(); + return new TiRegion(conf, newRegion, leader, peers, stores); } private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index d1608e05a4..3ba0e37439 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -41,7 +41,7 @@ import org.tikv.kvproto.Metapb.StoreState; public class PDClientMockTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::]"; + private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; @Test diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 30f4f9dd65..1fd84f5ddb 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -39,7 +39,7 @@ public abstract class PDMockServerTest { void setup(String addr) throws IOException { int basePort; - try (ServerSocket s = new ServerSocket(51820)) { + try (ServerSocket s = new ServerSocket(0)) { basePort = s.getLocalPort(); } @@ -54,9 +54,11 @@ public abstract class PDMockServerTest { GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); pdServers.add(server); + if (i == 0) { + leader = server; + } } - leader = pdServers.get(0); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); conf.setKvMode("RAW"); conf.setWarmUpEnable(false); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 08dbe01dd3..201f10986f 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -18,9 +18,13 @@ package org.tikv.common; import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; import org.tikv.common.KVMockServer.State; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; import org.tikv.raw.RawKVClient; public class SeekLeaderStoreTest extends MockThreeStoresTest { @@ -34,12 +38,41 @@ public class SeekLeaderStoreTest extends MockThreeStoresTest { RawKVClient client = createClient(); ByteString key = ByteString.copyFromUtf8("key"); ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); - client.put(key, value); Assert.assertEquals(value, client.get(key).get()); servers.get(0).setState(State.Fail); servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } + + @Test + public void testSeekLeaderMeetInvalidStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + servers.get(0).setState(State.Fail); + servers.get(2).setRegion(region.switchPeer(stores.get(2).getId())); + + AtomicInteger i = new AtomicInteger(0); + leader.addGetStoreListener( + request -> { + Metapb.Store.Builder storeBuilder = + Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1)); + if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) { + storeBuilder.setState(StoreState.Tombstone); + } + return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); + }); + + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); } }