From 0addbc3aa1c91cbe8a1c2c2ad6f23ffd25d4816b Mon Sep 17 00:00:00 2001 From: ajian2002 Date: Thu, 4 Aug 2022 13:38:49 +0800 Subject: [PATCH] addBatchGetRetry Test --- .../tikv/common/region/RegionStoreClient.java | 11 +- .../org/tikv/txn/TwoPhaseCommitterTest.java | 117 +++++++++++++++++- 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ba742c872b..511ae2e6bb 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -298,7 +298,16 @@ public class RegionStoreClient extends AbstractRegionStoreClient { forWrite); BatchGetResponse resp = callWithRetry(backOffer, TikvGrpc.getKvBatchGetMethod(), request, handler); - return handleBatchGetResponse(backOffer, resp, version); + try { + return handleBatchGetResponse(backOffer, resp, version); + } catch (TiKVException e) { + if ("locks not resolved, retry".equals(e.getMessage())) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoTxnLock, e); + return batchGet(backOffer, keys, version); + } else { + throw e; + } + } } private List handleBatchGetResponse( diff --git a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java index 02a530ffbc..f47db85c5a 100644 --- a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java +++ b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java @@ -17,9 +17,16 @@ package org.tikv.txn; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.ByteString; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.After; @@ -27,21 +34,33 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.tikv.BaseTxnKVTest; +import org.tikv.common.BytePairWrapper; +import org.tikv.common.ByteWrapper; +import org.tikv.common.Snapshot; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Kvrpcpb.KvPair; public class TwoPhaseCommitterTest extends BaseTxnKVTest { private static final int WRITE_BUFFER_SIZE = 32 * 1024; private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024; private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000; - + private RegionStoreClientBuilder clientBuilder; private TiSession session; + private TxnKVClient txnKVClient; + private Long lockTTLSeconds = 20L; @Before public void setUp() { TiConfiguration conf = createTiConfiguration(); try { session = TiSession.create(conf); + clientBuilder = session.getRegionStoreClientBuilder(); + txnKVClient = session.createTxnClient(); } catch (Exception e) { fail("TiDB cluster may not be present"); } @@ -76,4 +95,100 @@ public class TwoPhaseCommitterTest extends BaseTxnKVTest { executorService)) {} Assert.assertTrue(executorService.isShutdown()); } + + @Test + public void batchGetRetryTest() throws Exception { + long startTS = session.getTimestamp().getVersion(); + + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(60000); + byte[] primaryKey = "key1".getBytes(StandardCharsets.UTF_8); + byte[] key2 = "key2".getBytes(StandardCharsets.UTF_8); + long version = session.getTimestamp().getVersion(); + ByteString bkey1 = ByteString.copyFromUtf8("key1"); + ByteString bkey2 = ByteString.copyFromUtf8("key2"); + ByteString bvalue1 = ByteString.copyFromUtf8("val1"); + ByteString bvalue2 = ByteString.copyFromUtf8("val2"); + ByteString bvalue3 = ByteString.copyFromUtf8("value3"); + List> kvs = + Arrays.asList(new Pair<>(bkey1, bvalue1), new Pair<>(bkey2, bvalue2)); + try (KVClient kvClient = session.createKVClient()) { + new Thread( + () -> { + Snapshot snapshot = session.createSnapshot(session.getTimestamp()); + snapshot.batchGet( + 60000, Collections.singletonList("hello".getBytes(Charset.defaultCharset()))); + try (TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter(session, startTS)) { + // first phrase: prewrite + twoPhaseCommitter.prewritePrimaryKey( + backOffer, primaryKey, "val1".getBytes(StandardCharsets.UTF_8)); + List pairs = null; + pairs = + Collections.singletonList( + new BytePairWrapper(key2, "val2".getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 1000); + // second phrase: commit + long commitTS = session.getTimestamp().getVersion(); + + Thread.sleep(10000L); + twoPhaseCommitter.commitPrimaryKey(backOffer, primaryKey, commitTS); + List keys = Collections.singletonList(new ByteWrapper(key2)); + twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 1000); + + ByteString val = kvClient.get(bkey1, version); + assertEquals(bvalue1, val); + + BackOffer cBackOffer = ConcreteBackOffer.newCustomBackOff(1000); + List kvPairs = + kvClient.batchGet(cBackOffer, Arrays.asList(bkey1, bkey2), version); + assertEquals(kvs.size(), kvPairs.size()); + for (int i = 0; i < kvPairs.size(); i++) { + assertEquals(kvPairs.get(i).getKey(), kvs.get(i).first); + assertEquals(kvPairs.get(i).getValue(), kvs.get(i).second); + } + kvPairs = kvClient.scan(bkey1, ByteString.copyFromUtf8("key3"), version); + assertEquals(kvs.size(), kvPairs.size()); + for (int i = 0; i < kvPairs.size(); i++) { + assertEquals(kvPairs.get(i).getKey(), kvs.get(i).first); + assertEquals(kvPairs.get(i).getValue(), kvs.get(i).second); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .start(); + + Thread.sleep(1000L); + + try (TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter(session, session.getTimestamp().getVersion())) { + // first phrase: prewrite + twoPhaseCommitter.prewritePrimaryKey( + backOffer, primaryKey, "val3".getBytes(StandardCharsets.UTF_8)); + List pairs = null; + pairs = + Collections.singletonList( + new BytePairWrapper(key2, "val2".getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 1000); + // second phrase: commit + long commitTS = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey(backOffer, primaryKey, commitTS); + List keys = Collections.singletonList(new ByteWrapper(key2)); + twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 1000); + } + // access + List> kvs2 = + Arrays.asList(new Pair<>(bkey1, bvalue3), new Pair<>(bkey2, bvalue2)); + Snapshot snapshot = session.createSnapshot(session.getTimestamp()); + BackOffer cBackOffer = ConcreteBackOffer.newCustomBackOff(3000); + List kvPairs = + kvClient.batchGet(cBackOffer, Arrays.asList(bkey1, bkey2), snapshot.getVersion()); + assertEquals(kvs2.size(), kvPairs.size()); + for (int i = 0; i < kvPairs.size(); i++) { + assertEquals(kvPairs.get(i).getKey(), kvs2.get(i).first); + assertEquals(kvPairs.get(i).getValue(), kvs2.get(i).second); + } + } + } }