Fix batch retry hanging when tikv is down (#145)

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2021-03-23 17:21:01 +08:00 committed by GitHub
parent 7cedfca241
commit eecae1663e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 84 additions and 48 deletions

View File

@ -131,9 +131,8 @@ public class KVClient implements AutoCloseable {
getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch, version));
}
return getKvPairs(completionService, batches, BackOffer.BATCH_GET_MAX_BACKOFF);
@ -141,17 +140,17 @@ public class KVClient implements AutoCloseable {
private List<KvPair> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch, long version) {
TiRegion oldRegion = batch.region;
TiRegion oldRegion = batch.getRegion();
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.region);
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
return client.batchGet(backOffer, batch.keys, version);
return client.batchGet(backOffer, batch.getKeys(), version);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
@ -165,12 +164,13 @@ public class KVClient implements AutoCloseable {
private List<KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
List<Batch> retryBatches =
getBatches(backOffer, batch.keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
getBatches(backOffer, batch.getKeys(), BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
ArrayList<KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {
// recursive calls
List<KvPair> batchResult = doSendBatchGetInBatchesWithRetry(backOffer, retryBatch, version);
List<KvPair> batchResult =
doSendBatchGetInBatchesWithRetry(retryBatch.getBackOffer(), retryBatch, version);
results.addAll(batchResult);
}
return results;

View File

@ -1039,9 +1039,12 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
List<KvPair> pairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
for (int i = 0; i < batch.getKeys().size(); i++) {
pairs.add(
KvPair.newBuilder().setKey(batch.keys.get(i)).setValue(batch.values.get(i)).build());
KvPair.newBuilder()
.setKey(batch.getKeys().get(i))
.setValue(batch.getValues().get(i))
.build());
}
rawBatchPut(backOffer, pairs, ttl, atomic);
}

View File

@ -23,19 +23,23 @@ import org.tikv.common.region.TiRegion;
/** A Batch containing the region, a list of keys and/or values to send */
public class Batch {
public final TiRegion region;
public final List<ByteString> keys;
public final List<ByteString> values;
public final Map<ByteString, ByteString> map;
private final BackOffer backOffer;
private final TiRegion region;
private final List<ByteString> keys;
private final List<ByteString> values;
private final Map<ByteString, ByteString> map;
public Batch(TiRegion region, List<ByteString> keys) {
public Batch(BackOffer backOffer, TiRegion region, List<ByteString> keys) {
this.backOffer = ConcreteBackOffer.create(backOffer);
this.region = region;
this.keys = keys;
this.values = null;
this.map = null;
}
public Batch(TiRegion region, List<ByteString> keys, List<ByteString> values) {
public Batch(
BackOffer backOffer, TiRegion region, List<ByteString> keys, List<ByteString> values) {
this.backOffer = ConcreteBackOffer.create(backOffer);
this.region = region;
this.keys = keys;
this.values = values;
@ -50,4 +54,24 @@ public class Batch {
}
return kvMap;
}
public BackOffer getBackOffer() {
return backOffer;
}
public TiRegion getRegion() {
return region;
}
public List<ByteString> getKeys() {
return keys;
}
public List<ByteString> getValues() {
return values;
}
public Map<ByteString, ByteString> getMap() {
return map;
}
}

View File

@ -31,12 +31,14 @@ public class ClientUtils {
/**
* Append batch to list and split them according to batch limit
*
* @param backOffer backOffer
* @param batches a grouped batch
* @param region region
* @param keys keys
* @param batchMaxSizeInBytes batch max limit
*/
public static void appendBatches(
BackOffer backOffer,
List<Batch> batches,
TiRegion region,
List<ByteString> keys,
@ -53,7 +55,7 @@ public class ClientUtils {
end++) {
size += keys.get(end).size();
}
Batch batch = new Batch(region, keys.subList(start, end));
Batch batch = new Batch(backOffer, region, keys.subList(start, end));
batches.add(batch);
}
}
@ -61,6 +63,7 @@ public class ClientUtils {
/**
* Append batch to list and split them according to batch limit
*
* @param backOffer backOffer
* @param batches a grouped batch
* @param region region
* @param keys keys
@ -68,6 +71,7 @@ public class ClientUtils {
* @param batchMaxSizeInBytes batch max limit
*/
public static void appendBatches(
BackOffer backOffer,
List<Batch> batches,
TiRegion region,
List<ByteString> keys,
@ -86,7 +90,8 @@ public class ClientUtils {
size += keys.get(end).size();
size += values.get(end).size();
}
Batch batch = new Batch(region, keys.subList(start, end), values.subList(start, end));
Batch batch =
new Batch(backOffer, region, keys.subList(start, end), values.subList(start, end));
batches.add(batch);
}
}
@ -102,7 +107,8 @@ public class ClientUtils {
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), batchSize, batchLimit);
appendBatches(
backOffer, retryBatches, entry.getKey(), entry.getValue(), batchSize, batchLimit);
}
return retryBatches;

View File

@ -19,16 +19,22 @@ import com.google.protobuf.ByteString;
import org.tikv.common.region.TiRegion;
public class DeleteRange {
private final BackOffer backOffer;
private final TiRegion region;
private final ByteString startKey;
private final ByteString endKey;
public DeleteRange(TiRegion region, ByteString startKey, ByteString endKey) {
public DeleteRange(BackOffer backOffer, TiRegion region, ByteString startKey, ByteString endKey) {
this.backOffer = ConcreteBackOffer.create(backOffer);
this.region = region;
this.startKey = startKey;
this.endKey = endKey;
}
public BackOffer getBackOffer() {
return backOffer;
}
public TiRegion getRegion() {
return region;
}

View File

@ -598,6 +598,7 @@ public class RawKVClient implements AutoCloseable {
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
backOffer,
batches,
entry.getKey(),
entry.getValue(),
@ -611,9 +612,8 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(singleBatchBackOffer, batch, ttl, atomic));
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
@ -621,7 +621,7 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchPutInBatchesWithRetry(
BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
try (RegionStoreClient client = clientBuilder.build(batch.region)) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) {
client.rawBatchPut(backOffer, batch, ttl, atomic);
return new ArrayList<>();
} catch (final TiKVException e) {
@ -635,15 +635,16 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), batch.keys, backOffer);
groupKeysByRegion(clientBuilder.getRegionManager(), batch.getKeys(), backOffer);
List<Batch> retryBatches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(
backOffer,
retryBatches,
entry.getKey(),
entry.getValue(),
entry.getValue().stream().map(batch.map::get).collect(Collectors.toList()),
entry.getValue().stream().map(batch.getMap()::get).collect(Collectors.toList()),
RAW_BATCH_PUT_SIZE,
MAX_RAW_BATCH_LIMIT);
}
@ -665,9 +666,8 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch));
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch));
}
result.addAll(
getTasksWithOutput(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF));
@ -678,13 +678,13 @@ public class RawKVClient implements AutoCloseable {
private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.region);
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.keys);
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
return Pair.create(new ArrayList<>(), partialResult);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
@ -694,7 +694,7 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
return getBatches(
backOffer, batch.keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boolean atomic) {
@ -710,9 +710,8 @@ public class RawKVClient implements AutoCloseable {
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(singleBatchBackOffer, batch, atomic));
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
@ -720,13 +719,13 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchDeleteInBatchesWithRetry(
BackOffer backOffer, Batch batch, boolean atomic) {
RegionStoreClient client = clientBuilder.build(batch.region);
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
client.rawBatchDelete(backOffer, batch.keys, atomic);
client.rawBatchDelete(backOffer, batch.getKeys(), atomic);
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
@ -736,7 +735,7 @@ public class RawKVClient implements AutoCloseable {
private List<Batch> doSendBatchDeleteWithRefetchRegion(BackOffer backOffer, Batch batch) {
return getBatches(
backOffer, batch.keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
backOffer, batch.getKeys(), RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}
private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteString key2) {
@ -756,15 +755,14 @@ public class RawKVClient implements AutoCloseable {
TiRegion region = regions.get(i);
ByteString start = calcKeyByCondition(i == 0, startKey, region.getStartKey());
ByteString end = calcKeyByCondition(i == regions.size() - 1, endKey, region.getEndKey());
ranges.add(new DeleteRange(region, start, end));
ranges.add(new DeleteRange(backOffer, region, start, end));
}
Queue<List<DeleteRange>> taskQueue = new LinkedList<>();
taskQueue.offer(ranges);
while (!taskQueue.isEmpty()) {
List<DeleteRange> task = taskQueue.poll();
for (DeleteRange range : task) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(() -> doSendDeleteRangeWithRetry(singleBatchBackOffer, range));
completionService.submit(() -> doSendDeleteRangeWithRetry(range.getBackOffer(), range));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
@ -795,7 +793,7 @@ public class RawKVClient implements AutoCloseable {
ByteString start = calcKeyByCondition(i == 0, range.getStartKey(), region.getStartKey());
ByteString end =
calcKeyByCondition(i == regions.size() - 1, range.getEndKey(), region.getEndKey());
retryRanges.add(new DeleteRange(region, start, end));
retryRanges.add(new DeleteRange(backOffer, region, start, end));
}
return retryRanges;
}

View File

@ -140,9 +140,8 @@ public class KVClient implements AutoCloseable {
getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch, version));
}
return getKvPairs(completionService, batches, BackOffer.BATCH_GET_MAX_BACKOFF);
@ -150,17 +149,17 @@ public class KVClient implements AutoCloseable {
private List<Kvrpcpb.KvPair> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch, long version) {
TiRegion oldRegion = batch.region;
TiRegion oldRegion = batch.getRegion();
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.region);
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
return client.batchGet(backOffer, batch.keys, version);
return client.batchGet(backOffer, batch.getKeys(), version);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region);
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);
// retry
@ -174,7 +173,7 @@ public class KVClient implements AutoCloseable {
private List<Kvrpcpb.KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
List<Batch> retryBatches =
getBatches(backOffer, batch.keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
getBatches(backOffer, batch.getKeys(), BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
ArrayList<Kvrpcpb.KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {