mirror of https://github.com/grpc/grpc-java.git
xds: xDS-based HTTP CONNECT configuration (#11861)
This commit is contained in:
parent
c340f4a2f3
commit
12197065fe
|
|
@ -85,6 +85,7 @@ java_proto_library(
|
|||
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
|
||||
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
|
||||
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
|
||||
"@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg",
|
||||
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
|
||||
"@envoy_api//envoy/service/discovery/v3:pkg",
|
||||
"@envoy_api//envoy/service/load_stats/v3:pkg",
|
||||
|
|
|
|||
|
|
@ -243,7 +243,9 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
}
|
||||
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.unmodifiableList(instances), configOrError.getConfig());
|
||||
Collections.unmodifiableList(instances),
|
||||
configOrError.getConfig(),
|
||||
root.result.isHttp11ProxyAvailable());
|
||||
if (childLb == null) {
|
||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.protobuf.Struct;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.HttpConnectProxiedSocketAddress;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
|
|
@ -59,6 +60,8 @@ import io.grpc.xds.client.XdsClient;
|
|||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||
import io.grpc.xds.client.XdsLogger;
|
||||
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -430,8 +433,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
|
||||
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
|
||||
.build();
|
||||
EquivalentAddressGroup eag = new EquivalentAddressGroup(
|
||||
endpoint.eag().getAddresses(), attr);
|
||||
|
||||
EquivalentAddressGroup eag;
|
||||
if (config.isHttp11ProxyAvailable()) {
|
||||
List<SocketAddress> rewrittenAddresses = new ArrayList<>();
|
||||
for (SocketAddress addr : endpoint.eag().getAddresses()) {
|
||||
rewrittenAddresses.add(rewriteAddress(
|
||||
addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
|
||||
}
|
||||
eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
|
||||
} else {
|
||||
eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
|
||||
}
|
||||
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
|
||||
addresses.add(eag);
|
||||
}
|
||||
|
|
@ -469,6 +482,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
new EndpointsUpdated().run();
|
||||
}
|
||||
|
||||
private SocketAddress rewriteAddress(SocketAddress addr,
|
||||
ImmutableMap<String, Object> endpointMetadata,
|
||||
ImmutableMap<String, Object> localityMetadata) {
|
||||
if (!(addr instanceof InetSocketAddress)) {
|
||||
return addr;
|
||||
}
|
||||
|
||||
SocketAddress proxyAddress;
|
||||
try {
|
||||
proxyAddress = (SocketAddress) endpointMetadata.get(
|
||||
"envoy.http11_proxy_transport_socket.proxy_address");
|
||||
if (proxyAddress == null) {
|
||||
proxyAddress = (SocketAddress) localityMetadata.get(
|
||||
"envoy.http11_proxy_transport_socket.proxy_address");
|
||||
}
|
||||
} catch (ClassCastException e) {
|
||||
return addr;
|
||||
}
|
||||
|
||||
if (proxyAddress == null) {
|
||||
return addr;
|
||||
}
|
||||
|
||||
return HttpConnectProxiedSocketAddress.newBuilder()
|
||||
.setTargetAddress((InetSocketAddress) addr)
|
||||
.setProxyAddress(proxyAddress)
|
||||
.build();
|
||||
}
|
||||
|
||||
private List<String> generatePriorityNames(String name,
|
||||
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
|
||||
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
|
||||
|
|
|
|||
|
|
@ -74,10 +74,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
|||
final List<DiscoveryMechanism> discoveryMechanisms;
|
||||
// GracefulSwitch configuration
|
||||
final Object lbConfig;
|
||||
private final boolean isHttp11ProxyAvailable;
|
||||
|
||||
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
|
||||
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
|
||||
boolean isHttp11ProxyAvailable) {
|
||||
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
|
||||
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
|
||||
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
|
||||
}
|
||||
|
||||
boolean isHttp11ProxyAvailable() {
|
||||
return isHttp11ProxyAvailable;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
|
|||
import com.google.auto.value.AutoValue;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
|
@ -41,11 +42,13 @@ final class Endpoints {
|
|||
// Locality's priority level.
|
||||
abstract int priority();
|
||||
|
||||
abstract ImmutableMap<String, Object> localityMetadata();
|
||||
|
||||
static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
|
||||
int priority) {
|
||||
int priority, ImmutableMap<String, Object> localityMetadata) {
|
||||
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
|
||||
return new AutoValue_Endpoints_LocalityLbEndpoints(
|
||||
ImmutableList.copyOf(endpoints), localityWeight, priority);
|
||||
ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,17 +66,20 @@ final class Endpoints {
|
|||
|
||||
abstract String hostname();
|
||||
|
||||
abstract ImmutableMap<String, Object> endpointMetadata();
|
||||
|
||||
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
|
||||
boolean isHealthy, String hostname) {
|
||||
return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname);
|
||||
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
|
||||
return new AutoValue_Endpoints_LbEndpoint(
|
||||
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
|
||||
}
|
||||
|
||||
// Only for testing.
|
||||
@VisibleForTesting
|
||||
static LbEndpoint create(
|
||||
String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) {
|
||||
static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy,
|
||||
String hostname, ImmutableMap<String, Object> endpointMetadata) {
|
||||
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
|
||||
loadBalancingWeight, isHealthy, hostname);
|
||||
loadBalancingWeight, isHealthy, hostname, endpointMetadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import io.grpc.MethodDescriptor;
|
|||
import io.grpc.Status;
|
||||
import io.grpc.auth.MoreCallCredentials;
|
||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
|
@ -240,11 +241,16 @@ final class GcpAuthenticationFilter implements Filter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String parse(Any any) throws InvalidProtocolBufferException {
|
||||
Audience audience = any.unpack(Audience.class);
|
||||
public String parse(Any any) throws ResourceInvalidException {
|
||||
Audience audience;
|
||||
try {
|
||||
audience = any.unpack(Audience.class);
|
||||
} catch (InvalidProtocolBufferException ex) {
|
||||
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
|
||||
}
|
||||
String url = audience.getUrl();
|
||||
if (url.isEmpty()) {
|
||||
throw new InvalidProtocolBufferException(
|
||||
throw new ResourceInvalidException(
|
||||
"Audience URL is empty. Metadata value must contain a valid URL.");
|
||||
}
|
||||
return url;
|
||||
|
|
|
|||
|
|
@ -17,9 +17,14 @@
|
|||
package io.grpc.xds;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Struct;
|
||||
import io.envoyproxy.envoy.config.core.v3.Metadata;
|
||||
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
|
||||
import io.grpc.xds.XdsEndpointResource.AddressMetadataParser;
|
||||
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||
import io.grpc.xds.internal.ProtobufJsonConverter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
|
@ -36,6 +41,7 @@ final class MetadataRegistry {
|
|||
|
||||
private MetadataRegistry() {
|
||||
registerParser(new AudienceMetadataParser());
|
||||
registerParser(new AddressMetadataParser());
|
||||
}
|
||||
|
||||
static MetadataRegistry getInstance() {
|
||||
|
|
@ -55,6 +61,54 @@ final class MetadataRegistry {
|
|||
supportedParsers.remove(parser.getTypeUrl());
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses cluster metadata into a structured map.
|
||||
*
|
||||
* <p>Values in {@code typed_filter_metadata} take precedence over
|
||||
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
|
||||
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
|
||||
* Envoy metadata documentation </a> for details.
|
||||
*
|
||||
* @param metadata the {@link Metadata} containing the fields to parse.
|
||||
* @return an immutable map of parsed metadata.
|
||||
* @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails.
|
||||
*/
|
||||
public ImmutableMap<String, Object> parseMetadata(Metadata metadata)
|
||||
throws ResourceInvalidException {
|
||||
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
||||
|
||||
// Process typed_filter_metadata
|
||||
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
Any value = entry.getValue();
|
||||
MetadataValueParser parser = findParser(value.getTypeUrl());
|
||||
if (parser != null) {
|
||||
try {
|
||||
Object parsedValue = parser.parse(value);
|
||||
parsedMetadata.put(key, parsedValue);
|
||||
} catch (ResourceInvalidException e) {
|
||||
throw new ResourceInvalidException(
|
||||
String.format("Failed to parse metadata key: %s, type: %s. Error: %s",
|
||||
key, value.getTypeUrl(), e.getMessage()), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// building once to reuse in the next loop
|
||||
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
|
||||
|
||||
// Process filter_metadata for remaining keys
|
||||
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if (!intermediateParsedMetadata.containsKey(key)) {
|
||||
Struct structValue = entry.getValue();
|
||||
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
|
||||
parsedMetadata.put(key, jsonValue);
|
||||
}
|
||||
}
|
||||
|
||||
return parsedMetadata.build();
|
||||
}
|
||||
|
||||
interface MetadataValueParser {
|
||||
|
||||
String getTypeUrl();
|
||||
|
|
@ -64,8 +118,8 @@ final class MetadataRegistry {
|
|||
*
|
||||
* @param any the {@link Any} object to parse.
|
||||
* @return the parsed metadata value.
|
||||
* @throws InvalidProtocolBufferException if the parsing fails.
|
||||
* @throws ResourceInvalidException if the parsing fails.
|
||||
*/
|
||||
Object parse(Any any) throws InvalidProtocolBufferException;
|
||||
Object parse(Any any) throws ResourceInvalidException;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import com.google.common.base.MoreObjects;
|
|||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.Duration;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Message;
|
||||
|
|
@ -33,10 +32,11 @@ import com.google.protobuf.Struct;
|
|||
import com.google.protobuf.util.Durations;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||
import io.envoyproxy.envoy.config.core.v3.Metadata;
|
||||
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
|
||||
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
||||
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
|
||||
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
|
|
@ -46,15 +46,12 @@ import io.grpc.internal.ServiceConfigUtil;
|
|||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
|
||||
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import io.grpc.xds.internal.ProtobufJsonConverter;
|
||||
import io.grpc.xds.internal.security.CommonTlsContextUtil;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
|
|
@ -67,6 +64,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
@VisibleForTesting
|
||||
public static boolean enableSystemRootCerts =
|
||||
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false);
|
||||
static boolean isEnabledXdsHttpConnect =
|
||||
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false);
|
||||
|
||||
@VisibleForTesting
|
||||
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
|
||||
|
|
@ -78,6 +77,9 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
|
||||
private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT_V2 =
|
||||
"type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext";
|
||||
static final String TRANSPORT_SOCKET_NAME_HTTP11_PROXY =
|
||||
"type.googleapis.com/envoy.extensions.transport_sockets.http_11_proxy.v3"
|
||||
+ ".Http11ProxyUpstreamTransport";
|
||||
private final LoadBalancerRegistry loadBalancerRegistry
|
||||
= LoadBalancerRegistry.getDefaultRegistry();
|
||||
|
||||
|
|
@ -177,10 +179,11 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap()));
|
||||
|
||||
try {
|
||||
MetadataRegistry registry = MetadataRegistry.getInstance();
|
||||
ImmutableMap<String, Object> parsedFilterMetadata =
|
||||
parseClusterMetadata(cluster.getMetadata());
|
||||
registry.parseMetadata(cluster.getMetadata());
|
||||
updateBuilder.parsedMetadata(parsedFilterMetadata);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
} catch (ResourceInvalidException e) {
|
||||
throw new ResourceInvalidException(
|
||||
"Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': "
|
||||
+ e.getMessage(), e);
|
||||
|
|
@ -189,49 +192,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
return updateBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses cluster metadata into a structured map.
|
||||
*
|
||||
* <p>Values in {@code typed_filter_metadata} take precedence over
|
||||
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
|
||||
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
|
||||
* Envoy metadata documentation </a> for details.
|
||||
*
|
||||
* @param metadata the {@link Metadata} containing the fields to parse.
|
||||
* @return an immutable map of parsed metadata.
|
||||
* @throws InvalidProtocolBufferException if parsing {@code typed_filter_metadata} fails.
|
||||
*/
|
||||
private static ImmutableMap<String, Object> parseClusterMetadata(Metadata metadata)
|
||||
throws InvalidProtocolBufferException {
|
||||
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
||||
|
||||
MetadataRegistry registry = MetadataRegistry.getInstance();
|
||||
// Process typed_filter_metadata
|
||||
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
Any value = entry.getValue();
|
||||
MetadataValueParser parser = registry.findParser(value.getTypeUrl());
|
||||
if (parser != null) {
|
||||
Object parsedValue = parser.parse(value);
|
||||
parsedMetadata.put(key, parsedValue);
|
||||
}
|
||||
}
|
||||
// building once to reuse in the next loop
|
||||
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
|
||||
|
||||
// Process filter_metadata for remaining keys
|
||||
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if (!intermediateParsedMetadata.containsKey(key)) {
|
||||
Struct structValue = entry.getValue();
|
||||
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
|
||||
parsedMetadata.put(key, jsonValue);
|
||||
}
|
||||
}
|
||||
|
||||
return parsedMetadata.build();
|
||||
}
|
||||
|
||||
private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
|
||||
String clusterName = cluster.getName();
|
||||
Cluster.CustomClusterType customType = cluster.getClusterType();
|
||||
|
|
@ -259,6 +219,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
Long maxConcurrentRequests = null;
|
||||
UpstreamTlsContext upstreamTlsContext = null;
|
||||
OutlierDetection outlierDetection = null;
|
||||
boolean isHttp11ProxyAvailable = false;
|
||||
if (cluster.hasLrsServer()) {
|
||||
if (!cluster.getLrsServer().hasSelf()) {
|
||||
return StructOrError.fromError(
|
||||
|
|
@ -281,17 +242,43 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
return StructOrError.fromError("Cluster " + clusterName
|
||||
+ ": transport-socket-matches not supported.");
|
||||
}
|
||||
if (cluster.hasTransportSocket()) {
|
||||
if (!TRANSPORT_SOCKET_NAME_TLS.equals(cluster.getTransportSocket().getName())) {
|
||||
return StructOrError.fromError("transport-socket with name "
|
||||
+ cluster.getTransportSocket().getName() + " not supported.");
|
||||
boolean hasTransportSocket = cluster.hasTransportSocket();
|
||||
TransportSocket transportSocket = cluster.getTransportSocket();
|
||||
|
||||
if (hasTransportSocket && !TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())
|
||||
&& !(isEnabledXdsHttpConnect
|
||||
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName()))) {
|
||||
return StructOrError.fromError(
|
||||
"transport-socket with name " + transportSocket.getName() + " not supported.");
|
||||
}
|
||||
|
||||
if (hasTransportSocket && isEnabledXdsHttpConnect
|
||||
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName())) {
|
||||
isHttp11ProxyAvailable = true;
|
||||
try {
|
||||
Http11ProxyUpstreamTransport wrappedTransportSocket = transportSocket
|
||||
.getTypedConfig().unpack(io.envoyproxy.envoy.extensions.transport_sockets
|
||||
.http_11_proxy.v3.Http11ProxyUpstreamTransport.class);
|
||||
hasTransportSocket = wrappedTransportSocket.hasTransportSocket();
|
||||
transportSocket = wrappedTransportSocket.getTransportSocket();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
return StructOrError.fromError(
|
||||
"Cluster " + clusterName + ": malformed Http11ProxyUpstreamTransport: " + e);
|
||||
} catch (ClassCastException e) {
|
||||
return StructOrError.fromError(
|
||||
"Cluster " + clusterName
|
||||
+ ": invalid transport_socket type in Http11ProxyUpstreamTransport");
|
||||
}
|
||||
}
|
||||
|
||||
if (hasTransportSocket && TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())) {
|
||||
try {
|
||||
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
|
||||
validateUpstreamTlsContext(
|
||||
unpackCompatibleType(cluster.getTransportSocket().getTypedConfig(),
|
||||
io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class,
|
||||
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
|
||||
unpackCompatibleType(transportSocket.getTypedConfig(),
|
||||
io.envoyproxy.envoy.extensions
|
||||
.transport_sockets.tls.v3.UpstreamTlsContext.class,
|
||||
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
|
||||
certProviderInstances));
|
||||
} catch (InvalidProtocolBufferException | ResourceInvalidException e) {
|
||||
return StructOrError.fromError(
|
||||
|
|
@ -329,9 +316,10 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
return StructOrError.fromError(
|
||||
"EDS service_name must be set when Cluster resource has an xdstp name");
|
||||
}
|
||||
|
||||
return StructOrError.fromStruct(CdsUpdate.forEds(
|
||||
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
|
||||
outlierDetection));
|
||||
outlierDetection, isHttp11ProxyAvailable));
|
||||
} else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) {
|
||||
if (!cluster.hasLoadAssignment()) {
|
||||
return StructOrError.fromError(
|
||||
|
|
@ -366,7 +354,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
String dnsHostName = String.format(
|
||||
Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue());
|
||||
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
|
||||
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext));
|
||||
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests,
|
||||
upstreamTlsContext, isHttp11ProxyAvailable));
|
||||
}
|
||||
return StructOrError.fromError(
|
||||
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
|
||||
|
|
@ -620,6 +609,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
@Nullable
|
||||
abstract UpstreamTlsContext upstreamTlsContext();
|
||||
|
||||
abstract boolean isHttp11ProxyAvailable();
|
||||
|
||||
// List of underlying clusters making of this aggregate cluster.
|
||||
// Only valid for AGGREGATE cluster.
|
||||
@Nullable
|
||||
|
|
@ -640,7 +631,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
.maxRingSize(0)
|
||||
.choiceCount(0)
|
||||
.filterMetadata(ImmutableMap.of())
|
||||
.parsedMetadata(ImmutableMap.of());
|
||||
.parsedMetadata(ImmutableMap.of())
|
||||
.isHttp11ProxyAvailable(false);
|
||||
}
|
||||
|
||||
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
|
||||
|
|
@ -653,26 +645,30 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
static Builder forEds(String clusterName, @Nullable String edsServiceName,
|
||||
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
|
||||
@Nullable UpstreamTlsContext upstreamTlsContext,
|
||||
@Nullable OutlierDetection outlierDetection) {
|
||||
@Nullable OutlierDetection outlierDetection,
|
||||
boolean isHttp11ProxyAvailable) {
|
||||
return newBuilder(clusterName)
|
||||
.clusterType(ClusterType.EDS)
|
||||
.edsServiceName(edsServiceName)
|
||||
.lrsServerInfo(lrsServerInfo)
|
||||
.maxConcurrentRequests(maxConcurrentRequests)
|
||||
.upstreamTlsContext(upstreamTlsContext)
|
||||
.outlierDetection(outlierDetection);
|
||||
.outlierDetection(outlierDetection)
|
||||
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
|
||||
}
|
||||
|
||||
static Builder forLogicalDns(String clusterName, String dnsHostName,
|
||||
@Nullable ServerInfo lrsServerInfo,
|
||||
@Nullable Long maxConcurrentRequests,
|
||||
@Nullable UpstreamTlsContext upstreamTlsContext) {
|
||||
@Nullable UpstreamTlsContext upstreamTlsContext,
|
||||
boolean isHttp11ProxyAvailable) {
|
||||
return newBuilder(clusterName)
|
||||
.clusterType(ClusterType.LOGICAL_DNS)
|
||||
.dnsHostName(dnsHostName)
|
||||
.lrsServerInfo(lrsServerInfo)
|
||||
.maxConcurrentRequests(maxConcurrentRequests)
|
||||
.upstreamTlsContext(upstreamTlsContext);
|
||||
.upstreamTlsContext(upstreamTlsContext)
|
||||
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
|
||||
}
|
||||
|
||||
enum ClusterType {
|
||||
|
|
@ -749,6 +745,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
// Private, use one of the static factory methods instead.
|
||||
protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
|
||||
|
||||
protected abstract Builder isHttp11ProxyAvailable(boolean isHttp11ProxyAvailable);
|
||||
|
||||
// Private, use one of the static factory methods instead.
|
||||
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
|
||||
|
||||
|
|
|
|||
|
|
@ -20,9 +20,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.net.InetAddresses;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Message;
|
||||
import io.envoyproxy.envoy.config.core.v3.Address;
|
||||
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
|
||||
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
||||
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
||||
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
|
||||
import io.envoyproxy.envoy.type.v3.FractionalPercent;
|
||||
|
|
@ -30,6 +35,7 @@ import io.grpc.EquivalentAddressGroup;
|
|||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.xds.Endpoints.DropOverload;
|
||||
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||
import io.grpc.xds.client.Locality;
|
||||
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||
|
|
@ -185,7 +191,8 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
|||
@VisibleForTesting
|
||||
@Nullable
|
||||
static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto)
|
||||
throws ResourceInvalidException {
|
||||
// Filter out localities without or with 0 weight.
|
||||
if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
|
||||
return null;
|
||||
|
|
@ -193,6 +200,15 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
|||
if (proto.getPriority() < 0) {
|
||||
return StructOrError.fromError("negative priority");
|
||||
}
|
||||
|
||||
ImmutableMap<String, Object> localityMetadata;
|
||||
MetadataRegistry registry = MetadataRegistry.getInstance();
|
||||
try {
|
||||
localityMetadata = registry.parseMetadata(proto.getMetadata());
|
||||
} catch (ResourceInvalidException e) {
|
||||
throw new ResourceInvalidException("Failed to parse Locality Endpoint metadata: "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
|
||||
for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
|
||||
// The endpoint field of each lb_endpoints must be set.
|
||||
|
|
@ -200,6 +216,13 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
|||
if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
|
||||
return StructOrError.fromError("LbEndpoint with no endpoint/address");
|
||||
}
|
||||
ImmutableMap<String, Object> endpointMetadata;
|
||||
try {
|
||||
endpointMetadata = registry.parseMetadata(endpoint.getMetadata());
|
||||
} catch (ResourceInvalidException e) {
|
||||
throw new ResourceInvalidException("Failed to parse Endpoint metadata: "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
List<java.net.SocketAddress> addresses = new ArrayList<>();
|
||||
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
|
||||
|
||||
|
|
@ -214,10 +237,12 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
|||
endpoints.add(Endpoints.LbEndpoint.create(
|
||||
new EquivalentAddressGroup(addresses),
|
||||
endpoint.getLoadBalancingWeight().getValue(), isHealthy,
|
||||
endpoint.getEndpoint().getHostname()));
|
||||
endpoint.getEndpoint().getHostname(),
|
||||
endpointMetadata));
|
||||
}
|
||||
return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
|
||||
endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
|
||||
endpoints, proto.getLoadBalancingWeight().getValue(),
|
||||
proto.getPriority(), localityMetadata));
|
||||
}
|
||||
|
||||
private static InetSocketAddress getInetSocketAddress(Address address) {
|
||||
|
|
@ -270,4 +295,47 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
|||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public static class AddressMetadataParser implements MetadataValueParser {
|
||||
|
||||
@Override
|
||||
public String getTypeUrl() {
|
||||
return "type.googleapis.com/envoy.config.core.v3.Address";
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.net.SocketAddress parse(Any any) throws ResourceInvalidException {
|
||||
SocketAddress socketAddress;
|
||||
try {
|
||||
socketAddress = any.unpack(Address.class).getSocketAddress();
|
||||
} catch (InvalidProtocolBufferException ex) {
|
||||
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
|
||||
}
|
||||
validateAddress(socketAddress);
|
||||
|
||||
String ip = socketAddress.getAddress();
|
||||
int port = socketAddress.getPortValue();
|
||||
|
||||
try {
|
||||
return new InetSocketAddress(InetAddresses.forString(ip), port);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw createException("Invalid IP address or port: " + ip + ":" + port);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateAddress(SocketAddress socketAddress) throws ResourceInvalidException {
|
||||
if (socketAddress.getAddress().isEmpty()) {
|
||||
throw createException("Address field is empty or invalid.");
|
||||
}
|
||||
long port = Integer.toUnsignedLong(socketAddress.getPortValue());
|
||||
if (port > 65535) {
|
||||
throw createException(String.format("Port value %d out of range 1-65535.", port));
|
||||
}
|
||||
}
|
||||
|
||||
private ResourceInvalidException createException(String message) {
|
||||
return new ResourceInvalidException(
|
||||
"Failed to parse envoy.config.core.v3.Address: " + message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ public class CdsLoadBalancer2Test {
|
|||
public void discoverTopLevelEdsCluster() {
|
||||
CdsUpdate update =
|
||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||
outlierDetection)
|
||||
outlierDetection, false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
|
|
@ -198,7 +198,8 @@ public class CdsLoadBalancer2Test {
|
|||
@Test
|
||||
public void discoverTopLevelLogicalDnsCluster() {
|
||||
CdsUpdate update =
|
||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext)
|
||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||
false)
|
||||
.leastRequestLbPolicy(3).build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
|
|
@ -232,7 +233,7 @@ public class CdsLoadBalancer2Test {
|
|||
@Test
|
||||
public void nonAggregateCluster_resourceUpdate() {
|
||||
CdsUpdate update =
|
||||
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection)
|
||||
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
|
|
@ -243,7 +244,7 @@ public class CdsLoadBalancer2Test {
|
|||
100L, upstreamTlsContext, outlierDetection);
|
||||
|
||||
update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null,
|
||||
outlierDetection).roundRobinLbPolicy().build();
|
||||
outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
||||
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
|
||||
|
|
@ -254,7 +255,8 @@ public class CdsLoadBalancer2Test {
|
|||
@Test
|
||||
public void nonAggregateCluster_resourceRevoked() {
|
||||
CdsUpdate update =
|
||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext)
|
||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext,
|
||||
false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
|
|
@ -298,16 +300,16 @@ public class CdsLoadBalancer2Test {
|
|||
CLUSTER, cluster1, cluster2, cluster3, cluster4);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
CdsUpdate update2 =
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null)
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
CdsUpdate update4 =
|
||||
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection)
|
||||
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster4, update4);
|
||||
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
|
||||
|
|
@ -362,10 +364,11 @@ public class CdsLoadBalancer2Test {
|
|||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
||||
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||
CdsUpdate update2 =
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
|
||||
false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
|
|
@ -412,10 +415,11 @@ public class CdsLoadBalancer2Test {
|
|||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
||||
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||
CdsUpdate update2 =
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
|
||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
|
||||
false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
|
|
@ -467,7 +471,7 @@ public class CdsLoadBalancer2Test {
|
|||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
|
||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
||||
|
|
@ -518,7 +522,7 @@ public class CdsLoadBalancer2Test {
|
|||
|
||||
reset(helper);
|
||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||
verify(helper).updateBalancingState(
|
||||
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
|
|
@ -553,7 +557,7 @@ public class CdsLoadBalancer2Test {
|
|||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||
|
||||
// cluster2 (aggr.) -> [cluster3 (EDS)]
|
||||
|
|
@ -602,7 +606,7 @@ public class CdsLoadBalancer2Test {
|
|||
|
||||
// Define EDS cluster
|
||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||
|
||||
// cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies)
|
||||
|
|
@ -649,7 +653,8 @@ public class CdsLoadBalancer2Test {
|
|||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
CdsUpdate update1 =
|
||||
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null)
|
||||
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null,
|
||||
false)
|
||||
.roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
|
||||
|
|
@ -676,7 +681,7 @@ public class CdsLoadBalancer2Test {
|
|||
@Test
|
||||
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
||||
CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
||||
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertThat(childBalancer.shutdown).isFalse();
|
||||
|
|
@ -692,7 +697,7 @@ public class CdsLoadBalancer2Test {
|
|||
try {
|
||||
xdsClient.deliverCdsUpdate(CLUSTER,
|
||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||
outlierDetection)
|
||||
outlierDetection, false)
|
||||
.lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build());
|
||||
} catch (Exception e) {
|
||||
assertThat(e).hasMessageThat().contains("unknownLb");
|
||||
|
|
@ -706,7 +711,7 @@ public class CdsLoadBalancer2Test {
|
|||
try {
|
||||
xdsClient.deliverCdsUpdate(CLUSTER,
|
||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||
outlierDetection).lbPolicyConfig(
|
||||
outlierDetection, false).lbPolicyConfig(
|
||||
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import io.grpc.Attributes;
|
|||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.HttpConnectProxiedSocketAddress;
|
||||
import io.grpc.InsecureChannelCredentials;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
|
|
@ -83,6 +84,7 @@ import io.grpc.xds.client.Locality;
|
|||
import io.grpc.xds.client.XdsClient;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
|
@ -240,7 +242,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void edsClustersWithRingHashEndpointLbPolicy() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), ringHash);
|
||||
Collections.singletonList(edsDiscoveryMechanism1), ringHash, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -252,14 +254,18 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints localityLbEndpoints1 =
|
||||
LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, true, "hostname1"),
|
||||
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, true, "hostname2")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */,
|
||||
true, "hostname1", ImmutableMap.of()),
|
||||
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */,
|
||||
true, "hostname2", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints2 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(
|
||||
LbEndpoint.create(endpoint3, 60 /* loadBalancingWeight */, true, "hostname3")),
|
||||
50 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(
|
||||
endpoint3, 60 /* loadBalancingWeight */, true,
|
||||
"hostname3", ImmutableMap.of())),
|
||||
50 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||
|
|
@ -302,7 +308,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void edsClustersWithLeastRequestEndpointLbPolicy() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), leastRequest);
|
||||
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -312,8 +318,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
|
||||
100 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||
|
|
@ -348,7 +355,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void edsClustersEndpointHostname_addedToAddressAttribute() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest);
|
||||
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -358,8 +365,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
|
||||
100 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||
|
|
@ -371,11 +379,104 @@ public class ClusterResolverLoadBalancerTest {
|
|||
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo("hostname1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointAddressRewritten_whenProxyMetadataIsInEndpointMetadata() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
||||
EquivalentAddressGroup endpoint =
|
||||
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.1", 8080));
|
||||
|
||||
// Proxy address in endpointMetadata (use FakeSocketAddress directly)
|
||||
SocketAddress proxyAddress = new FakeSocketAddress("127.0.0.2");
|
||||
ImmutableMap<String, Object> endpointMetadata =
|
||||
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
|
||||
|
||||
// No proxy in locality metadata
|
||||
ImmutableMap<String, Object> localityMetadata = ImmutableMap.of();
|
||||
|
||||
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||
"hostname1", endpointMetadata)),
|
||||
100 /* localityWeight */, 1 /* priority */, localityMetadata);
|
||||
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
|
||||
// Get the rewritten address
|
||||
SocketAddress rewrittenAddress =
|
||||
childBalancer.addresses.get(0).getAddresses().get(0);
|
||||
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
|
||||
HttpConnectProxiedSocketAddress proxiedSocket =
|
||||
(HttpConnectProxiedSocketAddress) rewrittenAddress;
|
||||
|
||||
// Assert that the target address is the original address
|
||||
assertThat(proxiedSocket.getTargetAddress())
|
||||
.isEqualTo(endpoint.getAddresses().get(0));
|
||||
|
||||
// Assert that the proxy address is correctly set
|
||||
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
||||
EquivalentAddressGroup endpoint =
|
||||
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.2", 8080));
|
||||
|
||||
// No proxy in endpointMetadata
|
||||
ImmutableMap<String, Object> endpointMetadata = ImmutableMap.of();
|
||||
|
||||
// Proxy address is now in localityMetadata
|
||||
SocketAddress proxyAddress = new FakeSocketAddress("proxy-addr");
|
||||
ImmutableMap<String, Object> localityMetadata =
|
||||
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
|
||||
|
||||
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||
"hostname2", endpointMetadata)),
|
||||
100 /* localityWeight */, 1 /* priority */, localityMetadata);
|
||||
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
|
||||
// Get the rewritten address
|
||||
SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0);
|
||||
|
||||
// Assert that the address was rewritten
|
||||
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
|
||||
HttpConnectProxiedSocketAddress proxiedSocket =
|
||||
(HttpConnectProxiedSocketAddress) rewrittenAddress;
|
||||
|
||||
// Assert that the target address is the original address
|
||||
assertThat(proxiedSocket.getTargetAddress()).isEqualTo(endpoint.getAddresses().get(0));
|
||||
|
||||
// Assert that the proxy address is correctly set from locality metadata
|
||||
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void onlyEdsClusters_receivedEndpoints() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -389,17 +490,21 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints localityLbEndpoints1 =
|
||||
LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint1, 100, true, "hostname1"),
|
||||
LbEndpoint.create(endpoint2, 100, true, "hostname1")),
|
||||
70 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(endpoint1, 100,
|
||||
true, "hostname1", ImmutableMap.of()),
|
||||
LbEndpoint.create(endpoint2, 100,
|
||||
true, "hostname1", ImmutableMap.of())),
|
||||
70 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints2 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname2")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
|
||||
"hostname2", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints3 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, "hostname3")),
|
||||
20 /* localityWeight */, 2 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true,
|
||||
"hostname3", ImmutableMap.of())),
|
||||
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||
String priority1 = CLUSTER2 + "[child1]";
|
||||
String priority2 = CLUSTER2 + "[child2]";
|
||||
String priority3 = CLUSTER1 + "[child1]";
|
||||
|
|
@ -487,7 +592,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private void verifyEdsPriorityNames(List<String> want,
|
||||
Map<Locality, LocalityLbEndpoints>... updates) {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism2), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism2), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -553,15 +658,17 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private LocalityLbEndpoints createEndpoints(int priority) {
|
||||
return LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true, "hostname1"),
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true, "hostname2")),
|
||||
70 /* localityWeight */, priority /* priority */);
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100,
|
||||
true, "hostname1", ImmutableMap.of()),
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100,
|
||||
true, "hostname2", ImmutableMap.of())),
|
||||
70 /* localityWeight */, priority /* priority */, ImmutableMap.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -583,7 +690,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -592,12 +699,14 @@ public class ClusterResolverLoadBalancerTest {
|
|||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||
LocalityLbEndpoints localityLbEndpoints1 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints2 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true, "hostname2")),
|
||||
20 /* localityWeight */, 2 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true,
|
||||
"hostname2", ImmutableMap.of())),
|
||||
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
|
|
@ -618,17 +727,19 @@ public class ClusterResolverLoadBalancerTest {
|
|||
|
||||
@Test
|
||||
public void handleEdsResource_ignoreUnhealthyEndpoints() {
|
||||
ClusterResolverConfig config =
|
||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(endpoint1, 100, false /* isHealthy */, "hostname1"),
|
||||
LbEndpoint.create(endpoint2, 100, true /* isHealthy */, "hostname2")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||
"hostname1", ImmutableMap.of()),
|
||||
LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
|
||||
"hostname2", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
|
|
@ -638,21 +749,21 @@ public class ClusterResolverLoadBalancerTest {
|
|||
|
||||
@Test
|
||||
public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
|
||||
ClusterResolverConfig config =
|
||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||
LocalityLbEndpoints localityLbEndpoints1 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||
"hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints2 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
|
||||
"hostname2")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
"hostname2", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||
|
|
@ -665,21 +776,21 @@ public class ClusterResolverLoadBalancerTest {
|
|||
|
||||
@Test
|
||||
public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
|
||||
ClusterResolverConfig config =
|
||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||
LocalityLbEndpoints localityLbEndpoints1 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||
"hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
LocalityLbEndpoints localityLbEndpoints2 =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */,
|
||||
"hostname2")),
|
||||
10 /* localityWeight */, 2 /* priority */);
|
||||
"hostname2", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||
String priority2 = CLUSTER1 + "[child2]";
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
|
|
@ -691,15 +802,15 @@ public class ClusterResolverLoadBalancerTest {
|
|||
|
||||
@Test
|
||||
public void handleEdsResource_noHealthyEndpoint() {
|
||||
ClusterResolverConfig config =
|
||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */,
|
||||
"hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
|
||||
Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy
|
||||
|
||||
|
|
@ -716,7 +827,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void onlyLogicalDnsCluster_endpointsResolved() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -749,7 +860,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -767,7 +878,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
|
||||
backoffPolicy1, backoffPolicy2);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -813,7 +924,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
|
||||
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -851,7 +962,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -862,8 +973,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname3")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
|
||||
"hostname3", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||
|
||||
|
|
@ -886,7 +998,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void noEdsResourceExists_useDnsResolutionResults() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -910,7 +1022,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -919,8 +1031,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||
resolver.deliverError(Status.UNKNOWN.withDescription("I am lost"));
|
||||
|
|
@ -941,7 +1054,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -950,8 +1063,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved.
|
||||
|
|
@ -976,7 +1090,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -999,7 +1113,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void resolutionErrorBeforeChildLbCreated_edsOnly_returnErrorPicker() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
|
@ -1017,7 +1131,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -1033,7 +1147,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
@Test
|
||||
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||
|
|
@ -1043,8 +1157,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||
LocalityLbEndpoints localityLbEndpoints =
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
|
||||
"hostname1", ImmutableMap.of())),
|
||||
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||
resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2));
|
||||
|
|
@ -1118,37 +1233,37 @@ public class ClusterResolverLoadBalancerTest {
|
|||
}
|
||||
|
||||
private static EquivalentAddressGroup makeAddress(final String name) {
|
||||
class FakeSocketAddress extends SocketAddress {
|
||||
private final String name;
|
||||
return new EquivalentAddressGroup(new FakeSocketAddress(name));
|
||||
}
|
||||
|
||||
private FakeSocketAddress(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
static class FakeSocketAddress extends SocketAddress {
|
||||
private final String name;
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof FakeSocketAddress)) {
|
||||
return false;
|
||||
}
|
||||
FakeSocketAddress that = (FakeSocketAddress) o;
|
||||
return Objects.equals(name, that.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
private FakeSocketAddress(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
return new EquivalentAddressGroup(new FakeSocketAddress(name));
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof FakeSocketAddress)) {
|
||||
return false;
|
||||
}
|
||||
FakeSocketAddress that = (FakeSocketAddress) o;
|
||||
return Objects.equals(name, that.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FakeXdsClient extends XdsClient {
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package io.grpc.xds;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN;
|
||||
import static io.grpc.xds.XdsClusterResource.TRANSPORT_SOCKET_NAME_HTTP11_PROXY;
|
||||
import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
|
@ -93,6 +94,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3
|
|||
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
||||
|
|
@ -1055,7 +1057,7 @@ public class GrpcXdsClientImplDataTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void parseLocalityLbEndpoints_withHealthyEndpoints() {
|
||||
public void parseLocalityLbEndpoints_withHealthyEndpoints() throws ResourceInvalidException {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(Locality.newBuilder()
|
||||
|
|
@ -1075,12 +1077,14 @@ public class GrpcXdsClientImplDataTest {
|
|||
assertThat(struct.getErrorDetail()).isNull();
|
||||
assertThat(struct.getStruct()).isEqualTo(
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")),
|
||||
100, 1));
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
|
||||
20, true, "", ImmutableMap.of())),
|
||||
100, 1, ImmutableMap.of()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy() {
|
||||
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy()
|
||||
throws ResourceInvalidException {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(Locality.newBuilder()
|
||||
|
|
@ -1100,12 +1104,13 @@ public class GrpcXdsClientImplDataTest {
|
|||
assertThat(struct.getErrorDetail()).isNull();
|
||||
assertThat(struct.getStruct()).isEqualTo(
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")), 100,
|
||||
1));
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
|
||||
20, true, "", ImmutableMap.of())),
|
||||
100, 1, ImmutableMap.of()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() {
|
||||
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() throws ResourceInvalidException {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(Locality.newBuilder()
|
||||
|
|
@ -1125,12 +1130,13 @@ public class GrpcXdsClientImplDataTest {
|
|||
assertThat(struct.getErrorDetail()).isNull();
|
||||
assertThat(struct.getStruct()).isEqualTo(
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, false, "")), 100,
|
||||
1));
|
||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20,
|
||||
false, "", ImmutableMap.of())),
|
||||
100, 1, ImmutableMap.of()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() {
|
||||
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() throws ResourceInvalidException {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(Locality.newBuilder()
|
||||
|
|
@ -1187,7 +1193,10 @@ public class GrpcXdsClientImplDataTest {
|
|||
EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList);
|
||||
assertThat(struct.getStruct()).isEqualTo(
|
||||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(expectedEag, 20, true, "")), 100, 1));
|
||||
Collections.singletonList(LbEndpoint.create(
|
||||
expectedEag, 20, true, "", ImmutableMap.of())), 100, 1, ImmutableMap.of()));
|
||||
} catch (ResourceInvalidException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
if (originalDualStackProp != null) {
|
||||
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp);
|
||||
|
|
@ -1198,7 +1207,7 @@ public class GrpcXdsClientImplDataTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void parseLocalityLbEndpoints_invalidPriority() {
|
||||
public void parseLocalityLbEndpoints_invalidPriority() throws ResourceInvalidException {
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||
.setLocality(Locality.newBuilder()
|
||||
|
|
@ -2456,6 +2465,59 @@ public class GrpcXdsClientImplDataTest {
|
|||
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processCluster_parsesAddressMetadata() throws Exception {
|
||||
|
||||
// Create an Address message
|
||||
Address address = Address.newBuilder()
|
||||
.setSocketAddress(SocketAddress.newBuilder()
|
||||
.setAddress("192.168.1.1")
|
||||
.setPortValue(8080)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
// Wrap the Address in Any
|
||||
Any addressMetadata = Any.newBuilder()
|
||||
.setTypeUrl("type.googleapis.com/envoy.config.core.v3.Address")
|
||||
.setValue(address.toByteString())
|
||||
.build();
|
||||
|
||||
Struct filterMetadata = Struct.newBuilder()
|
||||
.putFields("key1", Value.newBuilder().setStringValue("value1").build())
|
||||
.putFields("key2", Value.newBuilder().setNumberValue(42).build())
|
||||
.build();
|
||||
|
||||
Metadata metadata = Metadata.newBuilder()
|
||||
.putTypedFilterMetadata("ADDRESS_METADATA", addressMetadata)
|
||||
.putFilterMetadata("FILTER_METADATA", filterMetadata)
|
||||
.build();
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setName("cluster-foo.googleapis.com")
|
||||
.setType(DiscoveryType.EDS)
|
||||
.setEdsClusterConfig(
|
||||
EdsClusterConfig.newBuilder()
|
||||
.setEdsConfig(
|
||||
ConfigSource.newBuilder()
|
||||
.setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||
.setServiceName("service-foo.googleapis.com"))
|
||||
.setLbPolicy(LbPolicy.ROUND_ROBIN)
|
||||
.setMetadata(metadata)
|
||||
.build();
|
||||
|
||||
CdsUpdate update = XdsClusterResource.processCluster(
|
||||
cluster, null, LRS_SERVER_INFO,
|
||||
LoadBalancerRegistry.getDefaultRegistry());
|
||||
|
||||
ImmutableMap<String, Object> expectedParsedMetadata = ImmutableMap.of(
|
||||
"ADDRESS_METADATA", new InetSocketAddress("192.168.1.1", 8080),
|
||||
"FILTER_METADATA", ImmutableMap.of(
|
||||
"key1", "value1",
|
||||
"key2", 42.0));
|
||||
|
||||
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata()
|
||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
||||
|
|
@ -2512,6 +2574,40 @@ public class GrpcXdsClientImplDataTest {
|
|||
metadataRegistry.removeParser(testParser);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseNonAggregateCluster_withHttp11ProxyTransportSocket()
|
||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
||||
XdsClusterResource.isEnabledXdsHttpConnect = true;
|
||||
|
||||
Http11ProxyUpstreamTransport http11ProxyUpstreamTransport =
|
||||
Http11ProxyUpstreamTransport.newBuilder()
|
||||
.setTransportSocket(TransportSocket.getDefaultInstance())
|
||||
.build();
|
||||
|
||||
TransportSocket transportSocket = TransportSocket.newBuilder()
|
||||
.setName(TRANSPORT_SOCKET_NAME_HTTP11_PROXY)
|
||||
.setTypedConfig(Any.pack(http11ProxyUpstreamTransport))
|
||||
.build();
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setName("cluster-http11-proxy.googleapis.com")
|
||||
.setType(DiscoveryType.EDS)
|
||||
.setEdsClusterConfig(
|
||||
EdsClusterConfig.newBuilder()
|
||||
.setEdsConfig(
|
||||
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||
.setServiceName("service-http11-proxy.googleapis.com"))
|
||||
.setLbPolicy(LbPolicy.ROUND_ROBIN)
|
||||
.setTransportSocket(transportSocket)
|
||||
.build();
|
||||
|
||||
CdsUpdate result =
|
||||
XdsClusterResource.processCluster(cluster, null, LRS_SERVER_INFO,
|
||||
LoadBalancerRegistry.getDefaultRegistry());
|
||||
|
||||
assertThat(result).isNotNull();
|
||||
assertThat(result.isHttp11ProxyAvailable()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException {
|
||||
|
|
|
|||
|
|
@ -607,9 +607,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
Locality.create("region1", "zone1", "subzone1"),
|
||||
LocalityLbEndpoints.create(
|
||||
ImmutableList.of(LbEndpoint.create("192.168.0.1", 8080, 2, true,
|
||||
"endpoint-host-name")), 1, 0),
|
||||
"endpoint-host-name", ImmutableMap.of())), 1, 0, ImmutableMap.of()),
|
||||
Locality.create("region3", "zone3", "subzone3"),
|
||||
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1));
|
||||
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1, ImmutableMap.of()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -3246,7 +3246,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
Locality.create("region2", "zone2", "subzone2"),
|
||||
LocalityLbEndpoints.create(
|
||||
ImmutableList.of(
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||
true, "endpoint-host-name", ImmutableMap.of())),
|
||||
2, 0, ImmutableMap.of()));
|
||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2,
|
||||
TIME_INCREMENT * 2);
|
||||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||
|
|
@ -3416,7 +3418,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
Locality.create("region2", "zone2", "subzone2"),
|
||||
LocalityLbEndpoints.create(
|
||||
ImmutableList.of(
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||
true, "endpoint-host-name", ImmutableMap.of())),
|
||||
2, 0, ImmutableMap.of()));
|
||||
verify(watcher2).onChanged(edsUpdateCaptor.capture());
|
||||
edsUpdate = edsUpdateCaptor.getValue();
|
||||
assertThat(edsUpdate.clusterName).isEqualTo(edsResourceTwo);
|
||||
|
|
@ -3426,7 +3430,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
Locality.create("region2", "zone2", "subzone2"),
|
||||
LocalityLbEndpoints.create(
|
||||
ImmutableList.of(
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
||||
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||
true, "endpoint-host-name", ImmutableMap.of())),
|
||||
2, 0, ImmutableMap.of()));
|
||||
verifyNoMoreInteractions(edsResourceWatcher);
|
||||
verifyResourceMetadataAcked(
|
||||
EDS, edsResourceTwo, clusterLoadAssignmentTwo, VERSION_2, TIME_INCREMENT * 2);
|
||||
|
|
|
|||
|
|
@ -257,17 +257,17 @@ public class XdsTestUtils {
|
|||
|
||||
// Need to create endpoints to create locality endpoints map to create edsUpdate
|
||||
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
|
||||
LbEndpoint lbEndpoint =
|
||||
LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME);
|
||||
LbEndpoint lbEndpoint = LbEndpoint.create(
|
||||
serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of());
|
||||
lbEndpointsMap.put(
|
||||
Locality.create("", "", ""),
|
||||
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0));
|
||||
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of()));
|
||||
|
||||
// Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder
|
||||
XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate(
|
||||
EDS_NAME, lbEndpointsMap, Collections.emptyList());
|
||||
XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds(
|
||||
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null)
|
||||
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false)
|
||||
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
|
||||
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto
|
|||
envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
|
||||
envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto
|
||||
envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto
|
||||
envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto
|
||||
envoy/extensions/transport_sockets/tls/v3/cert.proto
|
||||
envoy/extensions/transport_sockets/tls/v3/common.proto
|
||||
envoy/extensions/transport_sockets/tls/v3/secret.proto
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package envoy.extensions.transport_sockets.http_11_proxy.v3;
|
||||
|
||||
import "envoy/config/core/v3/base.proto";
|
||||
|
||||
import "udpa/annotations/status.proto";
|
||||
|
||||
option java_package = "io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3";
|
||||
option java_outer_classname = "UpstreamHttp11ConnectProto";
|
||||
option java_multiple_files = true;
|
||||
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3;http_11_proxyv3";
|
||||
option (udpa.annotations.file_status).package_version_status = ACTIVE;
|
||||
|
||||
// [#protodoc-title: Upstream HTTP/1.1 Proxy]
|
||||
// [#extension: envoy.transport_sockets.http_11_proxy]
|
||||
|
||||
// HTTP/1.1 proxy transport socket establishes an upstream connection to a proxy address
|
||||
// instead of the target host's address. This behavior is triggered when the transport
|
||||
// socket is configured and proxy information is provided.
|
||||
//
|
||||
// Behavior when proxying:
|
||||
// =======================
|
||||
// When an upstream connection is established, instead of connecting directly to the endpoint
|
||||
// address, the client will connect to the specified proxy address, send an HTTP/1.1 ``CONNECT`` request
|
||||
// indicating the endpoint address, and process the response. If the response has HTTP status 200,
|
||||
// the connection will be passed down to the underlying transport socket.
|
||||
//
|
||||
// Configuring proxy information:
|
||||
// ==============================
|
||||
// Set ``typed_filter_metadata`` in :ref:`LbEndpoint.Metadata <envoy_v3_api_field_config.endpoint.v3.lbendpoint.metadata>` or :ref:`LocalityLbEndpoints.Metadata <envoy_v3_api_field_config.endpoint.v3.LocalityLbEndpoints.metadata>`.
|
||||
// using the key ``envoy.http11_proxy_transport_socket.proxy_address`` and the
|
||||
// proxy address in ``config::core::v3::Address`` format.
|
||||
//
|
||||
message Http11ProxyUpstreamTransport {
|
||||
// The underlying transport socket being wrapped. Defaults to plaintext (raw_buffer) if unset.
|
||||
config.core.v3.TransportSocket transport_socket = 1;
|
||||
}
|
||||
Loading…
Reference in New Issue