xds: implement a global map for holding circuit breaker request counters (#7588)

Circuit breakers should be applied to clusters in the global scope. However, the LB hierarchy might cause the LB policy (currently EDS, but cluster_impl in the future) that applies circuit breaking to be duplicated. Also, for multi-channel cases, the circuit breaking threshold should still be shared across channels in the process.

This change creates a global map for accessing the circuit-breaking atomics that are used to count the number of outstanding requests on a per-global-cluster basis. Atomics in the global map are held by WeakReferences, so LB policies/Pickers/StreamTracers do not need to worry about the counters' lifecycle and refcount.
This commit is contained in:
Chengyuan Zhang 2020-11-13 12:12:32 -08:00 committed by GitHub
parent ddd5dea7e9
commit a43ae54c59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 185 additions and 5 deletions

View File

@ -72,6 +72,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry; private final LoadBalancerRegistry lbRegistry;
private final ThreadSafeRandom random; private final ThreadSafeRandom random;
private final CallCounterProvider callCounterProvider;
private final GracefulSwitchLoadBalancer switchingLoadBalancer; private final GracefulSwitchLoadBalancer switchingLoadBalancer;
private ObjectPool<XdsClient> xdsClientPool; private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient; private XdsClient xdsClient;
@ -79,15 +80,17 @@ final class EdsLoadBalancer2 extends LoadBalancer {
private EdsLbState edsLbState; private EdsLbState edsLbState;
EdsLoadBalancer2(LoadBalancer.Helper helper) { EdsLoadBalancer2(LoadBalancer.Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry(), ThreadSafeRandomImpl.instance); this(helper, LoadBalancerRegistry.getDefaultRegistry(), ThreadSafeRandomImpl.instance,
SharedCallCounterMap.getInstance());
} }
@VisibleForTesting @VisibleForTesting
EdsLoadBalancer2( EdsLoadBalancer2(LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry,
LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry, ThreadSafeRandom random) { ThreadSafeRandom random, CallCounterProvider callCounterProvider) {
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.random = checkNotNull(random, "random"); this.random = checkNotNull(random, "random");
syncContext = checkNotNull(helper, "helper").getSynchronizationContext(); syncContext = checkNotNull(helper, "helper").getSynchronizationContext();
this.callCounterProvider = checkNotNull(callCounterProvider, "callCounterProvider");
switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper); switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper);
InternalLogId logId = InternalLogId.allocate("eds-lb", helper.getAuthority()); InternalLogId logId = InternalLogId.allocate("eds-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId); logger = XdsLogger.withLogId(logId);
@ -160,7 +163,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
} }
private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher { private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher {
private final AtomicLong requestCount = new AtomicLong(); private final AtomicLong requestCount;
@Nullable @Nullable
private final LoadStatsStore loadStatsStore; private final LoadStatsStore loadStatsStore;
private final RequestLimitingLbHelper lbHelper; private final RequestLimitingLbHelper lbHelper;
@ -175,6 +178,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
private LoadBalancer lb; private LoadBalancer lb;
private ChildLbState(Helper helper) { private ChildLbState(Helper helper) {
requestCount = callCounterProvider.getOrCreate(cluster, edsServiceName);
if (lrsServerName != null) { if (lrsServerName != null) {
loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName); loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName);
} else { } else {
@ -494,6 +498,14 @@ final class EdsLoadBalancer2 extends LoadBalancer {
} }
} }
/**
* Provides the counter for aggregating outstanding requests per cluster:eds_service_name.
*
* <p>The provider is handed to the load balancer through its constructor, and each child LB
* state obtains its request counter from it instead of creating a private one.
*/
// Introduced for testing.
interface CallCounterProvider {
/**
* Returns the counter to use for the given cluster and EDS service name.
*
* @param cluster name of the cluster the counter belongs to
* @param edsServiceName EDS service name within the cluster; may be {@code null}
* @return the counter tracking outstanding requests for that cluster:eds_service_name pair
*/
AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName);
}
@VisibleForTesting @VisibleForTesting
static PriorityLbConfig generatePriorityLbConfig( static PriorityLbConfig generatePriorityLbConfig(
String cluster, String edsServiceName, String lrsServerName, String cluster, String edsServiceName, String lrsServerName,

View File

@ -0,0 +1,100 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.xds.EdsLoadBalancer2.CallCounterProvider;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
 * The global map for holding circuit breaker atomic counters.
 *
 * <p>Counters are keyed first by cluster name and then by EDS service name (which may be
 * {@code null}). The map only holds {@link WeakReference}s to the counters, so a counter lives
 * exactly as long as some caller keeps a strong reference to it. Entries whose counter has been
 * garbage collected are purged lazily via a {@link ReferenceQueue}.
 *
 * <p>All access to the underlying map is guarded by this instance's monitor.
 */
@ThreadSafe
final class SharedCallCounterMap implements CallCounterProvider {

  // Receives CounterReferences once their counter has been garbage collected.
  private final ReferenceQueue<AtomicLong> refQueue = new ReferenceQueue<>();
  // cluster -> (edsServiceName -> weak reference to the counter). Guarded by "this".
  private final Map<String, Map<String, CounterReference>> counters;

  private SharedCallCounterMap() {
    this(new HashMap<String, Map<String, CounterReference>>());
  }

  /**
   * Creates a map backed by the given storage, so that tests can inspect its content.
   *
   * @param counters backing storage; must not be {@code null}
   */
  @VisibleForTesting
  SharedCallCounterMap(Map<String, Map<String, CounterReference>> counters) {
    this.counters = checkNotNull(counters, "counters");
  }

  /** Returns the process-wide instance, created lazily on first use. */
  static SharedCallCounterMap getInstance() {
    return SharedCallCounterMapHolder.instance;
  }

  /**
   * Returns the live counter registered for the given cluster and EDS service name, creating and
   * registering a new one if there is none or if the previous one has been garbage collected.
   *
   * @param cluster name of the cluster
   * @param edsServiceName EDS service name within the cluster; may be {@code null}
   * @return the shared counter for the cluster:eds_service_name pair
   */
  @Override
  public synchronized AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) {
    Map<String, CounterReference> clusterCounters = counters.get(cluster);
    if (clusterCounters == null) {
      clusterCounters = new HashMap<>();
      counters.put(cluster, clusterCounters);
    }
    CounterReference ref = clusterCounters.get(edsServiceName);
    AtomicLong counter;
    if (ref == null || (counter = ref.get()) == null) {
      // Either nothing was registered or the old counter is gone; the stale reference (if any)
      // is overwritten here and later ignored by cleanQueue().
      counter = new AtomicLong();
      ref = new CounterReference(counter, refQueue, cluster, edsServiceName);
      clusterCounters.put(edsServiceName, ref);
    }
    cleanQueue();
    return counter;
  }

  /**
   * Removes map entries whose counters have been garbage collected.
   *
   * <p>A dequeued reference only evicts the entry it is still registered under. If the entry has
   * already been replaced by a newer reference (or removed altogether), the dequeued reference is
   * ignored so that a live counter is never dropped from the map.
   */
  @VisibleForTesting
  synchronized void cleanQueue() {
    CounterReference ref;
    while ((ref = (CounterReference) refQueue.poll()) != null) {
      Map<String, CounterReference> clusterCounters = counters.get(ref.cluster);
      if (clusterCounters == null) {
        // The whole cluster entry was already purged by an earlier stale reference.
        continue;
      }
      if (clusterCounters.get(ref.edsServiceName) != ref) {
        // The slot now belongs to a newer reference; leave it alone.
        continue;
      }
      clusterCounters.remove(ref.edsServiceName);
      if (clusterCounters.isEmpty()) {
        counters.remove(ref.cluster);
      }
    }
  }

  /**
   * Weak reference to a counter that remembers the keys it was registered under, so the
   * corresponding map entry can be located once the counter has been collected.
   */
  @VisibleForTesting
  static final class CounterReference extends WeakReference<AtomicLong> {
    private final String cluster;
    @Nullable
    private final String edsServiceName;

    CounterReference(AtomicLong counter, ReferenceQueue<AtomicLong> refQueue, String cluster,
        @Nullable String edsServiceName) {
      super(counter, refQueue);
      this.cluster = cluster;
      this.edsServiceName = edsServiceName;
    }
  }

  // Holder class so that the singleton is only instantiated when getInstance() is first called.
  private static final class SharedCallCounterMapHolder {
    private static final SharedCallCounterMap instance = new SharedCallCounterMap();
  }
}

View File

@ -49,6 +49,7 @@ import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.EdsLoadBalancer2.CallCounterProvider;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.EnvoyProtoData.ClusterStats; import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.DropOverload;
@ -69,6 +70,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.junit.After; import org.junit.After;
@ -135,9 +137,16 @@ public class EdsLoadBalancer2Test {
public void setUp() { public void setUp() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
CallCounterProvider callCounterProvider = new CallCounterProvider() {
@Override
public AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) {
return new AtomicLong();
}
};
registry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME)); registry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME));
registry.register(new FakeLoadBalancerProvider(LRS_POLICY_NAME)); registry.register(new FakeLoadBalancerProvider(LRS_POLICY_NAME));
loadBalancer = new EdsLoadBalancer2(helper, registry, mockRandom); loadBalancer = new EdsLoadBalancer2(helper, registry, mockRandom, callCounterProvider);
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder() ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList()) .setAddresses(Collections.<EquivalentAddressGroup>emptyList())

View File

@ -0,0 +1,59 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.testing.GcFinalization;
import io.grpc.xds.SharedCallCounterMap.CounterReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Unit tests for {@link SharedCallCounterMap}: counter sharing and weak-reference cleanup.
 */
@RunWith(JUnit4.class)
public class SharedCallCounterMapTest {

  private static final String CLUSTER = "cluster-foo.googleapis.com";
  private static final String EDS_SERVICE_NAME = null;

  // Backing storage handed to the map under test so that its content can be inspected.
  private final Map<String, Map<String, CounterReference>> backingStore = new HashMap<>();
  private final SharedCallCounterMap callCounterMap = new SharedCallCounterMap(backingStore);

  /** Two lookups with the same keys must hand out the very same counter object. */
  @Test
  public void sharedCounterInstance() {
    AtomicLong first = callCounterMap.getOrCreate(CLUSTER, EDS_SERVICE_NAME);
    AtomicLong second = callCounterMap.getOrCreate(CLUSTER, EDS_SERVICE_NAME);

    assertThat(second).isSameInstanceAs(first);
  }

  /** Once the only strong reference to a counter is dropped, its entry is purged. */
  @Test
  public void autoCleanUp() {
    @SuppressWarnings("UnusedVariable")
    AtomicLong strongRef = callCounterMap.getOrCreate(CLUSTER, EDS_SERVICE_NAME);
    Map<String, CounterReference> perCluster = backingStore.get(CLUSTER);
    CounterReference weakRef = perCluster.get(EDS_SERVICE_NAME);

    // Drop the last strong reference and wait until the collector clears the weak one.
    strongRef = null;
    GcFinalization.awaitClear(weakRef);

    callCounterMap.cleanQueue();
    assertThat(backingStore).isEmpty();
  }
}