cherry pick #640 to release-3.3

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
This commit is contained in:
iosmanthus 2022-07-29 18:56:42 +08:00 committed by ti-srebot
parent 6e8c0fc99a
commit d711f04367
2 changed files with 25 additions and 12 deletions

View File

@ -749,8 +749,10 @@ public class RawKVClient implements RawKVClientBase {
while (!taskQueue.isEmpty()) { while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll(); List<Batch> task = taskQueue.poll();
for (Batch batch : task) { for (Batch batch : task) {
futureList.add(
completionService.submit( completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)); () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)));
}
try { try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
@ -762,7 +764,6 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} }
}
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) { private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) { try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {

View File

@ -20,6 +20,7 @@ package org.tikv.raw;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.tikv.raw.RawKVClientBase.MAX_RAW_BATCH_LIMIT;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -360,7 +361,7 @@ public class RawKVClientTest extends BaseRawKVTest {
} }
int i = 0; int i = 0;
Iterator<KvPair> iter = client.scan0(prefix, ByteString.EMPTY, cnt); Iterator<KvPair> iter = client.scanPrefix0(prefix, cnt, false);
while (iter.hasNext()) { while (iter.hasNext()) {
i++; i++;
KvPair pair = iter.next(); KvPair pair = iter.next();
@ -369,7 +370,7 @@ public class RawKVClientTest extends BaseRawKVTest {
assertEquals(cnt, i); assertEquals(cnt, i);
i = 0; i = 0;
iter = client.scan0(prefix, ByteString.EMPTY, true); iter = client.scanPrefix0(prefix, true);
while (iter.hasNext()) { while (iter.hasNext()) {
i++; i++;
KvPair pair = iter.next(); KvPair pair = iter.next();
@ -397,7 +398,7 @@ public class RawKVClientTest extends BaseRawKVTest {
}); });
client.ingest(kvs); client.ingest(kvs);
assertEquals(client.scan(prefix, ByteString.EMPTY).size(), cnt); assertEquals(client.scanPrefix(prefix).size(), cnt);
} }
@Test @Test
@ -1084,4 +1085,15 @@ public class RawKVClientTest extends BaseRawKVTest {
return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray()); return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray());
} }
} }
@Test
public void testBatchPutForIssue634() {
ByteString prefix = ByteString.copyFromUtf8("testBatchPutForIssue634");
client.deletePrefix(prefix);
HashMap<ByteString, ByteString> kvs = new HashMap<>();
for (int i = 0; i < MAX_RAW_BATCH_LIMIT * 4; i++) {
kvs.put(prefix.concat(ByteString.copyFromUtf8("key@" + i)), rawValue("value@" + i));
}
client.batchPut(kvs);
}
} }