diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 45fd0699ee..3fa292c9d8 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -749,17 +749,18 @@ public class RawKVClient implements RawKVClientBase { while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (Batch batch : task) { - completionService.submit( - () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)); + futureList.add( + completionService.submit( + () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl))); + } - try { - getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); - } catch (Exception e) { - for (Future> future : futureList) { - future.cancel(true); - } - throw e; + try { + getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); + } catch (Exception e) { + for (Future> future : futureList) { + future.cancel(true); } + throw e; } } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index ee20df3ee5..08608ae11a 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -20,6 +20,7 @@ package org.tikv.raw; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.tikv.raw.RawKVClientBase.MAX_RAW_BATCH_LIMIT; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -360,7 +361,7 @@ public class RawKVClientTest extends BaseRawKVTest { } int i = 0; - Iterator iter = client.scan0(prefix, ByteString.EMPTY, cnt); + Iterator iter = client.scanPrefix0(prefix, cnt, false); while (iter.hasNext()) { i++; KvPair pair = iter.next(); @@ -369,7 +370,7 @@ public class RawKVClientTest extends BaseRawKVTest { assertEquals(cnt, i); i = 0; - iter = client.scan0(prefix, ByteString.EMPTY, true); + iter = client.scanPrefix0(prefix, true); while (iter.hasNext()) { i++; KvPair pair = iter.next(); @@ -397,7 +398,7 @@ public class RawKVClientTest extends BaseRawKVTest { }); client.ingest(kvs); - assertEquals(client.scan(prefix, ByteString.EMPTY).size(), cnt); + assertEquals(client.scanPrefix(prefix).size(), cnt); } @Test @@ -1084,4 +1085,15 @@ public class RawKVClientTest extends BaseRawKVTest { return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray()); } } + + @Test + public void testBatchPutForIssue634() { + ByteString prefix = ByteString.copyFromUtf8("testBatchPutForIssue634"); + client.deletePrefix(prefix); + HashMap kvs = new HashMap<>(); + for (int i = 0; i < MAX_RAW_BATCH_LIMIT * 4; i++) { + kvs.put(prefix.concat(ByteString.copyFromUtf8("key@" + i)), rawValue("value@" + i)); + } + client.batchPut(kvs); + } }