diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 337873eb6b..9668f78d7c 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -19,12 +19,14 @@ package org.tikv.common.util; import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.prometheus.client.Histogram; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; @@ -37,8 +39,11 @@ import org.tikv.common.log.SlowLogSpan; public class ConcreteBackOffer implements BackOffer { private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class); private final int maxSleep; - private final Map backOffFunctionMap; - private final List errors; + + @VisibleForTesting + public final Map backOffFunctionMap; + + @VisibleForTesting public final List errors; private int totalSleep; private final long deadline; private final SlowLog slowLog; @@ -56,8 +61,8 @@ public class ConcreteBackOffer implements BackOffer { Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0."); this.maxSleep = maxSleep; - this.errors = new ArrayList<>(); - this.backOffFunctionMap = new HashMap<>(); + this.errors = Collections.synchronizedList(new ArrayList<>()); + this.backOffFunctionMap = new ConcurrentHashMap<>(); this.deadline = deadline; this.slowLog = slowLog; } diff --git a/src/test/java/org/tikv/util/ConcreteBackOfferTest.java b/src/test/java/org/tikv/util/ConcreteBackOfferTest.java new file mode 100644 index 0000000000..0b8adcf20e --- /dev/null +++ b/src/test/java/org/tikv/util/ConcreteBackOfferTest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffFunction.BackOffFuncType; +import org.tikv.common.util.ConcreteBackOffer; + +public class ConcreteBackOfferTest { + + private static void ignoreException(Callable callable) throws Exception { + try { + callable.call(); + } catch (Exception ignored) { + } + } + + @Test + public void raceMapTest() throws Exception { + ConcreteBackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); + ignoreException( + () -> { + backOffer.doBackOff(BackOffFuncType.BoRegionMiss, new Exception("first backoff")); + return null; + }); + ignoreException( + () -> { + backOffer.doBackOff(BackOffFuncType.BoTiKVRPC, new Exception("second backoff")); + return null; + }); + for (Entry item : backOffer.backOffFunctionMap.entrySet()) { + backOffer.backOffFunctionMap.remove(item.getKey()); + } + } + + @Test + public void raceErrorsTest() throws Exception { + int timeToSleep = 1; + int threadCnt = Runtime.getRuntime().availableProcessors() * 2; + int taskCnt = threadCnt * 2; + + ExecutorService executorService = Executors.newFixedThreadPool(threadCnt); + ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(timeToSleep); + List> tasks = new ArrayList<>(); + for (int i = 0; i < taskCnt; i++) { + int idx = i; + Future task = + executorService.submit( + () -> { + try { + backOffer.doBackOff( + BackOffFuncType.BoUpdateLeader, new Exception("backoff " + idx)); + } catch (GrpcException ignored) { + } + }); + tasks.add(task); + } + for (Future task : tasks) { + task.get(); + } + Assert.assertEquals(backOffer.errors.size(), taskCnt); + } +}