From 4aa8046b9f4261518507cf4f8b633b785d8ae0e0 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 19 Dec 2018 16:40:23 +0800 Subject: [PATCH] add comments Signed-off-by: birdstorm --- src/main/java/org/tikv/raw/RawKVClient.java | 103 ++++++++++++-------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 1d7283326b..81a267c73f 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -99,6 +99,7 @@ public class RawKVClient implements AutoCloseable { * @param values values */ public void batchPut(BackOffer backOffer, List keys, List values) { + assert keys.size() == values.size(); Map keysToValues = mapKeysToValues(keys, values); Map> groupKeys = groupKeysByRegion(keys); keys.clear(); @@ -112,45 +113,7 @@ public class RawKVClient implements AutoCloseable { entry.getValue().stream().map(keysToValues::get).collect(Collectors.toList()), RAW_BATCH_PUT_SIZE); } - - for (Batch batch : batches) { - completionService.submit( - () -> { - Pair pair = - regionManager.getRegionStorePairByRegionId(batch.regionId); - RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session); - List kvPairs = new ArrayList<>(); - for (int i = 0; i < batch.keys.size(); i++) { - kvPairs.add( - Kvrpcpb.KvPair.newBuilder() - .setKey(batch.keys.get(i)) - .setValue(batch.values.get(i)) - .build()); - } - try { - client.rawBatchPut(backOffer, kvPairs); - } catch (final TiKVException e) { - // TODO: any elegant way to re-split the ranges if fails? - backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); - logger.warn("ReSplitting ranges for BatchPutRequest"); - // recursive calls - batchPut(backOffer, batch.keys, batch.values); - } - return null; - }); - } - try { - for (int i = 0; i < batches.size(); i++) { - completionService.take().get(BackOffer.rawkvMaxBackoff, TimeUnit.SECONDS); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TiKVException("Current thread interrupted.", e); - } catch (TimeoutException e) { - throw new TiKVException("TimeOut Exceeded for current operation. ", e); - } catch (ExecutionException e) { - throw new TiKVException("Execution exception met.", e); - } + sendBatchPut(backOffer, batches); } /** @@ -238,6 +201,15 @@ public class RawKVClient implements AutoCloseable { } } + /** + * Append batch to list and split them according to batch limit + * + * @param batches a grouped batch + * @param regionId region ID + * @param keys keys + * @param values values + * @param limit batch max limit + */ private void appendBatches( List batches, long regionId, @@ -260,6 +232,12 @@ public class RawKVClient implements AutoCloseable { } } + /** + * Group by list of keys according to its region + * + * @param keys keys + * @return a mapping of keys and their regionId + */ private Map> groupKeysByRegion(List keys) { Map> groups = new HashMap<>(); TiRegion lastRegion = null; @@ -281,6 +259,53 @@ public class RawKVClient implements AutoCloseable { return map; } + /** + * Send batchPut request concurrently + * + * @param backOffer current backOffer + * @param batches list of batch to send + */ + private void sendBatchPut(BackOffer backOffer, List batches) { + for (Batch batch : batches) { + completionService.submit( + () -> { + Pair pair = + regionManager.getRegionStorePairByRegionId(batch.regionId); + RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session); + List kvPairs = new ArrayList<>(); + for (int i = 0; i < batch.keys.size(); i++) { + kvPairs.add( + Kvrpcpb.KvPair.newBuilder() + .setKey(batch.keys.get(i)) + .setValue(batch.values.get(i)) + .build()); + } + try { + client.rawBatchPut(backOffer, kvPairs); + } catch (final TiKVException e) { + // TODO: any elegant way to re-split the ranges if fails? + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("ReSplitting ranges for BatchPutRequest"); + // recursive calls + batchPut(backOffer, batch.keys, batch.values); + } + return null; + }); + } + try { + for (int i = 0; i < batches.size(); i++) { + completionService.take().get(BackOffer.rawkvMaxBackoff, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TiKVException("Current thread interrupted.", e); + } catch (TimeoutException e) { + throw new TiKVException("TimeOut Exceeded for current operation. ", e); + } catch (ExecutionException e) { + throw new TiKVException("Execution exception met.", e); + } + } + private Iterator rawScanIterator(ByteString startKey, ByteString endKey) { return new RawScanIterator(startKey, endKey, Integer.MAX_VALUE, session); }