add comments

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2018-12-19 16:40:23 +08:00
parent 106a7c3b3f
commit 4aa8046b9f
1 changed files with 64 additions and 39 deletions

View File

@ -99,6 +99,7 @@ public class RawKVClient implements AutoCloseable {
* @param values values
*/
public void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
assert keys.size() == values.size();
Map<ByteString, ByteString> keysToValues = mapKeysToValues(keys, values);
Map<Long, List<ByteString>> groupKeys = groupKeysByRegion(keys);
keys.clear();
@ -112,45 +113,7 @@ public class RawKVClient implements AutoCloseable {
entry.getValue().stream().map(keysToValues::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE);
}
for (Batch batch : batches) {
completionService.submit(
() -> {
Pair<TiRegion, Metapb.Store> pair =
regionManager.getRegionStorePairByRegionId(batch.regionId);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
kvPairs.add(
Kvrpcpb.KvPair.newBuilder()
.setKey(batch.keys.get(i))
.setValue(batch.values.get(i))
.build());
}
try {
client.rawBatchPut(backOffer, kvPairs);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// recursive calls
batchPut(backOffer, batch.keys, batch.values);
}
return null;
});
}
try {
for (int i = 0; i < batches.size(); i++) {
completionService.take().get(BackOffer.rawkvMaxBackoff, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
sendBatchPut(backOffer, batches);
}
/**
@ -238,6 +201,15 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Append batch to list and split them according to batch limit
*
* @param batches a grouped batch
* @param regionId region ID
* @param keys keys
* @param values values
* @param limit batch max limit
*/
private void appendBatches(
List<Batch> batches,
long regionId,
@ -260,6 +232,12 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their regionId
*/
private Map<Long, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
Map<Long, List<ByteString>> groups = new HashMap<>();
TiRegion lastRegion = null;
@ -281,6 +259,53 @@ public class RawKVClient implements AutoCloseable {
return map;
}
/**
* Send batchPut request concurrently
*
* @param backOffer current backOffer
* @param batches list of batch to send
*/
private void sendBatchPut(BackOffer backOffer, List<Batch> batches) {
for (Batch batch : batches) {
completionService.submit(
() -> {
Pair<TiRegion, Metapb.Store> pair =
regionManager.getRegionStorePairByRegionId(batch.regionId);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
kvPairs.add(
Kvrpcpb.KvPair.newBuilder()
.setKey(batch.keys.get(i))
.setValue(batch.values.get(i))
.build());
}
try {
client.rawBatchPut(backOffer, kvPairs);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// recursive calls
batchPut(backOffer, batch.keys, batch.values);
}
return null;
});
}
try {
for (int i = 0; i < batches.size(); i++) {
completionService.take().get(BackOffer.rawkvMaxBackoff, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(ByteString startKey, ByteString endKey) {
return new RawScanIterator(startKey, endKey, Integer.MAX_VALUE, session);
}