Fix region key range while pulling CDC logs (#256) (#259)

Signed-off-by: chase <yun.er.run@gmail.com>
This commit is contained in:
Chase Zhang 2021-08-23 11:07:59 +09:00 committed by GitHub
parent e5be356a5e
commit 3f2aaf589d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 1 deletions

View File

@ -4,15 +4,21 @@ import org.tikv.kvproto.Kvrpcpb;
public class CDCConfig {
private static final int EVENT_BUFFER_SIZE = 50000;
private static final int MAX_ROW_KEY_SIZE = 10240;
private static final boolean READ_OLD_VALUE = true;
private int eventBufferSize = EVENT_BUFFER_SIZE;
private int maxRowKeySize = MAX_ROW_KEY_SIZE;
private boolean readOldValue = READ_OLD_VALUE;
public void setEventBufferSize(final int bufferSize) {
eventBufferSize = bufferSize;
}
public void setMaxRowKeySize(final int rowKeySize) {
maxRowKeySize = rowKeySize;
}
public void setReadOldValue(final boolean value) {
readOldValue = value;
}
@ -21,6 +27,10 @@ public class CDCConfig {
return eventBufferSize;
}
public int getMaxRowKeySize() {
return maxRowKeySize;
}
public boolean getReadOldValue() {
return readOldValue;
}

View File

@ -9,12 +9,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.kvproto.Cdcpb.ChangeDataEvent;
import org.tikv.kvproto.Cdcpb.ChangeDataRequest;
import org.tikv.kvproto.Cdcpb.Event.LogType;
import org.tikv.kvproto.Cdcpb.Event.Row;
import org.tikv.kvproto.Cdcpb.Header;
import org.tikv.kvproto.Cdcpb.ResolvedTs;
import org.tikv.kvproto.ChangeDataGrpc;
@ -34,6 +38,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
private final ChangeDataStub asyncStub;
private final Consumer<CDCEvent> eventConsumer;
private final CDCConfig config;
private final Predicate<Row> rowFilter;
private final AtomicBoolean running = new AtomicBoolean(false);
@ -54,6 +59,24 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
this.regionKeyRange =
KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build();
this.rowFilter =
regionEnclosed()
? ((row) -> true)
: new Predicate<Row>() {
final byte[] buffer = new byte[config.getMaxRowKeySize()];
final byte[] start = keyRange.getStart().toByteArray();
final byte[] end = keyRange.getEnd().toByteArray();
@Override
public boolean test(final Row row) {
final int len = row.getKey().size();
row.getKey().copyTo(buffer, 0);
return (FastByteComparisons.compareTo(buffer, 0, len, start, 0, start.length) >= 0)
&& (FastByteComparisons.compareTo(buffer, 0, len, end, 0, end.length) < 0);
}
};
}
public synchronized void start(final long startTs) {
@ -87,6 +110,11 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
return regionKeyRange;
}
public boolean regionEnclosed() {
return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd())
.encloses(KeyRangeUtils.makeRange(regionKeyRange.getStart(), regionKeyRange.getEnd()));
}
public boolean isRunning() {
return running.get();
}
@ -133,6 +161,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
.stream()
.flatMap(ev -> ev.getEntries().getEntriesList().stream())
.filter(row -> ALLOWED_LOGTYPE.contains(row.getType()))
.filter(this.rowFilter)
.map(row -> CDCEvent.rowEvent(region.getId(), row))
.forEach(this::submitEvent);
@ -149,7 +178,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
}
private void submitEvent(final CDCEvent event) {
LOGGER.info("submit event: {}", event);
LOGGER.debug("submit event: {}", event);
eventConsumer.accept(event);
}
}