xds: xDS-based HTTP CONNECT configuration (#11861)

This commit is contained in:
MV Shiva 2025-03-06 08:10:18 +00:00 committed by GitHub
parent c340f4a2f3
commit 12197065fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 668 additions and 223 deletions

View File

@ -85,6 +85,7 @@ java_proto_library(
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
"@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg",
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
"@envoy_api//envoy/service/discovery/v3:pkg",
"@envoy_api//envoy/service/load_stats/v3:pkg",

View File

@ -243,7 +243,9 @@ final class CdsLoadBalancer2 extends LoadBalancer {
}
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances), configOrError.getConfig());
Collections.unmodifiableList(instances),
configOrError.getConfig(),
root.result.isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
@ -59,6 +60,8 @@ import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -430,8 +433,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
endpoint.eag().getAddresses(), attr);
EquivalentAddressGroup eag;
if (config.isHttp11ProxyAvailable()) {
List<SocketAddress> rewrittenAddresses = new ArrayList<>();
for (SocketAddress addr : endpoint.eag().getAddresses()) {
rewrittenAddresses.add(rewriteAddress(
addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
}
eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
} else {
eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
}
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
@ -469,6 +482,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
new EndpointsUpdated().run();
}
private SocketAddress rewriteAddress(SocketAddress addr,
ImmutableMap<String, Object> endpointMetadata,
ImmutableMap<String, Object> localityMetadata) {
if (!(addr instanceof InetSocketAddress)) {
return addr;
}
SocketAddress proxyAddress;
try {
proxyAddress = (SocketAddress) endpointMetadata.get(
"envoy.http11_proxy_transport_socket.proxy_address");
if (proxyAddress == null) {
proxyAddress = (SocketAddress) localityMetadata.get(
"envoy.http11_proxy_transport_socket.proxy_address");
}
} catch (ClassCastException e) {
return addr;
}
if (proxyAddress == null) {
return addr;
}
return HttpConnectProxiedSocketAddress.newBuilder()
.setTargetAddress((InetSocketAddress) addr)
.setProxyAddress(proxyAddress)
.build();
}
private List<String> generatePriorityNames(String name,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();

View File

@ -74,10 +74,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
final List<DiscoveryMechanism> discoveryMechanisms;
// GracefulSwitch configuration
final Object lbConfig;
private final boolean isHttp11ProxyAvailable;
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
boolean isHttp11ProxyAvailable) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
}
boolean isHttp11ProxyAvailable() {
return isHttp11ProxyAvailable;
}
@Override

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.EquivalentAddressGroup;
import java.net.InetSocketAddress;
import java.util.List;
@ -41,11 +42,13 @@ final class Endpoints {
// Locality's priority level.
abstract int priority();
abstract ImmutableMap<String, Object> localityMetadata();
static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
int priority) {
int priority, ImmutableMap<String, Object> localityMetadata) {
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
return new AutoValue_Endpoints_LocalityLbEndpoints(
ImmutableList.copyOf(endpoints), localityWeight, priority);
ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata);
}
}
@ -63,17 +66,20 @@ final class Endpoints {
abstract String hostname();
abstract ImmutableMap<String, Object> endpointMetadata();
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
boolean isHealthy, String hostname) {
return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname);
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
return new AutoValue_Endpoints_LbEndpoint(
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
}
// Only for testing.
@VisibleForTesting
static LbEndpoint create(
String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) {
static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy,
String hostname, ImmutableMap<String, Object> endpointMetadata) {
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
loadBalancingWeight, isHealthy, hostname);
loadBalancingWeight, isHealthy, hostname, endpointMetadata);
}
}

View File

