From af6ab502332db6f2a40aaadbcc648afcbbb58a34 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 28 Jul 2022 16:51:20 +0800 Subject: [PATCH] fix rawBatchPut Signed-off-by: iosmanthus --- src/main/java/org/tikv/raw/RawKVClient.java | 18 +++++++-------- .../java/org/tikv/raw/RawKVClientTest.java | 22 ++++++++----------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 45fd0699ee..368d219184 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -749,17 +749,17 @@ public class RawKVClient implements RawKVClientBase { while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (Batch batch : task) { - completionService.submit( - () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)); + futureList.add(completionService.submit( + () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl))); + } - try { - getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); - } catch (Exception e) { - for (Future> future : futureList) { - future.cancel(true); - } - throw e; + try { + getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); + } catch (Exception e) { + for (Future> future : futureList) { + future.cancel(true); } + throw e; } } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 39464aca3a..9e926f3fd8 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -20,6 +20,7 @@ package org.tikv.raw; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.tikv.raw.RawKVClientBase.MAX_RAW_BATCH_LIMIT; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -1086,18 +1087,13 @@ public class RawKVClientTest extends BaseRawKVTest { } @Test - public void testBatchPut() throws Exception { - TiConfiguration conf = session.getConf(); - conf.setRawKVBatchWriteTimeoutInMS(100000); - conf.setTimeout(100000); - try(TiSession newSession = TiSession.create(conf)){ - try(RawKVClient client=newSession.createRawClient()) { - HashMap kvs = new HashMap<>(); - for (int i = 0; i < 2048; i++) { - kvs.put(ByteString.copyFromUtf8("key@" + i), rawValue("value@" + i)); - } - client.batchPut(kvs); - } - }; + public void testBatchPutForIssue634() { + ByteString prefix = ByteString.copyFromUtf8("testBatchPutForIssue634"); + client.deletePrefix(prefix); + HashMap kvs = new HashMap<>(); + for (int i = 0; i < MAX_RAW_BATCH_LIMIT * 4; i++) { + kvs.put(prefix.concat(ByteString.copyFromUtf8("key@" + i)), rawValue("value@" + i)); + } + client.batchPut(kvs); } }