diff --git a/xds/BUILD.bazel b/xds/BUILD.bazel index b235a79c52..53fac28b2d 100644 --- a/xds/BUILD.bazel +++ b/xds/BUILD.bazel @@ -85,6 +85,7 @@ java_proto_library( "@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg", "@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg", "@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg", + "@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg", "@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg", "@envoy_api//envoy/service/discovery/v3:pkg", "@envoy_api//envoy/service/load_stats/v3:pkg", diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 04b7663fd3..bb44071a48 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -243,7 +243,9 @@ final class CdsLoadBalancer2 extends LoadBalancer { } ClusterResolverConfig config = new ClusterResolverConfig( - Collections.unmodifiableList(instances), configOrError.getConfig()); + Collections.unmodifiableList(instances), + configOrError.getConfig(), + root.result.isHttp11ProxyAvailable()); if (childLb == null) { childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 4e08ddc597..c92f592ebc 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Struct; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; +import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; @@ -59,6 +60,8 @@ import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -430,8 +433,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname()) .build(); - EquivalentAddressGroup eag = new EquivalentAddressGroup( - endpoint.eag().getAddresses(), attr); + + EquivalentAddressGroup eag; + if (config.isHttp11ProxyAvailable()) { + List rewrittenAddresses = new ArrayList<>(); + for (SocketAddress addr : endpoint.eag().getAddresses()) { + rewrittenAddresses.add(rewriteAddress( + addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata())); + } + eag = new EquivalentAddressGroup(rewrittenAddresses, attr); + } else { + eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr); + } eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); addresses.add(eag); } @@ -469,6 +482,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { new EndpointsUpdated().run(); } + private SocketAddress rewriteAddress(SocketAddress addr, + ImmutableMap endpointMetadata, + ImmutableMap localityMetadata) { + if (!(addr instanceof InetSocketAddress)) { + return addr; + } + + SocketAddress proxyAddress; + try { + proxyAddress = (SocketAddress) endpointMetadata.get( + "envoy.http11_proxy_transport_socket.proxy_address"); + if (proxyAddress == null) { + proxyAddress = (SocketAddress) localityMetadata.get( + "envoy.http11_proxy_transport_socket.proxy_address"); + } + } catch (ClassCastException e) { + return addr; + } + + if (proxyAddress == null) { + return addr; + } + + return HttpConnectProxiedSocketAddress.newBuilder() + .setTargetAddress((InetSocketAddress) addr) + .setProxyAddress(proxyAddress) + .build(); + } + private List generatePriorityNames(String name, Map localityLbEndpoints) { TreeMap> todo = new TreeMap<>(); diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 2301cb670e..b5dcb27136 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -74,10 +74,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi final List discoveryMechanisms; // GracefulSwitch configuration final Object lbConfig; + private final boolean isHttp11ProxyAvailable; - ClusterResolverConfig(List discoveryMechanisms, Object lbConfig) { + ClusterResolverConfig(List discoveryMechanisms, Object lbConfig, + boolean isHttp11ProxyAvailable) { this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms"); this.lbConfig = checkNotNull(lbConfig, "lbConfig"); + this.isHttp11ProxyAvailable = isHttp11ProxyAvailable; + } + + boolean isHttp11ProxyAvailable() { + return isHttp11ProxyAvailable; } @Override diff --git a/xds/src/main/java/io/grpc/xds/Endpoints.java b/xds/src/main/java/io/grpc/xds/Endpoints.java index 7d7aa3e386..b0d97d42c1 100644 --- a/xds/src/main/java/io/grpc/xds/Endpoints.java +++ b/xds/src/main/java/io/grpc/xds/Endpoints.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.grpc.EquivalentAddressGroup; import java.net.InetSocketAddress; import java.util.List; @@ -41,11 +42,13 @@ final class Endpoints { // Locality's priority level. abstract int priority(); + abstract ImmutableMap localityMetadata(); + static LocalityLbEndpoints create(List endpoints, int localityWeight, - int priority) { + int priority, ImmutableMap localityMetadata) { checkArgument(localityWeight > 0, "localityWeight must be greater than 0"); return new AutoValue_Endpoints_LocalityLbEndpoints( - ImmutableList.copyOf(endpoints), localityWeight, priority); + ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata); } } @@ -63,17 +66,20 @@ final class Endpoints { abstract String hostname(); + abstract ImmutableMap endpointMetadata(); + static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight, - boolean isHealthy, String hostname) { - return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname); + boolean isHealthy, String hostname, ImmutableMap endpointMetadata) { + return new AutoValue_Endpoints_LbEndpoint( + eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata); } // Only for testing. @VisibleForTesting - static LbEndpoint create( - String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) { + static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy, + String hostname, ImmutableMap endpointMetadata) { return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)), - loadBalancingWeight, isHealthy, hostname); + loadBalancingWeight, isHealthy, hostname, endpointMetadata); } } diff --git a/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java b/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java index 7ed617c984..41687817c4 100644 --- a/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java +++ b/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java @@ -36,6 +36,7 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; import io.grpc.xds.MetadataRegistry.MetadataValueParser; +import io.grpc.xds.client.XdsResourceType.ResourceInvalidException; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -240,11 +241,16 @@ final class GcpAuthenticationFilter implements Filter { } @Override - public String parse(Any any) throws InvalidProtocolBufferException { - Audience audience = any.unpack(Audience.class); + public String parse(Any any) throws ResourceInvalidException { + Audience audience; + try { + audience = any.unpack(Audience.class); + } catch (InvalidProtocolBufferException ex) { + throw new ResourceInvalidException("Invalid Resource in address proto", ex); + } String url = audience.getUrl(); if (url.isEmpty()) { - throw new InvalidProtocolBufferException( + throw new ResourceInvalidException( "Audience URL is empty. Metadata value must contain a valid URL."); } return url; diff --git a/xds/src/main/java/io/grpc/xds/MetadataRegistry.java b/xds/src/main/java/io/grpc/xds/MetadataRegistry.java index 8243b6a6f0..b79a61a261 100644 --- a/xds/src/main/java/io/grpc/xds/MetadataRegistry.java +++ b/xds/src/main/java/io/grpc/xds/MetadataRegistry.java @@ -17,9 +17,14 @@ package io.grpc.xds; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Struct; +import io.envoyproxy.envoy.config.core.v3.Metadata; import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser; +import io.grpc.xds.XdsEndpointResource.AddressMetadataParser; +import io.grpc.xds.client.XdsResourceType.ResourceInvalidException; +import io.grpc.xds.internal.ProtobufJsonConverter; import java.util.HashMap; import java.util.Map; @@ -36,6 +41,7 @@ final class MetadataRegistry { private MetadataRegistry() { registerParser(new AudienceMetadataParser()); + registerParser(new AddressMetadataParser()); } static MetadataRegistry getInstance() { @@ -55,6 +61,54 @@ final class MetadataRegistry { supportedParsers.remove(parser.getTypeUrl()); } + /** + * Parses cluster metadata into a structured map. + * + *

Values in {@code typed_filter_metadata} take precedence over + * {@code filter_metadata} when keys overlap, following Envoy API behavior. See + * + * Envoy metadata documentation for details. + * + * @param metadata the {@link Metadata} containing the fields to parse. + * @return an immutable map of parsed metadata. + * @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails. + */ + public ImmutableMap parseMetadata(Metadata metadata) + throws ResourceInvalidException { + ImmutableMap.Builder parsedMetadata = ImmutableMap.builder(); + + // Process typed_filter_metadata + for (Map.Entry entry : metadata.getTypedFilterMetadataMap().entrySet()) { + String key = entry.getKey(); + Any value = entry.getValue(); + MetadataValueParser parser = findParser(value.getTypeUrl()); + if (parser != null) { + try { + Object parsedValue = parser.parse(value); + parsedMetadata.put(key, parsedValue); + } catch (ResourceInvalidException e) { + throw new ResourceInvalidException( + String.format("Failed to parse metadata key: %s, type: %s. Error: %s", + key, value.getTypeUrl(), e.getMessage()), e); + } + } + } + // building once to reuse in the next loop + ImmutableMap intermediateParsedMetadata = parsedMetadata.build(); + + // Process filter_metadata for remaining keys + for (Map.Entry entry : metadata.getFilterMetadataMap().entrySet()) { + String key = entry.getKey(); + if (!intermediateParsedMetadata.containsKey(key)) { + Struct structValue = entry.getValue(); + Object jsonValue = ProtobufJsonConverter.convertToJson(structValue); + parsedMetadata.put(key, jsonValue); + } + } + + return parsedMetadata.build(); + } + interface MetadataValueParser { String getTypeUrl(); @@ -64,8 +118,8 @@ final class MetadataRegistry { * * @param any the {@link Any} object to parse. * @return the parsed metadata value. - * @throws InvalidProtocolBufferException if the parsing fails. + * @throws ResourceInvalidException if the parsing fails. */ - Object parse(Any any) throws InvalidProtocolBufferException; + Object parse(Any any) throws ResourceInvalidException; } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index 626d61c1f5..cfc74f3ca7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -25,7 +25,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; import com.google.protobuf.Duration; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -33,10 +32,11 @@ import com.google.protobuf.Struct; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.core.v3.Metadata; import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.grpc.LoadBalancerRegistry; @@ -46,15 +46,12 @@ import io.grpc.internal.ServiceConfigUtil; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.MetadataRegistry.MetadataValueParser; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.client.XdsClient.ResourceUpdate; import io.grpc.xds.client.XdsResourceType; -import io.grpc.xds.internal.ProtobufJsonConverter; import io.grpc.xds.internal.security.CommonTlsContextUtil; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -67,6 +64,8 @@ class XdsClusterResource extends XdsResourceType { @VisibleForTesting public static boolean enableSystemRootCerts = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false); + static boolean isEnabledXdsHttpConnect = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false); @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; @@ -78,6 +77,9 @@ class XdsClusterResource extends XdsResourceType { "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext"; private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT_V2 = "type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext"; + static final String TRANSPORT_SOCKET_NAME_HTTP11_PROXY = + "type.googleapis.com/envoy.extensions.transport_sockets.http_11_proxy.v3" + + ".Http11ProxyUpstreamTransport"; private final LoadBalancerRegistry loadBalancerRegistry = LoadBalancerRegistry.getDefaultRegistry(); @@ -177,10 +179,11 @@ class XdsClusterResource extends XdsResourceType { ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap())); try { + MetadataRegistry registry = MetadataRegistry.getInstance(); ImmutableMap parsedFilterMetadata = - parseClusterMetadata(cluster.getMetadata()); + registry.parseMetadata(cluster.getMetadata()); updateBuilder.parsedMetadata(parsedFilterMetadata); - } catch (InvalidProtocolBufferException e) { + } catch (ResourceInvalidException e) { throw new ResourceInvalidException( "Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': " + e.getMessage(), e); @@ -189,49 +192,6 @@ class XdsClusterResource extends XdsResourceType { return updateBuilder.build(); } - /** - * Parses cluster metadata into a structured map. - * - *

Values in {@code typed_filter_metadata} take precedence over - * {@code filter_metadata} when keys overlap, following Envoy API behavior. See - * - * Envoy metadata documentation for details. - * - * @param metadata the {@link Metadata} containing the fields to parse. - * @return an immutable map of parsed metadata. - * @throws InvalidProtocolBufferException if parsing {@code typed_filter_metadata} fails. - */ - private static ImmutableMap parseClusterMetadata(Metadata metadata) - throws InvalidProtocolBufferException { - ImmutableMap.Builder parsedMetadata = ImmutableMap.builder(); - - MetadataRegistry registry = MetadataRegistry.getInstance(); - // Process typed_filter_metadata - for (Map.Entry entry : metadata.getTypedFilterMetadataMap().entrySet()) { - String key = entry.getKey(); - Any value = entry.getValue(); - MetadataValueParser parser = registry.findParser(value.getTypeUrl()); - if (parser != null) { - Object parsedValue = parser.parse(value); - parsedMetadata.put(key, parsedValue); - } - } - // building once to reuse in the next loop - ImmutableMap intermediateParsedMetadata = parsedMetadata.build(); - - // Process filter_metadata for remaining keys - for (Map.Entry entry : metadata.getFilterMetadataMap().entrySet()) { - String key = entry.getKey(); - if (!intermediateParsedMetadata.containsKey(key)) { - Struct structValue = entry.getValue(); - Object jsonValue = ProtobufJsonConverter.convertToJson(structValue); - parsedMetadata.put(key, jsonValue); - } - } - - return parsedMetadata.build(); - } - private static StructOrError parseAggregateCluster(Cluster cluster) { String clusterName = cluster.getName(); Cluster.CustomClusterType customType = cluster.getClusterType(); @@ -259,6 +219,7 @@ class XdsClusterResource extends XdsResourceType { Long maxConcurrentRequests = null; UpstreamTlsContext upstreamTlsContext = null; OutlierDetection outlierDetection = null; + boolean isHttp11ProxyAvailable = false; if (cluster.hasLrsServer()) { if (!cluster.getLrsServer().hasSelf()) { return StructOrError.fromError( @@ -281,17 +242,43 @@ class XdsClusterResource extends XdsResourceType { return StructOrError.fromError("Cluster " + clusterName + ": transport-socket-matches not supported."); } - if (cluster.hasTransportSocket()) { - if (!TRANSPORT_SOCKET_NAME_TLS.equals(cluster.getTransportSocket().getName())) { - return StructOrError.fromError("transport-socket with name " - + cluster.getTransportSocket().getName() + " not supported."); + boolean hasTransportSocket = cluster.hasTransportSocket(); + TransportSocket transportSocket = cluster.getTransportSocket(); + + if (hasTransportSocket && !TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName()) + && !(isEnabledXdsHttpConnect + && TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName()))) { + return StructOrError.fromError( + "transport-socket with name " + transportSocket.getName() + " not supported."); + } + + if (hasTransportSocket && isEnabledXdsHttpConnect + && TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName())) { + isHttp11ProxyAvailable = true; + try { + Http11ProxyUpstreamTransport wrappedTransportSocket = transportSocket + .getTypedConfig().unpack(io.envoyproxy.envoy.extensions.transport_sockets + .http_11_proxy.v3.Http11ProxyUpstreamTransport.class); + hasTransportSocket = wrappedTransportSocket.hasTransportSocket(); + transportSocket = wrappedTransportSocket.getTransportSocket(); + } catch (InvalidProtocolBufferException e) { + return StructOrError.fromError( + "Cluster " + clusterName + ": malformed Http11ProxyUpstreamTransport: " + e); + } catch (ClassCastException e) { + return StructOrError.fromError( + "Cluster " + clusterName + + ": invalid transport_socket type in Http11ProxyUpstreamTransport"); } + } + + if (hasTransportSocket && TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())) { try { upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext( validateUpstreamTlsContext( - unpackCompatibleType(cluster.getTransportSocket().getTypedConfig(), - io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class, - TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2), + unpackCompatibleType(transportSocket.getTypedConfig(), + io.envoyproxy.envoy.extensions + .transport_sockets.tls.v3.UpstreamTlsContext.class, + TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2), certProviderInstances)); } catch (InvalidProtocolBufferException | ResourceInvalidException e) { return StructOrError.fromError( @@ -329,9 +316,10 @@ class XdsClusterResource extends XdsResourceType { return StructOrError.fromError( "EDS service_name must be set when Cluster resource has an xdstp name"); } + return StructOrError.fromStruct(CdsUpdate.forEds( clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext, - outlierDetection)); + outlierDetection, isHttp11ProxyAvailable)); } else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) { if (!cluster.hasLoadAssignment()) { return StructOrError.fromError( @@ -366,7 +354,8 @@ class XdsClusterResource extends XdsResourceType { String dnsHostName = String.format( Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue()); return StructOrError.fromStruct(CdsUpdate.forLogicalDns( - clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext)); + clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, + upstreamTlsContext, isHttp11ProxyAvailable)); } return StructOrError.fromError( "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); @@ -620,6 +609,8 @@ class XdsClusterResource extends XdsResourceType { @Nullable abstract UpstreamTlsContext upstreamTlsContext(); + abstract boolean isHttp11ProxyAvailable(); + // List of underlying clusters making of this aggregate cluster. // Only valid for AGGREGATE cluster. @Nullable @@ -640,7 +631,8 @@ class XdsClusterResource extends XdsResourceType { .maxRingSize(0) .choiceCount(0) .filterMetadata(ImmutableMap.of()) - .parsedMetadata(ImmutableMap.of()); + .parsedMetadata(ImmutableMap.of()) + .isHttp11ProxyAvailable(false); } static Builder forAggregate(String clusterName, List prioritizedClusterNames) { @@ -653,26 +645,30 @@ class XdsClusterResource extends XdsResourceType { static Builder forEds(String clusterName, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, - @Nullable OutlierDetection outlierDetection) { + @Nullable OutlierDetection outlierDetection, + boolean isHttp11ProxyAvailable) { return newBuilder(clusterName) .clusterType(ClusterType.EDS) .edsServiceName(edsServiceName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) .upstreamTlsContext(upstreamTlsContext) - .outlierDetection(outlierDetection); + .outlierDetection(outlierDetection) + .isHttp11ProxyAvailable(isHttp11ProxyAvailable); } static Builder forLogicalDns(String clusterName, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext upstreamTlsContext) { + @Nullable UpstreamTlsContext upstreamTlsContext, + boolean isHttp11ProxyAvailable) { return newBuilder(clusterName) .clusterType(ClusterType.LOGICAL_DNS) .dnsHostName(dnsHostName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) - .upstreamTlsContext(upstreamTlsContext); + .upstreamTlsContext(upstreamTlsContext) + .isHttp11ProxyAvailable(isHttp11ProxyAvailable); } enum ClusterType { @@ -749,6 +745,8 @@ class XdsClusterResource extends XdsResourceType { // Private, use one of the static factory methods instead. protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests); + protected abstract Builder isHttp11ProxyAvailable(boolean isHttp11ProxyAvailable); + // Private, use one of the static factory methods instead. protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext); diff --git a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java index 6a3cd35bd5..11111fa51c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java @@ -20,9 +20,14 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.InetAddresses; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; import io.envoyproxy.envoy.type.v3.FractionalPercent; @@ -30,6 +35,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.internal.GrpcUtil; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; +import io.grpc.xds.MetadataRegistry.MetadataValueParser; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsClient.ResourceUpdate; @@ -185,7 +191,8 @@ class XdsEndpointResource extends XdsResourceType { @VisibleForTesting @Nullable static StructOrError parseLocalityLbEndpoints( - io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) { + io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) + throws ResourceInvalidException { // Filter out localities without or with 0 weight. if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) { return null; @@ -193,6 +200,15 @@ class XdsEndpointResource extends XdsResourceType { if (proto.getPriority() < 0) { return StructOrError.fromError("negative priority"); } + + ImmutableMap localityMetadata; + MetadataRegistry registry = MetadataRegistry.getInstance(); + try { + localityMetadata = registry.parseMetadata(proto.getMetadata()); + } catch (ResourceInvalidException e) { + throw new ResourceInvalidException("Failed to parse Locality Endpoint metadata: " + + e.getMessage(), e); + } List endpoints = new ArrayList<>(proto.getLbEndpointsCount()); for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) { // The endpoint field of each lb_endpoints must be set. @@ -200,6 +216,13 @@ class XdsEndpointResource extends XdsResourceType { if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) { return StructOrError.fromError("LbEndpoint with no endpoint/address"); } + ImmutableMap endpointMetadata; + try { + endpointMetadata = registry.parseMetadata(endpoint.getMetadata()); + } catch (ResourceInvalidException e) { + throw new ResourceInvalidException("Failed to parse Endpoint metadata: " + + e.getMessage(), e); + } List addresses = new ArrayList<>(); addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress())); @@ -214,10 +237,12 @@ class XdsEndpointResource extends XdsResourceType { endpoints.add(Endpoints.LbEndpoint.create( new EquivalentAddressGroup(addresses), endpoint.getLoadBalancingWeight().getValue(), isHealthy, - endpoint.getEndpoint().getHostname())); + endpoint.getEndpoint().getHostname(), + endpointMetadata)); } return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create( - endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority())); + endpoints, proto.getLoadBalancingWeight().getValue(), + proto.getPriority(), localityMetadata)); } private static InetSocketAddress getInetSocketAddress(Address address) { @@ -270,4 +295,47 @@ class XdsEndpointResource extends XdsResourceType { .toString(); } } + + public static class AddressMetadataParser implements MetadataValueParser { + + @Override + public String getTypeUrl() { + return "type.googleapis.com/envoy.config.core.v3.Address"; + } + + @Override + public java.net.SocketAddress parse(Any any) throws ResourceInvalidException { + SocketAddress socketAddress; + try { + socketAddress = any.unpack(Address.class).getSocketAddress(); + } catch (InvalidProtocolBufferException ex) { + throw new ResourceInvalidException("Invalid Resource in address proto", ex); + } + validateAddress(socketAddress); + + String ip = socketAddress.getAddress(); + int port = socketAddress.getPortValue(); + + try { + return new InetSocketAddress(InetAddresses.forString(ip), port); + } catch (IllegalArgumentException e) { + throw createException("Invalid IP address or port: " + ip + ":" + port); + } + } + + private void validateAddress(SocketAddress socketAddress) throws ResourceInvalidException { + if (socketAddress.getAddress().isEmpty()) { + throw createException("Address field is empty or invalid."); + } + long port = Integer.toUnsignedLong(socketAddress.getPortValue()); + if (port > 65535) { + throw createException(String.format("Port value %d out of range 1-65535.", port)); + } + } + + private ResourceInvalidException createException(String message) { + return new ResourceInvalidException( + "Failed to parse envoy.config.core.v3.Address: " + message); + } + } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 82a61e79ab..479bde76ce 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -179,7 +179,7 @@ public class CdsLoadBalancer2Test { public void discoverTopLevelEdsCluster() { CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection) + outlierDetection, false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); @@ -198,7 +198,8 @@ public class CdsLoadBalancer2Test { @Test public void discoverTopLevelLogicalDnsCluster() { CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext) + CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, + false) .leastRequestLbPolicy(3).build(); xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); @@ -232,7 +233,7 @@ public class CdsLoadBalancer2Test { @Test public void nonAggregateCluster_resourceUpdate() { CdsUpdate update = - CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection) + CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); @@ -243,7 +244,7 @@ public class CdsLoadBalancer2Test { 100L, upstreamTlsContext, outlierDetection); update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null, - outlierDetection).roundRobinLbPolicy().build(); + outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); childLbConfig = (ClusterResolverConfig) childBalancer.config; instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); @@ -254,7 +255,8 @@ public class CdsLoadBalancer2Test { @Test public void nonAggregateCluster_resourceRevoked() { CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext) + CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext, + false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); @@ -298,16 +300,16 @@ public class CdsLoadBalancer2Test { CLUSTER, cluster1, cluster2, cluster3, cluster4); assertThat(childBalancers).isEmpty(); CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster3, update3); assertThat(childBalancers).isEmpty(); CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null) + CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster2, update2); assertThat(childBalancers).isEmpty(); CdsUpdate update4 = - CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection) + CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster4, update4); assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered @@ -362,10 +364,11 @@ public class CdsLoadBalancer2Test { xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster1, update1); CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null) + CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, + false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster2, update2); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); @@ -412,10 +415,11 @@ public class CdsLoadBalancer2Test { xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster1, update1); CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null) + CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, + false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster2, update2); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); @@ -467,7 +471,7 @@ public class CdsLoadBalancer2Test { xdsClient.deliverCdsUpdate(cluster2, update2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3); CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster3, update3); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; @@ -518,7 +522,7 @@ public class CdsLoadBalancer2Test { reset(helper); CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster3, update3); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); @@ -553,7 +557,7 @@ public class CdsLoadBalancer2Test { .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster2, update2); CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster3, update3); // cluster2 (aggr.) -> [cluster3 (EDS)] @@ -602,7 +606,7 @@ public class CdsLoadBalancer2Test { // Define EDS cluster CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster3, update3); // cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies) @@ -649,7 +653,8 @@ public class CdsLoadBalancer2Test { .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); CdsUpdate update1 = - CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null) + CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null, + false) .roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(cluster1, update1); FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers); @@ -676,7 +681,7 @@ public class CdsLoadBalancer2Test { @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build(); + upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); xdsClient.deliverCdsUpdate(CLUSTER, update); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); @@ -692,7 +697,7 @@ public class CdsLoadBalancer2Test { try { xdsClient.deliverCdsUpdate(CLUSTER, CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection) + outlierDetection, false) .lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build()); } catch (Exception e) { assertThat(e).hasMessageThat().contains("unknownLb"); @@ -706,7 +711,7 @@ public class CdsLoadBalancer2Test { try { xdsClient.deliverCdsUpdate(CLUSTER, CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection).lbPolicyConfig( + outlierDetection, false).lbPolicyConfig( ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) .build()); } catch (Exception e) { diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 28898c0930..d0176d7aa3 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -36,6 +36,7 @@ import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; +import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.InsecureChannelCredentials; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; @@ -83,6 +84,7 @@ import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -240,7 +242,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void edsClustersWithRingHashEndpointLbPolicy() { ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(edsDiscoveryMechanism1), ringHash); + Collections.singletonList(edsDiscoveryMechanism1), ringHash, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertThat(childBalancers).isEmpty(); @@ -252,14 +254,18 @@ public class ClusterResolverLoadBalancerTest { LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, true, "hostname1"), - LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, true, "hostname2")), - 10 /* localityWeight */, 1 /* priority */); + LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, + true, "hostname1", ImmutableMap.of()), + LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, + true, "hostname2", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints.create( Collections.singletonList( - LbEndpoint.create(endpoint3, 60 /* loadBalancingWeight */, true, "hostname3")), - 50 /* localityWeight */, 1 /* priority */); + LbEndpoint.create( + endpoint3, 60 /* loadBalancingWeight */, true, + "hostname3", ImmutableMap.of())), + 50 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); @@ -302,7 +308,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void edsClustersWithLeastRequestEndpointLbPolicy() { ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(edsDiscoveryMechanism1), leastRequest); + Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertThat(childBalancers).isEmpty(); @@ -312,8 +318,9 @@ public class ClusterResolverLoadBalancerTest { LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")), - 100 /* localityWeight */, 1 /* priority */); + LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, + "hostname1", ImmutableMap.of())), + 100 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints)); @@ -348,7 +355,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void edsClustersEndpointHostname_addedToAddressAttribute() { ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest); + Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertThat(childBalancers).isEmpty(); @@ -358,8 +365,9 @@ public class ClusterResolverLoadBalancerTest { LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")), - 100 /* localityWeight */, 1 /* priority */); + LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, + "hostname1", ImmutableMap.of())), + 100 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints)); @@ -371,11 +379,104 @@ public class ClusterResolverLoadBalancerTest { .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo("hostname1"); } + @Test + public void endpointAddressRewritten_whenProxyMetadataIsInEndpointMetadata() { + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(childBalancers).isEmpty(); + + EquivalentAddressGroup endpoint = + new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.1", 8080)); + + // Proxy address in endpointMetadata (use FakeSocketAddress directly) + SocketAddress proxyAddress = new FakeSocketAddress("127.0.0.2"); + ImmutableMap endpointMetadata = + ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress); + + // No proxy in locality metadata + ImmutableMap localityMetadata = ImmutableMap.of(); + + LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( + Arrays.asList( + LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, + "hostname1", endpointMetadata)), + 100 /* localityWeight */, 1 /* priority */, localityMetadata); + + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME1, + ImmutableMap.of(locality1, localityLbEndpoints)); + + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + + // Get the rewritten address + SocketAddress rewrittenAddress = + childBalancer.addresses.get(0).getAddresses().get(0); + assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class); + HttpConnectProxiedSocketAddress proxiedSocket = + (HttpConnectProxiedSocketAddress) rewrittenAddress; + + // Assert that the target address is the original address + assertThat(proxiedSocket.getTargetAddress()) + .isEqualTo(endpoint.getAddresses().get(0)); + + // Assert that the proxy address is correctly set + assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress); + } + + @Test + public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() { + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(childBalancers).isEmpty(); + + EquivalentAddressGroup endpoint = + new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.2", 8080)); + + // No proxy in endpointMetadata + ImmutableMap endpointMetadata = ImmutableMap.of(); + + // Proxy address is now in localityMetadata + SocketAddress proxyAddress = new FakeSocketAddress("proxy-addr"); + ImmutableMap localityMetadata = + ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress); + + LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( + Arrays.asList( + LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, + "hostname2", endpointMetadata)), + 100 /* localityWeight */, 1 /* priority */, localityMetadata); + + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME1, + ImmutableMap.of(locality1, localityLbEndpoints)); + + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + + // Get the rewritten address + SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0); + + // Assert that the address was rewritten + assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class); + HttpConnectProxiedSocketAddress proxiedSocket = + (HttpConnectProxiedSocketAddress) rewrittenAddress; + + // Assert that the target address is the original address + assertThat(proxiedSocket.getTargetAddress()).isEqualTo(endpoint.getAddresses().get(0)); + + // Assert that the proxy address is correctly set from locality metadata + assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress); + } @Test public void onlyEdsClusters_receivedEndpoints() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); assertThat(childBalancers).isEmpty(); @@ -389,17 +490,21 @@ public class ClusterResolverLoadBalancerTest { LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(endpoint1, 100, true, "hostname1"), - LbEndpoint.create(endpoint2, 100, true, "hostname1")), - 70 /* localityWeight */, 1 /* priority */); + LbEndpoint.create(endpoint1, 100, + true, "hostname1", ImmutableMap.of()), + LbEndpoint.create(endpoint2, 100, + true, "hostname1", ImmutableMap.of())), + 70 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname2")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, + "hostname2", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints3 = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, "hostname3")), - 20 /* localityWeight */, 2 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, + "hostname3", ImmutableMap.of())), + 20 /* localityWeight */, 2 /* priority */, ImmutableMap.of()); String priority1 = CLUSTER2 + "[child1]"; String priority2 = CLUSTER2 + "[child2]"; String priority3 = CLUSTER1 + "[child1]"; @@ -487,7 +592,7 @@ public class ClusterResolverLoadBalancerTest { private void verifyEdsPriorityNames(List want, Map... updates) { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism2), roundRobin); + Arrays.asList(edsDiscoveryMechanism2), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2); assertThat(childBalancers).isEmpty(); @@ -553,15 +658,17 @@ public class ClusterResolverLoadBalancerTest { private LocalityLbEndpoints createEndpoints(int priority) { return LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true, "hostname1"), - LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true, "hostname2")), - 70 /* localityWeight */, priority /* priority */); + LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, + true, "hostname1", ImmutableMap.of()), + LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, + true, "hostname2", ImmutableMap.of())), + 70 /* localityWeight */, priority /* priority */, ImmutableMap.of()); } @Test public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); assertThat(childBalancers).isEmpty(); @@ -583,7 +690,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); assertThat(childBalancers).isEmpty(); @@ -592,12 +699,14 @@ public class ClusterResolverLoadBalancerTest { EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint2, 100, true, "hostname2")), - 20 /* localityWeight */, 2 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint2, 100, true, + "hostname2", ImmutableMap.of())), + 20 /* localityWeight */, 2 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1)); xdsClient.deliverClusterLoadAssignment( @@ -618,17 +727,19 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleEdsResource_ignoreUnhealthyEndpoints() { - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false); deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( Arrays.asList( - LbEndpoint.create(endpoint1, 100, false /* isHealthy */, "hostname1"), - LbEndpoint.create(endpoint2, 100, true /* isHealthy */, "hostname2")), - 10 /* localityWeight */, 1 /* priority */); + LbEndpoint.create(endpoint1, 100, false /* isHealthy */, + "hostname1", ImmutableMap.of()), + LbEndpoint.create(endpoint2, 100, true /* isHealthy */, + "hostname2", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); @@ -638,21 +749,21 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false); deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */, - "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */, - "hostname2")), - 10 /* localityWeight */, 1 /* priority */); + "hostname2", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); @@ -665,21 +776,21 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() { - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false); deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */, - "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */, - "hostname2")), - 10 /* localityWeight */, 2 /* priority */); + "hostname2", ImmutableMap.of())), + 10 /* localityWeight */, 2 /* priority */, ImmutableMap.of()); String priority2 = CLUSTER1 + "[child2]"; xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, @@ -691,15 +802,15 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleEdsResource_noHealthyEndpoint() { - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false); deliverLbConfig(config); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */, - "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy @@ -716,7 +827,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void onlyLogicalDnsCluster_endpointsResolved() { ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); @@ -749,7 +860,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void onlyLogicalDnsCluster_handleRefreshNameResolution() { ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); @@ -767,7 +878,7 @@ public class ClusterResolverLoadBalancerTest { InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); @@ -813,7 +924,7 @@ public class ClusterResolverLoadBalancerTest { public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() { InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); @@ -851,7 +962,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -862,8 +973,9 @@ public class ClusterResolverLoadBalancerTest { resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname3")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, + "hostname3", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); @@ -886,7 +998,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void noEdsResourceExists_useDnsResolutionResults() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -910,7 +1022,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -919,8 +1031,9 @@ public class ClusterResolverLoadBalancerTest { EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint, 100, true, + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); resolver.deliverError(Status.UNKNOWN.withDescription("I am lost")); @@ -941,7 +1054,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -950,8 +1063,9 @@ public class ClusterResolverLoadBalancerTest { EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint, 100, true, + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); assertThat(childBalancers).isEmpty(); // not created until all clusters resolved. @@ -976,7 +1090,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -999,7 +1113,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void resolutionErrorBeforeChildLbCreated_edsOnly_returnErrorPicker() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1), roundRobin); + Arrays.asList(edsDiscoveryMechanism1), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertThat(childBalancers).isEmpty(); @@ -1017,7 +1131,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertResolverCreated("/" + DNS_HOST_NAME); @@ -1033,7 +1147,7 @@ public class ClusterResolverLoadBalancerTest { @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { ClusterResolverConfig config = new ClusterResolverConfig( - Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); @@ -1043,8 +1157,9 @@ public class ClusterResolverLoadBalancerTest { EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")), - 10 /* localityWeight */, 1 /* priority */); + Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, + "hostname1", ImmutableMap.of())), + 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2)); @@ -1118,37 +1233,37 @@ public class ClusterResolverLoadBalancerTest { } private static EquivalentAddressGroup makeAddress(final String name) { - class FakeSocketAddress extends SocketAddress { - private final String name; + return new EquivalentAddressGroup(new FakeSocketAddress(name)); + } - private FakeSocketAddress(String name) { - this.name = name; - } + static class FakeSocketAddress extends SocketAddress { + private final String name; - @Override - public int hashCode() { - return Objects.hash(name); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof FakeSocketAddress)) { - return false; - } - FakeSocketAddress that = (FakeSocketAddress) o; - return Objects.equals(name, that.name); - } - - @Override - public String toString() { - return name; - } + private FakeSocketAddress(String name) { + this.name = name; } - return new EquivalentAddressGroup(new FakeSocketAddress(name)); + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof FakeSocketAddress)) { + return false; + } + FakeSocketAddress that = (FakeSocketAddress) o; + return Objects.equals(name, that.name); + } + + @Override + public String toString() { + return name; + } } private static final class FakeXdsClient extends XdsClient { diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java index 610d147ccf..7fac666f98 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN; +import static io.grpc.xds.XdsClusterResource.TRANSPORT_SOCKET_NAME_HTTP11_PROXY; import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS; import static org.junit.Assert.fail; @@ -93,6 +94,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3 import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin; import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality; +import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; @@ -1055,7 +1057,7 @@ public class GrpcXdsClientImplDataTest { } @Test - public void parseLocalityLbEndpoints_withHealthyEndpoints() { + public void parseLocalityLbEndpoints_withHealthyEndpoints() throws ResourceInvalidException { io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto = io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder() .setLocality(Locality.newBuilder() @@ -1075,12 +1077,14 @@ public class GrpcXdsClientImplDataTest { assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct()).isEqualTo( LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")), - 100, 1)); + Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, + 20, true, "", ImmutableMap.of())), + 100, 1, ImmutableMap.of())); } @Test - public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy() { + public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy() + throws ResourceInvalidException { io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto = io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder() .setLocality(Locality.newBuilder() @@ -1100,12 +1104,13 @@ public class GrpcXdsClientImplDataTest { assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct()).isEqualTo( LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")), 100, - 1)); + Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, + 20, true, "", ImmutableMap.of())), + 100, 1, ImmutableMap.of())); } @Test - public void parseLocalityLbEndpoints_withUnHealthyEndpoints() { + public void parseLocalityLbEndpoints_withUnHealthyEndpoints() throws ResourceInvalidException { io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto = io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder() .setLocality(Locality.newBuilder() @@ -1125,12 +1130,13 @@ public class GrpcXdsClientImplDataTest { assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct()).isEqualTo( LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, false, "")), 100, - 1)); + Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, + false, "", ImmutableMap.of())), + 100, 1, ImmutableMap.of())); } @Test - public void parseLocalityLbEndpoints_ignorZeroWeightLocality() { + public void parseLocalityLbEndpoints_ignorZeroWeightLocality() throws ResourceInvalidException { io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto = io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder() .setLocality(Locality.newBuilder() @@ -1187,7 +1193,10 @@ public class GrpcXdsClientImplDataTest { EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList); assertThat(struct.getStruct()).isEqualTo( LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(expectedEag, 20, true, "")), 100, 1)); + Collections.singletonList(LbEndpoint.create( + expectedEag, 20, true, "", ImmutableMap.of())), 100, 1, ImmutableMap.of())); + } catch (ResourceInvalidException e) { + throw new RuntimeException(e); } finally { if (originalDualStackProp != null) { System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp); @@ -1198,7 +1207,7 @@ public class GrpcXdsClientImplDataTest { } @Test - public void parseLocalityLbEndpoints_invalidPriority() { + public void parseLocalityLbEndpoints_invalidPriority() throws ResourceInvalidException { io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto = io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder() .setLocality(Locality.newBuilder() @@ -2456,6 +2465,59 @@ public class GrpcXdsClientImplDataTest { assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata); } + @Test + public void processCluster_parsesAddressMetadata() throws Exception { + + // Create an Address message + Address address = Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("192.168.1.1") + .setPortValue(8080) + .build()) + .build(); + + // Wrap the Address in Any + Any addressMetadata = Any.newBuilder() + .setTypeUrl("type.googleapis.com/envoy.config.core.v3.Address") + .setValue(address.toByteString()) + .build(); + + Struct filterMetadata = Struct.newBuilder() + .putFields("key1", Value.newBuilder().setStringValue("value1").build()) + .putFields("key2", Value.newBuilder().setNumberValue(42).build()) + .build(); + + Metadata metadata = Metadata.newBuilder() + .putTypedFilterMetadata("ADDRESS_METADATA", addressMetadata) + .putFilterMetadata("FILTER_METADATA", filterMetadata) + .build(); + + Cluster cluster = Cluster.newBuilder() + .setName("cluster-foo.googleapis.com") + .setType(DiscoveryType.EDS) + .setEdsClusterConfig( + EdsClusterConfig.newBuilder() + .setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance())) + .setServiceName("service-foo.googleapis.com")) + .setLbPolicy(LbPolicy.ROUND_ROBIN) + .setMetadata(metadata) + .build(); + + CdsUpdate update = XdsClusterResource.processCluster( + cluster, null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); + + ImmutableMap expectedParsedMetadata = ImmutableMap.of( + "ADDRESS_METADATA", new InetSocketAddress("192.168.1.1", 8080), + "FILTER_METADATA", ImmutableMap.of( + "key1", "value1", + "key2", 42.0)); + + assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata); + } + @Test public void processCluster_metadataKeyCollision_resolvesToTypedMetadata() throws ResourceInvalidException, InvalidProtocolBufferException { @@ -2512,6 +2574,40 @@ public class GrpcXdsClientImplDataTest { metadataRegistry.removeParser(testParser); } + @Test + public void parseNonAggregateCluster_withHttp11ProxyTransportSocket() + throws ResourceInvalidException, InvalidProtocolBufferException { + XdsClusterResource.isEnabledXdsHttpConnect = true; + + Http11ProxyUpstreamTransport http11ProxyUpstreamTransport = + Http11ProxyUpstreamTransport.newBuilder() + .setTransportSocket(TransportSocket.getDefaultInstance()) + .build(); + + TransportSocket transportSocket = TransportSocket.newBuilder() + .setName(TRANSPORT_SOCKET_NAME_HTTP11_PROXY) + .setTypedConfig(Any.pack(http11ProxyUpstreamTransport)) + .build(); + + Cluster cluster = Cluster.newBuilder() + .setName("cluster-http11-proxy.googleapis.com") + .setType(DiscoveryType.EDS) + .setEdsClusterConfig( + EdsClusterConfig.newBuilder() + .setEdsConfig( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setServiceName("service-http11-proxy.googleapis.com")) + .setLbPolicy(LbPolicy.ROUND_ROBIN) + .setTransportSocket(transportSocket) + .build(); + + CdsUpdate result = + XdsClusterResource.processCluster(cluster, null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); + + assertThat(result).isNotNull(); + assertThat(result.isHttp11ProxyAvailable()).isTrue(); + } @Test public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException { diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 00fbfe669a..51c07cb353 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -607,9 +607,9 @@ public abstract class GrpcXdsClientImplTestBase { Locality.create("region1", "zone1", "subzone1"), LocalityLbEndpoints.create( ImmutableList.of(LbEndpoint.create("192.168.0.1", 8080, 2, true, - "endpoint-host-name")), 1, 0), + "endpoint-host-name", ImmutableMap.of())), 1, 0, ImmutableMap.of()), Locality.create("region3", "zone3", "subzone3"), - LocalityLbEndpoints.create(ImmutableList.of(), 2, 1)); + LocalityLbEndpoints.create(ImmutableList.of(), 2, 1, ImmutableMap.of())); } /** @@ -3246,7 +3246,9 @@ public abstract class GrpcXdsClientImplTestBase { Locality.create("region2", "zone2", "subzone2"), LocalityLbEndpoints.create( ImmutableList.of( - LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0)); + LbEndpoint.create("172.44.2.2", 8000, 3, + true, "endpoint-host-name", ImmutableMap.of())), + 2, 0, ImmutableMap.of())); verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2, TIME_INCREMENT * 2); verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); @@ -3416,7 +3418,9 @@ public abstract class GrpcXdsClientImplTestBase { Locality.create("region2", "zone2", "subzone2"), LocalityLbEndpoints.create( ImmutableList.of( - LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0)); + LbEndpoint.create("172.44.2.2", 8000, 3, + true, "endpoint-host-name", ImmutableMap.of())), + 2, 0, ImmutableMap.of())); verify(watcher2).onChanged(edsUpdateCaptor.capture()); edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.clusterName).isEqualTo(edsResourceTwo); @@ -3426,7 +3430,9 @@ public abstract class GrpcXdsClientImplTestBase { Locality.create("region2", "zone2", "subzone2"), LocalityLbEndpoints.create( ImmutableList.of( - LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0)); + LbEndpoint.create("172.44.2.2", 8000, 3, + true, "endpoint-host-name", ImmutableMap.of())), + 2, 0, ImmutableMap.of())); verifyNoMoreInteractions(edsResourceWatcher); verifyResourceMetadataAcked( EDS, edsResourceTwo, clusterLoadAssignmentTwo, VERSION_2, TIME_INCREMENT * 2); diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index ea28734ec6..d0580ae266 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -257,17 +257,17 @@ public class XdsTestUtils { // Need to create endpoints to create locality endpoints map to create edsUpdate Map lbEndpointsMap = new HashMap<>(); - LbEndpoint lbEndpoint = - LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME); + LbEndpoint lbEndpoint = LbEndpoint.create( + serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of()); lbEndpointsMap.put( Locality.create("", "", ""), - LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0)); + LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of())); // Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate( EDS_NAME, lbEndpointsMap, Collections.emptyList()); XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds( - CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null) + CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false) .lbPolicyConfig(getWrrLbConfigAsMap()).build(); XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate))); diff --git a/xds/third_party/envoy/import.sh b/xds/third_party/envoy/import.sh index dbe6f81b1a..7a6b33871b 100755 --- a/xds/third_party/envoy/import.sh +++ b/xds/third_party/envoy/import.sh @@ -86,6 +86,7 @@ envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto +envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto envoy/extensions/transport_sockets/tls/v3/cert.proto envoy/extensions/transport_sockets/tls/v3/common.proto envoy/extensions/transport_sockets/tls/v3/secret.proto diff --git a/xds/third_party/envoy/src/main/proto/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto b/xds/third_party/envoy/src/main/proto/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto new file mode 100644 index 0000000000..2c9b5333f4 --- /dev/null +++ b/xds/third_party/envoy/src/main/proto/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package envoy.extensions.transport_sockets.http_11_proxy.v3; + +import "envoy/config/core/v3/base.proto"; + +import "udpa/annotations/status.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3"; +option java_outer_classname = "UpstreamHttp11ConnectProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3;http_11_proxyv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Upstream HTTP/1.1 Proxy] +// [#extension: envoy.transport_sockets.http_11_proxy] + +// HTTP/1.1 proxy transport socket establishes an upstream connection to a proxy address +// instead of the target host's address. This behavior is triggered when the transport +// socket is configured and proxy information is provided. +// +// Behavior when proxying: +// ======================= +// When an upstream connection is established, instead of connecting directly to the endpoint +// address, the client will connect to the specified proxy address, send an HTTP/1.1 ``CONNECT`` request +// indicating the endpoint address, and process the response. If the response has HTTP status 200, +// the connection will be passed down to the underlying transport socket. +// +// Configuring proxy information: +// ============================== +// Set ``typed_filter_metadata`` in :ref:`LbEndpoint.Metadata ` or :ref:`LocalityLbEndpoints.Metadata `. +// using the key ``envoy.http11_proxy_transport_socket.proxy_address`` and the +// proxy address in ``config::core::v3::Address`` format. +// +message Http11ProxyUpstreamTransport { + // The underlying transport socket being wrapped. Defaults to plaintext (raw_buffer) if unset. + config.core.v3.TransportSocket transport_socket = 1; +}