[close #616] check store version while using API V2 (#617)

This commit is contained in:
iosmanthus 2022-06-20 22:05:43 +08:00 committed by GitHub
parent 7e6af2984d
commit d6a15c4ccc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 60 additions and 3 deletions

View File

@ -175,6 +175,11 @@ public class TiSession implements AutoCloseable {
} }
this.client = PDClient.createRaw(conf, keyCodec, channelFactory); this.client = PDClient.createRaw(conf, keyCodec, channelFactory);
if (conf.getApiVersion().isV2() && !StoreVersion.minTiKVVersion(Version.API_V2, client)) {
throw new IllegalStateException(
"With API v2, store versions should not older than " + Version.API_V2);
}
this.enableGrpcForward = conf.getEnableGrpcForward(); this.enableGrpcForward = conf.getEnableGrpcForward();
if (this.enableGrpcForward) { if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available"); logger.info("enable grpc forward for high available");

View File

@ -25,4 +25,6 @@ public class Version {
public static final String RESOLVE_LOCK_V4 = "4.0.0"; public static final String RESOLVE_LOCK_V4 = "4.0.0";
public static final String BATCH_WRITE = "3.0.14"; public static final String BATCH_WRITE = "3.0.14";
public static final String API_V2 = "6.1.0";
} }

View File

@ -150,4 +150,15 @@ public class ApiVersionTest {
Assert.assertNotNull(e); Assert.assertNotNull(e);
} }
} }
@Test
public void testAccessOldVersionClusterWithV2() {
Assume.assumeFalse(minTiKVVersion("6.1.0"));
try (RawKVClient client = createRawClient(ApiVersion.V2)) {
Assert.fail("Should not create V2 client while store version is less than 6.1.0");
} catch (Exception e) {
Assert.assertNotNull(e);
}
}
} }

View File

@ -30,6 +30,7 @@ import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore; import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;
public class MockThreeStoresTest extends PDMockServerTest { public class MockThreeStoresTest extends PDMockServerTest {
@ -68,17 +69,17 @@ public class MockThreeStoresTest extends PDMockServerTest {
ImmutableList.of( ImmutableList.of(
Metapb.Store.newBuilder() Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[0]) .setAddress("127.0.0.1:" + ports[0])
.setVersion("5.0.0") .setVersion(Version.API_V2)
.setId(0x1) .setId(0x1)
.build(), .build(),
Metapb.Store.newBuilder() Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[1]) .setAddress("127.0.0.1:" + ports[1])
.setVersion("5.0.0") .setVersion(Version.API_V2)
.setId(0x2) .setId(0x2)
.build(), .build(),
Metapb.Store.newBuilder() Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[2]) .setAddress("127.0.0.1:" + ports[2])
.setVersion("5.0.0") .setVersion(Version.API_V2)
.setId(0x3) .setId(0x3)
.build()); .build());
@ -94,6 +95,10 @@ public class MockThreeStoresTest extends PDMockServerTest {
int i = (int) request.getStoreId() - 1; int i = (int) request.getStoreId() - 1;
return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build();
}); });
server.addGetAllStoresListener(
request -> {
return GetAllStoresResponse.newBuilder().addAllStores(stores).build();
});
} }
this.region = this.region =

View File

@ -17,6 +17,7 @@
package org.tikv.common; package org.tikv.common;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.util.List; import java.util.List;
import org.junit.Assert; import org.junit.Assert;
@ -27,7 +28,10 @@ import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair; import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;
import org.tikv.kvproto.Pdpb.GetRegionResponse; import org.tikv.kvproto.Pdpb.GetRegionResponse;
import org.tikv.kvproto.Pdpb.Region; import org.tikv.kvproto.Pdpb.Region;
import org.tikv.kvproto.Pdpb.ScanRegionsResponse; import org.tikv.kvproto.Pdpb.ScanRegionsResponse;
@ -35,6 +39,18 @@ import org.tikv.kvproto.Pdpb.ScanRegionsResponse;
public class PDClientV2MockTest extends PDMockServerTest { public class PDClientV2MockTest extends PDMockServerTest {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
leader.addGetAllStoresListener(
request -> {
return GetAllStoresResponse.newBuilder()
.addAllStores(
ImmutableList.of(
Store.newBuilder()
.setId(0x1)
.setState(StoreState.Up)
.setVersion(Version.API_V2)
.build()))
.build();
});
upgradeToV2Cluster(); upgradeToV2Cluster();
} }

View File

@ -30,6 +30,8 @@ import java.net.ServerSocket;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.Pdpb.GetAllStoresRequest;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;
import org.tikv.kvproto.Pdpb.GetMembersRequest; import org.tikv.kvproto.Pdpb.GetMembersRequest;
import org.tikv.kvproto.Pdpb.GetMembersResponse; import org.tikv.kvproto.Pdpb.GetMembersResponse;
import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; import org.tikv.kvproto.Pdpb.GetRegionByIDRequest;
@ -54,6 +56,8 @@ public class PDMockServer extends PDGrpc.PDImplBase {
private Function<ScanRegionsRequest, ScanRegionsResponse> scanRegionsListener; private Function<ScanRegionsRequest, ScanRegionsResponse> scanRegionsListener;
private Function<GetAllStoresRequest, GetAllStoresResponse> getAllStoresListener;
public void addGetMembersListener(Function<GetMembersRequest, GetMembersResponse> func) { public void addGetMembersListener(Function<GetMembersRequest, GetMembersResponse> func) {
getMembersListener = func; getMembersListener = func;
} }
@ -144,6 +148,20 @@ public class PDMockServer extends PDGrpc.PDImplBase {
} }
} }
public void addGetAllStoresListener(Function<GetAllStoresRequest, GetAllStoresResponse> func) {
getAllStoresListener = func;
}
@Override
public void getAllStores(GetAllStoresRequest request, StreamObserver<GetAllStoresResponse> resp) {
try {
resp.onNext(Optional.ofNullable(getAllStoresListener.apply(request)).get());
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
}
}
public void start(long clusterId) throws IOException { public void start(long clusterId) throws IOException {
int port; int port;
try (ServerSocket s = new ServerSocket(0)) { try (ServerSocket s = new ServerSocket(0)) {