xds: Add counter and gauge metrics (#11661)

Adds the following xDS client metrics defined in [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient).

Counters
- grpc.xds_client.server_failure
- grpc.xds_client.resource_updates_valid
- grpc.xds_client.resource_updates_invalid

Gauges
- grpc.xds_client.connected
- grpc.xds_client.resources
This commit is contained in:
Vindhya Ningegowda 2024-11-25 16:47:32 -08:00 committed by GitHub
parent 92de2f34dc
commit 20d09cee57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1381 additions and 184 deletions

View File

@ -290,6 +290,7 @@ public abstract class NameResolver {
@Nullable private final ChannelLogger channelLogger; @Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor; @Nullable private final Executor executor;
@Nullable private final String overrideAuthority; @Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;
private Args( private Args(
Integer defaultPort, Integer defaultPort,
@ -299,7 +300,8 @@ public abstract class NameResolver {
@Nullable ScheduledExecutorService scheduledExecutorService, @Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger, @Nullable ChannelLogger channelLogger,
@Nullable Executor executor, @Nullable Executor executor,
@Nullable String overrideAuthority) { @Nullable String overrideAuthority,
@Nullable MetricRecorder metricRecorder) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set");
@ -308,6 +310,7 @@ public abstract class NameResolver {
this.channelLogger = channelLogger; this.channelLogger = channelLogger;
this.executor = executor; this.executor = executor;
this.overrideAuthority = overrideAuthority; this.overrideAuthority = overrideAuthority;
this.metricRecorder = metricRecorder;
} }
/** /**
@ -405,6 +408,14 @@ public abstract class NameResolver {
return overrideAuthority; return overrideAuthority;
} }
/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}
@Override @Override
public String toString() { public String toString() {
@ -417,6 +428,7 @@ public abstract class NameResolver {
.add("channelLogger", channelLogger) .add("channelLogger", channelLogger)
.add("executor", executor) .add("executor", executor)
.add("overrideAuthority", overrideAuthority) .add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)
.toString(); .toString();
} }
@ -435,6 +447,7 @@ public abstract class NameResolver {
builder.setChannelLogger(channelLogger); builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor); builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority); builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
return builder; return builder;
} }
@ -461,6 +474,7 @@ public abstract class NameResolver {
private ChannelLogger channelLogger; private ChannelLogger channelLogger;
private Executor executor; private Executor executor;
private String overrideAuthority; private String overrideAuthority;
private MetricRecorder metricRecorder;
Builder() { Builder() {
} }
@ -547,6 +561,14 @@ public abstract class NameResolver {
return this; return this;
} }
/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}
/** /**
* Builds an {@link Args}. * Builds an {@link Args}.
* *
@ -556,7 +578,8 @@ public abstract class NameResolver {
return return
new Args( new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser, defaultPort, proxyDetector, syncContext, serviceConfigParser,
scheduledExecutorService, channelLogger, executor, overrideAuthority); scheduledExecutorService, channelLogger, executor, overrideAuthority,
metricRecorder);
} }
} }
} }

View File

@ -64,6 +64,7 @@ public class NameResolverTest {
private final ChannelLogger channelLogger = mock(ChannelLogger.class); private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor(); private final Executor executor = Executors.newSingleThreadExecutor();
private final String overrideAuthority = "grpc.io"; private final String overrideAuthority = "grpc.io";
private final MetricRecorder metricRecorder = new MetricRecorder() {};
@Mock NameResolver.Listener mockListener; @Mock NameResolver.Listener mockListener;
@Test @Test
@ -77,6 +78,7 @@ public class NameResolverTest {
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority); assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);
NameResolver.Args args2 = args.toBuilder().build(); NameResolver.Args args2 = args.toBuilder().build();
assertThat(args2.getDefaultPort()).isEqualTo(defaultPort); assertThat(args2.getDefaultPort()).isEqualTo(defaultPort);
@ -87,6 +89,7 @@ public class NameResolverTest {
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority); assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);
assertThat(args2).isNotSameInstanceAs(args); assertThat(args2).isNotSameInstanceAs(args);
assertThat(args2).isNotEqualTo(args); assertThat(args2).isNotEqualTo(args);
@ -102,6 +105,7 @@ public class NameResolverTest {
.setChannelLogger(channelLogger) .setChannelLogger(channelLogger)
.setOffloadExecutor(executor) .setOffloadExecutor(executor)
.setOverrideAuthority(overrideAuthority) .setOverrideAuthority(overrideAuthority)
.setMetricRecorder(metricRecorder)
.build(); .build();
} }

View File

@ -589,6 +589,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
builder.maxHedgedAttempts, builder.maxHedgedAttempts,
loadBalancerFactory); loadBalancerFactory);
this.authorityOverride = builder.authorityOverride; this.authorityOverride = builder.authorityOverride;
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
this.nameResolverArgs = this.nameResolverArgs =
NameResolver.Args.newBuilder() NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort()) .setDefaultPort(builder.getDefaultPort())
@ -599,6 +601,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
.setChannelLogger(channelLogger) .setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder) .setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride) .setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.build(); .build();
this.nameResolver = getNameResolver( this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs); targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
@ -671,8 +674,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
serviceConfigUpdated = true; serviceConfigUpdated = true;
} }
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
} }
@VisibleForTesting @VisibleForTesting

View File

@ -687,6 +687,30 @@ public class ManagedChannelImplTest {
eq(optionalLabelValues)); eq(optionalLabelValues));
} }
@Test
public void metricRecorder_fromNameResolverArgs_recordsToMetricSink() {
MetricSink mockSink1 = mock(MetricSink.class);
MetricSink mockSink2 = mock(MetricSink.class);
channelBuilder.addMetricSink(mockSink1);
channelBuilder.addMetricSink(mockSink2);
createChannel();
LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter(
"test_counter", "Time taken by metric recorder", "s",
ImmutableList.of("grpc.method"), Collections.emptyList(), false);
List<String> requiredLabelValues = ImmutableList.of("testMethod");
List<String> optionalLabelValues = Collections.emptyList();
NameResolver.Args args = helper.getNameResolverArgs();
assertThat(args.getMetricRecorder()).isNotNull();
args.getMetricRecorder()
.addLongCounter(counter, 10, requiredLabelValues, optionalLabelValues);
verify(mockSink1).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
verify(mockSink2).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
}
@Test @Test
public void shutdownWithNoTransportsEverCreated() { public void shutdownWithNoTransportsEverCreated() {
channelBuilder.nameResolverFactory( channelBuilder.nameResolverFactory(
@ -2240,6 +2264,7 @@ public class ManagedChannelImplTest {
assertThat(args.getSynchronizationContext()) assertThat(args.getSynchronizationContext())
.isSameInstanceAs(helper.getSynchronizationContext()); .isSameInstanceAs(helper.getSynchronizationContext());
assertThat(args.getServiceConfigParser()).isNotNull(); assertThat(args.getServiceConfigParser()).isNotNull();
assertThat(args.getMetricRecorder()).isNotNull();
} }
@Test @Test

View File

@ -17,6 +17,7 @@
package io.grpc.xds; package io.grpc.xds;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.client.XdsInitializationException;
@ -36,6 +37,11 @@ public final class InternalSharedXdsClientPoolProvider {
public static ObjectPool<XdsClient> getOrCreate(String target) public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException { throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target); return getOrCreate(target, new MetricRecorder() {});
}
public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);
} }
} }

View File

@ -21,6 +21,7 @@ import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
@ -51,6 +52,8 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private static final boolean LOG_XDS_NODE_ID = Boolean.parseBoolean( private static final boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
System.getenv("GRPC_LOG_XDS_NODE_ID")); System.getenv("GRPC_LOG_XDS_NODE_ID"));
private static final Logger log = Logger.getLogger(XdsClientImpl.class.getName()); private static final Logger log = Logger.getLogger(XdsClientImpl.class.getName());
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final Bootstrapper bootstrapper; private final Bootstrapper bootstrapper;
private final Object lock = new Object(); private final Object lock = new Object();
@ -82,7 +85,8 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
} }
@Override @Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException { public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target); ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) { if (ref == null) {
synchronized (lock) { synchronized (lock) {
@ -98,7 +102,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
if (bootstrapInfo.servers().isEmpty()) { if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided"); throw new XdsInitializationException("No xDS server provided");
} }
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target); ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref); targetToXdsClientMap.put(target, ref);
} }
} }
@ -111,19 +115,17 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
return ImmutableList.copyOf(targetToXdsClientMap.keySet()); return ImmutableList.copyOf(targetToXdsClientMap.keySet());
} }
private static class SharedXdsClientPoolProviderHolder { private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider(); private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
} }
@ThreadSafe @ThreadSafe
@VisibleForTesting @VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> { class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo; private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client. private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final Object lock = new Object(); private final Object lock = new Object();
@GuardedBy("lock") @GuardedBy("lock")
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
@ -131,11 +133,15 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private XdsClient xdsClient; private XdsClient xdsClient;
@GuardedBy("lock") @GuardedBy("lock")
private int refCount; private int refCount;
@GuardedBy("lock")
private XdsClientMetricReporterImpl metricReporter;
@VisibleForTesting @VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) { RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo); this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target; this.target = target;
this.metricRecorder = metricRecorder;
} }
@Override @Override
@ -146,6 +152,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId()); log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
} }
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
xdsClient = new XdsClientImpl( xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY, DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo, bootstrapInfo,
@ -154,7 +161,9 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER, TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE, MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo)); new TlsContextManagerImpl(bootstrapInfo),
metricReporter);
metricReporter.setXdsClient(xdsClient);
} }
refCount++; refCount++;
return xdsClient; return xdsClient;
@ -168,6 +177,9 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
if (refCount == 0) { if (refCount == 0) {
xdsClient.shutdown(); xdsClient.shutdown();
xdsClient = null; xdsClient = null;
metricReporter.close();
metricReporter = null;
targetToXdsClientMap.remove(target);
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
} }
return null; return null;

View File

@ -0,0 +1,215 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* XdsClientMetricReporter implementation.
*/
final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
private static final Logger logger = Logger.getLogger(
XdsClientMetricReporterImpl.class.getName());
private static final LongCounterMetricInstrument SERVER_FAILURE_COUNTER;
private static final LongCounterMetricInstrument RESOURCE_UPDATES_VALID_COUNTER;
private static final LongCounterMetricInstrument RESOURCE_UPDATES_INVALID_COUNTER;
private static final LongGaugeMetricInstrument CONNECTED_GAUGE;
private static final LongGaugeMetricInstrument RESOURCES_GAUGE;
private final MetricRecorder metricRecorder;
private final String target;
@Nullable
private Registration gaugeRegistration = null;
static {
MetricInstrumentRegistry metricInstrumentRegistry
= MetricInstrumentRegistry.getDefaultRegistry();
SERVER_FAILURE_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.xds_client.server_failure",
"EXPERIMENTAL. A counter of xDS servers going from healthy to unhealthy. A server goes"
+ " unhealthy when we have a connectivity failure or when the ADS stream fails without"
+ " seeing a response message, as per gRFC A57.", "{failure}",
Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false);
RESOURCE_UPDATES_VALID_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.xds_client.resource_updates_valid",
"EXPERIMENTAL. A counter of resources received that were considered valid. The counter will"
+ " be incremented even for resources that have not changed.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.server", "grpc.xds.resource_type"),
Collections.emptyList(), false);
RESOURCE_UPDATES_INVALID_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.xds_client.resource_updates_invalid",
"EXPERIMENTAL. A counter of resources received that were considered invalid.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.server", "grpc.xds.resource_type"),
Collections.emptyList(), false);
CONNECTED_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.connected",
"EXPERIMENTAL. Whether or not the xDS client currently has a working ADS stream to the xDS"
+ " server. For a given server, this will be set to 1 when the stream is initially"
+ " created. It will be set to 0 when we have a connectivity failure or when the ADS"
+ " stream fails without seeing a response message, as per gRFC A57. Once set to 0, it"
+ " will be reset to 1 when we receive the first response on an ADS stream.", "{bool}",
Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false);
RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources",
"EXPERIMENTAL. Number of xDS resources.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state",
"grpc.xds.resource_type"), Collections.emptyList(), false);
}
XdsClientMetricReporterImpl(MetricRecorder metricRecorder, String target) {
this.metricRecorder = metricRecorder;
this.target = target;
}
@Override
public void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String xdsServer, String resourceType) {
metricRecorder.addLongCounter(RESOURCE_UPDATES_VALID_COUNTER, validResourceCount,
Arrays.asList(target, xdsServer, resourceType), Collections.emptyList());
metricRecorder.addLongCounter(RESOURCE_UPDATES_INVALID_COUNTER, invalidResourceCount,
Arrays.asList(target, xdsServer, resourceType), Collections.emptyList());
}
@Override
public void reportServerFailure(long serverFailure, String xdsServer) {
metricRecorder.addLongCounter(SERVER_FAILURE_COUNTER, serverFailure,
Arrays.asList(target, xdsServer), Collections.emptyList());
}
void setXdsClient(XdsClient xdsClient) {
assert gaugeRegistration == null;
// register gauge here
this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
reportCallbackMetrics(recorder, xdsClient);
}
}, CONNECTED_GAUGE, RESOURCES_GAUGE);
}
void close() {
if (gaugeRegistration != null) {
gaugeRegistration.close();
gaugeRegistration = null;
}
}
void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
MetricReporterCallback callback = new MetricReporterCallback(recorder, target);
try {
Future<Void> reportServerConnectionsCompleted = xdsClient.reportServerConnections(callback);
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getResourceMetadataCompleted = xdsClient.getSubscribedResourcesMetadataSnapshot();
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);
computeAndReportResourceCounts(metadataByType, callback);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
}
logger.log(Level.WARNING, "Failed to report gauge metrics", e);
}
}
private void computeAndReportResourceCounts(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
MetricReporterCallback callback) {
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
metadataByType.entrySet()) {
XdsResourceType<?> type = metadataByTypeEntry.getKey();
Map<String, Long> resourceCountsByState = new HashMap<>();
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
}
resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
}
}
private static String cacheStateFromResourceStatus(ResourceMetadataStatus metadataStatus,
boolean isResourceCached) {
switch (metadataStatus) {
case REQUESTED:
return "requested";
case DOES_NOT_EXIST:
return "does_not_exist";
case ACKED:
return "acked";
case NACKED:
return isResourceCached ? "nacked_but_cached" : "nacked";
default:
return "unknown";
}
}
@VisibleForTesting
static final class MetricReporterCallback implements ServerConnectionCallback {
private final BatchRecorder recorder;
private final String target;
MetricReporterCallback(BatchRecorder recorder, String target) {
this.recorder = recorder;
this.target = target;
}
// TODO(dnvindhya): include the "authority" label once xds.authority is available.
void reportResourceCountGauge(long resourceCount, String cacheState,
String resourceType) {
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
Arrays.asList(target, cacheState, resourceType), Collections.emptyList());
}
@Override
public void reportServerConnectionGauge(boolean isConnected, String xdsServer) {
recorder.recordLongGauge(CONNECTED_GAUGE, isConnected ? 1 : 0,
Arrays.asList(target, xdsServer), Collections.emptyList());
}
}
}

