diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 9618562b10..12a6cefabd 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -41,6 +41,7 @@ import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; import org.tikv.common.util.*; +import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.txn.KVClient; @@ -119,7 +120,7 @@ public class TiSession implements AutoCloseable { RegionStoreClientBuilder builder = new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client); - return new KVClient(conf, builder); + return new KVClient(conf, builder, this); } public TxnKVClient createTxnClient() { @@ -152,9 +153,17 @@ public class TiSession implements AutoCloseable { if (res == null) { synchronized (this) { if (importerClientBuilder == null) { - importerClientBuilder = - new ImporterStoreClient.ImporterStoreClientBuilder( - conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + if (conf.isTxnKVMode()) { + importerClientBuilder = + new ImporterStoreClient.ImporterStoreClientBuilder< + ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } else { + importerClientBuilder = + new ImporterStoreClient.ImporterStoreClientBuilder< + ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } } res = importerClientBuilder; } diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 99494b5205..d2822a3dac 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -23,6 +23,8 @@ import java.util.Iterator; import java.util.List; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.codec.Codec; +import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.exception.GrpcException; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; @@ -57,14 +59,11 @@ public class ImporterClient { } /** - * write KV pairs to RawKV using KVStream interface + * write KV pairs to RawKV/Txn using KVStream interface * * @param iterator */ - public void rawWrite(Iterator> iterator) throws GrpcException { - if (!tiConf.isRawKVMode()) { - throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!"); - } + public void write(Iterator> iterator) throws GrpcException { streamOpened = false; @@ -85,15 +84,15 @@ public class ImporterClient { } if (!streamOpened) { init(); - startRawWrite(); - rawWriteMeta(); + startWrite(); + writeMeta(); streamOpened = true; } - rawWriteBatch(pairs); + writeBatch(pairs); } if (streamOpened) { - finishRawWrite(); + finishWrite(); ingest(); } } @@ -102,10 +101,15 @@ public class ImporterClient { long regionId = region.getId(); Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); ImportSstpb.Range range = - ImportSstpb.Range.newBuilder() - .setStart(minKey.toByteString()) - .setEnd(maxKey.toByteString()) - .build(); + tiConf.isTxnKVMode() + ? ImportSstpb.Range.newBuilder() + .setStart(encode(minKey.toByteString())) + .setEnd(encode(maxKey.toByteString())) + .build() + : ImportSstpb.Range.newBuilder() + .setStart(minKey.toByteString()) + .setEnd(maxKey.toByteString()) + .build(); sstMeta = ImportSstpb.SSTMeta.newBuilder() @@ -129,39 +133,69 @@ public class ImporterClient { } } - private void startRawWrite() { + private ByteString encode(ByteString key) { + CodecDataOutput cdo = new CodecDataOutput(); + Codec.BytesCodec.writeBytes(cdo, key.toByteArray()); + return cdo.toByteString(); + } + + private void startWrite() { for (ImporterStoreClient client : clientList) { - client.startRawWrite(); + client.startWrite(); } } - private void rawWriteMeta() { - ImportSstpb.RawWriteRequest request = - ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build(); - for (ImporterStoreClient client : clientList) { - client.rawWriteBatch(request); - } - } - - private void rawWriteBatch(List pairs) { - ImportSstpb.RawWriteBatch batch; - - if (ttl == null || ttl <= 0) { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); + private void writeMeta() { + if (tiConf.isTxnKVMode()) { + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } } else { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); - } - - ImportSstpb.RawWriteRequest request = - ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); - for (ImporterStoreClient client : clientList) { - client.rawWriteBatch(request); + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } } } - private void finishRawWrite() { + private void writeBatch(List pairs) { + if (tiConf.isTxnKVMode()) { + ImportSstpb.WriteBatch batch; + + batch = + ImportSstpb.WriteBatch.newBuilder() + .addAllPairs(pairs) + .setCommitTs(tiSession.getTimestamp().getVersion()) + .build(); + + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } + } else { + ImportSstpb.RawWriteBatch batch; + + if (ttl == null || ttl <= 0) { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); + } else { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); + } + + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } + } + } + + private void finishWrite() { for (ImporterStoreClient client : clientList) { - client.finishRawWrite(); + client.finishWrite(); } } @@ -171,10 +205,10 @@ public class ImporterClient { Iterator itor = workingClients.iterator(); while (itor.hasNext()) { ImporterStoreClient client = itor.next(); - if (client.isRawWriteResponseReceived()) { + if (client.isWriteResponseReceived()) { itor.remove(); - } else if (client.hasRawWriteResponseError()) { - throw new GrpcException(client.getRawWriteError()); + } else if (client.hasWriteResponseError()) { + throw new GrpcException(client.getWriteError()); } } diff --git a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java index 96534b778a..006bf51733 100644 --- a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java @@ -39,9 +39,9 @@ import org.tikv.kvproto.ImportSSTGrpc; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Kvrpcpb; -public class ImporterStoreClient +public class ImporterStoreClient extends AbstractGRPCClient - implements StreamObserver { + implements StreamObserver { private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class); @@ -53,43 +53,43 @@ public class ImporterStoreClient super(conf, channelFactory, blockingStub, asyncStub); } - private StreamObserver streamObserverRequest; - private ImportSstpb.RawWriteResponse rawWriteResponse; - private Throwable rawWriteError; + private StreamObserver streamObserverRequest; + private ResponseClass writeResponse; + private Throwable writeError; - public synchronized boolean isRawWriteResponseReceived() { - return rawWriteResponse != null; + public synchronized boolean isWriteResponseReceived() { + return writeResponse != null; } - private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() { - return rawWriteResponse; + private synchronized ResponseClass getWriteResponse() { + return writeResponse; } - private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) { - this.rawWriteResponse = rawWriteResponse; + private synchronized void setWriteResponse(ResponseClass writeResponse) { + this.writeResponse = writeResponse; } - public synchronized boolean hasRawWriteResponseError() { - return this.rawWriteError != null; + public synchronized boolean hasWriteResponseError() { + return this.writeError != null; } - public synchronized Throwable getRawWriteError() { - return this.rawWriteError; + public synchronized Throwable getWriteError() { + return this.writeError; } - private synchronized void setRawWriteError(Throwable t) { - this.rawWriteError = t; + private synchronized void setWriteError(Throwable t) { + this.writeError = t; } @Override - public void onNext(ImportSstpb.RawWriteResponse value) { - setRawWriteResponse(value); + public void onNext(ResponseClass response) { + setWriteResponse(response); } @Override public void onError(Throwable t) { - setRawWriteError(t); - logger.error("Error during raw write!", t); + setWriteError(t); + logger.error("Error during write!", t); } @Override @@ -98,36 +98,51 @@ public class ImporterStoreClient } /** - * Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader - * and followers. + * Ingest KV pairs to RawKV/Txn using gRPC streaming mode. This API should be called on both + * leader and followers. * * @return */ - public void startRawWrite() { - streamObserverRequest = getAsyncStub().rawWrite(this); + public void startWrite() { + if (conf.isRawKVMode()) { + streamObserverRequest = + (StreamObserver) + getAsyncStub().rawWrite((StreamObserver) this); + } else { + streamObserverRequest = + (StreamObserver) + getAsyncStub().write((StreamObserver) this); + } } /** - * This API should be called after `startRawWrite`. + * This API should be called after `startWrite`. * * @param request */ - public void rawWriteBatch(ImportSstpb.RawWriteRequest request) { + public void writeBatch(RequestClass request) { streamObserverRequest.onNext(request); } - /** This API should be called after `rawWriteBatch`. */ - public void finishRawWrite() { + /** This API should be called after `writeBatch`. */ + public void finishWrite() { streamObserverRequest.onCompleted(); } /** - * This API should be called after `finishRawWrite`. This API should be called on leader only. + * This API should be called after `finishWrite`. This API should be called on leader only. * * @param ctx */ public void multiIngest(Kvrpcpb.Context ctx) { - List metasList = getRawWriteResponse().getMetasList(); + List metasList; + if (writeResponse instanceof ImportSstpb.RawWriteResponse) { + metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList(); + } else if (writeResponse instanceof ImportSstpb.WriteResponse) { + metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList(); + } else { + throw new IllegalArgumentException("Wrong response type"); + } ImportSstpb.MultiIngestRequest request = ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build(); @@ -163,7 +178,7 @@ public class ImporterStoreClient @Override public void close() throws Exception {} - public static class ImporterStoreClientBuilder { + public static class ImporterStoreClientBuilder { private final TiConfiguration conf; private final ChannelFactory channelFactory; private final RegionManager regionManager; @@ -193,7 +208,8 @@ public class ImporterStoreClient ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel); ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel); - return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub); + return new ImporterStoreClient( + conf, channelFactory, blockingStub, asyncStub); } } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index c6eb45ed78..af502d8672 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -727,7 +727,7 @@ public class RawKVClient implements AutoCloseable { Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); ImporterClient importerClient = new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl); - importerClient.rawWrite(sortedList.iterator()); + importerClient.write(sortedList.iterator()); } private void doSendBatchPut(BackOffer backOffer, Map kvPairs, long ttl) { diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index 28b29f4ee1..6b7b63e4dd 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -23,22 +23,25 @@ import java.util.*; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; +import org.tikv.common.importer.ImporterClient; +import org.tikv.common.importer.SwitchTiKVModeClient; +import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.ConcreteScanIterator; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.BackOffFunction; -import org.tikv.common.util.BackOffer; -import org.tikv.common.util.Batch; -import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.*; import org.tikv.kvproto.Kvrpcpb; public class KVClient implements AutoCloseable { + private final TiSession tiSession; private static final Logger logger = LoggerFactory.getLogger(KVClient.class); private static final int MAX_BATCH_LIMIT = 1024; private static final int BATCH_GET_SIZE = 16 * 1024; @@ -46,9 +49,10 @@ public class KVClient implements AutoCloseable { private final TiConfiguration conf; private final ExecutorService executorService; - public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) { + public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) { Objects.requireNonNull(conf, "conf is null"); Objects.requireNonNull(clientBuilder, "clientBuilder is null"); + this.tiSession = session; this.conf = conf; this.clientBuilder = clientBuilder; executorService = @@ -131,6 +135,64 @@ public class KVClient implements AutoCloseable { return scan(startKey, version, Integer.MAX_VALUE); } + public synchronized void ingest(List> list) throws GrpcException { + if (list.isEmpty()) { + return; + } + + Key min = Key.MAX; + Key max = Key.MIN; + Map map = new HashMap<>(list.size()); + + for (Pair pair : list) { + map.put(pair.first, pair.second); + Key key = Key.toRawKey(pair.first.toByteArray()); + if (key.compareTo(min) < 0) { + min = key; + } + if (key.compareTo(max) > 0) { + max = key; + } + } + + SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient(); + + try { + // switch to normal mode + switchTiKVModeClient.switchTiKVToNormalMode(); + + // region split + List splitKeys = new ArrayList<>(2); + splitKeys.add(min.getBytes()); + splitKeys.add(max.next().getBytes()); + + tiSession.splitRegionAndScatter(splitKeys); + tiSession.getRegionManager().invalidateAll(); + + // switch to import mode + switchTiKVModeClient.keepTiKVToImportMode(); + + // group keys by region + List keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); + Map> groupKeys = + groupKeysByRegion( + clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff()); + + // ingest for each region + for (Map.Entry> entry : groupKeys.entrySet()) { + TiRegion region = entry.getKey(); + List keys = entry.getValue(); + List> kvs = + keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList()); + doIngest(region, kvs); + } + } finally { + // swith tikv to normal mode + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } + } + private List doSendBatchGet( BackOffer backOffer, List keys, long version) { ExecutorCompletionService> completionService = @@ -202,4 +264,17 @@ public class KVClient implements AutoCloseable { int limit) { return new ConcreteScanIterator(conf, builder, startKey, version, limit); } + + private void doIngest(TiRegion region, List> sortedList) + throws GrpcException { + if (sortedList.isEmpty()) { + return; + } + + ByteString uuid = ByteString.copyFrom(genUUID()); + Key minKey = Key.toRawKey(sortedList.get(0).first); + Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); + ImporterClient importerClient = new ImporterClient(tiSession, uuid, minKey, maxKey, region, 0L); + importerClient.write(sortedList.iterator()); + } } diff --git a/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java new file mode 100644 index 0000000000..fc0935977c --- /dev/null +++ b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java @@ -0,0 +1,69 @@ +package org.tikv.common.importer; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.util.Pair; +import org.tikv.txn.KVClient; +import org.tikv.util.TestUtils; + +public class TxnKVIngestTest { + private TiSession session; + + private static final int KEY_NUMBER = 16; + private static final String KEY_PREFIX = "prefix_txn_ingest_test_"; + private static final int KEY_LENGTH = KEY_PREFIX.length() + 10; + private static final int VALUE_LENGTH = 16; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createDefault(); + conf.setTest(true); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void txnIngestTest() throws InterruptedException { + KVClient client = session.createKVClient(); + + // gen test data + List> sortedList = new ArrayList<>(); + for (int i = 0; i < KEY_NUMBER; i++) { + byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH); + byte[] value = TestUtils.genRandomValue(VALUE_LENGTH); + sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value))); + } + sortedList.sort( + (o1, o2) -> { + Key k1 = Key.toRawKey(o1.first.toByteArray()); + Key k2 = Key.toRawKey(o2.first.toByteArray()); + return k1.compareTo(k2); + }); + + // ingest + client.ingest(sortedList); + + // assert + long version = session.getTimestamp().getVersion(); + for (Pair pair : sortedList) { + ByteString key = pair.first; + ByteString v = client.get(key, version); + assertEquals(v, pair.second); + } + } +}