bulk load: support deduplicate key (#276)

Signed-off-by: marsishandsome <marsishandsome@gmail.com>
This commit is contained in:
Liangliang Gu 2021-09-14 10:57:57 +08:00 committed by GitHub
parent 5268af69a1
commit 5e9d65e4c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 30 additions and 3 deletions

View File

@ -21,11 +21,14 @@ import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.Codec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
@ -34,6 +37,8 @@ import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
public class ImporterClient {
private static final Logger logger = LoggerFactory.getLogger(ImporterClient.class);
private TiConfiguration tiConf;
private TiSession tiSession;
private ByteString uuid;
@ -42,6 +47,8 @@ public class ImporterClient {
private TiRegion region;
private Long ttl;
private boolean deduplicate = false;
private boolean streamOpened = false;
private ImportSstpb.SSTMeta sstMeta;
private List<ImporterStoreClient> clientList;
@ -58,25 +65,45 @@ public class ImporterClient {
this.ttl = ttl;
}
public boolean isDeduplicate() {
return deduplicate;
}
public void setDeduplicate(boolean deduplicate) {
this.deduplicate = deduplicate;
}
/**
* write KV pairs to RawKV/Txn using KVStream interface
*
* @param iterator
*/
public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {
public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws TiKVException {
streamOpened = false;
int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize();
int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes();
int totalBytes = 0;
ByteString preKey = null;
while (iterator.hasNext()) {
ArrayList<ImportSstpb.Pair> pairs = new ArrayList<>(maxKVBatchSize);
for (int i = 0; i < maxKVBatchSize; i++) {
if (iterator.hasNext()) {
Pair<ByteString, ByteString> pair = iterator.next();
pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build());
totalBytes += (pair.first.size() + pair.second.size());
if (preKey != null && preKey.equals(pair.first)) {
if (deduplicate) {
logger.info("skip duplicate key: {}", preKey.toStringUtf8());
} else {
throw new TiKVException(
String.format("duplicate key found, key = %s", preKey.toStringUtf8()));
}
} else {
pairs.add(
ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build());
totalBytes += (pair.first.size() + pair.second.size());
preKey = pair.first;
}
}
if (totalBytes > maxKVBatchBytes || !iterator.hasNext()) {
break;