mirror of https://github.com/grpc/grpc-java.git
Outlier detection load balancer (#9447)
New outlier detection load balancer. Tracks all RPC results to the addresses it is configured with and periodically attempts to detect outlier. It wraps a child load balancer from which it hides any addresses that are deemed outliers. As specified in gRFC A50: gRPC xDS Outlier Detection Support: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
This commit is contained in:
parent
778098b911
commit
128688ae4d
|
|
@ -163,6 +163,38 @@ public final class SynchronizationContext implements Executor {
|
|||
return new ScheduledHandle(runnable, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task to be added and run via {@link #execute} after an inital delay and then
|
||||
* repeated after the delay until cancelled.
|
||||
*
|
||||
* @param task the task being scheduled
|
||||
* @param initialDelay the delay before the first run
|
||||
* @param delay the delay after the first run.
|
||||
* @param unit the time unit for the delay
|
||||
* @param timerService the {@code ScheduledExecutorService} that provides delayed execution
|
||||
*
|
||||
* @return an object for checking the status and/or cancel the scheduled task
|
||||
*/
|
||||
public final ScheduledHandle scheduleWithFixedDelay(
|
||||
final Runnable task, long initialDelay, long delay, TimeUnit unit,
|
||||
ScheduledExecutorService timerService) {
|
||||
final ManagedRunnable runnable = new ManagedRunnable(task);
|
||||
ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
execute(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
|
||||
+ ")";
|
||||
}
|
||||
}, initialDelay, delay, unit);
|
||||
return new ScheduledHandle(runnable, future);
|
||||
}
|
||||
|
||||
|
||||
private static class ManagedRunnable implements Runnable {
|
||||
final Runnable task;
|
||||
boolean isCancelled;
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ public class LoadBalancerRegistryTest {
|
|||
@Test
|
||||
public void stockProviders() {
|
||||
LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry();
|
||||
assertThat(defaultRegistry.providers()).hasSize(3);
|
||||
assertThat(defaultRegistry.providers()).hasSize(4);
|
||||
|
||||
LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first");
|
||||
assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class);
|
||||
|
|
@ -52,6 +52,12 @@ public class LoadBalancerRegistryTest {
|
|||
"io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
|
||||
assertThat(roundRobin.getPriority()).isEqualTo(5);
|
||||
|
||||
LoadBalancerProvider outlierDetection = defaultRegistry.getProvider(
|
||||
"outlier_detection_experimental");
|
||||
assertThat(outlierDetection.getClass().getName()).isEqualTo(
|
||||
"io.grpc.util.OutlierDetectionLoadBalancerProvider");
|
||||
assertThat(roundRobin.getPriority()).isEqualTo(5);
|
||||
|
||||
LoadBalancerProvider grpclb = defaultRegistry.getProvider("grpclb");
|
||||
assertThat(grpclb).isInstanceOf(GrpclbLoadBalancerProvider.class);
|
||||
assertThat(grpclb.getPriority()).isEqualTo(5);
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.util;
|
||||
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.JsonUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.internal.TimeProvider;
|
||||
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
|
||||
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.FailurePercentageEjection;
|
||||
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.SuccessRateEjection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class OutlierDetectionLoadBalancerProvider extends LoadBalancerProvider {
|
||||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
return new OutlierDetectionLoadBalancer(helper, TimeProvider.SYSTEM_TIME_PROVIDER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyName() {
|
||||
return "outlier_detection_experimental";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
|
||||
// Common configuration.
|
||||
Long intervalNanos = JsonUtil.getStringAsDuration(rawConfig, "interval");
|
||||
Long baseEjectionTimeNanos = JsonUtil.getStringAsDuration(rawConfig, "baseEjectionTime");
|
||||
Long maxEjectionTimeNanos = JsonUtil.getStringAsDuration(rawConfig, "maxEjectionTime");
|
||||
Integer maxEjectionPercentage = JsonUtil.getNumberAsInteger(rawConfig,
|
||||
"maxEjectionPercentage");
|
||||
|
||||
OutlierDetectionLoadBalancerConfig.Builder configBuilder
|
||||
= new OutlierDetectionLoadBalancerConfig.Builder();
|
||||
if (intervalNanos != null) {
|
||||
configBuilder.setIntervalNanos(intervalNanos);
|
||||
}
|
||||
if (baseEjectionTimeNanos != null) {
|
||||
configBuilder.setBaseEjectionTimeNanos(baseEjectionTimeNanos);
|
||||
}
|
||||
if (maxEjectionTimeNanos != null) {
|
||||
configBuilder.setMaxEjectionTimeNanos(maxEjectionTimeNanos);
|
||||
}
|
||||
if (maxEjectionPercentage != null) {
|
||||
configBuilder.setMaxEjectionPercent(maxEjectionPercentage);
|
||||
}
|
||||
|
||||
// Success rate ejection specific configuration.
|
||||
Map<String, ?> rawSuccessRateEjection = JsonUtil.getObject(rawConfig, "successRateEjection");
|
||||
if (rawSuccessRateEjection != null) {
|
||||
SuccessRateEjection.Builder successRateEjectionBuilder = new SuccessRateEjection.Builder();
|
||||
|
||||
Integer stdevFactor = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "stdevFactor");
|
||||
Integer enforcementPercentage = JsonUtil.getNumberAsInteger(rawSuccessRateEjection,
|
||||
"enforcementPercentage");
|
||||
Integer minimumHosts = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "minimumHosts");
|
||||
Integer requestVolume = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "requestVolume");
|
||||
|
||||
if (stdevFactor != null) {
|
||||
successRateEjectionBuilder.setStdevFactor(stdevFactor);
|
||||
}
|
||||
if (enforcementPercentage != null) {
|
||||
successRateEjectionBuilder.setEnforcementPercentage(enforcementPercentage);
|
||||
}
|
||||
if (minimumHosts != null) {
|
||||
successRateEjectionBuilder.setMinimumHosts(minimumHosts);
|
||||
}
|
||||
if (requestVolume != null) {
|
||||
successRateEjectionBuilder.setRequestVolume(requestVolume);
|
||||
}
|
||||
|
||||
configBuilder.setSuccessRateEjection(successRateEjectionBuilder.build());
|
||||
}
|
||||
|
||||
// Failure percentage ejection specific configuration.
|
||||
Map<String, ?> rawFailurePercentageEjection = JsonUtil.getObject(rawConfig,
|
||||
"failurePercentageEjection");
|
||||
if (rawFailurePercentageEjection != null) {
|
||||
FailurePercentageEjection.Builder failurePercentageEjectionBuilder
|
||||
= new FailurePercentageEjection.Builder();
|
||||
|
||||
Integer threshold = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection, "threshold");
|
||||
Integer enforcementPercentage = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection,
|
||||
"enforcementPercentage");
|
||||
Integer minimumHosts = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection,
|
||||
"minimumHosts");
|
||||
Integer requestVolume = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection,
|
||||
"requestVolume");
|
||||
|
||||
if (threshold != null) {
|
||||
failurePercentageEjectionBuilder.setThreshold(threshold);
|
||||
}
|
||||
if (enforcementPercentage != null) {
|
||||
failurePercentageEjectionBuilder.setEnforcementPercentage(enforcementPercentage);
|
||||
}
|
||||
if (minimumHosts != null) {
|
||||
failurePercentageEjectionBuilder.setMinimumHosts(minimumHosts);
|
||||
}
|
||||
if (requestVolume != null) {
|
||||
failurePercentageEjectionBuilder.setRequestVolume(requestVolume);
|
||||
}
|
||||
|
||||
configBuilder.setFailurePercentageEjection(failurePercentageEjectionBuilder.build());
|
||||
}
|
||||
|
||||
// Child load balancer configuration.
|
||||
List<LbConfig> childConfigCandidates = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(rawConfig, "childPolicy"));
|
||||
if (childConfigCandidates == null || childConfigCandidates.isEmpty()) {
|
||||
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
|
||||
"No child policy in outlier_detection_experimental LB policy: "
|
||||
+ rawConfig));
|
||||
}
|
||||
ConfigOrError selectedConfig =
|
||||
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates,
|
||||
LoadBalancerRegistry.getDefaultRegistry());
|
||||
if (selectedConfig.getError() != null) {
|
||||
return selectedConfig;
|
||||
}
|
||||
configBuilder.setChildPolicy((PolicySelection) selectedConfig.getConfig());
|
||||
|
||||
return ConfigOrError.fromConfig(configBuilder.build());
|
||||
}
|
||||
}
|
||||
|
|
@ -1,2 +1,3 @@
|
|||
io.grpc.internal.PickFirstLoadBalancerProvider
|
||||
io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider
|
||||
io.grpc.util.OutlierDetectionLoadBalancerProvider
|
||||
|
|
|
|||
|
|
@ -159,8 +159,10 @@ public final class FakeClock {
|
|||
}
|
||||
|
||||
@Override public ScheduledFuture<?> scheduleWithFixedDelay(
|
||||
Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
Runnable cmd, long initialDelay, long delay, TimeUnit unit) {
|
||||
ScheduledTask task = new ScheduleWithFixedDelayTask(cmd, delay, unit);
|
||||
schedule(task, initialDelay, unit);
|
||||
return task;
|
||||
}
|
||||
|
||||
@Override public boolean awaitTermination(long timeout, TimeUnit unit) {
|
||||
|
|
@ -234,6 +236,24 @@ public final class FakeClock {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ScheduleWithFixedDelayTask extends ScheduledTask {
|
||||
|
||||
final long delayNanos;
|
||||
|
||||
ScheduleWithFixedDelayTask(Runnable command, long delay, TimeUnit unit) {
|
||||
super(command);
|
||||
this.delayNanos = unit.toNanos(delay);
|
||||
}
|
||||
|
||||
@Override
|
||||
void run() {
|
||||
command.run();
|
||||
if (!isCancelled()) {
|
||||
schedule(this, delayNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.util;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.grpc.InternalServiceProviders;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.JsonParser;
|
||||
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
|
||||
import java.io.IOException;
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link OutlierDetectionLoadBalancerProvider}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class OutlierDetectionLoadBalancerProviderTest {
|
||||
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
new UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
private final OutlierDetectionLoadBalancerProvider provider
|
||||
= new OutlierDetectionLoadBalancerProvider();
|
||||
|
||||
@Test
|
||||
public void provided() {
|
||||
for (LoadBalancerProvider current : InternalServiceProviders.getCandidatesViaServiceLoader(
|
||||
LoadBalancerProvider.class, getClass().getClassLoader())) {
|
||||
if (current instanceof OutlierDetectionLoadBalancerProvider) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
fail("OutlierDetectionLoadBalancerProvider not registered");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void providesLoadBalancer() {
|
||||
Helper helper = mock(Helper.class);
|
||||
when(helper.getSynchronizationContext()).thenReturn(syncContext);
|
||||
when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class));
|
||||
assertThat(provider.newLoadBalancer(helper))
|
||||
.isInstanceOf(OutlierDetectionLoadBalancer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLoadBalancingConfig_defaults() throws IOException {
|
||||
String lbConfig =
|
||||
"{ \"successRateEjection\" : {}, "
|
||||
+ "\"failurePercentageEjection\" : {}, "
|
||||
+ "\"childPolicy\" : [{\"round_robin\" : {}}]}";
|
||||
ConfigOrError configOrError =
|
||||
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
|
||||
assertThat(configOrError.getConfig()).isNotNull();
|
||||
OutlierDetectionLoadBalancerConfig config
|
||||
= (OutlierDetectionLoadBalancerConfig) configOrError.getConfig();
|
||||
assertThat(config.successRateEjection).isNotNull();
|
||||
assertThat(config.failurePercentageEjection).isNotNull();
|
||||
assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLoadBalancingConfig_valuesSet() throws IOException {
|
||||
String lbConfig =
|
||||
"{\"interval\" : \"100s\","
|
||||
+ " \"baseEjectionTime\" : \"100s\","
|
||||
+ " \"maxEjectionTime\" : \"100s\","
|
||||
+ " \"maxEjectionPercentage\" : 100,"
|
||||
+ " \"successRateEjection\" : {"
|
||||
+ " \"stdevFactor\" : 100,"
|
||||
+ " \"enforcementPercentage\" : 100,"
|
||||
+ " \"minimumHosts\" : 100,"
|
||||
+ " \"requestVolume\" : 100"
|
||||
+ " },"
|
||||
+ " \"failurePercentageEjection\" : {"
|
||||
+ " \"threshold\" : 100,"
|
||||
+ " \"enforcementPercentage\" : 100,"
|
||||
+ " \"minimumHosts\" : 100,"
|
||||
+ " \"requestVolume\" : 100"
|
||||
+ " },"
|
||||
+ "\"childPolicy\" : [{\"round_robin\" : {}}]}";
|
||||
ConfigOrError configOrError =
|
||||
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
|
||||
assertThat(configOrError.getConfig()).isNotNull();
|
||||
OutlierDetectionLoadBalancerConfig config
|
||||
= (OutlierDetectionLoadBalancerConfig) configOrError.getConfig();
|
||||
|
||||
assertThat(config.intervalNanos).isEqualTo(100_000_000_000L);
|
||||
assertThat(config.baseEjectionTimeNanos).isEqualTo(100_000_000_000L);
|
||||
assertThat(config.maxEjectionTimeNanos).isEqualTo(100_000_000_000L);
|
||||
assertThat(config.maxEjectionPercent).isEqualTo(100);
|
||||
|
||||
assertThat(config.successRateEjection).isNotNull();
|
||||
assertThat(config.successRateEjection.stdevFactor).isEqualTo(100);
|
||||
assertThat(config.successRateEjection.enforcementPercentage).isEqualTo(100);
|
||||
assertThat(config.successRateEjection.minimumHosts).isEqualTo(100);
|
||||
assertThat(config.successRateEjection.requestVolume).isEqualTo(100);
|
||||
|
||||
assertThat(config.failurePercentageEjection).isNotNull();
|
||||
assertThat(config.failurePercentageEjection.threshold).isEqualTo(100);
|
||||
assertThat(config.failurePercentageEjection.enforcementPercentage).isEqualTo(100);
|
||||
assertThat(config.failurePercentageEjection.minimumHosts).isEqualTo(100);
|
||||
assertThat(config.failurePercentageEjection.requestVolume).isEqualTo(100);
|
||||
|
||||
assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, ?> parseJsonObject(String json) throws IOException {
|
||||
return (Map<String, ?>) JsonParser.parse(json);
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue