RawKV support SST Ingest using KVStream interface (#233)

This commit is contained in:
Liangliang Gu 2021-07-22 16:40:55 +08:00 committed by GitHub
parent faf0cb435d
commit 798244cd9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 951 additions and 24 deletions

View File

@ -1,8 +1,8 @@
def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) {
def TIDB_BRANCH = "release-5.0"
def TIKV_BRANCH = "release-5.0"
def PD_BRANCH = "release-5.0"
def TIDB_BRANCH = "master"
def TIKV_BRANCH = "master"
def PD_BRANCH = "master"
// parse tidb branch
def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/

View File

@ -98,6 +98,18 @@ The following includes JVM related parameters.
- timeout of scan/delete range grpc request
- default: 20s
#### tikv.importer.max_kv_batch_bytes
- Maximum total size, in bytes, of one batch of KV pairs sent from the client to a TiKV server (ingest API)
- default: 1048576 (1M)
#### tikv.importer.max_kv_batch_size
- Maximum number of KV pairs in one batch sent from the client to a TiKV server (ingest API)
- default: 32768 (32K)
#### tikv.scatter_wait_seconds
- time to wait for scattering regions
- default: 300 (5min)
### Metrics Parameter
#### tikv.metrics.enable

View File

@ -14,7 +14,7 @@
# limitations under the License.
#
kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a
kvproto_hash=2ac2a7984b2d01b96ed56fd8474f4bf80fa33c51
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926
tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac

View File

@ -56,6 +56,11 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";
public static final String TIKV_IMPORTER_MAX_KV_BATCH_BYTES = "tikv.importer.max_kv_batch_bytes";
public static final String TIKV_IMPORTER_MAX_KV_BATCH_SIZE = "tikv.importer.max_kv_batch_size";
public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
@ -89,6 +94,10 @@ public class ConfigUtils {
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false;
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024;
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32;
public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300;
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
public static final String HIGH_COMMAND_PRIORITY = "HIGH";

View File

@ -48,7 +48,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
@ -230,7 +229,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
@ -679,7 +678,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
private Metapb.Region decodeRegion(Metapb.Region region) {
final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
final boolean isRawRegion = conf.isRawKVMode();
Metapb.Region.Builder builder =
Metapb.Region.newBuilder()
.setId(region.getId())

View File

@ -82,6 +82,9 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE);
setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS);
}
public static void listAll() {
@ -273,6 +276,12 @@ public class TiConfiguration implements Serializable {
private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS);
private int importerMaxKVBatchBytes = getInt(TIKV_IMPORTER_MAX_KV_BATCH_BYTES);
private int importerMaxKVBatchSize = getInt(TIKV_IMPORTER_MAX_KV_BATCH_SIZE);
private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS);
public enum KVMode {
TXN,
RAW
@ -489,6 +498,14 @@ public class TiConfiguration implements Serializable {
return kvMode;
}
/** Returns {@code true} when this configuration's KV mode is {@link KVMode#RAW}. */
public boolean isRawKVMode() {
  final KVMode currentMode = getKvMode();
  return currentMode == KVMode.RAW;
}
/** Returns {@code true} when this configuration's KV mode is {@link KVMode#TXN}. */
public boolean isTxnKVMode() {
  final KVMode currentMode = getKvMode();
  return currentMode == KVMode.TXN;
}
public TiConfiguration setKvMode(String kvMode) {
this.kvMode = KVMode.valueOf(kvMode);
return this;
@ -586,4 +603,28 @@ public class TiConfiguration implements Serializable {
public void setEnableAtomicForCAS(boolean enableAtomicForCAS) {
this.enableAtomicForCAS = enableAtomicForCAS;
}
/**
* Returns the importer's maximum KV batch size in bytes. The field is initialized from the
* {@code tikv.importer.max_kv_batch_bytes} setting.
*/
public int getImporterMaxKVBatchBytes() {
return importerMaxKVBatchBytes;
}
/**
* Overrides the importer's maximum KV batch size in bytes for this configuration instance.
*
* @param importerMaxKVBatchBytes the new limit; not validated here
*/
public void setImporterMaxKVBatchBytes(int importerMaxKVBatchBytes) {
this.importerMaxKVBatchBytes = importerMaxKVBatchBytes;
}
/**
* Returns the importer's maximum number of KV pairs per batch. The field is initialized from the
* {@code tikv.importer.max_kv_batch_size} setting.
*/
public int getImporterMaxKVBatchSize() {
return importerMaxKVBatchSize;
}
/**
* Overrides the importer's maximum number of KV pairs per batch for this configuration instance.
*
* @param importerMaxKVBatchSize the new limit; not validated here
*/
public void setImporterMaxKVBatchSize(int importerMaxKVBatchSize) {
this.importerMaxKVBatchSize = importerMaxKVBatchSize;
}
/**
* Returns how long to wait for region scattering, in seconds. The field is initialized from the
* {@code tikv.scatter_wait_seconds} setting.
*/
public int getScatterWaitSeconds() {
return scatterWaitSeconds;
}
/**
* Overrides the scatter wait time for this configuration instance.
*
* @param scatterWaitSeconds the new wait time in seconds; not validated here
*/
public void setScatterWaitSeconds(int scatterWaitSeconds) {
this.scatterWaitSeconds = scatterWaitSeconds;
}
}

View File

@ -31,6 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.catalog.Catalog;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterStoreClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
@ -67,8 +69,10 @@ public class TiSession implements AutoCloseable {
private volatile RegionManager regionManager;
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
private boolean isClosed = false;
private MetricsServer metricsServer;
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;
public TiSession(TiConfiguration conf) {
this.conf = conf;
@ -132,6 +136,21 @@ public class TiSession implements AutoCloseable {
return res;
}
/**
 * Returns the session-wide builder for {@link ImporterStoreClient}s, creating it on first use.
 *
 * <p>The cached field is read once without locking; only when it is still unset does the method
 * take this session's monitor, re-check, and create the builder.
 *
 * @return the shared importer store client builder
 */
public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
  ImporterStoreClient.ImporterStoreClientBuilder cached = importerClientBuilder;
  if (cached != null) {
    // Fast path: already initialized.
    return cached;
  }
  synchronized (this) {
    if (importerClientBuilder == null) {
      importerClientBuilder =
          new ImporterStoreClient.ImporterStoreClientBuilder(
              conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
    }
    return importerClientBuilder;
  }
}
public TiConfiguration getConf() {
return conf;
}
@ -322,10 +341,22 @@ public class TiSession implements AutoCloseable {
return channelFactory;
}
/**
 * Creates a client for switching TiKV between import and normal mode, as used by SST ingest.
 *
 * <p>Every call builds a new {@link SwitchTiKVModeClient} on top of this session's PD client and
 * importer store client builder.
 *
 * @return a new SwitchTiKVModeClient
 */
public SwitchTiKVModeClient getSwitchTiKVModeClient() {
  final PDClient pd = getPDClient();
  final ImporterStoreClient.ImporterStoreClientBuilder importerBuilder =
      getImporterRegionStoreClientBuilder();
  return new SwitchTiKVModeClient(pd, importerBuilder);
}
/**
* split region and scatter
*
* @param splitKeys
* @param splitRegionBackoffMS
* @param scatterRegionBackoffMS
* @param scatterWaitMS
*/
public void splitRegionAndScatter(
List<byte[]> splitKeys,
@ -340,7 +371,7 @@ public class TiSession implements AutoCloseable {
splitRegion(
splitKeys
.stream()
.map(k -> Key.toRawKey(k).next().toByteString())
.map(k -> Key.toRawKey(k).toByteString())
.collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
@ -375,11 +406,28 @@ public class TiSession implements AutoCloseable {
logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
}
/**
 * Splits regions at the given keys and scatters them, using the default back-off budgets
 * ({@code BackOffer.SPLIT_REGION_BACKOFF}, {@code BackOffer.SCATTER_REGION_BACKOFF}) and the
 * scatter wait time taken from the configuration.
 *
 * @param splitKeys the raw keys at which regions are split
 */
public void splitRegionAndScatter(List<byte[]> splitKeys) {
  // The configuration stores seconds; the four-argument overload expects milliseconds.
  final int scatterWaitMillis = conf.getScatterWaitSeconds() * 1000;
  splitRegionAndScatter(
      splitKeys,
      BackOffer.SPLIT_REGION_BACKOFF,
      BackOffer.SCATTER_REGION_BACKOFF,
      scatterWaitMillis);
}
/**
* Splits regions at the given keys by delegating to the three-argument overload with the depth
* counter started at 1.
*
* @param splitKeys keys at which to split
* @param backOffer back-off state passed through to the overload
* @return whatever the three-argument overload returns
*/
private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
return splitRegion(splitKeys, backOffer, 1);
}
private List<Metapb.Region> splitRegion(
List<ByteString> splitKeys, BackOffer backOffer, int depth) {
List<Metapb.Region> regions = new ArrayList<>();
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(regionManager, splitKeys, backOffer);
groupKeysByRegion(getRegionManager(), splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
Pair<TiRegion, TiStore> pair =
@ -401,12 +449,22 @@ public class TiSession implements AutoCloseable {
List<Metapb.Region> newRegions;
try {
newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
// invalidate old region
getRegionManager().invalidateRegion(region);
} catch (final TiKVException e) {
// retry
logger.warn("ReSplitting ranges for splitRegion", e);
clientBuilder.getRegionManager().invalidateRegion(region);
getRegionManager().invalidateRegion(region);
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
newRegions = splitRegion(splits, backOffer);
if (depth >= MAX_SPLIT_REGION_STACK_DEPTH) {
logger.warn(
String.format(
"Skip split region because MAX_SPLIT_REGION_STACK_DEPTH(%d) reached!",
MAX_SPLIT_REGION_STACK_DEPTH));
newRegions = new ArrayList<>();
} else {
newRegions = splitRegion(splits, backOffer, depth + 1);
}
}
logger.info("region id={}, new region size={}", region.getId(), newRegions.size());
regions.addAll(newRegions);

View File

@ -0,0 +1,192 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.importer;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
public class ImporterClient {
private TiConfiguration tiConf;
private TiSession tiSession;
private ByteString uuid;
private Key minKey;
private Key maxKey;
private TiRegion region;
private Long ttl;
private boolean streamOpened = false;
private ImportSstpb.SSTMeta sstMeta;
private List<ImporterStoreClient> clientList;
private ImporterStoreClient clientLeader;
public ImporterClient(
TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region, Long ttl) {
this.uuid = uuid;
this.tiConf = tiSession.getConf();
this.tiSession = tiSession;
this.minKey = minKey;
this.maxKey = maxKey;
this.region = region;
this.ttl = ttl;
}
/**
* write KV pairs to RawKV using KVStream interface
*
* @param iterator
*/
public void rawWrite(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {
if (!tiConf.isRawKVMode()) {
throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!");
}
streamOpened = false;
int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize();
int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes();
int totalBytes = 0;
while (iterator.hasNext()) {
ArrayList<ImportSstpb.Pair> pairs = new ArrayList<>(maxKVBatchSize);
for (int i = 0; i < maxKVBatchSize; i++) {
if (iterator.hasNext()) {
Pair<ByteString, ByteString> pair = iterator.next();
pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build());
totalBytes += (pair.first.size() + pair.second.size());
}
if (totalBytes > maxKVBatchBytes) {
break;
}
}
if (!streamOpened) {
init();
startRawWrite();
rawWriteMeta();
streamOpened = true;
}
rawWriteBatch(pairs);
}
if (streamOpened) {
finishRawWrite();
ingest();
}
}
private void init() {
long regionId = region.getId();
Metapb.RegionEpoch regionEpoch = region.getRegionEpoch();
ImportSstpb.Range range =
ImportSstpb.Range.newBuilder()
.setStart(minKey.toByteString())
.setEnd(maxKey.toByteString())
.build();
sstMeta =
ImportSstpb.SSTMeta.newBuilder()
.setUuid(uuid)
.setRegionId(regionId)
.setRegionEpoch(regionEpoch)
.setRange(range)
.build();
clientList = new ArrayList<>();
for (Metapb.Peer peer : region.getPeersList()) {
long storeId = peer.getStoreId();
TiStore store = tiSession.getRegionManager().getStoreById(storeId);
ImporterStoreClient importerStoreClient =
tiSession.getImporterRegionStoreClientBuilder().build(store);
clientList.add(importerStoreClient);
if (region.getLeader().getStoreId() == storeId) {
clientLeader = importerStoreClient;
}
}
}
private void startRawWrite() {
for (ImporterStoreClient client : clientList) {
client.startRawWrite();
}
}
private void rawWriteMeta() {
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
}
}
private void rawWriteBatch(List<ImportSstpb.Pair> pairs) {
ImportSstpb.RawWriteBatch batch;
if (ttl == null || ttl <= 0) {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build();
} else {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build();
}
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
}
}
private void finishRawWrite() {
for (ImporterStoreClient client : clientList) {
client.finishRawWrite();
}
}
private void ingest() throws GrpcException {
List<ImporterStoreClient> workingClients = new ArrayList<>(clientList);
while (!workingClients.isEmpty()) {
Iterator<ImporterStoreClient> itor = workingClients.iterator();
while (itor.hasNext()) {
ImporterStoreClient client = itor.next();
if (client.isRawWriteResponseReceived()) {
itor.remove();
} else if (client.hasRawWriteResponseError()) {
throw new GrpcException(client.getRawWriteError());
}
}
if (!workingClients.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
clientLeader.multiIngest(region.getLeaderContext());
}
}

View File

@ -0,0 +1,199 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.importer;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.PDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.ImportSSTGrpc;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Kvrpcpb;
public class ImporterStoreClient
extends AbstractGRPCClient<ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTStub>
implements StreamObserver<ImportSstpb.RawWriteResponse> {
private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);
protected ImporterStoreClient(
TiConfiguration conf,
ChannelFactory channelFactory,
ImportSSTGrpc.ImportSSTBlockingStub blockingStub,
ImportSSTGrpc.ImportSSTStub asyncStub) {
super(conf, channelFactory, blockingStub, asyncStub);
}
private StreamObserver<ImportSstpb.RawWriteRequest> streamObserverRequest;
private ImportSstpb.RawWriteResponse rawWriteResponse;
private Throwable rawWriteError;
public synchronized boolean isRawWriteResponseReceived() {
return rawWriteResponse != null;
}
private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() {
return rawWriteResponse;
}
private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) {
this.rawWriteResponse = rawWriteResponse;
}
public synchronized boolean hasRawWriteResponseError() {
return this.rawWriteError != null;
}
public synchronized Throwable getRawWriteError() {
return this.rawWriteError;
}
private synchronized void setRawWriteError(Throwable t) {
this.rawWriteError = t;
}
@Override
public void onNext(ImportSstpb.RawWriteResponse value) {
setRawWriteResponse(value);
}
@Override
public void onError(Throwable t) {
setRawWriteError(t);
logger.error("Error during raw write!", t);
}
@Override
public void onCompleted() {
// do nothing
}
/**
* Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader
* and followers.
*
* @return
*/
public void startRawWrite() {
streamObserverRequest = getAsyncStub().rawWrite(this);
}
/**
* This API should be called after `startRawWrite`.
*
* @param request
*/
public void rawWriteBatch(ImportSstpb.RawWriteRequest request) {
streamObserverRequest.onNext(request);
}
/** This API should be called after `rawWriteBatch`. */
public void finishRawWrite() {
streamObserverRequest.onCompleted();
}
/**
* This API should be called after `finishRawWrite`. This API should be called on leader only.
*
* @param ctx
*/
public void multiIngest(Kvrpcpb.Context ctx) {
List<ImportSstpb.SSTMeta> metasList = getRawWriteResponse().getMetasList();
ImportSstpb.MultiIngestRequest request =
ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request);
if (response.hasError()) {
throw new GrpcException("" + response.getError());
}
}
public void switchMode(ImportSstpb.SwitchMode mode) {
Supplier<ImportSstpb.SwitchModeRequest> request =
() -> ImportSstpb.SwitchModeRequest.newBuilder().setMode(mode).build();
NoopHandler<ImportSstpb.SwitchModeResponse> noopHandler = new NoopHandler<>();
callWithRetry(
ConcreteBackOffer.newCustomBackOff(BackOffer.TIKV_SWITCH_MODE_BACKOFF),
ImportSSTGrpc.getSwitchModeMethod(),
request,
noopHandler);
}
@Override
protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() {
return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
protected ImportSSTGrpc.ImportSSTStub getAsyncStub() {
return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void close() throws Exception {}
public static class ImporterStoreClientBuilder {
private final TiConfiguration conf;
private final ChannelFactory channelFactory;
private final RegionManager regionManager;
private final PDClient pdClient;
public ImporterStoreClientBuilder(
TiConfiguration conf,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(channelFactory, "channelFactory is null");
Objects.requireNonNull(regionManager, "regionManager is null");
this.conf = conf;
this.channelFactory = channelFactory;
this.regionManager = regionManager;
this.pdClient = pdClient;
}
public synchronized ImporterStoreClient build(TiStore store) throws GrpcException {
Objects.requireNonNull(store, "store is null");
String addressStr = store.getStore().getAddress();
logger.debug(String.format("Create region store client on address %s", addressStr));
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel);
return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub);
}
}
}

View File

@ -0,0 +1,79 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.importer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.tikv.common.PDClient;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
/**
 * Switches all TiKV stores known to PD between import mode and normal mode.
 *
 * <p>Import mode can be re-requested periodically on a single daemon thread; see
 * {@link #keepTiKVToImportMode()} and {@link #stopKeepTiKVToImportMode()}.
 */
public class SwitchTiKVModeClient {
  // NOTE(review): presumably TiKV's import-mode timeout in seconds; only used to derive the
  // refresh period below.
  private static final int IMPORT_MODE_TIMEOUT = 600;
  // Refresh period, in seconds, of the periodic import-mode request.
  private static final int KEEP_TIKV_TO_IMPORT_MODE_PERIOD = IMPORT_MODE_TIMEOUT / 5;

  private final PDClient pdClient;
  private final ImporterStoreClient.ImporterStoreClientBuilder builder;
  private final ScheduledExecutorService ingestScheduledExecutorService;

  /**
   * @param pdClient PD client used to list all stores
   * @param builder factory for the per-store importer clients
   */
  public SwitchTiKVModeClient(
      PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) {
    this.pdClient = pdClient;
    this.builder = builder;

    ThreadFactoryBuilder daemonThreads =
        new ThreadFactoryBuilder().setNameFormat("switch-tikv-mode-pool-%d").setDaemon(true);
    this.ingestScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(daemonThreads.build());
  }

  /** Requests normal mode on every store, synchronously on the calling thread. */
  public void switchTiKVToNormalMode() {
    requestModeOnAllStores(ImportSstpb.SwitchMode.Normal);
  }

  /** Starts requesting import mode on every store immediately and then at a fixed rate. */
  public void keepTiKVToImportMode() {
    final Runnable refreshImportMode =
        () -> requestModeOnAllStores(ImportSstpb.SwitchMode.Import);
    ingestScheduledExecutorService.scheduleAtFixedRate(
        refreshImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS);
  }

  /** Shuts the scheduler down so that no further import-mode requests are started. */
  public void stopKeepTiKVToImportMode() {
    ingestScheduledExecutorService.shutdown();
  }

  /** Fetches all stores from PD and sends the mode switch request to each of them in turn. */
  private void requestModeOnAllStores(ImportSstpb.SwitchMode mode) {
    BackOffer pdBackOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
    pdClient
        .getAllStores(pdBackOffer)
        .forEach(store -> builder.build(new TiStore(store)).switchMode(mode));
  }
}

View File

@ -29,6 +29,12 @@ public class RegionCache {
keyToRegionIdCache = TreeRangeMap.create();
}
/**
* Empties the region cache, the store cache and the key-to-region-id cache. The method is
* synchronized on this cache instance.
*/
public synchronized void invalidateAll() {
regionCache.clear();
storeCache.clear();
keyToRegionIdCache.clear();
}
public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Long regionId;
if (key.isEmpty()) {

View File

@ -89,6 +89,10 @@ public class RegionManager {
return this.pdClient;
}
/** Drops everything held by the underlying cache by delegating to its {@code invalidateAll()}. */
public void invalidateAll() {
cache.invalidateAll();
}
public TiRegion getRegionByKey(ByteString key) {
return getRegionByKey(key, ConcreteBackOffer.newGetBackOff());
}

View File

@ -753,6 +753,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
SplitRegionRequest.newBuilder()
.setContext(makeContext(storeType))
.addAllSplitKeys(splitKeys)
.setIsRawKv(conf.isRawKVMode())
.build();
KVErrorHandler<SplitRegionResponse> handler =

View File

@ -108,6 +108,10 @@ public class TiRegion implements Serializable {
return peers;
}
/** Returns the peer list recorded in this region's metadata ({@code getMeta().getPeersList()}). */
public List<Peer> getPeersList() {
return getMeta().getPeersList();
}
public Peer getCurrentReplica() {
return replicaList.get(replicaIdx);
}
@ -137,10 +141,6 @@ public class TiRegion implements Serializable {
return meta.getEndKey();
}
public Key getRowEndKey() {
return Key.toRawKey(getEndKey());
}
public Kvrpcpb.Context getLeaderContext() {
return getContext(this.leader, java.util.Collections.emptySet(), false);
}

View File

@ -26,17 +26,11 @@ public interface BackOffer {
int BATCH_GET_MAX_BACKOFF = 40 * seconds;
int COP_NEXT_MAX_BACKOFF = 40 * seconds;
int GET_MAX_BACKOFF = 40 * seconds;
int PREWRITE_MAX_BACKOFF = 20 * seconds;
int CLEANUP_MAX_BACKOFF = 20 * seconds;
int GC_ONE_REGION_MAX_BACKOFF = 20 * seconds;
int GC_RESOLVE_LOCK_MAX_BACKOFF = 100 * seconds;
int GC_DELETE_RANGE_MAX_BACKOFF = 100 * seconds;
int RAWKV_MAX_BACKOFF = 20 * seconds;
int SPLIT_REGION_BACKOFF = 20 * seconds;
int BATCH_COMMIT_BACKOFF = 10 * seconds;
int PD_INFO_BACKOFF = 5 * seconds;
int TIKV_SWITCH_MODE_BACKOFF = seconds;
int SPLIT_REGION_BACKOFF = 12000;
int SCATTER_REGION_BACKOFF = 30000;
/**
* doBackOff sleeps a while base on the BackOffType and records the error message. Will stop until

View File

@ -213,4 +213,19 @@ public class ClientUtils {
throw new TiKVException("Execution exception met.", e);
}
}
/**
 * Generates a random UUID and returns it as 16 bytes: the most significant 64 bits first,
 * followed by the least significant 64 bits, each in big-endian byte order.
 *
 * @return a new 16-byte array holding the UUID
 */
public static byte[] genUUID() {
  final UUID id = UUID.randomUUID();
  // A heap ByteBuffer is big-endian by default, which matches the required byte layout.
  return java.nio.ByteBuffer.allocate(2 * Long.BYTES)
      .putLong(id.getMostSignificantBits())
      .putLong(id.getLeastSignificantBits())
      .array();
}
}

View File

@ -27,8 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionStoreClient;
@ -38,6 +41,7 @@ import org.tikv.common.util.*;
import org.tikv.kvproto.Kvrpcpb.KvPair;
public class RawKVClient implements AutoCloseable {
private final TiSession tiSession;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final boolean atomicForCAS;
@ -85,6 +89,7 @@ public class RawKVClient implements AutoCloseable {
Objects.requireNonNull(session, "session is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = session.getConf();
this.tiSession = session;
this.clientBuilder = clientBuilder;
this.batchGetThreadPool = session.getThreadPoolForBatchGet();
this.batchPutThreadPool = session.getThreadPoolForBatchPut();
@ -639,6 +644,93 @@ public class RawKVClient implements AutoCloseable {
deleteRange(key, endKey);
}
/**
* Ingest KV pairs to RawKV using StreamKV API, without a TTL. Delegates to
* {@code ingest(list, null)}.
*
* @param list the KV pairs to ingest; an empty list is a no-op
*/
public synchronized void ingest(List<Pair<ByteString, ByteString>> list) {
ingest(list, null);
}
/**
* Ingest KV pairs to RawKV using StreamKV API.
*
* <p>Steps, in order: switch TiKV to normal mode; split and scatter regions at the smallest key
* and at the successor of the largest key; invalidate all cached region info; start keeping TiKV
* in import mode; group the keys by region and ingest each group. The finally block stops the
* import-mode keeper and switches TiKV back to normal mode, whether or not the ingest succeeded.
*
* <p>NOTE(review): each per-region group is passed to {@code doIngest}, which takes its first and
* last elements as the key range bounds, so {@code list} is presumably expected to be in
* ascending key order - confirm with callers.
*
* @param list the KV pairs to ingest; an empty list is a no-op
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
* @throws GrpcException declared for failures of the underlying ingest
*/
public synchronized void ingest(List<Pair<ByteString, ByteString>> list, Long ttl)
throws GrpcException {
if (list.isEmpty()) {
return;
}
// Find the smallest and largest key, and index the values by key for the per-region lookup
// below. A later pair with the same key replaces the earlier value in the map.
Key min = Key.MAX;
Key max = Key.MIN;
Map<ByteString, ByteString> map = new HashMap<>(list.size());
for (Pair<ByteString, ByteString> pair : list) {
map.put(pair.first, pair.second);
Key key = Key.toRawKey(pair.first.toByteArray());
if (key.compareTo(min) < 0) {
min = key;
}
if (key.compareTo(max) > 0) {
max = key;
}
}
SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient();
try {
// switch to normal mode
switchTiKVModeClient.switchTiKVToNormalMode();
// region split
List<byte[]> splitKeys = new ArrayList<>(2);
splitKeys.add(min.getBytes());
splitKeys.add(max.next().getBytes());
tiSession.splitRegionAndScatter(splitKeys);
// the split changed region boundaries, so drop all cached region info
tiSession.getRegionManager().invalidateAll();
// switch to import mode
switchTiKVModeClient.keepTiKVToImportMode();
// group keys by region
List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(
clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff());
// ingest for each region
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
TiRegion region = entry.getKey();
List<ByteString> keys = entry.getValue();
List<Pair<ByteString, ByteString>> kvs =
keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList());
doIngest(region, kvs, ttl);
}
} finally {
// stop refreshing import mode, then switch TiKV back to normal mode
switchTiKVModeClient.stopKeepTiKVToImportMode();
switchTiKVModeClient.switchTiKVToNormalMode();
}
}
/**
* Ingests the KV pairs of one region through a new {@code ImporterClient} with a freshly
* generated UUID.
*
* @param region the region the pairs belong to
* @param sortedList the pairs to write; its first and last elements are used as the minimum and
*     maximum key of the range, so it is expected to be in ascending key order. An empty list is
*     a no-op.
* @param ttl TTL passed through to the importer client; may be {@code null}
* @throws GrpcException declared for failures of the underlying raw write
*/
private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList, Long ttl)
throws GrpcException {
if (sortedList.isEmpty()) {
return;
}
ByteString uuid = ByteString.copyFrom(genUUID());
// range bounds are taken from the ends of the list, not computed by comparison
Key minKey = Key.toRawKey(sortedList.get(0).first);
Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
ImporterClient importerClient =
new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
importerClient.rawWrite(sortedList.iterator());
}
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);

View File

@ -0,0 +1,108 @@
package org.tikv.common.importer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.util.Pair;
import org.tikv.raw.RawKVClient;
import org.tikv.util.TestUtils;
/**
 * Tests for {@code RawKVClient#ingest}, with and without a TTL.
 *
 * <p>NOTE(review): the session is created from {@code TiConfiguration.createRawDefault()}, so
 * these tests presumably need a reachable TiKV cluster - confirm the test environment.
 */
public class RawKVIngestTest {
  private TiSession session;

  private static final int KEY_NUMBER = 16;
  private static final String KEY_PREFIX = "prefix_rawkv_ingest_test_";
  private static final int KEY_LENGTH = KEY_PREFIX.length() + 10;
  private static final int VALUE_LENGTH = 16;

  @Before
  public void setup() {
    TiConfiguration conf = TiConfiguration.createRawDefault();
    session = TiSession.create(conf);
  }

  @After
  public void tearDown() throws Exception {
    if (session != null) {
      session.close();
    }
  }

  /** Ingested pairs must be readable with their original values. */
  @Test
  public void rawKVIngestTest() {
    RawKVClient client = session.createRawClient();
    List<Pair<ByteString, ByteString>> sortedList = genSortedTestData();

    client.ingest(sortedList);

    assertAllPresent(client, sortedList);
  }

  /** Pairs ingested with a TTL must be readable at first and gone after twice the TTL. */
  @Test
  public void rawKVIngestTestWithTTL() throws InterruptedException {
    long ttl = 10;
    RawKVClient client = session.createRawClient();
    List<Pair<ByteString, ByteString>> sortedList = genSortedTestData();

    client.ingest(sortedList, ttl);

    assertAllPresent(client, sortedList);

    // wait for twice the TTL (in seconds) so every key has expired
    Thread.sleep(ttl * 2 * 1000);

    for (Pair<ByteString, ByteString> pair : sortedList) {
      Optional<ByteString> v = client.get(pair.first);
      assertFalse(v.isPresent());
    }
  }

  /** Generates KEY_NUMBER random KV pairs and sorts them by raw key order. */
  private static List<Pair<ByteString, ByteString>> genSortedTestData() {
    List<Pair<ByteString, ByteString>> sortedList = new ArrayList<>();
    for (int i = 0; i < KEY_NUMBER; i++) {
      byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH);
      byte[] value = TestUtils.genRandomValue(VALUE_LENGTH);
      sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value)));
    }
    sortedList.sort(
        (o1, o2) -> {
          Key k1 = Key.toRawKey(o1.first.toByteArray());
          Key k2 = Key.toRawKey(o2.first.toByteArray());
          return k1.compareTo(k2);
        });
    return sortedList;
  }

  /** Asserts that every pair can be read back with exactly the expected value. */
  private static void assertAllPresent(
      RawKVClient client, List<Pair<ByteString, ByteString>> expectedPairs) {
    for (Pair<ByteString, ByteString> pair : expectedPairs) {
      Optional<ByteString> v = client.get(pair.first);
      assertTrue(v.isPresent());
      // JUnit's contract is assertEquals(expected, actual); keeps failure messages correct.
      assertEquals(pair.second, v.get());
    }
  }
}

View File

@ -0,0 +1,52 @@
package org.tikv.common.importer;
import static org.junit.Assert.assertArrayEquals;
import static org.tikv.util.TestUtils.genRandomKey;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.region.TiRegion;
/**
 * Integration test for {@code TiSession.splitRegionAndScatter}: after splitting on a set of
 * random keys, each key is expected to be the start key of the region that contains it.
 */
public class RegionSplitTest {
  private TiSession session;

  private static final int KEY_NUMBER = 10;
  private static final String KEY_PREFIX = "prefix_region_split_test_";
  private static final int KEY_LENGTH = KEY_PREFIX.length() + 10;

  /** Creates the session used by the test from the default raw configuration. */
  @Before
  public void setup() {
    session = TiSession.create(TiConfiguration.createRawDefault());
  }

  /** Releases the session when one was created. */
  @After
  public void tearDown() throws Exception {
    if (session == null) {
      return;
    }
    session.close();
  }

  /** Splits on random keys, drops the region cache, and checks each key starts a region. */
  @Test
  public void rawKVSplitTest() {
    List<byte[]> splitKeys = new ArrayList<>(KEY_NUMBER);
    while (splitKeys.size() < KEY_NUMBER) {
      splitKeys.add(genRandomKey(KEY_PREFIX, KEY_LENGTH));
    }

    session.splitRegionAndScatter(splitKeys);

    // Cached region info predates the split, so discard it before looking regions up again.
    session.getRegionManager().invalidateAll();

    for (byte[] splitKey : splitKeys) {
      ByteString lookupKey = ByteString.copyFrom(splitKey);
      TiRegion owner = session.getRegionManager().getRegionByKey(lookupKey);
      byte[] regionStart = owner.getStartKey().toByteArray();
      assertArrayEquals(splitKey, regionStart);
    }
  }
}

View File

@ -0,0 +1,33 @@
package org.tikv.common.importer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
/**
 * Smoke test for {@code SwitchTiKVModeClient}: keeps TiKV in import mode for a few seconds, then
 * stops the keep-alive and switches back to normal mode. The test makes no assertions; it passes
 * when none of the calls throw.
 */
public class SwitchTiKVModeTest {
  private TiSession session;

  /** Opens a session with the default raw configuration before each test. */
  @Before
  public void setup() {
    TiConfiguration conf = TiConfiguration.createRawDefault();
    session = TiSession.create(conf);
  }

  /** Closes the session if {@link #setup()} managed to create one. */
  @After
  public void tearDown() throws Exception {
    if (session != null) {
      session.close();
    }
  }

  /**
   * Exercises the import-mode keep-alive and the switch back to normal mode.
   *
   * @throws InterruptedException if the wait between the two phases is interrupted
   */
  @Test
  public void swithTiKVModeTest() throws InterruptedException {
    SwitchTiKVModeClient switchTiKVModeClient = session.getSwitchTiKVModeClient();
    switchTiKVModeClient.keepTiKVToImportMode();
    try {
      // NOTE(review): 6s is presumably long enough for at least one keep-alive cycle — confirm
      // against the client's refresh interval.
      Thread.sleep(6000);
    } finally {
      // Always restore normal mode, even when the sleep is interrupted, so a failed run does not
      // leave the cluster in import mode for later tests.
      switchTiKVModeClient.stopKeepTiKVToImportMode();
      switchTiKVModeClient.switchTiKVToNormalMode();
    }
  }
}

View File

@ -0,0 +1,33 @@
package org.tikv.util;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/** Helpers that generate random alphanumeric keys and values for tests. */
public class TestUtils {

  /**
   * Builds a key consisting of {@code keyPrefix} followed by random alphanumeric characters.
   *
   * @param keyPrefix text every generated key starts with
   * @param keyLength desired total length in characters; when it does not exceed the prefix
   *     length, the prefix alone is returned
   * @return the key encoded as UTF-8 bytes
   */
  public static byte[] genRandomKey(String keyPrefix, int keyLength) {
    int suffixLength = Math.max(0, keyLength - keyPrefix.length());
    // Encode with an explicit charset so the bytes do not depend on the platform default.
    return (keyPrefix + genRandomString(suffixLength))
        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  }

  /**
   * Builds a random alphanumeric value.
   *
   * @param length number of characters to generate
   * @return the value encoded as UTF-8 bytes (one byte per character, since all are ASCII)
   */
  public static byte[] genRandomValue(int length) {
    return genRandomString(length).getBytes(java.nio.charset.StandardCharsets.UTF_8);
  }

  /** Returns {@code length} random characters drawn from A-Z, a-z and 0-9. */
  private static String genRandomString(int length) {
    Random rnd = ThreadLocalRandom.current();
    StringBuilder ret = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      if (rnd.nextBoolean()) {
        // Letter: pick the case first, then an offset within the 26-letter alphabet.
        char base = rnd.nextBoolean() ? 'A' : 'a';
        ret.append((char) (base + rnd.nextInt(26)));
      } else {
        // Digit: appended as its decimal character.
        ret.append(rnd.nextInt(10));
      }
    }
    return ret.toString();
  }
}