@ -36,6 +36,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@ -240,11 +241,16 @@ final class GcpAuthenticationFilter implements Filter {
}
@Override
public String parse(Any any) throws InvalidProtocolBufferException {
Audience audience = any.unpack(Audience.class);
public String parse(Any any) throws ResourceInvalidException {
Audience audience;
try {
audience = any.unpack(Audience.class);
} catch (InvalidProtocolBufferException ex) {
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
}
String url = audience.getUrl();
if (url.isEmpty()) {
throw new InvalidProtocolBufferException(
throw new ResourceInvalidException(
"Audience URL is empty. Metadata value must contain a valid URL.");
}
return url;

View File

@ -17,9 +17,14 @@
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import io.envoyproxy.envoy.config.core.v3.Metadata;
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
import io.grpc.xds.XdsEndpointResource.AddressMetadataParser;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import io.grpc.xds.internal.ProtobufJsonConverter;
import java.util.HashMap;
import java.util.Map;
@ -36,6 +41,7 @@ final class MetadataRegistry {
private MetadataRegistry() {
registerParser(new AudienceMetadataParser());
registerParser(new AddressMetadataParser());
}
static MetadataRegistry getInstance() {
@ -55,6 +61,54 @@ final class MetadataRegistry {
supportedParsers.remove(parser.getTypeUrl());
}
/**
* Parses cluster metadata into a structured map.
*
* <p>Values in {@code typed_filter_metadata} take precedence over
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
* Envoy metadata documentation </a> for details.
*
* @param metadata the {@link Metadata} containing the fields to parse.
* @return an immutable map of parsed metadata.
* @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails.
*/
public ImmutableMap<String, Object> parseMetadata(Metadata metadata)
throws ResourceInvalidException {
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
// Process typed_filter_metadata
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
String key = entry.getKey();
Any value = entry.getValue();
MetadataValueParser parser = findParser(value.getTypeUrl());
if (parser != null) {
try {
Object parsedValue = parser.parse(value);
parsedMetadata.put(key, parsedValue);
} catch (ResourceInvalidException e) {
throw new ResourceInvalidException(
String.format("Failed to parse metadata key: %s, type: %s. Error: %s",
key, value.getTypeUrl(), e.getMessage()), e);
}
}
}
// building once to reuse in the next loop
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
// Process filter_metadata for remaining keys
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
String key = entry.getKey();
if (!intermediateParsedMetadata.containsKey(key)) {
Struct structValue = entry.getValue();
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
parsedMetadata.put(key, jsonValue);
}
}
return parsedMetadata.build();
}
interface MetadataValueParser {
String getTypeUrl();
@ -64,8 +118,8 @@ final class MetadataRegistry {
*
* @param any the {@link Any} object to parse.
* @return the parsed metadata value.
* @throws InvalidProtocolBufferException if the parsing fails.
* @throws ResourceInvalidException if the parsing fails.
*/
Object parse(Any any) throws InvalidProtocolBufferException;
Object parse(Any any) throws ResourceInvalidException;
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
@ -33,10 +32,11 @@ import com.google.protobuf.Struct;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.Metadata;
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.grpc.LoadBalancerRegistry;
@ -46,15 +46,12 @@ import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.client.XdsClient.ResourceUpdate;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.internal.ProtobufJsonConverter;
import io.grpc.xds.internal.security.CommonTlsContextUtil;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@ -67,6 +64,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@VisibleForTesting
public static boolean enableSystemRootCerts =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false);
static boolean isEnabledXdsHttpConnect =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false);
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
@ -78,6 +77,9 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT_V2 =
"type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext";
static final String TRANSPORT_SOCKET_NAME_HTTP11_PROXY =
"type.googleapis.com/envoy.extensions.transport_sockets.http_11_proxy.v3"
+ ".Http11ProxyUpstreamTransport";
private final LoadBalancerRegistry loadBalancerRegistry
= LoadBalancerRegistry.getDefaultRegistry();
@ -177,10 +179,11 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap()));
try {
MetadataRegistry registry = MetadataRegistry.getInstance();
ImmutableMap<String, Object> parsedFilterMetadata =
parseClusterMetadata(cluster.getMetadata());
registry.parseMetadata(cluster.getMetadata());
updateBuilder.parsedMetadata(parsedFilterMetadata);
} catch (InvalidProtocolBufferException e) {
} catch (ResourceInvalidException e) {
throw new ResourceInvalidException(
"Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': "
+ e.getMessage(), e);
@ -189,49 +192,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
return updateBuilder.build();
}
/**
* Parses cluster metadata into a structured map.
*
* <p>Values in {@code typed_filter_metadata} take precedence over
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
* Envoy metadata documentation </a> for details.
*
* @param metadata the {@link Metadata} containing the fields to parse.
* @return an immutable map of parsed metadata.
* @throws InvalidProtocolBufferException if parsing {@code typed_filter_metadata} fails.
*/
private static ImmutableMap<String, Object> parseClusterMetadata(Metadata metadata)
throws InvalidProtocolBufferException {
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
MetadataRegistry registry = MetadataRegistry.getInstance();
// Process typed_filter_metadata
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
String key = entry.getKey();
Any value = entry.getValue();
MetadataValueParser parser = registry.findParser(value.getTypeUrl());
if (parser != null) {
Object parsedValue = parser.parse(value);
parsedMetadata.put(key, parsedValue);
}
}
// building once to reuse in the next loop
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
// Process filter_metadata for remaining keys
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
String key = entry.getKey();
if (!intermediateParsedMetadata.containsKey(key)) {
Struct structValue = entry.getValue();
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
parsedMetadata.put(key, jsonValue);
}
}
return parsedMetadata.build();
}
private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
String clusterName = cluster.getName();
Cluster.CustomClusterType customType = cluster.getClusterType();
@ -259,6 +219,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
Long maxConcurrentRequests = null;
UpstreamTlsContext upstreamTlsContext = null;
OutlierDetection outlierDetection = null;
boolean isHttp11ProxyAvailable = false;
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
return StructOrError.fromError(
@ -281,17 +242,43 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
return StructOrError.fromError("Cluster " + clusterName
+ ": transport-socket-matches not supported.");
}
if (cluster.hasTransportSocket()) {
if (!TRANSPORT_SOCKET_NAME_TLS.equals(cluster.getTransportSocket().getName())) {
return StructOrError.fromError("transport-socket with name "
+ cluster.getTransportSocket().getName() + " not supported.");
boolean hasTransportSocket = cluster.hasTransportSocket();
TransportSocket transportSocket = cluster.getTransportSocket();
if (hasTransportSocket && !TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())
&& !(isEnabledXdsHttpConnect
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName()))) {
return StructOrError.fromError(
"transport-socket with name " + transportSocket.getName() + " not supported.");
}
if (hasTransportSocket && isEnabledXdsHttpConnect
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName())) {
isHttp11ProxyAvailable = true;
try {
Http11ProxyUpstreamTransport wrappedTransportSocket = transportSocket
.getTypedConfig().unpack(io.envoyproxy.envoy.extensions.transport_sockets
.http_11_proxy.v3.Http11ProxyUpstreamTransport.class);
hasTransportSocket = wrappedTransportSocket.hasTransportSocket();
transportSocket = wrappedTransportSocket.getTransportSocket();
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError(
"Cluster " + clusterName + ": malformed Http11ProxyUpstreamTransport: " + e);
} catch (ClassCastException e) {
return StructOrError.fromError(
"Cluster " + clusterName
+ ": invalid transport_socket type in Http11ProxyUpstreamTransport");
}
}
if (hasTransportSocket && TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())) {
try {
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
validateUpstreamTlsContext(
unpackCompatibleType(cluster.getTransportSocket().getTypedConfig(),
io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class,
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
unpackCompatibleType(transportSocket.getTypedConfig(),
io.envoyproxy.envoy.extensions
.transport_sockets.tls.v3.UpstreamTlsContext.class,
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
certProviderInstances));
} catch (InvalidProtocolBufferException | ResourceInvalidException e) {
return StructOrError.fromError(
@ -329,9 +316,10 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
return StructOrError.fromError(
"EDS service_name must be set when Cluster resource has an xdstp name");
}
return StructOrError.fromStruct(CdsUpdate.forEds(
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
outlierDetection));
outlierDetection, isHttp11ProxyAvailable));
} else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) {
if (!cluster.hasLoadAssignment()) {
return StructOrError.fromError(
@ -366,7 +354,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
String dnsHostName = String.format(
Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue());
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext));
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests,
upstreamTlsContext, isHttp11ProxyAvailable));
}
return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
@ -620,6 +609,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
@Nullable
abstract UpstreamTlsContext upstreamTlsContext();
abstract boolean isHttp11ProxyAvailable();
// List of underlying clusters making of this aggregate cluster.
// Only valid for AGGREGATE cluster.
@Nullable
@ -640,7 +631,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
.maxRingSize(0)
.choiceCount(0)
.filterMetadata(ImmutableMap.of())
.parsedMetadata(ImmutableMap.of());
.parsedMetadata(ImmutableMap.of())
.isHttp11ProxyAvailable(false);
}
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
@ -653,26 +645,30 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
static Builder forEds(String clusterName, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext,
@Nullable OutlierDetection outlierDetection) {
@Nullable OutlierDetection outlierDetection,
boolean isHttp11ProxyAvailable) {
return newBuilder(clusterName)
.clusterType(ClusterType.EDS)
.edsServiceName(edsServiceName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
.upstreamTlsContext(upstreamTlsContext)
.outlierDetection(outlierDetection);
.outlierDetection(outlierDetection)
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
}
static Builder forLogicalDns(String clusterName, String dnsHostName,
@Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext) {
@Nullable UpstreamTlsContext upstreamTlsContext,
boolean isHttp11ProxyAvailable) {
return newBuilder(clusterName)
.clusterType(ClusterType.LOGICAL_DNS)
.dnsHostName(dnsHostName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
.upstreamTlsContext(upstreamTlsContext);
.upstreamTlsContext(upstreamTlsContext)
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
}
enum ClusterType {
@ -749,6 +745,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
// Private, use one of the static factory methods instead.
protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
protected abstract Builder isHttp11ProxyAvailable(boolean isHttp11ProxyAvailable);
// Private, use one of the static factory methods instead.
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);

View File

@ -20,9 +20,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
@ -30,6 +35,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient.ResourceUpdate;
@ -185,7 +191,8 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
@VisibleForTesting
@Nullable
static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto)
throws ResourceInvalidException {
// Filter out localities without or with 0 weight.
if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
return null;
@ -193,6 +200,15 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
if (proto.getPriority() < 0) {
return StructOrError.fromError("negative priority");
}
ImmutableMap<String, Object> localityMetadata;
MetadataRegistry registry = MetadataRegistry.getInstance();
try {
localityMetadata = registry.parseMetadata(proto.getMetadata());
} catch (ResourceInvalidException e) {
throw new ResourceInvalidException("Failed to parse Locality Endpoint metadata: "
+ e.getMessage(), e);
}
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
// The endpoint field of each lb_endpoints must be set.
@ -200,6 +216,13 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
return StructOrError.fromError("LbEndpoint with no endpoint/address");
}
ImmutableMap<String, Object> endpointMetadata;
try {
endpointMetadata = registry.parseMetadata(endpoint.getMetadata());
} catch (ResourceInvalidException e) {
throw new ResourceInvalidException("Failed to parse Endpoint metadata: "
+ e.getMessage(), e);
}
List<java.net.SocketAddress> addresses = new ArrayList<>();
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
@ -214,10 +237,12 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
endpoints.add(Endpoints.LbEndpoint.create(
new EquivalentAddressGroup(addresses),
endpoint.getLoadBalancingWeight().getValue(), isHealthy,
endpoint.getEndpoint().getHostname()));
endpoint.getEndpoint().getHostname(),
endpointMetadata));
}
return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
endpoints, proto.getLoadBalancingWeight().getValue(),
proto.getPriority(), localityMetadata));
}
private static InetSocketAddress getInetSocketAddress(Address address) {
@ -270,4 +295,47 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
.toString();
}
}
public static class AddressMetadataParser implements MetadataValueParser {
@Override
public String getTypeUrl() {
return "type.googleapis.com/envoy.config.core.v3.Address";
}
@Override
public java.net.SocketAddress parse(Any any) throws ResourceInvalidException {
SocketAddress socketAddress;
try {
socketAddress = any.unpack(Address.class).getSocketAddress();
} catch (InvalidProtocolBufferException ex) {
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
}
validateAddress(socketAddress);
String ip = socketAddress.getAddress();
int port = socketAddress.getPortValue();
try {
return new InetSocketAddress(InetAddresses.forString(ip), port);
} catch (IllegalArgumentException e) {
throw createException("Invalid IP address or port: " + ip + ":" + port);
}
}
private void validateAddress(SocketAddress socketAddress) throws ResourceInvalidException {
if (socketAddress.getAddress().isEmpty()) {
throw createException("Address field is empty or invalid.");
}
long port = Integer.toUnsignedLong(socketAddress.getPortValue());
if (port > 65535) {
throw createException(String.format("Port value %d out of range 1-65535.", port));
}
}
private ResourceInvalidException createException(String message) {
return new ResourceInvalidException(
"Failed to parse envoy.config.core.v3.Address: " + message);
}
}
}

