Allow users to start watching xDS resources (#10864)

This commit is contained in:
Anirudh Ramachandra 2024-02-01 22:51:00 +05:30 committed by GitHub
parent a97f21b61e
commit 3202370684
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 48 additions and 41 deletions

View File

@ -26,6 +26,7 @@ import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.ExperimentalApi;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
@ -115,7 +116,8 @@ public abstract class XdsClient {
/**
* Watcher interface for a single requested xDS resource.
*/
interface ResourceWatcher<T extends ResourceUpdate> {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
public interface ResourceWatcher<T extends ResourceUpdate> {
/**
* Called when the resource discovery RPC encounters some transient error.
@ -123,7 +125,7 @@ public abstract class XdsClient {
* <p>Note that we expect that the implementer to:
* - Comply with the guarantee to not generate certain statuses by the library:
* https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
* propagated to the channel, override it with {@link Status.Code#UNAVAILABLE}.
* propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
* information.
*/
@ -306,23 +308,25 @@ public abstract class XdsClient {
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
}
/**
* Unregisters the given resource watcher.
*/
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
throw new UnsupportedOperationException();
}

View File

@ -282,9 +282,10 @@ final class XdsClientImpl extends XdsClient
}
@Override
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
@ -309,9 +310,9 @@ final class XdsClientImpl extends XdsClient
}
@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")

View File

@ -789,8 +789,9 @@ public class CdsLoadBalancer2Test {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
assertThat(type.typeName()).isEqualTo("CDS");
watchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
.add((ResourceWatcher<CdsUpdate>)watcher);
@ -798,9 +799,9 @@ public class CdsLoadBalancer2Test {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("CDS");
assertThat(watchers).containsKey(resourceName);
List<ResourceWatcher<CdsUpdate>> watcherList = watchers.get(resourceName);

View File

@ -1184,9 +1184,10 @@ public class ClusterResolverLoadBalancerTest {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher);
@ -1194,9 +1195,9 @@ public class ClusterResolverLoadBalancerTest {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName);

View File

@ -1915,10 +1915,10 @@ public class XdsNameResolverTest {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
switch (resourceType.typeName()) {
case "LDS":
@ -1939,9 +1939,9 @@ public class XdsNameResolverTest {
}
@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsResource).isNotNull();

View File

@ -181,10 +181,10 @@ public class XdsServerTestHelper {
@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
switch (resourceType.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNull();
@ -201,9 +201,9 @@ public class XdsServerTestHelper {
}
@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNotNull();