fix multi-thread on backOffer

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2018-12-21 16:09:40 +08:00
parent c7ff78aefe
commit ce2adbe2c9
3 changed files with 18 additions and 7 deletions

View File

@ -26,10 +26,10 @@ import org.apache.log4j.Logger;
import org.tikv.common.exception.GrpcException;
public class ConcreteBackOffer implements BackOffer {
private int maxSleep;
private final int maxSleep;
private int totalSleep;
private Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
private List<Exception> errors;
private final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
private final List<Exception> errors;
private static final Logger logger = Logger.getLogger(ConcreteBackOffer.class);
public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
@ -60,6 +60,10 @@ public class ConcreteBackOffer implements BackOffer {
return new ConcreteBackOffer(tsoMaxBackoff);
}
public static ConcreteBackOffer create(BackOffer source) {
return new ConcreteBackOffer(((ConcreteBackOffer) source));
}
private ConcreteBackOffer(int maxSleep) {
Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
this.maxSleep = maxSleep;
@ -67,6 +71,13 @@ public class ConcreteBackOffer implements BackOffer {
this.backOffFunctionMap = new HashMap<>();
}
private ConcreteBackOffer(ConcreteBackOffer source) {
this.maxSleep = source.maxSleep;
this.totalSleep = source.totalSleep;
this.errors = source.errors;
this.backOffFunctionMap = source.backOffFunctionMap;
}
/**
* Creates a back off func which implements exponential back off with optional jitters according
* to different back off strategies. See http://www.awsarchitectureblog.com/2015/03/backoff.html

View File

@ -270,6 +270,7 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : batches) {
completionService.submit(
() -> {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
Pair<TiRegion, Metapb.Store> pair =
regionManager.getRegionStorePairByRegionId(batch.regionId);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
@ -282,13 +283,13 @@ public class RawKVClient implements AutoCloseable {
.build());
}
try {
client.rawBatchPut(backOffer, kvPairs);
client.rawBatchPut(singleBatchBackOffer, kvPairs);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// recursive calls
batchPut(backOffer, batch.keys, batch.values);
batchPut(singleBatchBackOffer, batch.keys, batch.values);
}
return null;
});

View File

@ -1,6 +1,5 @@
package org.tikv.raw;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.*;