Support SST Decoder (#284)

This commit is contained in:
Liangliang Gu 2021-10-14 17:13:57 +08:00 committed by GitHub
parent dcea3b305c
commit bbe4f9b0c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 478 additions and 2 deletions

View File

@ -61,6 +61,7 @@
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<rocksdb.version>6.22.1.1</rocksdb.version>
<protobuf.version>3.5.1</protobuf.version> <protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version> <log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version> <slf4j.version>1.7.16</slf4j.version>
@ -79,6 +80,11 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.antlr</groupId> <groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId> <artifactId>antlr4-runtime</artifactId>

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
# #
kvproto_hash=2ac2a7984b2d01b96ed56fd8474f4bf80fa33c51 kvproto_hash=465fa4c7b42e644d5aacaf79d06c75733dc12eb3
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926 raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926
tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac

View File

@ -0,0 +1,64 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.tikv.common.exception.SSTDecodeException;
import org.tikv.kvproto.Brpb;
public class BackupDecoder {
private final Brpb.BackupMeta backupMeta;
private final boolean ttlEnabled;
private final KVDecoder kvDecoder;
public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = false;
this.kvDecoder = initKVDecoder();
}
public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = ttlEnabled;
this.kvDecoder = initKVDecoder();
}
private KVDecoder initKVDecoder() throws SSTDecodeException {
// Currently only v1 is supported.
// V2 will be added after https://github.com/tikv/tikv/issues/10938.
if (backupMeta.getIsRawKv()) {
// TODO: ttl_enable should be witten to BackupMeta
return new RawKVDecoderV1(ttlEnabled);
} else {
throw new SSTDecodeException("TxnKV is not supported yet!");
}
}
public SSTDecoder decodeSST(String sstFilePath) {
return decodeSST(sstFilePath, new Options(), new ReadOptions());
}
public SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) {
return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions);
}
public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}
}

View File

@ -0,0 +1,41 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.tikv.kvproto.Brpb;
public class BackupMetaDecoder {
private final Brpb.BackupMeta backupMeta;
public BackupMetaDecoder(byte[] data) throws InvalidProtocolBufferException {
this.backupMeta = Brpb.BackupMeta.parseFrom(data);
}
public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}
public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException {
byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath());
return new BackupMetaDecoder(data);
}
}

View File

@ -0,0 +1,26 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import com.google.protobuf.ByteString;
public interface KVDecoder {
ByteString decodeKey(byte[] key);
ByteString decodeValue(byte[] value);
}

View File

@ -0,0 +1,56 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RawKVDecoderV1 implements KVDecoder {
private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class);
private final boolean ttlEnabled;
public RawKVDecoderV1(boolean ttlEnabled) {
this.ttlEnabled = ttlEnabled;
}
@Override
public ByteString decodeKey(byte[] key) {
if (key == null || key.length == 0) {
logger.warn(
"skip Key-Value pair because key == null || key.length == 0, key = "
+ Arrays.toString(key));
return null;
} else if (key[0] != 'z') {
logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key));
return null;
}
return ByteString.copyFrom(key, 1, key.length - 1);
}
@Override
public ByteString decodeValue(byte[] value) {
if (!ttlEnabled) {
return ByteString.copyFrom(value);
} else {
return ByteString.copyFrom(value).substring(0, value.length - 8);
}
}
}

View File

@ -0,0 +1,93 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;
public class SSTDecoder {
private final String filePath;
private final KVDecoder kvDecoder;
private final Options options;
private final ReadOptions readOptions;
private SstFileReader sstFileReader;
private SstFileReaderIterator iterator;
public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) {
this.filePath = sstFilePath;
this.kvDecoder = kvDecoder;
this.options = new Options();
this.readOptions = new ReadOptions();
}
public SSTDecoder(
String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) {
this.filePath = filePath;
this.kvDecoder = kvDecoder;
this.options = options;
this.readOptions = readOptions;
}
public synchronized Iterator<Pair<ByteString, ByteString>> getIterator() throws RocksDBException {
if (sstFileReader != null || iterator != null) {
throw new RocksDBException("File already opened!");
}
sstFileReader = new SstFileReader(new Options());
sstFileReader.open(filePath);
iterator = sstFileReader.newIterator(new ReadOptions());
return new SSTIterator(iterator, kvDecoder);
}
public synchronized void close() {
try {
if (iterator != null) {
iterator.close();
}
} finally {
iterator = null;
}
try {
if (sstFileReader != null) {
sstFileReader.close();
}
} finally {
sstFileReader = null;
}
}
public String getFilePath() {
return filePath;
}
public Options getOptions() {
return options;
}
public ReadOptions getReadOptions() {
return readOptions;
}
}

View File

