[close #663] Select TiFlash Stores Round-Robin (#662)

This commit is contained in:
shi yuhang 2022-11-01 15:55:18 +08:00 committed by GitHub
parent 7aa2d940a8
commit 8cb7a8294c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 5 deletions

View File

@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
@ -68,6 +69,7 @@ public class RegionManager {
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;
private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
@ -191,17 +193,21 @@ public class RegionManager {
Peer peer = region.getCurrentReplica();
store = getStoreById(peer.getStoreId(), backOffer);
} else {
outerLoop:
List<TiStore> tiflashStores = new ArrayList<>();
for (Peer peer : region.getLearnerList()) {
TiStore s = getStoreById(peer.getStoreId(), backOffer);
for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
store = s;
break outerLoop;
tiflashStores.add(s);
}
}
}
// select a tiflash with RR strategy
if (tiflashStores.size() > 0) {
store = tiflashStores.get(tiflashStoreIndex.getAndIncrement() % tiflashStores.size());
}
if (store == null) {
// clear the region cache, so we may get the learner peer next time
cache.invalidateRegion(region);

View File

@ -24,6 +24,7 @@ import java.util.Arrays;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.PeerRole;
import org.tikv.kvproto.Metapb.Region;
import org.tikv.kvproto.Metapb.RegionEpoch;
import org.tikv.kvproto.Metapb.Store;
@ -61,6 +62,10 @@ public class GrpcUtils {
return Peer.newBuilder().setStoreId(storeId).setId(id).build();
}
public static Peer makeLearnerPeer(long id, long storeId) {
return Peer.newBuilder().setRole(PeerRole.Learner).setStoreId(storeId).setId(id).build();
}
public static ByteString encodeKey(byte[] key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key);

View File

@ -31,6 +31,7 @@ import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
@ -135,7 +136,7 @@ public class RegionManagerTest extends PDMockServerTest {
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
assertEquals(pair.second.getId(), 10);
}
@Test
@ -179,4 +180,52 @@ public class RegionManagerTest extends PDMockServerTest {
} catch (Exception ignored) {
}
}
@Test
public void getRegionStorePairByKeyWithTiFlash() {
ByteString startKey = ByteString.copyFrom(new byte[] {1});
ByteString endKey = ByteString.copyFrom(new byte[] {10});
ByteString searchKey = ByteString.copyFrom(new byte[] {5});
String testAddress = "testAddress";
long firstStoreId = 233;
long secondStoreId = 234;
int confVer = 1026;
int ver = 1027;
long regionId = 233;
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makeLearnerPeer(1, firstStoreId),
GrpcUtils.makeLearnerPeer(2, secondStoreId))));
AtomicInteger i = new AtomicInteger(0);
long[] ids = new long[] {firstStoreId, secondStoreId};
leader.addGetStoreListener(
(request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
ids[i.getAndIncrement()],
testAddress,
StoreState.Up,
GrpcUtils.makeStoreLabel("engine", "tiflash"),
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2")))));
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.second.getId(), firstStoreId);
Pair<TiRegion, TiStore> secondPair =
mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(secondPair.first.getId(), regionId);
assertEquals(secondPair.second.getId(), secondStoreId);
}
}