fix rawBatchPut

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
This commit is contained in:
iosmanthus 2022-07-28 16:51:20 +08:00
parent 5fca510ae3
commit af6ab50233
No known key found for this signature in database
GPG Key ID: DEE5BAABFE092169
2 changed files with 18 additions and 22 deletions

View File

@ -749,17 +749,17 @@ public class RawKVClient implements RawKVClientBase {
while (!taskQueue.isEmpty()) { while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll(); List<Batch> task = taskQueue.poll();
for (Batch batch : task) { for (Batch batch : task) {
completionService.submit( futureList.add(completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)); () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)));
}
try { try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) { } catch (Exception e) {
for (Future<List<Batch>> future : futureList) { for (Future<List<Batch>> future : futureList) {
future.cancel(true); future.cancel(true);
}
throw e;
} }
throw e;
} }
} }
} }

View File

@ -20,6 +20,7 @@ package org.tikv.raw;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.tikv.raw.RawKVClientBase.MAX_RAW_BATCH_LIMIT;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -1086,18 +1087,13 @@ public class RawKVClientTest extends BaseRawKVTest {
} }
@Test @Test
public void testBatchPut() throws Exception { public void testBatchPutForIssue634() {
TiConfiguration conf = session.getConf(); ByteString prefix = ByteString.copyFromUtf8("testBatchPutForIssue634");
conf.setRawKVBatchWriteTimeoutInMS(100000); client.deletePrefix(prefix);
conf.setTimeout(100000); HashMap<ByteString, ByteString> kvs = new HashMap<>();
try(TiSession newSession = TiSession.create(conf)){ for (int i = 0; i < MAX_RAW_BATCH_LIMIT * 4; i++) {
try(RawKVClient client=newSession.createRawClient()) { kvs.put(prefix.concat(ByteString.copyFromUtf8("key@" + i)), rawValue("value@" + i));
HashMap<ByteString, ByteString> kvs = new HashMap<>(); }
for (int i = 0; i < 2048; i++) { client.batchPut(kvs);
kvs.put(ByteString.copyFromUtf8("key@" + i), rawValue("value@" + i));
}
client.batchPut(kvs);
}
};
} }
} }