From 1f096b5a381aaae0696f5030117f6f6197bbd48c Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 20 Dec 2022 21:32:21 +0800 Subject: [PATCH] [close #684] Fix batch get blocked by write (#685) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/txn/KVClient.java | 8 +- src/test/java/org/tikv/txn/BatchGetTest.java | 103 +++++++++++++++++++ src/test/java/org/tikv/txn/TXNTest.java | 2 +- 3 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/tikv/txn/BatchGetTest.java diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index dfa9b8b296..e8c83c5446 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -50,6 +50,7 @@ public class KVClient implements AutoCloseable { private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService executorService; + private Set resolvedLocks = Collections.emptySet(); public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) { Objects.requireNonNull(conf, "conf is null"); @@ -223,6 +224,10 @@ public class KVClient implements AutoCloseable { if (oldRegion.equals(currentRegion)) { RegionStoreClient client = clientBuilder.build(batch.getRegion()); + // set resolvedLocks for the new client + if (!resolvedLocks.isEmpty()) { + client.addResolvedLocks(version, resolvedLocks); + } try { return client.batchGet(backOffer, batch.getKeys(), version); } catch (final TiKVException e) { @@ -230,7 +235,8 @@ public class KVClient implements AutoCloseable { clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); logger.warn("ReSplitting ranges for BatchGetRequest", e); - // retry + // get resolved locks and retry + resolvedLocks = client.getResolvedLocks(version); return doSendBatchGetWithRefetchRegion(backOffer, batch, version); } } else { diff --git a/src/test/java/org/tikv/txn/BatchGetTest.java b/src/test/java/org/tikv/txn/BatchGetTest.java new file mode 100644 index 0000000000..cbdff1b392 --- /dev/null +++ b/src/test/java/org/tikv/txn/BatchGetTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.txn; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.tikv.common.BytePairWrapper; +import org.tikv.common.ByteWrapper; +import org.tikv.common.exception.KeyException; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Kvrpcpb.KvPair; + +public class BatchGetTest extends TXNTest { + + @Test + public void BatchGetResolveLockTest() throws Exception { + long lockTTL = 20000L; + String key1 = "batchGetResolveLockTestKey1"; + String key2 = "batchGetResolveLockTestKey2"; + String val1 = "val1"; + String val2 = "val2"; + String val1_update = "val1_update"; + String val2_update = "val2_update"; + + // put key1 and key2 + putKV(key1, val1); + putKV(key2, val2); + + // run 2PC background + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try (TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter(session, startTS, lockTTL)) { + byte[] primaryKey = key1.getBytes("UTF-8"); + byte[] secondary = key2.getBytes("UTF-8"); + // prewrite primary key + twoPhaseCommitter.prewritePrimaryKey( + ConcreteBackOffer.newCustomBackOff(5000), + primaryKey, + val1_update.getBytes("UTF-8")); + List pairs = + Arrays.asList(new BytePairWrapper(secondary, val2_update.getBytes("UTF-8"))); + // prewrite secondary key + twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 5000); + + // get commitTS + long commitTS = session.getTimestamp().getVersion(); + Thread.sleep(5000); + // commit primary key + twoPhaseCommitter.commitPrimaryKey( + ConcreteBackOffer.newCustomBackOff(5000), primaryKey, commitTS); + // commit secondary key + List keys = Arrays.asList(new ByteWrapper(secondary)); + twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 5000); + } catch (Exception e) { + KeyException keyException = (KeyException) e.getCause().getCause(); + assertNotSame("", keyException.getKeyErr().getCommitTsExpired().toString()); + } + }) + .start(); + + // wait 2PC get commitTS + Thread.sleep(2000); + // batch get key1 and key2 + try (KVClient kvClient = session.createKVClient()) { + long version = session.getTimestamp().getVersion(); + ByteString k1 = ByteString.copyFromUtf8(key1); + ByteString k2 = ByteString.copyFromUtf8(key2); + + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(5000); + List kvPairs = kvClient.batchGet(backOffer, Arrays.asList(k1, k2), version); + // Since TiKV v4.0.0 write locked key will not block read. it is supported by Min Commit + // Timestamp + assertEquals(ByteString.copyFromUtf8(val1), kvPairs.get(0).getValue()); + assertEquals(ByteString.copyFromUtf8(val2), kvPairs.get(1).getValue()); + System.out.println(kvPairs); + // wait 2PC finish + Thread.sleep(10000); + } + } +} diff --git a/src/test/java/org/tikv/txn/TXNTest.java b/src/test/java/org/tikv/txn/TXNTest.java index 92af0383da..386ad8182e 100644 --- a/src/test/java/org/tikv/txn/TXNTest.java +++ b/src/test/java/org/tikv/txn/TXNTest.java @@ -41,7 +41,7 @@ import org.tikv.kvproto.Kvrpcpb; public class TXNTest extends BaseTxnKVTest { static final int DEFAULT_TTL = 10; - private TiSession session; + public TiSession session; RegionStoreClient.RegionStoreClientBuilder builder; @Before