View File

@ -16,6 +16,7 @@
package io.grpc.xds; package io.grpc.xds;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.client.XdsInitializationException;
@ -29,7 +30,8 @@ interface XdsClientPoolFactory {
@Nullable @Nullable
ObjectPool<XdsClient> get(String target); ObjectPool<XdsClient> get(String target);
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException; ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException;
List<String> getTargets(); List<String> getTargets();
} }

View File

@ -41,6 +41,7 @@ import io.grpc.InternalLogId;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
@ -125,6 +126,7 @@ final class XdsNameResolver extends NameResolver {
private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector(); private final ConfigSelector configSelector = new ConfigSelector();
private final long randomChannelId; private final long randomChannelId;
private final MetricRecorder metricRecorder;
private volatile RoutingConfig routingConfig = RoutingConfig.empty; private volatile RoutingConfig routingConfig = RoutingConfig.empty;
private Listener2 listener; private Listener2 listener;
@ -140,10 +142,12 @@ final class XdsNameResolver extends NameResolver {
URI targetUri, String name, @Nullable String overrideAuthority, URI targetUri, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler, SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) { @Nullable Map<String, ?> bootstrapOverride,
MetricRecorder metricRecorder) {
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser, this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(), syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride); ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
metricRecorder);
} }
@VisibleForTesting @VisibleForTesting
@ -152,7 +156,8 @@ final class XdsNameResolver extends NameResolver {
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler, SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) { FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
MetricRecorder metricRecorder) {
this.targetAuthority = targetAuthority; this.targetAuthority = targetAuthority;
target = targetUri.toString(); target = targetUri.toString();
@ -170,6 +175,7 @@ final class XdsNameResolver extends NameResolver {
this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
this.random = checkNotNull(random, "random"); this.random = checkNotNull(random, "random");
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
this.metricRecorder = metricRecorder;
randomChannelId = random.nextLong(); randomChannelId = random.nextLong();
logId = InternalLogId.allocate("xds-resolver", name); logId = InternalLogId.allocate("xds-resolver", name);
logger = XdsLogger.withLogId(logId); logger = XdsLogger.withLogId(logId);
@ -185,7 +191,7 @@ final class XdsNameResolver extends NameResolver {
public void start(Listener2 listener) { public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener"); this.listener = checkNotNull(listener, "listener");
try { try {
xdsClientPool = xdsClientPoolFactory.getOrCreate(target); xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
} catch (Exception e) { } catch (Exception e) {
listener.onError( listener.onError(
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e)); Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));

View File

@ -81,7 +81,8 @@ public final class XdsNameResolverProvider extends NameResolverProvider {
targetUri, name, args.getOverrideAuthority(), targetUri, name, args.getOverrideAuthority(),
args.getServiceConfigParser(), args.getSynchronizationContext(), args.getServiceConfigParser(), args.getSynchronizationContext(),
args.getScheduledExecutorService(), args.getScheduledExecutorService(),
bootstrapOverride); bootstrapOverride,
args.getMetricRecorder());
} }
return null; return null;
} }

