[close #570] mockserver: fix unstable mock server cluster setup (#571) (#576)

Co-authored-by: iosmanthus <dengliming@pingcap.com>
Co-authored-by: iosmanthus <myosmanthustree@gmail.com>
This commit is contained in:
ti-srebot 2022-03-29 10:39:30 +08:00 committed by GitHub
parent 47abaaf2f0
commit c2b23dbcf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 81 additions and 253 deletions

View File

@ -42,9 +42,11 @@ public class MockThreeStoresTest extends PDMockServerTest {
public void setup() throws IOException {
super.setup();
int basePort;
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
int[] ports = new int[3];
for (int i = 0; i < ports.length; i++) {
try (ServerSocket s = new ServerSocket(0)) {
ports[i] = s.getLocalPort();
}
}
ImmutableList<Metapb.Peer> peers =
@ -65,17 +67,17 @@ public class MockThreeStoresTest extends PDMockServerTest {
stores =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + basePort)
.setAddress("127.0.0.1:" + ports[0])
.setVersion("5.0.0")
.setId(0x1)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 1))
.setAddress("127.0.0.1:" + ports[1])
.setVersion("5.0.0")
.setId(0x2)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 2))
.setAddress("127.0.0.1:" + ports[2])
.setVersion("5.0.0")
.setId(0x3)
.build());
@ -101,9 +103,9 @@ public class MockThreeStoresTest extends PDMockServerTest {
region.getPeers(0),
region.getPeersList(),
stores.stream().map(TiStore::new).collect(Collectors.toList()));
for (int i = 0; i < 3; i++) {
for (int port : ports) {
KVMockServer server = new KVMockServer();
server.start(this.region, basePort + i);
server.start(this.region, port);
servers.add(server);
}
}

View File

@ -40,14 +40,14 @@ import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
public class PDClientMockTest extends PDMockServerTest {
private static final String LOCAL_ADDR_IPV6 = "[::1]";
public static final String HTTP = "http://";
@Test
public void testCreate() throws Exception {
try (PDClient client = session.getPDClient()) {
assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo());
assertEquals(
LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo());
assertEquals(CLUSTER_ID, client.getHeader().getClusterId());
}
}
@ -55,17 +55,19 @@ public class PDClientMockTest extends PDMockServerTest {
@Test
public void testSwitchLeader() throws Exception {
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
// Switch leader to server 1
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort());
assertEquals(
client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
client.getPdClientWrapper().getLeaderInfo(),
HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort());
}
tearDown();
setup(LOCAL_ADDR_IPV6);
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort());
assertEquals(
client.getPdClientWrapper().getLeaderInfo(),
HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort());
}
}

View File

