xds: HttpFilter support

This commit is contained in:
ZHANG Dapeng 2021-03-17 16:37:13 -07:00 committed by GitHub
parent 69587c5239
commit 3ebb3e1924
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1554 additions and 954 deletions

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.EnvoyServerProtoData.TRANSPORT_SOCKET_NAME_TLS;
import com.github.udpa.udpa.type.v1.TypedStruct;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Stopwatch;
@ -27,6 +28,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.Durations;
import com.google.re2j.Pattern;
import com.google.re2j.PatternSyntaxException;
@ -41,9 +43,7 @@ import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
@ -52,14 +52,14 @@ import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.HttpFault.FaultAbort;
import io.grpc.xds.HttpFault.FaultDelay;
import io.grpc.xds.Filter.ConfigOrError;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.Matchers.FractionMatcher;
@ -96,7 +96,6 @@ final class ClientXdsClient extends AbstractXdsClient {
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
@VisibleForTesting
@ -117,6 +116,10 @@ final class ClientXdsClient extends AbstractXdsClient {
"type.googleapis.com/envoy.config.cluster.aggregate.v2alpha.ClusterConfig";
private static final String TYPE_URL_CLUSTER_CONFIG =
"type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig";
private static final String TYPE_URL_TYPED_STRUCT =
"type.googleapis.com/udpa.type.v1.TypedStruct";
private static final String TYPE_URL_FILTER_CONFIG =
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
private final Map<String, ResourceSubscriber> ldsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> rdsResourceSubscribers = new HashMap<>();
@ -140,8 +143,12 @@ final class ClientXdsClient extends AbstractXdsClient {
// Unpack Listener messages.
List<Listener> listeners = new ArrayList<>(resources.size());
List<String> listenerNames = new ArrayList<>(resources.size());
boolean isResourceV3 = false;
try {
for (Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.LDS.typeUrl())) {
isResourceV3 = true;
}
Listener listener = unpackCompatibleType(res, Listener.class, ResourceType.LDS.typeUrl(),
ResourceType.LDS.typeUrlV2());
listeners.add(listener);
@ -189,35 +196,34 @@ final class ClientXdsClient extends AbstractXdsClient {
maxStreamDuration = Durations.toNanos(options.getMaxStreamDuration());
}
}
boolean hasFaultInjection = false;
HttpFault httpFault = null;
if (enableFaultInjection) {
List<HttpFilter> httpFilters = hcm.getHttpFiltersList();
for (HttpFilter httpFilter : httpFilters) {
if (HTTP_FAULT_FILTER_NAME.equals(httpFilter.getName())) {
hasFaultInjection = true;
if (httpFilter.hasTypedConfig()) {
StructOrError<HttpFault> httpFaultOrError =
decodeFaultFilterConfig(httpFilter.getTypedConfig());
if (httpFaultOrError != null) {
if (httpFaultOrError.getErrorDetail() != null) {
nackResponse(ResourceType.LDS, nonce,
"Listener " + listenerName + " contains invalid HttpFault filter: "
+ httpFaultOrError.getErrorDetail());
return;
}
httpFault = httpFaultOrError.getStruct();
}
}
break;
boolean parseFilter = enableFaultInjection && isResourceV3;
List<NamedFilterConfig> filterChain = null;
if (parseFilter) {
filterChain = new ArrayList<>();
List<io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter>
httpFilters = hcm.getHttpFiltersList();
for (io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter
httpFilter : httpFilters) {
String filterName = httpFilter.getName();
StructOrError<FilterConfig> filterConfig = parseHttpFilter(httpFilter);
if (filterConfig == null) {
continue;
}
if (filterConfig.errorDetail != null) {
nackResponse(
ResourceType.LDS, nonce,
"Error parsing HttpFilter: " + filterConfig.errorDetail);
return;
}
filterChain.add(new NamedFilterConfig(filterName, filterConfig.struct));
}
}
if (hcm.hasRouteConfig()) {
List<VirtualHost> virtualHosts = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHostProto
: hcm.getRouteConfig().getVirtualHostsList()) {
StructOrError<VirtualHost> virtualHost = parseVirtualHost(virtualHostProto);
StructOrError<VirtualHost> virtualHost = parseVirtualHost(virtualHostProto, parseFilter);
if (virtualHost.getErrorDetail() != null) {
nackResponse(ResourceType.LDS, nonce,
"Listener " + listenerName + " contains invalid virtual host: "
@ -226,7 +232,7 @@ final class ClientXdsClient extends AbstractXdsClient {
}
virtualHosts.add(virtualHost.getStruct());
}
update = new LdsUpdate(maxStreamDuration, virtualHosts, hasFaultInjection, httpFault);
update = new LdsUpdate(maxStreamDuration, virtualHosts, filterChain);
} else if (hcm.hasRds()) {
Rds rds = hcm.getRds();
if (!rds.getConfigSource().hasAds()) {
@ -234,8 +240,8 @@ final class ClientXdsClient extends AbstractXdsClient {
"Listener " + listenerName + " with RDS config_source not set to ADS");
return;
}
update = new LdsUpdate(
maxStreamDuration, rds.getRouteConfigName(), hasFaultInjection, httpFault);
update =
new LdsUpdate(maxStreamDuration, rds.getRouteConfigName(), filterChain);
rdsNames.add(rds.getRouteConfigName());
} else {
nackResponse(ResourceType.LDS, nonce,
@ -277,6 +283,77 @@ final class ClientXdsClient extends AbstractXdsClient {
}
}
@VisibleForTesting
@Nullable // Returns null if the filter is optional but not supported.
static StructOrError<FilterConfig> parseHttpFilter(
io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter
httpFilter) {
String filterName = httpFilter.getName();
boolean isOptional = httpFilter.getIsOptional();
if (!httpFilter.hasTypedConfig()) {
if (isOptional) {
return null;
} else {
return StructOrError.fromError(
"HttpFilter [" + filterName + "] is not optional and has no typed config");
}
}
return parseRawFilterConfig(filterName, httpFilter.getTypedConfig(), isOptional, false);
}
@Nullable // Returns null if the filter should be ignored.
private static StructOrError<FilterConfig> parseRawFilterConfig(
String filterName, Any anyConfig, Boolean isOptional, boolean isOverrideConfig) {
checkArgument(
isOptional != null || isOverrideConfig, "isOptional can't be null for top-level config");
String typeUrl = anyConfig.getTypeUrl();
if (isOverrideConfig) {
isOptional = false;
if (typeUrl.equals(TYPE_URL_FILTER_CONFIG)) {
io.envoyproxy.envoy.config.route.v3.FilterConfig filterConfig;
try {
filterConfig =
anyConfig.unpack(io.envoyproxy.envoy.config.route.v3.FilterConfig.class);
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError(
"HttpFilter [" + filterName + "] contains invalid proto: " + e);
}
isOptional = filterConfig.getIsOptional();
anyConfig = filterConfig.getConfig();
typeUrl = anyConfig.getTypeUrl();
}
}
Message rawConfig = anyConfig;
if (anyConfig.getTypeUrl().equals(TYPE_URL_TYPED_STRUCT)) {
TypedStruct typedStruct;
try {
typedStruct = anyConfig.unpack(TypedStruct.class);
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError(
"HttpFilter [" + filterName + "] contains invalid proto: " + e);
}
typeUrl = typedStruct.getTypeUrl();
rawConfig = typedStruct.getValue();
}
Filter filter = FilterRegistry.getDefaultRegistry().get(typeUrl);
if (filter == null) {
if (isOptional) {
return null;
} else {
return StructOrError.fromError(
"HttpFilter [" + filterName + "] is not optional and has an unsupported config type: "
+ typeUrl);
}
}
ConfigOrError<? extends FilterConfig> filterConfig = isOverrideConfig
? filter.parseFilterConfigOverride(rawConfig) : filter.parseFilterConfig(rawConfig);
if (filterConfig.errorDetail != null) {
return StructOrError.fromError(
"Invalid filter config for HttpFilter [" + filterName + "]: " + filterConfig.errorDetail);
}
return StructOrError.fromStruct(filterConfig.config);
}
@VisibleForTesting static StructOrError<EnvoyServerProtoData.Listener> parseServerSideListener(
Listener listener) {
try {
@ -291,11 +368,11 @@ final class ClientXdsClient extends AbstractXdsClient {
}
private static StructOrError<VirtualHost> parseVirtualHost(
io.envoyproxy.envoy.config.route.v3.VirtualHost proto) {
io.envoyproxy.envoy.config.route.v3.VirtualHost proto, boolean parseFilter) {
String name = proto.getName();
List<Route> routes = new ArrayList<>(proto.getRoutesCount());
for (io.envoyproxy.envoy.config.route.v3.Route routeProto : proto.getRoutesList()) {
StructOrError<Route> route = parseRoute(routeProto);
StructOrError<Route> route = parseRoute(routeProto, parseFilter);
if (route == null) {
continue;
}
@ -305,27 +382,43 @@ final class ClientXdsClient extends AbstractXdsClient {
}
routes.add(route.getStruct());
}
HttpFault httpFault = null;
Map<String, Any> filterConfigMap = proto.getTypedPerFilterConfigMap();
if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) {
Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME);
StructOrError<HttpFault> httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig);
if (httpFaultOrError != null) {
if (httpFaultOrError.getErrorDetail() != null) {
return StructOrError.fromError(
"Virtual host [" + name + "] contains invalid HttpFault filter : "
+ httpFaultOrError.getErrorDetail());
}
httpFault = httpFaultOrError.getStruct();
}
if (!parseFilter) {
return StructOrError.fromStruct(VirtualHost.create(
name, proto.getDomainsList(), routes, new HashMap<String, FilterConfig>()));
}
StructOrError<Map<String, FilterConfig>> overrideConfigs =
parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap());
if (overrideConfigs.errorDetail != null) {
return StructOrError.fromError(
"VirtualHost [" + proto.getName() + "] contains invalid HttpFilter config: "
+ overrideConfigs.errorDetail);
}
return StructOrError.fromStruct(VirtualHost.create(
name, proto.getDomainsList(), routes, httpFault));
name, proto.getDomainsList(), routes, overrideConfigs.struct));
}
@VisibleForTesting
static StructOrError<Map<String, FilterConfig>> parseOverrideFilterConfigs(
Map<String, Any> rawFilterConfigMap) {
Map<String, FilterConfig> overrideConfigs = new HashMap<>();
for (String name : rawFilterConfigMap.keySet()) {
Any anyConfig = rawFilterConfigMap.get(name);
StructOrError<FilterConfig> filterConfig = parseRawFilterConfig(name, anyConfig, null, true);
if (filterConfig == null) {
continue;
}
if (filterConfig.errorDetail != null) {
return StructOrError.fromError(filterConfig.errorDetail);
}
overrideConfigs.put(name, filterConfig.struct);
}
return StructOrError.fromStruct(overrideConfigs);
}
@VisibleForTesting
@Nullable
static StructOrError<Route> parseRoute(io.envoyproxy.envoy.config.route.v3.Route proto) {
static StructOrError<Route> parseRoute(
io.envoyproxy.envoy.config.route.v3.Route proto, boolean parseFilter) {
StructOrError<RouteMatch> routeMatch = parseRouteMatch(proto.getMatch());
if (routeMatch == null) {
return null;
@ -338,7 +431,7 @@ final class ClientXdsClient extends AbstractXdsClient {
StructOrError<RouteAction> routeAction;
switch (proto.getActionCase()) {
case ROUTE:
routeAction = parseRouteAction(proto.getRoute());
routeAction = parseRouteAction(proto.getRoute(), parseFilter);
break;
case REDIRECT:
return StructOrError.fromError("Unsupported action type: redirect");
@ -357,23 +450,19 @@ final class ClientXdsClient extends AbstractXdsClient {
return StructOrError.fromError(
"Invalid route [" + proto.getName() + "]: " + routeAction.getErrorDetail());
}
HttpFault httpFault = null;
Map<String, Any> filterConfigMap = proto.getTypedPerFilterConfigMap();
if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) {
Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME);
StructOrError<HttpFault> httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig);
if (httpFaultOrError != null) {
if (httpFaultOrError.getErrorDetail() != null) {
return StructOrError.fromError(
"Route [" + proto.getName() + "] contains invalid HttpFault filter: "
+ httpFaultOrError.getErrorDetail());
}
httpFault = httpFaultOrError.getStruct();
}
if (!parseFilter) {
return StructOrError.fromStruct(Route.create(
routeMatch.getStruct(), routeAction.getStruct(), new HashMap<String, FilterConfig>()));
}
StructOrError<Map<String, FilterConfig>> overrideConfigs =
parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap());
if (overrideConfigs.errorDetail != null) {
return StructOrError.fromError(
"Route [" + proto.getName() + "] contains invalid HttpFilter config: "
+ overrideConfigs.errorDetail);
}
return StructOrError.fromStruct(Route.create(
routeMatch.getStruct(), routeAction.getStruct(), httpFault));
routeMatch.getStruct(), routeAction.getStruct(), overrideConfigs.struct));
}
@VisibleForTesting
@ -499,7 +588,7 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting
@Nullable
static StructOrError<RouteAction> parseRouteAction(
io.envoyproxy.envoy.config.route.v3.RouteAction proto) {
io.envoyproxy.envoy.config.route.v3.RouteAction proto, boolean parseFilter) {
Long timeoutNano = null;
if (proto.hasMaxStreamDuration()) {
io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration maxStreamDuration
@ -557,7 +646,8 @@ final class ClientXdsClient extends AbstractXdsClient {
List<ClusterWeight> weightedClusters = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight
: clusterWeights) {
StructOrError<ClusterWeight> clusterWeightOrError = parseClusterWeight(clusterWeight);
StructOrError<ClusterWeight> clusterWeightOrError =
parseClusterWeight(clusterWeight, parseFilter);
if (clusterWeightOrError.getErrorDetail() != null) {
return StructOrError.fromError("RouteAction contains invalid ClusterWeight: "
+ clusterWeightOrError.getErrorDetail());
@ -576,130 +666,33 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting
static StructOrError<ClusterWeight> parseClusterWeight(
io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto) {
HttpFault httpFault = null;
Map<String, Any> filterConfigMap = proto.getTypedPerFilterConfigMap();
if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) {
Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME);
StructOrError<HttpFault> httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig);
if (httpFaultOrError != null) {
if (httpFaultOrError.getErrorDetail() != null) {
return StructOrError.fromError(
"ClusterWeight [" + proto.getName() + "] contains invalid HttpFault filter: "
+ httpFaultOrError.getErrorDetail());
}
httpFault = httpFaultOrError.getStruct();
}
io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto,
boolean parseFilter) {
if (!parseFilter) {
return StructOrError.fromStruct(ClusterWeight.create(
proto.getName(), proto.getWeight().getValue(), new HashMap<String, FilterConfig>()));
}
return StructOrError.fromStruct(
ClusterWeight.create(proto.getName(), proto.getWeight().getValue(), httpFault));
}
@Nullable
private static StructOrError<HttpFault> decodeFaultFilterConfig(Any rawFaultFilterConfig) {
if (!rawFaultFilterConfig.getTypeUrl().equals(
"type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault")) {
return null;
}
HTTPFault httpFaultProto;
try {
httpFaultProto = rawFaultFilterConfig.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError("Invalid proto: " + e);
}
return parseHttpFault(httpFaultProto);
}
private static StructOrError<HttpFault> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
}
if (httpFault.hasAbort()) {
StructOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.getErrorDetail() != null) {
return StructOrError.fromError(
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.getErrorDetail());
}
faultAbort = faultAbortOrError.getStruct();
}
if (faultDelay == null && faultAbort == null) {
StructOrError<Map<String, FilterConfig>> overrideConfigs =
parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap());
if (overrideConfigs.errorDetail != null) {
return StructOrError.fromError(
"Invalid HttpFault: neither fault_delay nor fault_abort is specified");
}
String upstreamCluster = httpFault.getUpstreamCluster();
List<String> downstreamNodes = httpFault.getDownstreamNodesList();
List<HeaderMatcher> headers = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.HeaderMatcher proto : httpFault.getHeadersList()) {
StructOrError<HeaderMatcher> headerMatcherOrError = parseHeaderMatcher(proto);
if (headerMatcherOrError.getErrorDetail() != null) {
return StructOrError.fromError(
"HttpFault contains invalid header matcher: "
+ headerMatcherOrError.getErrorDetail());
}
headers.add(headerMatcherOrError.getStruct());
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;
}
}
return StructOrError.fromStruct(HttpFault.create(
faultDelay, faultAbort, upstreamCluster, downstreamNodes, headers, maxActiveFaults));
}
private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
HttpFault.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}
@VisibleForTesting
static StructOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
HttpFault.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return StructOrError.fromStruct(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return StructOrError.fromStruct(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return StructOrError.fromStruct(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return StructOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());
}
}
private static HttpFault.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return HttpFault.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return HttpFault.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return HttpFault.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
"ClusterWeight [" + proto.getName() + "] contains invalid HttpFilter config: "
+ overrideConfigs.errorDetail);
}
return StructOrError.fromStruct(ClusterWeight.create(
proto.getName(), proto.getWeight().getValue(), overrideConfigs.struct));
}
@Override
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
// Unpack RouteConfiguration messages.
Map<String, RouteConfiguration> routeConfigs = new HashMap<>(resources.size());
boolean isResourceV3 = false;
try {
for (Any res : resources) {
if (res.getTypeUrl().equals(ResourceType.RDS.typeUrl())) {
isResourceV3 = true;
}
RouteConfiguration rc =
unpackCompatibleType(res, RouteConfiguration.class, ResourceType.RDS.typeUrl(),
ResourceType.RDS.typeUrlV2());
@ -715,6 +708,7 @@ final class ClientXdsClient extends AbstractXdsClient {
XdsLogLevel.INFO, "Received RDS response for resources: {0}", routeConfigs.keySet());
Map<String, RdsUpdate> rdsUpdates = new HashMap<>();
boolean parseFilter = enableFaultInjection && isResourceV3;
for (Map.Entry<String, RouteConfiguration> entry : routeConfigs.entrySet()) {
String routeConfigName = entry.getKey();
RouteConfiguration routeConfig = entry.getValue();
@ -722,7 +716,7 @@ final class ClientXdsClient extends AbstractXdsClient {
new ArrayList<>(routeConfig.getVirtualHostsCount());
for (io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHostProto
: routeConfig.getVirtualHostsList()) {
StructOrError<VirtualHost> virtualHost = parseVirtualHost(virtualHostProto);
StructOrError<VirtualHost> virtualHost = parseVirtualHost(virtualHostProto, parseFilter);
if (virtualHost.getErrorDetail() != null) {
nackResponse(ResourceType.RDS, nonce, "RouteConfiguration " + routeConfigName
+ " contains invalid virtual host: " + virtualHost.getErrorDetail());

View File

@ -19,35 +19,31 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.xds.Matchers.HeaderMatcher;
import java.util.List;
import io.grpc.xds.Filter.FilterConfig;
import javax.annotation.Nullable;
/** Fault injection configurations. */
@AutoValue
abstract class HttpFault {
abstract class FaultConfig implements FilterConfig {
@Nullable
abstract FaultDelay faultDelay();
@Nullable
abstract FaultAbort faultAbort();
abstract String upstreamCluster();
abstract ImmutableList<String> downstreamNodes();
abstract ImmutableList<HeaderMatcher> headers();
@Nullable
abstract Integer maxActiveFaults();
static HttpFault create(@Nullable FaultDelay faultDelay, @Nullable FaultAbort faultAbort,
String upstreamCluster, List<String> downstreamNodes, List<HeaderMatcher> headers,
@Override
public final String typeUrl() {
return FaultFilter.TYPE_URL;
}
static FaultConfig create(
@Nullable FaultDelay faultDelay, @Nullable FaultAbort faultAbort,
@Nullable Integer maxActiveFaults) {
return new AutoValue_HttpFault(faultDelay, faultAbort, upstreamCluster,
ImmutableList.copyOf(downstreamNodes), ImmutableList.copyOf(headers), maxActiveFaults);
return new AutoValue_FaultConfig(faultDelay, faultAbort, maxActiveFaults);
}
/** Fault configurations for aborting requests. */
@ -70,7 +66,7 @@ abstract class HttpFault {
private static FaultDelay create(
@Nullable Long delayNanos, boolean headerDelay, FractionalPercent percent) {
return new AutoValue_HttpFault_FaultDelay(delayNanos, headerDelay, percent);
return new AutoValue_FaultConfig_FaultDelay(delayNanos, headerDelay, percent);
}
}
@ -95,7 +91,7 @@ abstract class HttpFault {
private static FaultAbort create(
@Nullable Status status, boolean headerAbort, FractionalPercent percent) {
return new AutoValue_HttpFault_FaultAbort(status, headerAbort, percent);
return new AutoValue_FaultConfig_FaultAbort(status, headerAbort, percent);
}
}
@ -123,7 +119,7 @@ abstract class HttpFault {
static FractionalPercent create(
int numerator, DenominatorType denominatorType) {
return new AutoValue_HttpFault_FractionalPercent(numerator, denominatorType);
return new AutoValue_FaultConfig_FractionalPercent(numerator, denominatorType);
}
}
}

View File

@ -0,0 +1,418 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.DelayedClientCall;
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.FaultConfig.FaultAbort;
import io.grpc.xds.FaultConfig.FaultDelay;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
/** HttpFault filter implementation. */
final class FaultFilter implements Filter, ClientInterceptorBuilder {
static final FaultFilter INSTANCE =
new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());
@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_KEY =
Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
static final String TYPE_URL =
"type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault";
private final ThreadSafeRandom random;
private final AtomicLong activeFaultCounter;
@VisibleForTesting
FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) {
this.random = random;
this.activeFaultCounter = activeFaultCounter;
}
@Override
public String[] typeUrls() {
return new String[] { TYPE_URL };
}
@Override
public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
}
Any anyMessage = (Any) rawProtoMessage;
try {
httpFaultProto = anyMessage.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Invalid proto: " + e);
}
return parseHttpFault(httpFaultProto);
}
private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
}
if (httpFault.hasAbort()) {
ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.errorDetail != null) {
return ConfigOrError.fromError(
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
}
faultAbort = faultAbortOrError.config;
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;
}
}
return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
}
private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}
@VisibleForTesting
static ConfigOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return ConfigOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());
}
}
private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
}
}
@Override
public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return parseFilterConfig(rawProtoMessage);
}
@Nullable
@Override
public ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
final ScheduledExecutorService scheduler) {
checkNotNull(config, "config");
if (overrideConfig != null) {
config = overrideConfig;
}
FaultConfig faultConfig = (FaultConfig) config;
Long delayNanos = null;
Status abortStatus = null;
if (faultConfig.maxActiveFaults() == null
|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
Metadata headers = args.getHeaders();
if (faultConfig.faultDelay() != null) {
delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
}
if (faultConfig.faultAbort() != null) {
abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers);
}
}
if (delayNanos == null && abortStatus == null) {
return null;
}
final Long finalDelayNanos = delayNanos;
final Status finalAbortStatus = abortStatus;
final class FaultInjectionInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
final Channel next) {
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) { // This should never happen in practice because
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
// a callExecutor.
// TODO(https://github.com/grpc/grpc-java/issues/7868)
callExecutor = MoreExecutors.directExecutor();
}
if (finalDelayNanos != null && finalAbortStatus != null) {
return new DelayInjectedCall<>(
finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(),
Suppliers.ofInstance(
new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor)));
}
if (finalAbortStatus != null) {
return new FailingClientCall<>(finalAbortStatus, callExecutor);
} else {
return new DelayInjectedCall<>(
finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(),
new Supplier<ClientCall<ReqT, RespT>>() {
@Override
public ClientCall<ReqT, RespT> get() {
return next.newCall(method, callOptions);
}
});
}
}
}
return new FaultInjectionInterceptor();
}
@Nullable
private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
Long delayNanos;
FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent();
if (faultDelay.headerDelay()) {
try {
int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
if (delayPercentageStr != null) {
int delayPercentage = Integer.parseInt(delayPercentageStr);
if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
fractionalPercent = FaultConfig.FractionalPercent.create(
delayPercentage, fractionalPercent.denominatorType());
}
}
} catch (NumberFormatException e) {
return null; // treated as header_delay not applicable
}
} else {
delayNanos = faultDelay.delayNanos();
}
if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
return null;
}
return delayNanos;
}
@Nullable
private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
Status abortStatus = null;
FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent();
if (faultAbort.headerAbort()) {
try {
String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
if (grpcCodeStr != null) {
int grpcCode = Integer.parseInt(grpcCodeStr);
abortStatus = Status.fromCodeValue(grpcCode);
}
String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
if (httpCodeStr != null) {
int httpCode = Integer.parseInt(httpCodeStr);
abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
}
String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
if (abortPercentageStr != null) {
int abortPercentage =
Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
fractionalPercent = FaultConfig.FractionalPercent.create(
abortPercentage, fractionalPercent.denominatorType());
}
}
} catch (NumberFormatException e) {
return null; // treated as header_abort not applicable
}
} else {
abortStatus = faultAbort.status();
}
if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
return null;
}
return abortStatus;
}
private static int getRatePerMillion(FaultConfig.FractionalPercent percent) {
int numerator = percent.numerator();
FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType();
switch (type) {
case TEN_THOUSAND:
numerator *= 100;
break;
case HUNDRED:
numerator *= 10_000;
break;
case MILLION:
default:
break;
}
if (numerator > 1_000_000 || numerator < 0) {
numerator = 1_000_000;
}
return numerator;
}
/** A {@link DelayedClientCall} with a fixed delay. */
private final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
final Object lock = new Object();
ScheduledFuture<?> delayTask;
boolean cancelled;
DelayInjectedCall(
long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
@Nullable Deadline deadline,
final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
super(callExecutor, scheduler, deadline);
activeFaultCounter.incrementAndGet();
ScheduledFuture<?> task = scheduler.schedule(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
if (!cancelled) {
activeFaultCounter.decrementAndGet();
}
}
setCall(callSupplier.get());
}
},
delayNanos,
NANOSECONDS);
synchronized (lock) {
if (!cancelled) {
delayTask = task;
return;
}
}
task.cancel(false);
}
@Override
protected void callCancelled() {
ScheduledFuture<?> savedDelayTask;
synchronized (lock) {
cancelled = true;
activeFaultCounter.decrementAndGet();
savedDelayTask = delayTask;
}
if (savedDelayTask != null) {
savedDelayTask.cancel(false);
}
}
}
/** An implementation of {@link ClientCall} that fails when started. */
private final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
final Status error;
final Executor callExecutor;
final Context context;
FailingClientCall(Status error, Executor callExecutor) {
this.error = error;
this.callExecutor = callExecutor;
this.context = Context.current();
}
@Override
public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
activeFaultCounter.incrementAndGet();
callExecutor.execute(
new Runnable() {
@Override
public void run() {
Context previous = context.attach();
try {
listener.onClose(error, new Metadata());
activeFaultCounter.decrementAndGet();
} finally {
context.detach(previous);
}
}
});
}
@Override
public void request(int numMessages) {}
@Override
public void cancel(String message, Throwable cause) {}
@Override
public void halfClose() {}
@Override
public void sendMessage(ReqT message) {}
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.protobuf.Message;
import io.grpc.ClientInterceptor;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.ServerInterceptor;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
* Defines the parsing functionality of an HTTP filter. A Filter may optionally implement either
* {@link ClientInterceptorBuilder} or {@link ServerInterceptorBuilder} or both, indicating it is
* capable of working on the client side or server side or both, respectively.
*/
interface Filter {
/**
* The proto message types supported by this filter. A filter will be registered by each of its
* supported message types.
*/
String[] typeUrls();
/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage);
/**
* Parses the per-filter override filter config from raw proto message. The message may be either
* a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
}
/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for clients. */
interface ClientInterceptorBuilder {
@Nullable
ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
ScheduledExecutorService scheduler);
}
// Server side filters are not currently supported, but this interface is defined for clarity.
interface ServerInterceptorBuilder {
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);
}
// TODO(zdapeng): Unify with ClientXdsClient.StructOrError, or just have parseFilterConfig() throw
// certain types of Exception.
final class ConfigOrError<T> {
/**
* Returns a {@link ConfigOrError} for the successfully converted data object.
*/
static <T> ConfigOrError<T> fromConfig(T config) {
return new ConfigOrError<>(config);
}
/**
* Returns a {@link ConfigOrError} for the failure to convert the data object.
*/
static <T> ConfigOrError<T> fromError(String errorDetail) {
return new ConfigOrError<>(errorDetail);
}
final String errorDetail;
final T config;
private ConfigOrError(T config) {
this.config = checkNotNull(config, "config");
this.errorDetail = null;
}
private ConfigOrError(String errorDetail) {
this.config = null;
this.errorDetail = checkNotNull(errorDetail, "errorDetail");
}
}
/** Filter config with instance name. */
final class NamedFilterConfig {
// filter instance name
final String name;
final FilterConfig filterConfig;
NamedFilterConfig(String name, FilterConfig filterConfig) {
this.name = name;
this.filterConfig = filterConfig;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NamedFilterConfig that = (NamedFilterConfig) o;
return Objects.equals(name, that.name)
&& Objects.equals(filterConfig, that.filterConfig);
}
@Override
public int hashCode() {
return Objects.hash(name, filterConfig);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("filterConfig", filterConfig)
.toString();
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
/**
* A registry for all supported {@link Filter}s. Filters can be queried from the registry
* by any of the {@link Filter#typeUrls() type URLs}.
*/
final class FilterRegistry {
private static FilterRegistry instance;
private final Map<String, Filter> supportedFilters = new HashMap<>();
private FilterRegistry() {}
static synchronized FilterRegistry getDefaultRegistry() {
if (instance == null) {
instance = newRegistry().register(FaultFilter.INSTANCE, RouterFilter.INSTANCE);
}
return instance;
}
@VisibleForTesting
static FilterRegistry newRegistry() {
return new FilterRegistry();
}
@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
}
return this;
}
@Nullable
Filter get(String typeUrl) {
return supportedFilters.get(typeUrl);
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Message;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
* A filter that fails all RPCs. To be added to the end of filter chain if RouterFilter is absent.
*/
enum LameFilter implements Filter, ClientInterceptorBuilder {
INSTANCE;
static final FilterConfig LAME_CONFIG = new FilterConfig() {
@Override
public String typeUrl() {
throw new UnsupportedOperationException("shouldn't be called");
}
@Override
public String toString() {
return "LAME_CONFIG";
}
};
@Override
public String[] typeUrls() {
return new String[0];
}
@Override
public ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage) {
throw new UnsupportedOperationException();
}
@Override
public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
throw new UnsupportedOperationException();
}
@Nullable
@Override
public ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
ScheduledExecutorService scheduler) {
class LameInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions, Channel next) {
final Context context = Context.current();
return new ClientCall<ReqT, RespT>() {
@Override
public void start(final Listener<RespT> listener, Metadata headers) {
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) { // This should never happen in practice because
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
// a callExecutor.
// TODO(https://github.com/grpc/grpc-java/issues/7868)
callExecutor = MoreExecutors.directExecutor();
}
callExecutor.execute(
new Runnable() {
@Override
public void run() {
Context previous = context.attach();
try {
listener.onClose(
Status.UNAVAILABLE.withDescription("No router filter"), new Metadata());
} finally {
context.detach(previous);
}
}
});
}
@Override
public void request(int numMessages) {}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {}
@Override
public void halfClose() {}
@Override
public void sendMessage(ReqT message) {}
};
}
}
return new LameInterceptor();
}
}

