Compare commits

...

13 Commits

Author SHA1 Message Date
Eric Anderson 9d2c895c29 Bump version to 1.74.0 2025-07-24 13:39:04 -07:00
Eric Anderson b58900e759 Update README etc to reference 1.74.0 2025-07-24 13:39:04 -07:00
Eric Anderson 15c7573988 netty: Associate netty stream eagerly to avoid client hang
In #12185, RPCs were randomly hanging. In #12207 this was tracked down
to the headers promise completing successfully, but the netty stream
was null. This was because the headers write hadn't completed but
stream.close() had been called by goingAway().
2025-07-18 16:05:36 +00:00
George Gensure b04c673fdf Guarantee missing stream promise delivery
In observed cases, whether RST_STREAM or another failure from netty or
the server, listeners can fail to be notified when a connection yields a
null stream for the selected streamId. This causes hangs in clients,
despite deadlines, with no obvious resolution.

Tests which relied upon this promise succeeding must now change.
2025-07-18 16:05:27 +00:00
Eric Anderson 1df2a33056 LBs should avoid calling LBs after lb.shutdown()
LoadBalancers shouldn't be called after shutdown(), but RingHashLb could
have enqueued work to the SynchronizationContext that executed after
shutdown(). This commit fixes problems discovered when auditing all LBs
usage of the syncContext for that type of problem.

Similarly, PickFirstLb could have requested a new connection after
shutdown(). We want to avoid that sort of thing too.

RingHashLb's test changed from CONNECTING to TRANSIENT_FAILURE to get
the latest picker. Because two subchannels have failed it will be in
TRANSIENT_FAILURE. Previously the test was using an older picker with
out-of-date subchannelView, and the verifyConnection() was too imprecise
to notice it was creating the wrong subchannel.

As discovered in b/430347751, where ClusterImplLb was seeing a new
subchannel being called after the child LB was shutdown (the shutdown
itself had been caused by RingHashConfig not implementing equals() and
was fixed by a8de9f07ab, which caused ClusterResolverLb to replace its
state):

```
java.lang.NullPointerException
	at io.grpc.xds.ClusterImplLoadBalancer$ClusterImplLbHelper.createClusterLocalityFromAttributes(ClusterImplLoadBalancer.java:322)
	at io.grpc.xds.ClusterImplLoadBalancer$ClusterImplLbHelper.createSubchannel(ClusterImplLoadBalancer.java:236)
	at io.grpc.util.ForwardingLoadBalancerHelper.createSubchannel(ForwardingLoadBalancerHelper.java:47)
	at io.grpc.util.ForwardingLoadBalancerHelper.createSubchannel(ForwardingLoadBalancerHelper.java:47)
	at io.grpc.internal.PickFirstLeafLoadBalancer.createNewSubchannel(PickFirstLeafLoadBalancer.java:527)
	at io.grpc.internal.PickFirstLeafLoadBalancer.requestConnection(PickFirstLeafLoadBalancer.java:459)
	at io.grpc.internal.PickFirstLeafLoadBalancer.acceptResolvedAddresses(PickFirstLeafLoadBalancer.java:174)
	at io.grpc.xds.LazyLoadBalancer$LazyDelegate.activate(LazyLoadBalancer.java:64)
	at io.grpc.xds.LazyLoadBalancer$LazyDelegate.requestConnection(LazyLoadBalancer.java:97)
	at io.grpc.util.ForwardingLoadBalancer.requestConnection(ForwardingLoadBalancer.java:61)
	at io.grpc.xds.RingHashLoadBalancer$RingHashPicker.lambda$pickSubchannel$0(RingHashLoadBalancer.java:440)
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96)
	at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:128)
	at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.onData(XdsClientImpl.java:817)
```
2025-07-18 16:05:09 +00:00
Eric Anderson a5eaa66ccc xds: Implement equals in RingHashConfig
Lack of equals causes cluster_resolver to consider every update a
different configuration and restart itself.

b/430347751
2025-07-18 16:05:01 +00:00
Eric Anderson 393f02b117 Revert "xds: Convert CdsLb to XdsDepManager"
This reverts commit 297ab05efe.

b/430347751 shows multiple concerning behaviors in the xDS stack with
the new A74 config update model. XdsDepManager and CdsLB2 still seem to
be working correctly, but the change is exacerbated issues in other
parts of the stack, like RingHashConfig not having equals fixed in
a8de9f07ab.

Revert only for the v1.74.x release, leaving it on master.
2025-07-16 19:54:03 +00:00
Eric Anderson 69b8cf5f26 Revert "xds: Support tracking non-xds resources in XdsDepManager"
This reverts commit d5b4fb51c2 as part of
reverting 297ab05efe.
2025-07-16 19:54:03 +00:00
Eric Anderson 98e4252e34 Revert "xds: XdsNR should be subscribing to clusters with XdsDepManager"
This reverts commit 2604ce8a55 as part of
reverting 297ab05efe.
2025-07-16 19:54:03 +00:00
Eric Anderson 3b63af411d Revert "xds: Add logical dns cluster support to XdsDepManager"
This reverts commit d2d8ed8efa as part of
reverting 297ab05efe.
2025-07-16 19:54:03 +00:00
Eric Anderson f3daf93ce5 Revert "xds: Disable LOGICAL_DNS in XdsDepMan until used"
This reverts commit d374b26b68 as part of
reverting 297ab05efe.
2025-07-16 19:54:03 +00:00
Eric Anderson 2ecbd437c3 Revert "Fix RLS regressions from XdsDepMan conversion"
This reverts commit f6ba7c5291 as part of
reverting 297ab05efe.
2025-07-16 19:54:03 +00:00
Eric Anderson f6ba7c5291 Fix RLS regressions from XdsDepMan conversion
297ab05ef converted CDS to XdsDependencyManager. This caused three
regressions:

 * CdsLB2 as a RLS child would always fail with "Unable to find
   non-dynamic root cluster" because is_dynamic=true was missing in
   its service config
 * XdsNameResolver only propagated resolution updates when the clusters
   changed, so a CdsUpdate change would be ignored. This caused a hang
   for RLS even with is_dynamic=true. For non-RLS the lack config update
   broke the circuit breaking psm interop test. This would have been
   more severe if ClusterResolverLb had been converted to
   XdsDependenceManager, as it would have ignored EDS updates
 * RLS did not propagate resolution updates, so CdsLB2 even with
   is_dynamic=true the CdsUpdate for the new cluster would never arrive,
   causing a hang

b/428120265
b/427912384
2025-07-09 14:53:41 +00:00
61 changed files with 1441 additions and 1299 deletions

View File

@ -2,7 +2,7 @@ module(
name = "grpc-java",
compatibility_level = 0,
repo_name = "io_grpc_grpc_java",
version = "1.74.0-SNAPSHOT", # CURRENT_GRPC_VERSION
version = "1.74.0", # CURRENT_GRPC_VERSION
)
# GRPC_DEPS_START

View File

