[to #439] introduce MockThreeStoresTest to onStoreUnreachable (#518)

Co-authored-by: Jian Zhang <zjsariel@gmail.com>
This commit is contained in:
iosmanthus 2022-02-21 16:14:22 +08:00 committed by GitHub
parent 2ac49bb92c
commit d24b8e9da7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 520 additions and 174 deletions

View File

@ -22,7 +22,12 @@ import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@ -34,8 +39,15 @@ import org.tikv.common.importer.ImporterStoreClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.*;
import org.tikv.common.util.*;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;

View File

@ -29,7 +29,7 @@ import io.prometheus.client.Histogram;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
@ -48,6 +48,7 @@ import org.tikv.kvproto.Tracepb;
public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
implements RegionErrorReceiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
public static final Histogram SEEK_LEADER_STORE_DURATION =
@ -230,6 +231,7 @@ public abstract class AbstractRegionStoreClient
// switch to leader store
store = currentLeaderStore;
updateClientStub();
return true;
}
return false;
}
@ -374,6 +376,7 @@ public abstract class AbstractRegionStoreClient
}
private static class SwitchLeaderTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Peer peer;
@ -384,6 +387,7 @@ public abstract class AbstractRegionStoreClient
}
private static class ForwardCheckTask {
private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Store store;

View File

@ -40,6 +40,7 @@ import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Region;
public class TiRegion implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);
private final Region meta;
@ -269,6 +270,7 @@ public class TiRegion implements Serializable {
}
public class RegionVerID {
final long id;
final long confVer;
final long ver;

View File

@ -27,10 +27,22 @@ import com.pingcap.tidb.tipb.SelectResponse;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
@ -45,9 +57,11 @@ import org.tikv.kvproto.TikvGrpc;
public class KVMockServer extends TikvGrpc.TikvImplBase {
private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class);
private int port;
private Server server;
private TiRegion region;
private State state = State.Normal;
private final TreeMap<Key, ByteString> dataMap = new TreeMap<>();
private final Map<ByteString, Integer> errorMap = new HashMap<>();
@ -64,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
public static final int STORE_NOT_MATCH = 9;
public static final int RAFT_ENTRY_TOO_LARGE = 10;
public enum State {
Normal,
Fail
}
public void setState(State state) {
this.state = state;
}
public int getPort() {
return port;
}
public void setRegion(TiRegion region) {
this.region = region;
}
public void put(ByteString key, ByteString value) {
dataMap.put(toRawKey(key), value);
}
@ -97,7 +124,7 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
if (context.getRegionId() != region.getId()
|| !context.getRegionEpoch().equals(region.getRegionEpoch())
|| !context.getPeer().equals(region.getLeader())) {
throw new Exception();
throw new Exception("context doesn't match");
}
}
@ -106,6 +133,11 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
org.tikv.kvproto.Kvrpcpb.RawGetRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.RawGetResponse> responseObserver) {
try {
switch (state) {
case Fail:
throw new Exception(State.Fail.toString());
default:
}
verifyContext(request.getContext());
ByteString key = request.getKey();
@ -116,7 +148,12 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
} else {
builder.setValue(dataMap.get(toRawKey(key)));
Key rawKey = toRawKey(key);
ByteString value = dataMap.get(rawKey);
if (value == null) {
value = ByteString.EMPTY;
}
builder.setValue(value);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
@ -139,7 +176,6 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
if (errorCode != null) {
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
// builder.setError("");
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
@ -349,16 +385,35 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
}
public int start(TiRegion region) throws IOException {
int port;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
server = ServerBuilder.forPort(port).addService(this).build().start();
this.region = region;
Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop));
start(region, port);
return port;
}
private static class HealCheck extends HealthImplBase {
@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
responseObserver.onNext(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
responseObserver.onCompleted();
}
}
public void start(TiRegion region, int port) throws IOException {
this.port = port;
this.region = region;
logger.info("start mock server on port: " + port);
server =
ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start();
Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop));
}
public void stop() {
if (server != null) {
server.shutdown();

View File

@ -29,14 +29,15 @@ import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
public class MockServerTest extends PDMockServerTest {
public KVMockServer server;
public int port;
public TiRegion region;
@Before
@Override
public void setUp() throws IOException {
super.setUp();
public void setup() throws IOException {
super.setup();
Metapb.Region r =
Metapb.Region.newBuilder()
@ -62,9 +63,11 @@ public class MockServerTest extends PDMockServerTest {
r.getPeers(0),
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()));
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
leader.addGetRegionListener(
request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
leader.addGetStoreListener(
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
port = server.start(region);

View File

@ -0,0 +1,129 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
public class MockThreeStoresTest extends PDMockServerTest {
protected TiRegion region;
protected List<KVMockServer> servers = new ArrayList<>();
protected List<Metapb.Store> stores;
@Before
@Override
public void setup() throws IOException {
super.setup();
int basePort;
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
}
ImmutableList<Metapb.Peer> peers =
ImmutableList.of(
Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(),
Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(),
Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build());
Metapb.Region region =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
.setId(0xff)
.setStartKey(ByteString.EMPTY)
.setEndKey(ByteString.EMPTY)
.addAllPeers(peers)
.build();
stores =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + basePort)
.setVersion("5.0.0")
.setId(0x1)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 1))
.setVersion("5.0.0")
.setId(0x2)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 2))
.setVersion("5.0.0")
.setId(0x3)
.build());
for (PDMockServer server : pdServers) {
server.addGetRegionListener(
request ->
Pdpb.GetRegionResponse.newBuilder()
.setLeader(peers.get(0))
.setRegion(region)
.build());
server.addGetStoreListener(
(request) -> {
int i = (int) request.getStoreId() - 1;
return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build();
});
}
this.region =
new TiRegion(
session.getConf(),
region,
region.getPeers(0),
region.getPeersList(),
stores.stream().map(TiStore::new).collect(Collectors.toList()));
for (int i = 0; i < 3; i++) {
KVMockServer server = new KVMockServer();
server.start(this.region, basePort + i);
servers.add(server);
}
}
public void put(ByteString key, ByteString value) {
for (KVMockServer server : servers) {
server.put(key, value);
}
}
public void remove(ByteString key, ByteString value) {
for (KVMockServer server : servers) {
server.remove(key);
}
}
@After
public void tearDown() {
for (KVMockServer server : servers) {
server.stop();
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.meta.TiTimestamp;
@ -46,7 +47,7 @@ public class PDClientMockTest extends PDMockServerTest {
@Test
public void testCreate() throws Exception {
try (PDClient client = session.getPDClient()) {
assertEquals(LOCAL_ADDR + ":" + pdServer.port, client.getPdClientWrapper().getLeaderInfo());
assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo());
assertEquals(CLUSTER_ID, client.getHeader().getClusterId());
}
}
@ -54,18 +55,17 @@ public class PDClientMockTest extends PDMockServerTest {
@Test
public void testSwitchLeader() throws Exception {
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1));
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
assertEquals(
client.getPdClientWrapper().getLeaderInfo(),
HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1));
client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
}
tearDown();
setUp(LOCAL_ADDR_IPV6);
setup(LOCAL_ADDR_IPV6);
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2));
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
assertEquals(
client.getPdClientWrapper().getLeaderInfo(),
HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2));
HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
}
}
@ -84,16 +84,17 @@ public class PDClientMockTest extends PDMockServerTest {
byte[] endKey = new byte[] {1, 0, 2, 5};
int confVer = 1026;
int ver = 1027;
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
Pair<Metapb.Region, Metapb.Peer> rl =
client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
@ -115,16 +116,17 @@ public class PDClientMockTest extends PDMockServerTest {
int confVer = 1026;
int ver = 1027;
pdServer.addGetRegionByIDResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
leader.addGetRegionByIDListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
Pair<Metapb.Region, Metapb.Peer> rl = client.getRegionByID(defaultBackOff(), 0);
Metapb.Region r = rl.first;
@ -142,15 +144,16 @@ public class PDClientMockTest extends PDMockServerTest {
public void testGetStore() throws Exception {
long storeId = 1;
String testAddress = "testAddress";
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
storeId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
storeId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
try (PDClient client = session.getPDClient()) {
Store r = client.getStore(defaultBackOff(), storeId);
assertEquals(storeId, r.getId());
@ -161,10 +164,11 @@ public class PDClientMockTest extends PDMockServerTest {
assertEquals("v1", r.getLabels(0).getValue());
assertEquals("v2", r.getLabels(1).getValue());
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone)));
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone)));
assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState());
}
}
@ -177,11 +181,16 @@ public class PDClientMockTest extends PDMockServerTest {
public void testRetryPolicy() throws Exception {
long storeId = 1024;
ExecutorService service = Executors.newCachedThreadPool();
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)));
AtomicInteger i = new AtomicInteger();
leader.addGetStoreListener(
request -> {
if (i.getAndIncrement() < 2) {
return null;
} else {
return GrpcUtils.makeGetStoreResponse(
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
}
});
try (PDClient client = session.getPDClient()) {
Callable<Store> storeCallable =
() -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0);
@ -194,16 +203,17 @@ public class PDClientMockTest extends PDMockServerTest {
}
// Should fail
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
pdServer.addGetStoreResp(null);
AtomicInteger j = new AtomicInteger();
leader.addGetStoreListener(
request -> {
if (j.getAndIncrement() < 6) {
return null;
} else {
return GrpcUtils.makeGetStoreResponse(
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
}
});
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)));
try {
client.getStore(defaultBackOff(), 0);
} catch (GrpcException e) {

View File

@ -23,29 +23,38 @@ import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.Pdpb.*;
import org.tikv.kvproto.Pdpb.GetMembersRequest;
import org.tikv.kvproto.Pdpb.GetMembersResponse;
import org.tikv.kvproto.Pdpb.GetRegionByIDRequest;
import org.tikv.kvproto.Pdpb.GetRegionRequest;
import org.tikv.kvproto.Pdpb.GetRegionResponse;
import org.tikv.kvproto.Pdpb.GetStoreRequest;
import org.tikv.kvproto.Pdpb.GetStoreResponse;
import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;
public class PDMockServer extends PDGrpc.PDImplBase {
public int port;
private long clusterId;
private Server server;
public void addGetMemberResp(GetMembersResponse r) {
getMembersResp.addLast(Optional.ofNullable(r));
}
private Function<GetMembersRequest, GetMembersResponse> getMembersListener;
private Function<GetStoreRequest, GetStoreResponse> getStoreListener;
private Function<GetRegionRequest, GetRegionResponse> getRegionListener;
private Function<GetRegionByIDRequest, GetRegionResponse> getRegionByIDListener;
private final Deque<java.util.Optional<GetMembersResponse>> getMembersResp =
new LinkedBlockingDeque<java.util.Optional<GetMembersResponse>>();
public void addGetMembersListener(Function<GetMembersRequest, GetMembersResponse> func) {
getMembersListener = func;
}
@Override
public void getMembers(GetMembersRequest request, StreamObserver<GetMembersResponse> resp) {
try {
resp.onNext(getMembersResp.getFirst().get());
resp.onNext(Optional.ofNullable(getMembersListener.apply(request)).get());
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
@ -72,47 +81,42 @@ public class PDMockServer extends PDGrpc.PDImplBase {
};
}
public void addGetRegionResp(GetRegionResponse r) {
getRegionResp.addLast(r);
public void addGetRegionListener(Function<GetRegionRequest, GetRegionResponse> func) {
getRegionListener = func;
}
private final Deque<GetRegionResponse> getRegionResp = new LinkedBlockingDeque<>();
@Override
public void getRegion(GetRegionRequest request, StreamObserver<GetRegionResponse> resp) {
try {
resp.onNext(getRegionResp.removeFirst());
resp.onNext(getRegionListener.apply(request));
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
}
}
public void addGetRegionByIDResp(GetRegionResponse r) {
getRegionByIDResp.addLast(r);
public void addGetRegionByIDListener(Function<GetRegionByIDRequest, GetRegionResponse> func) {
getRegionByIDListener = func;
}
private final Deque<GetRegionResponse> getRegionByIDResp = new LinkedBlockingDeque<>();
@Override
public void getRegionByID(GetRegionByIDRequest request, StreamObserver<GetRegionResponse> resp) {
try {
resp.onNext(getRegionByIDResp.removeFirst());
resp.onNext(getRegionByIDListener.apply(request));
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
}
}
public void addGetStoreResp(GetStoreResponse r) {
getStoreResp.addLast(Optional.ofNullable(r));
public void addGetStoreListener(Function<GetStoreRequest, GetStoreResponse> func) {
getStoreListener = func;
}
private final Deque<Optional<GetStoreResponse>> getStoreResp = new LinkedBlockingDeque<>();
@Override
public void getStore(GetStoreRequest request, StreamObserver<GetStoreResponse> resp) {
try {
resp.onNext(getStoreResp.removeFirst().get());
resp.onNext(Optional.ofNullable(getStoreListener.apply(request)).get());
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
@ -120,10 +124,16 @@ public class PDMockServer extends PDGrpc.PDImplBase {
}
public void start(long clusterId) throws IOException {
int port;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
start(clusterId, port);
}
public void start(long clusterId, int port) throws IOException {
this.clusterId = clusterId;
this.port = port;
server = ServerBuilder.forPort(port).addService(this).build().start();
Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop));

View File

@ -18,40 +18,58 @@
package org.tikv.common;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
public abstract class PDMockServerTest {
protected static final String LOCAL_ADDR = "127.0.0.1";
static final long CLUSTER_ID = 1024;
protected static TiSession session;
protected PDMockServer pdServer;
protected TiSession session;
protected PDMockServer leader;
protected List<PDMockServer> pdServers = new ArrayList<>();
@Before
public void setUp() throws IOException {
setUp(LOCAL_ADDR);
public void setup() throws IOException {
setup(LOCAL_ADDR);
}
void setUp(String addr) throws IOException {
pdServer = new PDMockServer();
pdServer.start(CLUSTER_ID);
pdServer.addGetMemberResp(
GrpcUtils.makeGetMembersResponse(
pdServer.getClusterId(),
GrpcUtils.makeMember(1, "http://" + addr + ":" + pdServer.port),
GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)),
GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2))));
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port);
conf.setEnableGrpcForward(false);
void setup(String addr) throws IOException {
int basePort;
try (ServerSocket s = new ServerSocket(51820)) {
basePort = s.getLocalPort();
}
for (int i = 0; i < 3; i++) {
PDMockServer server = new PDMockServer();
server.start(CLUSTER_ID, basePort + i);
server.addGetMembersListener(
(request) ->
GrpcUtils.makeGetMembersResponse(
server.getClusterId(),
GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort),
GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)),
GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2))));
pdServers.add(server);
}
leader = pdServers.get(0);
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port);
conf.setKvMode("RAW");
conf.setTest(true);
conf.setWarmUpEnable(false);
conf.setTimeout(2000);
conf.setEnableGrpcForward(true);
session = TiSession.create(conf);
}
@After
public void tearDown() throws Exception {
session.close();
pdServer.stop();
for (PDMockServer server : pdServers) {
server.stop();
}
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.key.Key;
@ -36,12 +37,13 @@ import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.StoreState;
public class RegionManagerTest extends PDMockServerTest {
private RegionManager mgr;
@Before
@Override
public void setUp() throws IOException {
super.setUp();
public void setup() throws IOException {
super.setup();
mgr = session.getRegionManager();
}
@ -64,27 +66,30 @@ public class RegionManagerTest extends PDMockServerTest {
int ver = 1027;
long regionId = 233;
String testAddress = "127.0.0.1";
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
for (long id : new long[] {10, 20}) {
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
id,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
}
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
AtomicInteger i = new AtomicInteger(0);
long[] ids = new long[] {10, 20};
leader.addGetStoreListener(
(request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
ids[i.getAndIncrement()],
testAddress,
StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2")))));
TiRegion region = mgr.getRegionByKey(startKey);
assertEquals(region.getId(), regionId);
@ -103,27 +108,30 @@ public class RegionManagerTest extends PDMockServerTest {
int confVer = 1026;
int ver = 1027;
long regionId = 233;
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(storeId, 10),
GrpcUtils.makePeer(storeId + 1, 20))));
for (long id : new long[] {10, 20}) {
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
id,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
}
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(storeId, 10),
GrpcUtils.makePeer(storeId + 1, 20))));
AtomicInteger i = new AtomicInteger(0);
long[] ids = new long[] {10, 20};
leader.addGetStoreListener(
(request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
ids[i.getAndIncrement()],
testAddress,
StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2")))));
Pair<TiRegion, TiStore> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
@ -134,27 +142,29 @@ public class RegionManagerTest extends PDMockServerTest {
public void getStoreById() {
long storeId = 234;
String testAddress = "testAddress";
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
storeId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
storeId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
TiStore store = mgr.getStoreById(storeId);
assertEquals(store.getStore().getId(), storeId);
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
storeId + 1,
testAddress,
StoreState.Tombstone,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
storeId + 1,
testAddress,
StoreState.Tombstone,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
try {
mgr.getStoreById(storeId + 1);

View File

@ -0,0 +1,45 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.KVMockServer.State;
import org.tikv.raw.RawKVClient;
public class SeekLeaderStoreTest extends MockThreeStoresTest {
private RawKVClient createClient() {
return session.createRawClient();
}
@Test
public void testSeekLeader() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
put(key, value);
client.put(key, value);
Assert.assertEquals(value, client.get(key).get());
servers.get(0).setState(State.Fail);
servers.get(1).setRegion(region.switchPeer(stores.get(1).getId()));
Assert.assertEquals(value, client.get(key).get());
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.KVMockServer.State;
import org.tikv.raw.RawKVClient;
public class SeekProxyStoreTest extends MockThreeStoresTest {
private RawKVClient createClient() {
return session.createRawClient();
}
@Test
public void testSeekProxyStore() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
put(key, value);
client.put(key, value);
Assert.assertEquals(value, client.get(key).get());
// Set the leader to state Fail, the request will route to peer 0x2, which is not the leader.
// The state of three peers is the same.
// Thus, with the correct context, the peer 0x2 will return normally.
servers.get(0).setState(State.Fail);
Assert.assertEquals(value, client.get(key).get());
}
}