View File

@ -24,7 +24,9 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;
@ -41,6 +43,8 @@ final class MessagePrinter {
.add(Listener.getDescriptor())
.add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor())
.add(HttpConnectionManager.getDescriptor())
.add(HttpFilter.getDescriptor())
.add(HTTPFault.getDescriptor())
.add(io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2
.HttpConnectionManager.getDescriptor())
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported

View File

@ -0,0 +1,69 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.protobuf.Message;
import io.grpc.ClientInterceptor;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
* Router filter implementation. Currently this filter does not parse any field in the config.
*/
enum RouterFilter implements Filter, ClientInterceptorBuilder {
INSTANCE;
static final String TYPE_URL =
"type.googleapis.com/envoy.extensions.filters.http.router.v3.Router";
static final FilterConfig ROUTER_CONFIG = new FilterConfig() {
@Override
public String typeUrl() {
return RouterFilter.TYPE_URL;
}
@Override
public String toString() {
return "ROUTER_CONFIG";
}
};
@Override
public String[] typeUrls() {
return new String[] { TYPE_URL };
}
@Override
public ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage) {
return ConfigOrError.fromConfig(ROUTER_CONFIG);
}
@Override
public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return ConfigOrError.fromError("Router Filter should not have override config");
}
@Nullable
@Override
public ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
ScheduledExecutorService scheduler) {
return null;
}
}