@ -44,8 +44,8 @@ For a guided tour, take a look at the [quick start
guide](https://grpc.io/docs/languages/java/quickstart) or the more explanatory [gRPC
basics](https://grpc.io/docs/languages/java/basics).
The [examples](https://github.com/grpc/grpc-java/tree/v1.73.0/examples) and the
[Android example](https://github.com/grpc/grpc-java/tree/v1.73.0/examples/android)
The [examples](https://github.com/grpc/grpc-java/tree/v1.74.0/examples) and the
[Android example](https://github.com/grpc/grpc-java/tree/v1.74.0/examples/android)
are standalone projects that showcase the usage of gRPC.
Download
@ -56,18 +56,18 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.73.0</version>
<version>1.74.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.73.0</version>
<version>1.74.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.73.0</version>
<version>1.74.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
@ -79,18 +79,18 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`:
Or for Gradle with non-Android, add to your dependencies:
```gradle
runtimeOnly 'io.grpc:grpc-netty-shaded:1.73.0'
implementation 'io.grpc:grpc-protobuf:1.73.0'
implementation 'io.grpc:grpc-stub:1.73.0'
runtimeOnly 'io.grpc:grpc-netty-shaded:1.74.0'
implementation 'io.grpc:grpc-protobuf:1.74.0'
implementation 'io.grpc:grpc-stub:1.74.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
```
For Android client, use `grpc-okhttp` instead of `grpc-netty-shaded` and
`grpc-protobuf-lite` instead of `grpc-protobuf`:
```gradle
implementation 'io.grpc:grpc-okhttp:1.73.0'
implementation 'io.grpc:grpc-protobuf-lite:1.73.0'
implementation 'io.grpc:grpc-stub:1.73.0'
implementation 'io.grpc:grpc-okhttp:1.74.0'
implementation 'io.grpc:grpc-protobuf-lite:1.74.0'
implementation 'io.grpc:grpc-stub:1.74.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
```
@ -99,7 +99,7 @@ For [Bazel](https://bazel.build), you can either
(with the GAVs from above), or use `@io_grpc_grpc_java//api` et al (see below).
[the JARs]:
https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.73.0
https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.74.0
Development snapshots are available in [Sonatypes's snapshot
repository](https://central.sonatype.com/repository/maven-snapshots/).
@ -131,7 +131,7 @@ For protobuf-based codegen integrated with the Maven build system, you can use
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.5:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.73.0:exe:${os.detected.classifier}</pluginArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.74.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
@ -161,7 +161,7 @@ protobuf {
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.73.0'
artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0'
}
}
generateProtoTasks {
@ -194,7 +194,7 @@ protobuf {
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.73.0'
artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0'
}
}
generateProtoTasks {

View File

@ -1189,6 +1189,10 @@ public abstract class LoadBalancer {
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
* as that the callback methods on the {@link LoadBalancer} interface are run in.
*
* <p>Work added to the synchronization context might not run immediately, so LB implementations
* must be careful to ensure that any assumptions still hold when it is executed. In particular,
* the LB might have been shut down or subchannels might have changed state.
*
* <p>Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a
* {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your
* convenience.

View File

@ -303,7 +303,6 @@ public abstract class NameResolver {
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;
@Nullable private final NameResolverRegistry nameResolverRegistry;
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
private Args(Builder builder) {
@ -317,7 +316,6 @@ public abstract class NameResolver {
this.executor = builder.executor;
this.overrideAuthority = builder.overrideAuthority;
this.metricRecorder = builder.metricRecorder;
this.nameResolverRegistry = builder.nameResolverRegistry;
this.customArgs = cloneCustomArgs(builder.customArgs);
}
@ -449,18 +447,6 @@ public abstract class NameResolver {
return metricRecorder;
}
/**
* Returns the {@link NameResolverRegistry} that the Channel uses to look for {@link
* NameResolver}s.
*
* @since 1.74.0
*/
public NameResolverRegistry getNameResolverRegistry() {
if (nameResolverRegistry == null) {
throw new IllegalStateException("NameResolverRegistry is not set in Builder");
}
return nameResolverRegistry;
}
@Override
public String toString() {
@ -475,7 +461,6 @@ public abstract class NameResolver {
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)
.add("nameResolverRegistry", nameResolverRegistry)
.toString();
}
@ -495,7 +480,6 @@ public abstract class NameResolver {
builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
builder.setNameResolverRegistry(nameResolverRegistry);
builder.customArgs = cloneCustomArgs(customArgs);
return builder;
}
@ -524,7 +508,6 @@ public abstract class NameResolver {
private Executor executor;
private String overrideAuthority;
private MetricRecorder metricRecorder;
private NameResolverRegistry nameResolverRegistry;
private IdentityHashMap<Key<?>, Object> customArgs;
Builder() {
@ -631,16 +614,6 @@ public abstract class NameResolver {
return this;
}
/**
* See {@link Args#getNameResolverRegistry}. This is an optional field.
*
* @since 1.74.0
*/
public Builder setNameResolverRegistry(NameResolverRegistry registry) {
this.nameResolverRegistry = registry;
return this;
}
/**
* Builds an {@link Args}.
*

View File

@ -21,7 +21,7 @@ subprojects {
apply plugin: "net.ltgt.errorprone"
group = "io.grpc"
version = "1.74.0-SNAPSHOT" // CURRENT_GRPC_VERSION
version = "1.74.0" // CURRENT_GRPC_VERSION
repositories {
maven { // The google mirror is less flaky than mavenCentral()

View File

@ -8,7 +8,7 @@ import static io.grpc.MethodDescriptor.generateFullMethodName;
* </pre>
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.74.0-SNAPSHOT)",
value = "by gRPC proto compiler (version 1.74.0)",
comments = "Source: grpc/testing/compiler/test.proto")
@io.grpc.stub.annotations.GrpcGenerated
@java.lang.Deprecated

View File

@ -8,7 +8,7 @@ import static io.grpc.MethodDescriptor.generateFullMethodName;
* </pre>
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.74.0-SNAPSHOT)",
value = "by gRPC proto compiler (version 1.74.0)",
comments = "Source: grpc/testing/compiler/test.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class TestServiceGrpc {

View File

@ -219,7 +219,7 @@ public final class GrpcUtil {
public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();
public static final String IMPLEMENTATION_VERSION = "1.74.0-SNAPSHOT"; // CURRENT_GRPC_VERSION
public static final String IMPLEMENTATION_VERSION = "1.74.0"; // CURRENT_GRPC_VERSION
/**
* The default timeout in nanos for a keepalive ping request.

View File

@ -597,8 +597,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
.setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.setNameResolverRegistry(builder.nameResolverRegistry);
.setMetricRecorder(this.metricRecorder);
builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
this.nameResolverArgs = nameResolverArgsBuilder.build();
this.nameResolver = getNameResolver(
@ -686,7 +685,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
// We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
// TODO: After a transition period, all NameResolver implementations that need retry should use
// RetryingNameResolver directly and this step can be removed.
NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
NameResolver usedNameResolver = new RetryingNameResolver(resolver,
new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
nameResolverArgs.getScheduledExecutorService(),
nameResolverArgs.getSynchronizationContext()),
nameResolverArgs.getSynchronizationContext());
if (overrideAuthority == null) {
return usedNameResolver;
@ -1967,6 +1970,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
public void requestConnection() {
syncContext.throwIfNotInThisSynchronizationContext();
checkState(started, "not started");
if (shutdown) {
return;
}
subchannel.obtainActiveTransport();
}

View File

@ -134,7 +134,7 @@ final class PickFirstLoadBalancer extends LoadBalancer {
SubchannelPicker picker;
switch (newState) {
case IDLE:
picker = new RequestConnectionPicker(subchannel);
picker = new RequestConnectionPicker();
break;
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
@ -197,22 +197,12 @@ final class PickFirstLoadBalancer extends LoadBalancer {
/** Picker that requests connection during the first pick, and returns noResult. */
private final class RequestConnectionPicker extends SubchannelPicker {
private final Subchannel subchannel;
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
RequestConnectionPicker(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection);
}
return PickResult.withNoResult();
}

View File

@ -27,22 +27,13 @@ import io.grpc.SynchronizationContext;
*
* <p>The {@link NameResolver} used with this
*/
public final class RetryingNameResolver extends ForwardingNameResolver {
public static NameResolver wrap(NameResolver retriedNameResolver, Args args) {
// For migration, this might become conditional
return new RetryingNameResolver(
retriedNameResolver,
new BackoffPolicyRetryScheduler(
new ExponentialBackoffPolicy.Provider(),
args.getScheduledExecutorService(),
args.getSynchronizationContext()),
args.getSynchronizationContext());
}
final class RetryingNameResolver extends ForwardingNameResolver {
private final NameResolver retriedNameResolver;
private final RetryScheduler retryScheduler;
private final SynchronizationContext syncContext;
/**
* Creates a new {@link RetryingNameResolver}.
*

View File

@ -207,7 +207,14 @@ public class DnsNameResolverTest {
// In practice the DNS name resolver provider always wraps the resolver in a
// RetryingNameResolver which adds retry capabilities to it. We use the same setup here.
return (RetryingNameResolver) RetryingNameResolver.wrap(dnsResolver, args);
return new RetryingNameResolver(
dnsResolver,
new BackoffPolicyRetryScheduler(
new ExponentialBackoffPolicy.Provider(),
fakeExecutor.getScheduledExecutorService(),
syncContext
),
syncContext);
}
@Before

View File

@ -1767,6 +1767,19 @@ public class ManagedChannelImplTest {
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
}
@Test
public void subchannelsRequestConnectionNoopAfterShutdown() {
createChannel();
Subchannel sub1 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
shutdownSafely(helper, sub1);
requestConnectionSafely(helper, sub1);
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
}
@Test
public void subchannelsNoConnectionShutdownNow() {
createChannel();

View File

@ -1,5 +1,5 @@
bazel_dep(name = "googleapis", repo_name = "com_google_googleapis", version = "0.0.0-20240326-1c8d509c5")
bazel_dep(name = "grpc-java", repo_name = "io_grpc_grpc_java", version = "1.74.0-SNAPSHOT") # CURRENT_GRPC_VERSION
bazel_dep(name = "grpc-java", repo_name = "io_grpc_grpc_java", version = "1.74.0") # CURRENT_GRPC_VERSION
bazel_dep(name = "grpc-proto", repo_name = "io_grpc_grpc_proto", version = "0.0.0-20240627-ec30f58")
bazel_dep(name = "protobuf", repo_name = "com_google_protobuf", version = "23.1")
bazel_dep(name = "rules_jvm_external", version = "6.0")

View File

@ -34,7 +34,7 @@ android {
protobuf {
protoc { artifact = 'com.google.protobuf:protoc:3.25.1' }
plugins {
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0' // CURRENT_GRPC_VERSION
}
}
generateProtoTasks {
@ -54,12 +54,12 @@ dependencies {
implementation 'androidx.appcompat:appcompat:1.0.0'
// You need to build grpc-java to obtain these libraries below.
implementation 'io.grpc:grpc-okhttp:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-okhttp:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0' // CURRENT_GRPC_VERSION
implementation 'org.apache.tomcat:annotations-api:6.0.53'
testImplementation 'junit:junit:4.13.2'
testImplementation 'com.google.truth:truth:1.1.5'
testImplementation 'io.grpc:grpc-testing:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
testImplementation 'io.grpc:grpc-testing:1.74.0' // CURRENT_GRPC_VERSION
}

View File

@ -32,7 +32,7 @@ android {
protobuf {
protoc { artifact = 'com.google.protobuf:protoc:3.25.1' }
plugins {
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0' // CURRENT_GRPC_VERSION
}
}
generateProtoTasks {
@ -52,8 +52,8 @@ dependencies {
implementation 'androidx.appcompat:appcompat:1.0.0'
// You need to build grpc-java to obtain these libraries below.
implementation 'io.grpc:grpc-okhttp:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-okhttp:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0' // CURRENT_GRPC_VERSION
implementation 'org.apache.tomcat:annotations-api:6.0.53'
}

View File

@ -32,7 +32,7 @@ android {
protobuf {
protoc { artifact = 'com.google.protobuf:protoc:3.25.1' }
plugins {
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0' // CURRENT_GRPC_VERSION
}
}
generateProtoTasks {
@ -52,8 +52,8 @@ dependencies {
implementation 'androidx.appcompat:appcompat:1.0.0'
// You need to build grpc-java to obtain these libraries below.
implementation 'io.grpc:grpc-okhttp:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-okhttp:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0' // CURRENT_GRPC_VERSION
implementation 'org.apache.tomcat:annotations-api:6.0.53'
}

View File

@ -33,7 +33,7 @@ android {
protobuf {
protoc { artifact = 'com.google.protobuf:protoc:3.25.1' }
plugins {
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.74.0' // CURRENT_GRPC_VERSION
}
}
generateProtoTasks {
@ -53,8 +53,8 @@ dependencies {
implementation 'androidx.appcompat:appcompat:1.0.0'
// You need to build grpc-java to obtain these libraries below.
implementation 'io.grpc:grpc-okhttp:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-okhttp:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-protobuf-lite:1.74.0' // CURRENT_GRPC_VERSION
implementation 'io.grpc:grpc-stub:1.74.0' // CURRENT_GRPC_VERSION
implementation 'org.apache.tomcat:annotations-api:6.0.53'
}

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
def protocVersion = protobufVersion

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -23,7 +23,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
dependencies {

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-debug</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.8</maven.compiler.source>

View File

@ -23,7 +23,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
dependencies {

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-dualstack</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.8</maven.compiler.source>

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
def protocVersion = protobufVersion

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-gauth</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.25.5</protobuf.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.8</maven.compiler.source>

View File

@ -22,7 +22,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
def openTelemetryVersion = '1.40.0'
def openTelemetryPrometheusVersion = '1.40.0-alpha'

View File

@ -22,7 +22,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
dependencies {

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-hostname</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.8</maven.compiler.source>

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
def protocVersion = protobufVersion

View File

@ -7,13 +7,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-jwt-auth</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.25.5</protobuf.version>
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.25.5'
def protocVersion = protobufVersion

View File

@ -7,13 +7,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-oauth</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.25.5</protobuf.version>
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
def openTelemetryVersion = '1.40.0'
def openTelemetryPrometheusVersion = '1.40.0-alpha'

View File

@ -16,7 +16,7 @@ java {
targetCompatibility = JavaVersion.VERSION_1_8
}
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -16,7 +16,7 @@ java {
targetCompatibility = JavaVersion.VERSION_1_8
}
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -15,7 +15,7 @@ java {
targetCompatibility = JavaVersion.VERSION_1_8
}
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>example-tls</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protoc.version>3.25.5</protoc.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.8</maven.compiler.source>

View File

@ -21,7 +21,7 @@ java {
// Feel free to delete the comment at the next line. It is just for safely
// updating the version in our release process.
def grpcVersion = '1.74.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def grpcVersion = '1.74.0' // CURRENT_GRPC_VERSION
def protocVersion = '3.25.5'
dependencies {

View File

@ -6,13 +6,13 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
<version>1.74.0-SNAPSHOT</version><!-- CURRENT_GRPC_VERSION -->
<version>1.74.0</version><!-- CURRENT_GRPC_VERSION -->
<name>examples</name>
<url>https://github.com/grpc/grpc-java</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.74.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.74.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.25.5</protobuf.version>
<protoc.version>3.25.5</protoc.version>
<!-- required for JDK 8 -->

View File

@ -122,7 +122,7 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer {
SubchannelPicker picker;
switch (currentState) {
case IDLE:
picker = new RequestConnectionPicker(subchannel);
picker = new RequestConnectionPicker();
break;
case CONNECTING:
picker = new Picker(PickResult.withNoResult());
@ -182,24 +182,15 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer {
*/
private final class RequestConnectionPicker extends SubchannelPicker {
private final Subchannel subchannel;
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
RequestConnectionPicker(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
helper.getSynchronizationContext().execute(
ShufflingPickFirstLoadBalancer.this::requestConnection);
}
return PickResult.withNoResult();
}
}
}
}

View File

@ -738,14 +738,19 @@ class NettyClientHandler extends AbstractNettyHandler {
// Attach the client stream to the HTTP/2 stream object as user data.
stream.setHttp2Stream(http2Stream);
promise.setSuccess();
} else {
// Otherwise, the stream has been cancelled and Netty is sending a
// RST_STREAM frame which causes it to purge pending writes from the
// flow-controller and delete the http2Stream. The stream listener has already
// been notified of cancellation so there is nothing to do.
//
// This process has been observed to fail in some circumstances, leaving listeners
// unanswered. Ensure that some exception has been delivered consistent with the
// implied RST_STREAM result above.
Status status = Status.INTERNAL.withDescription("unknown stream for connection");
promise.setFailure(status.asRuntimeException());
}
// Otherwise, the stream has been cancelled and Netty is sending a
// RST_STREAM frame which causes it to purge pending writes from the
// flow-controller and delete the http2Stream. The stream listener has already
// been notified of cancellation so there is nothing to do.
// Just forward on the success status to the original promise.
promise.setSuccess();
} else {
Throwable cause = future.cause();
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
@ -768,6 +773,19 @@ class NettyClientHandler extends AbstractNettyHandler {
}
}
});
// When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
// StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
// are delayed because the OS may have too much buffered and isn't accepting the write. The
// write promise is also delayed until flush(). However, we need to associate the netty stream
// with the transport state so that goingAway() and forcefulClose() and able to notify the
// stream of failures.
//
// This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
// it is better than nothing.
Http2Stream http2Stream = connection().stream(streamId);
if (http2Stream != null) {
http2Stream.setProperty(streamKey, stream);
}
}
/**

View File

@ -268,7 +268,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
// Cancel the stream.
cancelStream(Status.CANCELLED);
assertTrue(createFuture.isSuccess());
assertFalse(createFuture.isSuccess());
verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
}
@ -311,7 +311,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
ChannelFuture cancelFuture = cancelStream(Status.CANCELLED);
assertTrue(cancelFuture.isSuccess());
assertTrue(createFuture.isDone());
assertTrue(createFuture.isSuccess());
assertFalse(createFuture.isSuccess());
}
/**
@ -453,6 +453,26 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertTrue(future.isDone());
}
@Test
public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception {
// Purposefully avoid flush(), since we want the write to not actually complete.
// EmbeddedChannel doesn't support flow control, so this is the next closest approximation.
ChannelFuture future = channel().write(
newCreateStreamCommand(grpcHeaders, streamTransportState));
// Read a GOAWAY that indicates our stream can't be sent
channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
assertEquals(
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, "
+ "debug data: this is a test",
captor.getValue().getDescription());
assertTrue(future.isDone());
}
@Test
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
throws Exception {

View File

@ -21,30 +21,36 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsConfig.Subscription;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
/**
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
@ -54,11 +60,13 @@ import java.util.List;
final class CdsLoadBalancer2 extends LoadBalancer {
private final XdsLogger logger;
private final Helper helper;
private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry;
// Following fields are effectively final.
private String clusterName;
private Subscription clusterSubscription;
private LoadBalancer childLb;
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
private CdsLbState cdsLbState;
private ResolvedAddresses resolvedAddresses;
CdsLoadBalancer2(Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry());
@ -67,6 +75,7 @@ final class CdsLoadBalancer2 extends LoadBalancer {
@VisibleForTesting
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
@ -74,115 +83,25 @@ final class CdsLoadBalancer2 extends LoadBalancer {
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
if (this.clusterName == null) {
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
if (config.isDynamic) {
clusterSubscription = resolvedAddresses.getAttributes()
.get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
.subscribeToCluster(config.name);
}
this.clusterName = config.name;
}
XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
if (clusterConfigOr == null) {
if (clusterSubscription == null) {
// Should be impossible, because XdsDependencyManager wouldn't have generated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to find non-dynamic root cluster"));
}
// The dynamic cluster must not have loaded yet
if (this.resolvedAddresses != null) {
return Status.OK;
}
if (!clusterConfigOr.hasValue()) {
return fail(clusterConfigOr.getStatus());
}
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
List<String> leafNames;
if (clusterConfig.getChildren() instanceof AggregateConfig) {
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
leafNames = ImmutableList.of(clusterName);
} else {
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected cluster children type: "
+ clusterConfig.getChildren().getClass()));
}
if (leafNames.isEmpty()) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.UNAVAILABLE.withDescription(
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
}
Status noneFoundError = Status.INTERNAL
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
List<DiscoveryMechanism> instances = new ArrayList<>();
for (String leafName : leafNames) {
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
if (!leafConfigOr.hasValue()) {
noneFoundError = leafConfigOr.getStatus();
continue;
}
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
noneFoundError = Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
+ leafConfigOr.getValue().getChildren().getClass());
continue;
}
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
DiscoveryMechanism instance;
if (result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
leafName,
result.edsServiceName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata(),
result.outlierDetection());
} else {
instance = DiscoveryMechanism.forLogicalDns(
leafName,
result.dnsHostName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata());
}
instances.add(instance);
}
if (instances.isEmpty()) {
return fail(noneFoundError);
}
// The LB policy config is provided in service_config.proto/JSON format.
NameResolver.ConfigOrError configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
if (configOrError.getError() != null) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
}
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
configOrError.getConfig(),
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
return childLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
this.resolvedAddresses = resolvedAddresses;
xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
xdsClient = xdsClientPool.getObject();
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
cdsLbState = new CdsLbState(config.name);
cdsLbState.start();
return Status.OK;
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (childLb != null) {
childLb.handleNameResolutionError(error);
if (cdsLbState != null && cdsLbState.childLb != null) {
cdsLbState.childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
@ -192,28 +111,314 @@ final class CdsLoadBalancer2 extends LoadBalancer {
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
if (childLb != null) {
childLb.shutdown();
childLb = null;
if (cdsLbState != null) {
cdsLbState.shutdown();
}
if (clusterSubscription != null) {
clusterSubscription.close();
clusterSubscription = null;
if (xdsClientPool != null) {
xdsClientPool.returnObject(xdsClient);
}
}
@CheckReturnValue // don't forget to return up the stack after the fail call
private Status fail(Status error) {
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
}
/**
* The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
* receiving the CDS LB policy config with the top-level cluster name.
*/
private final class CdsLbState {
private String errorPrefix() {
return "CdsLb for " + clusterName + ": ";
private final ClusterState root;
private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
private LoadBalancer childLb;
private CdsLbState(String rootCluster) {
root = new ClusterState(rootCluster);
}
private void start() {
root.start();
}
private void shutdown() {
root.shutdown();
if (childLb != null) {
childLb.shutdown();
}
}
private void handleClusterDiscovered() {
List<DiscoveryMechanism> instances = new ArrayList<>();
// Used for loop detection to break the infinite recursion that loops would cause
Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
Status loopStatus = null;
// Level-order traversal.
// Collect configurations for all non-aggregate (leaf) clusters.
Queue<ClusterState> queue = new ArrayDeque<>();
queue.add(root);
while (!queue.isEmpty()) {
int size = queue.size();
for (int i = 0; i < size; i++) {
ClusterState clusterState = queue.remove();
if (!clusterState.discovered) {
return; // do not proceed until all clusters discovered
}
if (clusterState.result == null) { // resource revoked or not exists
continue;
}
if (clusterState.isLeaf) {
if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
DiscoveryMechanism instance;
if (clusterState.result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
clusterState.name, clusterState.result.edsServiceName(),
clusterState.result.lrsServerInfo(),
clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext(),
clusterState.result.filterMetadata(),
clusterState.result.outlierDetection());
} else { // logical DNS
instance = DiscoveryMechanism.forLogicalDns(
clusterState.name, clusterState.result.dnsHostName(),
clusterState.result.lrsServerInfo(),
clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext(),
clusterState.result.filterMetadata());
}
instances.add(instance);
}
} else {
if (clusterState.childClusterStates == null) {
continue;
}
// Do loop detection and break recursion if detected
List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
if (namesCausingLoops.isEmpty()) {
queue.addAll(clusterState.childClusterStates.values());
} else {
// Do cleanup
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
if (loopStatus != null) {
logger.log(XdsLogLevel.WARNING,
"Multiple loops in CDS config. Old msg: " + loopStatus.getDescription());
}
loopStatus = Status.UNAVAILABLE.withDescription(String.format(
"CDS error: circular aggregate clusters directly under %s for "
+ "root cluster %s, named %s, xDS node ID: %s",
clusterState.name, root.name, namesCausingLoops,
xdsClient.getBootstrapInfo().node().getId()));
}
}
}
}
if (loopStatus != null) {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
return;
}
if (instances.isEmpty()) { // none of non-aggregate clusters exists
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
"CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
+ " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
return;
}
// The LB policy config is provided in service_config.proto/JSON format.
NameResolver.ConfigOrError configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
if (configOrError.getError() != null) {
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
.asRuntimeException();
}
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
configOrError.getConfig(),
root.result.isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
childLb.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
}
/**
* Returns children that would cause loops and builds up the parentClusters map.
**/
private List<String> identifyLoops(ClusterState clusterState,
Map<ClusterState, List<ClusterState>> parentClusters) {
Set<String> ancestors = new HashSet<>();
ancestors.add(clusterState.name);
addAncestors(ancestors, clusterState, parentClusters);
List<String> namesCausingLoops = new ArrayList<>();
for (ClusterState state : clusterState.childClusterStates.values()) {
if (ancestors.contains(state.name)) {
namesCausingLoops.add(state.name);
}
}
// Update parent map with entries from remaining children to clusterState
clusterState.childClusterStates.values().stream()
.filter(child -> !namesCausingLoops.contains(child.name))
.forEach(
child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
.add(clusterState));
return namesCausingLoops;
}
/** Recursively add all parents to the ancestors list. **/
private void addAncestors(Set<String> ancestors, ClusterState clusterState,
Map<ClusterState, List<ClusterState>> parentClusters) {
List<ClusterState> directParents = parentClusters.get(clusterState);
if (directParents != null) {
directParents.stream().map(c -> c.name).forEach(ancestors::add);
directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
}
}
private void handleClusterDiscoveryError(Status error) {
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
if (childLb != null) {
childLb.handleNameResolutionError(errorWithNodeId);
} else {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId)));
}
}
private final class ClusterState implements ResourceWatcher<CdsUpdate> {
private final String name;
@Nullable
private Map<String, ClusterState> childClusterStates;
@Nullable
private CdsUpdate result;
// Following fields are effectively final.
private boolean isLeaf;
private boolean discovered;
private boolean shutdown;
private ClusterState(String name) {
this.name = name;
}
private void start() {
shutdown = false;
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
}
void shutdown() {
shutdown = true;
xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
if (childClusterStates != null) {
// recursively shut down all descendants
childClusterStates.values().stream()
.filter(state -> !state.shutdown)
.forEach(ClusterState::shutdown);
}
}
@Override
public void onError(Status error) {
Status status = Status.UNAVAILABLE
.withDescription(
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}
@Override
public void onResourceDoesNotExist(String resourceName) {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
}
@Override
public void onChanged(final CdsUpdate update) {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}
}
}
}

View File

@ -36,6 +36,8 @@ import java.util.Map;
@Internal
public class CdsLoadBalancerProvider extends LoadBalancerProvider {
private static final String CLUSTER_KEY = "cluster";
@Override
public boolean isAvailable() {
return true;
@ -68,12 +70,9 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
*/
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
try {
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic");
if (isDynamic == null) {
isDynamic = Boolean.FALSE;
}
return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic));
String cluster =
JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY);
return ConfigOrError.fromConfig(new CdsConfig(cluster));
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.UNAVAILABLE.withCause(e).withDescription(
@ -90,28 +89,15 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
* Name of cluster to query CDS for.
*/
final String name;
/**
* Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of
* it without an explicit cluster subscription.
*/
final boolean isDynamic;
CdsConfig(String name) {
this(name, false);
}
CdsConfig(String name, boolean isDynamic) {
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
this.name = name;
this.isDynamic = isDynamic;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("isDynamic", isDynamic)
.toString();
return MoreObjects.toStringHelper(this).add("name", name).toString();
}
}
}

View File

@ -99,11 +99,13 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer {
@Override
public void shutdown() {
delegate = new NoopLoadBalancer();
}
private final class LazyPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
// activate() is a no-op after shutdown()
helper.getSynchronizationContext().execute(LazyDelegate.this::activate);
return PickResult.withNoResult();
}
@ -121,4 +123,17 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer {
return new LazyLoadBalancer(helper, delegate);
}
}
private static final class NoopLoadBalancer extends LoadBalancer {
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return Status.OK;
}
@Override
public void handleNameResolutionError(Status error) {}
@Override
public void shutdown() {}
}
}

View File

@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@ -437,7 +438,9 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
if (subchannelView.connectivityState == IDLE) {
syncContext.execute(() -> {
childLbState.getLb().requestConnection();
if (childLbState.getCurrentState() == IDLE) {
childLbState.getLb().requestConnection();
}
});
return PickResult.withNoResult(); // Indicates that this should be retried after backoff
@ -455,10 +458,11 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
return childLbState.getCurrentPicker().pickSubchannel(args);
}
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
syncContext.execute(
() -> {
childLbState.getLb().requestConnection();
});
syncContext.execute(() -> {
if (childLbState.getCurrentState() == IDLE) {
childLbState.getLb().requestConnection();
}
});
requestedConnection = true;
}
}
@ -523,6 +527,22 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
this.requestHashHeader = requestHashHeader;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof RingHashConfig)) {
return false;
}
RingHashConfig that = (RingHashConfig) o;
return this.minRingSize == that.minRingSize
&& this.maxRingSize == that.maxRingSize
&& Objects.equals(this.requestHashHeader, that.requestHashHeader);
}
@Override
public int hashCode() {
return Objects.hash(minRingSize, maxRingSize, requestHashHeader);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)

View File

@ -104,7 +104,7 @@ public final class RingHashLoadBalancerProvider extends LoadBalancerProvider {
}
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
"Invalid 'minRingSize'/'maxRingSize'"));
"Invalid 'mingRingSize'/'maxRingSize'"));
}
return ConfigOrError.fromConfig(
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));

View File

@ -172,9 +172,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
lbConfig.getRawConfigValue());
if (configOrError.getError() != null) {
throw new ResourceInvalidException(
"Failed to parse lb config for cluster '" + cluster.getName() + "': "
+ configOrError.getError());
throw new ResourceInvalidException(structOrError.getErrorDetail());
}
updateBuilder.lbPolicyConfig(lbPolicyConfig);
@ -211,10 +209,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
}
if (clusterConfig.getClustersList().isEmpty()) {
return StructOrError.fromError("Cluster " + clusterName
+ ": aggregate ClusterConfig.clusters must not be empty");
}
return StructOrError.fromStruct(CdsUpdate.forAggregate(
clusterName, clusterConfig.getClustersList()));
}

View File

@ -254,12 +254,6 @@ final class XdsConfig {
}
public interface XdsClusterSubscriptionRegistry {
Subscription subscribeToCluster(String clusterName);
}
public interface Subscription extends Closeable {
/** Release resources without throwing exceptions. */
@Override
void close();
Closeable subscribeToCluster(String clusterName);
}
}

View File

@ -18,34 +18,25 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.RetryingNameResolver;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsResourceType;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
@ -53,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
@ -62,73 +54,46 @@ import javax.annotation.Nullable;
* applies to a single data plane authority.
*/
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
private enum TrackedWatcherTypeEnum {
LDS, RDS, CDS, EDS, DNS
}
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
// DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
// to an empty locality.
private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
static boolean enableLogicalDns = false;
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
private final String listenerName;
private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher;
private final SynchronizationContext syncContext;
private final String dataPlaneAuthority;
private final NameResolver.Args nameResolverArgs;
private XdsConfigWatcher xdsConfigWatcher;
private StatusOr<XdsConfig> lastUpdate = null;
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
new EnumMap<>(TrackedWatcherTypeEnum.class);
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
XdsDependencyManager(
XdsClient xdsClient,
SynchronizationContext syncContext,
String dataPlaneAuthority,
String listenerName,
NameResolver.Args nameResolverArgs) {
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName, NameResolver.Args nameResolverArgs,
ScheduledExecutorService scheduler) {
this.listenerName = checkNotNull(listenerName, "listenerName");
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
checkNotNull(nameResolverArgs, "nameResolverArgs");
checkNotNull(scheduler, "scheduler");
// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}
public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource " + resourceName;
}
public void start(XdsConfigWatcher xdsConfigWatcher) {
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
// start the ball rolling
syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
}
@Override
public XdsConfig.Subscription subscribeToCluster(String clusterName) {
checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
public Closeable subscribeToCluster(String clusterName) {
checkNotNull(clusterName, "clusterName");
ClusterSubscription subscription = new ClusterSubscription(clusterName);
syncContext.execute(() -> {
if (getWatchers(LDS_TYPE).isEmpty()) {
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
subscription.closed = true;
return; // shutdown() called
}
@ -139,40 +104,33 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return subscription;
}
/**
* For all logical dns clusters refresh their results.
*/
public void requestReresolution() {
syncContext.execute(() -> {
for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
DnsWatcher dnsWatcher = (DnsWatcher) watcher;
dnsWatcher.refresh();
}
});
}
private <T extends ResourceUpdate> void addWatcher(
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;
getWatchers(watcherType).put(resourceName, watcher);
getWatchers(type).put(resourceName, watcher);
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
}
public void shutdown() {
syncContext.execute(() -> {
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
watcher.close();
}
shutdownWatchersForType(watchers);
}
resourceWatchers.clear();
subscriptions.clear();
});
}
private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
watcherEntry.getValue().cancelled = true;
}
}
private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
syncContext.execute(() -> {
@ -193,12 +151,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
*/
private void maybePublishConfig() {
syncContext.throwIfNotInThisSynchronizationContext();
if (getWatchers(LDS_TYPE).isEmpty()) {
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
return; // shutdown() called
}
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(TrackedWatcher::missingResult);
.anyMatch(XdsWatcherBase::missingResult);
if (waitingOnResource) {
return;
}
@ -233,8 +191,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
// Iterate watchers and build the XdsConfig
TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
= tracer.getWatcher(LDS_TYPE, listenerName);
XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
= tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
if (ldsWatcher == null) {
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"Bug: No listener watcher found for " + listenerName));
@ -280,13 +238,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return StatusOr.fromValue(builder.build());
}
private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
XdsResourceType<T> resourceType) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
if (typeWatchers == null) {
typeWatchers = new TypeWatchers<T>(watcherType);
resourceWatchers.put(watcherType.typeEnum, typeWatchers);
typeWatchers = new TypeWatchers<T>(resourceType);
resourceWatchers.put(resourceType, typeWatchers);
}
assert typeWatchers.watcherType == watcherType;
assert typeWatchers.resourceType == resourceType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
return tTypeWatchers.watchers;
@ -313,7 +272,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return;
}
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
if (!cdsWatcherDataOr.hasValue()) {
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
@ -332,17 +291,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
addConfigForCluster(clusters, childCluster, ancestors, tracer);
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
if (!config.hasValue()) {
// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
// watchers reports a transient ADS stream error, the policy should report that it is in
// TRANSIENT_FAILURE if it has never passed a config to its child.
//
// But there's currently disagreement about whether that is actually what we want, and
// that was not originally implemented in gRPC Java. So we're keeping Java's old
// behavior for now and only failing the "leaves" (which is a bit arbitrary for a
// cycle).
leafNames.add(childCluster);
continue;
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
"Unable to get leaves for " + clusterName + ": "
+ config.getStatus().getDescription())));
return;
}
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
if (children instanceof AggregateConfig) {
@ -356,8 +308,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
break;
case EDS:
TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData());
} else {
@ -366,79 +318,40 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
break;
case LOGICAL_DNS:
if (enableLogicalDns) {
TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
} else {
child = new EndpointConfig(StatusOr.fromStatus(
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
}
// TODO get the resolved endpoint configuration
child = new EndpointConfig(StatusOr.fromStatus(
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
break;
default:
child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
}
if (clusters.containsKey(clusterName)) {
// If a cycle is detected, we'll have detected it while recursing, so now there will be a key
// present. We don't want to overwrite it with a non-error value.
return;
}
clusters.put(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
}
private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
if (!dnsData.hasValue()) {
return StatusOr.fromStatus(dnsData.getStatus());
}
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>();
for (EquivalentAddressGroup eag : dnsData.getValue()) {
endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
}
LocalityLbEndpoints lbEndpoints =
LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
"fakeEds_logicalDns",
Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
new ArrayList<>()));
}
private void addRdsWatcher(String resourceName) {
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
return;
}
addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
addWatcher(new RdsWatcher(resourceName));
}
private void addEdsWatcher(String edsServiceName) {
if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
return;
}
addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
addWatcher(new EdsWatcher(edsServiceName));
}
private void addClusterWatcher(String clusterName) {
if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
return;
}
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
}
private void addDnsWatcher(String dnsHostName) {
syncContext.throwIfNotInThisSynchronizationContext();
if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
return;
}
DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
getWatchers(DNS_TYPE).put(dnsHostName, watcher);
watcher.start();
addWatcher(new CdsWatcher(clusterName));
}
private void updateRoutes(List<VirtualHost> virtualHosts) {
@ -476,40 +389,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return clusters;
}
private static NameResolver createNameResolver(
String dnsHostName,
NameResolver.Args nameResolverArgs) {
URI uri;
try {
uri = new URI("dns", "", "/" + dnsHostName, null);
} catch (URISyntaxException e) {
return new FailingNameResolver(
Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
.withCause(e));
}
NameResolverProvider provider =
nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
if (provider == null) {
return new FailingNameResolver(
Status.INTERNAL.withDescription("Could not find dns name resolver"));
}
NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
if (bareResolver == null) {
return new FailingNameResolver(
Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
}
return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
}
private static class TypeWatchers<T> {
private static class TypeWatchers<T extends ResourceUpdate> {
// Key is resource name
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
final TrackedWatcherType<T> watcherType;
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
final XdsResourceType<T> resourceType;
TypeWatchers(TrackedWatcherType<T> watcherType) {
this.watcherType = checkNotNull(watcherType, "watcherType");
TypeWatchers(XdsResourceType<T> resourceType) {
this.resourceType = resourceType;
}
}
@ -521,7 +407,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
void onUpdate(StatusOr<XdsConfig> config);
}
private final class ClusterSubscription implements XdsConfig.Subscription {
private final class ClusterSubscription implements Closeable {
private final String clusterName;
boolean closed; // Accessed from syncContext
@ -534,43 +420,45 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
@Override
public void close() {
public void close() throws IOException {
releaseSubscription(this);
}
}
/** State for tracing garbage collector. */
private static final class WatcherTracer {
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
this.resourceWatchers = resourceWatchers;
this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
this.usedWatchers = new HashMap<>();
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
usedWatchers.put(type, newTypeWatchers(type));
}
}
private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
XdsResourceType<T> type) {
return new TypeWatchers<T>(type);
}
public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
XdsResourceType<T> resourceType, String name) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
if (typeWatchers == null) {
return null;
}
assert typeWatchers.watcherType == watcherType;
assert typeWatchers.resourceType == resourceType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
if (watcher == null) {
return null;
}
@SuppressWarnings("unchecked")
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
usedTypeWatchers.watchers.put(name, watcher);
return watcher;
}
@ -578,9 +466,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
/** Shut down unused watchers. */
public void closeUnusedWatchers() {
boolean changed = false; // Help out the GC by preferring old objects
for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
TypeWatchers<?> orig = resourceWatchers.get(key);
TypeWatchers<?> used = usedWatchers.get(key);
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
TypeWatchers<?> orig = resourceWatchers.get(type);
TypeWatchers<?> used = usedWatchers.get(type);
for (String name : orig.watchers.keySet()) {
if (used.watchers.containsKey(name)) {
continue;
@ -595,33 +483,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
}
@SuppressWarnings("UnusedTypeParameter")
private static final class TrackedWatcherType<T> {
public final TrackedWatcherTypeEnum typeEnum;
public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
this.typeEnum = checkNotNull(typeEnum, "typeEnum");
}
}
private interface TrackedWatcher<T> {
@Nullable
StatusOr<T> getData();
default boolean missingResult() {
return getData() == null;
}
default boolean hasDataValue() {
StatusOr<T> data = getData();
return data != null && data.hasValue();
}
void close();
}
private abstract class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T>, TrackedWatcher<T> {
implements ResourceWatcher<T> {
private final XdsResourceType<T> type;
private final String resourceName;
boolean cancelled;
@ -676,18 +539,24 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
protected abstract void subscribeToChildren(T update);
@Override
public void close() {
cancelled = true;
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
}
@Override
boolean missingResult() {
return data == null;
}
@Nullable
public StatusOr<T> getData() {
StatusOr<T> getData() {
return data;
}
boolean hasDataValue() {
return data != null && data.hasValue();
}
public String toContextString() {
return toContextStr(type.typeName(), resourceName);
}
@ -738,7 +607,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
if (rdsName == null) {
return null;
}
return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
}
public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
@ -804,7 +673,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
CdsWatcher(String resourceName) {
super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
}
@Override
@ -814,9 +683,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
addEdsWatcher(getEdsServiceName());
break;
case LOGICAL_DNS:
if (enableLogicalDns) {
addDnsWatcher(update.dnsHostName());
}
// no eds needed
break;
case AGGREGATE:
update.prioritizedClusterNames()
@ -839,107 +706,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
private EdsWatcher(String resourceName) {
super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
}
@Override
public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
}
private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
private final NameResolver resolver;
@Nullable
private StatusOr<List<EquivalentAddressGroup>> data;
private boolean cancelled;
public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
}
public void start() {
resolver.start(new NameResolverListener());
}
public void refresh() {
if (cancelled) {
return;
}
resolver.refresh();
}
@Override
@Nullable
public StatusOr<List<EquivalentAddressGroup>> getData() {
return data;
}
@Override
public void close() {
if (cancelled) {
return;
}
cancelled = true;
resolver.shutdown();
}
private class NameResolverListener extends NameResolver.Listener2 {
@Override
public void onResult(final NameResolver.ResolutionResult resolutionResult) {
syncContext.execute(() -> onResult2(resolutionResult));
}
@Override
public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
if (cancelled) {
return Status.OK;
}
data = resolutionResult.getAddressesOrError();
maybePublishConfig();
return resolutionResult.getAddressesOrError().getStatus();
}
@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (cancelled) {
return;
}
// DnsNameResolver cannot distinguish between address-not-found and transient errors.
// Assume it is a transient error.
// TODO: Once the resolution note API is available, don't throw away the error if
// hasDataValue(); pass it as the note instead
if (!hasDataValue()) {
data = StatusOr.fromStatus(error);
maybePublishConfig();
}
}
});
}
}
}
private static final class FailingNameResolver extends NameResolver {
private final Status status;
public FailingNameResolver(Status status) {
checkNotNull(status, "status");
checkArgument(!status.isOk(), "Status must not be OK");
this.status = status;
}
@Override
public void start(Listener2 listener) {
listener.onError(status);
}
@Override
public String getServiceAuthority() {
return "bug-if-you-see-this-authority";
}
@Override
public void shutdown() {}
}
}

View File

@ -230,8 +230,7 @@ final class XdsNameResolver extends NameResolver {
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
callCounterProvider = SharedCallCounterMap.getInstance();
resolveState = new ResolveState(ldsResourceName);
resolveState.start();
resolveState = new ResolveState(ldsResourceName); // auto starts
}
private static String expandPercentS(String template, String replacement) {
@ -548,7 +547,7 @@ final class XdsNameResolver extends NameResolver {
if (clusterRefs.get(cluster).refCount.get() != 0) {
throw new AssertionError();
}
clusterRefs.remove(cluster).close();
clusterRefs.remove(cluster);
if (resolveState.lastConfigOrStatus.hasValue()) {
updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
} else {
@ -654,12 +653,8 @@ final class XdsNameResolver extends NameResolver {
private ResolveState(String ldsResourceName) {
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
xdsDependencyManager =
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
nameResolverArgs);
}
void start() {
xdsDependencyManager.start(this);
new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName,
nameResolverArgs, scheduler);
}
private void shutdown() {
@ -793,13 +788,9 @@ final class XdsNameResolver extends NameResolver {
clusterRefs.get(cluster).refCount.incrementAndGet();
} else {
if (clusterNameMap.containsKey(cluster)) {
assert cluster.startsWith("cluster:");
XdsConfig.Subscription subscription =
xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length()));
clusterRefs.put(
cluster,
ClusterRefState.forCluster(
new AtomicInteger(1), clusterNameMap.get(cluster), subscription));
ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
}
if (rlsPluginConfigMap.containsKey(cluster)) {
clusterRefs.put(
@ -830,7 +821,7 @@ final class XdsNameResolver extends NameResolver {
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster).close();
clusterRefs.remove(cluster);
shouldUpdateResult = true;
}
}
@ -883,7 +874,7 @@ final class XdsNameResolver extends NameResolver {
for (String cluster : existingClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster).close();
clusterRefs.remove(cluster);
}
}
existingClusters = null;
@ -969,18 +960,15 @@ final class XdsNameResolver extends NameResolver {
final String traditionalCluster;
@Nullable
final RlsPluginConfig rlsPluginConfig;
@Nullable
final XdsConfig.Subscription subscription;
private ClusterRefState(
AtomicInteger refCount, @Nullable String traditionalCluster,
@Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) {
@Nullable RlsPluginConfig rlsPluginConfig) {
this.refCount = refCount;
checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
"There must be exactly one non-null value in traditionalCluster and pluginConfig");
this.traditionalCluster = traditionalCluster;
this.rlsPluginConfig = rlsPluginConfig;
this.subscription = subscription;
}
private Map<String, ?> toLbPolicy() {
@ -1000,21 +988,12 @@ final class XdsNameResolver extends NameResolver {
}
}
private void close() {
if (subscription != null) {
subscription.close();
}
static ClusterRefState forCluster(AtomicInteger refCount, String name) {
return new ClusterRefState(refCount, name, null);
}
static ClusterRefState forCluster(
AtomicInteger refCount, String name, XdsConfig.Subscription subscription) {
return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription"));
}
static ClusterRefState forRlsPlugin(
AtomicInteger refCount,
RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig, null);
static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -2211,23 +2211,6 @@ public abstract class GrpcXdsClientImplTestBase {
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}
@Test
public void cdsResponseWithEmptyAggregateCluster() {
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
cdsResourceWatcher);
List<String> candidates = Arrays.asList();
Any clusterAggregate =
Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, null, candidates));
call.sendResponse(CDS, clusterAggregate, VERSION_1, "0000");
// Client sent an ACK CDS request.
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
+ "Cluster cluster.googleapis.com: aggregate ClusterConfig.clusters must not be empty";
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}
@Test
public void cdsResponseWithCircuitBreakers() {
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,

View File

@ -0,0 +1,94 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.CallOptions;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.SynchronizationContext;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.testing.TestMethodDescriptors;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit test for {@link io.grpc.xds.LazyLoadBalancer}. */
@RunWith(JUnit4.class)
public final class LazyLoadBalancerTest {
private SynchronizationContext syncContext =
new SynchronizationContext((t, e) -> {
throw new AssertionError(e);
});
private LoadBalancer.PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(),
new Metadata(),
CallOptions.DEFAULT,
new LoadBalancer.PickDetailsConsumer() {});
private FakeHelper helper = new FakeHelper();
@Test
public void pickerIsNoopAfterEarlyShutdown() {
LazyLoadBalancer lb = new LazyLoadBalancer(helper, new LoadBalancer.Factory() {
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
throw new AssertionError("unexpected");
}
});
lb.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(Arrays.asList())
.build());
SubchannelPicker picker = helper.picker;
assertThat(picker).isNotNull();
lb.shutdown();
picker.pickSubchannel(args);
}
class FakeHelper extends LoadBalancer.Helper {
ConnectivityState state;
SubchannelPicker picker;
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
throw new UnsupportedOperationException();
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
this.state = newState;
this.picker = newPicker;
}
@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
@Override
public String getAuthority() {
return "localhost";
}
}
}

View File

@ -106,7 +106,7 @@ public class RingHashLoadBalancerProviderTest {
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(configOrError.getError().getDescription())
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
}
@Test
@ -117,7 +117,7 @@ public class RingHashLoadBalancerProviderTest {
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(configOrError.getError().getDescription())
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
}
@Test
@ -214,7 +214,7 @@ public class RingHashLoadBalancerProviderTest {
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(configOrError.getError().getDescription())
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
}
@Test
@ -225,7 +225,7 @@ public class RingHashLoadBalancerProviderTest {
assertThat(configOrError.getError()).isNotNull();
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(configOrError.getError().getDescription())
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
}
@Test

View File

@ -42,6 +42,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedInteger;
import com.google.common.testing.EqualsTester;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ConnectivityState;
@ -260,7 +261,7 @@ public class RingHashLoadBalancerTest {
private void verifyConnection(int times) {
for (int i = 0; i < times; i++) {
Subchannel connectOnce = connectionRequestedQueue.poll();
assertWithMessage("Null connection is at (%s) of (%s)", i, times)
assertWithMessage("Expected %s new connections, but found %s", times, i)
.that(connectOnce).isNotNull();
clearInvocations(connectOnce);
}
@ -647,7 +648,7 @@ public class RingHashLoadBalancerTest {
getSubchannel(servers, 2),
ConnectivityStateInfo.forTransientFailure(
Status.PERMISSION_DENIED.withDescription("permission denied")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
assertThat(result.getStatus().isOk()).isTrue();
@ -1113,6 +1114,19 @@ public class RingHashLoadBalancerTest {
assertThat(picks).containsExactly(subchannel1);
}
@Test
public void config_equalsTester() {
new EqualsTester()
.addEqualityGroup(
new RingHashConfig(1, 2, "headerA"),
new RingHashConfig(1, 2, "headerA"))
.addEqualityGroup(new RingHashConfig(1, 1, "headerA"))
.addEqualityGroup(new RingHashConfig(2, 2, "headerA"))
.addEqualityGroup(new RingHashConfig(1, 2, "headerB"))
.addEqualityGroup(new RingHashConfig(1, 2, ""))
.testEquals();
}
private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {

View File

@ -28,6 +28,7 @@ import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
@ -42,36 +43,33 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.grpc.BindableService;
import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.StatusOrMatcher;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.FakeNameResolverProvider;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
@ -83,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@ -97,6 +96,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@ -108,20 +108,21 @@ public class XdsDependencyManagerTest {
public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName();
public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName();
@Mock
private XdsClientMetricReporter xdsClientMetricReporter;
private final SynchronizationContext syncContext =
new SynchronizationContext((t, e) -> {
throw new AssertionError(e);
});
private final FakeClock fakeClock = new FakeClock();
private XdsClient xdsClient = XdsTestUtils.createXdsClient(
Collections.singletonList("control-plane"),
serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport(
InProcessChannelBuilder.forName(serverInfo.target()).directExecutor().build()),
fakeClock);
private ManagedChannel channel;
private XdsClient xdsClient;
private XdsDependencyManager xdsDependencyManager;
private TestWatcher xdsConfigWatcher;
private Server xdsServer;
private final FakeClock fakeClock = new FakeClock();
private final String serverName = "the-service-name";
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
@ -129,7 +130,6 @@ public class XdsDependencyManagerTest {
private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService();
private final BindableService lrsService =
XdsTestUtils.createLrsService(lrsEnded, loadReportCalls);
private final NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@ -147,16 +147,13 @@ public class XdsDependencyManagerTest {
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class))
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.setNameResolverRegistry(nameResolverRegistry)
.build();
private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
xdsClient, syncContext, serverName, serverName, nameResolverArgs);
private boolean savedEnableLogicalDns;
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
@Before
public void setUp() throws Exception {
cleanupRule.register(InProcessServerBuilder
xdsServer = cleanupRule.register(InProcessServerBuilder
.forName("control-plane")
.addService(controlPlaneService)
.addService(lrsService)
@ -166,11 +163,18 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName);
channel = cleanupRule.register(
InProcessChannelBuilder.forName("control-plane").directExecutor().build());
XdsTransportFactory xdsTransportFactory =
ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel);
xdsClient = CommonBootstrapperTestUtils.createXdsClient(
Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock,
new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter);
testWatcher = new TestWatcher();
xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher));
defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName);
savedEnableLogicalDns = XdsDependencyManager.enableLogicalDns;
}
@After
@ -179,17 +183,19 @@ public class XdsDependencyManagerTest {
xdsDependencyManager.shutdown();
}
xdsClient.shutdown();
channel.shutdown(); // channel not owned by XdsClient
xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
assertThat(adsEnded.get()).isTrue();
assertThat(lrsEnded.get()).isTrue();
assertThat(fakeClock.getPendingTasks()).isEmpty();
XdsDependencyManager.enableLogicalDns = savedEnableLogicalDns;
}
@Test
public void verify_basic_config() {
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
testWatcher.verifyStats(1, 0);
@ -197,7 +203,8 @@ public class XdsDependencyManagerTest {
@Test
public void verify_config_update() {
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
@ -214,7 +221,8 @@ public class XdsDependencyManagerTest {
@Test
public void verify_simple_aggregate() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
@ -273,7 +281,8 @@ public class XdsDependencyManagerTest {
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(any());
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
@ -304,7 +313,8 @@ public class XdsDependencyManagerTest {
@Test
public void testDelayedSubscription() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
String rootName1 = "root_c";
@ -350,7 +360,8 @@ public class XdsDependencyManagerTest {
edsMap.put("garbageEds", clusterLoadAssignment);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
@ -382,9 +393,8 @@ public class XdsDependencyManagerTest {
@Test
public void testMissingLds() {
String ldsName = "badLdsName";
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
serverName, ldsName, nameResolverArgs);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, ldsName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher).onUpdate(
@ -399,7 +409,8 @@ public class XdsDependencyManagerTest {
Listener serverListener =
ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener));
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher).onUpdate(
@ -416,7 +427,8 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(serverName, clientListener));
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher).onUpdate(
@ -432,7 +444,8 @@ public class XdsDependencyManagerTest {
"wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME);
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
// Update with a config that has a virtual host that doesn't match the server name
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
@ -447,9 +460,8 @@ public class XdsDependencyManagerTest {
String ldsResourceName =
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
serverName, ldsResourceName, nameResolverArgs);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, ldsResourceName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(
argThat(StatusOrMatcher.hasStatus(
@ -462,7 +474,8 @@ public class XdsDependencyManagerTest {
@Test
public void testChangeRdsName_fromLds() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
String newRdsName = "newRdsName1";
@ -517,7 +530,8 @@ public class XdsDependencyManagerTest {
// Start the actual test
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
@ -562,7 +576,8 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getClusters().get("clusterA").hasValue()).isTrue();
@ -596,12 +611,12 @@ public class XdsDependencyManagerTest {
// The cycle is loaded and detected
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
assertThat(config.getClusters().get("clusterB").hasValue()).isTrue();
// Orphan the cycle and it is discarded
routeConfig =
@ -642,7 +657,8 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// Start the actual test
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
assertThat(initialConfig.getClusters().keySet())
@ -659,7 +675,8 @@ public class XdsDependencyManagerTest {
@Test
public void testChangeRdsName_FromLds_complexTree() {
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
// Create the same tree as in testMultipleParentsInCdsTree
Cluster rootCluster =
@ -704,7 +721,8 @@ public class XdsDependencyManagerTest {
public void testChangeAggCluster() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(any());
// Setup initial config A -> A1 -> (A11, A12)
@ -752,83 +770,13 @@ public class XdsDependencyManagerTest {
inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher));
}
@Test
public void testLogicalDns_success() {
XdsDependencyManager.enableLogicalDns = true;
FakeSocketAddress fakeAddress = new FakeSocketAddress();
nameResolverRegistry.register(new FakeNameResolverProvider(
"dns:///dns.example.com:1111", fakeAddress));
Cluster cluster = Cluster.newBuilder()
.setName(CLUSTER_NAME)
.setType(Cluster.DiscoveryType.LOGICAL_DNS)
.setLoadAssignment(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.addLbEndpoints(LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("dns.example.com")
.setPortValue(1111)))))))
.build();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
ImmutableMap.of(CLUSTER_NAME, cluster));
xdsDependencyManager.start(xdsConfigWatcher);
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
XdsClusterConfig.ClusterChild clusterChild =
config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
assertThat(endpointOr.getStatus()).isEqualTo(Status.OK);
assertThat(endpointOr.getValue()).isEqualTo(new EdsUpdate(
"fakeEds_logicalDns",
ImmutableMap.of(
Locality.create("", "", ""),
Endpoints.LocalityLbEndpoints.create(
Arrays.asList(Endpoints.LbEndpoint.create(
new EquivalentAddressGroup(fakeAddress),
1, true, "dns.example.com:1111", ImmutableMap.of())),
1, 0, ImmutableMap.of())),
Arrays.asList()));
}
@Test
public void testLogicalDns_noDnsNr() {
XdsDependencyManager.enableLogicalDns = true;
Cluster cluster = Cluster.newBuilder()
.setName(CLUSTER_NAME)
.setType(Cluster.DiscoveryType.LOGICAL_DNS)
.setLoadAssignment(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.addLbEndpoints(LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("dns.example.com")
.setPortValue(1111)))))))
.build();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
ImmutableMap.of(CLUSTER_NAME, cluster));
xdsDependencyManager.start(xdsConfigWatcher);
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
XdsClusterConfig.ClusterChild clusterChild =
config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
assertThat(endpointOr.getStatus().getCode()).isEqualTo(Status.Code.INTERNAL);
assertThat(endpointOr.getStatus().getDescription())
.isEqualTo("Could not find dns name resolver");
}
@Test
public void testCdsError() throws IOException {
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build()));
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
Status status = xdsUpdateCaptor.getValue().getValue()
@ -841,7 +789,8 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(any());
@ -873,7 +822,8 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(any());
@ -905,7 +855,8 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(any());
@ -937,7 +888,8 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher).onUpdate(any());
@ -970,7 +922,8 @@ public class XdsDependencyManagerTest {
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager.start(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(any());
xdsDependencyManager.shutdown();
@ -1028,6 +981,4 @@ public class XdsDependencyManagerTest {
&& xdsConfig.getClusters().keySet().containsAll(expectedNames);
}
}
private static class FakeSocketAddress extends java.net.SocketAddress {}
}

View File

@ -52,20 +52,14 @@ import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.StatusOr;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -370,32 +364,6 @@ public class XdsTestUtils {
.setApiListener(clientListenerBuilder.build()).build();
}
public static XdsClient createXdsClient(
List<String> serverUris,
XdsTransportFactory xdsTransportFactory,
FakeClock fakeClock) {
return createXdsClient(
CommonBootstrapperTestUtils.buildBootStrap(serverUris),
xdsTransportFactory,
fakeClock,
new XdsClientMetricReporter() {});
}
/** Calls {@link CommonBootstrapperTestUtils#createXdsClient} with gRPC-specific values. */
public static XdsClient createXdsClient(
Bootstrapper.BootstrapInfo bootstrapInfo,
XdsTransportFactory xdsTransportFactory,
FakeClock fakeClock,
XdsClientMetricReporter xdsClientMetricReporter) {
return CommonBootstrapperTestUtils.createXdsClient(
bootstrapInfo,
xdsTransportFactory,
fakeClock,
new ExponentialBackoffPolicy.Provider(),
MessagePrinter.INSTANCE,
xdsClientMetricReporter);
}
/**
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
* the same list of clusterName:clusterServiceName pair.