From 3f2aaf589d7913a387166bb15f8ebd2d78cf4f38 Mon Sep 17 00:00:00 2001 From: Chase Zhang Date: Mon, 23 Aug 2021 11:07:59 +0900 Subject: [PATCH] Fix region key range while pulling CDC logs (#256) (#259) Signed-off-by: chase --- src/main/java/org/tikv/cdc/CDCConfig.java | 10 ++++++ .../java/org/tikv/cdc/RegionCDCClient.java | 31 ++++++++++++++++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/cdc/CDCConfig.java b/src/main/java/org/tikv/cdc/CDCConfig.java index 9692a3bd0a..a0ee9db3e2 100644 --- a/src/main/java/org/tikv/cdc/CDCConfig.java +++ b/src/main/java/org/tikv/cdc/CDCConfig.java @@ -4,15 +4,21 @@ import org.tikv.kvproto.Kvrpcpb; public class CDCConfig { private static final int EVENT_BUFFER_SIZE = 50000; + private static final int MAX_ROW_KEY_SIZE = 10240; private static final boolean READ_OLD_VALUE = true; private int eventBufferSize = EVENT_BUFFER_SIZE; + private int maxRowKeySize = MAX_ROW_KEY_SIZE; private boolean readOldValue = READ_OLD_VALUE; public void setEventBufferSize(final int bufferSize) { eventBufferSize = bufferSize; } + public void setMaxRowKeySize(final int rowKeySize) { + maxRowKeySize = rowKeySize; + } + public void setReadOldValue(final boolean value) { readOldValue = value; } @@ -21,6 +27,10 @@ public class CDCConfig { return eventBufferSize; } + public int getMaxRowKeySize() { + return maxRowKeySize; + } + public boolean getReadOldValue() { return readOldValue; } diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java index 2bdf58324b..52d27550fe 100644 --- a/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -9,12 +9,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.region.TiRegion; +import org.tikv.common.util.FastByteComparisons; +import org.tikv.common.util.KeyRangeUtils; import org.tikv.kvproto.Cdcpb.ChangeDataEvent; import org.tikv.kvproto.Cdcpb.ChangeDataRequest; import org.tikv.kvproto.Cdcpb.Event.LogType; +import org.tikv.kvproto.Cdcpb.Event.Row; import org.tikv.kvproto.Cdcpb.Header; import org.tikv.kvproto.Cdcpb.ResolvedTs; import org.tikv.kvproto.ChangeDataGrpc; @@ -34,6 +38,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver private final ChangeDataStub asyncStub; private final Consumer eventConsumer; private final CDCConfig config; + private final Predicate rowFilter; private final AtomicBoolean running = new AtomicBoolean(false); @@ -54,6 +59,24 @@ class RegionCDCClient implements AutoCloseable, StreamObserver this.regionKeyRange = KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build(); + + this.rowFilter = + regionEnclosed() + ? ((row) -> true) + : new Predicate() { + final byte[] buffer = new byte[config.getMaxRowKeySize()]; + + final byte[] start = keyRange.getStart().toByteArray(); + final byte[] end = keyRange.getEnd().toByteArray(); + + @Override + public boolean test(final Row row) { + final int len = row.getKey().size(); + row.getKey().copyTo(buffer, 0); + return (FastByteComparisons.compareTo(buffer, 0, len, start, 0, start.length) >= 0) + && (FastByteComparisons.compareTo(buffer, 0, len, end, 0, end.length) < 0); + } + }; } public synchronized void start(final long startTs) { @@ -87,6 +110,11 @@ class RegionCDCClient implements AutoCloseable, StreamObserver return regionKeyRange; } + public boolean regionEnclosed() { + return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd()) + .encloses(KeyRangeUtils.makeRange(regionKeyRange.getStart(), regionKeyRange.getEnd())); + } + public boolean isRunning() { return running.get(); } @@ -133,6 +161,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver .stream() .flatMap(ev -> ev.getEntries().getEntriesList().stream()) .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) + .filter(this.rowFilter) .map(row -> CDCEvent.rowEvent(region.getId(), row)) .forEach(this::submitEvent); @@ -149,7 +178,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver } private void submitEvent(final CDCEvent event) { - LOGGER.info("submit event: {}", event); + LOGGER.debug("submit event: {}", event); eventConsumer.accept(event); } }