mirror of https://github.com/grpc/grpc-java.git
xds: xDS-based HTTP CONNECT configuration (#11861)
This commit is contained in:
parent
c340f4a2f3
commit
12197065fe
|
|
@ -85,6 +85,7 @@ java_proto_library(
|
||||||
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
|
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
|
||||||
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
|
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
|
||||||
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
|
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
|
||||||
|
"@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg",
|
||||||
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
|
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
|
||||||
"@envoy_api//envoy/service/discovery/v3:pkg",
|
"@envoy_api//envoy/service/discovery/v3:pkg",
|
||||||
"@envoy_api//envoy/service/load_stats/v3:pkg",
|
"@envoy_api//envoy/service/load_stats/v3:pkg",
|
||||||
|
|
|
||||||
|
|
@ -243,7 +243,9 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.unmodifiableList(instances), configOrError.getConfig());
|
Collections.unmodifiableList(instances),
|
||||||
|
configOrError.getConfig(),
|
||||||
|
root.result.isHttp11ProxyAvailable());
|
||||||
if (childLb == null) {
|
if (childLb == null) {
|
||||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.protobuf.Struct;
|
import com.google.protobuf.Struct;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
|
import io.grpc.HttpConnectProxiedSocketAddress;
|
||||||
import io.grpc.InternalLogId;
|
import io.grpc.InternalLogId;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
import io.grpc.LoadBalancerProvider;
|
import io.grpc.LoadBalancerProvider;
|
||||||
|
|
@ -59,6 +60,8 @@ import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||||
import io.grpc.xds.client.XdsLogger;
|
import io.grpc.xds.client.XdsLogger;
|
||||||
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
@ -430,8 +433,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
||||||
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
|
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
|
||||||
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
|
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
|
||||||
.build();
|
.build();
|
||||||
EquivalentAddressGroup eag = new EquivalentAddressGroup(
|
|
||||||
endpoint.eag().getAddresses(), attr);
|
EquivalentAddressGroup eag;
|
||||||
|
if (config.isHttp11ProxyAvailable()) {
|
||||||
|
List<SocketAddress> rewrittenAddresses = new ArrayList<>();
|
||||||
|
for (SocketAddress addr : endpoint.eag().getAddresses()) {
|
||||||
|
rewrittenAddresses.add(rewriteAddress(
|
||||||
|
addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
|
||||||
|
}
|
||||||
|
eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
|
||||||
|
} else {
|
||||||
|
eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
|
||||||
|
}
|
||||||
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
|
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
|
||||||
addresses.add(eag);
|
addresses.add(eag);
|
||||||
}
|
}
|
||||||
|
|
@ -469,6 +482,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
||||||
new EndpointsUpdated().run();
|
new EndpointsUpdated().run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SocketAddress rewriteAddress(SocketAddress addr,
|
||||||
|
ImmutableMap<String, Object> endpointMetadata,
|
||||||
|
ImmutableMap<String, Object> localityMetadata) {
|
||||||
|
if (!(addr instanceof InetSocketAddress)) {
|
||||||
|
return addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
SocketAddress proxyAddress;
|
||||||
|
try {
|
||||||
|
proxyAddress = (SocketAddress) endpointMetadata.get(
|
||||||
|
"envoy.http11_proxy_transport_socket.proxy_address");
|
||||||
|
if (proxyAddress == null) {
|
||||||
|
proxyAddress = (SocketAddress) localityMetadata.get(
|
||||||
|
"envoy.http11_proxy_transport_socket.proxy_address");
|
||||||
|
}
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
return addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (proxyAddress == null) {
|
||||||
|
return addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
return HttpConnectProxiedSocketAddress.newBuilder()
|
||||||
|
.setTargetAddress((InetSocketAddress) addr)
|
||||||
|
.setProxyAddress(proxyAddress)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> generatePriorityNames(String name,
|
private List<String> generatePriorityNames(String name,
|
||||||
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
|
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
|
||||||
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
|
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
|
||||||
|
|
|
||||||
|
|
@ -74,10 +74,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
||||||
final List<DiscoveryMechanism> discoveryMechanisms;
|
final List<DiscoveryMechanism> discoveryMechanisms;
|
||||||
// GracefulSwitch configuration
|
// GracefulSwitch configuration
|
||||||
final Object lbConfig;
|
final Object lbConfig;
|
||||||
|
private final boolean isHttp11ProxyAvailable;
|
||||||
|
|
||||||
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
|
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
|
||||||
|
boolean isHttp11ProxyAvailable) {
|
||||||
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
|
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
|
||||||
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
|
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
|
||||||
|
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isHttp11ProxyAvailable() {
|
||||||
|
return isHttp11ProxyAvailable;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import com.google.auto.value.AutoValue;
|
import com.google.auto.value.AutoValue;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -41,11 +42,13 @@ final class Endpoints {
|
||||||
// Locality's priority level.
|
// Locality's priority level.
|
||||||
abstract int priority();
|
abstract int priority();
|
||||||
|
|
||||||
|
abstract ImmutableMap<String, Object> localityMetadata();
|
||||||
|
|
||||||
static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
|
static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
|
||||||
int priority) {
|
int priority, ImmutableMap<String, Object> localityMetadata) {
|
||||||
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
|
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
|
||||||
return new AutoValue_Endpoints_LocalityLbEndpoints(
|
return new AutoValue_Endpoints_LocalityLbEndpoints(
|
||||||
ImmutableList.copyOf(endpoints), localityWeight, priority);
|
ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,17 +66,20 @@ final class Endpoints {
|
||||||
|
|
||||||
abstract String hostname();
|
abstract String hostname();
|
||||||
|
|
||||||
|
abstract ImmutableMap<String, Object> endpointMetadata();
|
||||||
|
|
||||||
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
|
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
|
||||||
boolean isHealthy, String hostname) {
|
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
|
||||||
return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname);
|
return new AutoValue_Endpoints_LbEndpoint(
|
||||||
|
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only for testing.
|
// Only for testing.
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static LbEndpoint create(
|
static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy,
|
||||||
String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) {
|
String hostname, ImmutableMap<String, Object> endpointMetadata) {
|
||||||
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
|
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
|
||||||
loadBalancingWeight, isHealthy, hostname);
|
loadBalancingWeight, isHealthy, hostname, endpointMetadata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.auth.MoreCallCredentials;
|
import io.grpc.auth.MoreCallCredentials;
|
||||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||||
|
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
@ -240,11 +241,16 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String parse(Any any) throws InvalidProtocolBufferException {
|
public String parse(Any any) throws ResourceInvalidException {
|
||||||
Audience audience = any.unpack(Audience.class);
|
Audience audience;
|
||||||
|
try {
|
||||||
|
audience = any.unpack(Audience.class);
|
||||||
|
} catch (InvalidProtocolBufferException ex) {
|
||||||
|
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
|
||||||
|
}
|
||||||
String url = audience.getUrl();
|
String url = audience.getUrl();
|
||||||
if (url.isEmpty()) {
|
if (url.isEmpty()) {
|
||||||
throw new InvalidProtocolBufferException(
|
throw new ResourceInvalidException(
|
||||||
"Audience URL is empty. Metadata value must contain a valid URL.");
|
"Audience URL is empty. Metadata value must contain a valid URL.");
|
||||||
}
|
}
|
||||||
return url;
|
return url;
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,14 @@
|
||||||
package io.grpc.xds;
|
package io.grpc.xds;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.protobuf.Any;
|
import com.google.protobuf.Any;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.Struct;
|
||||||
|
import io.envoyproxy.envoy.config.core.v3.Metadata;
|
||||||
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
|
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
|
||||||
|
import io.grpc.xds.XdsEndpointResource.AddressMetadataParser;
|
||||||
|
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||||
|
import io.grpc.xds.internal.ProtobufJsonConverter;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
@ -36,6 +41,7 @@ final class MetadataRegistry {
|
||||||
|
|
||||||
private MetadataRegistry() {
|
private MetadataRegistry() {
|
||||||
registerParser(new AudienceMetadataParser());
|
registerParser(new AudienceMetadataParser());
|
||||||
|
registerParser(new AddressMetadataParser());
|
||||||
}
|
}
|
||||||
|
|
||||||
static MetadataRegistry getInstance() {
|
static MetadataRegistry getInstance() {
|
||||||
|
|
@ -55,6 +61,54 @@ final class MetadataRegistry {
|
||||||
supportedParsers.remove(parser.getTypeUrl());
|
supportedParsers.remove(parser.getTypeUrl());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses cluster metadata into a structured map.
|
||||||
|
*
|
||||||
|
* <p>Values in {@code typed_filter_metadata} take precedence over
|
||||||
|
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
|
||||||
|
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
|
||||||
|
* Envoy metadata documentation </a> for details.
|
||||||
|
*
|
||||||
|
* @param metadata the {@link Metadata} containing the fields to parse.
|
||||||
|
* @return an immutable map of parsed metadata.
|
||||||
|
* @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails.
|
||||||
|
*/
|
||||||
|
public ImmutableMap<String, Object> parseMetadata(Metadata metadata)
|
||||||
|
throws ResourceInvalidException {
|
||||||
|
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
||||||
|
|
||||||
|
// Process typed_filter_metadata
|
||||||
|
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
Any value = entry.getValue();
|
||||||
|
MetadataValueParser parser = findParser(value.getTypeUrl());
|
||||||
|
if (parser != null) {
|
||||||
|
try {
|
||||||
|
Object parsedValue = parser.parse(value);
|
||||||
|
parsedMetadata.put(key, parsedValue);
|
||||||
|
} catch (ResourceInvalidException e) {
|
||||||
|
throw new ResourceInvalidException(
|
||||||
|
String.format("Failed to parse metadata key: %s, type: %s. Error: %s",
|
||||||
|
key, value.getTypeUrl(), e.getMessage()), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// building once to reuse in the next loop
|
||||||
|
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
|
||||||
|
|
||||||
|
// Process filter_metadata for remaining keys
|
||||||
|
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
if (!intermediateParsedMetadata.containsKey(key)) {
|
||||||
|
Struct structValue = entry.getValue();
|
||||||
|
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
|
||||||
|
parsedMetadata.put(key, jsonValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsedMetadata.build();
|
||||||
|
}
|
||||||
|
|
||||||
interface MetadataValueParser {
|
interface MetadataValueParser {
|
||||||
|
|
||||||
String getTypeUrl();
|
String getTypeUrl();
|
||||||
|
|
@ -64,8 +118,8 @@ final class MetadataRegistry {
|
||||||
*
|
*
|
||||||
* @param any the {@link Any} object to parse.
|
* @param any the {@link Any} object to parse.
|
||||||
* @return the parsed metadata value.
|
* @return the parsed metadata value.
|
||||||
* @throws InvalidProtocolBufferException if the parsing fails.
|
* @throws ResourceInvalidException if the parsing fails.
|
||||||
*/
|
*/
|
||||||
Object parse(Any any) throws InvalidProtocolBufferException;
|
Object parse(Any any) throws ResourceInvalidException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import com.google.common.base.MoreObjects;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.protobuf.Any;
|
|
||||||
import com.google.protobuf.Duration;
|
import com.google.protobuf.Duration;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
|
@ -33,10 +32,11 @@ import com.google.protobuf.Struct;
|
||||||
import com.google.protobuf.util.Durations;
|
import com.google.protobuf.util.Durations;
|
||||||
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
|
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
|
||||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||||
import io.envoyproxy.envoy.config.core.v3.Metadata;
|
|
||||||
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
|
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
|
||||||
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
||||||
|
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
|
||||||
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
||||||
|
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
|
||||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
||||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
||||||
import io.grpc.LoadBalancerRegistry;
|
import io.grpc.LoadBalancerRegistry;
|
||||||
|
|
@ -46,15 +46,12 @@ import io.grpc.internal.ServiceConfigUtil;
|
||||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
|
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
|
||||||
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
||||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
import io.grpc.xds.internal.ProtobufJsonConverter;
|
|
||||||
import io.grpc.xds.internal.security.CommonTlsContextUtil;
|
import io.grpc.xds.internal.security.CommonTlsContextUtil;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
|
@ -67,6 +64,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static boolean enableSystemRootCerts =
|
public static boolean enableSystemRootCerts =
|
||||||
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false);
|
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false);
|
||||||
|
static boolean isEnabledXdsHttpConnect =
|
||||||
|
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false);
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
|
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
|
||||||
|
|
@ -78,6 +77,9 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
|
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
|
||||||
private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT_V2 =
|
private static final String TYPE_URL_UPSTREAM_TLS_CONTEXT_V2 =
|
||||||
"type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext";
|
"type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext";
|
||||||
|
static final String TRANSPORT_SOCKET_NAME_HTTP11_PROXY =
|
||||||
|
"type.googleapis.com/envoy.extensions.transport_sockets.http_11_proxy.v3"
|
||||||
|
+ ".Http11ProxyUpstreamTransport";
|
||||||
private final LoadBalancerRegistry loadBalancerRegistry
|
private final LoadBalancerRegistry loadBalancerRegistry
|
||||||
= LoadBalancerRegistry.getDefaultRegistry();
|
= LoadBalancerRegistry.getDefaultRegistry();
|
||||||
|
|
||||||
|
|
@ -177,10 +179,11 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap()));
|
ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap()));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
MetadataRegistry registry = MetadataRegistry.getInstance();
|
||||||
ImmutableMap<String, Object> parsedFilterMetadata =
|
ImmutableMap<String, Object> parsedFilterMetadata =
|
||||||
parseClusterMetadata(cluster.getMetadata());
|
registry.parseMetadata(cluster.getMetadata());
|
||||||
updateBuilder.parsedMetadata(parsedFilterMetadata);
|
updateBuilder.parsedMetadata(parsedFilterMetadata);
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (ResourceInvalidException e) {
|
||||||
throw new ResourceInvalidException(
|
throw new ResourceInvalidException(
|
||||||
"Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': "
|
"Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': "
|
||||||
+ e.getMessage(), e);
|
+ e.getMessage(), e);
|
||||||
|
|
@ -189,49 +192,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
return updateBuilder.build();
|
return updateBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses cluster metadata into a structured map.
|
|
||||||
*
|
|
||||||
* <p>Values in {@code typed_filter_metadata} take precedence over
|
|
||||||
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
|
|
||||||
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
|
|
||||||
* Envoy metadata documentation </a> for details.
|
|
||||||
*
|
|
||||||
* @param metadata the {@link Metadata} containing the fields to parse.
|
|
||||||
* @return an immutable map of parsed metadata.
|
|
||||||
* @throws InvalidProtocolBufferException if parsing {@code typed_filter_metadata} fails.
|
|
||||||
*/
|
|
||||||
private static ImmutableMap<String, Object> parseClusterMetadata(Metadata metadata)
|
|
||||||
throws InvalidProtocolBufferException {
|
|
||||||
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
|
||||||
|
|
||||||
MetadataRegistry registry = MetadataRegistry.getInstance();
|
|
||||||
// Process typed_filter_metadata
|
|
||||||
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
|
|
||||||
String key = entry.getKey();
|
|
||||||
Any value = entry.getValue();
|
|
||||||
MetadataValueParser parser = registry.findParser(value.getTypeUrl());
|
|
||||||
if (parser != null) {
|
|
||||||
Object parsedValue = parser.parse(value);
|
|
||||||
parsedMetadata.put(key, parsedValue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// building once to reuse in the next loop
|
|
||||||
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
|
|
||||||
|
|
||||||
// Process filter_metadata for remaining keys
|
|
||||||
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
|
|
||||||
String key = entry.getKey();
|
|
||||||
if (!intermediateParsedMetadata.containsKey(key)) {
|
|
||||||
Struct structValue = entry.getValue();
|
|
||||||
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
|
|
||||||
parsedMetadata.put(key, jsonValue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return parsedMetadata.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
|
private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
|
||||||
String clusterName = cluster.getName();
|
String clusterName = cluster.getName();
|
||||||
Cluster.CustomClusterType customType = cluster.getClusterType();
|
Cluster.CustomClusterType customType = cluster.getClusterType();
|
||||||
|
|
@ -259,6 +219,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
Long maxConcurrentRequests = null;
|
Long maxConcurrentRequests = null;
|
||||||
UpstreamTlsContext upstreamTlsContext = null;
|
UpstreamTlsContext upstreamTlsContext = null;
|
||||||
OutlierDetection outlierDetection = null;
|
OutlierDetection outlierDetection = null;
|
||||||
|
boolean isHttp11ProxyAvailable = false;
|
||||||
if (cluster.hasLrsServer()) {
|
if (cluster.hasLrsServer()) {
|
||||||
if (!cluster.getLrsServer().hasSelf()) {
|
if (!cluster.getLrsServer().hasSelf()) {
|
||||||
return StructOrError.fromError(
|
return StructOrError.fromError(
|
||||||
|
|
@ -281,17 +242,43 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
return StructOrError.fromError("Cluster " + clusterName
|
return StructOrError.fromError("Cluster " + clusterName
|
||||||
+ ": transport-socket-matches not supported.");
|
+ ": transport-socket-matches not supported.");
|
||||||
}
|
}
|
||||||
if (cluster.hasTransportSocket()) {
|
boolean hasTransportSocket = cluster.hasTransportSocket();
|
||||||
if (!TRANSPORT_SOCKET_NAME_TLS.equals(cluster.getTransportSocket().getName())) {
|
TransportSocket transportSocket = cluster.getTransportSocket();
|
||||||
return StructOrError.fromError("transport-socket with name "
|
|
||||||
+ cluster.getTransportSocket().getName() + " not supported.");
|
if (hasTransportSocket && !TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())
|
||||||
|
&& !(isEnabledXdsHttpConnect
|
||||||
|
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName()))) {
|
||||||
|
return StructOrError.fromError(
|
||||||
|
"transport-socket with name " + transportSocket.getName() + " not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasTransportSocket && isEnabledXdsHttpConnect
|
||||||
|
&& TRANSPORT_SOCKET_NAME_HTTP11_PROXY.equals(transportSocket.getName())) {
|
||||||
|
isHttp11ProxyAvailable = true;
|
||||||
|
try {
|
||||||
|
Http11ProxyUpstreamTransport wrappedTransportSocket = transportSocket
|
||||||
|
.getTypedConfig().unpack(io.envoyproxy.envoy.extensions.transport_sockets
|
||||||
|
.http_11_proxy.v3.Http11ProxyUpstreamTransport.class);
|
||||||
|
hasTransportSocket = wrappedTransportSocket.hasTransportSocket();
|
||||||
|
transportSocket = wrappedTransportSocket.getTransportSocket();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
return StructOrError.fromError(
|
||||||
|
"Cluster " + clusterName + ": malformed Http11ProxyUpstreamTransport: " + e);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
return StructOrError.fromError(
|
||||||
|
"Cluster " + clusterName
|
||||||
|
+ ": invalid transport_socket type in Http11ProxyUpstreamTransport");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasTransportSocket && TRANSPORT_SOCKET_NAME_TLS.equals(transportSocket.getName())) {
|
||||||
try {
|
try {
|
||||||
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
|
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
|
||||||
validateUpstreamTlsContext(
|
validateUpstreamTlsContext(
|
||||||
unpackCompatibleType(cluster.getTransportSocket().getTypedConfig(),
|
unpackCompatibleType(transportSocket.getTypedConfig(),
|
||||||
io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class,
|
io.envoyproxy.envoy.extensions
|
||||||
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
|
.transport_sockets.tls.v3.UpstreamTlsContext.class,
|
||||||
|
TYPE_URL_UPSTREAM_TLS_CONTEXT, TYPE_URL_UPSTREAM_TLS_CONTEXT_V2),
|
||||||
certProviderInstances));
|
certProviderInstances));
|
||||||
} catch (InvalidProtocolBufferException | ResourceInvalidException e) {
|
} catch (InvalidProtocolBufferException | ResourceInvalidException e) {
|
||||||
return StructOrError.fromError(
|
return StructOrError.fromError(
|
||||||
|
|
@ -329,9 +316,10 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
return StructOrError.fromError(
|
return StructOrError.fromError(
|
||||||
"EDS service_name must be set when Cluster resource has an xdstp name");
|
"EDS service_name must be set when Cluster resource has an xdstp name");
|
||||||
}
|
}
|
||||||
|
|
||||||
return StructOrError.fromStruct(CdsUpdate.forEds(
|
return StructOrError.fromStruct(CdsUpdate.forEds(
|
||||||
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
|
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
|
||||||
outlierDetection));
|
outlierDetection, isHttp11ProxyAvailable));
|
||||||
} else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) {
|
} else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) {
|
||||||
if (!cluster.hasLoadAssignment()) {
|
if (!cluster.hasLoadAssignment()) {
|
||||||
return StructOrError.fromError(
|
return StructOrError.fromError(
|
||||||
|
|
@ -366,7 +354,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
String dnsHostName = String.format(
|
String dnsHostName = String.format(
|
||||||
Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue());
|
Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue());
|
||||||
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
|
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
|
||||||
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext));
|
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests,
|
||||||
|
upstreamTlsContext, isHttp11ProxyAvailable));
|
||||||
}
|
}
|
||||||
return StructOrError.fromError(
|
return StructOrError.fromError(
|
||||||
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
|
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
|
||||||
|
|
@ -620,6 +609,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
@Nullable
|
@Nullable
|
||||||
abstract UpstreamTlsContext upstreamTlsContext();
|
abstract UpstreamTlsContext upstreamTlsContext();
|
||||||
|
|
||||||
|
abstract boolean isHttp11ProxyAvailable();
|
||||||
|
|
||||||
// List of underlying clusters making of this aggregate cluster.
|
// List of underlying clusters making of this aggregate cluster.
|
||||||
// Only valid for AGGREGATE cluster.
|
// Only valid for AGGREGATE cluster.
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -640,7 +631,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
.maxRingSize(0)
|
.maxRingSize(0)
|
||||||
.choiceCount(0)
|
.choiceCount(0)
|
||||||
.filterMetadata(ImmutableMap.of())
|
.filterMetadata(ImmutableMap.of())
|
||||||
.parsedMetadata(ImmutableMap.of());
|
.parsedMetadata(ImmutableMap.of())
|
||||||
|
.isHttp11ProxyAvailable(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
|
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
|
||||||
|
|
@ -653,26 +645,30 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
static Builder forEds(String clusterName, @Nullable String edsServiceName,
|
static Builder forEds(String clusterName, @Nullable String edsServiceName,
|
||||||
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
|
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
|
||||||
@Nullable UpstreamTlsContext upstreamTlsContext,
|
@Nullable UpstreamTlsContext upstreamTlsContext,
|
||||||
@Nullable OutlierDetection outlierDetection) {
|
@Nullable OutlierDetection outlierDetection,
|
||||||
|
boolean isHttp11ProxyAvailable) {
|
||||||
return newBuilder(clusterName)
|
return newBuilder(clusterName)
|
||||||
.clusterType(ClusterType.EDS)
|
.clusterType(ClusterType.EDS)
|
||||||
.edsServiceName(edsServiceName)
|
.edsServiceName(edsServiceName)
|
||||||
.lrsServerInfo(lrsServerInfo)
|
.lrsServerInfo(lrsServerInfo)
|
||||||
.maxConcurrentRequests(maxConcurrentRequests)
|
.maxConcurrentRequests(maxConcurrentRequests)
|
||||||
.upstreamTlsContext(upstreamTlsContext)
|
.upstreamTlsContext(upstreamTlsContext)
|
||||||
.outlierDetection(outlierDetection);
|
.outlierDetection(outlierDetection)
|
||||||
|
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Builder forLogicalDns(String clusterName, String dnsHostName,
|
static Builder forLogicalDns(String clusterName, String dnsHostName,
|
||||||
@Nullable ServerInfo lrsServerInfo,
|
@Nullable ServerInfo lrsServerInfo,
|
||||||
@Nullable Long maxConcurrentRequests,
|
@Nullable Long maxConcurrentRequests,
|
||||||
@Nullable UpstreamTlsContext upstreamTlsContext) {
|
@Nullable UpstreamTlsContext upstreamTlsContext,
|
||||||
|
boolean isHttp11ProxyAvailable) {
|
||||||
return newBuilder(clusterName)
|
return newBuilder(clusterName)
|
||||||
.clusterType(ClusterType.LOGICAL_DNS)
|
.clusterType(ClusterType.LOGICAL_DNS)
|
||||||
.dnsHostName(dnsHostName)
|
.dnsHostName(dnsHostName)
|
||||||
.lrsServerInfo(lrsServerInfo)
|
.lrsServerInfo(lrsServerInfo)
|
||||||
.maxConcurrentRequests(maxConcurrentRequests)
|
.maxConcurrentRequests(maxConcurrentRequests)
|
||||||
.upstreamTlsContext(upstreamTlsContext);
|
.upstreamTlsContext(upstreamTlsContext)
|
||||||
|
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ClusterType {
|
enum ClusterType {
|
||||||
|
|
@ -749,6 +745,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
// Private, use one of the static factory methods instead.
|
// Private, use one of the static factory methods instead.
|
||||||
protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
|
protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
|
||||||
|
|
||||||
|
protected abstract Builder isHttp11ProxyAvailable(boolean isHttp11ProxyAvailable);
|
||||||
|
|
||||||
// Private, use one of the static factory methods instead.
|
// Private, use one of the static factory methods instead.
|
||||||
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
|
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.net.InetAddresses;
|
||||||
|
import com.google.protobuf.Any;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import io.envoyproxy.envoy.config.core.v3.Address;
|
import io.envoyproxy.envoy.config.core.v3.Address;
|
||||||
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
|
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
|
||||||
|
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
||||||
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
||||||
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
|
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
|
||||||
import io.envoyproxy.envoy.type.v3.FractionalPercent;
|
import io.envoyproxy.envoy.type.v3.FractionalPercent;
|
||||||
|
|
@ -30,6 +35,7 @@ import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.internal.GrpcUtil;
|
import io.grpc.internal.GrpcUtil;
|
||||||
import io.grpc.xds.Endpoints.DropOverload;
|
import io.grpc.xds.Endpoints.DropOverload;
|
||||||
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||||
|
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||||
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||||
import io.grpc.xds.client.Locality;
|
import io.grpc.xds.client.Locality;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
import io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||||
|
|
@ -185,7 +191,8 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@Nullable
|
@Nullable
|
||||||
static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
|
static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) {
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto)
|
||||||
|
throws ResourceInvalidException {
|
||||||
// Filter out localities without or with 0 weight.
|
// Filter out localities without or with 0 weight.
|
||||||
if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
|
if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -193,6 +200,15 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
||||||
if (proto.getPriority() < 0) {
|
if (proto.getPriority() < 0) {
|
||||||
return StructOrError.fromError("negative priority");
|
return StructOrError.fromError("negative priority");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ImmutableMap<String, Object> localityMetadata;
|
||||||
|
MetadataRegistry registry = MetadataRegistry.getInstance();
|
||||||
|
try {
|
||||||
|
localityMetadata = registry.parseMetadata(proto.getMetadata());
|
||||||
|
} catch (ResourceInvalidException e) {
|
||||||
|
throw new ResourceInvalidException("Failed to parse Locality Endpoint metadata: "
|
||||||
|
+ e.getMessage(), e);
|
||||||
|
}
|
||||||
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
|
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
|
||||||
for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
|
for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
|
||||||
// The endpoint field of each lb_endpoints must be set.
|
// The endpoint field of each lb_endpoints must be set.
|
||||||
|
|
@ -200,6 +216,13 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
||||||
if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
|
if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
|
||||||
return StructOrError.fromError("LbEndpoint with no endpoint/address");
|
return StructOrError.fromError("LbEndpoint with no endpoint/address");
|
||||||
}
|
}
|
||||||
|
ImmutableMap<String, Object> endpointMetadata;
|
||||||
|
try {
|
||||||
|
endpointMetadata = registry.parseMetadata(endpoint.getMetadata());
|
||||||
|
} catch (ResourceInvalidException e) {
|
||||||
|
throw new ResourceInvalidException("Failed to parse Endpoint metadata: "
|
||||||
|
+ e.getMessage(), e);
|
||||||
|
}
|
||||||
List<java.net.SocketAddress> addresses = new ArrayList<>();
|
List<java.net.SocketAddress> addresses = new ArrayList<>();
|
||||||
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
|
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
|
||||||
|
|
||||||
|
|
@ -214,10 +237,12 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
||||||
endpoints.add(Endpoints.LbEndpoint.create(
|
endpoints.add(Endpoints.LbEndpoint.create(
|
||||||
new EquivalentAddressGroup(addresses),
|
new EquivalentAddressGroup(addresses),
|
||||||
endpoint.getLoadBalancingWeight().getValue(), isHealthy,
|
endpoint.getLoadBalancingWeight().getValue(), isHealthy,
|
||||||
endpoint.getEndpoint().getHostname()));
|
endpoint.getEndpoint().getHostname(),
|
||||||
|
endpointMetadata));
|
||||||
}
|
}
|
||||||
return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
|
return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
|
||||||
endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
|
endpoints, proto.getLoadBalancingWeight().getValue(),
|
||||||
|
proto.getPriority(), localityMetadata));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static InetSocketAddress getInetSocketAddress(Address address) {
|
private static InetSocketAddress getInetSocketAddress(Address address) {
|
||||||
|
|
@ -270,4 +295,47 @@ class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class AddressMetadataParser implements MetadataValueParser {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTypeUrl() {
|
||||||
|
return "type.googleapis.com/envoy.config.core.v3.Address";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public java.net.SocketAddress parse(Any any) throws ResourceInvalidException {
|
||||||
|
SocketAddress socketAddress;
|
||||||
|
try {
|
||||||
|
socketAddress = any.unpack(Address.class).getSocketAddress();
|
||||||
|
} catch (InvalidProtocolBufferException ex) {
|
||||||
|
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
|
||||||
|
}
|
||||||
|
validateAddress(socketAddress);
|
||||||
|
|
||||||
|
String ip = socketAddress.getAddress();
|
||||||
|
int port = socketAddress.getPortValue();
|
||||||
|
|
||||||
|
try {
|
||||||
|
return new InetSocketAddress(InetAddresses.forString(ip), port);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw createException("Invalid IP address or port: " + ip + ":" + port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateAddress(SocketAddress socketAddress) throws ResourceInvalidException {
|
||||||
|
if (socketAddress.getAddress().isEmpty()) {
|
||||||
|
throw createException("Address field is empty or invalid.");
|
||||||
|
}
|
||||||
|
long port = Integer.toUnsignedLong(socketAddress.getPortValue());
|
||||||
|
if (port > 65535) {
|
||||||
|
throw createException(String.format("Port value %d out of range 1-65535.", port));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceInvalidException createException(String message) {
|
||||||
|
return new ResourceInvalidException(
|
||||||
|
"Failed to parse envoy.config.core.v3.Address: " + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,7 @@ public class CdsLoadBalancer2Test {
|
||||||
public void discoverTopLevelEdsCluster() {
|
public void discoverTopLevelEdsCluster() {
|
||||||
CdsUpdate update =
|
CdsUpdate update =
|
||||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||||
outlierDetection)
|
outlierDetection, false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(childBalancers).hasSize(1);
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
|
@ -198,7 +198,8 @@ public class CdsLoadBalancer2Test {
|
||||||
@Test
|
@Test
|
||||||
public void discoverTopLevelLogicalDnsCluster() {
|
public void discoverTopLevelLogicalDnsCluster() {
|
||||||
CdsUpdate update =
|
CdsUpdate update =
|
||||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext)
|
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||||
|
false)
|
||||||
.leastRequestLbPolicy(3).build();
|
.leastRequestLbPolicy(3).build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(childBalancers).hasSize(1);
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
|
@ -232,7 +233,7 @@ public class CdsLoadBalancer2Test {
|
||||||
@Test
|
@Test
|
||||||
public void nonAggregateCluster_resourceUpdate() {
|
public void nonAggregateCluster_resourceUpdate() {
|
||||||
CdsUpdate update =
|
CdsUpdate update =
|
||||||
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection)
|
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(childBalancers).hasSize(1);
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
|
@ -243,7 +244,7 @@ public class CdsLoadBalancer2Test {
|
||||||
100L, upstreamTlsContext, outlierDetection);
|
100L, upstreamTlsContext, outlierDetection);
|
||||||
|
|
||||||
update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null,
|
update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null,
|
||||||
outlierDetection).roundRobinLbPolicy().build();
|
outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
||||||
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
|
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
|
||||||
|
|
@ -254,7 +255,8 @@ public class CdsLoadBalancer2Test {
|
||||||
@Test
|
@Test
|
||||||
public void nonAggregateCluster_resourceRevoked() {
|
public void nonAggregateCluster_resourceRevoked() {
|
||||||
CdsUpdate update =
|
CdsUpdate update =
|
||||||
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext)
|
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext,
|
||||||
|
false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(childBalancers).hasSize(1);
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
|
@ -298,16 +300,16 @@ public class CdsLoadBalancer2Test {
|
||||||
CLUSTER, cluster1, cluster2, cluster3, cluster4);
|
CLUSTER, cluster1, cluster2, cluster3, cluster4);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
CdsUpdate update2 =
|
CdsUpdate update2 =
|
||||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null)
|
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
CdsUpdate update4 =
|
CdsUpdate update4 =
|
||||||
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection)
|
CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster4, update4);
|
xdsClient.deliverCdsUpdate(cluster4, update4);
|
||||||
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
|
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
|
||||||
|
|
@ -362,10 +364,11 @@ public class CdsLoadBalancer2Test {
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
||||||
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||||
CdsUpdate update2 =
|
CdsUpdate update2 =
|
||||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
|
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
|
||||||
|
false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
@ -412,10 +415,11 @@ public class CdsLoadBalancer2Test {
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
|
||||||
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||||
CdsUpdate update2 =
|
CdsUpdate update2 =
|
||||||
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null)
|
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null,
|
||||||
|
false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
@ -467,7 +471,7 @@ public class CdsLoadBalancer2Test {
|
||||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
|
||||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
|
||||||
|
|
@ -518,7 +522,7 @@ public class CdsLoadBalancer2Test {
|
||||||
|
|
||||||
reset(helper);
|
reset(helper);
|
||||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||||
verify(helper).updateBalancingState(
|
verify(helper).updateBalancingState(
|
||||||
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||||
|
|
@ -553,7 +557,7 @@ public class CdsLoadBalancer2Test {
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster2, update2);
|
xdsClient.deliverCdsUpdate(cluster2, update2);
|
||||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||||
|
|
||||||
// cluster2 (aggr.) -> [cluster3 (EDS)]
|
// cluster2 (aggr.) -> [cluster3 (EDS)]
|
||||||
|
|
@ -602,7 +606,7 @@ public class CdsLoadBalancer2Test {
|
||||||
|
|
||||||
// Define EDS cluster
|
// Define EDS cluster
|
||||||
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster3, update3);
|
xdsClient.deliverCdsUpdate(cluster3, update3);
|
||||||
|
|
||||||
// cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies)
|
// cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies)
|
||||||
|
|
@ -649,7 +653,8 @@ public class CdsLoadBalancer2Test {
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
CdsUpdate update1 =
|
CdsUpdate update1 =
|
||||||
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null)
|
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null,
|
||||||
|
false)
|
||||||
.roundRobinLbPolicy().build();
|
.roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(cluster1, update1);
|
xdsClient.deliverCdsUpdate(cluster1, update1);
|
||||||
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
@ -676,7 +681,7 @@ public class CdsLoadBalancer2Test {
|
||||||
@Test
|
@Test
|
||||||
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
||||||
CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L,
|
||||||
upstreamTlsContext, outlierDetection).roundRobinLbPolicy().build();
|
upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build();
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
xdsClient.deliverCdsUpdate(CLUSTER, update);
|
||||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
assertThat(childBalancer.shutdown).isFalse();
|
assertThat(childBalancer.shutdown).isFalse();
|
||||||
|
|
@ -692,7 +697,7 @@ public class CdsLoadBalancer2Test {
|
||||||
try {
|
try {
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER,
|
xdsClient.deliverCdsUpdate(CLUSTER,
|
||||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||||
outlierDetection)
|
outlierDetection, false)
|
||||||
.lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build());
|
.lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assertThat(e).hasMessageThat().contains("unknownLb");
|
assertThat(e).hasMessageThat().contains("unknownLb");
|
||||||
|
|
@ -706,7 +711,7 @@ public class CdsLoadBalancer2Test {
|
||||||
try {
|
try {
|
||||||
xdsClient.deliverCdsUpdate(CLUSTER,
|
xdsClient.deliverCdsUpdate(CLUSTER,
|
||||||
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext,
|
||||||
outlierDetection).lbPolicyConfig(
|
outlierDetection, false).lbPolicyConfig(
|
||||||
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
|
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
|
||||||
.build());
|
.build());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ import io.grpc.Attributes;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
|
import io.grpc.HttpConnectProxiedSocketAddress;
|
||||||
import io.grpc.InsecureChannelCredentials;
|
import io.grpc.InsecureChannelCredentials;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
import io.grpc.LoadBalancer.Helper;
|
import io.grpc.LoadBalancer.Helper;
|
||||||
|
|
@ -83,6 +84,7 @@ import io.grpc.xds.client.Locality;
|
||||||
import io.grpc.xds.client.XdsClient;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
|
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
|
@ -240,7 +242,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void edsClustersWithRingHashEndpointLbPolicy() {
|
public void edsClustersWithRingHashEndpointLbPolicy() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(edsDiscoveryMechanism1), ringHash);
|
Collections.singletonList(edsDiscoveryMechanism1), ringHash, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -252,14 +254,18 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
LocalityLbEndpoints localityLbEndpoints1 =
|
LocalityLbEndpoints localityLbEndpoints1 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, true, "hostname1"),
|
LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */,
|
||||||
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, true, "hostname2")),
|
true, "hostname1", ImmutableMap.of()),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */,
|
||||||
|
true, "hostname2", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints2 =
|
LocalityLbEndpoints localityLbEndpoints2 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
LbEndpoint.create(endpoint3, 60 /* loadBalancingWeight */, true, "hostname3")),
|
LbEndpoint.create(
|
||||||
50 /* localityWeight */, 1 /* priority */);
|
endpoint3, 60 /* loadBalancingWeight */, true,
|
||||||
|
"hostname3", ImmutableMap.of())),
|
||||||
|
50 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1,
|
EDS_SERVICE_NAME1,
|
||||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||||
|
|
@ -302,7 +308,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void edsClustersWithLeastRequestEndpointLbPolicy() {
|
public void edsClustersWithLeastRequestEndpointLbPolicy() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(edsDiscoveryMechanism1), leastRequest);
|
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -312,8 +318,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
|
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||||
100 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1,
|
EDS_SERVICE_NAME1,
|
||||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||||
|
|
@ -348,7 +355,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void edsClustersEndpointHostname_addedToAddressAttribute() {
|
public void edsClustersEndpointHostname_addedToAddressAttribute() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest);
|
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -358,8 +365,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, "hostname1")),
|
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||||
100 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
100 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1,
|
EDS_SERVICE_NAME1,
|
||||||
ImmutableMap.of(locality1, localityLbEndpoints));
|
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||||
|
|
@ -371,11 +379,104 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo("hostname1");
|
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo("hostname1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void endpointAddressRewritten_whenProxyMetadataIsInEndpointMetadata() {
|
||||||
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
|
||||||
|
deliverLbConfig(config);
|
||||||
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
||||||
|
EquivalentAddressGroup endpoint =
|
||||||
|
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.1", 8080));
|
||||||
|
|
||||||
|
// Proxy address in endpointMetadata (use FakeSocketAddress directly)
|
||||||
|
SocketAddress proxyAddress = new FakeSocketAddress("127.0.0.2");
|
||||||
|
ImmutableMap<String, Object> endpointMetadata =
|
||||||
|
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
|
||||||
|
|
||||||
|
// No proxy in locality metadata
|
||||||
|
ImmutableMap<String, Object> localityMetadata = ImmutableMap.of();
|
||||||
|
|
||||||
|
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
|
||||||
|
Arrays.asList(
|
||||||
|
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||||
|
"hostname1", endpointMetadata)),
|
||||||
|
100 /* localityWeight */, 1 /* priority */, localityMetadata);
|
||||||
|
|
||||||
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
|
EDS_SERVICE_NAME1,
|
||||||
|
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||||
|
|
||||||
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
||||||
|
// Get the rewritten address
|
||||||
|
SocketAddress rewrittenAddress =
|
||||||
|
childBalancer.addresses.get(0).getAddresses().get(0);
|
||||||
|
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
|
||||||
|
HttpConnectProxiedSocketAddress proxiedSocket =
|
||||||
|
(HttpConnectProxiedSocketAddress) rewrittenAddress;
|
||||||
|
|
||||||
|
// Assert that the target address is the original address
|
||||||
|
assertThat(proxiedSocket.getTargetAddress())
|
||||||
|
.isEqualTo(endpoint.getAddresses().get(0));
|
||||||
|
|
||||||
|
// Assert that the proxy address is correctly set
|
||||||
|
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() {
|
||||||
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection), leastRequest, true);
|
||||||
|
deliverLbConfig(config);
|
||||||
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
||||||
|
EquivalentAddressGroup endpoint =
|
||||||
|
new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.2", 8080));
|
||||||
|
|
||||||
|
// No proxy in endpointMetadata
|
||||||
|
ImmutableMap<String, Object> endpointMetadata = ImmutableMap.of();
|
||||||
|
|
||||||
|
// Proxy address is now in localityMetadata
|
||||||
|
SocketAddress proxyAddress = new FakeSocketAddress("proxy-addr");
|
||||||
|
ImmutableMap<String, Object> localityMetadata =
|
||||||
|
ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress);
|
||||||
|
|
||||||
|
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create(
|
||||||
|
Arrays.asList(
|
||||||
|
LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true,
|
||||||
|
"hostname2", endpointMetadata)),
|
||||||
|
100 /* localityWeight */, 1 /* priority */, localityMetadata);
|
||||||
|
|
||||||
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
|
EDS_SERVICE_NAME1,
|
||||||
|
ImmutableMap.of(locality1, localityLbEndpoints));
|
||||||
|
|
||||||
|
assertThat(childBalancers).hasSize(1);
|
||||||
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
||||||
|
// Get the rewritten address
|
||||||
|
SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0);
|
||||||
|
|
||||||
|
// Assert that the address was rewritten
|
||||||
|
assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class);
|
||||||
|
HttpConnectProxiedSocketAddress proxiedSocket =
|
||||||
|
(HttpConnectProxiedSocketAddress) rewrittenAddress;
|
||||||
|
|
||||||
|
// Assert that the target address is the original address
|
||||||
|
assertThat(proxiedSocket.getTargetAddress()).isEqualTo(endpoint.getAddresses().get(0));
|
||||||
|
|
||||||
|
// Assert that the proxy address is correctly set from locality metadata
|
||||||
|
assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void onlyEdsClusters_receivedEndpoints() {
|
public void onlyEdsClusters_receivedEndpoints() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -389,17 +490,21 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
LocalityLbEndpoints localityLbEndpoints1 =
|
LocalityLbEndpoints localityLbEndpoints1 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(endpoint1, 100, true, "hostname1"),
|
LbEndpoint.create(endpoint1, 100,
|
||||||
LbEndpoint.create(endpoint2, 100, true, "hostname1")),
|
true, "hostname1", ImmutableMap.of()),
|
||||||
70 /* localityWeight */, 1 /* priority */);
|
LbEndpoint.create(endpoint2, 100,
|
||||||
|
true, "hostname1", ImmutableMap.of())),
|
||||||
|
70 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints2 =
|
LocalityLbEndpoints localityLbEndpoints2 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname2")),
|
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname2", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints3 =
|
LocalityLbEndpoints localityLbEndpoints3 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, "hostname3")),
|
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true,
|
||||||
20 /* localityWeight */, 2 /* priority */);
|
"hostname3", ImmutableMap.of())),
|
||||||
|
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||||
String priority1 = CLUSTER2 + "[child1]";
|
String priority1 = CLUSTER2 + "[child1]";
|
||||||
String priority2 = CLUSTER2 + "[child2]";
|
String priority2 = CLUSTER2 + "[child2]";
|
||||||
String priority3 = CLUSTER1 + "[child1]";
|
String priority3 = CLUSTER1 + "[child1]";
|
||||||
|
|
@ -487,7 +592,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
private void verifyEdsPriorityNames(List<String> want,
|
private void verifyEdsPriorityNames(List<String> want,
|
||||||
Map<Locality, LocalityLbEndpoints>... updates) {
|
Map<Locality, LocalityLbEndpoints>... updates) {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism2), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism2), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -553,15 +658,17 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
private LocalityLbEndpoints createEndpoints(int priority) {
|
private LocalityLbEndpoints createEndpoints(int priority) {
|
||||||
return LocalityLbEndpoints.create(
|
return LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true, "hostname1"),
|
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100,
|
||||||
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true, "hostname2")),
|
true, "hostname1", ImmutableMap.of()),
|
||||||
70 /* localityWeight */, priority /* priority */);
|
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100,
|
||||||
|
true, "hostname2", ImmutableMap.of())),
|
||||||
|
70 /* localityWeight */, priority /* priority */, ImmutableMap.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
|
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -583,7 +690,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
|
public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -592,12 +699,14 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||||
LocalityLbEndpoints localityLbEndpoints1 =
|
LocalityLbEndpoints localityLbEndpoints1 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
|
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints2 =
|
LocalityLbEndpoints localityLbEndpoints2 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true, "hostname2")),
|
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true,
|
||||||
20 /* localityWeight */, 2 /* priority */);
|
"hostname2", ImmutableMap.of())),
|
||||||
|
20 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
|
|
@ -618,17 +727,19 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void handleEdsResource_ignoreUnhealthyEndpoints() {
|
public void handleEdsResource_ignoreUnhealthyEndpoints() {
|
||||||
ClusterResolverConfig config =
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
LbEndpoint.create(endpoint1, 100, false /* isHealthy */, "hostname1"),
|
LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||||
LbEndpoint.create(endpoint2, 100, true /* isHealthy */, "hostname2")),
|
"hostname1", ImmutableMap.of()),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
|
||||||
|
"hostname2", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||||
|
|
@ -638,21 +749,21 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
|
public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
|
||||||
ClusterResolverConfig config =
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||||
LocalityLbEndpoints localityLbEndpoints1 =
|
LocalityLbEndpoints localityLbEndpoints1 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||||
"hostname1")),
|
"hostname1", ImmutableMap.of())),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints2 =
|
LocalityLbEndpoints localityLbEndpoints2 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
|
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */,
|
||||||
"hostname2")),
|
"hostname2", ImmutableMap.of())),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1,
|
EDS_SERVICE_NAME1,
|
||||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||||
|
|
@ -665,21 +776,21 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
|
public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
|
||||||
ClusterResolverConfig config =
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
|
||||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||||
LocalityLbEndpoints localityLbEndpoints1 =
|
LocalityLbEndpoints localityLbEndpoints1 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */,
|
||||||
"hostname1")),
|
"hostname1", ImmutableMap.of())),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
LocalityLbEndpoints localityLbEndpoints2 =
|
LocalityLbEndpoints localityLbEndpoints2 =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */,
|
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */,
|
||||||
"hostname2")),
|
"hostname2", ImmutableMap.of())),
|
||||||
10 /* localityWeight */, 2 /* priority */);
|
10 /* localityWeight */, 2 /* priority */, ImmutableMap.of());
|
||||||
String priority2 = CLUSTER1 + "[child2]";
|
String priority2 = CLUSTER1 + "[child2]";
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1,
|
EDS_SERVICE_NAME1,
|
||||||
|
|
@ -691,15 +802,15 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void handleEdsResource_noHealthyEndpoint() {
|
public void handleEdsResource_noHealthyEndpoint() {
|
||||||
ClusterResolverConfig config =
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
|
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */,
|
Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */,
|
||||||
"hostname1")),
|
"hostname1", ImmutableMap.of())),
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
|
xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
|
||||||
Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy
|
Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy
|
||||||
|
|
||||||
|
|
@ -716,7 +827,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void onlyLogicalDnsCluster_endpointsResolved() {
|
public void onlyLogicalDnsCluster_endpointsResolved() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -749,7 +860,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
|
public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -767,7 +878,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
|
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
|
||||||
backoffPolicy1, backoffPolicy2);
|
backoffPolicy1, backoffPolicy2);
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -813,7 +924,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
|
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
|
||||||
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
|
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
|
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -851,7 +962,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
|
public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -862,8 +973,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
|
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true, "hostname3")),
|
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname3", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||||
|
|
||||||
|
|
@ -886,7 +998,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void noEdsResourceExists_useDnsResolutionResults() {
|
public void noEdsResourceExists_useDnsResolutionResults() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -910,7 +1022,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
|
public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -919,8 +1031,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
|
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||||
resolver.deliverError(Status.UNKNOWN.withDescription("I am lost"));
|
resolver.deliverError(Status.UNKNOWN.withDescription("I am lost"));
|
||||||
|
|
@ -941,7 +1054,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
|
public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -950,8 +1063,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint, 100, true, "hostname1")),
|
Collections.singletonList(LbEndpoint.create(endpoint, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||||
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved.
|
assertThat(childBalancers).isEmpty(); // not created until all clusters resolved.
|
||||||
|
|
@ -976,7 +1090,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
|
public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -999,7 +1113,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void resolutionErrorBeforeChildLbCreated_edsOnly_returnErrorPicker() {
|
public void resolutionErrorBeforeChildLbCreated_edsOnly_returnErrorPicker() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
assertThat(childBalancers).isEmpty();
|
assertThat(childBalancers).isEmpty();
|
||||||
|
|
@ -1017,7 +1131,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
|
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
assertResolverCreated("/" + DNS_HOST_NAME);
|
assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -1033,7 +1147,7 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
|
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin, false);
|
||||||
deliverLbConfig(config);
|
deliverLbConfig(config);
|
||||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
|
||||||
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
|
||||||
|
|
@ -1043,8 +1157,9 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
|
||||||
LocalityLbEndpoints localityLbEndpoints =
|
LocalityLbEndpoints localityLbEndpoints =
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, "hostname1")),
|
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true,
|
||||||
10 /* localityWeight */, 1 /* priority */);
|
"hostname1", ImmutableMap.of())),
|
||||||
|
10 /* localityWeight */, 1 /* priority */, ImmutableMap.of());
|
||||||
xdsClient.deliverClusterLoadAssignment(
|
xdsClient.deliverClusterLoadAssignment(
|
||||||
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
|
||||||
resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2));
|
resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2));
|
||||||
|
|
@ -1118,37 +1233,37 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static EquivalentAddressGroup makeAddress(final String name) {
|
private static EquivalentAddressGroup makeAddress(final String name) {
|
||||||
class FakeSocketAddress extends SocketAddress {
|
return new EquivalentAddressGroup(new FakeSocketAddress(name));
|
||||||
private final String name;
|
}
|
||||||
|
|
||||||
private FakeSocketAddress(String name) {
|
static class FakeSocketAddress extends SocketAddress {
|
||||||
this.name = name;
|
private final String name;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
private FakeSocketAddress(String name) {
|
||||||
public int hashCode() {
|
this.name = name;
|
||||||
return Objects.hash(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (!(o instanceof FakeSocketAddress)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
FakeSocketAddress that = (FakeSocketAddress) o;
|
|
||||||
return Objects.equals(name, that.name);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EquivalentAddressGroup(new FakeSocketAddress(name));
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!(o instanceof FakeSocketAddress)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
FakeSocketAddress that = (FakeSocketAddress) o;
|
||||||
|
return Objects.equals(name, that.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class FakeXdsClient extends XdsClient {
|
private static final class FakeXdsClient extends XdsClient {
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package io.grpc.xds;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN;
|
import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN;
|
||||||
|
import static io.grpc.xds.XdsClusterResource.TRANSPORT_SOCKET_NAME_HTTP11_PROXY;
|
||||||
import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS;
|
import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
|
@ -93,6 +94,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3
|
||||||
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
|
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
|
||||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
|
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
|
||||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
||||||
|
import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport;
|
||||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance;
|
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateProviderPluginInstance;
|
||||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
|
||||||
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
|
||||||
|
|
@ -1055,7 +1057,7 @@ public class GrpcXdsClientImplDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseLocalityLbEndpoints_withHealthyEndpoints() {
|
public void parseLocalityLbEndpoints_withHealthyEndpoints() throws ResourceInvalidException {
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||||
.setLocality(Locality.newBuilder()
|
.setLocality(Locality.newBuilder()
|
||||||
|
|
@ -1075,12 +1077,14 @@ public class GrpcXdsClientImplDataTest {
|
||||||
assertThat(struct.getErrorDetail()).isNull();
|
assertThat(struct.getErrorDetail()).isNull();
|
||||||
assertThat(struct.getStruct()).isEqualTo(
|
assertThat(struct.getStruct()).isEqualTo(
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")),
|
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
|
||||||
100, 1));
|
20, true, "", ImmutableMap.of())),
|
||||||
|
100, 1, ImmutableMap.of()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy() {
|
public void parseLocalityLbEndpoints_treatUnknownHealthAsHealthy()
|
||||||
|
throws ResourceInvalidException {
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||||
.setLocality(Locality.newBuilder()
|
.setLocality(Locality.newBuilder()
|
||||||
|
|
@ -1100,12 +1104,13 @@ public class GrpcXdsClientImplDataTest {
|
||||||
assertThat(struct.getErrorDetail()).isNull();
|
assertThat(struct.getErrorDetail()).isNull();
|
||||||
assertThat(struct.getStruct()).isEqualTo(
|
assertThat(struct.getStruct()).isEqualTo(
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, true, "")), 100,
|
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888,
|
||||||
1));
|
20, true, "", ImmutableMap.of())),
|
||||||
|
100, 1, ImmutableMap.of()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() {
|
public void parseLocalityLbEndpoints_withUnHealthyEndpoints() throws ResourceInvalidException {
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||||
.setLocality(Locality.newBuilder()
|
.setLocality(Locality.newBuilder()
|
||||||
|
|
@ -1125,12 +1130,13 @@ public class GrpcXdsClientImplDataTest {
|
||||||
assertThat(struct.getErrorDetail()).isNull();
|
assertThat(struct.getErrorDetail()).isNull();
|
||||||
assertThat(struct.getStruct()).isEqualTo(
|
assertThat(struct.getStruct()).isEqualTo(
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20, false, "")), 100,
|
Collections.singletonList(LbEndpoint.create("172.14.14.5", 8888, 20,
|
||||||
1));
|
false, "", ImmutableMap.of())),
|
||||||
|
100, 1, ImmutableMap.of()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() {
|
public void parseLocalityLbEndpoints_ignorZeroWeightLocality() throws ResourceInvalidException {
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||||
.setLocality(Locality.newBuilder()
|
.setLocality(Locality.newBuilder()
|
||||||
|
|
@ -1187,7 +1193,10 @@ public class GrpcXdsClientImplDataTest {
|
||||||
EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList);
|
EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList);
|
||||||
assertThat(struct.getStruct()).isEqualTo(
|
assertThat(struct.getStruct()).isEqualTo(
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
Collections.singletonList(LbEndpoint.create(expectedEag, 20, true, "")), 100, 1));
|
Collections.singletonList(LbEndpoint.create(
|
||||||
|
expectedEag, 20, true, "", ImmutableMap.of())), 100, 1, ImmutableMap.of()));
|
||||||
|
} catch (ResourceInvalidException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
if (originalDualStackProp != null) {
|
if (originalDualStackProp != null) {
|
||||||
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp);
|
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp);
|
||||||
|
|
@ -1198,7 +1207,7 @@ public class GrpcXdsClientImplDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseLocalityLbEndpoints_invalidPriority() {
|
public void parseLocalityLbEndpoints_invalidPriority() throws ResourceInvalidException {
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
|
||||||
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
|
||||||
.setLocality(Locality.newBuilder()
|
.setLocality(Locality.newBuilder()
|
||||||
|
|
@ -2456,6 +2465,59 @@ public class GrpcXdsClientImplDataTest {
|
||||||
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void processCluster_parsesAddressMetadata() throws Exception {
|
||||||
|
|
||||||
|
// Create an Address message
|
||||||
|
Address address = Address.newBuilder()
|
||||||
|
.setSocketAddress(SocketAddress.newBuilder()
|
||||||
|
.setAddress("192.168.1.1")
|
||||||
|
.setPortValue(8080)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Wrap the Address in Any
|
||||||
|
Any addressMetadata = Any.newBuilder()
|
||||||
|
.setTypeUrl("type.googleapis.com/envoy.config.core.v3.Address")
|
||||||
|
.setValue(address.toByteString())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Struct filterMetadata = Struct.newBuilder()
|
||||||
|
.putFields("key1", Value.newBuilder().setStringValue("value1").build())
|
||||||
|
.putFields("key2", Value.newBuilder().setNumberValue(42).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Metadata metadata = Metadata.newBuilder()
|
||||||
|
.putTypedFilterMetadata("ADDRESS_METADATA", addressMetadata)
|
||||||
|
.putFilterMetadata("FILTER_METADATA", filterMetadata)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Cluster cluster = Cluster.newBuilder()
|
||||||
|
.setName("cluster-foo.googleapis.com")
|
||||||
|
.setType(DiscoveryType.EDS)
|
||||||
|
.setEdsClusterConfig(
|
||||||
|
EdsClusterConfig.newBuilder()
|
||||||
|
.setEdsConfig(
|
||||||
|
ConfigSource.newBuilder()
|
||||||
|
.setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||||
|
.setServiceName("service-foo.googleapis.com"))
|
||||||
|
.setLbPolicy(LbPolicy.ROUND_ROBIN)
|
||||||
|
.setMetadata(metadata)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
CdsUpdate update = XdsClusterResource.processCluster(
|
||||||
|
cluster, null, LRS_SERVER_INFO,
|
||||||
|
LoadBalancerRegistry.getDefaultRegistry());
|
||||||
|
|
||||||
|
ImmutableMap<String, Object> expectedParsedMetadata = ImmutableMap.of(
|
||||||
|
"ADDRESS_METADATA", new InetSocketAddress("192.168.1.1", 8080),
|
||||||
|
"FILTER_METADATA", ImmutableMap.of(
|
||||||
|
"key1", "value1",
|
||||||
|
"key2", 42.0));
|
||||||
|
|
||||||
|
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata()
|
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata()
|
||||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
throws ResourceInvalidException, InvalidProtocolBufferException {
|
||||||
|
|
@ -2512,6 +2574,40 @@ public class GrpcXdsClientImplDataTest {
|
||||||
metadataRegistry.removeParser(testParser);
|
metadataRegistry.removeParser(testParser);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void parseNonAggregateCluster_withHttp11ProxyTransportSocket()
|
||||||
|
throws ResourceInvalidException, InvalidProtocolBufferException {
|
||||||
|
XdsClusterResource.isEnabledXdsHttpConnect = true;
|
||||||
|
|
||||||
|
Http11ProxyUpstreamTransport http11ProxyUpstreamTransport =
|
||||||
|
Http11ProxyUpstreamTransport.newBuilder()
|
||||||
|
.setTransportSocket(TransportSocket.getDefaultInstance())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
TransportSocket transportSocket = TransportSocket.newBuilder()
|
||||||
|
.setName(TRANSPORT_SOCKET_NAME_HTTP11_PROXY)
|
||||||
|
.setTypedConfig(Any.pack(http11ProxyUpstreamTransport))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Cluster cluster = Cluster.newBuilder()
|
||||||
|
.setName("cluster-http11-proxy.googleapis.com")
|
||||||
|
.setType(DiscoveryType.EDS)
|
||||||
|
.setEdsClusterConfig(
|
||||||
|
EdsClusterConfig.newBuilder()
|
||||||
|
.setEdsConfig(
|
||||||
|
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||||
|
.setServiceName("service-http11-proxy.googleapis.com"))
|
||||||
|
.setLbPolicy(LbPolicy.ROUND_ROBIN)
|
||||||
|
.setTransportSocket(transportSocket)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
CdsUpdate result =
|
||||||
|
XdsClusterResource.processCluster(cluster, null, LRS_SERVER_INFO,
|
||||||
|
LoadBalancerRegistry.getDefaultRegistry());
|
||||||
|
|
||||||
|
assertThat(result).isNotNull();
|
||||||
|
assertThat(result.isHttp11ProxyAvailable()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException {
|
public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException {
|
||||||
|
|
|
||||||
|
|
@ -607,9 +607,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
||||||
Locality.create("region1", "zone1", "subzone1"),
|
Locality.create("region1", "zone1", "subzone1"),
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
ImmutableList.of(LbEndpoint.create("192.168.0.1", 8080, 2, true,
|
ImmutableList.of(LbEndpoint.create("192.168.0.1", 8080, 2, true,
|
||||||
"endpoint-host-name")), 1, 0),
|
"endpoint-host-name", ImmutableMap.of())), 1, 0, ImmutableMap.of()),
|
||||||
Locality.create("region3", "zone3", "subzone3"),
|
Locality.create("region3", "zone3", "subzone3"),
|
||||||
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1));
|
LocalityLbEndpoints.create(ImmutableList.<LbEndpoint>of(), 2, 1, ImmutableMap.of()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -3246,7 +3246,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
||||||
Locality.create("region2", "zone2", "subzone2"),
|
Locality.create("region2", "zone2", "subzone2"),
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||||
|
true, "endpoint-host-name", ImmutableMap.of())),
|
||||||
|
2, 0, ImmutableMap.of()));
|
||||||
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2,
|
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2,
|
||||||
TIME_INCREMENT * 2);
|
TIME_INCREMENT * 2);
|
||||||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||||
|
|
@ -3416,7 +3418,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
||||||
Locality.create("region2", "zone2", "subzone2"),
|
Locality.create("region2", "zone2", "subzone2"),
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||||
|
true, "endpoint-host-name", ImmutableMap.of())),
|
||||||
|
2, 0, ImmutableMap.of()));
|
||||||
verify(watcher2).onChanged(edsUpdateCaptor.capture());
|
verify(watcher2).onChanged(edsUpdateCaptor.capture());
|
||||||
edsUpdate = edsUpdateCaptor.getValue();
|
edsUpdate = edsUpdateCaptor.getValue();
|
||||||
assertThat(edsUpdate.clusterName).isEqualTo(edsResourceTwo);
|
assertThat(edsUpdate.clusterName).isEqualTo(edsResourceTwo);
|
||||||
|
|
@ -3426,7 +3430,9 @@ public abstract class GrpcXdsClientImplTestBase {
|
||||||
Locality.create("region2", "zone2", "subzone2"),
|
Locality.create("region2", "zone2", "subzone2"),
|
||||||
LocalityLbEndpoints.create(
|
LocalityLbEndpoints.create(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
LbEndpoint.create("172.44.2.2", 8000, 3, true, "endpoint-host-name")), 2, 0));
|
LbEndpoint.create("172.44.2.2", 8000, 3,
|
||||||
|
true, "endpoint-host-name", ImmutableMap.of())),
|
||||||
|
2, 0, ImmutableMap.of()));
|
||||||
verifyNoMoreInteractions(edsResourceWatcher);
|
verifyNoMoreInteractions(edsResourceWatcher);
|
||||||
verifyResourceMetadataAcked(
|
verifyResourceMetadataAcked(
|
||||||
EDS, edsResourceTwo, clusterLoadAssignmentTwo, VERSION_2, TIME_INCREMENT * 2);
|
EDS, edsResourceTwo, clusterLoadAssignmentTwo, VERSION_2, TIME_INCREMENT * 2);
|
||||||
|
|
|
||||||
|
|
@ -257,17 +257,17 @@ public class XdsTestUtils {
|
||||||
|
|
||||||
// Need to create endpoints to create locality endpoints map to create edsUpdate
|
// Need to create endpoints to create locality endpoints map to create edsUpdate
|
||||||
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
|
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
|
||||||
LbEndpoint lbEndpoint =
|
LbEndpoint lbEndpoint = LbEndpoint.create(
|
||||||
LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME);
|
serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of());
|
||||||
lbEndpointsMap.put(
|
lbEndpointsMap.put(
|
||||||
Locality.create("", "", ""),
|
Locality.create("", "", ""),
|
||||||
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0));
|
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of()));
|
||||||
|
|
||||||
// Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder
|
// Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder
|
||||||
XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate(
|
XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate(
|
||||||
EDS_NAME, lbEndpointsMap, Collections.emptyList());
|
EDS_NAME, lbEndpointsMap, Collections.emptyList());
|
||||||
XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds(
|
XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds(
|
||||||
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null)
|
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false)
|
||||||
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
|
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
|
||||||
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,7 @@ envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto
|
||||||
envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
|
envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
|
||||||
envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto
|
envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto
|
||||||
envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto
|
envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto
|
||||||
|
envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto
|
||||||
envoy/extensions/transport_sockets/tls/v3/cert.proto
|
envoy/extensions/transport_sockets/tls/v3/cert.proto
|
||||||
envoy/extensions/transport_sockets/tls/v3/common.proto
|
envoy/extensions/transport_sockets/tls/v3/common.proto
|
||||||
envoy/extensions/transport_sockets/tls/v3/secret.proto
|
envoy/extensions/transport_sockets/tls/v3/secret.proto
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package envoy.extensions.transport_sockets.http_11_proxy.v3;
|
||||||
|
|
||||||
|
import "envoy/config/core/v3/base.proto";
|
||||||
|
|
||||||
|
import "udpa/annotations/status.proto";
|
||||||
|
|
||||||
|
option java_package = "io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3";
|
||||||
|
option java_outer_classname = "UpstreamHttp11ConnectProto";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3;http_11_proxyv3";
|
||||||
|
option (udpa.annotations.file_status).package_version_status = ACTIVE;
|
||||||
|
|
||||||
|
// [#protodoc-title: Upstream HTTP/1.1 Proxy]
|
||||||
|
// [#extension: envoy.transport_sockets.http_11_proxy]
|
||||||
|
|
||||||
|
// HTTP/1.1 proxy transport socket establishes an upstream connection to a proxy address
|
||||||
|
// instead of the target host's address. This behavior is triggered when the transport
|
||||||
|
// socket is configured and proxy information is provided.
|
||||||
|
//
|
||||||
|
// Behavior when proxying:
|
||||||
|
// =======================
|
||||||
|
// When an upstream connection is established, instead of connecting directly to the endpoint
|
||||||
|
// address, the client will connect to the specified proxy address, send an HTTP/1.1 ``CONNECT`` request
|
||||||
|
// indicating the endpoint address, and process the response. If the response has HTTP status 200,
|
||||||
|
// the connection will be passed down to the underlying transport socket.
|
||||||
|
//
|
||||||
|
// Configuring proxy information:
|
||||||
|
// ==============================
|
||||||
|
// Set ``typed_filter_metadata`` in :ref:`LbEndpoint.Metadata <envoy_v3_api_field_config.endpoint.v3.lbendpoint.metadata>` or :ref:`LocalityLbEndpoints.Metadata <envoy_v3_api_field_config.endpoint.v3.LocalityLbEndpoints.metadata>`.
|
||||||
|
// using the key ``envoy.http11_proxy_transport_socket.proxy_address`` and the
|
||||||
|
// proxy address in ``config::core::v3::Address`` format.
|
||||||
|
//
|
||||||
|
message Http11ProxyUpstreamTransport {
|
||||||
|
// The underlying transport socket being wrapped. Defaults to plaintext (raw_buffer) if unset.
|
||||||
|
config.core.v3.TransportSocket transport_socket = 1;
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue