From 5e9d65e4c5864523af2bbf0d3ba6030aedc0e9ab Mon Sep 17 00:00:00 2001 From: Liangliang Gu Date: Tue, 14 Sep 2021 10:57:57 +0800 Subject: [PATCH] bulk load: support deduplicate key (#276) Signed-off-by: marsishandsome --- .../tikv/common/importer/ImporterClient.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 12032a311d..3c86eaf7a2 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -21,11 +21,14 @@ import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.codec.Codec; import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.exception.GrpcException; +import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; @@ -34,6 +37,8 @@ import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; public class ImporterClient { + private static final Logger logger = LoggerFactory.getLogger(ImporterClient.class); + private TiConfiguration tiConf; private TiSession tiSession; private ByteString uuid; @@ -42,6 +47,8 @@ public class ImporterClient { private TiRegion region; private Long ttl; + private boolean deduplicate = false; + private boolean streamOpened = false; private ImportSstpb.SSTMeta sstMeta; private List clientList; @@ -58,25 +65,45 @@ public class ImporterClient { this.ttl = ttl; } + public boolean isDeduplicate() { + return deduplicate; + } + + public void setDeduplicate(boolean deduplicate) { + this.deduplicate = deduplicate; + } + /** * write KV pairs to RawKV/Txn using KVStream interface * * @param iterator */ - public void write(Iterator> iterator) throws GrpcException { + public void write(Iterator> iterator) throws TiKVException { streamOpened = false; int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize(); int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes(); int totalBytes = 0; + ByteString preKey = null; while (iterator.hasNext()) { ArrayList pairs = new ArrayList<>(maxKVBatchSize); for (int i = 0; i < maxKVBatchSize; i++) { if (iterator.hasNext()) { Pair pair = iterator.next(); - pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); - totalBytes += (pair.first.size() + pair.second.size()); + if (preKey != null && preKey.equals(pair.first)) { + if (deduplicate) { + logger.info("skip duplicate key: {}", preKey.toStringUtf8()); + } else { + throw new TiKVException( + String.format("duplicate key found, key = %s", preKey.toStringUtf8())); + } + } else { + pairs.add( + ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); + totalBytes += (pair.first.size() + pair.second.size()); + preKey = pair.first; + } } if (totalBytes > maxKVBatchBytes || !iterator.hasNext()) { break;