[close #489 ] fix backoffer data race (#491)

This commit is contained in:
iosmanthus 2022-01-21 10:42:44 +08:00 committed by GitHub
parent 6bda197de2
commit a70c050e5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 5 deletions

View File

@ -19,12 +19,14 @@ package org.tikv.common.util;
import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
@ -37,8 +39,11 @@ import org.tikv.common.log.SlowLogSpan;
public class ConcreteBackOffer implements BackOffer {
private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
private final int maxSleep;
private final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
private final List<Exception> errors;
@VisibleForTesting
public final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
@VisibleForTesting public final List<Exception> errors;
private int totalSleep;
private final long deadline;
private final SlowLog slowLog;
@ -56,8 +61,8 @@ public class ConcreteBackOffer implements BackOffer {
Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0.");
this.maxSleep = maxSleep;
this.errors = new ArrayList<>();
this.backOffFunctionMap = new HashMap<>();
this.errors = Collections.synchronizedList(new ArrayList<>());
this.backOffFunctionMap = new ConcurrentHashMap<>();
this.deadline = deadline;
this.slowLog = slowLog;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.ConcreteBackOffer;
public class ConcreteBackOfferTest {
private static void ignoreException(Callable<Void> callable) throws Exception {
try {
callable.call();
} catch (Exception ignored) {
}
}
@Test
public void raceMapTest() throws Exception {
ConcreteBackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
ignoreException(
() -> {
backOffer.doBackOff(BackOffFuncType.BoRegionMiss, new Exception("first backoff"));
return null;
});
ignoreException(
() -> {
backOffer.doBackOff(BackOffFuncType.BoTiKVRPC, new Exception("second backoff"));
return null;
});
for (Entry<BackOffFuncType, BackOffFunction> item : backOffer.backOffFunctionMap.entrySet()) {
backOffer.backOffFunctionMap.remove(item.getKey());
}
}
@Test
public void raceErrorsTest() throws Exception {
int timeToSleep = 1;
int threadCnt = Runtime.getRuntime().availableProcessors() * 2;
int taskCnt = threadCnt * 2;
ExecutorService executorService = Executors.newFixedThreadPool(threadCnt);
ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(timeToSleep);
List<Future<?>> tasks = new ArrayList<>();
for (int i = 0; i < taskCnt; i++) {
int idx = i;
Future<?> task =
executorService.submit(
() -> {
try {
backOffer.doBackOff(
BackOffFuncType.BoUpdateLeader, new Exception("backoff " + idx));
} catch (GrpcException ignored) {
}
});
tasks.add(task);
}
for (Future<?> task : tasks) {
task.get();
}
Assert.assertEquals(backOffer.errors.size(), taskCnt);
}
}