[close #550] rawkv: fix seek leader/proxy store early abort (#551)

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
This commit is contained in:
iosmanthus 2022-03-23 12:48:48 +08:00 committed by GitHub
parent b5b0545b6f
commit e89ca5f37b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 47 deletions

View File

@ -277,19 +277,27 @@ public abstract class AbstractRegionStoreClient
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {
@ -328,22 +336,31 @@ public abstract class AbstractRegionStoreClient
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel)
.withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {

View File

@ -26,7 +26,6 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
@ -47,6 +46,7 @@ import org.tikv.kvproto.Pdpb;
@SuppressWarnings("UnstableApiUsage")
public class RegionManager {
private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
HistogramUtils.buildDuration()
@ -205,22 +205,23 @@ public class RegionManager {
}
public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, null, peers, stores);
return createRegion(region, null, backOffer);
}
private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores);
}
private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
List<Metapb.Peer> peers = new ArrayList<>();
List<TiStore> stores = new ArrayList<>();
for (Metapb.Peer peer : region.getPeersList()) {
try {
stores.add(getStoreById(peer.getStoreId(), backOffer));
peers.add(peer);
} catch (Exception e) {
logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString());
}
}
Metapb.Region newRegion =
Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build();
return new TiRegion(conf, newRegion, leader, peers, stores);
}
private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {

View File

@ -41,7 +41,7 @@ import org.tikv.kvproto.Metapb.StoreState;
public class PDClientMockTest extends PDMockServerTest {
private static final String LOCAL_ADDR_IPV6 = "[::]";
private static final String LOCAL_ADDR_IPV6 = "[::1]";
public static final String HTTP = "http://";
@Test

View File

@ -39,7 +39,7 @@ public abstract class PDMockServerTest {
void setup(String addr) throws IOException {
int basePort;
try (ServerSocket s = new ServerSocket(51820)) {
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
}
@ -54,9 +54,11 @@ public abstract class PDMockServerTest {
GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)),
GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2))));
pdServers.add(server);
if (i == 0) {
leader = server;
}
}
leader = pdServers.get(0);
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port);
conf.setKvMode("RAW");
conf.setWarmUpEnable(false);

View File

@ -18,9 +18,13 @@
package org.tikv.common;
import com.google.protobuf.ByteString;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.KVMockServer.State;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;
public class SeekLeaderStoreTest extends MockThreeStoresTest {
@ -34,12 +38,41 @@ public class SeekLeaderStoreTest extends MockThreeStoresTest {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
put(key, value);
client.put(key, value);
Assert.assertEquals(value, client.get(key).get());
servers.get(0).setState(State.Fail);
servers.get(1).setRegion(region.switchPeer(stores.get(1).getId()));
Assert.assertEquals(value, client.get(key).get());
remove(key, value);
}
@Test
public void testSeekLeaderMeetInvalidStore() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
put(key, value);
servers.get(0).setState(State.Fail);
servers.get(2).setRegion(region.switchPeer(stores.get(2).getId()));
AtomicInteger i = new AtomicInteger(0);
leader.addGetStoreListener(
request -> {
Metapb.Store.Builder storeBuilder =
Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1));
if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) {
storeBuilder.setState(StoreState.Tombstone);
}
return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build();
});
Assert.assertEquals(value, client.get(key).get());
remove(key, value);
}
}