View File

@ -179,7 +179,7 @@ public class CdsLoadBalancer2Test {
public void discoverTopLevelEdsCluster() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
outlierDetection)
outlierDetection, false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
@ -198,7 +198,8 @@ public class CdsLoadBalancer2Test {
@Test
public void discoverTopLevelLogicalDnsCluster() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext)
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
false)
.leastRequestLbPolicy(3).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
@ -232,7 +233,7 @@ public class CdsLoadBalancer2Test {
@Test
public void nonAggregateCluster_resourceUpdate() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection)
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
@ -243,7 +244,7 @@ public class CdsLoadBalancer2Test {
100L, upstreamTlsContext, outlierDetection);
update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null,
outlierDetection).roundRobinLbPolicy().build();
outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
childLbConfig = (ClusterResolverConfig) childBalancer.config;
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
@ -254,7 +255,8 @@ public class CdsLoadBalancer2Test {
@Test
public void nonAggregateCluster_resourceRevoked() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext)
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext,
false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
@ -298,16 +300,16 @@ public class CdsLoadBalancer2Test {
CLUSTER, cluster1, cluster2, cluster3, cluster4);
assertThat(childBalancers).isEmpty();
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
assertThat(childBalancers).isEmpty();
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null)
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(childBalancers).isEmpty();
CdsUpdate update4 =
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection)
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster4, update4);
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
@ -362,10 +364,11 @@ public class CdsLoadBalancer2Test {
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
@ -412,10 +415,11 @@ public class CdsLoadBalancer2Test {
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
@ -467,7 +471,7 @@ public class CdsLoadBalancer2Test {
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
@ -518,7 +522,7 @@ public class CdsLoadBalancer2Test {
reset(helper);
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
@ -553,7 +557,7 @@ public class CdsLoadBalancer2Test {
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
// cluster2 (aggr.) -> [cluster3 (EDS)]
@ -602,7 +606,7 @@ public class CdsLoadBalancer2Test {
// Define EDS cluster
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
// cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies)
@ -649,7 +653,8 @@ public class CdsLoadBalancer2Test {
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
CdsUpdate update1 =
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null)
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null,
false)
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
@ -676,7 +681,7 @@ public class CdsLoadBalancer2Test {
@Test
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.shutdown).isFalse();
@ -692,7 +697,7 @@ public class CdsLoadBalancer2Test {
try {
xdsClient.deliverCdsUpdate(CLUSTER,
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
outlierDetection)
outlierDetection, false)
.lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build());
} catch (Exception e) {
assertThat(e).hasMessageThat().contains("unknownLb");
@ -706,7 +711,7 @@ public class CdsLoadBalancer2Test {
try {
xdsClient.deliverCdsUpdate(CLUSTER,
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
outlierDetection).lbPolicyConfig(
outlierDetection, false).lbPolicyConfig(
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
.build());
} catch (Exception e) {

View File

@ -36,6 +36,7 @@ import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
@ -83,6 +84,7 @@ import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@ -240,7 +242,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void edsClustersWithRingHashEndpointLbPolicy() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), ringHash);
Collections.singletonList(edsDiscoveryMechanism1), ringHash, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
@ -252,14 +254,18 @@ public class ClusterResolverLoadBalancerTest {
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, true, "hostname1"),
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, true, "hostname2")),
10 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */,
true, "hostname1", ImmutableMap.of()),
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */,
true, "hostname2", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(
LbEndpoint.create(endpoint3, 60 /* loadBalancingWeight */, true, "hostname3")),
50 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(
endpoint3, 60 /* loadBalancingWeight */, true,
"hostname3", ImmutableMap.of())),
50 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
@ -302,7 +308,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void edsClustersWithLeastRequestEndpointLbPolicy() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), leastRequest);
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
@ -312,8 +318,9 @@ public class ClusterResolverLoadBalancerTest {
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
100 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
"hostname1", ImmutableMap.of())),
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints));
@ -348,7 +355,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void edsClustersEndpointHostname_addedToAddressAttribute() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest);
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
@ -358,8 +365,9 @@ public class ClusterResolverLoadBalancerTest {
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
100 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
"hostname1", ImmutableMap.of())),
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints));
@ -371,11 +379,104 @@ public class ClusterResolverLoadBalancerTest {
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo("hostname1");
}
@Test
public void endpointAddressRewritten_whenProxyMetadataIsInEndpointMetadata() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint =
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.1", 8080));
// Proxy address in endpointMetadata (use FakeSocketAddress directly)
SocketAddress proxyAddress = new FakeSocketAddress("127.0.0.2");
ImmutableMap<String, Object> endpointMetadata =
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
// No proxy in locality metadata
ImmutableMap<String, Object> localityMetadata = ImmutableMap.of();
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
"hostname1", endpointMetadata)),
100 /* localityWeight */, 1 /* priority */, localityMetadata);
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints));
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
// Get the rewritten address
SocketAddress rewrittenAddress =
childBalancer.addresses.get(0).getAddresses().get(0);
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
HttpConnectProxiedSocketAddress proxiedSocket =
(HttpConnectProxiedSocketAddress) rewrittenAddress;
// Assert that the target address is the original address
assertThat(proxiedSocket.getTargetAddress())
.isEqualTo(endpoint.getAddresses().get(0));
// Assert that the proxy address is correctly set
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
}
@Test
public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint =
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.2", 8080));
// No proxy in endpointMetadata
ImmutableMap<String, Object> endpointMetadata = ImmutableMap.of();
// Proxy address is now in localityMetadata
SocketAddress proxyAddress = new FakeSocketAddress("proxy-addr");
ImmutableMap<String, Object> localityMetadata =
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
"hostname2", endpointMetadata)),
100 /* localityWeight */, 1 /* priority */, localityMetadata);
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints));
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
// Get the rewritten address
SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0);
// Assert that the address was rewritten
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
HttpConnectProxiedSocketAddress proxiedSocket =
(HttpConnectProxiedSocketAddress) rewrittenAddress;
// Assert that the target address is the original address
assertThat(proxiedSocket.getTargetAddress()).isEqualTo(endpoint.getAddresses().get(0));
// Assert that the proxy address is correctly set from locality metadata
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
}
@Test
public void onlyEdsClusters_receivedEndpoints() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
@ -389,17 +490,21 @@ public class ClusterResolverLoadBalancerTest {
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint1, 100, true, "hostname1"),
LbEndpoint.create(endpoint2, 100, true, "hostname1")),
70 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(endpoint1, 100,
true, "hostname1", ImmutableMap.of()),
LbEndpoint.create(endpoint2, 100,
true, "hostname1", ImmutableMap.of())),
70 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname2")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
"hostname2", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints3 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, "hostname3")),
20 /* localityWeight */, 2 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true,
"hostname3", ImmutableMap.of())),
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
String priority1 = CLUSTER2 + "[child1]";
String priority2 = CLUSTER2 + "[child2]";
String priority3 = CLUSTER1 + "[child1]";
@ -487,7 +592,7 @@ public class ClusterResolverLoadBalancerTest {
private void verifyEdsPriorityNames(List<String> want,
Map<Locality, LocalityLbEndpoints>... updates) {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism2), roundRobin);
Arrays.asList(edsDiscoveryMechanism2), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
@ -553,15 +658,17 @@ public class ClusterResolverLoadBalancerTest {
private LocalityLbEndpoints createEndpoints(int priority) {
return LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true, "hostname1"),
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true, "hostname2")),
70 /* localityWeight */, priority /* priority */);
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100,
true, "hostname1", ImmutableMap.of()),
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100,
true, "hostname2", ImmutableMap.of())),
70 /* localityWeight */, priority /* priority */, ImmutableMap.of());
}
@Test
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
@ -583,7 +690,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
@ -592,12 +699,14 @@ public class ClusterResolverLoadBalancerTest {
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true, "hostname2")),
20 /* localityWeight */, 2 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true,
"hostname2", ImmutableMap.of())),
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
xdsClient.deliverClusterLoadAssignment(
@ -618,17 +727,19 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleEdsResource_ignoreUnhealthyEndpoints() {
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint1, 100, false /* isHealthy */, "hostname1"),
LbEndpoint.create(endpoint2, 100, true /* isHealthy */, "hostname2")),
10 /* localityWeight */, 1 /* priority */);
LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
"hostname1", ImmutableMap.of()),
LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
"hostname2", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
@ -638,21 +749,21 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
"hostname1")),
10 /* localityWeight */, 1 /* priority */);
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
"hostname2")),
10 /* localityWeight */, 1 /* priority */);
"hostname2", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
@ -665,21 +776,21 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
"hostname1")),
10 /* localityWeight */, 1 /* priority */);
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */,
"hostname2")),
10 /* localityWeight */, 2 /* priority */);
"hostname2", ImmutableMap.of())),
10 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
String priority2 = CLUSTER1 + "[child2]";
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
@ -691,15 +802,15 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleEdsResource_noHealthyEndpoint() {
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
deliverLbConfig(config);
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */,
"hostname1")),
10 /* localityWeight */, 1 /* priority */);
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy
@ -716,7 +827,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void onlyLogicalDnsCluster_endpointsResolved() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty();
@ -749,7 +860,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty();
@ -767,7 +878,7 @@ public class ClusterResolverLoadBalancerTest {
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
backoffPolicy1, backoffPolicy2);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty();
@ -813,7 +924,7 @@ public class ClusterResolverLoadBalancerTest {
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty();
@ -851,7 +962,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -862,8 +973,9 @@ public class ClusterResolverLoadBalancerTest {
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname3")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
"hostname3", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
@ -886,7 +998,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void noEdsResourceExists_useDnsResolutionResults() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -910,7 +1022,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -919,8 +1031,9 @@ public class ClusterResolverLoadBalancerTest {
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
resolver.deliverError(Status.UNKNOWN.withDescription("I am lost"));
@ -941,7 +1054,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -950,8 +1063,9 @@ public class ClusterResolverLoadBalancerTest {
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved.
@ -976,7 +1090,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -999,7 +1113,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void resolutionErrorBeforeChildLbCreated_edsOnly_returnErrorPicker() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1), roundRobin);
Arrays.asList(edsDiscoveryMechanism1), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
@ -1017,7 +1131,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertResolverCreated("/" + DNS_HOST_NAME);
@ -1033,7 +1147,7 @@ public class ClusterResolverLoadBalancerTest {
@Test
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
@ -1043,8 +1157,9 @@ public class ClusterResolverLoadBalancerTest {
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints =
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
10 /* localityWeight */, 1 /* priority */);
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
"hostname1", ImmutableMap.of())),
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2));
@ -1118,37 +1233,37 @@ public class ClusterResolverLoadBalancerTest {
}
private static EquivalentAddressGroup makeAddress(final String name) {
class FakeSocketAddress extends SocketAddress {
private final String name;
return new EquivalentAddressGroup(new FakeSocketAddress(name));
}
private FakeSocketAddress(String name) {
this.name = name;
}
static class FakeSocketAddress extends SocketAddress {
private final String name;
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FakeSocketAddress)) {
return false;
}
FakeSocketAddress that = (FakeSocketAddress) o;
return Objects.equals(name, that.name);
}
@Override
public String toString() {
return name;
}
private FakeSocketAddress(String name) {
this.name = name;
}
return new EquivalentAddressGroup(new FakeSocketAddress(name));
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FakeSocketAddress)) {
return false;
}
FakeSocketAddress that = (FakeSocketAddress) o;
return Objects.equals(name, that.name);
}
@Override
public String toString() {
return name;
}
}
private static final class FakeXdsClient extends XdsClient {

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN;
import static io.grpc.xds.XdsClusterResource.TRANSPORT_SOCKET_NAME_HTTP11_PROXY;
import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS;
import static org.junit.Assert.fail;
@ -93,6 +94,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
@ -1055,7 +1057,7 @@ public class GrpcXdsClientImplDataTest {
}
@Test
public void parseLocalityLbEndpoints_withHealthyEndpoints() {
public void parseLocalityLbEndpoints_withHealthyEndpoints() throws ResourceInvalidException {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
@ -1075,12 +1077,14 @@ public class GrpcXdsClientImplDataTest {
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct()).isEqualTo(
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")),
100, 1));
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
20, true, "", ImmutableMap.of())),
100, 1, ImmutableMap.of()));
}
@Test
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy() {
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy()
throws ResourceInvalidException {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
@ -1100,12 +1104,13 @@ public class GrpcXdsClientImplDataTest {
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct()).isEqualTo(
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")), 100,
1));
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
20, true, "", ImmutableMap.of())),
100, 1, ImmutableMap.of()));
}
@Test
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() {
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() throws ResourceInvalidException {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
@ -1125,12 +1130,13 @@ public class GrpcXdsClientImplDataTest {
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct()).isEqualTo(
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, false, "")), 100,
1));
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20,
false, "", ImmutableMap.of())),
100, 1, ImmutableMap.of()));
}
@Test
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() {
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() throws ResourceInvalidException {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
@ -1187,7 +1193,10 @@ public class GrpcXdsClientImplDataTest {
EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList);
assertThat(struct.getStruct()).isEqualTo(
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(expectedEag, 20, true, "")), 100, 1));
Collections.singletonList(LbEndpoint.create(
expectedEag, 20, true, "", ImmutableMap.of())), 100, 1, ImmutableMap.of()));
} catch (ResourceInvalidException e) {
throw new RuntimeException(e);
} finally {
if (originalDualStackProp != null) {
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp);
@ -1198,7 +1207,7 @@ public class GrpcXdsClientImplDataTest {
}
@Test
public void parseLocalityLbEndpoints_invalidPriority() {
public void parseLocalityLbEndpoints_invalidPriority() throws ResourceInvalidException {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
@ -2456,6 +2465,59 @@ public class GrpcXdsClientImplDataTest {
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
}
@Test
public void processCluster_parsesAddressMetadata() throws Exception {
// Create an Address message
Address address = Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("192.168.1.1")
.setPortValue(8080)
.build())
.build();
// Wrap the Address in Any
Any addressMetadata = Any.newBuilder()
.setTypeUrl("type.googleapis.com/envoy.config.core.v3.Address")
.setValue(address.toByteString())
.build();
Struct filterMetadata = Struct.newBuilder()
.putFields("key1", Value.newBuilder().setStringValue("value1").build())
.putFields("key2", Value.newBuilder().setNumberValue(42).build())
.build();
Metadata metadata = Metadata.newBuilder()
.putTypedFilterMetadata("ADDRESS_METADATA", addressMetadata)
.putFilterMetadata("FILTER_METADATA", filterMetadata)
.build();
Cluster cluster = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
.setType(DiscoveryType.EDS)
.setEdsClusterConfig(
EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()))
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.ROUND_ROBIN)
.setMetadata(metadata)
.build();
CdsUpdate update = XdsClusterResource.processCluster(
cluster, null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
ImmutableMap<String, Object> expectedParsedMetadata = ImmutableMap.of(
"ADDRESS_METADATA", new InetSocketAddress("192.168.1.1", 8080),
"FILTER_METADATA", ImmutableMap.of(
"key1", "value1",
"key2", 42.0));
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
}
@Test
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata()
throws ResourceInvalidException, InvalidProtocolBufferException {
@ -2512,6 +2574,40 @@ public class GrpcXdsClientImplDataTest {
metadataRegistry.removeParser(testParser);
}
@Test
public void parseNonAggregateCluster_withHttp11ProxyTransportSocket()
throws ResourceInvalidException, InvalidProtocolBufferException {
XdsClusterResource.isEnabledXdsHttpConnect = true;
Http11ProxyUpstreamTransport http11ProxyUpstreamTransport =
Http11ProxyUpstreamTransport.newBuilder()
.setTransportSocket(TransportSocket.getDefaultInstance())
.build();
TransportSocket transportSocket = TransportSocket.newBuilder()
.setName(TRANSPORT_SOCKET_NAME_HTTP11_PROXY)
.setTypedConfig(Any.pack(http11ProxyUpstreamTransport))
.build();
Cluster cluster = Cluster.newBuilder()
.setName("cluster-http11-proxy.googleapis.com")
.setType(DiscoveryType.EDS)
.setEdsClusterConfig(
EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
.setServiceName("service-http11-proxy.googleapis.com"))
.setLbPolicy(LbPolicy.ROUND_ROBIN)
.setTransportSocket(transportSocket)
.build();
CdsUpdate result =
XdsClusterResource.processCluster(cluster, null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
assertThat(result).isNotNull();
assertThat(result.isHttp11ProxyAvailable()).isTrue();
}
@Test
public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException {

View File

@ -607,9 +607,9 @@ public abstract class GrpcXdsClientImplTestBase {
Locality.create("region1", "zone1", "subzone1"),
LocalityLbEndpoints.create(
ImmutableList.of(LbEndpoint.create("192.168.0.1", 8080, 2, true,
"endpoint-host-name")), 1, 0),
"endpoint-host-name", ImmutableMap.of())), 1, 0, ImmutableMap.of()),
Locality.create("region3", "zone3", "subzone3"),
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1));
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1, ImmutableMap.of()));
}
/**
@ -3246,7 +3246,9 @@ public abstract class GrpcXdsClientImplTestBase {
Locality.create("region2", "zone2", "subzone2"),
LocalityLbEndpoints.create(
ImmutableList.of(
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
LbEndpoint.create("172.44.2.2", 8000, 3,
true, "endpoint-host-name", ImmutableMap.of())),
2, 0, ImmutableMap.of()));
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2,
TIME_INCREMENT * 2);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
@ -3416,7 +3418,9 @@ public abstract class GrpcXdsClientImplTestBase {
Locality.create("region2", "zone2", "subzone2"),
LocalityLbEndpoints.create(
ImmutableList.of(
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
LbEndpoint.create("172.44.2.2", 8000, 3,
true, "endpoint-host-name", ImmutableMap.of())),
2, 0, ImmutableMap.of()));
verify(watcher2).onChanged(edsUpdateCaptor.capture());
edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.clusterName).isEqualTo(edsResourceTwo);
@ -3426,7 +3430,9 @@ public abstract class GrpcXdsClientImplTestBase {
Locality.create("region2", "zone2", "subzone2"),
LocalityLbEndpoints.create(
ImmutableList.of(
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
LbEndpoint.create("172.44.2.2", 8000, 3,
true, "endpoint-host-name", ImmutableMap.of())),
2, 0, ImmutableMap.of()));
verifyNoMoreInteractions(edsResourceWatcher);
verifyResourceMetadataAcked(
EDS, edsResourceTwo, clusterLoadAssignmentTwo, VERSION_2, TIME_INCREMENT * 2);

View File

@ -257,17 +257,17 @@ public class XdsTestUtils {
// Need to create endpoints to create locality endpoints map to create edsUpdate
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
LbEndpoint lbEndpoint =
LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME);
LbEndpoint lbEndpoint = LbEndpoint.create(
serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of());
lbEndpointsMap.put(
Locality.create("", "", ""),
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0));
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of()));
// Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder
XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate(
EDS_NAME, lbEndpointsMap, Collections.emptyList());
XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds(
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null)
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false)
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate)));

View File

@ -86,6 +86,7 @@ envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto
envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto
envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto
envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto
envoy/extensions/transport_sockets/tls/v3/cert.proto
envoy/extensions/transport_sockets/tls/v3/common.proto
envoy/extensions/transport_sockets/tls/v3/secret.proto

View File

@ -0,0 +1,38 @@
syntax = "proto3";
package envoy.extensions.transport_sockets.http_11_proxy.v3;
import "envoy/config/core/v3/base.proto";
import "udpa/annotations/status.proto";
option java_package = "io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3";
option java_outer_classname = "UpstreamHttp11ConnectProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3;http_11_proxyv3";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#protodoc-title: Upstream HTTP/1.1 Proxy]
// [#extension: envoy.transport_sockets.http_11_proxy]
// HTTP/1.1 proxy transport socket establishes an upstream connection to a proxy address
// instead of the target host's address. This behavior is triggered when the transport
// socket is configured and proxy information is provided.
//
// Behavior when proxying:
// =======================
// When an upstream connection is established, instead of connecting directly to the endpoint
// address, the client will connect to the specified proxy address, send an HTTP/1.1 ``CONNECT`` request
// indicating the endpoint address, and process the response. If the response has HTTP status 200,
// the connection will be passed down to the underlying transport socket.
//
// Configuring proxy information:
// ==============================
// Set ``typed_filter_metadata`` in :ref:`LbEndpoint.Metadata <envoy_v3_api_field_config.endpoint.v3.lbendpoint.metadata>` or :ref:`LocalityLbEndpoints.Metadata <envoy_v3_api_field_config.endpoint.v3.LocalityLbEndpoints.metadata>`.
// using the key ``envoy.http11_proxy_transport_socket.proxy_address`` and the
// proxy address in ``config::core::v3::Address`` format.
//
message Http11ProxyUpstreamTransport {
// The underlying transport socket being wrapped. Defaults to plaintext (raw_buffer) if unset.
config.core.v3.TransportSocket transport_socket = 1;
}