View File

@ -29,6 +29,7 @@ import io.grpc.Attributes;
import io.grpc.InternalServerInterceptors; import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import io.grpc.ServerCall; import io.grpc.ServerCall;
@ -171,7 +172,9 @@ final class XdsServerWrapper extends Server {
private void internalStart() { private void internalStart() {
try { try {
xdsClientPool = xdsClientPoolFactory.getOrCreate(""); // TODO(dnvindhya): Add "#server" as "grpc.target" attribute value for
// xDS enabled servers.
xdsClientPool = xdsClientPoolFactory.getOrCreate("", new MetricRecorder() {});
} catch (Exception e) { } catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription( StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException(); "Failed to initialize xDS").withCause(e).asException();

View File

@ -78,6 +78,7 @@ final class ControlPlaneClient {
private final Map<XdsResourceType<?>, String> versions = new HashMap<>(); private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
private boolean shutdown; private boolean shutdown;
private boolean streamClosedNoResponse;
@Nullable @Nullable
private AdsStream adsStream; private AdsStream adsStream;
@Nullable @Nullable
@ -224,6 +225,19 @@ final class ControlPlaneClient {
xdsClient.startSubscriberTimersIfNeeded(serverInfo); xdsClient.startSubscriberTimersIfNeeded(serverInfo);
} }
/**
* Indicates whether there is an active ADS stream.
*
* <p>Return {@code true} when the {@code AdsStream} is created.
* {@code false} when the ADS stream fails without a response. Resets to true
* upon receiving the first response on a new ADS stream.
*/
// Must be synchronized
boolean hasWorkingAdsStream() {
return !streamClosedNoResponse;
}
/** /**
* Establishes the RPC connection by creating a new RPC stream on the given channel for * Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication. * xDS protocol communication.
@ -332,6 +346,8 @@ final class ControlPlaneClient {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
// Reset flag as message has been received on a stream
streamClosedNoResponse = false;
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl()); XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) { if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log( logger.log(
@ -408,6 +424,7 @@ final class ControlPlaneClient {
"ADS stream closed by server after a response was received"); "ADS stream closed by server after a response was received");
} }
} else { } else {
streamClosedNoResponse = true;
// If the ADS stream is closed without ever having received a response from the server, then // If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57). // the XdsClient should consider that a connectivity error (see gRFC A57).
if (status.isOk()) { if (status.isOk()) {

View File

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -154,44 +155,46 @@ public abstract class XdsClient {
private final String version; private final String version;
private final ResourceMetadataStatus status; private final ResourceMetadataStatus status;
private final long updateTimeNanos; private final long updateTimeNanos;
private final boolean cached;
@Nullable private final Any rawResource; @Nullable private final Any rawResource;
@Nullable private final UpdateFailureState errorState; @Nullable private final UpdateFailureState errorState;
private ResourceMetadata( private ResourceMetadata(
ResourceMetadataStatus status, String version, long updateTimeNanos, ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached,
@Nullable Any rawResource, @Nullable UpdateFailureState errorState) { @Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
this.status = checkNotNull(status, "status"); this.status = checkNotNull(status, "status");
this.version = checkNotNull(version, "version"); this.version = checkNotNull(version, "version");
this.updateTimeNanos = updateTimeNanos; this.updateTimeNanos = updateTimeNanos;
this.cached = cached;
this.rawResource = rawResource; this.rawResource = rawResource;
this.errorState = errorState; this.errorState = errorState;
} }
static ResourceMetadata newResourceMetadataUnknown() { public static ResourceMetadata newResourceMetadataUnknown() {
return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null); return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null);
} }
static ResourceMetadata newResourceMetadataRequested() { public static ResourceMetadata newResourceMetadataRequested() {
return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null); return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, false, null, null);
} }
static ResourceMetadata newResourceMetadataDoesNotExist() { public static ResourceMetadata newResourceMetadataDoesNotExist() {
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null); return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
} }
public static ResourceMetadata newResourceMetadataAcked( public static ResourceMetadata newResourceMetadataAcked(
Any rawResource, String version, long updateTimeNanos) { Any rawResource, String version, long updateTimeNanos) {
checkNotNull(rawResource, "rawResource"); checkNotNull(rawResource, "rawResource");
return new ResourceMetadata( return new ResourceMetadata(
ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null); ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null);
} }
static ResourceMetadata newResourceMetadataNacked( public static ResourceMetadata newResourceMetadataNacked(
ResourceMetadata metadata, String failedVersion, long failedUpdateTime, ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
String failedDetails) { String failedDetails, boolean cached) {
checkNotNull(metadata, "metadata"); checkNotNull(metadata, "metadata");
return new ResourceMetadata(ResourceMetadataStatus.NACKED, return new ResourceMetadata(ResourceMetadataStatus.NACKED,
metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(), metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(),
new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails)); new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
} }
@ -210,6 +213,11 @@ public abstract class XdsClient {
return updateTimeNanos; return updateTimeNanos;
} }
/** Returns whether the resource was cached. */
public boolean isCached() {
return cached;
}
/** The last successfully updated xDS resource as it was returned by the server. */ /** The last successfully updated xDS resource as it was returned by the server. */
@Nullable @Nullable
public Any getRawResource() { public Any getRawResource() {
@ -378,6 +386,23 @@ public abstract class XdsClient {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/** Callback used to report a gauge metric value for server connections. */
public interface ServerConnectionCallback {
void reportServerConnectionGauge(boolean isConnected, String xdsServer);
}
/**
* Reports whether xDS client has a "working" ADS stream to xDS server. The definition of a
* working stream is defined in gRFC A78.
*
* @see <a
* href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
* A78-grpc-metrics-wrr-pf-xds.md</a>
*/
public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
throw new UnsupportedOperationException();
}
static final class ProcessingTracker { static final class ProcessingTracker {
private final AtomicInteger pendingTask = new AtomicInteger(1); private final AtomicInteger pendingTask = new AtomicInteger(1);
private final Executor executor; private final Executor executor;

View File

@ -41,7 +41,6 @@ import io.grpc.internal.TimeProvider;
import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClient.ResourceStore; import io.grpc.xds.client.XdsClient.ResourceStore;
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
import io.grpc.xds.client.XdsLogger.XdsLogLevel; import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
@ -52,6 +51,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -60,7 +60,7 @@ import javax.annotation.Nullable;
* XdsClient implementation. * XdsClient implementation.
*/ */
@Internal @Internal
public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore { public final class XdsClientImpl extends XdsClient implements ResourceStore {
// Longest time to wait, since the subscription to some resource, for concluding its absence. // Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting @VisibleForTesting
@ -100,6 +100,7 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
private final XdsLogger logger; private final XdsLogger logger;
private volatile boolean isShutdown; private volatile boolean isShutdown;
private final MessagePrettyPrinter messagePrinter; private final MessagePrettyPrinter messagePrinter;
private final XdsClientMetricReporter metricReporter;
public XdsClientImpl( public XdsClientImpl(
XdsTransportFactory xdsTransportFactory, XdsTransportFactory xdsTransportFactory,
@ -109,7 +110,8 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
Supplier<Stopwatch> stopwatchSupplier, Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider, TimeProvider timeProvider,
MessagePrettyPrinter messagePrinter, MessagePrettyPrinter messagePrinter,
Object securityConfig) { Object securityConfig,
XdsClientMetricReporter metricReporter) {
this.xdsTransportFactory = xdsTransportFactory; this.xdsTransportFactory = xdsTransportFactory;
this.bootstrapInfo = bootstrapInfo; this.bootstrapInfo = bootstrapInfo;
this.timeService = timeService; this.timeService = timeService;
@ -118,13 +120,13 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
this.timeProvider = timeProvider; this.timeProvider = timeProvider;
this.messagePrinter = messagePrinter; this.messagePrinter = messagePrinter;
this.securityConfig = securityConfig; this.securityConfig = securityConfig;
this.metricReporter = metricReporter;
logId = InternalLogId.allocate("xds-client", null); logId = InternalLogId.allocate("xds-client", null);
logger = XdsLogger.withLogId(logId); logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created"); logger.log(XdsLogLevel.INFO, "Created");
} }
@Override private void handleResourceResponse(
public void handleResourceResponse(
XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo, XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce, ProcessingTracker processingTracker) { List<Any> resources, String nonce, ProcessingTracker processingTracker) {
checkNotNull(xdsResourceType, "xdsResourceType"); checkNotNull(xdsResourceType, "xdsResourceType");
@ -138,11 +140,11 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
handleResourceUpdate(args, resources, xdsResourceType, processingTracker); handleResourceUpdate(args, resources, xdsResourceType, processingTracker);
} }
@Override private void handleStreamClosed(Status error, ServerInfo serverInfo) {
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers(); cleanUpResourceTimers();
if (!error.isOk()) { if (!error.isOk()) {
metricReporter.reportServerFailure(1L, serverInfo.target());
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap : for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) { resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) { for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
@ -154,8 +156,7 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
} }
} }
@Override private void handleStreamRestarted(ServerInfo serverInfo) {
public void handleStreamRestarted(ServerInfo serverInfo) {
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap : for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) { resourceSubscribers.values()) {
@ -394,7 +395,27 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
xdsTransport, xdsTransport,
serverInfo, serverInfo,
bootstrapInfo.node(), bootstrapInfo.node(),
this, new XdsResponseHandler() {
@Override
public void handleResourceResponse(
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce, ProcessingTracker processingTracker) {
XdsClientImpl.this.handleResourceResponse(resourceType, serverInfo, versionInfo,
resources, nonce,
processingTracker);
}
@Override
public void handleStreamClosed(Status error) {
XdsClientImpl.this.handleStreamClosed(error, serverInfo);
}
@Override
public void handleStreamRestarted(ServerInfo serverInfo) {
XdsClientImpl.this.handleStreamRestarted(serverInfo);
}
},
this, this,
timeService, timeService,
syncContext, syncContext,
@ -448,6 +469,10 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
Map<String, ParsedResource<T>> parsedResources = result.parsedResources; Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
Set<String> invalidResources = result.invalidResources; Set<String> invalidResources = result.invalidResources;
metricReporter.reportResourceUpdates(Long.valueOf(parsedResources.size()),
Long.valueOf(invalidResources.size()),
args.getServerInfo().target(), xdsResourceType.typeUrl());
List<String> errors = result.errors; List<String> errors = result.errors;
String errorDetail = null; String errorDetail = null;
if (errors.isEmpty()) { if (errors.isEmpty()) {
@ -504,9 +529,19 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
} }
} }
/** @Override
* Tracks a single subscribed resource. public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
*/ SettableFuture<Void> future = SettableFuture.create();
syncContext.execute(() -> {
serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->
callback.reportServerConnectionGauge(
controlPlaneClient.hasWorkingAdsStream(), serverInfo.target()));
future.set(null);
});
return future;
}
/** Tracks a single subscribed resource. */
private final class ResourceSubscriber<T extends ResourceUpdate> { private final class ResourceSubscriber<T extends ResourceUpdate> {
@Nullable private final ServerInfo serverInfo; @Nullable private final ServerInfo serverInfo;
@Nullable private final ControlPlaneClient controlPlaneClient; @Nullable private final ControlPlaneClient controlPlaneClient;
@ -644,10 +679,10 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
respTimer.cancel(); respTimer.cancel();
respTimer = null; respTimer = null;
} }
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
ResourceUpdate oldData = this.data; ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate(); this.data = parsedResource.getResourceUpdate();
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
absent = false; absent = false;
if (resourceDeletionIgnored) { if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
@ -741,7 +776,8 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) { void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
metadata = ResourceMetadata metadata = ResourceMetadata
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails); .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails,
data != null);
} }
private void notifyWatcher(ResourceWatcher<T> watcher, T update) { private void notifyWatcher(ResourceWatcher<T> watcher, T update) {

View File

@ -0,0 +1,48 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds.client;
import io.grpc.Internal;
/**
* Interface for reporting metrics from the xDS client.
*/
@Internal
public interface XdsClientMetricReporter {
/**
* Reports number of valid and invalid resources.
*
* @param validResourceCount Number of resources that were valid.
* @param invalidResourceCount Number of resources that were invalid.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
* @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener").
*/
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String xdsServer, String resourceType) {
}
/**
* Reports number of xDS servers going from healthy to unhealthy.
*
* @param serverFailure Number of xDS server failures.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/
default void reportServerFailure(long serverFailure, String xdsServer) {
}
}

View File

@ -39,6 +39,7 @@ import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher; import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher;
import io.grpc.Deadline; import io.grpc.Deadline;
import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureChannelCredentials;
import io.grpc.MetricRecorder;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
@ -553,8 +554,9 @@ public class CsdsServiceTest {
throw new UnsupportedOperationException("Should not be called"); throw new UnsupportedOperationException("Should not be called");
} }
@Override @Override
public ObjectPool<XdsClient> getOrCreate(String target) { public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder) {
throw new UnsupportedOperationException("Should not be called"); throw new UnsupportedOperationException("Should not be called");
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureChannelCredentials;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.SharedXdsClientPoolProvider.RefCountedXdsClientObjectPool; import io.grpc.xds.SharedXdsClientPoolProvider.RefCountedXdsClientObjectPool;
import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
@ -51,6 +52,7 @@ public class SharedXdsClientPoolProviderTest {
@Rule @Rule
public final ExpectedException thrown = ExpectedException.none(); public final ExpectedException thrown = ExpectedException.none();
private final Node node = Node.newBuilder().setId("SharedXdsClientPoolProviderTest").build(); private final Node node = Node.newBuilder().setId("SharedXdsClientPoolProviderTest").build();
private final MetricRecorder metricRecorder = new MetricRecorder() {};
private static final String DUMMY_TARGET = "dummy"; private static final String DUMMY_TARGET = "dummy";
@Mock @Mock
@ -64,7 +66,7 @@ public class SharedXdsClientPoolProviderTest {
SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
thrown.expect(XdsInitializationException.class); thrown.expect(XdsInitializationException.class);
thrown.expectMessage("No xDS server provided"); thrown.expectMessage("No xDS server provided");
provider.getOrCreate(DUMMY_TARGET); provider.getOrCreate(DUMMY_TARGET, metricRecorder);
assertThat(provider.get(DUMMY_TARGET)).isNull(); assertThat(provider.get(DUMMY_TARGET)).isNull();
} }
@ -77,9 +79,9 @@ public class SharedXdsClientPoolProviderTest {
SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
assertThat(provider.get(DUMMY_TARGET)).isNull(); assertThat(provider.get(DUMMY_TARGET)).isNull();
ObjectPool<XdsClient> xdsClientPool = provider.getOrCreate(DUMMY_TARGET); ObjectPool<XdsClient> xdsClientPool = provider.getOrCreate(DUMMY_TARGET, metricRecorder);
verify(bootstrapper).bootstrap(); verify(bootstrapper).bootstrap();
assertThat(provider.getOrCreate(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); assertThat(provider.getOrCreate(DUMMY_TARGET, metricRecorder)).isSameInstanceAs(xdsClientPool);
assertThat(provider.get(DUMMY_TARGET)).isNotNull(); assertThat(provider.get(DUMMY_TARGET)).isNotNull();
assertThat(provider.get(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); assertThat(provider.get(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool);
verifyNoMoreInteractions(bootstrapper); verifyNoMoreInteractions(bootstrapper);
@ -90,8 +92,9 @@ public class SharedXdsClientPoolProviderTest {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo = BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
RefCountedXdsClientObjectPool xdsClientPool = RefCountedXdsClientObjectPool xdsClientPool =
new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder);
assertThat(xdsClientPool.getXdsClientForTest()).isNull(); assertThat(xdsClientPool.getXdsClientForTest()).isNull();
XdsClient xdsClient = xdsClientPool.getObject(); XdsClient xdsClient = xdsClientPool.getObject();
assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull();
@ -103,8 +106,9 @@ public class SharedXdsClientPoolProviderTest {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo = BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
RefCountedXdsClientObjectPool xdsClientPool = RefCountedXdsClientObjectPool xdsClientPool =
new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder);
// getObject once // getObject once
XdsClient xdsClient = xdsClientPool.getObject(); XdsClient xdsClient = xdsClientPool.getObject();
assertThat(xdsClient).isNotNull(); assertThat(xdsClient).isNotNull();
@ -123,8 +127,9 @@ public class SharedXdsClientPoolProviderTest {
ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create());
BootstrapInfo bootstrapInfo = BootstrapInfo bootstrapInfo =
BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
RefCountedXdsClientObjectPool xdsClientPool = RefCountedXdsClientObjectPool xdsClientPool =
new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder);
XdsClient xdsClient1 = xdsClientPool.getObject(); XdsClient xdsClient1 = xdsClientPool.getObject();
assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull();
assertThat(xdsClient1.isShutDown()).isTrue(); assertThat(xdsClient1.isShutDown()).isTrue();

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsListenerResource.LdsUpdate;
@ -73,12 +74,13 @@ public class XdsClientFederationTest {
private ObjectPool<XdsClient> xdsClientPool; private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient; private XdsClient xdsClient;
private static final String DUMMY_TARGET = "dummy"; private static final String DUMMY_TARGET = "dummy";
private final MetricRecorder metricRecorder = new MetricRecorder() {};
@Before @Before
public void setUp() throws XdsInitializationException { public void setUp() throws XdsInitializationException {
SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider(); SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider();
clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride()); clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride());
xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET); xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET, metricRecorder);
xdsClient = xdsClientPool.getObject(); xdsClient = xdsClientPool.getObject();
} }

View File

@ -0,0 +1,394 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricSink;
import io.grpc.xds.XdsClientMetricReporterImpl.MetricReporterCallback;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
import io.grpc.xds.client.XdsResourceType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Unit tests for {@link XdsClientMetricReporterImpl}.
*/
@RunWith(JUnit4.class)
public class XdsClientMetricReporterImplTest {
private static final String target = "test-target";
private static final String server = "trafficdirector.googleapis.com";
private static final String resourceTypeUrl =
"resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster";
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private XdsClient mockXdsClient;
@Mock
private BatchRecorder mockBatchRecorder;
@Captor
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;
private XdsClientMetricReporterImpl reporter;
@Before
public void setUp() {
reporter = new XdsClientMetricReporterImpl(mockMetricRecorder, target);
}
@Test
public void reportResourceUpdates() {
// TODO(dnvindhya): add the "authority" label once available.
reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl);
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10),
eq(Lists.newArrayList(target, server, resourceTypeUrl)),
eq(Lists.newArrayList()));
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.xds_client.resource_updates_invalid"),
eq((long) 5),
eq(Lists.newArrayList(target, server, resourceTypeUrl)),
eq(Lists.newArrayList()));
}
@Test
public void reportServerFailure() {
reporter.reportServerFailure(1, server);
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.xds_client.server_failure"), eq((long) 1),
eq(Lists.newArrayList(target, server)),
eq(Lists.newArrayList()));
}
@Test
public void setXdsClient_reportMetrics() throws Exception {
SettableFuture<Void> future = SettableFuture.create();
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
.thenReturn(future);
reporter.setXdsClient(mockXdsClient);
verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
eqMetricInstrumentName("grpc.xds_client.connected"),
eqMetricInstrumentName("grpc.xds_client.resources"));
gaugeBatchCallbackCaptor.getValue().accept(mockBatchRecorder);
verify(mockXdsClient).reportServerConnections(any(ServerConnectionCallback.class));
}
@Test
public void setXdsClient_reportCallbackMetrics_resourceCountsFails() {
TestlogHandler testLogHandler = new TestlogHandler();
Logger logger = Logger.getLogger(XdsClientMetricReporterImpl.class.getName());
logger.addHandler(testLogHandler);
// For reporting resource counts connections, return a normally completed future
SettableFuture<Void> future = SettableFuture.create();
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
// Create a future that will throw an exception
SettableFuture<Void> serverConnectionsFeature = SettableFuture.create();
serverConnectionsFeature.setException(new Exception("test"));
when(mockXdsClient.reportServerConnections(any())).thenReturn(serverConnectionsFeature);
reporter.setXdsClient(mockXdsClient);
verify(mockMetricRecorder)
.registerBatchCallback(gaugeBatchCallbackCaptor.capture(), any(), any());
gaugeBatchCallbackCaptor.getValue().accept(mockBatchRecorder);
// Verify that the xdsClient methods were called
// verify(mockXdsClient).reportResourceCounts(any());
verify(mockXdsClient).reportServerConnections(any());
assertThat(testLogHandler.getLogs().size()).isEqualTo(1);
assertThat(testLogHandler.getLogs().get(0).getLevel()).isEqualTo(Level.WARNING);
assertThat(testLogHandler.getLogs().get(0).getMessage()).isEqualTo(
"Failed to report gauge metrics");
logger.removeHandler(testLogHandler);
}
@Test
public void metricGauges() {
SettableFuture<Void> future = SettableFuture.create();
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
.thenReturn(future);
reporter.setXdsClient(mockXdsClient);
verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
eqMetricInstrumentName("grpc.xds_client.connected"),
eqMetricInstrumentName("grpc.xds_client.resources"));
BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue();
InOrder inOrder = inOrder(mockBatchRecorder);
// Trigger the internal call to reportCallbackMetrics()
gaugeBatchCallback.accept(mockBatchRecorder);
ArgumentCaptor<ServerConnectionCallback> serverConnectionCallbackCaptor =
ArgumentCaptor.forClass(ServerConnectionCallback.class);
// verify(mockXdsClient).reportResourceCounts(resourceCallbackCaptor.capture());
verify(mockXdsClient).reportServerConnections(serverConnectionCallbackCaptor.capture());
// Get the captured callback
MetricReporterCallback callback = (MetricReporterCallback)
serverConnectionCallbackCaptor.getValue();
// Verify that reportResourceCounts and reportServerConnections were called
// with the captured callback
callback.reportResourceCountGauge(10, "acked", resourceTypeUrl);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(),
any());
callback.reportServerConnectionGauge(true, "xdsServer");
inOrder.verify(mockBatchRecorder)
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L), any(), any());
inOrder.verifyNoMoreInteractions();
}
@Test
public void metricReporterCallback() {
MetricReporterCallback callback =
new MetricReporterCallback(mockBatchRecorder, target);
callback.reportServerConnectionGauge(true, server);
verify(mockBatchRecorder, times(1)).recordLongGauge(
eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L),
eq(Lists.newArrayList(target, server)),
eq(Lists.newArrayList()));
String cacheState = "requested";
callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl);
verify(mockBatchRecorder, times(1)).recordLongGauge(
eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L),
eq(Arrays.asList(target, cacheState, resourceTypeUrl)),
eq(Collections.emptyList()));
}
@Test
public void reportCallbackMetrics_computeAndReportResourceCounts() {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
XdsResourceType<?> routeConfigResource = XdsRouteConfigureResource.getInstance();
XdsResourceType<?> clusterResource = XdsClusterResource.getInstance();
Any rawListener =
Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build());
long nanosLastUpdate = 1577923199_606042047L;
Map<String, ResourceMetadata> ldsResourceMetadataMap = new HashMap<>();
ldsResourceMetadataMap.put("resource1",
ResourceMetadata.newResourceMetadataRequested());
ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42",
nanosLastUpdate);
ldsResourceMetadataMap.put("resource2", ackedLdsResource);
ldsResourceMetadataMap.put("resource3",
ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate));
ldsResourceMetadataMap.put("resource4",
ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate,
"nacked after previous ack", true));
Map<String, ResourceMetadata> rdsResourceMetadataMap = new HashMap<>();
ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested();
rdsResourceMetadataMap.put("resource5",
ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24",
nanosLastUpdate, "nacked after request", false));
rdsResourceMetadataMap.put("resource6",
ResourceMetadata.newResourceMetadataDoesNotExist());
Map<String, ResourceMetadata> cdsResourceMetadataMap = new HashMap<>();
cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown());
metadataByType.put(listenerResource, ldsResourceMetadataMap);
metadataByType.put(routeConfigResource, rdsResourceMetadataMap);
metadataByType.put(clusterResource, cdsResourceMetadataMap);
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
reportServerConnectionsCompleted.set(null);
when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class)))
.thenReturn(reportServerConnectionsCompleted);
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getResourceMetadataCompleted = Futures.immediateFuture(metadataByType);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
// LDS resource requested
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any());
// LDS resources acked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any());
// LDS resource nacked but cached
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any());
// RDS resource nacked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any());
// RDS resource does not exist
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any());
// CDS resource unknown
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any());
verifyNoMoreInteractions(mockBatchRecorder);
}
@Test
public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
metadataByType.put(listenerResource, Collections.emptyMap());
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
reportServerConnectionsCompleted.set(null);
when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class)))
.thenReturn(reportServerConnectionsCompleted);
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getResourceMetadataCompleted = Futures.immediateFuture(metadataByType);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
// Verify that reportResourceCountGauge is never called
verifyNoInteractions(mockBatchRecorder);
}
@Test
public void reportCallbackMetrics_computeAndReportResourceCounts_nullMetadata() {
TestlogHandler testLogHandler = new TestlogHandler();
Logger logger = Logger.getLogger(XdsClientMetricReporterImpl.class.getName());
logger.addHandler(testLogHandler);
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
reportServerConnectionsCompleted.set(null);
when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class)))
.thenReturn(reportServerConnectionsCompleted);
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getResourceMetadataCompleted = Futures.immediateFailedFuture(
new Exception("Error generating metadata snapshot"));
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
assertThat(testLogHandler.getLogs().size()).isEqualTo(1);
assertThat(testLogHandler.getLogs().get(0).getLevel()).isEqualTo(Level.WARNING);
assertThat(testLogHandler.getLogs().get(0).getMessage()).isEqualTo(
"Failed to report gauge metrics");
logger.removeHandler(testLogHandler);
}
@Test
public void close_closesGaugeRegistration() {
MetricSink.Registration mockRegistration = mock(MetricSink.Registration.class);
when(mockMetricRecorder.registerBatchCallback(any(MetricRecorder.BatchCallback.class),
eqMetricInstrumentName("grpc.xds_client.connected"),
eqMetricInstrumentName("grpc.xds_client.resources"))).thenReturn(mockRegistration);
// Sets XdsClient and register the gauges
reporter.setXdsClient(mockXdsClient);
// Closes registered gauges
reporter.close();
verify(mockRegistration, times(1)).close();
}
@SuppressWarnings("TypeParameterUnusedInFormals")
private <T extends MetricInstrument> T eqMetricInstrumentName(String name) {
return argThat(new ArgumentMatcher<T>() {
@Override
public boolean matches(T instrument) {
return instrument.getName().equals(name);
}
});
}
static class TestlogHandler extends Handler {
List<LogRecord> logs = new ArrayList<>();
@Override
public void publish(LogRecord record) {
logs.add(record);
}
@Override
public void close() {}
@Override
public void flush() {}
public List<LogRecord> getLogs() {
return logs;
}
}
}

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.grpc.ChannelLogger; import io.grpc.ChannelLogger;
import io.grpc.InternalServiceProviders; import io.grpc.InternalServiceProviders;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.NameResolver.ServiceConfigParser; import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.NameResolverProvider; import io.grpc.NameResolverProvider;
@ -57,6 +58,7 @@ public class XdsNameResolverProviderTest {
.setServiceConfigParser(mock(ServiceConfigParser.class)) .setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(fakeClock.getScheduledExecutorService()) .setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.setChannelLogger(mock(ChannelLogger.class)) .setChannelLogger(mock(ChannelLogger.class))
.setMetricRecorder(mock(MetricRecorder.class))
.build(); .build();
private XdsNameResolverProvider provider = new XdsNameResolverProvider(); private XdsNameResolverProvider provider = new XdsNameResolverProvider();

View File

@ -55,6 +55,7 @@ import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.NameResolver.ConfigOrError; import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult; import io.grpc.NameResolver.ResolutionResult;
@ -152,6 +153,7 @@ public class XdsNameResolverTest {
private final CallInfo call1 = new CallInfo("HelloService", "hi"); private final CallInfo call1 = new CallInfo("HelloService", "hi");
private final CallInfo call2 = new CallInfo("GreetService", "bye"); private final CallInfo call2 = new CallInfo("GreetService", "bye");
private final TestChannel channel = new TestChannel(); private final TestChannel channel = new TestChannel();
private final MetricRecorder metricRecorder = new MetricRecorder() {};
private BootstrapInfo bootstrapInfo = BootstrapInfo.builder() private BootstrapInfo bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create( .servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create()))) "td.googleapis.com", InsecureChannelCredentials.create())))
@ -187,7 +189,7 @@ public class XdsNameResolverTest {
RouterFilter.INSTANCE); RouterFilter.INSTANCE);
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, filterRegistry, null); xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder);
} }
@After @After
@ -215,7 +217,8 @@ public class XdsNameResolverTest {
} }
@Override @Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException { public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
throw new XdsInitializationException("Fail to read bootstrap file"); throw new XdsInitializationException("Fail to read bootstrap file");
} }
@ -227,7 +230,8 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture()); verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue(); Status error = errorCaptor.getValue();
@ -240,7 +244,8 @@ public class XdsNameResolverTest {
public void resolving_withTargetAuthorityNotFound() { public void resolving_withTargetAuthorityNotFound() {
resolver = new XdsNameResolver(targetUri, resolver = new XdsNameResolver(targetUri,
"notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler, "notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture()); verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue(); Status error = errorCaptor.getValue();
@ -262,7 +267,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver( resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext,
scheduler, xdsClientPoolFactory, scheduler, xdsClientPoolFactory,
mockRandom, FilterRegistry.getDefaultRegistry(), null); mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class)); verify(mockListener, never()).onError(any(Status.class));
} }
@ -282,7 +287,8 @@ public class XdsNameResolverTest {
+ "%5B::FFFF:129.144.52.38%5D:80?id=1"; + "%5B::FFFF:129.144.52.38%5D:80?id=1";
resolver = new XdsNameResolver( resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class)); verify(mockListener, never()).onError(any(Status.class));
} }
@ -302,7 +308,8 @@ public class XdsNameResolverTest {
+ "path/to/service?id=1"; + "path/to/service?id=1";
resolver = new XdsNameResolver( resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
// The Service Authority must be URL encoded, but unlike the LDS resource name. // The Service Authority must be URL encoded, but unlike the LDS resource name.
@ -330,7 +337,8 @@ public class XdsNameResolverTest {
+ "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified + "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified
resolver = new XdsNameResolver(targetUri, resolver = new XdsNameResolver(targetUri,
"xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler, "xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class)); verify(mockListener, never()).onError(any(Status.class));
} }
@ -362,7 +370,8 @@ public class XdsNameResolverTest {
.build(); .build();
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
// use different ldsResourceName and service authority. The virtualhost lookup should use // use different ldsResourceName and service authority. The virtualhost lookup should use
// service authority. // service authority.
expectedLdsResourceName = "test-" + expectedLdsResourceName; expectedLdsResourceName = "test-" + expectedLdsResourceName;
@ -543,7 +552,8 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random",
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost));
@ -566,7 +576,8 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random",
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost));
@ -577,7 +588,8 @@ public class XdsNameResolverTest {
public void resolving_matchingVirtualHostNotFoundForOverrideAuthority() { public void resolving_matchingVirtualHostNotFoundForOverrideAuthority() {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY,
serviceConfigParser, syncContext, scheduler, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts()); xdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts());
@ -661,7 +673,8 @@ public class XdsNameResolverTest {
ServiceConfigParser realParser = new ScParser( ServiceConfigParser realParser = new ScParser(
true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first")); true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first"));
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext,
scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
RetryPolicy retryPolicy = RetryPolicy.create( RetryPolicy retryPolicy = RetryPolicy.create(
@ -871,7 +884,8 @@ public class XdsNameResolverTest {
when(mockRandom.nextLong()).thenReturn(123L); when(mockRandom.nextLong()).thenReturn(123L);
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate( xdsClient.deliverLdsUpdate(
@ -904,7 +918,7 @@ public class XdsNameResolverTest {
public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() { public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom, syncContext, scheduler, xdsClientPoolFactory, mockRandom,
FilterRegistry.getDefaultRegistry(), null); FilterRegistry.getDefaultRegistry(), null, metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate( xdsClient.deliverLdsUpdate(
@ -935,7 +949,7 @@ public class XdsNameResolverTest {
public void resolved_routeActionNoAutoHostRewrite_doesntEmitCallOptionForTheSame() { public void resolved_routeActionNoAutoHostRewrite_doesntEmitCallOptionForTheSame() {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom, syncContext, scheduler, xdsClientPoolFactory, mockRandom,
FilterRegistry.getDefaultRegistry(), null); FilterRegistry.getDefaultRegistry(), null, metricRecorder);
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate( xdsClient.deliverLdsUpdate(
@ -1994,7 +2008,8 @@ public class XdsNameResolverTest {
} }
@Override @Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException { public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
targets.add(target); targets.add(target);
return new ObjectPool<XdsClient>() { return new ObjectPool<XdsClient>() {
@Override @Override

View File

@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureChannelCredentials;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.xds.EnvoyServerProtoData.ConnectionSourceType; import io.grpc.xds.EnvoyServerProtoData.ConnectionSourceType;
import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.FilterChain;
@ -151,7 +152,8 @@ public class XdsServerTestHelper {
} }
@Override @Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException { public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return new ObjectPool<XdsClient>() { return new ObjectPool<XdsClient>() {
@Override @Override
public XdsClient getObject() { public XdsClient getObject() {