View File

@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.re2j.Pattern;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Matchers.FractionMatcher;
import io.grpc.xds.Matchers.HeaderMatcher;
import io.grpc.xds.Matchers.PathMatcher;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/** Reprsents an upstream virtual host. */
@ -43,12 +46,13 @@ abstract class VirtualHost {
abstract ImmutableList<Route> routes();
@Nullable
abstract HttpFault httpFault();
abstract ImmutableMap<String, FilterConfig> filterConfigOverrides();
public static VirtualHost create(String name, List<String> domains, List<Route> routes,
@Nullable HttpFault httpFault) {
public static VirtualHost create(
String name, List<String> domains, List<Route> routes,
Map<String, FilterConfig> filterConfigOverrides) {
return new AutoValue_VirtualHost(name, ImmutableList.copyOf(domains),
ImmutableList.copyOf(routes), httpFault);
ImmutableList.copyOf(routes), ImmutableMap.copyOf(filterConfigOverrides));
}
@AutoValue
@ -58,11 +62,13 @@ abstract class VirtualHost {
abstract RouteAction routeAction();
@Nullable
abstract HttpFault httpFault();
abstract ImmutableMap<String, FilterConfig> filterConfigOverrides();
static Route create(RouteMatch routeMatch, RouteAction routeAction,
@Nullable HttpFault httpFault) {
return new AutoValue_VirtualHost_Route(routeMatch, routeAction, httpFault);
static Route create(
RouteMatch routeMatch, RouteAction routeAction,
Map<String, FilterConfig> filterConfigOverrides) {
return new AutoValue_VirtualHost_Route(
routeMatch, routeAction, ImmutableMap.copyOf(filterConfigOverrides));
}
@AutoValue
@ -129,11 +135,12 @@ abstract class VirtualHost {
abstract int weight();
@Nullable
abstract HttpFault httpFault();
abstract ImmutableMap<String, FilterConfig> filterConfigOverrides();
static ClusterWeight create(String name, int weight, @Nullable HttpFault httpFault) {
return new AutoValue_VirtualHost_Route_RouteAction_ClusterWeight(name, weight,
httpFault);
static ClusterWeight create(
String name, int weight, Map<String, FilterConfig> filterConfigOverrides) {
return new AutoValue_VirtualHost_Route_RouteAction_ClusterWeight(
name, weight, ImmutableMap.copyOf(filterConfigOverrides));
}
}

View File

@ -27,6 +27,7 @@ import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import java.util.ArrayList;
@ -54,36 +55,32 @@ abstract class XdsClient {
// The list virtual hosts that make up the route table.
@Nullable
final List<VirtualHost> virtualHosts;
// Listener contains the HttpFault filter.
final boolean hasFaultInjection;
@Nullable // Can be null even if hasFaultInjection is true.
final HttpFault httpFault;
// Filter instance names. Null if HttpFilter support is not enabled.
@Nullable final List<NamedFilterConfig> filterChain;
// Server side Listener.
@Nullable
final Listener listener;
LdsUpdate(
long httpMaxStreamDurationNano, String rdsName, boolean hasFaultInjection,
@Nullable HttpFault httpFault) {
this(httpMaxStreamDurationNano, rdsName, null, hasFaultInjection, httpFault);
long httpMaxStreamDurationNano, String rdsName,
@Nullable List<NamedFilterConfig> filterChain) {
this(httpMaxStreamDurationNano, rdsName, null, filterChain);
}
LdsUpdate(
long httpMaxStreamDurationNano, List<VirtualHost> virtualHosts,
boolean hasFaultInjection, @Nullable HttpFault httpFault) {
this(httpMaxStreamDurationNano, null, virtualHosts, hasFaultInjection, httpFault);
@Nullable List<NamedFilterConfig> filterChain) {
this(httpMaxStreamDurationNano, null, virtualHosts, filterChain);
}
private LdsUpdate(
long httpMaxStreamDurationNano, @Nullable String rdsName,
@Nullable List<VirtualHost> virtualHosts, boolean hasFaultInjection,
@Nullable HttpFault httpFault) {
@Nullable List<VirtualHost> virtualHosts, @Nullable List<NamedFilterConfig> filterChain) {
this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
this.rdsName = rdsName;
this.virtualHosts = virtualHosts == null
? null : Collections.unmodifiableList(new ArrayList<>(virtualHosts));
this.hasFaultInjection = hasFaultInjection;
this.httpFault = httpFault;
this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain);
this.listener = null;
}
@ -91,15 +88,14 @@ abstract class XdsClient {
this.listener = listener;
this.httpMaxStreamDurationNano = 0L;
this.rdsName = null;
this.filterChain = null;
this.virtualHosts = null;
this.hasFaultInjection = false;
this.httpFault = null;
}
@Override
public int hashCode() {
return Objects.hash(
httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault, listener);
httpMaxStreamDurationNano, rdsName, virtualHosts, filterChain, listener);
}
@Override
@ -114,8 +110,7 @@ abstract class XdsClient {
return httpMaxStreamDurationNano == that.httpMaxStreamDurationNano
&& Objects.equals(rdsName, that.rdsName)
&& Objects.equals(virtualHosts, that.virtualHosts)
&& hasFaultInjection == that.hasFaultInjection
&& Objects.equals(httpFault, that.httpFault)
&& Objects.equals(filterChain, that.filterChain)
&& Objects.equals(listener, that.listener);
}
@ -128,15 +123,15 @@ abstract class XdsClient {
} else {
toStringHelper.add("virtualHosts", virtualHosts);
}
if (hasFaultInjection) {
toStringHelper.add("faultInjectionEnabled", true)
.add("httpFault", httpFault);
if (filterChain != null) {
toStringHelper.add("filterChain", filterChain);
}
if (listener != null) {
toStringHelper.add("listener", listener);
}
return toStringHelper.toString();
}
}
static final class RdsUpdate implements ResourceUpdate {

View File

@ -18,22 +18,18 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.InternalConfigSelector;
@ -44,12 +40,11 @@ import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.DelayedClientCall;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.HttpFault.FaultAbort;
import io.grpc.xds.HttpFault.FaultDelay;
import io.grpc.xds.HttpFault.FractionalPercent;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.Matchers.FractionMatcher;
import io.grpc.xds.Matchers.HeaderMatcher;
import io.grpc.xds.Matchers.PathMatcher;
@ -66,6 +61,7 @@ import io.grpc.xds.XdsClient.RdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -76,12 +72,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
/**
@ -98,29 +90,11 @@ final class XdsNameResolver extends NameResolver {
CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
static final CallOptions.Key<Long> RPC_HASH_KEY =
CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
private static final NamedFilterConfig LAME_FILTER =
new NamedFilterConfig(null, LameFilter.LAME_CONFIG);
@VisibleForTesting
static boolean enableTimeout =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
@VisibleForTesting
static final Metadata.Key<String> DOWNSTREAM_NODE_KEY =
Metadata.Key.of("x-envoy-downstream-service-node", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_KEY =
Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static AtomicLong activeFaultInjectedStreamCounter = new AtomicLong();
private final InternalLogId logId;
private final XdsLogger logger;
@ -130,6 +104,7 @@ final class XdsNameResolver extends NameResolver {
private final ScheduledExecutorService scheduler;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final FilterRegistry filterRegistry;
private final XxHash64 hashFunc = XxHash64.INSTANCE;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector();
@ -144,19 +119,22 @@ final class XdsNameResolver extends NameResolver {
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler) {
this(name, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance);
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry());
}
@VisibleForTesting
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random) {
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.scheduler = checkNotNull(scheduler, "scheduler");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
logId = InternalLogId.allocate("xds-resolver", name);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
@ -370,18 +348,21 @@ final class XdsNameResolver extends NameResolver {
asciiHeaders.put("content-type", "application/grpc");
String cluster = null;
Route selectedRoute = null;
HttpFault selectedFaultConfig;
RoutingConfig routingCfg;
Map<String, FilterConfig> selectedOverrideConfigs;
List<ClientInterceptor> filterInterceptors = new ArrayList<>();
do {
routingCfg = routingConfig;
selectedFaultConfig = routingCfg.faultConfig;
selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig);
if (routingCfg.filterChain != null
&& Iterables.getLast(routingCfg.filterChain).equals(LAME_FILTER)) {
break;
}
for (Route route : routingCfg.routes) {
if (matchRoute(route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
asciiHeaders, random)) {
selectedRoute = route;
if (routingCfg.applyFaultInjection && route.httpFault() != null) {
selectedFaultConfig = route.httpFault();
}
selectedOverrideConfigs.putAll(route.filterConfigOverrides());
break;
}
}
@ -403,9 +384,7 @@ final class XdsNameResolver extends NameResolver {
accumulator += weightedCluster.weight();
if (select < accumulator) {
cluster = weightedCluster.name();
if (routingCfg.applyFaultInjection && weightedCluster.httpFault() != null) {
selectedFaultConfig = weightedCluster.httpFault();
}
selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
break;
}
}
@ -414,12 +393,15 @@ final class XdsNameResolver extends NameResolver {
// TODO(chengyuanzhang): avoid service config generation and parsing for each call.
Map<String, ?> rawServiceConfig = Collections.emptyMap();
if (enableTimeout) {
Long timeoutNano = selectedRoute.routeAction().timeoutNano();
if (timeoutNano == null) {
timeoutNano = routingCfg.fallbackTimeoutNano;
Long timeoutNanos = null;
if (selectedRoute != null) {
timeoutNanos = selectedRoute.routeAction().timeoutNano();
}
if (timeoutNano > 0) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano);
if (timeoutNanos == null) {
timeoutNanos = routingCfg.fallbackTimeoutNano;
}
if (timeoutNanos > 0) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNanos);
}
}
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
@ -430,38 +412,33 @@ final class XdsNameResolver extends NameResolver {
parsedServiceConfig.getError().augmentDescription(
"Failed to parse service config (method config)"));
}
if (selectedFaultConfig != null && selectedFaultConfig.maxActiveFaults() != null
&& activeFaultInjectedStreamCounter.get() >= selectedFaultConfig.maxActiveFaults()) {
selectedFaultConfig = null;
}
if (selectedFaultConfig != null) {
if (!selectedFaultConfig.upstreamCluster().equals(cluster)) {
selectedFaultConfig = null;
} else if (!selectedFaultConfig.downstreamNodes().isEmpty()) {
String downstreamNode = headers.get(DOWNSTREAM_NODE_KEY);
if (downstreamNode == null
|| !selectedFaultConfig.downstreamNodes().contains(downstreamNode)) {
selectedFaultConfig = null;
if (routingCfg.filterChain != null) {
for (NamedFilterConfig namedFilter : routingCfg.filterChain) {
FilterConfig filterConfig = namedFilter.filterConfig;
Filter filter;
if (namedFilter.equals(LAME_FILTER)) {
filter = LameFilter.INSTANCE;
} else {
filter = filterRegistry.get(filterConfig.typeUrl());
}
if (filter instanceof ClientInterceptorBuilder) {
ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
.buildClientInterceptor(
filterConfig, selectedOverrideConfigs.get(namedFilter.name),
args, scheduler);
if (interceptor != null) {
filterInterceptors.add(interceptor);
}
}
}
}
if (selectedFaultConfig != null
&& !matchHeaders(selectedFaultConfig.headers(), asciiHeaders)) {
selectedFaultConfig = null;
}
Long delayNanos = null;
Status abortStatus = null;
if (selectedFaultConfig != null) {
if (selectedFaultConfig.faultDelay() != null) {
delayNanos = determineFaultDelayNanos(selectedFaultConfig.faultDelay(), headers);
}
if (selectedFaultConfig.faultAbort() != null) {
abortStatus = determineFaultAbortStatus(selectedFaultConfig.faultAbort(), headers);
if (Iterables.getLast(routingCfg.filterChain).equals(LAME_FILTER)) {
return Result.newBuilder()
.setConfig(config)
.setInterceptor(combineInterceptors(filterInterceptors))
.build();
}
}
final String finalCluster = cluster;
final Long finalDelayNanos = delayNanos;
final Status finalAbortStatus = abortStatus;
final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), asciiHeaders);
class ClusterSelectionInterceptor implements ClientInterceptor {
@Override
@ -471,70 +448,39 @@ final class XdsNameResolver extends NameResolver {
final CallOptions callOptionsForCluster =
callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
.withOption(RPC_HASH_KEY, hash);
Supplier<ClientCall<ReqT, RespT>> configApplyingCallSupplier =
new Supplier<ClientCall<ReqT, RespT>>() {
return new SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptionsForCluster)) {
@Override
public void start(Listener<RespT> listener, Metadata headers) {
listener = new SimpleForwardingClientCallListener<RespT>(listener) {
boolean committed;
@Override
public ClientCall<ReqT, RespT> get() {
return new SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptionsForCluster)) {
@Override
public void start(Listener<RespT> listener, Metadata headers) {
listener = new SimpleForwardingClientCallListener<RespT>(listener) {
boolean committed;
public void onHeaders(Metadata headers) {
committed = true;
releaseCluster(finalCluster);
delegate().onHeaders(headers);
}
@Override
public void onHeaders(Metadata headers) {
committed = true;
releaseCluster(finalCluster);
delegate().onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
if (!committed) {
releaseCluster(finalCluster);
}
delegate().onClose(status, trailers);
}
};
delegate().start(listener, headers);
}
};
@Override
public void onClose(Status status, Metadata trailers) {
if (!committed) {
releaseCluster(finalCluster);
}
delegate().onClose(status, trailers);
}
};
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) { // This should never happen in practice because
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
// a callExecutor.
// TODO(https://github.com/grpc/grpc-java/issues/7868)
callExecutor = MoreExecutors.directExecutor();
}
if (finalDelayNanos != null && finalAbortStatus != null) {
return new ActiveFaultCountingClientCall<>(
new DelayInjectedCall<>(
finalDelayNanos, callExecutor, scheduler, callOptionsForCluster.getDeadline(),
Suppliers.ofInstance(
new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor))));
}
if (finalAbortStatus != null) {
return new ActiveFaultCountingClientCall<>(
new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor));
}
if (finalDelayNanos != null) {
return new ActiveFaultCountingClientCall<>(
new DelayInjectedCall<>(
finalDelayNanos, callExecutor, scheduler, callOptionsForCluster.getDeadline(),
configApplyingCallSupplier));
}
return configApplyingCallSupplier.get();
delegate().start(listener, headers);
}
};
}
}
filterInterceptors.add(new ClusterSelectionInterceptor());
return
Result.newBuilder()
.setConfig(config)
.setInterceptor(new ClusterSelectionInterceptor())
.setInterceptor(combineInterceptors(filterInterceptors))
.build();
}
@ -597,198 +543,21 @@ final class XdsNameResolver extends NameResolver {
}
return hash == null ? random.nextLong() : hash;
}
@Nullable
private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
Long delayNanos;
FractionalPercent fractionalPercent = faultDelay.percent();
if (faultDelay.headerDelay()) {
try {
int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
if (delayPercentageStr != null) {
int delayPercentage = Integer.parseInt(delayPercentageStr);
if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
fractionalPercent =
FractionalPercent.create(delayPercentage, fractionalPercent.denominatorType());
}
}
} catch (NumberFormatException e) {
return null; // treated as header_delay not applicable
}
} else {
delayNanos = faultDelay.delayNanos();
}
if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
return null;
}
return delayNanos;
}
@Nullable
private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
Status abortStatus = null;
FractionalPercent fractionalPercent = faultAbort.percent();
if (faultAbort.headerAbort()) {
try {
String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
if (grpcCodeStr != null) {
int grpcCode = Integer.parseInt(grpcCodeStr);
abortStatus = Status.fromCodeValue(grpcCode);
}
String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
if (httpCodeStr != null) {
int httpCode = Integer.parseInt(httpCodeStr);
abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
}
String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
if (abortPercentageStr != null) {
int abortPercentage =
Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
fractionalPercent =
FractionalPercent.create(abortPercentage, fractionalPercent.denominatorType());
}
}
} catch (NumberFormatException e) {
return null; // treated as header_abort not applicable
}
} else {
abortStatus = faultAbort.status();
}
if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
return null;
}
return abortStatus;
}
}
private static int getRatePerMillion(FractionalPercent percent) {
int numerator = percent.numerator();
FractionalPercent.DenominatorType type = percent.denominatorType();
switch (type) {
case TEN_THOUSAND:
numerator *= 100;
break;
case HUNDRED:
numerator *= 10_000;
break;
case MILLION:
default:
break;
private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
checkArgument(!interceptors.isEmpty(), "empty interceptors");
if (interceptors.size() == 1) {
return interceptors.get(0);
}
if (numerator > 1_000_000 || numerator < 0) {
numerator = 1_000_000;
}
return numerator;
}
/**
* A forwarding client call that counts active fault injections.
*/
private final class ActiveFaultCountingClientCall<ReqT, RespT> extends
SimpleForwardingClientCall<ReqT, RespT> {
ActiveFaultCountingClientCall(ClientCall<ReqT, RespT> faultInjectedDelegate) {
super(faultInjectedDelegate);
activeFaultInjectedStreamCounter.incrementAndGet();
}
@Override
public void start(Listener<RespT> listener, Metadata headers) {
listener = new SimpleForwardingClientCallListener<RespT>(listener) {
@Override
public void onClose(Status status, Metadata trailers) {
delegate().onClose(status, trailers);
activeFaultInjectedStreamCounter.decrementAndGet();
}
};
delegate().start(listener, headers);
}
}
/** A {@link DelayedClientCall} with a fixed delay. */
private static final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
final Object lock = new Object();
ScheduledFuture<?> delayTask;
boolean cancelled;
DelayInjectedCall(
long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
@Nullable Deadline deadline,
final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
super(callExecutor, scheduler, deadline);
ScheduledFuture<?> task = scheduler.schedule(
new Runnable() {
@Override
public void run() {
setCall(callSupplier.get());
}
},
delayNanos,
NANOSECONDS);
synchronized (lock) {
if (cancelled) {
task.cancel(false);
return;
}
delayTask = task;
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
next = ClientInterceptors.interceptForward(next, interceptors);
return next.newCall(method, callOptions);
}
}
@Override
protected void callCancelled() {
ScheduledFuture<?> savedDelayTask;
synchronized (lock) {
cancelled = true;
savedDelayTask = delayTask;
}
if (savedDelayTask != null) {
savedDelayTask.cancel(false);
}
}
}
/** An implementation of {@link ClientCall} that fails when started. */
private static final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
final Status error;
final Executor callExecutor;
final Context context;
FailingClientCall(Status error, Executor callExecutor) {
this.error = error;
this.callExecutor = callExecutor;
this.context = Context.current();
}
@Override
public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
callExecutor.execute(
new Runnable() {
@Override
public void run() {
Context previous = context.attach();
try {
listener.onClose(error, new Metadata());
} finally {
context.detach(previous);
}
}
});
}
@Override
public void request(int numMessages) {}
@Override
public void cancel(String message, Throwable cause) {}
@Override
public void halfClose() {}
@Override
public void sendMessage(ReqT message) {}
};
}
@VisibleForTesting
@ -872,10 +641,7 @@ final class XdsNameResolver extends NameResolver {
private String rdsResource;
@Nullable
private RdsResourceWatcher rdsWatcher;
private long httpMaxStreamDurationNano;
private boolean applyFaultInjection;
@Nullable
private HttpFault httpFilterFaultConfig;
private LdsUpdate update;
@Override
public void onChanged(final LdsUpdate update) {
@ -886,9 +652,7 @@ final class XdsNameResolver extends NameResolver {
return;
}
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
httpMaxStreamDurationNano = update.httpMaxStreamDurationNano;
applyFaultInjection = update.hasFaultInjection;
httpFilterFaultConfig = applyFaultInjection ? update.httpFault : null;
ResolveState.this.update = update;
List<VirtualHost> virtualHosts = update.virtualHosts;
String rdsName = update.rdsName;
if (rdsName != null && rdsName.equals(rdsResource)) {
@ -958,11 +722,27 @@ final class XdsNameResolver extends NameResolver {
listener.onResult(emptyResult);
return;
}
List<Route> routes = virtualHost.routes();
HttpFault faultConfig = httpFilterFaultConfig;
if (applyFaultInjection && virtualHost.httpFault() != null) {
faultConfig = virtualHost.httpFault();
List<NamedFilterConfig> filterChain = null;
if (update.filterChain != null) {
boolean hasRouter = false;
filterChain = new ArrayList<>(update.filterChain.size());
for (NamedFilterConfig namedFilter : update.filterChain) {
filterChain.add(namedFilter);
if (namedFilter.filterConfig.equals(RouterFilter.ROUTER_CONFIG)) {
hasRouter = true;
break;
}
}
if (!hasRouter) {
filterChain.add(LAME_FILTER);
routingConfig = new RoutingConfig(
update.httpMaxStreamDurationNano, Collections.<Route>emptyList(), filterChain,
virtualHost.filterConfigOverrides());
updateResolutionResult();
return;
}
}
List<Route> routes = virtualHost.routes();
Set<String> clusters = new HashSet<>();
for (Route route : routes) {
RouteAction action = route.routeAction();
@ -996,7 +776,8 @@ final class XdsNameResolver extends NameResolver {
// Make newly added clusters selectable by config selector and deleted clusters no longer
// selectable.
routingConfig = new RoutingConfig(
httpMaxStreamDurationNano, routes, applyFaultInjection, faultConfig);
update.httpMaxStreamDurationNano, routes, filterChain,
virtualHost.filterConfigOverrides());
shouldUpdateResult = false;
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).decrementAndGet();
@ -1071,21 +852,22 @@ final class XdsNameResolver extends NameResolver {
*/
private static class RoutingConfig {
private final long fallbackTimeoutNano;
private final List<Route> routes;
private final boolean applyFaultInjection;
@Nullable
private final HttpFault faultConfig;
final List<Route> routes;
// Null if HttpFilter is not supported.
@Nullable final List<NamedFilterConfig> filterChain;
final Map<String, FilterConfig> virtualHostOverrideConfig;
private static final RoutingConfig empty =
new RoutingConfig(0L, Collections.<Route>emptyList(), false, null);
private static RoutingConfig empty = new RoutingConfig(
0L, Collections.<Route>emptyList(), null, Collections.<String, FilterConfig>emptyMap());
private RoutingConfig(
long fallbackTimeoutNano, List<Route> routes, boolean applyFaultInjection,
HttpFault faultConfig) {
long fallbackTimeoutNano, List<Route> routes, @Nullable List<NamedFilterConfig> filterChain,
Map<String, FilterConfig> virtualHostOverrideConfig) {
this.fallbackTimeoutNano = fallbackTimeoutNano;
this.routes = routes;
this.applyFaultInjection = applyFaultInjection;
this.faultConfig = faultConfig;
checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty");
this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain);
this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig);
}
}
}

View File

@ -18,8 +18,10 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.util.Durations;
import com.google.re2j.Pattern;
@ -45,7 +47,7 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header;
import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.QueryParameter;
import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration;
import io.envoyproxy.envoy.config.route.v3.WeightedCluster;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort;
import io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter;
@ -59,7 +61,8 @@ import io.grpc.Status.Code;
import io.grpc.xds.ClientXdsClient.StructOrError;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.HttpFault.FaultAbort;
import io.grpc.xds.FaultConfig.FaultAbort;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Matchers.FractionMatcher;
import io.grpc.xds.Matchers.HeaderMatcher;
import io.grpc.xds.Matchers.PathMatcher;
@ -71,6 +74,7 @@ import io.grpc.xds.VirtualHost.Route.RouteMatch;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -91,7 +95,7 @@ public class ClientXdsClientDataTest {
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo"))
.build();
StructOrError<Route> struct = ClientXdsClient.parseRoute(proto);
StructOrError<Route> struct = ClientXdsClient.parseRoute(proto, false);
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct())
.isEqualTo(
@ -99,7 +103,7 @@ public class ClientXdsClientDataTest {
RouteMatch.create(PathMatcher.fromPath("/service/method", false),
Collections.<HeaderMatcher>emptyList(), null),
RouteAction.forCluster("cluster-foo", Collections.<HashPolicy>emptyList(), null),
null));
ImmutableMap.<String, FilterConfig>of()));
}
@Test
@ -111,7 +115,7 @@ public class ClientXdsClientDataTest {
.setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath(""))
.setRedirect(RedirectAction.getDefaultInstance())
.build();
res = ClientXdsClient.parseRoute(redirectRoute);
res = ClientXdsClient.parseRoute(redirectRoute, false);
assertThat(res.getStruct()).isNull();
assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: redirect");
@ -121,7 +125,7 @@ public class ClientXdsClientDataTest {
.setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath(""))
.setDirectResponse(DirectResponseAction.getDefaultInstance())
.build();
res = ClientXdsClient.parseRoute(directResponseRoute);
res = ClientXdsClient.parseRoute(directResponseRoute, false);
assertThat(res.getStruct()).isNull();
assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: direct_response");
@ -131,7 +135,7 @@ public class ClientXdsClientDataTest {
.setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath(""))
.setFilterAction(FilterAction.getDefaultInstance())
.build();
res = ClientXdsClient.parseRoute(filterRoute);
res = ClientXdsClient.parseRoute(filterRoute, false);
assertThat(res.getStruct()).isNull();
assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: filter_action");
}
@ -151,7 +155,7 @@ public class ClientXdsClientDataTest {
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo"))
.build();
assertThat(ClientXdsClient.parseRoute(proto)).isNull();
assertThat(ClientXdsClient.parseRoute(proto, false)).isNull();
}
@Test
@ -166,7 +170,7 @@ public class ClientXdsClientDataTest {
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setClusterHeader("cluster header")) // cluster_header action not supported
.build();
assertThat(ClientXdsClient.parseRoute(proto)).isNull();
assertThat(ClientXdsClient.parseRoute(proto, false)).isNull();
}
@Test
@ -345,7 +349,7 @@ public class ClientXdsClientDataTest {
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo")
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct().cluster()).isEqualTo("cluster-foo");
assertThat(struct.getStruct().weightedClusters()).isNull();
@ -367,12 +371,12 @@ public class ClientXdsClientDataTest {
.setName("cluster-bar")
.setWeight(UInt32Value.newBuilder().setValue(70))))
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct().cluster()).isNull();
assertThat(struct.getStruct().weightedClusters()).containsExactly(
ClusterWeight.create("cluster-foo", 30, null),
ClusterWeight.create("cluster-bar", 70, null));
ClusterWeight.create("cluster-foo", 30, ImmutableMap.<String, FilterConfig>of()),
ClusterWeight.create("cluster-bar", 70, ImmutableMap.<String, FilterConfig>of()));
}
@Test
@ -385,7 +389,7 @@ public class ClientXdsClientDataTest {
.setGrpcTimeoutHeaderMax(Durations.fromSeconds(5L))
.setMaxStreamDuration(Durations.fromMillis(20L)))
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
assertThat(struct.getStruct().timeoutNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L));
}
@ -398,7 +402,7 @@ public class ClientXdsClientDataTest {
MaxStreamDuration.newBuilder()
.setMaxStreamDuration(Durations.fromSeconds(5L)))
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
assertThat(struct.getStruct().timeoutNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L));
}
@ -408,7 +412,7 @@ public class ClientXdsClientDataTest {
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo")
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
assertThat(struct.getStruct().timeoutNano()).isNull();
}
@ -443,7 +447,7 @@ public class ClientXdsClientDataTest {
.setQueryParameter(
QueryParameter.newBuilder().setName("param"))) // unsupported
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto, false);
List<HashPolicy> policies = struct.getStruct().hashPolicies();
assertThat(policies).hasSize(2);
assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER);
@ -463,29 +467,11 @@ public class ClientXdsClientDataTest {
.setName("cluster-foo")
.setWeight(UInt32Value.newBuilder().setValue(30))
.build();
ClusterWeight clusterWeight = ClientXdsClient.parseClusterWeight(proto).getStruct();
ClusterWeight clusterWeight = ClientXdsClient.parseClusterWeight(proto, false).getStruct();
assertThat(clusterWeight.name()).isEqualTo("cluster-foo");
assertThat(clusterWeight.weight()).isEqualTo(30);
}
// TODO(zdapeng): add tests for parseClusterWeight with HttpFault.
// TODO(zdapeng): add tests for parseHttpFault.
@Test
public void parseFaultAbort_withHeaderAbort() {
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto =
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.newBuilder()
.setPercentage(FractionalPercent.newBuilder()
.setNumerator(20).setDenominator(DenominatorType.HUNDRED))
.setHeaderAbort(HeaderAbort.getDefaultInstance()).build();
FaultAbort faultAbort = ClientXdsClient.parseFaultAbort(proto).getStruct();
assertThat(faultAbort.headerAbort()).isTrue();
assertThat(faultAbort.percent().numerator()).isEqualTo(20);
assertThat(faultAbort.percent().denominatorType())
.isEqualTo(HttpFault.FractionalPercent.DenominatorType.HUNDRED);
}
@Test
public void parseFaultAbort_withHttpStatus() {
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto =
@ -493,10 +479,10 @@ public class ClientXdsClientDataTest {
.setPercentage(FractionalPercent.newBuilder()
.setNumerator(100).setDenominator(DenominatorType.TEN_THOUSAND))
.setHttpStatus(400).build();
FaultAbort res = ClientXdsClient.parseFaultAbort(proto).getStruct();
FaultAbort res = FaultFilter.parseFaultAbort(proto).config;
assertThat(res.percent().numerator()).isEqualTo(100);
assertThat(res.percent().denominatorType())
.isEqualTo(HttpFault.FractionalPercent.DenominatorType.TEN_THOUSAND);
.isEqualTo(FaultConfig.FractionalPercent.DenominatorType.TEN_THOUSAND);
assertThat(res.status().getCode()).isEqualTo(Code.INTERNAL);
}
@ -507,10 +493,10 @@ public class ClientXdsClientDataTest {
.setPercentage(FractionalPercent.newBuilder()
.setNumerator(600).setDenominator(DenominatorType.MILLION))
.setGrpcStatus(Code.DEADLINE_EXCEEDED.value()).build();
FaultAbort faultAbort = ClientXdsClient.parseFaultAbort(proto).getStruct();
FaultAbort faultAbort = FaultFilter.parseFaultAbort(proto).config;
assertThat(faultAbort.percent().numerator()).isEqualTo(600);
assertThat(faultAbort.percent().denominatorType())
.isEqualTo(HttpFault.FractionalPercent.DenominatorType.MILLION);
.isEqualTo(FaultConfig.FractionalPercent.DenominatorType.MILLION);
assertThat(faultAbort.status().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
}
@ -627,6 +613,73 @@ public class ClientXdsClientDataTest {
assertThat(struct.getErrorDetail()).isEqualTo("negative priority");
}
@Test
public void parseHttpFilter_unsupportedButOptional() {
HttpFilter httpFilter = HttpFilter.newBuilder()
.setIsOptional(true)
.setTypedConfig(Any.pack(StringValue.of("unsupported")))
.build();
assertThat(ClientXdsClient.parseHttpFilter(httpFilter)).isNull();
}
@Test
public void parseHttpFilter_unsupportedAndRequired() {
HttpFilter httpFilter = HttpFilter.newBuilder()
.setIsOptional(false)
.setName("unsupported.filter")
.setTypedConfig(Any.pack(StringValue.of("string value")))
.build();
assertThat(ClientXdsClient.parseHttpFilter(httpFilter).getErrorDetail()).isEqualTo(
"HttpFilter [unsupported.filter] is not optional and has an unsupported config type: "
+ "type.googleapis.com/google.protobuf.StringValue");
}
@Test
public void parseOverrideFilterConfigs_unsupportedButOptional() {
HTTPFault httpFault = HTTPFault.newBuilder()
.setDelay(FaultDelay.newBuilder().setFixedDelay(Durations.fromNanos(3000)))
.build();
Map<String, Any> configOverrides = ImmutableMap.of(
"envoy.fault",
Any.pack(httpFault),
"unsupported.filter",
Any.pack(io.envoyproxy.envoy.config.route.v3.FilterConfig.newBuilder()
.setIsOptional(true).setConfig(Any.pack(StringValue.of("string value")))
.build()));
Map<String, FilterConfig> parsedConfigs =
ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getStruct();
assertThat(parsedConfigs).hasSize(1);
assertThat(parsedConfigs).containsKey("envoy.fault");
}
@Test
public void parseOverrideFilterConfigs_unsupportedAndRequired() {
HTTPFault httpFault = HTTPFault.newBuilder()
.setDelay(FaultDelay.newBuilder().setFixedDelay(Durations.fromNanos(3000)))
.build();
Map<String, Any> configOverrides = ImmutableMap.of(
"envoy.fault",
Any.pack(httpFault),
"unsupported.filter",
Any.pack(io.envoyproxy.envoy.config.route.v3.FilterConfig.newBuilder()
.setIsOptional(false).setConfig(Any.pack(StringValue.of("string value")))
.build()));
assertThat(ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getErrorDetail())
.isEqualTo(
"HttpFilter [unsupported.filter] is not optional and has an unsupported config type: "
+ "type.googleapis.com/google.protobuf.StringValue");
configOverrides = ImmutableMap.of(
"envoy.fault",
Any.pack(httpFault),
"unsupported.filter",
Any.pack(StringValue.of("string value")));
assertThat(ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getErrorDetail())
.isEqualTo(
"HttpFilter [unsupported.filter] is not optional and has an unsupported config type: "
+ "type.googleapis.com/google.protobuf.StringValue");
}
@Test
public void parseServerSideListener_invalidTrafficDirection() {
Listener listener =

View File

@ -34,8 +34,8 @@ import com.google.common.collect.Iterables;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.FilterConfig;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
@ -53,7 +53,7 @@ import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.HttpFault.FractionalPercent.DenominatorType;
import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
@ -383,7 +383,7 @@ public abstract class ClientXdsClientTestBase {
mf.buildOpaqueRoutes(1),
ImmutableMap.of(
"irrelevant",
Any.pack(StringValue.of("irrelevant")),
Any.pack(FilterConfig.newBuilder().setIsOptional(true).build()),
"envoy.fault",
mf.buildHttpFaultTypedConfig(
300L, 1000, "cluster1", ImmutableList.<String>of(), 100, null, null,
@ -394,12 +394,15 @@ public abstract class ClientXdsClientTestBase {
"envoy.fault",
mf.buildHttpFaultTypedConfig(
null, null, "cluster2", ImmutableList.<String>of(), 101, null, 503,
2000)))
)),
2000))))),
ImmutableList.of(
mf.buildHttpFilter("irrelevant", null),
mf.buildHttpFilter("envoy.fault", null)
)));
mf.buildHttpFilter("irrelevant", null, true),
mf.buildHttpFilter(
"envoy.fault",
mf.buildHttpFaultTypedConfig(
1L, 2, "cluster1", ImmutableList.<String>of(), 3, null, null,
null),
false))));
call.sendResponse(LDS, listener, VERSION_1, "0000");
// Client sends an ACK LDS request.
@ -408,24 +411,23 @@ public abstract class ClientXdsClientTestBase {
LdsUpdate ldsUpdate = ldsUpdateCaptor.getValue();
assertThat(ldsUpdate.virtualHosts).hasSize(2);
assertThat(ldsUpdate.hasFaultInjection).isTrue();
assertThat(ldsUpdate.httpFault).isNull();
HttpFault httpFault = ldsUpdate.virtualHosts.get(0).httpFault();
assertThat(httpFault.faultDelay().delayNanos()).isEqualTo(300);
assertThat(httpFault.faultDelay().percent().numerator()).isEqualTo(1000);
assertThat(httpFault.faultDelay().percent().denominatorType())
assertThat(ldsUpdate.filterChain.get(0).name).isEqualTo("envoy.fault");
FaultConfig faultConfig = (FaultConfig) ldsUpdate.virtualHosts.get(0)
.filterConfigOverrides().get("envoy.fault");
assertThat(faultConfig.faultDelay().delayNanos()).isEqualTo(300);
assertThat(faultConfig.faultDelay().percent().numerator()).isEqualTo(1000);
assertThat(faultConfig.faultDelay().percent().denominatorType())
.isEqualTo(DenominatorType.MILLION);
assertThat(httpFault.faultAbort()).isNull();
assertThat(httpFault.upstreamCluster()).isEqualTo("cluster1");
assertThat(httpFault.maxActiveFaults()).isEqualTo(100);
httpFault = ldsUpdate.virtualHosts.get(1).httpFault();
assertThat(httpFault.faultDelay()).isNull();
assertThat(httpFault.faultAbort().status().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(httpFault.faultAbort().percent().numerator()).isEqualTo(2000);
assertThat(httpFault.faultAbort().percent().denominatorType())
assertThat(faultConfig.faultAbort()).isNull();
assertThat(faultConfig.maxActiveFaults()).isEqualTo(100);
faultConfig = (FaultConfig) ldsUpdate.virtualHosts.get(1)
.filterConfigOverrides().get("envoy.fault");
assertThat(faultConfig.faultDelay()).isNull();
assertThat(faultConfig.faultAbort().status().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(faultConfig.faultAbort().percent().numerator()).isEqualTo(2000);
assertThat(faultConfig.faultAbort().percent().denominatorType())
.isEqualTo(DenominatorType.MILLION);
assertThat(httpFault.upstreamCluster()).isEqualTo("cluster2");
assertThat(httpFault.maxActiveFaults()).isEqualTo(101);
assertThat(faultConfig.maxActiveFaults()).isEqualTo(101);
}
@Test
@ -1430,7 +1432,8 @@ public abstract class ClientXdsClientTestBase {
protected abstract Message buildListenerForRds(String name, String rdsResourceName);
protected abstract Message buildHttpFilter(String name, @Nullable Any typedConfig);
protected abstract Message buildHttpFilter(
String name, @Nullable Any typedConfig, boolean isOptional);
protected abstract Any buildHttpFaultTypedConfig(
@Nullable Long delayNanos, @Nullable Integer delayRate, String upstreamCluster,

View File

@ -277,7 +277,7 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
}
@Override
protected Message buildHttpFilter(String name, @Nullable Any typedConfig) {
protected Message buildHttpFilter(String name, @Nullable Any typedConfig, boolean isOptional) {
throw new UnsupportedOperationException();
}

View File

@ -287,8 +287,8 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
}
@Override
protected Message buildHttpFilter(String name, @Nullable Any typedConfig) {
HttpFilter.Builder builder = HttpFilter.newBuilder().setName(name);
protected Message buildHttpFilter(String name, @Nullable Any typedConfig, boolean isOptional) {
HttpFilter.Builder builder = HttpFilter.newBuilder().setName(name).setIsOptional(isOptional);
if (typedConfig != null) {
builder.setTypedConfig(typedConfig);
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.internal.GrpcUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link FaultFilter}. */
@RunWith(JUnit4.class)
public class FaultFilterTest {
@Test
public void parseFaultAbort_convertHttpStatus() {
Any rawConfig = Any.pack(
HTTPFault.newBuilder().setAbort(FaultAbort.newBuilder().setHttpStatus(404)).build());
FaultConfig faultConfig = FaultFilter.INSTANCE.parseFilterConfig(rawConfig).config;
assertThat(faultConfig.faultAbort().status().getCode())
.isEqualTo(GrpcUtil.httpStatusToGrpcStatus(404).getCode());
FaultConfig faultConfigOverride =
FaultFilter.INSTANCE.parseFilterConfigOverride(rawConfig).config;
assertThat(faultConfigOverride.faultAbort().status().getCode())
.isEqualTo(GrpcUtil.httpStatusToGrpcStatus(404).getCode());
}
@Test
public void parseFaultAbort_withHeaderAbort() {
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto =
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.newBuilder()
.setPercentage(FractionalPercent.newBuilder()
.setNumerator(20).setDenominator(DenominatorType.HUNDRED))
.setHeaderAbort(HeaderAbort.getDefaultInstance()).build();
FaultConfig.FaultAbort faultAbort = FaultFilter.parseFaultAbort(proto).config;
assertThat(faultAbort.headerAbort()).isTrue();
assertThat(faultAbort.percent().numerator()).isEqualTo(20);
assertThat(faultAbort.percent().denominatorType())
.isEqualTo(FaultConfig.FractionalPercent.DenominatorType.HUNDRED);
}
}

View File

@ -17,12 +17,11 @@
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsNameResolver.DOWNSTREAM_NODE_KEY;
import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_GRPC_STATUS_KEY;
import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_HTTP_STATUS_KEY;
import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_PERCENTAGE_KEY;
import static io.grpc.xds.XdsNameResolver.HEADER_DELAY_KEY;
import static io.grpc.xds.XdsNameResolver.HEADER_DELAY_PERCENTAGE_KEY;
import static io.grpc.xds.FaultFilter.HEADER_ABORT_GRPC_STATUS_KEY;
import static io.grpc.xds.FaultFilter.HEADER_ABORT_HTTP_STATUS_KEY;
import static io.grpc.xds.FaultFilter.HEADER_ABORT_PERCENTAGE_KEY;
import static io.grpc.xds.FaultFilter.HEADER_DELAY_KEY;
import static io.grpc.xds.FaultFilter.HEADER_DELAY_PERCENTAGE_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
@ -35,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.re2j.Pattern;
@ -63,8 +63,10 @@ import io.grpc.internal.NoopClientCall.NoopClientCallListener;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.HttpFault.FaultAbort;
import io.grpc.xds.HttpFault.FaultDelay;
import io.grpc.xds.FaultConfig.FaultAbort;
import io.grpc.xds.FaultConfig.FaultDelay;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.Matchers.HeaderMatcher;
import io.grpc.xds.Matchers.PathMatcher;
import io.grpc.xds.VirtualHost.Route;
@ -101,6 +103,8 @@ import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class XdsNameResolverTest {
private static final String AUTHORITY = "foo.googleapis.com:80";
private static final String FAULT_FILTER_INSTANCE_NAME = "envoy.fault";
private static final String ROUTER_FILTER_INSTANCE_NAME = "envoy.router";
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
private final SynchronizationContext syncContext = new SynchronizationContext(
@ -136,22 +140,21 @@ public class XdsNameResolverTest {
private XdsNameResolver resolver;
private TestCall<?, ?> testCall;
private boolean originalEnableTimeout;
private AtomicLong originalFaultCounter;
@Before
public void setUp() {
originalEnableTimeout = XdsNameResolver.enableTimeout;
originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter;
XdsNameResolver.enableTimeout = true;
XdsNameResolver.activeFaultInjectedStreamCounter = new AtomicLong();
FilterRegistry filterRegistry = FilterRegistry.newRegistry().register(
new FaultFilter(mockRandom, new AtomicLong()),
RouterFilter.INSTANCE);
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom);
xdsClientPoolFactory, mockRandom, filterRegistry);
}
@After
public void tearDown() {
XdsNameResolver.enableTimeout = originalEnableTimeout;
XdsNameResolver.activeFaultInjectedStreamCounter = originalFaultCounter;
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
resolver.shutdown();
if (xdsClient != null) {
@ -174,7 +177,7 @@ public class XdsNameResolverTest {
}
};
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom);
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry());
resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
@ -270,15 +273,19 @@ public class XdsNameResolverTest {
private List<VirtualHost> buildUnmatchedVirtualHosts() {
Route route1 = Route.create(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(
cluster2, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)), null);
cluster2, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of());
Route route2 = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)), null);
cluster1, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of());
return Arrays.asList(
VirtualHost.create("virtualhost-foo", Collections.singletonList("hello.googleapis.com"),
Collections.singletonList(route1), null),
Collections.singletonList(route1),
ImmutableMap.<String, FilterConfig>of()),
VirtualHost.create("virtualhost-bar", Collections.singletonList("hi.googleapis.com"),
Collections.singletonList(route2), null));
Collections.singletonList(route2),
ImmutableMap.<String, FilterConfig>of()));
}
@Test
@ -287,9 +294,11 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), null), null); // per-route timeout unset
cluster1, Collections.<HashPolicy>emptyList(), null), // per-route timeout unset
ImmutableMap.<String, FilterConfig>of());
VirtualHost virtualHost = VirtualHost.create("does not matter",
Collections.singletonList(AUTHORITY), Collections.singletonList(route), null);
Collections.singletonList(AUTHORITY), Collections.singletonList(route),
ImmutableMap.<String, FilterConfig>of());
xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost));
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
@ -303,9 +312,11 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), null), null); // per-route timeout unset
cluster1, Collections.<HashPolicy>emptyList(), null), // per-route timeout unset
ImmutableMap.<String, FilterConfig>of());
VirtualHost virtualHost = VirtualHost.create("does not matter",
Collections.singletonList(AUTHORITY), Collections.singletonList(route), null);
Collections.singletonList(AUTHORITY), Collections.singletonList(route),
ImmutableMap.<String, FilterConfig>of());
xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L),
Collections.singletonList(virtualHost));
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -347,7 +358,8 @@ public class XdsNameResolverTest {
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(HashPolicy.forHeader(
false, "custom-key", Pattern.compile("value"), "val")),
null), null)));
null),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
InternalConfigSelector configSelector =
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
@ -383,7 +395,8 @@ public class XdsNameResolverTest {
RouteMatch.withPathExactOnly(
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(
HashPolicy.forChannelId(false)), null), null)));
HashPolicy.forChannelId(false)), null),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
InternalConfigSelector configSelector =
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
@ -404,7 +417,7 @@ public class XdsNameResolverTest {
resolver.shutdown();
reset(mockListener);
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom);
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry());
resolver.start(mockListener);
xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
@ -414,7 +427,8 @@ public class XdsNameResolverTest {
RouteMatch.withPathExactOnly(
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(
HashPolicy.forChannelId(false)), null), null)));
HashPolicy.forChannelId(false)), null),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
configSelector = resolutionResultCaptor.getValue().getAttributes().get(
InternalConfigSelector.KEY);
@ -445,12 +459,13 @@ public class XdsNameResolverTest {
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(
"another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
TimeUnit.SECONDS.toNanos(20L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(
cluster2, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)),
null)));
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
// Updated service config still contains cluster1 while it is removed resource. New calls no
@ -484,11 +499,13 @@ public class XdsNameResolverTest {
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(
"another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
TimeUnit.SECONDS.toNanos(20L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
// Two consecutive service config updates: one for removing clcuster1,
// one for adding "another=cluster".
verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture());
@ -517,11 +534,13 @@ public class XdsNameResolverTest {
Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
TimeUnit.SECONDS.toNanos(20L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
@ -535,11 +554,13 @@ public class XdsNameResolverTest {
Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
verifyNoMoreInteractions(mockListener); // no cluster added/deleted
assertCallSelectResult(call1, configSelector, "another-cluster", 15.0);
}
@ -555,18 +576,21 @@ public class XdsNameResolverTest {
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
testCall.deliverErrorStatus();
verifyNoMoreInteractions(mockListener);
}
@ -584,10 +608,12 @@ public class XdsNameResolverTest {
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forWeightedClusters(
Arrays.asList(
ClusterWeight.create(cluster1, 20, null),
ClusterWeight.create(cluster2, 80, null)),
ClusterWeight.create(cluster1, 20, ImmutableMap.<String, FilterConfig>of()),
ClusterWeight.create(
cluster2, 80, ImmutableMap.<String, FilterConfig>of())),
Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null)));
TimeUnit.SECONDS.toNanos(20L)),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
@ -645,11 +671,13 @@ public class XdsNameResolverTest {
Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of()),
Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
TimeUnit.SECONDS.toNanos(15L)),
ImmutableMap.<String, FilterConfig>of())));
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
@ -788,11 +816,14 @@ public class XdsNameResolverTest {
String hostname = "a.googleapis.com";
List<Route> routes = Collections.emptyList();
VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com",
Arrays.asList("a.googleapis.com", "b.googleapis.com"), routes, null);
Arrays.asList("a.googleapis.com", "b.googleapis.com"), routes,
ImmutableMap.<String, FilterConfig>of());
VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com",
Collections.singletonList("*.googleapis.com"), routes, null);
Collections.singletonList("*.googleapis.com"), routes,
ImmutableMap.<String, FilterConfig>of());
VirtualHost vHost3 = VirtualHost.create("virtualhost03.googleapis.com",
Collections.singletonList("*"), routes, null);
Collections.singletonList("*"), routes,
ImmutableMap.<String, FilterConfig>of());
List<VirtualHost> virtualHosts = Arrays.asList(vHost1, vHost2, vHost3);
assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname))
.isEqualTo(vHost1);
@ -803,11 +834,14 @@ public class XdsNameResolverTest {
String hostname = "a.googleapis.com";
List<Route> routes = Collections.emptyList();
VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com",
Arrays.asList("*.googleapis.com", "b.googleapis.com"), routes, null);
Arrays.asList("*.googleapis.com", "b.googleapis.com"), routes,
ImmutableMap.<String, FilterConfig>of());
VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com",
Collections.singletonList("a.googleapis.*"), routes, null);
Collections.singletonList("a.googleapis.*"), routes,
ImmutableMap.<String, FilterConfig>of());
VirtualHost vHost3 = VirtualHost.create("virtualhost03.googleapis.com",
Collections.singletonList("*"), routes, null);
Collections.singletonList("*"), routes,
ImmutableMap.<String, FilterConfig>of());
List<VirtualHost> virtualHosts = Arrays.asList(vHost1, vHost2, vHost3);
assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname))
.isEqualTo(vHost1);
@ -818,9 +852,11 @@ public class XdsNameResolverTest {
String hostname = "a.googleapis.com";
List<Route> routes = Collections.emptyList();
VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com",
Collections.singletonList("*"), routes, null);
Collections.singletonList("*"), routes,
ImmutableMap.<String, FilterConfig>of());
VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com",
Collections.singletonList("b.googleapis.com"), routes, null);
Collections.singletonList("b.googleapis.com"), routes,
ImmutableMap.<String, FilterConfig>of());
List<VirtualHost> virtualHosts = Arrays.asList(vHost1, vHost2);
assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname))
.isEqualTo(vHost1);;
@ -833,12 +869,9 @@ public class XdsNameResolverTest {
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
// header abort, header abort rate = 60 %
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultConfig httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forHeader(HttpFault.FractionalPercent.perHundred(70)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forHeader(FaultConfig.FractionalPercent.perHundred(70)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -868,12 +901,9 @@ public class XdsNameResolverTest {
verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404"));
// header abort, no header rate, fix rate = 60 %
httpFilterFaultConfig = HttpFault.create(
httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forHeader(HttpFault.FractionalPercent.perMillion(600_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -884,12 +914,9 @@ public class XdsNameResolverTest {
verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404"));
// header abort, no header rate, fix rate = 0
httpFilterFaultConfig = HttpFault.create(
httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forHeader(HttpFault.FractionalPercent.perMillion(0)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(0)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -900,14 +927,11 @@ public class XdsNameResolverTest {
verifyRpcSucceeded(observer);
// fixed abort, fix rate = 60%
httpFilterFaultConfig = HttpFault.create(
httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(600_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultConfig.FractionalPercent.perMillion(600_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -918,14 +942,11 @@ public class XdsNameResolverTest {
verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated"));
// fixed abort, fix rate = 40%
httpFilterFaultConfig = HttpFault.create(
httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(400_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultConfig.FractionalPercent.perMillion(400_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -943,13 +964,8 @@ public class XdsNameResolverTest {
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
// header delay, header delay rate = 60 %
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultDelay.forHeader(HttpFault.FractionalPercent.perHundred(70)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
FaultConfig httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forHeader(FaultConfig.FractionalPercent.perHundred(70)), null, null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
@ -965,13 +981,8 @@ public class XdsNameResolverTest {
verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000));
// header delay, no header rate, fix rate = 60 %
httpFilterFaultConfig = HttpFault.create(
FaultDelay.forHeader(HttpFault.FractionalPercent.perMillion(600_000)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null, null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
@ -981,13 +992,8 @@ public class XdsNameResolverTest {
verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000));
// header delay, no header rate, fix rate = 0
httpFilterFaultConfig = HttpFault.create(
FaultDelay.forHeader(HttpFault.FractionalPercent.perMillion(0)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null, null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
@ -997,12 +1003,9 @@ public class XdsNameResolverTest {
verifyRpcSucceeded(observer);
// fixed delay, fix rate = 60%
httpFilterFaultConfig = HttpFault.create(
FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(600_000)),
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(600_000)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -1013,159 +1016,9 @@ public class XdsNameResolverTest {
verifyRpcDelayed(observer, 5000L);
// fixed delay, fix rate = 40%
httpFilterFaultConfig = HttpFault.create(
FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(400_000)),
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(400_000)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@Test
public void resolved_faultAbortWithUpstreamClusterMismatchInLdsUpdate() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
HttpFault httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
null);
// cluster mismatch with fault config
xdsClient.deliverLdsUpdateWithFaultInjection(cluster2, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@Test
public void resolved_faultAbortWithDownstreamNodesInLdsUpdate() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
// downstream node match
HttpFault httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.singletonList("node1.example.com"),
Collections.<HeaderMatcher>emptyList(),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node1.example.com"),
CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated"));
// downstream node mismatch
httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.singletonList("node1.example.com"),
Collections.<HeaderMatcher>emptyList(),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node2.example.com"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// downstream node absent in headers
httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.singletonList("node1.example.com"),
Collections.<HeaderMatcher>emptyList(),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@Test
public void resolved_faultAbortWithHeaderMatcherInLdsUpdate() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
// headers match
HttpFault httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, ImmutableMap.of("fault_key", "fault_value"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED);
// headers mismatch
httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("fault_key", "value_not_match"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// headers absent
httpFilterFaultConfig = HttpFault.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -1182,12 +1035,9 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(1000_000)),
FaultConfig httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(1000_000)),
null,
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
/* maxActiveFaults= */ 1);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -1209,20 +1059,31 @@ public class XdsNameResolverTest {
verifyRpcDelayed(observer3, 5000L);
}
@Test
public void resolved_withNoRouterFilter() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdateWithNoRouterFilter();
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startNewCall(
TestMethodDescriptors.voidMethod(), configSelector, Collections.<String, String>emptyMap(),
CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAVAILABLE.withDescription("No router filter"));
}
@Test
public void resolved_faultAbortAndDelayInLdsUpdateInLdsUpdate() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(1000_000)),
FaultConfig httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(1000_000)),
FaultAbort.forStatus(
Status.UNAUTHENTICATED.withDescription("unauthenticated"),
HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -1240,21 +1101,15 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultConfig httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
Status.UNAUTHENTICATED, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
// VirtualHost fault config override
HttpFault virtualHostFaultConfig = HttpFault.create(
FaultConfig virtualHostFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(Status.INTERNAL, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forStatus(Status.INTERNAL, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, null, null);
@ -1266,12 +1121,9 @@ public class XdsNameResolverTest {
verifyRpcFailed(observer, Status.INTERNAL);
// Route fault config override
HttpFault routeFaultConfig = HttpFault.create(
FaultConfig routeFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(Status.UNKNOWN, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forStatus(Status.UNKNOWN, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, null);
@ -1283,12 +1135,10 @@ public class XdsNameResolverTest {
verifyRpcFailed(observer, Status.UNKNOWN);
// WeightedCluster fault config override
HttpFault weightedClusterFaultConfig = HttpFault.create(
FaultConfig weightedClusterFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(Status.UNAVAILABLE, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forStatus(
Status.UNAVAILABLE, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig,
@ -1307,24 +1157,17 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
HttpFault httpFilterFaultConfig = HttpFault.create(
FaultConfig httpFilterFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(
Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
Status.UNAUTHENTICATED, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverRdsNameWithFaultInjection(AUTHORITY, httpFilterFaultConfig);
// Route fault config override
HttpFault routeFaultConfig = HttpFault.create(
FaultConfig routeFaultConfig = FaultConfig.create(
null,
FaultAbort.forStatus(Status.UNKNOWN, HttpFault.FractionalPercent.perMillion(1000_000)),
cluster1,
Collections.<String>emptyList(),
Collections.<HeaderMatcher>emptyList(),
FaultAbort.forStatus(Status.UNKNOWN, FaultConfig.FractionalPercent.perMillion(1000_000)),
null);
xdsClient.deliverRdsUpdateWithFaultInjection(null, routeFaultConfig, null);
verify(mockListener).onResult(resolutionResultCaptor.capture());
@ -1567,7 +1410,7 @@ public class XdsNameResolverTest {
if (!resourceName.equals(ldsResource)) {
return;
}
ldsWatcher.onChanged(new LdsUpdate(httpMaxStreamDurationNano, virtualHosts, false, null));
ldsWatcher.onChanged(new LdsUpdate(httpMaxStreamDurationNano, virtualHosts, null));
}
});
}
@ -1580,24 +1423,39 @@ public class XdsNameResolverTest {
return;
}
VirtualHost virtualHost = VirtualHost.create("virtual-host",
Collections.singletonList(AUTHORITY), routes, null);
ldsWatcher.onChanged(
new LdsUpdate(0, Collections.singletonList(virtualHost), false, null));
Collections.singletonList(AUTHORITY), routes,
ImmutableMap.<String, FilterConfig>of());
ldsWatcher.onChanged(new LdsUpdate(0, Collections.singletonList(virtualHost), null));
}
});
}
void deliverLdsUpdateWithFaultInjection(
final String cluster,
final HttpFault httpFilterFaultConfig,
final HttpFault virtualHostFaultConfig,
final HttpFault routeFaultConfig,
final HttpFault weightedClusterFaultConfig) {
FaultConfig httpFilterFaultConfig,
final FaultConfig virtualHostFaultConfig,
final FaultConfig routeFaultConfig,
final FaultConfig weightedClusterFaultConfig) {
if (httpFilterFaultConfig == null) {
httpFilterFaultConfig = FaultConfig.create(null, null, null);
}
final List<NamedFilterConfig> filterChain = ImmutableList.of(
new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig),
new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG));
syncContext.execute(new Runnable() {
@Override
public void run() {
ImmutableMap<String, FilterConfig> overrideConfig = weightedClusterFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(
FAULT_FILTER_INSTANCE_NAME, weightedClusterFaultConfig);
ClusterWeight clusterWeight =
ClusterWeight.create(cluster, 100, weightedClusterFaultConfig);
ClusterWeight.create(
cluster, 100,
overrideConfig);
overrideConfig = routeFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(FAULT_FILTER_INSTANCE_NAME, routeFaultConfig);
Route route = Route.create(
RouteMatch.create(
PathMatcher.fromPrefix("/", false), Collections.<HeaderMatcher>emptyList(), null),
@ -1605,24 +1463,46 @@ public class XdsNameResolverTest {
Collections.singletonList(clusterWeight),
Collections.<HashPolicy>emptyList(),
null),
routeFaultConfig);
overrideConfig);
overrideConfig = virtualHostFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(
FAULT_FILTER_INSTANCE_NAME, virtualHostFaultConfig);
VirtualHost virtualHost = VirtualHost.create(
"virtual-host",
Collections.singletonList(AUTHORITY),
Collections.singletonList(route), virtualHostFaultConfig);
Collections.singletonList(route),
overrideConfig);
ldsWatcher.onChanged(
new LdsUpdate(
0, Collections.singletonList(virtualHost), true, httpFilterFaultConfig));
new LdsUpdate(0, Collections.singletonList(virtualHost), filterChain));
}
});
}
void deliverLdsUpdateWithNoRouterFilter() {
VirtualHost virtualHost = VirtualHost.create(
"virtual-host",
Collections.singletonList(AUTHORITY),
Collections.<Route>emptyList(),
Collections.<String, FilterConfig>emptyMap());
ldsWatcher.onChanged(
new LdsUpdate(
0, Collections.singletonList(virtualHost), ImmutableList.<NamedFilterConfig>of()));
}
void deliverRdsNameWithFaultInjection(
final String rdsName, final HttpFault httpFilterFaultConfig) {
final String rdsName, FaultConfig httpFilterFaultConfig) {
if (httpFilterFaultConfig == null) {
httpFilterFaultConfig = FaultConfig.create(
null, null, null);
}
final ImmutableList<NamedFilterConfig> filterChain = ImmutableList.of(
new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig),
new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG));
syncContext.execute(new Runnable() {
@Override
public void run() {
ldsWatcher.onChanged(new LdsUpdate(0, rdsName, true, httpFilterFaultConfig));
ldsWatcher.onChanged(new LdsUpdate(0, rdsName, filterChain));
}
});
}
@ -1634,7 +1514,8 @@ public class XdsNameResolverTest {
if (!resourceName.equals(ldsResource)) {
return;
}
ldsWatcher.onChanged(new LdsUpdate(0, rdsName, false, null));
ldsWatcher.onChanged(
new LdsUpdate(0, rdsName, null));
}
});
}
@ -1652,13 +1533,20 @@ public class XdsNameResolverTest {
}
void deliverRdsUpdateWithFaultInjection(
final HttpFault virtualHostFaultConfig, final HttpFault routFaultConfig,
final HttpFault weightedClusterFaultConfig) {
final FaultConfig virtualHostFaultConfig, final FaultConfig routFaultConfig,
final FaultConfig weightedClusterFaultConfig) {
syncContext.execute(new Runnable() {
@Override
public void run() {
ImmutableMap<String, FilterConfig> overrideConfig = weightedClusterFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(
FAULT_FILTER_INSTANCE_NAME, weightedClusterFaultConfig);
ClusterWeight clusterWeight =
ClusterWeight.create(cluster1, 100, weightedClusterFaultConfig);
ClusterWeight.create(cluster1, 100, overrideConfig);
overrideConfig = routFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(FAULT_FILTER_INSTANCE_NAME, routFaultConfig);
Route route = Route.create(
RouteMatch.create(
PathMatcher.fromPrefix("/", false), Collections.<HeaderMatcher>emptyList(), null),
@ -1666,11 +1554,16 @@ public class XdsNameResolverTest {
Collections.singletonList(clusterWeight),
Collections.<HashPolicy>emptyList(),
null),
routFaultConfig);
overrideConfig);
overrideConfig = virtualHostFaultConfig == null
? ImmutableMap.<String, FilterConfig>of()
: ImmutableMap.<String, FilterConfig>of(
FAULT_FILTER_INSTANCE_NAME, virtualHostFaultConfig);
VirtualHost virtualHost = VirtualHost.create(
"virtual-host",
Collections.singletonList(AUTHORITY),
Collections.singletonList(route), virtualHostFaultConfig);
Collections.singletonList(route),
overrideConfig);
rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost)));
}
});