mirror of https://github.com/grpc/grpc-java.git
rls: Add gauge metric recording (#11175)
Adds these gauges: - grpc.lb.rls.cache_entries - grpc.lb.rls.cache_size
This commit is contained in:
parent
7a663f633c
commit
511b9c3a5b
|
|
@ -37,10 +37,14 @@ import io.grpc.LoadBalancer.PickSubchannelArgs;
|
|||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LongCounterMetricInstrument;
|
||||
import io.grpc.LongGaugeMetricInstrument;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MetricInstrumentRegistry;
|
||||
import io.grpc.MetricRecorder.BatchCallback;
|
||||
import io.grpc.MetricRecorder.BatchRecorder;
|
||||
import io.grpc.MetricRecorder.Registration;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
|
|
@ -65,6 +69,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -94,6 +99,10 @@ final class CachingRlsLbClient {
|
|||
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
|
||||
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
|
||||
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
|
||||
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
|
||||
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
|
||||
private final Registration gaugeRegistration;
|
||||
private final String metricsInstanceUuid = UUID.randomUUID().toString();
|
||||
|
||||
// All cache status changes (pending, backoff, success) must be under this lock
|
||||
private final Object lock = new Object();
|
||||
|
|
@ -138,6 +147,14 @@ final class CachingRlsLbClient {
|
|||
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
|
||||
+ "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
|
||||
Collections.emptyList(), true);
|
||||
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
|
||||
"Number of entries in the RLS cache", "entry",
|
||||
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
|
||||
Collections.emptyList(), true);
|
||||
CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
|
||||
"The current size of the RLS cache", "byte",
|
||||
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
|
||||
Collections.emptyList(), true);
|
||||
}
|
||||
|
||||
private CachingRlsLbClient(Builder builder) {
|
||||
|
|
@ -202,6 +219,26 @@ final class CachingRlsLbClient {
|
|||
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
|
||||
childLbHelperProvider,
|
||||
new BackoffRefreshListener());
|
||||
|
||||
gaugeRegistration = helper.getMetricRecorder()
|
||||
.registerBatchCallback(new BatchCallback() {
|
||||
@Override
|
||||
public void accept(BatchRecorder recorder) {
|
||||
int estimatedSize;
|
||||
long estimatedSizeBytes;
|
||||
synchronized (lock) {
|
||||
estimatedSize = linkedHashLruCache.estimatedSize();
|
||||
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
|
||||
}
|
||||
recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
|
||||
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
|
||||
metricsInstanceUuid), Collections.emptyList());
|
||||
recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
|
||||
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
|
||||
metricsInstanceUuid), Collections.emptyList());
|
||||
}
|
||||
}, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
|
||||
|
||||
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
|
||||
}
|
||||
|
||||
|
|
@ -306,6 +343,7 @@ final class CachingRlsLbClient {
|
|||
pendingCallCache.clear();
|
||||
rlsChannel.shutdownNow();
|
||||
rlsPicker.close();
|
||||
gaugeRegistration.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,12 +23,15 @@ import static com.google.common.truth.Truth.assertWithMessage;
|
|||
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.base.Converter;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
|
@ -47,10 +50,14 @@ import io.grpc.LoadBalancer.PickDetailsConsumer;
|
|||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LongGaugeMetricInstrument;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.MetricRecorder.BatchCallback;
|
||||
import io.grpc.MetricRecorder.BatchRecorder;
|
||||
import io.grpc.MetricRecorder.Registration;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
|
|
@ -95,12 +102,14 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import javax.annotation.Nonnull;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.AdditionalAnswers;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
|
|
@ -124,6 +133,13 @@ public class CachingRlsLbClientTest {
|
|||
private SocketAddress socketAddress;
|
||||
@Mock
|
||||
private MetricRecorder mockMetricRecorder;
|
||||
@Mock
|
||||
private BatchRecorder mockBatchRecorder;
|
||||
@Mock
|
||||
private Registration mockGaugeRegistration;
|
||||
@Captor
|
||||
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;
|
||||
|
||||
|
||||
private final SynchronizationContext syncContext =
|
||||
new SynchronizationContext(new UncaughtExceptionHandler() {
|
||||
|
|
@ -145,7 +161,7 @@ public class CachingRlsLbClientTest {
|
|||
private final ChildLoadBalancingPolicy childLbPolicy =
|
||||
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
|
||||
private final Helper helper =
|
||||
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
|
||||
mock(Helper.class, delegatesTo(new FakeHelper()));
|
||||
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
||||
private final LbPolicyConfiguration lbPolicyConfiguration =
|
||||
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
|
||||
|
|
@ -168,6 +184,11 @@ public class CachingRlsLbClientTest {
|
|||
.build();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpMockMetricRecorder() {
|
||||
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
rlsLbClient.close();
|
||||
|
|
@ -636,6 +657,51 @@ public class CachingRlsLbClientTest {
|
|||
policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
setUpRlsLbClient();
|
||||
|
||||
verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
|
||||
any());
|
||||
|
||||
BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue();
|
||||
|
||||
// Verify the correct cache gauge values when requested at this point.
|
||||
InOrder inOrder = inOrder(mockBatchRecorder);
|
||||
gaugeBatchCallback.accept(mockBatchRecorder);
|
||||
inOrder.verify(mockBatchRecorder).recordLongGauge(
|
||||
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L),
|
||||
any(), any());
|
||||
inOrder.verify(mockBatchRecorder)
|
||||
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
|
||||
eq(0L), any(), any());
|
||||
|
||||
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(
|
||||
ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key",
|
||||
"bar"));
|
||||
rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest,
|
||||
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
|
||||
|
||||
// Make a request that will populate the cache with an entry
|
||||
getInSyncContext(routeLookupRequest);
|
||||
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Gauge values should reflect the new cache entry.
|
||||
gaugeBatchCallback.accept(mockBatchRecorder);
|
||||
inOrder.verify(mockBatchRecorder).recordLongGauge(
|
||||
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L),
|
||||
any(), any());
|
||||
inOrder.verify(mockBatchRecorder)
|
||||
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
|
||||
eq(260L), any(), any());
|
||||
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
// Shutdown
|
||||
rlsLbClient.close();
|
||||
verify(mockGaugeRegistration).close();
|
||||
}
|
||||
|
||||
private static RouteLookupConfig getRouteLookupConfig() {
|
||||
return RouteLookupConfig.builder()
|
||||
.grpcKeybuilders(ImmutableList.of(
|
||||
|
|
@ -667,6 +733,21 @@ public class CachingRlsLbClientTest {
|
|||
};
|
||||
}
|
||||
|
||||
private static class LongGaugeInstrumentArgumentMatcher implements
|
||||
ArgumentMatcher<LongGaugeMetricInstrument> {
|
||||
|
||||
private final String instrumentName;
|
||||
|
||||
public LongGaugeInstrumentArgumentMatcher(String instrumentName) {
|
||||
this.instrumentName = instrumentName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(LongGaugeMetricInstrument instrument) {
|
||||
return instrument.getName().equals(instrumentName);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FakeBackoffProvider implements BackoffPolicy.Provider {
|
||||
|
||||
private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import static org.mockito.Mockito.inOrder;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.base.Converter;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
|
@ -59,6 +59,7 @@ import io.grpc.MethodDescriptor;
|
|||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.MetricInstrument;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.MetricRecorder.Registration;
|
||||
import io.grpc.MetricSink;
|
||||
import io.grpc.NameResolver.ConfigOrError;
|
||||
import io.grpc.NoopMetricSink;
|
||||
|
|
@ -128,6 +129,8 @@ public class RlsLoadBalancerTest {
|
|||
});
|
||||
@Mock
|
||||
private MetricRecorder mockMetricRecorder;
|
||||
@Mock
|
||||
private Registration mockGaugeRegistration;
|
||||
private final FakeHelper helperDelegate = new FakeHelper();
|
||||
private final Helper helper =
|
||||
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
|
||||
|
|
@ -186,6 +189,8 @@ public class RlsLoadBalancerTest {
|
|||
|
||||
searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
|
||||
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);
|
||||
|
||||
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -226,7 +231,6 @@ public class RlsLoadBalancerTest {
|
|||
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
|
||||
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
|
||||
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -290,8 +294,6 @@ public class RlsLoadBalancerTest {
|
|||
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
|
||||
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -351,7 +353,6 @@ public class RlsLoadBalancerTest {
|
|||
inOrder.verify(helper).getChannelTarget();
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
verifyFailedPicksCounterAdd(1, 1);
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -377,7 +378,6 @@ public class RlsLoadBalancerTest {
|
|||
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
|
||||
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
|
||||
"defaultTarget", "complete");
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
|
||||
Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
|
||||
assertThat(subchannelIsReady(subchannel)).isTrue();
|
||||
|
|
@ -422,8 +422,6 @@ public class RlsLoadBalancerTest {
|
|||
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
|
||||
assertThat(res.getSubchannel()).isNull();
|
||||
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
|
||||
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -499,7 +497,6 @@ public class RlsLoadBalancerTest {
|
|||
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -542,7 +539,6 @@ public class RlsLoadBalancerTest {
|
|||
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
|
||||
assertThat(res.getStatus().isOk()).isFalse();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
verifyNoMoreInteractions(mockMetricRecorder);
|
||||
}
|
||||
|
||||
private PickResult markReadyAndGetPickResult(InOrder inOrder,
|
||||
|
|
|
|||
Loading…
Reference in New Issue