rls: add counter metrics (#11138)

Adds the following metrics to the RlsLoadBalancer:
- grpc.lb.rls.default_target_picks
- grpc.lb.rls.target_picks
- grpc.lb.rls.failed_picks
This commit is contained in:
Terry Wilson 2024-05-01 11:24:38 -07:00 committed by GitHub
parent 4561bb5b80
commit a9fb272b78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 162 additions and 4 deletions

View File

@ -716,6 +716,13 @@ public abstract class LoadBalancer {
return drop;
}
/**
 * Reports whether this pick carries an outcome, i.e. a selected subchannel or a non-OK status.
 *
 * @return {@code false} only when there is no subchannel and the status is OK (the shape
 *     described for {@link #withNoResult()}); {@code true} in every other case
 */
public boolean hasResult() {
  // De Morgan form of "not (no subchannel and OK status)".
  return subchannel != null || !status.isOk();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)

View File

@ -24,6 +24,7 @@ import com.google.common.base.Converter;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@ -36,9 +37,11 @@ import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
@ -87,6 +90,10 @@ final class CachingRlsLbClient {
/** Minimum bytes for a Java Object. */
public static final int OBJ_OVERHEAD_B = 16;
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
// LRU cache based on access order (BACKOFF and actual data will be here)
@ -115,6 +122,23 @@ final class CachingRlsLbClient {
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
private final ChannelLogger logger;
static {
MetricInstrumentRegistry metricInstrumentRegistry
= MetricInstrumentRegistry.getDefaultRegistry();
DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.rls.default_target_picks", "Number of LB picks sent to the default target", "pick",
Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target",
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Lists.newArrayList(), true);
TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
"Number of LB picks sent to each RLS target", "pick",
Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target",
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Lists.newArrayList(), true);
FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
+ "throttled", "pick", Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target"),
Lists.newArrayList(), true);
}
private CachingRlsLbClient(Builder builder) {
helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
scheduledExecutorService = helper.getScheduledExecutorService();
@ -147,7 +171,7 @@ final class CachingRlsLbClient {
}
RlsRequestFactory requestFactory = new RlsRequestFactory(
lbPolicyConfig.getRouteLookupConfig(), serverHost);
rlsPicker = new RlsPicker(requestFactory);
rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
// It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
// RLS server using the same authority as the backends, even though the RLS servers addresses
// will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
@ -904,9 +928,11 @@ final class CachingRlsLbClient {
final class RlsPicker extends SubchannelPicker {
private final RlsRequestFactory requestFactory;
private final String lookupService;
RlsPicker(RlsRequestFactory requestFactory) {
/**
 * Creates a picker bound to the given request factory and RLS lookup service name.
 *
 * @param requestFactory factory stored for use by this picker; must not be null
 * @param lookupService name of the RLS lookup service, reported by this picker as a label value
 *     when it records pick metrics; must not be null
 * @throws NullPointerException if either argument is null
 */
RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
  this.requestFactory = checkNotNull(requestFactory, "requestFactory");
  // Label the check with the real parameter name so a failure points at the right argument
  // (it was previously labelled "rlsConfig").
  this.lookupService = checkNotNull(lookupService, "lookupService");
}
@Override
@ -941,7 +967,14 @@ final class CachingRlsLbClient {
}
// Happy path
logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
return picker.pickSubchannel(args);
PickResult pickResult = picker.pickSubchannel(args);
// TODO: include the "grpc.target" label once target is available here.
if (pickResult.hasResult()) {
helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
Lists.newArrayList("", lookupService, childPolicyWrapper.getTarget(),
determineMetricsPickResult(pickResult)), Lists.newArrayList());
}
return pickResult;
} else if (response.hasError()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
if (hasFallback) {
@ -949,6 +982,9 @@ final class CachingRlsLbClient {
return useFallback(args);
}
logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
// TODO: include the "grpc.target" label once target is available here.
helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
Lists.newArrayList("", lookupService), Lists.newArrayList());
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
@ -969,7 +1005,24 @@ final class CachingRlsLbClient {
if (picker == null) {
return PickResult.withNoResult();
}
return picker.pickSubchannel(args);
PickResult pickResult = picker.pickSubchannel(args);
if (pickResult.hasResult()) {
// TODO: include the grpc.target label once target is available here.
helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
Lists.newArrayList("", lookupService, fallbackChildPolicyWrapper.getTarget(),
determineMetricsPickResult(pickResult)), Lists.newArrayList());
}
return pickResult;
}
/**
 * Classifies a pick outcome into the string recorded as the pick-result label value.
 *
 * @param pickResult the pick to classify
 * @return {@code "complete"} when the status is OK, {@code "drop"} when the status is not OK and
 *     the pick is a drop, and {@code "fail"} otherwise
 */
private String determineMetricsPickResult(PickResult pickResult) {
  // The OK check comes first, so the drop flag only matters for non-OK picks.
  if (pickResult.getStatus().isOk()) {
    return "complete";
  }
  return pickResult.isDrop() ? "drop" : "fail";
}
private void startFallbackChildPolicy() {

View File

@ -50,6 +50,7 @@ import io.grpc.LoadBalancerProvider;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
@ -121,6 +122,8 @@ public class CachingRlsLbClientTest {
private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
@Mock
private SocketAddress socketAddress;
@Mock
private MetricRecorder mockMetricRecorder;
private final SynchronizationContext syncContext =
new SynchronizationContext(new UncaughtExceptionHandler() {
@ -892,6 +895,11 @@ public class CachingRlsLbClientTest {
public ChannelLogger getChannelLogger() {
return mock(ChannelLogger.class);
}
// Returns the test class's mock MetricRecorder instead of a real recorder.
@Override
public MetricRecorder getMetricRecorder() {
return mockMetricRecorder;
}
}
private static final class FakeThrottler implements Throttler {

View File

@ -20,15 +20,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ChannelCredentials;
@ -46,14 +50,17 @@ import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
@ -86,6 +93,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
@ -108,6 +116,8 @@ public class RlsLoadBalancerTest {
throw new RuntimeException(e);
}
});
@Mock
private MetricRecorder mockMetricRecorder;
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
@ -195,6 +205,7 @@ public class RlsLoadBalancerTest {
subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
res = picker.pickSubchannel(fakeSearchMethodArgs);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
// Check on conversion
Throwable cause = new Throwable("cause");
@ -205,6 +216,7 @@ public class RlsLoadBalancerTest {
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
verifyNoMoreInteractions(mockMetricRecorder);
}
@Test
@ -226,6 +238,7 @@ public class RlsLoadBalancerTest {
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
inOrder.verifyNoMoreInteractions();
assertThat(res.getStatus().isOk()).isTrue();
assertThat(subchannels).hasSize(1);
FakeSubchannel searchSubchannel = subchannels.getLast();
@ -238,6 +251,7 @@ public class RlsLoadBalancerTest {
res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel);
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
// rescue should be pending status although the overall channel state is READY
res = picker.pickSubchannel(rescueSubchannelArgs);
@ -262,6 +276,29 @@ public class RlsLoadBalancerTest {
res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}
// Checks the failed-pick path: with no default target configured and no usable RLS response, a
// pick fails with UNAVAILABLE and exactly one grpc.lb.rls.failed_picks increment is recorded.
@Test
public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
// An empty default target leaves the policy without a fallback.
defaultTarget = "";
// NOTE(review): presumably makes the fake throttler reject the RLS request -- confirm against
// FakeThrottler.
fakeThrottler.nextResult = true;
deliverResolvedAddresses();
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
// With no RLS response and no fallback, we should see a failure
// NOTE(review): the trailing "create subchannel" remark below may be left over from the other
// tests; this pick is asserted to fail -- confirm and reword if so.
PickResult res = picker.pickSubchannel(searchSubchannelArgs); // create subchannel
assertThat(res.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
// The only further helper interaction expected is fetching the metric recorder.
inOrder.verify(helper).getMetricRecorder();
inOrder.verifyNoMoreInteractions();
// One failed-pick increment of 1, and nothing else recorded on the mock recorder.
verifyFailedPicksCounterAdd(1, 1);
verifyNoMoreInteractions(mockMetricRecorder);
}
@Test
@ -281,15 +318,20 @@ public class RlsLoadBalancerTest {
(FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel();
assertThat(fallbackSubchannel).isNotNull();
assertThat(subchannelIsReady(fallbackSubchannel)).isTrue();
inOrder.verify(helper).getMetricRecorder();
inOrder.verifyNoMoreInteractions();
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 1, 1, "defaultTarget", "complete");
verifyNoMoreInteractions(mockMetricRecorder);
Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
assertThat(subchannelIsReady(subchannel)).isTrue();
assertThat(subchannel).isSameInstanceAs(fallbackSubchannel);
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 2, 1, "defaultTarget", "complete");
subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
assertThat(subchannelIsReady(subchannel)).isTrue();
assertThat(subchannel).isSameInstanceAs(fallbackSubchannel);
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 3, 1, "defaultTarget", "complete");
// Make sure that when RLS starts communicating that default stops being used
fakeThrottler.nextResult = false;
@ -300,6 +342,7 @@ public class RlsLoadBalancerTest {
(FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel();
assertThat(searchSubchannel).isNotNull();
assertThat(searchSubchannel).isNotSameInstanceAs(fallbackSubchannel);
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
// create rescue subchannel
picker.pickSubchannel(rescueSubchannelArgs);
@ -308,6 +351,7 @@ public class RlsLoadBalancerTest {
assertThat(rescueSubchannel).isNotNull();
assertThat(rescueSubchannel).isNotSameInstanceAs(fallbackSubchannel);
assertThat(rescueSubchannel).isNotSameInstanceAs(searchSubchannel);
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "civilization", "complete");
// all channels are failed
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
@ -318,6 +362,9 @@ public class RlsLoadBalancerTest {
searchSubchannelArgs);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(res.getSubchannel()).isNull();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}
@Test
@ -345,6 +392,7 @@ public class RlsLoadBalancerTest {
assertThat(subchannels).hasSize(1);
FakeSubchannel searchSubchannel =
(FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel();
inOrder.verify(helper).getMetricRecorder();
inOrder.verifyNoMoreInteractions();
assertThat(subchannelIsReady(searchSubchannel)).isTrue();
assertThat(subchannels.getLast()).isSameInstanceAs(searchSubchannel);
@ -373,11 +421,13 @@ public class RlsLoadBalancerTest {
res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod));
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "civilization", "complete");
// all channels are failed
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
@ -385,6 +435,8 @@ public class RlsLoadBalancerTest {
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
inOrder.verifyNoMoreInteractions();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}
@Test
@ -413,7 +465,9 @@ public class RlsLoadBalancerTest {
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes());
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
inOrder.verify(helper).getMetricRecorder();
inOrder.verifyNoMoreInteractions();
rlsLb.handleNameResolutionError(Status.UNAVAILABLE);
@ -424,6 +478,7 @@ public class RlsLoadBalancerTest {
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyNoMoreInteractions(mockMetricRecorder);
}
private PickResult markReadyAndGetPickResult(InOrder inOrder,
@ -492,6 +547,36 @@ public class RlsLoadBalancerTest {
+ "}";
}
// Asserts that the mock MetricRecorder received exactly `times` addLongCounter calls for the
// instrument named `name`, each adding `value` with the label values
// ["", "localhost:8972", dataPlaneTargetLabel, pickResult] followed by an empty label list.
private void verifyLongCounterAdd(String name, int times, long value,
    String dataPlaneTargetLabel, String pickResult) {
  // Instruments are matched by name only.
  ArgumentMatcher<LongCounterMetricInstrument> hasExpectedName =
      new ArgumentMatcher<LongCounterMetricInstrument>() {
        @Override
        public boolean matches(LongCounterMetricInstrument instrument) {
          return instrument.getName().equals(name);
        }
      };
  // TODO: support the "grpc.target" label once available.
  verify(mockMetricRecorder, times(times)).addLongCounter(
      argThat(hasExpectedName),
      eq(value),
      eq(Lists.newArrayList("", "localhost:8972", dataPlaneTargetLabel, pickResult)),
      eq(Lists.newArrayList()));
}
// Asserts that the mock MetricRecorder received exactly `times` addLongCounter calls for the
// "grpc.lb.rls.failed_picks" instrument, each adding `value` with the label values
// ["", "localhost:8972"] followed by an empty label list.
private void verifyFailedPicksCounterAdd(int times, long value) {
  // Instruments are matched by name only.
  ArgumentMatcher<LongCounterMetricInstrument> isFailedPicksCounter =
      new ArgumentMatcher<LongCounterMetricInstrument>() {
        @Override
        public boolean matches(LongCounterMetricInstrument instrument) {
          return instrument.getName().equals("grpc.lb.rls.failed_picks");
        }
      };
  // TODO: support the "grpc.target" label once available.
  verify(mockMetricRecorder, times(times)).addLongCounter(
      argThat(isFailedPicksCounter),
      eq(value),
      eq(Lists.newArrayList("", "localhost:8972")),
      eq(Lists.newArrayList()));
}
private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) {
return new PickSubchannelArgsImpl(
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {});
@ -585,6 +670,11 @@ public class RlsLoadBalancerTest {
public ChannelLogger getChannelLogger() {
return mock(ChannelLogger.class);
}
// Returns the test class's mock MetricRecorder so the tests can verify which counters were
// recorded.
@Override
public MetricRecorder getMetricRecorder() {
return mockMetricRecorder;
}
}
private static final class FakeRlsServerImpl