feat: txn ingest (#254)

* feat: txn ingest

Signed-off-by: abingcbc <abingcbc626@gmail.com>

* refactor: merge TxnImporterClient into Importerclient

Signed-off-by: abingcbc <abingcbc626@gmail.com>
This commit is contained in:
Bingchang Chen 2021-08-20 15:59:05 +08:00 committed by GitHub
parent 5e2dd1e216
commit e5be356a5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 286 additions and 83 deletions

View File

@ -41,6 +41,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
@ -119,7 +120,7 @@ public class TiSession implements AutoCloseable {
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new KVClient(conf, builder);
return new KVClient(conf, builder, this);
}
public TxnKVClient createTxnClient() {
@ -152,9 +153,17 @@ public class TiSession implements AutoCloseable {
if (res == null) {
synchronized (this) {
if (importerClientBuilder == null) {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
if (conf.isTxnKVMode()) {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder<
ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
} else {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder<
ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
}
}
res = importerClientBuilder;
}

View File

@ -23,6 +23,8 @@ import java.util.Iterator;
import java.util.List;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.Codec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
@ -57,14 +59,11 @@ public class ImporterClient {
}
/**
* write KV pairs to RawKV using KVStream interface
* write KV pairs to RawKV/Txn using KVStream interface
*
* @param iterator
*/
public void rawWrite(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {
if (!tiConf.isRawKVMode()) {
throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!");
}
public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {
streamOpened = false;
@ -85,15 +84,15 @@ public class ImporterClient {
}
if (!streamOpened) {
init();
startRawWrite();
rawWriteMeta();
startWrite();
writeMeta();
streamOpened = true;
}
rawWriteBatch(pairs);
writeBatch(pairs);
}
if (streamOpened) {
finishRawWrite();
finishWrite();
ingest();
}
}
@ -102,10 +101,15 @@ public class ImporterClient {
long regionId = region.getId();
Metapb.RegionEpoch regionEpoch = region.getRegionEpoch();
ImportSstpb.Range range =
ImportSstpb.Range.newBuilder()
.setStart(minKey.toByteString())
.setEnd(maxKey.toByteString())
.build();
tiConf.isTxnKVMode()
? ImportSstpb.Range.newBuilder()
.setStart(encode(minKey.toByteString()))
.setEnd(encode(maxKey.toByteString()))
.build()
: ImportSstpb.Range.newBuilder()
.setStart(minKey.toByteString())
.setEnd(maxKey.toByteString())
.build();
sstMeta =
ImportSstpb.SSTMeta.newBuilder()
@ -129,39 +133,69 @@ public class ImporterClient {
}
}
private void startRawWrite() {
private ByteString encode(ByteString key) {
CodecDataOutput cdo = new CodecDataOutput();
Codec.BytesCodec.writeBytes(cdo, key.toByteArray());
return cdo.toByteString();
}
private void startWrite() {
for (ImporterStoreClient client : clientList) {
client.startRawWrite();
client.startWrite();
}
}
private void rawWriteMeta() {
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
}
}
private void rawWriteBatch(List<ImportSstpb.Pair> pairs) {
ImportSstpb.RawWriteBatch batch;
if (ttl == null || ttl <= 0) {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build();
private void writeMeta() {
if (tiConf.isTxnKVMode()) {
ImportSstpb.WriteRequest request =
ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
} else {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build();
}
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
}
}
private void finishRawWrite() {
private void writeBatch(List<ImportSstpb.Pair> pairs) {
if (tiConf.isTxnKVMode()) {
ImportSstpb.WriteBatch batch;
batch =
ImportSstpb.WriteBatch.newBuilder()
.addAllPairs(pairs)
.setCommitTs(tiSession.getTimestamp().getVersion())
.build();
ImportSstpb.WriteRequest request =
ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
} else {
ImportSstpb.RawWriteBatch batch;
if (ttl == null || ttl <= 0) {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build();
} else {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build();
}
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
}
}
private void finishWrite() {
for (ImporterStoreClient client : clientList) {
client.finishRawWrite();
client.finishWrite();
}
}
@ -171,10 +205,10 @@ public class ImporterClient {
Iterator<ImporterStoreClient> itor = workingClients.iterator();
while (itor.hasNext()) {
ImporterStoreClient client = itor.next();
if (client.isRawWriteResponseReceived()) {
if (client.isWriteResponseReceived()) {
itor.remove();
} else if (client.hasRawWriteResponseError()) {
throw new GrpcException(client.getRawWriteError());
} else if (client.hasWriteResponseError()) {
throw new GrpcException(client.getWriteError());
}
}

View File

@ -39,9 +39,9 @@ import org.tikv.kvproto.ImportSSTGrpc;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Kvrpcpb;
public class ImporterStoreClient
public class ImporterStoreClient<RequestClass, ResponseClass>
extends AbstractGRPCClient<ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTStub>
implements StreamObserver<ImportSstpb.RawWriteResponse> {
implements StreamObserver<ResponseClass> {
private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);
@ -53,43 +53,43 @@ public class ImporterStoreClient
super(conf, channelFactory, blockingStub, asyncStub);
}
private StreamObserver<ImportSstpb.RawWriteRequest> streamObserverRequest;
private ImportSstpb.RawWriteResponse rawWriteResponse;
private Throwable rawWriteError;
private StreamObserver<RequestClass> streamObserverRequest;
private ResponseClass writeResponse;
private Throwable writeError;
public synchronized boolean isRawWriteResponseReceived() {
return rawWriteResponse != null;
public synchronized boolean isWriteResponseReceived() {
return writeResponse != null;
}
private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() {
return rawWriteResponse;
private synchronized ResponseClass getWriteResponse() {
return writeResponse;
}
private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) {
this.rawWriteResponse = rawWriteResponse;
private synchronized void setWriteResponse(ResponseClass writeResponse) {
this.writeResponse = writeResponse;
}
public synchronized boolean hasRawWriteResponseError() {
return this.rawWriteError != null;
public synchronized boolean hasWriteResponseError() {
return this.writeError != null;
}
public synchronized Throwable getRawWriteError() {
return this.rawWriteError;
public synchronized Throwable getWriteError() {
return this.writeError;
}
private synchronized void setRawWriteError(Throwable t) {
this.rawWriteError = t;
private synchronized void setWriteError(Throwable t) {
this.writeError = t;
}
@Override
public void onNext(ImportSstpb.RawWriteResponse value) {
setRawWriteResponse(value);
public void onNext(ResponseClass response) {
setWriteResponse(response);
}
@Override
public void onError(Throwable t) {
setRawWriteError(t);
logger.error("Error during raw write!", t);
setWriteError(t);
logger.error("Error during write!", t);
}
@Override
@ -98,36 +98,51 @@ public class ImporterStoreClient
}
/**
* Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader
* and followers.
* Ingest KV pairs to RawKV/Txn using gRPC streaming mode. This API should be called on both
* leader and followers.
*
* @return
*/
public void startRawWrite() {
streamObserverRequest = getAsyncStub().rawWrite(this);
public void startWrite() {
if (conf.isRawKVMode()) {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
} else {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
}
}
/**
* This API should be called after `startRawWrite`.
* This API should be called after `startWrite`.
*
* @param request
*/
public void rawWriteBatch(ImportSstpb.RawWriteRequest request) {
public void writeBatch(RequestClass request) {
streamObserverRequest.onNext(request);
}
/** This API should be called after `rawWriteBatch`. */
public void finishRawWrite() {
/** This API should be called after `writeBatch`. */
public void finishWrite() {
streamObserverRequest.onCompleted();
}
/**
* This API should be called after `finishRawWrite`. This API should be called on leader only.
* This API should be called after `finishWrite`. This API should be called on leader only.
*
* @param ctx
*/
public void multiIngest(Kvrpcpb.Context ctx) {
List<ImportSstpb.SSTMeta> metasList = getRawWriteResponse().getMetasList();
List<ImportSstpb.SSTMeta> metasList;
if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList();
} else if (writeResponse instanceof ImportSstpb.WriteResponse) {
metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList();
} else {
throw new IllegalArgumentException("Wrong response type");
}
ImportSstpb.MultiIngestRequest request =
ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
@ -163,7 +178,7 @@ public class ImporterStoreClient
@Override
public void close() throws Exception {}
public static class ImporterStoreClientBuilder {
public static class ImporterStoreClientBuilder<RequestClass, ResponseClass> {
private final TiConfiguration conf;
private final ChannelFactory channelFactory;
private final RegionManager regionManager;
@ -193,7 +208,8 @@ public class ImporterStoreClient
ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel);
return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub);
return new ImporterStoreClient<RequestClass, ResponseClass>(
conf, channelFactory, blockingStub, asyncStub);
}
}
}

View File

@ -727,7 +727,7 @@ public class RawKVClient implements AutoCloseable {
Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
ImporterClient importerClient =
new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
importerClient.rawWrite(sortedList.iterator());
importerClient.write(sortedList.iterator());
}
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {

View File

@ -23,22 +23,25 @@ import java.util.*;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.operation.iterator.ConcreteScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.*;
import org.tikv.kvproto.Kvrpcpb;
public class KVClient implements AutoCloseable {
private final TiSession tiSession;
private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
private static final int MAX_BATCH_LIMIT = 1024;
private static final int BATCH_GET_SIZE = 16 * 1024;
@ -46,9 +49,10 @@ public class KVClient implements AutoCloseable {
private final TiConfiguration conf;
private final ExecutorService executorService;
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.tiSession = session;
this.conf = conf;
this.clientBuilder = clientBuilder;
executorService =
@ -131,6 +135,64 @@ public class KVClient implements AutoCloseable {
return scan(startKey, version, Integer.MAX_VALUE);
}
public synchronized void ingest(List<Pair<ByteString, ByteString>> list) throws GrpcException {
if (list.isEmpty()) {
return;
}
Key min = Key.MAX;
Key max = Key.MIN;
Map<ByteString, ByteString> map = new HashMap<>(list.size());
for (Pair<ByteString, ByteString> pair : list) {
map.put(pair.first, pair.second);
Key key = Key.toRawKey(pair.first.toByteArray());
if (key.compareTo(min) < 0) {
min = key;
}
if (key.compareTo(max) > 0) {
max = key;
}
}
SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient();
try {
// switch to normal mode
switchTiKVModeClient.switchTiKVToNormalMode();
// region split
List<byte[]> splitKeys = new ArrayList<>(2);
splitKeys.add(min.getBytes());
splitKeys.add(max.next().getBytes());
tiSession.splitRegionAndScatter(splitKeys);
tiSession.getRegionManager().invalidateAll();
// switch to import mode
switchTiKVModeClient.keepTiKVToImportMode();
// group keys by region
List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(
clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff());
// ingest for each region
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
TiRegion region = entry.getKey();
List<ByteString> keys = entry.getValue();
List<Pair<ByteString, ByteString>> kvs =
keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList());
doIngest(region, kvs);
}
} finally {
// swith tikv to normal mode
switchTiKVModeClient.stopKeepTiKVToImportMode();
switchTiKVModeClient.switchTiKVToNormalMode();
}
}
private List<Kvrpcpb.KvPair> doSendBatchGet(
BackOffer backOffer, List<ByteString> keys, long version) {
ExecutorCompletionService<List<Kvrpcpb.KvPair>> completionService =
@ -202,4 +264,17 @@ public class KVClient implements AutoCloseable {
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
}
private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList)
throws GrpcException {
if (sortedList.isEmpty()) {
return;
}
ByteString uuid = ByteString.copyFrom(genUUID());
Key minKey = Key.toRawKey(sortedList.get(0).first);
Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
ImporterClient importerClient = new ImporterClient(tiSession, uuid, minKey, maxKey, region, 0L);
importerClient.write(sortedList.iterator());
}
}

View File

@ -0,0 +1,69 @@
package org.tikv.common.importer;
import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.util.Pair;
import org.tikv.txn.KVClient;
import org.tikv.util.TestUtils;
public class TxnKVIngestTest {
private TiSession session;
private static final int KEY_NUMBER = 16;
private static final String KEY_PREFIX = "prefix_txn_ingest_test_";
private static final int KEY_LENGTH = KEY_PREFIX.length() + 10;
private static final int VALUE_LENGTH = 16;
@Before
public void setup() {
TiConfiguration conf = TiConfiguration.createDefault();
conf.setTest(true);
session = TiSession.create(conf);
}
@After
public void tearDown() throws Exception {
if (session != null) {
session.close();
}
}
@Test
public void txnIngestTest() throws InterruptedException {
KVClient client = session.createKVClient();
// gen test data
List<Pair<ByteString, ByteString>> sortedList = new ArrayList<>();
for (int i = 0; i < KEY_NUMBER; i++) {
byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH);
byte[] value = TestUtils.genRandomValue(VALUE_LENGTH);
sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value)));
}
sortedList.sort(
(o1, o2) -> {
Key k1 = Key.toRawKey(o1.first.toByteArray());
Key k2 = Key.toRawKey(o2.first.toByteArray());
return k1.compareTo(k2);
});
// ingest
client.ingest(sortedList);
// assert
long version = session.getTimestamp().getVersion();
for (Pair<ByteString, ByteString> pair : sortedList) {
ByteString key = pair.first;
ByteString v = client.get(key, version);
assertEquals(v, pair.second);
}
}
}