@ -1,226 +0,0 @@
/*
* Copyright 2017 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
public class PDClientTest extends PDMockServerTest {
private static final String LOCAL_ADDR_IPV6 = "[::1]";
public static final String HTTP = "http://";
@Test
public void testCreate() throws Exception {
try (PDClient client = session.getPDClient()) {
assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo());
assertEquals(CLUSTER_ID, client.getHeader().getClusterId());
}
}
@Test
public void testSwitchLeader() throws Exception {
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
assertEquals(
client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1));
}
tearDown();
setup(LOCAL_ADDR_IPV6);
try (PDClient client = session.getPDClient()) {
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
assertEquals(
client.getPdClientWrapper().getLeaderInfo(),
HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2));
}
}
@Test
public void testTso() throws Exception {
try (PDClient client = session.getPDClient()) {
TiTimestamp ts = client.getTimestamp(defaultBackOff());
// Test pdServer is set to generate physical == logical + 1
assertEquals(ts.getPhysical(), ts.getLogical() + 1);
}
}
@Test
public void testGetRegionByKey() throws Exception {
byte[] startKey = new byte[] {1, 0, 2, 4};
byte[] endKey = new byte[] {1, 0, 2, 5};
int confVer = 1026;
int ver = 1027;
leader.addGetRegionListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
Pair<Metapb.Region, Metapb.Peer> rl =
client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
Metapb.Region r = rl.first;
Metapb.Peer l = rl.second;
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
assertEquals(r.getRegionEpoch().getConfVer(), confVer);
assertEquals(r.getRegionEpoch().getVersion(), ver);
assertEquals(1, l.getId());
assertEquals(10, l.getStoreId());
}
}
@Test
public void testGetRegionById() throws Exception {
byte[] startKey = new byte[] {1, 0, 2, 4};
byte[] endKey = new byte[] {1, 0, 2, 5};
int confVer = 1026;
int ver = 1027;
leader.addGetRegionByIDListener(
request ->
GrpcUtils.makeGetRegionResponse(
leader.getClusterId(),
GrpcUtils.makeRegion(
1,
ByteString.copyFrom(startKey),
ByteString.copyFrom(endKey),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makePeer(1, 10),
GrpcUtils.makePeer(2, 20))));
try (PDClient client = session.getPDClient()) {
Pair<Metapb.Region, Metapb.Peer> rl = client.getRegionByID(defaultBackOff(), 0);
Metapb.Region r = rl.first;
Metapb.Peer l = rl.second;
assertEquals(ByteString.copyFrom(startKey), r.getStartKey());
assertEquals(ByteString.copyFrom(endKey), r.getEndKey());
assertEquals(confVer, r.getRegionEpoch().getConfVer());
assertEquals(ver, r.getRegionEpoch().getVersion());
assertEquals(1, l.getId());
assertEquals(10, l.getStoreId());
}
}
@Test
public void testGetStore() throws Exception {
long storeId = 1;
String testAddress = "testAddress";
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(
storeId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
try (PDClient client = session.getPDClient()) {
Store r = client.getStore(defaultBackOff(), storeId);
assertEquals(storeId, r.getId());
assertEquals(testAddress, r.getAddress());
assertEquals(Metapb.StoreState.Up, r.getState());
assertEquals("k1", r.getLabels(0).getKey());
assertEquals("k2", r.getLabels(1).getKey());
assertEquals("v1", r.getLabels(0).getValue());
assertEquals("v2", r.getLabels(1).getValue());
leader.addGetStoreListener(
request ->
GrpcUtils.makeGetStoreResponse(
leader.getClusterId(),
GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone)));
assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState());
}
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(1000);
}
@Test
public void testRetryPolicy() throws Exception {
long storeId = 1024;
ExecutorService service = Executors.newCachedThreadPool();
AtomicInteger i = new AtomicInteger();
leader.addGetStoreListener(
request -> {
if (i.getAndIncrement() < 2) {
return null;
} else {
return GrpcUtils.makeGetStoreResponse(
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
}
});
try (PDClient client = session.getPDClient()) {
Callable<Store> storeCallable =
() -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0);
Future<Store> storeFuture = service.submit(storeCallable);
try {
Store r = storeFuture.get(50, TimeUnit.SECONDS);
assertEquals(r.getId(), storeId);
} catch (TimeoutException e) {
fail();
}
// Should fail
AtomicInteger j = new AtomicInteger();
leader.addGetStoreListener(
request -> {
if (j.getAndIncrement() < 6) {
return null;
} else {
return GrpcUtils.makeGetStoreResponse(
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
}
});
try {
client.getStore(defaultBackOff(), 0);
} catch (GrpcException e) {
assertTrue(true);
return;
}
fail();
}
}
}

View File

@ -37,8 +37,7 @@ import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;
public class PDMockServer extends PDGrpc.PDImplBase {
public int port;
private int port;
private long clusterId;
private Server server;
@ -148,4 +147,8 @@ public class PDMockServer extends PDGrpc.PDImplBase {
public long getClusterId() {
return clusterId;
}
public long getPort() {
return port;
}
}

View File

@ -25,7 +25,6 @@ import org.junit.After;
import org.junit.Before;
public abstract class PDMockServerTest {
protected static final String LOCAL_ADDR = "127.0.0.1";
static final long CLUSTER_ID = 1024;
protected TiSession session;
@ -38,28 +37,30 @@ public abstract class PDMockServerTest {
}
void setup(String addr) throws IOException {
int basePort;
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
int[] ports = new int[3];
for (int i = 0; i < ports.length; i++) {
try (ServerSocket s = new ServerSocket(0)) {
ports[i] = s.getLocalPort();
}
}
for (int i = 0; i < 3; i++) {
for (int i = 0; i < ports.length; i++) {
PDMockServer server = new PDMockServer();
server.start(CLUSTER_ID, basePort + i);
server.start(CLUSTER_ID, ports[i]);
server.addGetMembersListener(
(request) ->
GrpcUtils.makeGetMembersResponse(
server.getClusterId(),
GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort),
GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)),
GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2))));
GrpcUtils.makeMember(1, "http://" + addr + ":" + ports[0]),
GrpcUtils.makeMember(2, "http://" + addr + ":" + ports[1]),
GrpcUtils.makeMember(3, "http://" + addr + ":" + ports[2])));
pdServers.add(server);
if (i == 0) {
leader = server;
}
}
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port);
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]);
conf.setKvMode("RAW");
conf.setWarmUpEnable(false);
conf.setTimeout(2000);

View File

@ -28,7 +28,6 @@ import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;
public class SeekLeaderStoreTest extends MockThreeStoresTest {
private RawKVClient createClient() {
return session.createRawClient();
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.KVMockServer.State;
import org.tikv.raw.RawKVClient;
public class SeekProxyStoreTest extends MockThreeStoresTest {
private RawKVClient createClient() {
return session.createRawClient();
}
@Test
public void testSeekProxyStore() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
put(key, value);
client.put(key, value);
Assert.assertEquals(value, client.get(key));
// Set the leader to state Fail, the request will route to peer 0x2, which is not the leader.
// The state of three peers is the same.
// Thus, with the correct context, the peer 0x2 will return normally.
servers.get(0).setState(State.Fail);
Assert.assertEquals(value, client.get(key));
}
}