@ -0,0 +1,64 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.br;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;
public class SSTIterator implements Iterator<Pair<ByteString, ByteString>> {
private final SstFileReaderIterator iterator;
private final KVDecoder kvDecoder;
private Pair<ByteString, ByteString> nextPair;
public SSTIterator(SstFileReaderIterator iterator, KVDecoder kvDecoder) {
this.iterator = iterator;
this.kvDecoder = kvDecoder;
this.iterator.seekToFirst();
this.nextPair = processNext();
}
@Override
public boolean hasNext() {
return nextPair != null;
}
@Override
public Pair<ByteString, ByteString> next() {
Pair<ByteString, ByteString> result = nextPair;
nextPair = processNext();
return result;
}
private Pair<ByteString, ByteString> processNext() {
if (iterator.isValid()) {
ByteString key = kvDecoder.decodeKey(iterator.key());
ByteString value = kvDecoder.decodeValue(iterator.value());
iterator.next();
if (key != null) {
return Pair.create(key, value);
} else {
return processNext();
}
} else {
return null;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.exception;
public class SSTDecodeException extends RuntimeException {
public SSTDecodeException(Exception e) {
super(e);
}
public SSTDecodeException(String msg) {
super(msg);
}
}

View File

@ -1042,7 +1042,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
RawBatchPutRequest.newBuilder() RawBatchPutRequest.newBuilder()
.setContext(makeContext(storeType)) .setContext(makeContext(storeType))
.addAllPairs(kvPairs) .addAllPairs(kvPairs)
.setTtl(ttl) .addTtls(ttl)
.setForCas(atomicForCAS) .setForCas(atomicForCAS)
.build(); .build();
RegionErrorHandler<RawBatchPutResponse> handler = RegionErrorHandler<RawBatchPutResponse> handler =

View File

@ -0,0 +1,78 @@
package org.tikv.br;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.junit.Assert;
import org.junit.Test;
import org.rocksdb.RocksDBException;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Brpb;
public class BackupDecoderTest {
private static final int TOTAL_COUNT = 500;
private static final String KEY_PREFIX = "test_";
private static final String VALUE =
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
@Test
public void rawKVSSTDecoderTest() throws RocksDBException, IOException {
String backupmetaFilePath = "src/test/resources/sst/backupmeta";
String sst1FilePath =
"src/test/resources/sst/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst";
String sst2FilePath =
"src/test/resources/sst/4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst";
BackupMetaDecoder backupMetaDecoder = BackupMetaDecoder.parse(backupmetaFilePath);
Brpb.BackupMeta backupMeta = backupMetaDecoder.getBackupMeta();
BackupDecoder sstBackup = new BackupDecoder(backupMeta);
decodeSST(sstBackup, sst1FilePath);
decodeSST(sstBackup, sst2FilePath);
}
@Test
public void rawKVWithTTLSSTDecoderTest() throws RocksDBException, IOException {
String backupmetaFilePath = "src/test/resources/sst_ttl/backupmeta";
String sst1FilePath =
"src/test/resources/sst_ttl/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst";
String sst2FilePath =
"src/test/resources/sst_ttl/5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst";
BackupMetaDecoder backupMetaDecoder = BackupMetaDecoder.parse(backupmetaFilePath);
Brpb.BackupMeta backupMeta = backupMetaDecoder.getBackupMeta();
BackupDecoder sstBackup = new BackupDecoder(backupMeta, true);
decodeSST(sstBackup, sst1FilePath);
decodeSST(sstBackup, sst2FilePath);
}
private void decodeSST(BackupDecoder sstBackup, String sst) throws RocksDBException {
String fileName = new File(sst).getName();
Brpb.File backupFile =
sstBackup
.getBackupMeta()
.getFilesList()
.stream()
.filter(a -> a.getName().equals(fileName))
.findFirst()
.get();
Assert.assertEquals(TOTAL_COUNT, backupFile.getTotalKvs());
SSTDecoder sstDecoder = sstBackup.decodeSST(sst);
Iterator<Pair<ByteString, ByteString>> iterator = sstDecoder.getIterator();
int count = 0;
while (iterator.hasNext()) {
Pair<ByteString, ByteString> pair = iterator.next();
Assert.assertEquals(VALUE, pair.second.toStringUtf8());
Assert.assertTrue(pair.first.toStringUtf8().startsWith(KEY_PREFIX));
count += 1;
}
sstDecoder.close();
Assert.assertEquals(TOTAL_COUNT, count);
}
}

View File

@ -0,0 +1,11 @@
<08>§Ô턘è±a"5.3.0-alpha"
"´
`4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst -å©”1N7‰ù@åÆhǧ*2,L>c<>_Q?­ßtest"test_-009965169116504@ôH„ÌRdefaultXê/"±
`1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst úªÜ H†¸ù£]f;FÊé•÷jÔL³ÿV@õ<>7y¶test_-009965169116504"u@ôH„ÌRdefaultX0@J
testudefaultR[]ZÄBR
Release Version: v5.2.1
Git Commit Hash: cd8fb24c5f7ebd9d479ed228bb41848bd5e97445
Git Branch: heads/refs/tags/v5.2.1
Go Version: go1.16.4
UTC Build Time: 2021-09-07 16:19:11
Race Enabled: false

View File

@ -0,0 +1,11 @@
œâìÈ— ù³a"5.3.0-alpha"
"´
`5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst<>I‘ÄÆúúÓ~Ѷ—”:†8¥ƒüÍ~Ç0³KFÔtest"test_-009965169116504@ôH¤ëRdefaultXŠ1"±
`1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst Üw2l^§¬êÉ9>™6¸#Õ„Þ<E2809E>¡±)ïqaœ<61>r~øtest_-009965169116504"u@ôH¤ëRdefaultX·1@J
testudefaultR[]ZÄBR
Release Version: v5.2.1
Git Commit Hash: cd8fb24c5f7ebd9d479ed228bb41848bd5e97445
Git Branch: heads/refs/tags/v5.2.1
Go Version: go1.16.4
UTC Build Time: 2021-09-07 16:19:11
Race Enabled: false