mirror of https://github.com/grpc/grpc-java.git
core: add logger to OutlierDetectionLoadBalancer (#9880)
This commit is contained in:
parent
3d4d46d9ff
commit
67d6600f71
|
|
@ -26,6 +26,8 @@ import com.google.common.collect.ForwardingMap;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
|
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||||
import io.grpc.ClientStreamTracer;
|
import io.grpc.ClientStreamTracer;
|
||||||
import io.grpc.ClientStreamTracer.StreamInfo;
|
import io.grpc.ClientStreamTracer.StreamInfo;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
|
|
@ -73,6 +75,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
private ScheduledHandle detectionTimerHandle;
|
private ScheduledHandle detectionTimerHandle;
|
||||||
private Long detectionTimerStartNanos;
|
private Long detectionTimerStartNanos;
|
||||||
|
|
||||||
|
private final ChannelLogger logger;
|
||||||
|
|
||||||
private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
|
private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
|
||||||
= Attributes.Key.create("addressTrackerKey");
|
= Attributes.Key.create("addressTrackerKey");
|
||||||
|
|
||||||
|
|
@ -80,16 +84,19 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
|
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
|
||||||
*/
|
*/
|
||||||
public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
|
public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
|
||||||
|
logger = helper.getChannelLogger();
|
||||||
childHelper = new ChildHelper(checkNotNull(helper, "helper"));
|
childHelper = new ChildHelper(checkNotNull(helper, "helper"));
|
||||||
switchLb = new GracefulSwitchLoadBalancer(childHelper);
|
switchLb = new GracefulSwitchLoadBalancer(childHelper);
|
||||||
trackerMap = new AddressTrackerMap();
|
trackerMap = new AddressTrackerMap();
|
||||||
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
||||||
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
|
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
|
||||||
this.timeProvider = timeProvider;
|
this.timeProvider = timeProvider;
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||||
OutlierDetectionLoadBalancerConfig config
|
OutlierDetectionLoadBalancerConfig config
|
||||||
= (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
= (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||||
|
|
||||||
|
|
@ -129,7 +136,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
trackerMap.resetCallCounters();
|
trackerMap.resetCallCounters();
|
||||||
}
|
}
|
||||||
|
|
||||||
detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config),
|
detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
|
||||||
initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
|
initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
|
||||||
} else if (detectionTimerHandle != null) {
|
} else if (detectionTimerHandle != null) {
|
||||||
// Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
|
// Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
|
||||||
|
|
@ -162,9 +169,11 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
class DetectionTimer implements Runnable {
|
class DetectionTimer implements Runnable {
|
||||||
|
|
||||||
OutlierDetectionLoadBalancerConfig config;
|
OutlierDetectionLoadBalancerConfig config;
|
||||||
|
ChannelLogger logger;
|
||||||
|
|
||||||
DetectionTimer(OutlierDetectionLoadBalancerConfig config) {
|
DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -173,7 +182,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
trackerMap.swapCounters();
|
trackerMap.swapCounters();
|
||||||
|
|
||||||
for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config)) {
|
for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
|
||||||
algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
|
algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -235,9 +244,11 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
private boolean ejected;
|
private boolean ejected;
|
||||||
private ConnectivityStateInfo lastSubchannelState;
|
private ConnectivityStateInfo lastSubchannelState;
|
||||||
private SubchannelStateListener subchannelStateListener;
|
private SubchannelStateListener subchannelStateListener;
|
||||||
|
private final ChannelLogger logger;
|
||||||
|
|
||||||
OutlierDetectionSubchannel(Subchannel delegate) {
|
OutlierDetectionSubchannel(Subchannel delegate) {
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
|
this.logger = delegate.getChannelLogger();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -316,12 +327,14 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
ejected = true;
|
ejected = true;
|
||||||
subchannelStateListener.onSubchannelState(
|
subchannelStateListener.onSubchannelState(
|
||||||
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
|
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
|
||||||
|
logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void uneject() {
|
void uneject() {
|
||||||
ejected = false;
|
ejected = false;
|
||||||
if (lastSubchannelState != null) {
|
if (lastSubchannelState != null) {
|
||||||
subchannelStateListener.onSubchannelState(lastSubchannelState);
|
subchannelStateListener.onSubchannelState(lastSubchannelState);
|
||||||
|
logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -353,6 +366,13 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "OutlierDetectionSubchannel{"
|
||||||
|
+ "addresses=" + delegate.getAllAddresses()
|
||||||
|
+ '}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -576,6 +596,13 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
failureCount.set(0);
|
failureCount.set(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "AddressTracker{"
|
||||||
|
+ "subchannels=" + subchannels
|
||||||
|
+ '}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -684,13 +711,14 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
/** Builds a list of algorithms that are enabled in the given config. */
|
/** Builds a list of algorithms that are enabled in the given config. */
|
||||||
@Nullable
|
@Nullable
|
||||||
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config) {
|
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
|
||||||
|
ChannelLogger logger) {
|
||||||
ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
|
ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
|
||||||
if (config.successRateEjection != null) {
|
if (config.successRateEjection != null) {
|
||||||
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config));
|
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
|
||||||
}
|
}
|
||||||
if (config.failurePercentageEjection != null) {
|
if (config.failurePercentageEjection != null) {
|
||||||
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config));
|
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
|
||||||
}
|
}
|
||||||
return algoListBuilder.build();
|
return algoListBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
@ -705,9 +733,13 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
private final OutlierDetectionLoadBalancerConfig config;
|
private final OutlierDetectionLoadBalancerConfig config;
|
||||||
|
|
||||||
SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
|
private final ChannelLogger logger;
|
||||||
|
|
||||||
|
SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
|
||||||
|
ChannelLogger logger) {
|
||||||
checkArgument(config.successRateEjection != null, "success rate ejection config is null");
|
checkArgument(config.successRateEjection != null, "success rate ejection config is null");
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -744,6 +776,11 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
// If success rate is below the threshold, eject the address.
|
// If success rate is below the threshold, eject the address.
|
||||||
if (tracker.successRate() < requiredSuccessRate) {
|
if (tracker.successRate() < requiredSuccessRate) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG,
|
||||||
|
"SuccessRate algorithm detected outlier: {0}. "
|
||||||
|
+ "Parameters: successRate={1}, mean={2}, stdev={3}, "
|
||||||
|
+ "requiredSuccessRate={4}",
|
||||||
|
tracker, tracker.successRate(), mean, stdev, requiredSuccessRate);
|
||||||
// Only eject some addresses based on the enforcement percentage.
|
// Only eject some addresses based on the enforcement percentage.
|
||||||
if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
|
if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
|
||||||
tracker.ejectSubchannels(ejectionTimeNanos);
|
tracker.ejectSubchannels(ejectionTimeNanos);
|
||||||
|
|
@ -781,8 +818,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
private final OutlierDetectionLoadBalancerConfig config;
|
private final OutlierDetectionLoadBalancerConfig config;
|
||||||
|
|
||||||
FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
|
private final ChannelLogger logger;
|
||||||
|
|
||||||
|
FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
|
||||||
|
ChannelLogger logger) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -814,6 +855,9 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
|
||||||
// If the failure rate is above the threshold, we should eject...
|
// If the failure rate is above the threshold, we should eject...
|
||||||
double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
|
double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
|
||||||
if (tracker.failureRate() > maxFailureRate) {
|
if (tracker.failureRate() > maxFailureRate) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG,
|
||||||
|
"FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
|
||||||
|
tracker, tracker.failureRate());
|
||||||
// ...but only enforce this based on the enforcement percentage.
|
// ...but only enforce this based on the enforcement percentage.
|
||||||
if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
|
if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
|
||||||
tracker.ejectSubchannels(ejectionTimeNanos);
|
tracker.ejectSubchannels(ejectionTimeNanos);
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.InternalServiceProviders;
|
import io.grpc.InternalServiceProviders;
|
||||||
import io.grpc.LoadBalancer.Helper;
|
import io.grpc.LoadBalancer.Helper;
|
||||||
import io.grpc.LoadBalancerProvider;
|
import io.grpc.LoadBalancerProvider;
|
||||||
|
|
@ -66,6 +67,9 @@ public class OutlierDetectionLoadBalancerProviderTest {
|
||||||
@Test
|
@Test
|
||||||
public void providesLoadBalancer() {
|
public void providesLoadBalancer() {
|
||||||
Helper helper = mock(Helper.class);
|
Helper helper = mock(Helper.class);
|
||||||
|
ChannelLogger channelLogger = mock(ChannelLogger.class);
|
||||||
|
|
||||||
|
when(helper.getChannelLogger()).thenReturn(channelLogger);
|
||||||
when(helper.getSynchronizationContext()).thenReturn(syncContext);
|
when(helper.getSynchronizationContext()).thenReturn(syncContext);
|
||||||
when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class));
|
when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class));
|
||||||
assertThat(provider.newLoadBalancer(helper))
|
assertThat(provider.newLoadBalancer(helper))
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ClientStreamTracer;
|
import io.grpc.ClientStreamTracer;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.ConnectivityStateInfo;
|
import io.grpc.ConnectivityStateInfo;
|
||||||
|
|
@ -165,6 +166,9 @@ public class OutlierDetectionLoadBalancerTest {
|
||||||
subchannel4 = subchannelIterator.next();
|
subchannel4 = subchannelIterator.next();
|
||||||
subchannel5 = subchannelIterator.next();
|
subchannel5 = subchannelIterator.next();
|
||||||
|
|
||||||
|
ChannelLogger channelLogger = mock(ChannelLogger.class);
|
||||||
|
|
||||||
|
when(mockHelper.getChannelLogger()).thenReturn(channelLogger);
|
||||||
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
|
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
|
||||||
when(mockHelper.getScheduledExecutorService()).thenReturn(
|
when(mockHelper.getScheduledExecutorService()).thenReturn(
|
||||||
fakeClock.getScheduledExecutorService());
|
fakeClock.getScheduledExecutorService());
|
||||||
|
|
@ -174,6 +178,7 @@ public class OutlierDetectionLoadBalancerTest {
|
||||||
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
|
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
|
||||||
CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
|
CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
|
||||||
final Subchannel subchannel = subchannels.get(args.getAddresses());
|
final Subchannel subchannel = subchannels.get(args.getAddresses());
|
||||||
|
when(subchannel.getChannelLogger()).thenReturn(channelLogger);
|
||||||
when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
|
when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
|
||||||
when(subchannel.getAttributes()).thenReturn(args.getAttributes());
|
when(subchannel.getAttributes()).thenReturn(args.getAttributes());
|
||||||
doAnswer(new Answer<Void>() {
|
doAnswer(new Answer<Void>() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue