mirror of https://github.com/grpc/grpc-java.git
xds: accept resources wrapped in a Resource message (#8997)
This commit is contained in:
parent
b4d8fc2c40
commit
012dbaf5be
|
|
@ -62,6 +62,8 @@ class BootstrapperImpl extends Bootstrapper {
|
|||
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
|
||||
"envoy.lb.does_not_support_overprovisioning";
|
||||
@VisibleForTesting
|
||||
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
|
||||
@VisibleForTesting
|
||||
static boolean enableFederation =
|
||||
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
|
||||
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
|
||||
|
|
@ -177,6 +179,7 @@ class BootstrapperImpl extends Bootstrapper {
|
|||
nodeBuilder.setUserAgentName(buildVersion.getUserAgent());
|
||||
nodeBuilder.setUserAgentVersion(buildVersion.getImplementationVersion());
|
||||
nodeBuilder.addClientFeatures(CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
|
||||
nodeBuilder.addClientFeatures(CLIENT_FEATURE_RESOURCE_IN_SOTW);
|
||||
builder.node(nodeBuilder.build());
|
||||
|
||||
Map<String, ?> certProvidersBlob = JsonUtil.getObject(rawData, "certificate_providers");
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3
|
|||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
|
||||
import io.envoyproxy.envoy.service.discovery.v3.Resource;
|
||||
import io.envoyproxy.envoy.type.v3.FractionalPercent;
|
||||
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
|
||||
import io.grpc.ChannelCredentials;
|
||||
|
|
@ -188,6 +189,9 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
"type.googleapis.com/xds.type.v3.TypedStruct";
|
||||
private static final String TYPE_URL_FILTER_CONFIG =
|
||||
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
|
||||
private static final String TYPE_URL_RESOURCE_V2 = "type.googleapis.com/envoy.api.v2.Resource";
|
||||
private static final String TYPE_URL_RESOURCE_V3 =
|
||||
"type.googleapis.com/envoy.service.discovery.v3.Resource";
|
||||
// TODO(zdapeng): need to discuss how to handle unsupported values.
|
||||
private static final Set<Code> SUPPORTED_RETRYABLE_CODES =
|
||||
Collections.unmodifiableSet(EnumSet.of(
|
||||
|
|
@ -274,6 +278,17 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
serverLrsClientMap.put(serverInfo, lrsClient);
|
||||
}
|
||||
|
||||
private Any maybeUnwrapResources(Any resource)
|
||||
throws InvalidProtocolBufferException {
|
||||
if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V2)
|
||||
|| resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V3)) {
|
||||
return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE_V3,
|
||||
TYPE_URL_RESOURCE_V2).getResource();
|
||||
} else {
|
||||
return resource;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleLdsResponse(
|
||||
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
|
||||
|
|
@ -287,10 +302,12 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
for (int i = 0; i < resources.size(); i++) {
|
||||
Any resource = resources.get(i);
|
||||
|
||||
// Unpack the Listener.
|
||||
boolean isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
|
||||
boolean isResourceV3;
|
||||
Listener listener;
|
||||
try {
|
||||
resource = maybeUnwrapResources(resource);
|
||||
// Unpack the Listener.
|
||||
isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
|
||||
listener = unpackCompatibleType(resource, Listener.class, ResourceType.LDS.typeUrl(),
|
||||
ResourceType.LDS.typeUrlV2());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
|
@ -1424,6 +1441,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
// Unpack the RouteConfiguration.
|
||||
RouteConfiguration routeConfig;
|
||||
try {
|
||||
resource = maybeUnwrapResources(resource);
|
||||
routeConfig = unpackCompatibleType(resource, RouteConfiguration.class,
|
||||
ResourceType.RDS.typeUrl(), ResourceType.RDS.typeUrlV2());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
|
@ -1552,6 +1570,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
// Unpack the Cluster.
|
||||
Cluster cluster;
|
||||
try {
|
||||
resource = maybeUnwrapResources(resource);
|
||||
cluster = unpackCompatibleType(
|
||||
resource, Cluster.class, ResourceType.CDS.typeUrl(), ResourceType.CDS.typeUrlV2());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
|
@ -1801,6 +1820,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
// Unpack the ClusterLoadAssignment.
|
||||
ClusterLoadAssignment assignment;
|
||||
try {
|
||||
resource = maybeUnwrapResources(resource);
|
||||
assignment =
|
||||
unpackCompatibleType(resource, ClusterLoadAssignment.class, ResourceType.EDS.typeUrl(),
|
||||
ResourceType.EDS.typeUrlV2());
|
||||
|
|
|
|||
|
|
@ -827,6 +827,7 @@ public class BootstrapperImplTest {
|
|||
.setBuildVersion(buildVersion.toString())
|
||||
.setUserAgentName(buildVersion.getUserAgent())
|
||||
.setUserAgentVersion(buildVersion.getImplementationVersion())
|
||||
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
|
||||
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING)
|
||||
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_RESOURCE_IN_SOTW);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -686,6 +686,21 @@ public abstract class ClientXdsClientTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrappedLdsResource() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000");
|
||||
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
|
||||
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
|
||||
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
|
||||
.hasSize(VHOST_SIZE);
|
||||
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
|
||||
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ldsResourceFound_containsRdsName() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
|
||||
|
|
@ -1165,6 +1180,20 @@ public abstract class ClientXdsClientTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrappedRdsResource() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
|
||||
call.sendResponse(RDS, mf.buildWrappedResource(testRouteConfig), VERSION_1, "0000");
|
||||
|
||||
// Client sends an ACK RDS request.
|
||||
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
|
||||
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
|
||||
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
|
||||
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
|
||||
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cachedRdsResource_data() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
|
||||
|
|
@ -1596,6 +1625,28 @@ public abstract class ClientXdsClientTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrappedCdsResource() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
|
||||
call.sendResponse(CDS, mf.buildWrappedResource(testClusterRoundRobin), VERSION_1, "0000");
|
||||
|
||||
// Client sent an ACK CDS request.
|
||||
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
|
||||
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
|
||||
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
|
||||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
|
||||
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
|
||||
TIME_INCREMENT);
|
||||
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cdsResourceFound_leastRequestLbPolicy() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
|
||||
|
|
@ -2141,6 +2192,20 @@ public abstract class ClientXdsClientTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrappedEdsResourceFound() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
|
||||
call.sendResponse(EDS, mf.buildWrappedResource(testClusterLoadAssignment), VERSION_1, "0000");
|
||||
|
||||
// Client sent an ACK EDS request.
|
||||
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
|
||||
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
|
||||
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
|
||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
|
||||
TIME_INCREMENT);
|
||||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cachedEdsResource_data() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
|
||||
|
|
@ -2789,6 +2854,8 @@ public abstract class ClientXdsClientTestBase {
|
|||
/** Throws {@link InvalidProtocolBufferException} on {@link Any#unpack(Class)}. */
|
||||
protected static final Any FAILING_ANY = Any.newBuilder().setTypeUrl("fake").build();
|
||||
|
||||
protected abstract Any buildWrappedResource(Any originalResource);
|
||||
|
||||
protected Message buildListenerWithApiListener(String name, Message routeConfiguration) {
|
||||
return buildListenerWithApiListener(
|
||||
name, routeConfiguration, Collections.<Message>emptyList());
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload;
|
|||
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
|
||||
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
|
||||
import io.envoyproxy.envoy.api.v2.Listener;
|
||||
import io.envoyproxy.envoy.api.v2.Resource;
|
||||
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
|
||||
import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext;
|
||||
import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig;
|
||||
|
|
@ -253,6 +254,13 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
|
|||
|
||||
private static class MessageFactoryV2 extends MessageFactory {
|
||||
|
||||
@Override
|
||||
protected Any buildWrappedResource(Any originalResource) {
|
||||
return Any.pack(Resource.newBuilder()
|
||||
.setResource(originalResource)
|
||||
.build());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected Message buildListenerWithApiListener(
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContex
|
|||
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
|
||||
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
|
||||
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
|
||||
import io.envoyproxy.envoy.service.discovery.v3.Resource;
|
||||
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
|
||||
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
|
||||
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
|
||||
|
|
@ -261,6 +262,13 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
|
|||
|
||||
private static class MessageFactoryV3 extends MessageFactory {
|
||||
|
||||
@Override
|
||||
protected Any buildWrappedResource(Any originalResource) {
|
||||
return Any.pack(Resource.newBuilder()
|
||||
.setResource(originalResource)
|
||||
.build());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected Message buildListenerWithApiListener(
|
||||
|
|
|
|||
Loading…
Reference in New Issue