From 8cb7a8294c4c26f0080192e338ad0b9188d50e31 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 1 Nov 2022 15:55:18 +0800 Subject: [PATCH] [close #663] Select TiFlash Stores Round-Robin (#662) --- .github/workflows/license-checker.yml | 2 +- .../org/tikv/common/region/RegionManager.java | 12 +++-- src/test/java/org/tikv/common/GrpcUtils.java | 5 ++ .../org/tikv/common/RegionManagerTest.java | 51 ++++++++++++++++++- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml index 4e1cf90a4f..cd5c12f84c 100644 --- a/.github/workflows/license-checker.yml +++ b/.github/workflows/license-checker.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Check License Header - uses: apache/skywalking-eyes@main + uses: apache/skywalking-eyes@v0.3.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 44c8137510..45cfc5160d 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; @@ -68,6 +69,7 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; + private AtomicInteger tiflashStoreIndex = new AtomicInteger(0); public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { @@ -191,17 +193,21 @@ public class RegionManager { Peer peer = region.getCurrentReplica(); store = getStoreById(peer.getStoreId(), backOffer); } else { - outerLoop: + List tiflashStores = new ArrayList<>(); for (Peer peer : region.getLearnerList()) { TiStore s = getStoreById(peer.getStoreId(), backOffer); for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { if (label.getKey().equals(storeType.getLabelKey()) && label.getValue().equals(storeType.getLabelValue())) { - store = s; - break outerLoop; + tiflashStores.add(s); } } } + // select a tiflash with RR strategy + if (tiflashStores.size() > 0) { + store = tiflashStores.get(tiflashStoreIndex.getAndIncrement() % tiflashStores.size()); + } + if (store == null) { // clear the region cache, so we may get the learner peer next time cache.invalidateRegion(region); diff --git a/src/test/java/org/tikv/common/GrpcUtils.java b/src/test/java/org/tikv/common/GrpcUtils.java index e6793f01f1..e7a268f6c3 100644 --- a/src/test/java/org/tikv/common/GrpcUtils.java +++ b/src/test/java/org/tikv/common/GrpcUtils.java @@ -24,6 +24,7 @@ import java.util.Arrays; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataOutput; import org.tikv.kvproto.Metapb.Peer; +import org.tikv.kvproto.Metapb.PeerRole; import org.tikv.kvproto.Metapb.Region; import org.tikv.kvproto.Metapb.RegionEpoch; import org.tikv.kvproto.Metapb.Store; @@ -61,6 +62,10 @@ public class GrpcUtils { return Peer.newBuilder().setStoreId(storeId).setId(id).build(); } + public static Peer makeLearnerPeer(long id, long storeId) { + return Peer.newBuilder().setRole(PeerRole.Learner).setStoreId(storeId).setId(id).build(); + } + public static ByteString encodeKey(byte[] key) { CodecDataOutput cdo = new CodecDataOutput(); BytesCodec.writeBytes(cdo, key); diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 6052640f9b..eddd22a6c6 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -31,6 +31,7 @@ import org.tikv.common.key.Key; import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; +import org.tikv.common.region.TiStoreType; import org.tikv.common.util.KeyRangeUtils; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; @@ -135,7 +136,7 @@ public class RegionManagerTest extends PDMockServerTest { Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); - assertEquals(pair.first.getId(), storeId); + assertEquals(pair.second.getId(), 10); } @Test @@ -179,4 +180,52 @@ public class RegionManagerTest extends PDMockServerTest { } catch (Exception ignored) { } } + + @Test + public void getRegionStorePairByKeyWithTiFlash() { + + ByteString startKey = ByteString.copyFrom(new byte[] {1}); + ByteString endKey = ByteString.copyFrom(new byte[] {10}); + ByteString searchKey = ByteString.copyFrom(new byte[] {5}); + String testAddress = "testAddress"; + long firstStoreId = 233; + long secondStoreId = 234; + int confVer = 1026; + int ver = 1027; + long regionId = 233; + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makeLearnerPeer(1, firstStoreId), + GrpcUtils.makeLearnerPeer(2, secondStoreId)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {firstStoreId, secondStoreId}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("engine", "tiflash"), + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); + + Pair pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash); + assertEquals(pair.first.getId(), regionId); + assertEquals(pair.second.getId(), firstStoreId); + + Pair secondPair = + mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash); + assertEquals(secondPair.first.getId(), regionId); + assertEquals(secondPair.second.getId(), secondStoreId); + } }