From 3ebb3e19245fe2af6a24785dc6f6fd325b543f73 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 17 Mar 2021 16:37:13 -0700 Subject: [PATCH] xds: HttpFilter support --- .../java/io/grpc/xds/ClientXdsClient.java | 354 ++++++------ .../xds/{HttpFault.java => FaultConfig.java} | 30 +- .../main/java/io/grpc/xds/FaultFilter.java | 418 ++++++++++++++ xds/src/main/java/io/grpc/xds/Filter.java | 142 +++++ .../main/java/io/grpc/xds/FilterRegistry.java | 61 ++ xds/src/main/java/io/grpc/xds/LameFilter.java | 121 ++++ .../main/java/io/grpc/xds/MessagePrinter.java | 4 + .../main/java/io/grpc/xds/RouterFilter.java | 69 +++ .../main/java/io/grpc/xds/VirtualHost.java | 31 +- xds/src/main/java/io/grpc/xds/XdsClient.java | 37 +- .../java/io/grpc/xds/XdsNameResolver.java | 462 ++++----------- .../io/grpc/xds/ClientXdsClientDataTest.java | 133 +++-- .../io/grpc/xds/ClientXdsClientTestBase.java | 53 +- .../io/grpc/xds/ClientXdsClientV2Test.java | 2 +- .../io/grpc/xds/ClientXdsClientV3Test.java | 4 +- .../java/io/grpc/xds/FaultFilterTest.java | 62 +++ .../java/io/grpc/xds/XdsNameResolverTest.java | 525 +++++++----------- 17 files changed, 1554 insertions(+), 954 deletions(-) rename xds/src/main/java/io/grpc/xds/{HttpFault.java => FaultConfig.java} (77%) create mode 100644 xds/src/main/java/io/grpc/xds/FaultFilter.java create mode 100644 xds/src/main/java/io/grpc/xds/Filter.java create mode 100644 xds/src/main/java/io/grpc/xds/FilterRegistry.java create mode 100644 xds/src/main/java/io/grpc/xds/LameFilter.java create mode 100644 xds/src/main/java/io/grpc/xds/RouterFilter.java create mode 100644 xds/src/test/java/io/grpc/xds/FaultFilterTest.java diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 5823bc85f8..0b0a9bab15 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.EnvoyServerProtoData.TRANSPORT_SOCKET_NAME_TLS; +import com.github.udpa.udpa.type.v1.TypedStruct; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CaseFormat; import com.google.common.base.Stopwatch; @@ -27,6 +28,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.google.protobuf.util.Durations; import com.google.re2j.Pattern; import com.google.re2j.PatternSyntaxException; @@ -41,9 +43,7 @@ import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; -import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; @@ -52,14 +52,14 @@ import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.GrpcUtil; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.HttpFault.FaultAbort; -import io.grpc.xds.HttpFault.FaultDelay; +import io.grpc.xds.Filter.ConfigOrError; +import io.grpc.xds.Filter.FilterConfig; +import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import io.grpc.xds.Matchers.FractionMatcher; @@ -96,7 +96,6 @@ final class ClientXdsClient extends AbstractXdsClient { static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; - private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault"; @VisibleForTesting static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id"; @VisibleForTesting @@ -117,6 +116,10 @@ final class ClientXdsClient extends AbstractXdsClient { "type.googleapis.com/envoy.config.cluster.aggregate.v2alpha.ClusterConfig"; private static final String TYPE_URL_CLUSTER_CONFIG = "type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig"; + private static final String TYPE_URL_TYPED_STRUCT = + "type.googleapis.com/udpa.type.v1.TypedStruct"; + private static final String TYPE_URL_FILTER_CONFIG = + "type.googleapis.com/envoy.config.route.v3.FilterConfig"; private final Map ldsResourceSubscribers = new HashMap<>(); private final Map rdsResourceSubscribers = new HashMap<>(); @@ -140,8 +143,12 @@ final class ClientXdsClient extends AbstractXdsClient { // Unpack Listener messages. List listeners = new ArrayList<>(resources.size()); List listenerNames = new ArrayList<>(resources.size()); + boolean isResourceV3 = false; try { for (Any res : resources) { + if (res.getTypeUrl().equals(ResourceType.LDS.typeUrl())) { + isResourceV3 = true; + } Listener listener = unpackCompatibleType(res, Listener.class, ResourceType.LDS.typeUrl(), ResourceType.LDS.typeUrlV2()); listeners.add(listener); @@ -189,35 +196,34 @@ final class ClientXdsClient extends AbstractXdsClient { maxStreamDuration = Durations.toNanos(options.getMaxStreamDuration()); } } - boolean hasFaultInjection = false; - HttpFault httpFault = null; - if (enableFaultInjection) { - List httpFilters = hcm.getHttpFiltersList(); - for (HttpFilter httpFilter : httpFilters) { - if (HTTP_FAULT_FILTER_NAME.equals(httpFilter.getName())) { - hasFaultInjection = true; - if (httpFilter.hasTypedConfig()) { - StructOrError httpFaultOrError = - decodeFaultFilterConfig(httpFilter.getTypedConfig()); - if (httpFaultOrError != null) { - if (httpFaultOrError.getErrorDetail() != null) { - nackResponse(ResourceType.LDS, nonce, - "Listener " + listenerName + " contains invalid HttpFault filter: " - + httpFaultOrError.getErrorDetail()); - return; - } - httpFault = httpFaultOrError.getStruct(); - } - } - break; + boolean parseFilter = enableFaultInjection && isResourceV3; + List filterChain = null; + if (parseFilter) { + filterChain = new ArrayList<>(); + List + httpFilters = hcm.getHttpFiltersList(); + for (io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter + httpFilter : httpFilters) { + String filterName = httpFilter.getName(); + StructOrError filterConfig = parseHttpFilter(httpFilter); + if (filterConfig == null) { + continue; } + if (filterConfig.errorDetail != null) { + nackResponse( + ResourceType.LDS, nonce, + "Error parsing HttpFilter: " + filterConfig.errorDetail); + return; + } + filterChain.add(new NamedFilterConfig(filterName, filterConfig.struct)); } } + if (hcm.hasRouteConfig()) { List virtualHosts = new ArrayList<>(); for (io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHostProto : hcm.getRouteConfig().getVirtualHostsList()) { - StructOrError virtualHost = parseVirtualHost(virtualHostProto); + StructOrError virtualHost = parseVirtualHost(virtualHostProto, parseFilter); if (virtualHost.getErrorDetail() != null) { nackResponse(ResourceType.LDS, nonce, "Listener " + listenerName + " contains invalid virtual host: " @@ -226,7 +232,7 @@ final class ClientXdsClient extends AbstractXdsClient { } virtualHosts.add(virtualHost.getStruct()); } - update = new LdsUpdate(maxStreamDuration, virtualHosts, hasFaultInjection, httpFault); + update = new LdsUpdate(maxStreamDuration, virtualHosts, filterChain); } else if (hcm.hasRds()) { Rds rds = hcm.getRds(); if (!rds.getConfigSource().hasAds()) { @@ -234,8 +240,8 @@ final class ClientXdsClient extends AbstractXdsClient { "Listener " + listenerName + " with RDS config_source not set to ADS"); return; } - update = new LdsUpdate( - maxStreamDuration, rds.getRouteConfigName(), hasFaultInjection, httpFault); + update = + new LdsUpdate(maxStreamDuration, rds.getRouteConfigName(), filterChain); rdsNames.add(rds.getRouteConfigName()); } else { nackResponse(ResourceType.LDS, nonce, @@ -277,6 +283,77 @@ final class ClientXdsClient extends AbstractXdsClient { } } + @VisibleForTesting + @Nullable // Returns null if the filter is optional but not supported. + static StructOrError parseHttpFilter( + io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter + httpFilter) { + String filterName = httpFilter.getName(); + boolean isOptional = httpFilter.getIsOptional(); + if (!httpFilter.hasTypedConfig()) { + if (isOptional) { + return null; + } else { + return StructOrError.fromError( + "HttpFilter [" + filterName + "] is not optional and has no typed config"); + } + } + return parseRawFilterConfig(filterName, httpFilter.getTypedConfig(), isOptional, false); + } + + @Nullable // Returns null if the filter should be ignored. + private static StructOrError parseRawFilterConfig( + String filterName, Any anyConfig, Boolean isOptional, boolean isOverrideConfig) { + checkArgument( + isOptional != null || isOverrideConfig, "isOptional can't be null for top-level config"); + String typeUrl = anyConfig.getTypeUrl(); + if (isOverrideConfig) { + isOptional = false; + if (typeUrl.equals(TYPE_URL_FILTER_CONFIG)) { + io.envoyproxy.envoy.config.route.v3.FilterConfig filterConfig; + try { + filterConfig = + anyConfig.unpack(io.envoyproxy.envoy.config.route.v3.FilterConfig.class); + } catch (InvalidProtocolBufferException e) { + return StructOrError.fromError( + "HttpFilter [" + filterName + "] contains invalid proto: " + e); + } + isOptional = filterConfig.getIsOptional(); + anyConfig = filterConfig.getConfig(); + typeUrl = anyConfig.getTypeUrl(); + } + } + Message rawConfig = anyConfig; + if (anyConfig.getTypeUrl().equals(TYPE_URL_TYPED_STRUCT)) { + TypedStruct typedStruct; + try { + typedStruct = anyConfig.unpack(TypedStruct.class); + } catch (InvalidProtocolBufferException e) { + return StructOrError.fromError( + "HttpFilter [" + filterName + "] contains invalid proto: " + e); + } + typeUrl = typedStruct.getTypeUrl(); + rawConfig = typedStruct.getValue(); + } + Filter filter = FilterRegistry.getDefaultRegistry().get(typeUrl); + if (filter == null) { + if (isOptional) { + return null; + } else { + return StructOrError.fromError( + "HttpFilter [" + filterName + "] is not optional and has an unsupported config type: " + + typeUrl); + } + } + ConfigOrError filterConfig = isOverrideConfig + ? filter.parseFilterConfigOverride(rawConfig) : filter.parseFilterConfig(rawConfig); + if (filterConfig.errorDetail != null) { + return StructOrError.fromError( + "Invalid filter config for HttpFilter [" + filterName + "]: " + filterConfig.errorDetail); + } + return StructOrError.fromStruct(filterConfig.config); + } + @VisibleForTesting static StructOrError parseServerSideListener( Listener listener) { try { @@ -291,11 +368,11 @@ final class ClientXdsClient extends AbstractXdsClient { } private static StructOrError parseVirtualHost( - io.envoyproxy.envoy.config.route.v3.VirtualHost proto) { + io.envoyproxy.envoy.config.route.v3.VirtualHost proto, boolean parseFilter) { String name = proto.getName(); List routes = new ArrayList<>(proto.getRoutesCount()); for (io.envoyproxy.envoy.config.route.v3.Route routeProto : proto.getRoutesList()) { - StructOrError route = parseRoute(routeProto); + StructOrError route = parseRoute(routeProto, parseFilter); if (route == null) { continue; } @@ -305,27 +382,43 @@ final class ClientXdsClient extends AbstractXdsClient { } routes.add(route.getStruct()); } - HttpFault httpFault = null; - Map filterConfigMap = proto.getTypedPerFilterConfigMap(); - if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) { - Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME); - StructOrError httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig); - if (httpFaultOrError != null) { - if (httpFaultOrError.getErrorDetail() != null) { - return StructOrError.fromError( - "Virtual host [" + name + "] contains invalid HttpFault filter : " - + httpFaultOrError.getErrorDetail()); - } - httpFault = httpFaultOrError.getStruct(); - } + if (!parseFilter) { + return StructOrError.fromStruct(VirtualHost.create( + name, proto.getDomainsList(), routes, new HashMap())); + } + StructOrError> overrideConfigs = + parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap()); + if (overrideConfigs.errorDetail != null) { + return StructOrError.fromError( + "VirtualHost [" + proto.getName() + "] contains invalid HttpFilter config: " + + overrideConfigs.errorDetail); } return StructOrError.fromStruct(VirtualHost.create( - name, proto.getDomainsList(), routes, httpFault)); + name, proto.getDomainsList(), routes, overrideConfigs.struct)); + } + + @VisibleForTesting + static StructOrError> parseOverrideFilterConfigs( + Map rawFilterConfigMap) { + Map overrideConfigs = new HashMap<>(); + for (String name : rawFilterConfigMap.keySet()) { + Any anyConfig = rawFilterConfigMap.get(name); + StructOrError filterConfig = parseRawFilterConfig(name, anyConfig, null, true); + if (filterConfig == null) { + continue; + } + if (filterConfig.errorDetail != null) { + return StructOrError.fromError(filterConfig.errorDetail); + } + overrideConfigs.put(name, filterConfig.struct); + } + return StructOrError.fromStruct(overrideConfigs); } @VisibleForTesting @Nullable - static StructOrError parseRoute(io.envoyproxy.envoy.config.route.v3.Route proto) { + static StructOrError parseRoute( + io.envoyproxy.envoy.config.route.v3.Route proto, boolean parseFilter) { StructOrError routeMatch = parseRouteMatch(proto.getMatch()); if (routeMatch == null) { return null; @@ -338,7 +431,7 @@ final class ClientXdsClient extends AbstractXdsClient { StructOrError routeAction; switch (proto.getActionCase()) { case ROUTE: - routeAction = parseRouteAction(proto.getRoute()); + routeAction = parseRouteAction(proto.getRoute(), parseFilter); break; case REDIRECT: return StructOrError.fromError("Unsupported action type: redirect"); @@ -357,23 +450,19 @@ final class ClientXdsClient extends AbstractXdsClient { return StructOrError.fromError( "Invalid route [" + proto.getName() + "]: " + routeAction.getErrorDetail()); } - - HttpFault httpFault = null; - Map filterConfigMap = proto.getTypedPerFilterConfigMap(); - if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) { - Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME); - StructOrError httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig); - if (httpFaultOrError != null) { - if (httpFaultOrError.getErrorDetail() != null) { - return StructOrError.fromError( - "Route [" + proto.getName() + "] contains invalid HttpFault filter: " - + httpFaultOrError.getErrorDetail()); - } - httpFault = httpFaultOrError.getStruct(); - } + if (!parseFilter) { + return StructOrError.fromStruct(Route.create( + routeMatch.getStruct(), routeAction.getStruct(), new HashMap())); + } + StructOrError> overrideConfigs = + parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap()); + if (overrideConfigs.errorDetail != null) { + return StructOrError.fromError( + "Route [" + proto.getName() + "] contains invalid HttpFilter config: " + + overrideConfigs.errorDetail); } return StructOrError.fromStruct(Route.create( - routeMatch.getStruct(), routeAction.getStruct(), httpFault)); + routeMatch.getStruct(), routeAction.getStruct(), overrideConfigs.struct)); } @VisibleForTesting @@ -499,7 +588,7 @@ final class ClientXdsClient extends AbstractXdsClient { @VisibleForTesting @Nullable static StructOrError parseRouteAction( - io.envoyproxy.envoy.config.route.v3.RouteAction proto) { + io.envoyproxy.envoy.config.route.v3.RouteAction proto, boolean parseFilter) { Long timeoutNano = null; if (proto.hasMaxStreamDuration()) { io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration maxStreamDuration @@ -557,7 +646,8 @@ final class ClientXdsClient extends AbstractXdsClient { List weightedClusters = new ArrayList<>(); for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight : clusterWeights) { - StructOrError clusterWeightOrError = parseClusterWeight(clusterWeight); + StructOrError clusterWeightOrError = + parseClusterWeight(clusterWeight, parseFilter); if (clusterWeightOrError.getErrorDetail() != null) { return StructOrError.fromError("RouteAction contains invalid ClusterWeight: " + clusterWeightOrError.getErrorDetail()); @@ -576,130 +666,33 @@ final class ClientXdsClient extends AbstractXdsClient { @VisibleForTesting static StructOrError parseClusterWeight( - io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto) { - HttpFault httpFault = null; - Map filterConfigMap = proto.getTypedPerFilterConfigMap(); - if (filterConfigMap.containsKey(HTTP_FAULT_FILTER_NAME)) { - Any rawFaultFilterConfig = filterConfigMap.get(HTTP_FAULT_FILTER_NAME); - StructOrError httpFaultOrError = decodeFaultFilterConfig(rawFaultFilterConfig); - if (httpFaultOrError != null) { - if (httpFaultOrError.getErrorDetail() != null) { - return StructOrError.fromError( - "ClusterWeight [" + proto.getName() + "] contains invalid HttpFault filter: " - + httpFaultOrError.getErrorDetail()); - } - httpFault = httpFaultOrError.getStruct(); - } + io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto, + boolean parseFilter) { + if (!parseFilter) { + return StructOrError.fromStruct(ClusterWeight.create( + proto.getName(), proto.getWeight().getValue(), new HashMap())); } - return StructOrError.fromStruct( - ClusterWeight.create(proto.getName(), proto.getWeight().getValue(), httpFault)); - } - - @Nullable - private static StructOrError decodeFaultFilterConfig(Any rawFaultFilterConfig) { - if (!rawFaultFilterConfig.getTypeUrl().equals( - "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault")) { - return null; - } - HTTPFault httpFaultProto; - try { - httpFaultProto = rawFaultFilterConfig.unpack(HTTPFault.class); - } catch (InvalidProtocolBufferException e) { - return StructOrError.fromError("Invalid proto: " + e); - } - return parseHttpFault(httpFaultProto); - } - - private static StructOrError parseHttpFault(HTTPFault httpFault) { - FaultDelay faultDelay = null; - FaultAbort faultAbort = null; - if (httpFault.hasDelay()) { - faultDelay = parseFaultDelay(httpFault.getDelay()); - } - if (httpFault.hasAbort()) { - StructOrError faultAbortOrError = parseFaultAbort(httpFault.getAbort()); - if (faultAbortOrError.getErrorDetail() != null) { - return StructOrError.fromError( - "HttpFault contains invalid FaultAbort: " + faultAbortOrError.getErrorDetail()); - } - faultAbort = faultAbortOrError.getStruct(); - } - if (faultDelay == null && faultAbort == null) { + StructOrError> overrideConfigs = + parseOverrideFilterConfigs(proto.getTypedPerFilterConfigMap()); + if (overrideConfigs.errorDetail != null) { return StructOrError.fromError( - "Invalid HttpFault: neither fault_delay nor fault_abort is specified"); - } - String upstreamCluster = httpFault.getUpstreamCluster(); - List downstreamNodes = httpFault.getDownstreamNodesList(); - List headers = new ArrayList<>(); - for (io.envoyproxy.envoy.config.route.v3.HeaderMatcher proto : httpFault.getHeadersList()) { - StructOrError headerMatcherOrError = parseHeaderMatcher(proto); - if (headerMatcherOrError.getErrorDetail() != null) { - return StructOrError.fromError( - "HttpFault contains invalid header matcher: " - + headerMatcherOrError.getErrorDetail()); - } - headers.add(headerMatcherOrError.getStruct()); - } - Integer maxActiveFaults = null; - if (httpFault.hasMaxActiveFaults()) { - maxActiveFaults = httpFault.getMaxActiveFaults().getValue(); - if (maxActiveFaults < 0) { - maxActiveFaults = Integer.MAX_VALUE; - } - } - return StructOrError.fromStruct(HttpFault.create( - faultDelay, faultAbort, upstreamCluster, downstreamNodes, headers, maxActiveFaults)); - } - - private static FaultDelay parseFaultDelay( - io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) { - HttpFault.FractionalPercent percent = parsePercent(faultDelay.getPercentage()); - if (faultDelay.hasHeaderDelay()) { - return FaultDelay.forHeader(percent); - } - return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent); - } - - @VisibleForTesting - static StructOrError parseFaultAbort( - io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) { - HttpFault.FractionalPercent percent = parsePercent(faultAbort.getPercentage()); - switch (faultAbort.getErrorTypeCase()) { - case HEADER_ABORT: - return StructOrError.fromStruct(FaultAbort.forHeader(percent)); - case HTTP_STATUS: - return StructOrError.fromStruct(FaultAbort.forStatus( - GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent)); - case GRPC_STATUS: - return StructOrError.fromStruct(FaultAbort.forStatus( - Status.fromCodeValue(faultAbort.getGrpcStatus()), percent)); - case ERRORTYPE_NOT_SET: - default: - return StructOrError.fromError( - "Unknown error type case: " + faultAbort.getErrorTypeCase()); - } - } - - private static HttpFault.FractionalPercent parsePercent(FractionalPercent proto) { - switch (proto.getDenominator()) { - case HUNDRED: - return HttpFault.FractionalPercent.perHundred(proto.getNumerator()); - case TEN_THOUSAND: - return HttpFault.FractionalPercent.perTenThousand(proto.getNumerator()); - case MILLION: - return HttpFault.FractionalPercent.perMillion(proto.getNumerator()); - case UNRECOGNIZED: - default: - throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator()); + "ClusterWeight [" + proto.getName() + "] contains invalid HttpFilter config: " + + overrideConfigs.errorDetail); } + return StructOrError.fromStruct(ClusterWeight.create( + proto.getName(), proto.getWeight().getValue(), overrideConfigs.struct)); } @Override protected void handleRdsResponse(String versionInfo, List resources, String nonce) { // Unpack RouteConfiguration messages. Map routeConfigs = new HashMap<>(resources.size()); + boolean isResourceV3 = false; try { for (Any res : resources) { + if (res.getTypeUrl().equals(ResourceType.RDS.typeUrl())) { + isResourceV3 = true; + } RouteConfiguration rc = unpackCompatibleType(res, RouteConfiguration.class, ResourceType.RDS.typeUrl(), ResourceType.RDS.typeUrlV2()); @@ -715,6 +708,7 @@ final class ClientXdsClient extends AbstractXdsClient { XdsLogLevel.INFO, "Received RDS response for resources: {0}", routeConfigs.keySet()); Map rdsUpdates = new HashMap<>(); + boolean parseFilter = enableFaultInjection && isResourceV3; for (Map.Entry entry : routeConfigs.entrySet()) { String routeConfigName = entry.getKey(); RouteConfiguration routeConfig = entry.getValue(); @@ -722,7 +716,7 @@ final class ClientXdsClient extends AbstractXdsClient { new ArrayList<>(routeConfig.getVirtualHostsCount()); for (io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHostProto : routeConfig.getVirtualHostsList()) { - StructOrError virtualHost = parseVirtualHost(virtualHostProto); + StructOrError virtualHost = parseVirtualHost(virtualHostProto, parseFilter); if (virtualHost.getErrorDetail() != null) { nackResponse(ResourceType.RDS, nonce, "RouteConfiguration " + routeConfigName + " contains invalid virtual host: " + virtualHost.getErrorDetail()); diff --git a/xds/src/main/java/io/grpc/xds/HttpFault.java b/xds/src/main/java/io/grpc/xds/FaultConfig.java similarity index 77% rename from xds/src/main/java/io/grpc/xds/HttpFault.java rename to xds/src/main/java/io/grpc/xds/FaultConfig.java index 252fd41d76..95a1cd4704 100644 --- a/xds/src/main/java/io/grpc/xds/HttpFault.java +++ b/xds/src/main/java/io/grpc/xds/FaultConfig.java @@ -19,35 +19,31 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; import io.grpc.Status; -import io.grpc.xds.Matchers.HeaderMatcher; -import java.util.List; +import io.grpc.xds.Filter.FilterConfig; import javax.annotation.Nullable; /** Fault injection configurations. */ @AutoValue -abstract class HttpFault { +abstract class FaultConfig implements FilterConfig { @Nullable abstract FaultDelay faultDelay(); @Nullable abstract FaultAbort faultAbort(); - abstract String upstreamCluster(); - - abstract ImmutableList downstreamNodes(); - - abstract ImmutableList headers(); - @Nullable abstract Integer maxActiveFaults(); - static HttpFault create(@Nullable FaultDelay faultDelay, @Nullable FaultAbort faultAbort, - String upstreamCluster, List downstreamNodes, List headers, + @Override + public final String typeUrl() { + return FaultFilter.TYPE_URL; + } + + static FaultConfig create( + @Nullable FaultDelay faultDelay, @Nullable FaultAbort faultAbort, @Nullable Integer maxActiveFaults) { - return new AutoValue_HttpFault(faultDelay, faultAbort, upstreamCluster, - ImmutableList.copyOf(downstreamNodes), ImmutableList.copyOf(headers), maxActiveFaults); + return new AutoValue_FaultConfig(faultDelay, faultAbort, maxActiveFaults); } /** Fault configurations for aborting requests. */ @@ -70,7 +66,7 @@ abstract class HttpFault { private static FaultDelay create( @Nullable Long delayNanos, boolean headerDelay, FractionalPercent percent) { - return new AutoValue_HttpFault_FaultDelay(delayNanos, headerDelay, percent); + return new AutoValue_FaultConfig_FaultDelay(delayNanos, headerDelay, percent); } } @@ -95,7 +91,7 @@ abstract class HttpFault { private static FaultAbort create( @Nullable Status status, boolean headerAbort, FractionalPercent percent) { - return new AutoValue_HttpFault_FaultAbort(status, headerAbort, percent); + return new AutoValue_FaultConfig_FaultAbort(status, headerAbort, percent); } } @@ -123,7 +119,7 @@ abstract class HttpFault { static FractionalPercent create( int numerator, DenominatorType denominatorType) { - return new AutoValue_HttpFault_FractionalPercent(numerator, denominatorType); + return new AutoValue_FaultConfig_FractionalPercent(numerator, denominatorType); } } } diff --git a/xds/src/main/java/io/grpc/xds/FaultFilter.java b/xds/src/main/java/io/grpc/xds/FaultFilter.java new file mode 100644 index 0000000000..a531407baa --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/FaultFilter.java @@ -0,0 +1,418 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; +import io.envoyproxy.envoy.type.v3.FractionalPercent; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.DelayedClientCall; +import io.grpc.internal.GrpcUtil; +import io.grpc.xds.FaultConfig.FaultAbort; +import io.grpc.xds.FaultConfig.FaultDelay; +import io.grpc.xds.Filter.ClientInterceptorBuilder; +import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +/** HttpFault filter implementation. */ +final class FaultFilter implements Filter, ClientInterceptorBuilder { + + static final FaultFilter INSTANCE = + new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong()); + @VisibleForTesting + static final Metadata.Key HEADER_DELAY_KEY = + Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting + static final Metadata.Key HEADER_DELAY_PERCENTAGE_KEY = + Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting + static final Metadata.Key HEADER_ABORT_HTTP_STATUS_KEY = + Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting + static final Metadata.Key HEADER_ABORT_GRPC_STATUS_KEY = + Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting + static final Metadata.Key HEADER_ABORT_PERCENTAGE_KEY = + Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER); + static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault"; + + private final ThreadSafeRandom random; + private final AtomicLong activeFaultCounter; + + @VisibleForTesting + FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) { + this.random = random; + this.activeFaultCounter = activeFaultCounter; + } + + @Override + public String[] typeUrls() { + return new String[] { TYPE_URL }; + } + + @Override + public ConfigOrError parseFilterConfig(Message rawProtoMessage) { + HTTPFault httpFaultProto; + if (!(rawProtoMessage instanceof Any)) { + return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass()); + } + Any anyMessage = (Any) rawProtoMessage; + try { + httpFaultProto = anyMessage.unpack(HTTPFault.class); + } catch (InvalidProtocolBufferException e) { + return ConfigOrError.fromError("Invalid proto: " + e); + } + return parseHttpFault(httpFaultProto); + } + + private static ConfigOrError parseHttpFault(HTTPFault httpFault) { + FaultDelay faultDelay = null; + FaultAbort faultAbort = null; + if (httpFault.hasDelay()) { + faultDelay = parseFaultDelay(httpFault.getDelay()); + } + if (httpFault.hasAbort()) { + ConfigOrError faultAbortOrError = parseFaultAbort(httpFault.getAbort()); + if (faultAbortOrError.errorDetail != null) { + return ConfigOrError.fromError( + "HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail); + } + faultAbort = faultAbortOrError.config; + } + Integer maxActiveFaults = null; + if (httpFault.hasMaxActiveFaults()) { + maxActiveFaults = httpFault.getMaxActiveFaults().getValue(); + if (maxActiveFaults < 0) { + maxActiveFaults = Integer.MAX_VALUE; + } + } + return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults)); + } + + private static FaultDelay parseFaultDelay( + io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) { + FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage()); + if (faultDelay.hasHeaderDelay()) { + return FaultDelay.forHeader(percent); + } + return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent); + } + + @VisibleForTesting + static ConfigOrError parseFaultAbort( + io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) { + FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage()); + switch (faultAbort.getErrorTypeCase()) { + case HEADER_ABORT: + return ConfigOrError.fromConfig(FaultAbort.forHeader(percent)); + case HTTP_STATUS: + return ConfigOrError.fromConfig(FaultAbort.forStatus( + GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent)); + case GRPC_STATUS: + return ConfigOrError.fromConfig(FaultAbort.forStatus( + Status.fromCodeValue(faultAbort.getGrpcStatus()), percent)); + case ERRORTYPE_NOT_SET: + default: + return ConfigOrError.fromError( + "Unknown error type case: " + faultAbort.getErrorTypeCase()); + } + } + + private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) { + switch (proto.getDenominator()) { + case HUNDRED: + return FaultConfig.FractionalPercent.perHundred(proto.getNumerator()); + case TEN_THOUSAND: + return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator()); + case MILLION: + return FaultConfig.FractionalPercent.perMillion(proto.getNumerator()); + case UNRECOGNIZED: + default: + throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator()); + } + } + + @Override + public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) { + return parseFilterConfig(rawProtoMessage); + } + + @Nullable + @Override + public ClientInterceptor buildClientInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args, + final ScheduledExecutorService scheduler) { + checkNotNull(config, "config"); + if (overrideConfig != null) { + config = overrideConfig; + } + FaultConfig faultConfig = (FaultConfig) config; + Long delayNanos = null; + Status abortStatus = null; + if (faultConfig.maxActiveFaults() == null + || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { + Metadata headers = args.getHeaders(); + if (faultConfig.faultDelay() != null) { + delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); + } + if (faultConfig.faultAbort() != null) { + abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers); + } + } + if (delayNanos == null && abortStatus == null) { + return null; + } + final Long finalDelayNanos = delayNanos; + final Status finalAbortStatus = abortStatus; + final class FaultInjectionInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, final CallOptions callOptions, + final Channel next) { + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { // This should never happen in practice because + // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with + // a callExecutor. + // TODO(https://github.com/grpc/grpc-java/issues/7868) + callExecutor = MoreExecutors.directExecutor(); + } + if (finalDelayNanos != null && finalAbortStatus != null) { + return new DelayInjectedCall<>( + finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), + Suppliers.ofInstance( + new FailingClientCall(finalAbortStatus, callExecutor))); + } + if (finalAbortStatus != null) { + return new FailingClientCall<>(finalAbortStatus, callExecutor); + } else { + return new DelayInjectedCall<>( + finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), + new Supplier>() { + @Override + public ClientCall get() { + return next.newCall(method, callOptions); + } + }); + } + } + } + + return new FaultInjectionInterceptor(); + } + + @Nullable + private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) { + Long delayNanos; + FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent(); + if (faultDelay.headerDelay()) { + try { + int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY)); + delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis); + String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY); + if (delayPercentageStr != null) { + int delayPercentage = Integer.parseInt(delayPercentageStr); + if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) { + fractionalPercent = FaultConfig.FractionalPercent.create( + delayPercentage, fractionalPercent.denominatorType()); + } + } + } catch (NumberFormatException e) { + return null; // treated as header_delay not applicable + } + } else { + delayNanos = faultDelay.delayNanos(); + } + if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) { + return null; + } + return delayNanos; + } + + @Nullable + private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) { + Status abortStatus = null; + FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent(); + if (faultAbort.headerAbort()) { + try { + String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY); + if (grpcCodeStr != null) { + int grpcCode = Integer.parseInt(grpcCodeStr); + abortStatus = Status.fromCodeValue(grpcCode); + } + String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY); + if (httpCodeStr != null) { + int httpCode = Integer.parseInt(httpCodeStr); + abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode); + } + String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY); + if (abortPercentageStr != null) { + int abortPercentage = + Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY)); + if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) { + fractionalPercent = FaultConfig.FractionalPercent.create( + abortPercentage, fractionalPercent.denominatorType()); + } + } + } catch (NumberFormatException e) { + return null; // treated as header_abort not applicable + } + } else { + abortStatus = faultAbort.status(); + } + if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) { + return null; + } + return abortStatus; + } + + private static int getRatePerMillion(FaultConfig.FractionalPercent percent) { + int numerator = percent.numerator(); + FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType(); + switch (type) { + case TEN_THOUSAND: + numerator *= 100; + break; + case HUNDRED: + numerator *= 10_000; + break; + case MILLION: + default: + break; + } + if (numerator > 1_000_000 || numerator < 0) { + numerator = 1_000_000; + } + return numerator; + } + + /** A {@link DelayedClientCall} with a fixed delay. */ + private final class DelayInjectedCall extends DelayedClientCall { + final Object lock = new Object(); + ScheduledFuture delayTask; + boolean cancelled; + + DelayInjectedCall( + long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler, + @Nullable Deadline deadline, + final Supplier> callSupplier) { + super(callExecutor, scheduler, deadline); + activeFaultCounter.incrementAndGet(); + ScheduledFuture task = scheduler.schedule( + new Runnable() { + @Override + public void run() { + synchronized (lock) { + if (!cancelled) { + activeFaultCounter.decrementAndGet(); + } + } + setCall(callSupplier.get()); + } + }, + delayNanos, + NANOSECONDS); + synchronized (lock) { + if (!cancelled) { + delayTask = task; + return; + } + } + task.cancel(false); + } + + @Override + protected void callCancelled() { + ScheduledFuture savedDelayTask; + synchronized (lock) { + cancelled = true; + activeFaultCounter.decrementAndGet(); + savedDelayTask = delayTask; + } + if (savedDelayTask != null) { + savedDelayTask.cancel(false); + } + } + } + + /** An implementation of {@link ClientCall} that fails when started. */ + private final class FailingClientCall extends ClientCall { + final Status error; + final Executor callExecutor; + final Context context; + + FailingClientCall(Status error, Executor callExecutor) { + this.error = error; + this.callExecutor = callExecutor; + this.context = Context.current(); + } + + @Override + public void start(final ClientCall.Listener listener, Metadata headers) { + activeFaultCounter.incrementAndGet(); + callExecutor.execute( + new Runnable() { + @Override + public void run() { + Context previous = context.attach(); + try { + listener.onClose(error, new Metadata()); + activeFaultCounter.decrementAndGet(); + } finally { + context.detach(previous); + } + } + }); + } + + @Override + public void request(int numMessages) {} + + @Override + public void cancel(String message, Throwable cause) {} + + @Override + public void halfClose() {} + + @Override + public void sendMessage(ReqT message) {} + } +} diff --git a/xds/src/main/java/io/grpc/xds/Filter.java b/xds/src/main/java/io/grpc/xds/Filter.java new file mode 100644 index 0000000000..f8372cdd1b --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/Filter.java @@ -0,0 +1,142 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import com.google.protobuf.Message; +import io.grpc.ClientInterceptor; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.ServerInterceptor; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; + +/** + * Defines the parsing functionality of an HTTP filter. A Filter may optionally implement either + * {@link ClientInterceptorBuilder} or {@link ServerInterceptorBuilder} or both, indicating it is + * capable of working on the client side or server side or both, respectively. + */ +interface Filter { + + /** + * The proto message types supported by this filter. A filter will be registered by each of its + * supported message types. + */ + String[] typeUrls(); + + /** + * Parses the top-level filter config from raw proto message. The message may be either a {@link + * com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}. + */ + ConfigOrError parseFilterConfig(Message rawProtoMessage); + + /** + * Parses the per-filter override filter config from raw proto message. The message may be either + * a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}. + */ + ConfigOrError parseFilterConfigOverride(Message rawProtoMessage); + + /** Represents an opaque data structure holding configuration for a filter. */ + interface FilterConfig { + String typeUrl(); + } + + /** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for clients. */ + interface ClientInterceptorBuilder { + @Nullable + ClientInterceptor buildClientInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args, + ScheduledExecutorService scheduler); + } + + // Server side filters are not currently supported, but this interface is defined for clarity. + interface ServerInterceptorBuilder { + ServerInterceptor buildServerInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig); + } + + // TODO(zdapeng): Unify with ClientXdsClient.StructOrError, or just have parseFilterConfig() throw + // certain types of Exception. + final class ConfigOrError { + /** + * Returns a {@link ConfigOrError} for the successfully converted data object. + */ + static ConfigOrError fromConfig(T config) { + return new ConfigOrError<>(config); + } + + /** + * Returns a {@link ConfigOrError} for the failure to convert the data object. + */ + static ConfigOrError fromError(String errorDetail) { + return new ConfigOrError<>(errorDetail); + } + + final String errorDetail; + final T config; + + private ConfigOrError(T config) { + this.config = checkNotNull(config, "config"); + this.errorDetail = null; + } + + private ConfigOrError(String errorDetail) { + this.config = null; + this.errorDetail = checkNotNull(errorDetail, "errorDetail"); + } + } + + /** Filter config with instance name. */ + final class NamedFilterConfig { + // filter instance name + final String name; + final FilterConfig filterConfig; + + NamedFilterConfig(String name, FilterConfig filterConfig) { + this.name = name; + this.filterConfig = filterConfig; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NamedFilterConfig that = (NamedFilterConfig) o; + return Objects.equals(name, that.name) + && Objects.equals(filterConfig, that.filterConfig); + } + + @Override + public int hashCode() { + return Objects.hash(name, filterConfig); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("filterConfig", filterConfig) + .toString(); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/FilterRegistry.java b/xds/src/main/java/io/grpc/xds/FilterRegistry.java new file mode 100644 index 0000000000..db4f256bce --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/FilterRegistry.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * A registry for all supported {@link Filter}s. Filters can be queried from the registry + * by any of the {@link Filter#typeUrls() type URLs}. + */ +final class FilterRegistry { + private static FilterRegistry instance; + + private final Map supportedFilters = new HashMap<>(); + + private FilterRegistry() {} + + static synchronized FilterRegistry getDefaultRegistry() { + if (instance == null) { + instance = newRegistry().register(FaultFilter.INSTANCE, RouterFilter.INSTANCE); + } + return instance; + } + + @VisibleForTesting + static FilterRegistry newRegistry() { + return new FilterRegistry(); + } + + @VisibleForTesting + FilterRegistry register(Filter... filters) { + for (Filter filter : filters) { + for (String typeUrl : filter.typeUrls()) { + supportedFilters.put(typeUrl, filter); + } + } + return this; + } + + @Nullable + Filter get(String typeUrl) { + return supportedFilters.get(typeUrl); + } +} diff --git a/xds/src/main/java/io/grpc/xds/LameFilter.java b/xds/src/main/java/io/grpc/xds/LameFilter.java new file mode 100644 index 0000000000..4dd1d3c96e --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/LameFilter.java @@ -0,0 +1,121 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Message; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.xds.Filter.ClientInterceptorBuilder; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; + +/** + * A filter that fails all RPCs. To be added to the end of filter chain if RouterFilter is absent. + */ +enum LameFilter implements Filter, ClientInterceptorBuilder { + INSTANCE; + + static final FilterConfig LAME_CONFIG = new FilterConfig() { + @Override + public String typeUrl() { + throw new UnsupportedOperationException("shouldn't be called"); + } + + @Override + public String toString() { + return "LAME_CONFIG"; + } + }; + + @Override + public String[] typeUrls() { + return new String[0]; + } + + @Override + public ConfigOrError parseFilterConfig(Message rawProtoMessage) { + throw new UnsupportedOperationException(); + } + + @Override + public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public ClientInterceptor buildClientInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args, + ScheduledExecutorService scheduler) { + class LameInterceptor implements ClientInterceptor { + + @Override + public ClientCall interceptCall( + MethodDescriptor method, final CallOptions callOptions, Channel next) { + final Context context = Context.current(); + return new ClientCall() { + @Override + public void start(final Listener listener, Metadata headers) { + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { // This should never happen in practice because + // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with + // a callExecutor. + // TODO(https://github.com/grpc/grpc-java/issues/7868) + callExecutor = MoreExecutors.directExecutor(); + } + callExecutor.execute( + new Runnable() { + @Override + public void run() { + Context previous = context.attach(); + try { + listener.onClose( + Status.UNAVAILABLE.withDescription("No router filter"), new Metadata()); + } finally { + context.detach(previous); + } + } + }); + } + + @Override + public void request(int numMessages) {} + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) {} + + @Override + public void halfClose() {} + + @Override + public void sendMessage(ReqT message) {} + }; + } + } + + return new LameInterceptor(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/MessagePrinter.java b/xds/src/main/java/io/grpc/xds/MessagePrinter.java index bc738eed52..9f5df41610 100644 --- a/xds/src/main/java/io/grpc/xds/MessagePrinter.java +++ b/xds/src/main/java/io/grpc/xds/MessagePrinter.java @@ -24,7 +24,9 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; @@ -41,6 +43,8 @@ final class MessagePrinter { .add(Listener.getDescriptor()) .add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor()) .add(HttpConnectionManager.getDescriptor()) + .add(HttpFilter.getDescriptor()) + .add(HTTPFault.getDescriptor()) .add(io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2 .HttpConnectionManager.getDescriptor()) // UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported diff --git a/xds/src/main/java/io/grpc/xds/RouterFilter.java b/xds/src/main/java/io/grpc/xds/RouterFilter.java new file mode 100644 index 0000000000..18fb8becae --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RouterFilter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.protobuf.Message; +import io.grpc.ClientInterceptor; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.xds.Filter.ClientInterceptorBuilder; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; + +/** + * Router filter implementation. Currently this filter does not parse any field in the config. + */ +enum RouterFilter implements Filter, ClientInterceptorBuilder { + INSTANCE; + + static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router"; + + static final FilterConfig ROUTER_CONFIG = new FilterConfig() { + @Override + public String typeUrl() { + return RouterFilter.TYPE_URL; + } + + @Override + public String toString() { + return "ROUTER_CONFIG"; + } + }; + + @Override + public String[] typeUrls() { + return new String[] { TYPE_URL }; + } + + @Override + public ConfigOrError parseFilterConfig(Message rawProtoMessage) { + return ConfigOrError.fromConfig(ROUTER_CONFIG); + } + + @Override + public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) { + return ConfigOrError.fromError("Router Filter should not have override config"); + } + + @Nullable + @Override + public ClientInterceptor buildClientInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args, + ScheduledExecutorService scheduler) { + return null; + } +} diff --git a/xds/src/main/java/io/grpc/xds/VirtualHost.java b/xds/src/main/java/io/grpc/xds/VirtualHost.java index 30e08d5e23..68967780d5 100644 --- a/xds/src/main/java/io/grpc/xds/VirtualHost.java +++ b/xds/src/main/java/io/grpc/xds/VirtualHost.java @@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.re2j.Pattern; +import io.grpc.xds.Filter.FilterConfig; import io.grpc.xds.Matchers.FractionMatcher; import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.PathMatcher; import java.util.Collections; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; /** Reprsents an upstream virtual host. */ @@ -43,12 +46,13 @@ abstract class VirtualHost { abstract ImmutableList routes(); @Nullable - abstract HttpFault httpFault(); + abstract ImmutableMap filterConfigOverrides(); - public static VirtualHost create(String name, List domains, List routes, - @Nullable HttpFault httpFault) { + public static VirtualHost create( + String name, List domains, List routes, + Map filterConfigOverrides) { return new AutoValue_VirtualHost(name, ImmutableList.copyOf(domains), - ImmutableList.copyOf(routes), httpFault); + ImmutableList.copyOf(routes), ImmutableMap.copyOf(filterConfigOverrides)); } @AutoValue @@ -58,11 +62,13 @@ abstract class VirtualHost { abstract RouteAction routeAction(); @Nullable - abstract HttpFault httpFault(); + abstract ImmutableMap filterConfigOverrides(); - static Route create(RouteMatch routeMatch, RouteAction routeAction, - @Nullable HttpFault httpFault) { - return new AutoValue_VirtualHost_Route(routeMatch, routeAction, httpFault); + static Route create( + RouteMatch routeMatch, RouteAction routeAction, + Map filterConfigOverrides) { + return new AutoValue_VirtualHost_Route( + routeMatch, routeAction, ImmutableMap.copyOf(filterConfigOverrides)); } @AutoValue @@ -129,11 +135,12 @@ abstract class VirtualHost { abstract int weight(); @Nullable - abstract HttpFault httpFault(); + abstract ImmutableMap filterConfigOverrides(); - static ClusterWeight create(String name, int weight, @Nullable HttpFault httpFault) { - return new AutoValue_VirtualHost_Route_RouteAction_ClusterWeight(name, weight, - httpFault); + static ClusterWeight create( + String name, int weight, Map filterConfigOverrides) { + return new AutoValue_VirtualHost_Route_RouteAction_ClusterWeight( + name, weight, ImmutableMap.copyOf(filterConfigOverrides)); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index b793255ee2..f01e22460c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -27,6 +27,7 @@ import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyServerProtoData.Listener; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; +import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import java.util.ArrayList; @@ -54,36 +55,32 @@ abstract class XdsClient { // The list virtual hosts that make up the route table. @Nullable final List virtualHosts; - // Listener contains the HttpFault filter. - final boolean hasFaultInjection; - @Nullable // Can be null even if hasFaultInjection is true. - final HttpFault httpFault; + // Filter instance names. Null if HttpFilter support is not enabled. + @Nullable final List filterChain; // Server side Listener. @Nullable final Listener listener; LdsUpdate( - long httpMaxStreamDurationNano, String rdsName, boolean hasFaultInjection, - @Nullable HttpFault httpFault) { - this(httpMaxStreamDurationNano, rdsName, null, hasFaultInjection, httpFault); + long httpMaxStreamDurationNano, String rdsName, + @Nullable List filterChain) { + this(httpMaxStreamDurationNano, rdsName, null, filterChain); } LdsUpdate( long httpMaxStreamDurationNano, List virtualHosts, - boolean hasFaultInjection, @Nullable HttpFault httpFault) { - this(httpMaxStreamDurationNano, null, virtualHosts, hasFaultInjection, httpFault); + @Nullable List filterChain) { + this(httpMaxStreamDurationNano, null, virtualHosts, filterChain); } private LdsUpdate( long httpMaxStreamDurationNano, @Nullable String rdsName, - @Nullable List virtualHosts, boolean hasFaultInjection, - @Nullable HttpFault httpFault) { + @Nullable List virtualHosts, @Nullable List filterChain) { this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; this.rdsName = rdsName; this.virtualHosts = virtualHosts == null ? null : Collections.unmodifiableList(new ArrayList<>(virtualHosts)); - this.hasFaultInjection = hasFaultInjection; - this.httpFault = httpFault; + this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain); this.listener = null; } @@ -91,15 +88,14 @@ abstract class XdsClient { this.listener = listener; this.httpMaxStreamDurationNano = 0L; this.rdsName = null; + this.filterChain = null; this.virtualHosts = null; - this.hasFaultInjection = false; - this.httpFault = null; } @Override public int hashCode() { return Objects.hash( - httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault, listener); + httpMaxStreamDurationNano, rdsName, virtualHosts, filterChain, listener); } @Override @@ -114,8 +110,7 @@ abstract class XdsClient { return httpMaxStreamDurationNano == that.httpMaxStreamDurationNano && Objects.equals(rdsName, that.rdsName) && Objects.equals(virtualHosts, that.virtualHosts) - && hasFaultInjection == that.hasFaultInjection - && Objects.equals(httpFault, that.httpFault) + && Objects.equals(filterChain, that.filterChain) && Objects.equals(listener, that.listener); } @@ -128,15 +123,15 @@ abstract class XdsClient { } else { toStringHelper.add("virtualHosts", virtualHosts); } - if (hasFaultInjection) { - toStringHelper.add("faultInjectionEnabled", true) - .add("httpFault", httpFault); + if (filterChain != null) { + toStringHelper.add("filterChain", filterChain); } if (listener != null) { toStringHelper.add("listener", listener); } return toStringHelper.toString(); } + } static final class RdsUpdate implements ResourceUpdate { diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 037f0e3aa0..91e0b4a38c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -18,22 +18,18 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; -import io.grpc.Context; -import io.grpc.Deadline; +import io.grpc.ClientInterceptors; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.InternalConfigSelector; @@ -44,12 +40,11 @@ import io.grpc.MethodDescriptor; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.SynchronizationContext; -import io.grpc.internal.DelayedClientCall; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; -import io.grpc.xds.HttpFault.FaultAbort; -import io.grpc.xds.HttpFault.FaultDelay; -import io.grpc.xds.HttpFault.FractionalPercent; +import io.grpc.xds.Filter.ClientInterceptorBuilder; +import io.grpc.xds.Filter.FilterConfig; +import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.Matchers.FractionMatcher; import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.PathMatcher; @@ -66,6 +61,7 @@ import io.grpc.xds.XdsClient.RdsUpdate; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -76,12 +72,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; /** @@ -98,29 +90,11 @@ final class XdsNameResolver extends NameResolver { CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY"); static final CallOptions.Key RPC_HASH_KEY = CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY"); + private static final NamedFilterConfig LAME_FILTER = + new NamedFilterConfig(null, LameFilter.LAME_CONFIG); @VisibleForTesting static boolean enableTimeout = Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT")); - @VisibleForTesting - static final Metadata.Key DOWNSTREAM_NODE_KEY = - Metadata.Key.of("x-envoy-downstream-service-node", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static final Metadata.Key HEADER_DELAY_KEY = - Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static final Metadata.Key HEADER_DELAY_PERCENTAGE_KEY = - Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static final Metadata.Key HEADER_ABORT_HTTP_STATUS_KEY = - Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static final Metadata.Key HEADER_ABORT_GRPC_STATUS_KEY = - Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static final Metadata.Key HEADER_ABORT_PERCENTAGE_KEY = - Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER); - @VisibleForTesting - static AtomicLong activeFaultInjectedStreamCounter = new AtomicLong(); private final InternalLogId logId; private final XdsLogger logger; @@ -130,6 +104,7 @@ final class XdsNameResolver extends NameResolver { private final ScheduledExecutorService scheduler; private final XdsClientPoolFactory xdsClientPoolFactory; private final ThreadSafeRandom random; + private final FilterRegistry filterRegistry; private final XxHash64 hashFunc = XxHash64.INSTANCE; private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); private final ConfigSelector configSelector = new ConfigSelector(); @@ -144,19 +119,22 @@ final class XdsNameResolver extends NameResolver { XdsNameResolver(String name, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler) { this(name, serviceConfigParser, syncContext, scheduler, - SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance); + SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance, + FilterRegistry.getDefaultRegistry()); } @VisibleForTesting XdsNameResolver(String name, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, - XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random) { + XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, + FilterRegistry filterRegistry) { authority = GrpcUtil.checkAuthority(checkNotNull(name, "name")); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.scheduler = checkNotNull(scheduler, "scheduler"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.random = checkNotNull(random, "random"); + this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); logId = InternalLogId.allocate("xds-resolver", name); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name); @@ -370,18 +348,21 @@ final class XdsNameResolver extends NameResolver { asciiHeaders.put("content-type", "application/grpc"); String cluster = null; Route selectedRoute = null; - HttpFault selectedFaultConfig; RoutingConfig routingCfg; + Map selectedOverrideConfigs; + List filterInterceptors = new ArrayList<>(); do { routingCfg = routingConfig; - selectedFaultConfig = routingCfg.faultConfig; + selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig); + if (routingCfg.filterChain != null + && Iterables.getLast(routingCfg.filterChain).equals(LAME_FILTER)) { + break; + } for (Route route : routingCfg.routes) { if (matchRoute(route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(), asciiHeaders, random)) { selectedRoute = route; - if (routingCfg.applyFaultInjection && route.httpFault() != null) { - selectedFaultConfig = route.httpFault(); - } + selectedOverrideConfigs.putAll(route.filterConfigOverrides()); break; } } @@ -403,9 +384,7 @@ final class XdsNameResolver extends NameResolver { accumulator += weightedCluster.weight(); if (select < accumulator) { cluster = weightedCluster.name(); - if (routingCfg.applyFaultInjection && weightedCluster.httpFault() != null) { - selectedFaultConfig = weightedCluster.httpFault(); - } + selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides()); break; } } @@ -414,12 +393,15 @@ final class XdsNameResolver extends NameResolver { // TODO(chengyuanzhang): avoid service config generation and parsing for each call. Map rawServiceConfig = Collections.emptyMap(); if (enableTimeout) { - Long timeoutNano = selectedRoute.routeAction().timeoutNano(); - if (timeoutNano == null) { - timeoutNano = routingCfg.fallbackTimeoutNano; + Long timeoutNanos = null; + if (selectedRoute != null) { + timeoutNanos = selectedRoute.routeAction().timeoutNano(); } - if (timeoutNano > 0) { - rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano); + if (timeoutNanos == null) { + timeoutNanos = routingCfg.fallbackTimeoutNano; + } + if (timeoutNanos > 0) { + rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNanos); } } ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); @@ -430,38 +412,33 @@ final class XdsNameResolver extends NameResolver { parsedServiceConfig.getError().augmentDescription( "Failed to parse service config (method config)")); } - if (selectedFaultConfig != null && selectedFaultConfig.maxActiveFaults() != null - && activeFaultInjectedStreamCounter.get() >= selectedFaultConfig.maxActiveFaults()) { - selectedFaultConfig = null; - } - if (selectedFaultConfig != null) { - if (!selectedFaultConfig.upstreamCluster().equals(cluster)) { - selectedFaultConfig = null; - } else if (!selectedFaultConfig.downstreamNodes().isEmpty()) { - String downstreamNode = headers.get(DOWNSTREAM_NODE_KEY); - if (downstreamNode == null - || !selectedFaultConfig.downstreamNodes().contains(downstreamNode)) { - selectedFaultConfig = null; + if (routingCfg.filterChain != null) { + for (NamedFilterConfig namedFilter : routingCfg.filterChain) { + FilterConfig filterConfig = namedFilter.filterConfig; + Filter filter; + if (namedFilter.equals(LAME_FILTER)) { + filter = LameFilter.INSTANCE; + } else { + filter = filterRegistry.get(filterConfig.typeUrl()); + } + if (filter instanceof ClientInterceptorBuilder) { + ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter) + .buildClientInterceptor( + filterConfig, selectedOverrideConfigs.get(namedFilter.name), + args, scheduler); + if (interceptor != null) { + filterInterceptors.add(interceptor); + } } } - } - if (selectedFaultConfig != null - && !matchHeaders(selectedFaultConfig.headers(), asciiHeaders)) { - selectedFaultConfig = null; - } - Long delayNanos = null; - Status abortStatus = null; - if (selectedFaultConfig != null) { - if (selectedFaultConfig.faultDelay() != null) { - delayNanos = determineFaultDelayNanos(selectedFaultConfig.faultDelay(), headers); - } - if (selectedFaultConfig.faultAbort() != null) { - abortStatus = determineFaultAbortStatus(selectedFaultConfig.faultAbort(), headers); + if (Iterables.getLast(routingCfg.filterChain).equals(LAME_FILTER)) { + return Result.newBuilder() + .setConfig(config) + .setInterceptor(combineInterceptors(filterInterceptors)) + .build(); } } final String finalCluster = cluster; - final Long finalDelayNanos = delayNanos; - final Status finalAbortStatus = abortStatus; final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), asciiHeaders); class ClusterSelectionInterceptor implements ClientInterceptor { @Override @@ -471,70 +448,39 @@ final class XdsNameResolver extends NameResolver { final CallOptions callOptionsForCluster = callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster) .withOption(RPC_HASH_KEY, hash); - Supplier> configApplyingCallSupplier = - new Supplier>() { + return new SimpleForwardingClientCall( + next.newCall(method, callOptionsForCluster)) { + @Override + public void start(Listener listener, Metadata headers) { + listener = new SimpleForwardingClientCallListener(listener) { + boolean committed; + @Override - public ClientCall get() { - return new SimpleForwardingClientCall( - next.newCall(method, callOptionsForCluster)) { - @Override - public void start(Listener listener, Metadata headers) { - listener = new SimpleForwardingClientCallListener(listener) { - boolean committed; + public void onHeaders(Metadata headers) { + committed = true; + releaseCluster(finalCluster); + delegate().onHeaders(headers); + } - @Override - public void onHeaders(Metadata headers) { - committed = true; - releaseCluster(finalCluster); - delegate().onHeaders(headers); - } - - @Override - public void onClose(Status status, Metadata trailers) { - if (!committed) { - releaseCluster(finalCluster); - } - delegate().onClose(status, trailers); - } - }; - delegate().start(listener, headers); - } - }; + @Override + public void onClose(Status status, Metadata trailers) { + if (!committed) { + releaseCluster(finalCluster); + } + delegate().onClose(status, trailers); } }; - - Executor callExecutor = callOptions.getExecutor(); - if (callExecutor == null) { // This should never happen in practice because - // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with - // a callExecutor. - // TODO(https://github.com/grpc/grpc-java/issues/7868) - callExecutor = MoreExecutors.directExecutor(); - } - if (finalDelayNanos != null && finalAbortStatus != null) { - return new ActiveFaultCountingClientCall<>( - new DelayInjectedCall<>( - finalDelayNanos, callExecutor, scheduler, callOptionsForCluster.getDeadline(), - Suppliers.ofInstance( - new FailingClientCall(finalAbortStatus, callExecutor)))); - } - if (finalAbortStatus != null) { - return new ActiveFaultCountingClientCall<>( - new FailingClientCall(finalAbortStatus, callExecutor)); - } - if (finalDelayNanos != null) { - return new ActiveFaultCountingClientCall<>( - new DelayInjectedCall<>( - finalDelayNanos, callExecutor, scheduler, callOptionsForCluster.getDeadline(), - configApplyingCallSupplier)); - } - return configApplyingCallSupplier.get(); + delegate().start(listener, headers); + } + }; } } + filterInterceptors.add(new ClusterSelectionInterceptor()); return Result.newBuilder() .setConfig(config) - .setInterceptor(new ClusterSelectionInterceptor()) + .setInterceptor(combineInterceptors(filterInterceptors)) .build(); } @@ -597,198 +543,21 @@ final class XdsNameResolver extends NameResolver { } return hash == null ? random.nextLong() : hash; } - - @Nullable - private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) { - Long delayNanos; - FractionalPercent fractionalPercent = faultDelay.percent(); - if (faultDelay.headerDelay()) { - try { - int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY)); - delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis); - String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY); - if (delayPercentageStr != null) { - int delayPercentage = Integer.parseInt(delayPercentageStr); - if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) { - fractionalPercent = - FractionalPercent.create(delayPercentage, fractionalPercent.denominatorType()); - } - } - } catch (NumberFormatException e) { - return null; // treated as header_delay not applicable - } - } else { - delayNanos = faultDelay.delayNanos(); - } - if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) { - return null; - } - return delayNanos; - } - - @Nullable - private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) { - Status abortStatus = null; - FractionalPercent fractionalPercent = faultAbort.percent(); - if (faultAbort.headerAbort()) { - try { - String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY); - if (grpcCodeStr != null) { - int grpcCode = Integer.parseInt(grpcCodeStr); - abortStatus = Status.fromCodeValue(grpcCode); - } - String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY); - if (httpCodeStr != null) { - int httpCode = Integer.parseInt(httpCodeStr); - abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode); - } - String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY); - if (abortPercentageStr != null) { - int abortPercentage = - Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY)); - if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) { - fractionalPercent = - FractionalPercent.create(abortPercentage, fractionalPercent.denominatorType()); - } - } - } catch (NumberFormatException e) { - return null; // treated as header_abort not applicable - } - } else { - abortStatus = faultAbort.status(); - } - if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) { - return null; - } - return abortStatus; - } } - private static int getRatePerMillion(FractionalPercent percent) { - int numerator = percent.numerator(); - FractionalPercent.DenominatorType type = percent.denominatorType(); - switch (type) { - case TEN_THOUSAND: - numerator *= 100; - break; - case HUNDRED: - numerator *= 10_000; - break; - case MILLION: - default: - break; + private static ClientInterceptor combineInterceptors(final List interceptors) { + checkArgument(!interceptors.isEmpty(), "empty interceptors"); + if (interceptors.size() == 1) { + return interceptors.get(0); } - - if (numerator > 1_000_000 || numerator < 0) { - numerator = 1_000_000; - } - return numerator; - } - - /** - * A forwarding client call that counts active fault injections. - */ - private final class ActiveFaultCountingClientCall extends - SimpleForwardingClientCall { - ActiveFaultCountingClientCall(ClientCall faultInjectedDelegate) { - super(faultInjectedDelegate); - activeFaultInjectedStreamCounter.incrementAndGet(); - } - - @Override - public void start(Listener listener, Metadata headers) { - listener = new SimpleForwardingClientCallListener(listener) { - @Override - public void onClose(Status status, Metadata trailers) { - delegate().onClose(status, trailers); - activeFaultInjectedStreamCounter.decrementAndGet(); - } - }; - delegate().start(listener, headers); - } - } - - /** A {@link DelayedClientCall} with a fixed delay. */ - private static final class DelayInjectedCall extends DelayedClientCall { - final Object lock = new Object(); - ScheduledFuture delayTask; - boolean cancelled; - - DelayInjectedCall( - long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler, - @Nullable Deadline deadline, - final Supplier> callSupplier) { - super(callExecutor, scheduler, deadline); - ScheduledFuture task = scheduler.schedule( - new Runnable() { - @Override - public void run() { - setCall(callSupplier.get()); - } - }, - delayNanos, - NANOSECONDS); - synchronized (lock) { - if (cancelled) { - task.cancel(false); - return; - } - delayTask = task; + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + next = ClientInterceptors.interceptForward(next, interceptors); + return next.newCall(method, callOptions); } - } - - @Override - protected void callCancelled() { - ScheduledFuture savedDelayTask; - synchronized (lock) { - cancelled = true; - savedDelayTask = delayTask; - } - if (savedDelayTask != null) { - savedDelayTask.cancel(false); - } - } - } - - /** An implementation of {@link ClientCall} that fails when started. */ - private static final class FailingClientCall extends ClientCall { - final Status error; - final Executor callExecutor; - final Context context; - - FailingClientCall(Status error, Executor callExecutor) { - this.error = error; - this.callExecutor = callExecutor; - this.context = Context.current(); - } - - @Override - public void start(final ClientCall.Listener listener, Metadata headers) { - callExecutor.execute( - new Runnable() { - @Override - public void run() { - Context previous = context.attach(); - try { - listener.onClose(error, new Metadata()); - } finally { - context.detach(previous); - } - } - }); - } - - @Override - public void request(int numMessages) {} - - @Override - public void cancel(String message, Throwable cause) {} - - @Override - public void halfClose() {} - - @Override - public void sendMessage(ReqT message) {} + }; } @VisibleForTesting @@ -872,10 +641,7 @@ final class XdsNameResolver extends NameResolver { private String rdsResource; @Nullable private RdsResourceWatcher rdsWatcher; - private long httpMaxStreamDurationNano; - private boolean applyFaultInjection; - @Nullable - private HttpFault httpFilterFaultConfig; + private LdsUpdate update; @Override public void onChanged(final LdsUpdate update) { @@ -886,9 +652,7 @@ final class XdsNameResolver extends NameResolver { return; } logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - httpMaxStreamDurationNano = update.httpMaxStreamDurationNano; - applyFaultInjection = update.hasFaultInjection; - httpFilterFaultConfig = applyFaultInjection ? update.httpFault : null; + ResolveState.this.update = update; List virtualHosts = update.virtualHosts; String rdsName = update.rdsName; if (rdsName != null && rdsName.equals(rdsResource)) { @@ -958,11 +722,27 @@ final class XdsNameResolver extends NameResolver { listener.onResult(emptyResult); return; } - List routes = virtualHost.routes(); - HttpFault faultConfig = httpFilterFaultConfig; - if (applyFaultInjection && virtualHost.httpFault() != null) { - faultConfig = virtualHost.httpFault(); + List filterChain = null; + if (update.filterChain != null) { + boolean hasRouter = false; + filterChain = new ArrayList<>(update.filterChain.size()); + for (NamedFilterConfig namedFilter : update.filterChain) { + filterChain.add(namedFilter); + if (namedFilter.filterConfig.equals(RouterFilter.ROUTER_CONFIG)) { + hasRouter = true; + break; + } + } + if (!hasRouter) { + filterChain.add(LAME_FILTER); + routingConfig = new RoutingConfig( + update.httpMaxStreamDurationNano, Collections.emptyList(), filterChain, + virtualHost.filterConfigOverrides()); + updateResolutionResult(); + return; + } } + List routes = virtualHost.routes(); Set clusters = new HashSet<>(); for (Route route : routes) { RouteAction action = route.routeAction(); @@ -996,7 +776,8 @@ final class XdsNameResolver extends NameResolver { // Make newly added clusters selectable by config selector and deleted clusters no longer // selectable. routingConfig = new RoutingConfig( - httpMaxStreamDurationNano, routes, applyFaultInjection, faultConfig); + update.httpMaxStreamDurationNano, routes, filterChain, + virtualHost.filterConfigOverrides()); shouldUpdateResult = false; for (String cluster : deletedClusters) { int count = clusterRefs.get(cluster).decrementAndGet(); @@ -1071,21 +852,22 @@ final class XdsNameResolver extends NameResolver { */ private static class RoutingConfig { private final long fallbackTimeoutNano; - private final List routes; - private final boolean applyFaultInjection; - @Nullable - private final HttpFault faultConfig; + final List routes; + // Null if HttpFilter is not supported. + @Nullable final List filterChain; + final Map virtualHostOverrideConfig; - private static final RoutingConfig empty = - new RoutingConfig(0L, Collections.emptyList(), false, null); + private static RoutingConfig empty = new RoutingConfig( + 0L, Collections.emptyList(), null, Collections.emptyMap()); private RoutingConfig( - long fallbackTimeoutNano, List routes, boolean applyFaultInjection, - HttpFault faultConfig) { + long fallbackTimeoutNano, List routes, @Nullable List filterChain, + Map virtualHostOverrideConfig) { this.fallbackTimeoutNano = fallbackTimeoutNano; this.routes = routes; - this.applyFaultInjection = applyFaultInjection; - this.faultConfig = faultConfig; + checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty"); + this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain); + this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig); } } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index fe7a4d7caf..9948371fad 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -18,8 +18,10 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; import com.google.protobuf.BoolValue; +import com.google.protobuf.StringValue; import com.google.protobuf.UInt32Value; import com.google.protobuf.util.Durations; import com.google.re2j.Pattern; @@ -45,7 +47,7 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header; import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.QueryParameter; import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration; import io.envoyproxy.envoy.config.route.v3.WeightedCluster; -import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort; +import io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay; import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; @@ -59,7 +61,8 @@ import io.grpc.Status.Code; import io.grpc.xds.ClientXdsClient.StructOrError; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; -import io.grpc.xds.HttpFault.FaultAbort; +import io.grpc.xds.FaultConfig.FaultAbort; +import io.grpc.xds.Filter.FilterConfig; import io.grpc.xds.Matchers.FractionMatcher; import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.PathMatcher; @@ -71,6 +74,7 @@ import io.grpc.xds.VirtualHost.Route.RouteMatch; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -91,7 +95,7 @@ public class ClientXdsClientDataTest { io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() .setCluster("cluster-foo")) .build(); - StructOrError struct = ClientXdsClient.parseRoute(proto); + StructOrError struct = ClientXdsClient.parseRoute(proto, false); assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct()) .isEqualTo( @@ -99,7 +103,7 @@ public class ClientXdsClientDataTest { RouteMatch.create(PathMatcher.fromPath("/service/method", false), Collections.emptyList(), null), RouteAction.forCluster("cluster-foo", Collections.emptyList(), null), - null)); + ImmutableMap.of())); } @Test @@ -111,7 +115,7 @@ public class ClientXdsClientDataTest { .setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath("")) .setRedirect(RedirectAction.getDefaultInstance()) .build(); - res = ClientXdsClient.parseRoute(redirectRoute); + res = ClientXdsClient.parseRoute(redirectRoute, false); assertThat(res.getStruct()).isNull(); assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: redirect"); @@ -121,7 +125,7 @@ public class ClientXdsClientDataTest { .setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath("")) .setDirectResponse(DirectResponseAction.getDefaultInstance()) .build(); - res = ClientXdsClient.parseRoute(directResponseRoute); + res = ClientXdsClient.parseRoute(directResponseRoute, false); assertThat(res.getStruct()).isNull(); assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: direct_response"); @@ -131,7 +135,7 @@ public class ClientXdsClientDataTest { .setMatch(io.envoyproxy.envoy.config.route.v3.RouteMatch.newBuilder().setPath("")) .setFilterAction(FilterAction.getDefaultInstance()) .build(); - res = ClientXdsClient.parseRoute(filterRoute); + res = ClientXdsClient.parseRoute(filterRoute, false); assertThat(res.getStruct()).isNull(); assertThat(res.getErrorDetail()).isEqualTo("Unsupported action type: filter_action"); } @@ -151,7 +155,7 @@ public class ClientXdsClientDataTest { io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() .setCluster("cluster-foo")) .build(); - assertThat(ClientXdsClient.parseRoute(proto)).isNull(); + assertThat(ClientXdsClient.parseRoute(proto, false)).isNull(); } @Test @@ -166,7 +170,7 @@ public class ClientXdsClientDataTest { io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() .setClusterHeader("cluster header")) // cluster_header action not supported .build(); - assertThat(ClientXdsClient.parseRoute(proto)).isNull(); + assertThat(ClientXdsClient.parseRoute(proto, false)).isNull(); } @Test @@ -345,7 +349,7 @@ public class ClientXdsClientDataTest { io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() .setCluster("cluster-foo") .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct().cluster()).isEqualTo("cluster-foo"); assertThat(struct.getStruct().weightedClusters()).isNull(); @@ -367,12 +371,12 @@ public class ClientXdsClientDataTest { .setName("cluster-bar") .setWeight(UInt32Value.newBuilder().setValue(70)))) .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); assertThat(struct.getErrorDetail()).isNull(); assertThat(struct.getStruct().cluster()).isNull(); assertThat(struct.getStruct().weightedClusters()).containsExactly( - ClusterWeight.create("cluster-foo", 30, null), - ClusterWeight.create("cluster-bar", 70, null)); + ClusterWeight.create("cluster-foo", 30, ImmutableMap.of()), + ClusterWeight.create("cluster-bar", 70, ImmutableMap.of())); } @Test @@ -385,7 +389,7 @@ public class ClientXdsClientDataTest { .setGrpcTimeoutHeaderMax(Durations.fromSeconds(5L)) .setMaxStreamDuration(Durations.fromMillis(20L))) .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); assertThat(struct.getStruct().timeoutNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L)); } @@ -398,7 +402,7 @@ public class ClientXdsClientDataTest { MaxStreamDuration.newBuilder() .setMaxStreamDuration(Durations.fromSeconds(5L))) .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); assertThat(struct.getStruct().timeoutNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L)); } @@ -408,7 +412,7 @@ public class ClientXdsClientDataTest { io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() .setCluster("cluster-foo") .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); assertThat(struct.getStruct().timeoutNano()).isNull(); } @@ -443,7 +447,7 @@ public class ClientXdsClientDataTest { .setQueryParameter( QueryParameter.newBuilder().setName("param"))) // unsupported .build(); - StructOrError struct = ClientXdsClient.parseRouteAction(proto); + StructOrError struct = ClientXdsClient.parseRouteAction(proto, false); List policies = struct.getStruct().hashPolicies(); assertThat(policies).hasSize(2); assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER); @@ -463,29 +467,11 @@ public class ClientXdsClientDataTest { .setName("cluster-foo") .setWeight(UInt32Value.newBuilder().setValue(30)) .build(); - ClusterWeight clusterWeight = ClientXdsClient.parseClusterWeight(proto).getStruct(); + ClusterWeight clusterWeight = ClientXdsClient.parseClusterWeight(proto, false).getStruct(); assertThat(clusterWeight.name()).isEqualTo("cluster-foo"); assertThat(clusterWeight.weight()).isEqualTo(30); } - // TODO(zdapeng): add tests for parseClusterWeight with HttpFault. - - // TODO(zdapeng): add tests for parseHttpFault. - - @Test - public void parseFaultAbort_withHeaderAbort() { - io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto = - io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.newBuilder() - .setPercentage(FractionalPercent.newBuilder() - .setNumerator(20).setDenominator(DenominatorType.HUNDRED)) - .setHeaderAbort(HeaderAbort.getDefaultInstance()).build(); - FaultAbort faultAbort = ClientXdsClient.parseFaultAbort(proto).getStruct(); - assertThat(faultAbort.headerAbort()).isTrue(); - assertThat(faultAbort.percent().numerator()).isEqualTo(20); - assertThat(faultAbort.percent().denominatorType()) - .isEqualTo(HttpFault.FractionalPercent.DenominatorType.HUNDRED); - } - @Test public void parseFaultAbort_withHttpStatus() { io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto = @@ -493,10 +479,10 @@ public class ClientXdsClientDataTest { .setPercentage(FractionalPercent.newBuilder() .setNumerator(100).setDenominator(DenominatorType.TEN_THOUSAND)) .setHttpStatus(400).build(); - FaultAbort res = ClientXdsClient.parseFaultAbort(proto).getStruct(); + FaultAbort res = FaultFilter.parseFaultAbort(proto).config; assertThat(res.percent().numerator()).isEqualTo(100); assertThat(res.percent().denominatorType()) - .isEqualTo(HttpFault.FractionalPercent.DenominatorType.TEN_THOUSAND); + .isEqualTo(FaultConfig.FractionalPercent.DenominatorType.TEN_THOUSAND); assertThat(res.status().getCode()).isEqualTo(Code.INTERNAL); } @@ -507,10 +493,10 @@ public class ClientXdsClientDataTest { .setPercentage(FractionalPercent.newBuilder() .setNumerator(600).setDenominator(DenominatorType.MILLION)) .setGrpcStatus(Code.DEADLINE_EXCEEDED.value()).build(); - FaultAbort faultAbort = ClientXdsClient.parseFaultAbort(proto).getStruct(); + FaultAbort faultAbort = FaultFilter.parseFaultAbort(proto).config; assertThat(faultAbort.percent().numerator()).isEqualTo(600); assertThat(faultAbort.percent().denominatorType()) - .isEqualTo(HttpFault.FractionalPercent.DenominatorType.MILLION); + .isEqualTo(FaultConfig.FractionalPercent.DenominatorType.MILLION); assertThat(faultAbort.status().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); } @@ -627,6 +613,73 @@ public class ClientXdsClientDataTest { assertThat(struct.getErrorDetail()).isEqualTo("negative priority"); } + @Test + public void parseHttpFilter_unsupportedButOptional() { + HttpFilter httpFilter = HttpFilter.newBuilder() + .setIsOptional(true) + .setTypedConfig(Any.pack(StringValue.of("unsupported"))) + .build(); + assertThat(ClientXdsClient.parseHttpFilter(httpFilter)).isNull(); + } + + @Test + public void parseHttpFilter_unsupportedAndRequired() { + HttpFilter httpFilter = HttpFilter.newBuilder() + .setIsOptional(false) + .setName("unsupported.filter") + .setTypedConfig(Any.pack(StringValue.of("string value"))) + .build(); + assertThat(ClientXdsClient.parseHttpFilter(httpFilter).getErrorDetail()).isEqualTo( + "HttpFilter [unsupported.filter] is not optional and has an unsupported config type: " + + "type.googleapis.com/google.protobuf.StringValue"); + } + + @Test + public void parseOverrideFilterConfigs_unsupportedButOptional() { + HTTPFault httpFault = HTTPFault.newBuilder() + .setDelay(FaultDelay.newBuilder().setFixedDelay(Durations.fromNanos(3000))) + .build(); + Map configOverrides = ImmutableMap.of( + "envoy.fault", + Any.pack(httpFault), + "unsupported.filter", + Any.pack(io.envoyproxy.envoy.config.route.v3.FilterConfig.newBuilder() + .setIsOptional(true).setConfig(Any.pack(StringValue.of("string value"))) + .build())); + Map parsedConfigs = + ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getStruct(); + assertThat(parsedConfigs).hasSize(1); + assertThat(parsedConfigs).containsKey("envoy.fault"); + } + + @Test + public void parseOverrideFilterConfigs_unsupportedAndRequired() { + HTTPFault httpFault = HTTPFault.newBuilder() + .setDelay(FaultDelay.newBuilder().setFixedDelay(Durations.fromNanos(3000))) + .build(); + Map configOverrides = ImmutableMap.of( + "envoy.fault", + Any.pack(httpFault), + "unsupported.filter", + Any.pack(io.envoyproxy.envoy.config.route.v3.FilterConfig.newBuilder() + .setIsOptional(false).setConfig(Any.pack(StringValue.of("string value"))) + .build())); + assertThat(ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getErrorDetail()) + .isEqualTo( + "HttpFilter [unsupported.filter] is not optional and has an unsupported config type: " + + "type.googleapis.com/google.protobuf.StringValue"); + + configOverrides = ImmutableMap.of( + "envoy.fault", + Any.pack(httpFault), + "unsupported.filter", + Any.pack(StringValue.of("string value"))); + assertThat(ClientXdsClient.parseOverrideFilterConfigs(configOverrides).getErrorDetail()) + .isEqualTo( + "HttpFilter [unsupported.filter] is not optional and has an unsupported config type: " + + "type.googleapis.com/google.protobuf.StringValue"); + } + @Test public void parseServerSideListener_invalidTrafficDirection() { Listener listener = diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index fc292a7c29..9779e68046 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -34,8 +34,8 @@ import com.google.common.collect.Iterables; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; -import com.google.protobuf.StringValue; import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.FilterConfig; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.grpc.BindableService; import io.grpc.ManagedChannel; @@ -53,7 +53,7 @@ import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.HttpFault.FractionalPercent.DenominatorType; +import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; @@ -383,7 +383,7 @@ public abstract class ClientXdsClientTestBase { mf.buildOpaqueRoutes(1), ImmutableMap.of( "irrelevant", - Any.pack(StringValue.of("irrelevant")), + Any.pack(FilterConfig.newBuilder().setIsOptional(true).build()), "envoy.fault", mf.buildHttpFaultTypedConfig( 300L, 1000, "cluster1", ImmutableList.of(), 100, null, null, @@ -394,12 +394,15 @@ public abstract class ClientXdsClientTestBase { "envoy.fault", mf.buildHttpFaultTypedConfig( null, null, "cluster2", ImmutableList.of(), 101, null, 503, - 2000))) - )), + 2000))))), ImmutableList.of( - mf.buildHttpFilter("irrelevant", null), - mf.buildHttpFilter("envoy.fault", null) - ))); + mf.buildHttpFilter("irrelevant", null, true), + mf.buildHttpFilter( + "envoy.fault", + mf.buildHttpFaultTypedConfig( + 1L, 2, "cluster1", ImmutableList.of(), 3, null, null, + null), + false)))); call.sendResponse(LDS, listener, VERSION_1, "0000"); // Client sends an ACK LDS request. @@ -408,24 +411,23 @@ public abstract class ClientXdsClientTestBase { LdsUpdate ldsUpdate = ldsUpdateCaptor.getValue(); assertThat(ldsUpdate.virtualHosts).hasSize(2); - assertThat(ldsUpdate.hasFaultInjection).isTrue(); - assertThat(ldsUpdate.httpFault).isNull(); - HttpFault httpFault = ldsUpdate.virtualHosts.get(0).httpFault(); - assertThat(httpFault.faultDelay().delayNanos()).isEqualTo(300); - assertThat(httpFault.faultDelay().percent().numerator()).isEqualTo(1000); - assertThat(httpFault.faultDelay().percent().denominatorType()) + assertThat(ldsUpdate.filterChain.get(0).name).isEqualTo("envoy.fault"); + FaultConfig faultConfig = (FaultConfig) ldsUpdate.virtualHosts.get(0) + .filterConfigOverrides().get("envoy.fault"); + assertThat(faultConfig.faultDelay().delayNanos()).isEqualTo(300); + assertThat(faultConfig.faultDelay().percent().numerator()).isEqualTo(1000); + assertThat(faultConfig.faultDelay().percent().denominatorType()) .isEqualTo(DenominatorType.MILLION); - assertThat(httpFault.faultAbort()).isNull(); - assertThat(httpFault.upstreamCluster()).isEqualTo("cluster1"); - assertThat(httpFault.maxActiveFaults()).isEqualTo(100); - httpFault = ldsUpdate.virtualHosts.get(1).httpFault(); - assertThat(httpFault.faultDelay()).isNull(); - assertThat(httpFault.faultAbort().status().getCode()).isEqualTo(Status.Code.UNAVAILABLE); - assertThat(httpFault.faultAbort().percent().numerator()).isEqualTo(2000); - assertThat(httpFault.faultAbort().percent().denominatorType()) + assertThat(faultConfig.faultAbort()).isNull(); + assertThat(faultConfig.maxActiveFaults()).isEqualTo(100); + faultConfig = (FaultConfig) ldsUpdate.virtualHosts.get(1) + .filterConfigOverrides().get("envoy.fault"); + assertThat(faultConfig.faultDelay()).isNull(); + assertThat(faultConfig.faultAbort().status().getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(faultConfig.faultAbort().percent().numerator()).isEqualTo(2000); + assertThat(faultConfig.faultAbort().percent().denominatorType()) .isEqualTo(DenominatorType.MILLION); - assertThat(httpFault.upstreamCluster()).isEqualTo("cluster2"); - assertThat(httpFault.maxActiveFaults()).isEqualTo(101); + assertThat(faultConfig.maxActiveFaults()).isEqualTo(101); } @Test @@ -1430,7 +1432,8 @@ public abstract class ClientXdsClientTestBase { protected abstract Message buildListenerForRds(String name, String rdsResourceName); - protected abstract Message buildHttpFilter(String name, @Nullable Any typedConfig); + protected abstract Message buildHttpFilter( + String name, @Nullable Any typedConfig, boolean isOptional); protected abstract Any buildHttpFaultTypedConfig( @Nullable Long delayNanos, @Nullable Integer delayRate, String upstreamCluster, diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java index d079b96116..3f13546e58 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -277,7 +277,7 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase { } @Override - protected Message buildHttpFilter(String name, @Nullable Any typedConfig) { + protected Message buildHttpFilter(String name, @Nullable Any typedConfig, boolean isOptional) { throw new UnsupportedOperationException(); } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java index 4b730ac016..0a74b0d86d 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -287,8 +287,8 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase { } @Override - protected Message buildHttpFilter(String name, @Nullable Any typedConfig) { - HttpFilter.Builder builder = HttpFilter.newBuilder().setName(name); + protected Message buildHttpFilter(String name, @Nullable Any typedConfig, boolean isOptional) { + HttpFilter.Builder builder = HttpFilter.newBuilder().setName(name).setIsOptional(isOptional); if (typedConfig != null) { builder.setTypedConfig(typedConfig); } diff --git a/xds/src/test/java/io/grpc/xds/FaultFilterTest.java b/xds/src/test/java/io/grpc/xds/FaultFilterTest.java new file mode 100644 index 0000000000..92e53dd248 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/FaultFilterTest.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.protobuf.Any; +import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort; +import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort; +import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; +import io.envoyproxy.envoy.type.v3.FractionalPercent; +import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; +import io.grpc.internal.GrpcUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FaultFilter}. */ +@RunWith(JUnit4.class) +public class FaultFilterTest { + + @Test + public void parseFaultAbort_convertHttpStatus() { + Any rawConfig = Any.pack( + HTTPFault.newBuilder().setAbort(FaultAbort.newBuilder().setHttpStatus(404)).build()); + FaultConfig faultConfig = FaultFilter.INSTANCE.parseFilterConfig(rawConfig).config; + assertThat(faultConfig.faultAbort().status().getCode()) + .isEqualTo(GrpcUtil.httpStatusToGrpcStatus(404).getCode()); + FaultConfig faultConfigOverride = + FaultFilter.INSTANCE.parseFilterConfigOverride(rawConfig).config; + assertThat(faultConfigOverride.faultAbort().status().getCode()) + .isEqualTo(GrpcUtil.httpStatusToGrpcStatus(404).getCode()); + } + + @Test + public void parseFaultAbort_withHeaderAbort() { + io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort proto = + io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.newBuilder() + .setPercentage(FractionalPercent.newBuilder() + .setNumerator(20).setDenominator(DenominatorType.HUNDRED)) + .setHeaderAbort(HeaderAbort.getDefaultInstance()).build(); + FaultConfig.FaultAbort faultAbort = FaultFilter.parseFaultAbort(proto).config; + assertThat(faultAbort.headerAbort()).isTrue(); + assertThat(faultAbort.percent().numerator()).isEqualTo(20); + assertThat(faultAbort.percent().denominatorType()) + .isEqualTo(FaultConfig.FractionalPercent.DenominatorType.HUNDRED); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 422f0f1c62..3652ee2a63 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -17,12 +17,11 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsNameResolver.DOWNSTREAM_NODE_KEY; -import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_GRPC_STATUS_KEY; -import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_HTTP_STATUS_KEY; -import static io.grpc.xds.XdsNameResolver.HEADER_ABORT_PERCENTAGE_KEY; -import static io.grpc.xds.XdsNameResolver.HEADER_DELAY_KEY; -import static io.grpc.xds.XdsNameResolver.HEADER_DELAY_PERCENTAGE_KEY; +import static io.grpc.xds.FaultFilter.HEADER_ABORT_GRPC_STATUS_KEY; +import static io.grpc.xds.FaultFilter.HEADER_ABORT_HTTP_STATUS_KEY; +import static io.grpc.xds.FaultFilter.HEADER_ABORT_PERCENTAGE_KEY; +import static io.grpc.xds.FaultFilter.HEADER_DELAY_KEY; +import static io.grpc.xds.FaultFilter.HEADER_DELAY_PERCENTAGE_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -35,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.re2j.Pattern; @@ -63,8 +63,10 @@ import io.grpc.internal.NoopClientCall.NoopClientCallListener; import io.grpc.internal.ObjectPool; import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.testing.TestMethodDescriptors; -import io.grpc.xds.HttpFault.FaultAbort; -import io.grpc.xds.HttpFault.FaultDelay; +import io.grpc.xds.FaultConfig.FaultAbort; +import io.grpc.xds.FaultConfig.FaultDelay; +import io.grpc.xds.Filter.FilterConfig; +import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.PathMatcher; import io.grpc.xds.VirtualHost.Route; @@ -101,6 +103,8 @@ import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class XdsNameResolverTest { private static final String AUTHORITY = "foo.googleapis.com:80"; + private static final String FAULT_FILTER_INSTANCE_NAME = "envoy.fault"; + private static final String ROUTER_FILTER_INSTANCE_NAME = "envoy.router"; @Rule public final MockitoRule mocks = MockitoJUnit.rule(); private final SynchronizationContext syncContext = new SynchronizationContext( @@ -136,22 +140,21 @@ public class XdsNameResolverTest { private XdsNameResolver resolver; private TestCall testCall; private boolean originalEnableTimeout; - private AtomicLong originalFaultCounter; @Before public void setUp() { originalEnableTimeout = XdsNameResolver.enableTimeout; - originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter; XdsNameResolver.enableTimeout = true; - XdsNameResolver.activeFaultInjectedStreamCounter = new AtomicLong(); + FilterRegistry filterRegistry = FilterRegistry.newRegistry().register( + new FaultFilter(mockRandom, new AtomicLong()), + RouterFilter.INSTANCE); resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom); + xdsClientPoolFactory, mockRandom, filterRegistry); } @After public void tearDown() { XdsNameResolver.enableTimeout = originalEnableTimeout; - XdsNameResolver.activeFaultInjectedStreamCounter = originalFaultCounter; FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); resolver.shutdown(); if (xdsClient != null) { @@ -174,7 +177,7 @@ public class XdsNameResolverTest { } }; resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry()); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -270,15 +273,19 @@ public class XdsNameResolverTest { private List buildUnmatchedVirtualHosts() { Route route1 = Route.create(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster( - cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), null); + cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()); Route route2 = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster( - cluster1, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), null); + cluster1, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()); return Arrays.asList( VirtualHost.create("virtualhost-foo", Collections.singletonList("hello.googleapis.com"), - Collections.singletonList(route1), null), + Collections.singletonList(route1), + ImmutableMap.of()), VirtualHost.create("virtualhost-bar", Collections.singletonList("hi.googleapis.com"), - Collections.singletonList(route2), null)); + Collections.singletonList(route2), + ImmutableMap.of())); } @Test @@ -287,9 +294,11 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster( - cluster1, Collections.emptyList(), null), null); // per-route timeout unset + cluster1, Collections.emptyList(), null), // per-route timeout unset + ImmutableMap.of()); VirtualHost virtualHost = VirtualHost.create("does not matter", - Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); + Collections.singletonList(AUTHORITY), Collections.singletonList(route), + ImmutableMap.of()); xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost)); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -303,9 +312,11 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster( - cluster1, Collections.emptyList(), null), null); // per-route timeout unset + cluster1, Collections.emptyList(), null), // per-route timeout unset + ImmutableMap.of()); VirtualHost virtualHost = VirtualHost.create("does not matter", - Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); + Collections.singletonList(AUTHORITY), Collections.singletonList(route), + ImmutableMap.of()); xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L), Collections.singletonList(virtualHost)); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -347,7 +358,8 @@ public class XdsNameResolverTest { "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), RouteAction.forCluster(cluster1, Collections.singletonList(HashPolicy.forHeader( false, "custom-key", Pattern.compile("value"), "val")), - null), null))); + null), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); InternalConfigSelector configSelector = resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY); @@ -383,7 +395,8 @@ public class XdsNameResolverTest { RouteMatch.withPathExactOnly( "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), RouteAction.forCluster(cluster1, Collections.singletonList( - HashPolicy.forChannelId(false)), null), null))); + HashPolicy.forChannelId(false)), null), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); InternalConfigSelector configSelector = resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY); @@ -404,7 +417,7 @@ public class XdsNameResolverTest { resolver.shutdown(); reset(mockListener); resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry()); resolver.start(mockListener); xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -414,7 +427,8 @@ public class XdsNameResolverTest { RouteMatch.withPathExactOnly( "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), RouteAction.forCluster(cluster1, Collections.singletonList( - HashPolicy.forChannelId(false)), null), null))); + HashPolicy.forChannelId(false)), null), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); configSelector = resolutionResultCaptor.getValue().getAttributes().get( InternalConfigSelector.KEY); @@ -445,12 +459,13 @@ public class XdsNameResolverTest { RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster( "another-cluster", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(20L)), null), + TimeUnit.SECONDS.toNanos(20L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster( cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), - null))); + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); // Updated service config still contains cluster1 while it is removed resource. New calls no @@ -484,11 +499,13 @@ public class XdsNameResolverTest { RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster( "another-cluster", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(20L)), null), + TimeUnit.SECONDS.toNanos(20L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); // Two consecutive service config updates: one for removing clcuster1, // one for adding "another=cluster". verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); @@ -517,11 +534,13 @@ public class XdsNameResolverTest { Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster("another-cluster", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(20L)), null), + TimeUnit.SECONDS.toNanos(20L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -535,11 +554,13 @@ public class XdsNameResolverTest { Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster("another-cluster", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null), + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); verifyNoMoreInteractions(mockListener); // no cluster added/deleted assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); } @@ -555,18 +576,21 @@ public class XdsNameResolverTest { Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); xdsClient.deliverLdsUpdate( AUTHORITY, Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster(cluster1, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null), + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); testCall.deliverErrorStatus(); verifyNoMoreInteractions(mockListener); } @@ -584,10 +608,12 @@ public class XdsNameResolverTest { RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forWeightedClusters( Arrays.asList( - ClusterWeight.create(cluster1, 20, null), - ClusterWeight.create(cluster2, 80, null)), + ClusterWeight.create(cluster1, 20, ImmutableMap.of()), + ClusterWeight.create( + cluster2, 80, ImmutableMap.of())), Collections.emptyList(), - TimeUnit.SECONDS.toNanos(20L)), null))); + TimeUnit.SECONDS.toNanos(20L)), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); assertThat(result.getAddresses()).isEmpty(); @@ -645,11 +671,13 @@ public class XdsNameResolverTest { Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteAction.forCluster(cluster1, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null), + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteAction.forCluster(cluster2, Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L)), null))); + TimeUnit.SECONDS.toNanos(15L)), + ImmutableMap.of()))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); assertThat(result.getAddresses()).isEmpty(); @@ -788,11 +816,14 @@ public class XdsNameResolverTest { String hostname = "a.googleapis.com"; List routes = Collections.emptyList(); VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com", - Arrays.asList("a.googleapis.com", "b.googleapis.com"), routes, null); + Arrays.asList("a.googleapis.com", "b.googleapis.com"), routes, + ImmutableMap.of()); VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com", - Collections.singletonList("*.googleapis.com"), routes, null); + Collections.singletonList("*.googleapis.com"), routes, + ImmutableMap.of()); VirtualHost vHost3 = VirtualHost.create("virtualhost03.googleapis.com", - Collections.singletonList("*"), routes, null); + Collections.singletonList("*"), routes, + ImmutableMap.of()); List virtualHosts = Arrays.asList(vHost1, vHost2, vHost3); assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname)) .isEqualTo(vHost1); @@ -803,11 +834,14 @@ public class XdsNameResolverTest { String hostname = "a.googleapis.com"; List routes = Collections.emptyList(); VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com", - Arrays.asList("*.googleapis.com", "b.googleapis.com"), routes, null); + Arrays.asList("*.googleapis.com", "b.googleapis.com"), routes, + ImmutableMap.of()); VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com", - Collections.singletonList("a.googleapis.*"), routes, null); + Collections.singletonList("a.googleapis.*"), routes, + ImmutableMap.of()); VirtualHost vHost3 = VirtualHost.create("virtualhost03.googleapis.com", - Collections.singletonList("*"), routes, null); + Collections.singletonList("*"), routes, + ImmutableMap.of()); List virtualHosts = Arrays.asList(vHost1, vHost2, vHost3); assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname)) .isEqualTo(vHost1); @@ -818,9 +852,11 @@ public class XdsNameResolverTest { String hostname = "a.googleapis.com"; List routes = Collections.emptyList(); VirtualHost vHost1 = VirtualHost.create("virtualhost01.googleapis.com", - Collections.singletonList("*"), routes, null); + Collections.singletonList("*"), routes, + ImmutableMap.of()); VirtualHost vHost2 = VirtualHost.create("virtualhost02.googleapis.com", - Collections.singletonList("b.googleapis.com"), routes, null); + Collections.singletonList("b.googleapis.com"), routes, + ImmutableMap.of()); List virtualHosts = Arrays.asList(vHost1, vHost2); assertThat(XdsNameResolver.findVirtualHostForHostName(virtualHosts, hostname)) .isEqualTo(vHost1);; @@ -833,12 +869,9 @@ public class XdsNameResolverTest { when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% // header abort, header abort rate = 60 % - HttpFault httpFilterFaultConfig = HttpFault.create( + FaultConfig httpFilterFaultConfig = FaultConfig.create( null, - FaultAbort.forHeader(HttpFault.FractionalPercent.perHundred(70)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forHeader(FaultConfig.FractionalPercent.perHundred(70)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -868,12 +901,9 @@ public class XdsNameResolverTest { verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404")); // header abort, no header rate, fix rate = 60 % - httpFilterFaultConfig = HttpFault.create( + httpFilterFaultConfig = FaultConfig.create( null, - FaultAbort.forHeader(HttpFault.FractionalPercent.perMillion(600_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -884,12 +914,9 @@ public class XdsNameResolverTest { verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404")); // header abort, no header rate, fix rate = 0 - httpFilterFaultConfig = HttpFault.create( + httpFilterFaultConfig = FaultConfig.create( null, - FaultAbort.forHeader(HttpFault.FractionalPercent.perMillion(0)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -900,14 +927,11 @@ public class XdsNameResolverTest { verifyRpcSucceeded(observer); // fixed abort, fix rate = 60% - httpFilterFaultConfig = HttpFault.create( + httpFilterFaultConfig = FaultConfig.create( null, FaultAbort.forStatus( Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(600_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultConfig.FractionalPercent.perMillion(600_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -918,14 +942,11 @@ public class XdsNameResolverTest { verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated")); // fixed abort, fix rate = 40% - httpFilterFaultConfig = HttpFault.create( + httpFilterFaultConfig = FaultConfig.create( null, FaultAbort.forStatus( Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(400_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultConfig.FractionalPercent.perMillion(400_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -943,13 +964,8 @@ public class XdsNameResolverTest { when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% // header delay, header delay rate = 60 % - HttpFault httpFilterFaultConfig = HttpFault.create( - FaultDelay.forHeader(HttpFault.FractionalPercent.perHundred(70)), - null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), - null); + FaultConfig httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forHeader(FaultConfig.FractionalPercent.perHundred(70)), null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -965,13 +981,8 @@ public class XdsNameResolverTest { verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000)); // header delay, no header rate, fix rate = 60 % - httpFilterFaultConfig = HttpFault.create( - FaultDelay.forHeader(HttpFault.FractionalPercent.perMillion(600_000)), - null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), - null); + httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); @@ -981,13 +992,8 @@ public class XdsNameResolverTest { verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000)); // header delay, no header rate, fix rate = 0 - httpFilterFaultConfig = HttpFault.create( - FaultDelay.forHeader(HttpFault.FractionalPercent.perMillion(0)), - null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), - null); + httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); @@ -997,12 +1003,9 @@ public class XdsNameResolverTest { verifyRpcSucceeded(observer); // fixed delay, fix rate = 60% - httpFilterFaultConfig = HttpFault.create( - FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(600_000)), + httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(600_000)), null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -1013,159 +1016,9 @@ public class XdsNameResolverTest { verifyRpcDelayed(observer, 5000L); // fixed delay, fix rate = 40% - httpFilterFaultConfig = HttpFault.create( - FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(400_000)), + httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(400_000)), null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, - Collections.emptyMap(), CallOptions.DEFAULT); - verifyRpcSucceeded(observer); - } - - @Test - public void resolved_faultAbortWithUpstreamClusterMismatchInLdsUpdate() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - - HttpFault httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), - null); - // cluster mismatch with fault config - xdsClient.deliverLdsUpdateWithFaultInjection(cluster2, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), - configSelector, Collections.emptyMap(), CallOptions.DEFAULT); - verifyRpcSucceeded(observer); - } - - @Test - public void resolved_faultAbortWithDownstreamNodesInLdsUpdate() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - - // downstream node match - HttpFault httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.singletonList("node1.example.com"), - Collections.emptyList(), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), - configSelector, ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node1.example.com"), - CallOptions.DEFAULT); - verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated")); - - // downstream node mismatch - httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.singletonList("node1.example.com"), - Collections.emptyList(), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, - ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node2.example.com"), CallOptions.DEFAULT); - verifyRpcSucceeded(observer); - - // downstream node absent in headers - httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.singletonList("node1.example.com"), - Collections.emptyList(), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, - Collections.emptyMap(), CallOptions.DEFAULT); - verifyRpcSucceeded(observer); - } - - @Test - public void resolved_faultAbortWithHeaderMatcherInLdsUpdate() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - - // headers match - HttpFault httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), - configSelector, ImmutableMap.of("fault_key", "fault_value"), CallOptions.DEFAULT); - verifyRpcFailed(observer, Status.UNAUTHENTICATED); - - // headers mismatch - httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)), - null); - xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, - ImmutableMap.of("fault_key", "value_not_match"), CallOptions.DEFAULT); - verifyRpcSucceeded(observer); - - // headers absent - httpFilterFaultConfig = HttpFault.create( - null, - FaultAbort.forStatus( - Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.singletonList(HeaderMatcher.forExactValue("fault_key", "fault_value", false)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -1182,12 +1035,9 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - HttpFault httpFilterFaultConfig = HttpFault.create( - FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(1000_000)), + FaultConfig httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(1000_000)), null, - cluster1, - Collections.emptyList(), - Collections.emptyList(), /* maxActiveFaults= */ 1); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -1209,20 +1059,31 @@ public class XdsNameResolverTest { verifyRpcDelayed(observer3, 5000L); } + @Test + public void resolved_withNoRouterFilter() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverLdsUpdateWithNoRouterFilter(); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); + ClientCall.Listener observer = startNewCall( + TestMethodDescriptors.voidMethod(), configSelector, Collections.emptyMap(), + CallOptions.DEFAULT); + verifyRpcFailed(observer, Status.UNAVAILABLE.withDescription("No router filter")); + } + @Test public void resolved_faultAbortAndDelayInLdsUpdateInLdsUpdate() { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - HttpFault httpFilterFaultConfig = HttpFault.create( - FaultDelay.forFixedDelay(5000L, HttpFault.FractionalPercent.perMillion(1000_000)), + FaultConfig httpFilterFaultConfig = FaultConfig.create( + FaultDelay.forFixedDelay(5000L, FaultConfig.FractionalPercent.perMillion(1000_000)), FaultAbort.forStatus( Status.UNAUTHENTICATED.withDescription("unauthenticated"), - HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -1240,21 +1101,15 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - HttpFault httpFilterFaultConfig = HttpFault.create( + FaultConfig httpFilterFaultConfig = FaultConfig.create( null, FaultAbort.forStatus( - Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + Status.UNAUTHENTICATED, FaultConfig.FractionalPercent.perMillion(1000_000)), null); // VirtualHost fault config override - HttpFault virtualHostFaultConfig = HttpFault.create( + FaultConfig virtualHostFaultConfig = FaultConfig.create( null, - FaultAbort.forStatus(Status.INTERNAL, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forStatus(Status.INTERNAL, FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection( cluster1, httpFilterFaultConfig, virtualHostFaultConfig, null, null); @@ -1266,12 +1121,9 @@ public class XdsNameResolverTest { verifyRpcFailed(observer, Status.INTERNAL); // Route fault config override - HttpFault routeFaultConfig = HttpFault.create( + FaultConfig routeFaultConfig = FaultConfig.create( null, - FaultAbort.forStatus(Status.UNKNOWN, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forStatus(Status.UNKNOWN, FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection( cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, null); @@ -1283,12 +1135,10 @@ public class XdsNameResolverTest { verifyRpcFailed(observer, Status.UNKNOWN); // WeightedCluster fault config override - HttpFault weightedClusterFaultConfig = HttpFault.create( + FaultConfig weightedClusterFaultConfig = FaultConfig.create( null, - FaultAbort.forStatus(Status.UNAVAILABLE, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forStatus( + Status.UNAVAILABLE, FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection( cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, @@ -1307,24 +1157,17 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - HttpFault httpFilterFaultConfig = HttpFault.create( + FaultConfig httpFilterFaultConfig = FaultConfig.create( null, FaultAbort.forStatus( - Status.UNAUTHENTICATED, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + Status.UNAUTHENTICATED, FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverRdsNameWithFaultInjection(AUTHORITY, httpFilterFaultConfig); - // Route fault config override - HttpFault routeFaultConfig = HttpFault.create( + FaultConfig routeFaultConfig = FaultConfig.create( null, - FaultAbort.forStatus(Status.UNKNOWN, HttpFault.FractionalPercent.perMillion(1000_000)), - cluster1, - Collections.emptyList(), - Collections.emptyList(), + FaultAbort.forStatus(Status.UNKNOWN, FaultConfig.FractionalPercent.perMillion(1000_000)), null); xdsClient.deliverRdsUpdateWithFaultInjection(null, routeFaultConfig, null); verify(mockListener).onResult(resolutionResultCaptor.capture()); @@ -1567,7 +1410,7 @@ public class XdsNameResolverTest { if (!resourceName.equals(ldsResource)) { return; } - ldsWatcher.onChanged(new LdsUpdate(httpMaxStreamDurationNano, virtualHosts, false, null)); + ldsWatcher.onChanged(new LdsUpdate(httpMaxStreamDurationNano, virtualHosts, null)); } }); } @@ -1580,24 +1423,39 @@ public class XdsNameResolverTest { return; } VirtualHost virtualHost = VirtualHost.create("virtual-host", - Collections.singletonList(AUTHORITY), routes, null); - ldsWatcher.onChanged( - new LdsUpdate(0, Collections.singletonList(virtualHost), false, null)); + Collections.singletonList(AUTHORITY), routes, + ImmutableMap.of()); + ldsWatcher.onChanged(new LdsUpdate(0, Collections.singletonList(virtualHost), null)); } }); } void deliverLdsUpdateWithFaultInjection( final String cluster, - final HttpFault httpFilterFaultConfig, - final HttpFault virtualHostFaultConfig, - final HttpFault routeFaultConfig, - final HttpFault weightedClusterFaultConfig) { + FaultConfig httpFilterFaultConfig, + final FaultConfig virtualHostFaultConfig, + final FaultConfig routeFaultConfig, + final FaultConfig weightedClusterFaultConfig) { + if (httpFilterFaultConfig == null) { + httpFilterFaultConfig = FaultConfig.create(null, null, null); + } + final List filterChain = ImmutableList.of( + new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig), + new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG)); syncContext.execute(new Runnable() { @Override public void run() { + ImmutableMap overrideConfig = weightedClusterFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of( + FAULT_FILTER_INSTANCE_NAME, weightedClusterFaultConfig); ClusterWeight clusterWeight = - ClusterWeight.create(cluster, 100, weightedClusterFaultConfig); + ClusterWeight.create( + cluster, 100, + overrideConfig); + overrideConfig = routeFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of(FAULT_FILTER_INSTANCE_NAME, routeFaultConfig); Route route = Route.create( RouteMatch.create( PathMatcher.fromPrefix("/", false), Collections.emptyList(), null), @@ -1605,24 +1463,46 @@ public class XdsNameResolverTest { Collections.singletonList(clusterWeight), Collections.emptyList(), null), - routeFaultConfig); + overrideConfig); + overrideConfig = virtualHostFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of( + FAULT_FILTER_INSTANCE_NAME, virtualHostFaultConfig); VirtualHost virtualHost = VirtualHost.create( "virtual-host", Collections.singletonList(AUTHORITY), - Collections.singletonList(route), virtualHostFaultConfig); + Collections.singletonList(route), + overrideConfig); ldsWatcher.onChanged( - new LdsUpdate( - 0, Collections.singletonList(virtualHost), true, httpFilterFaultConfig)); + new LdsUpdate(0, Collections.singletonList(virtualHost), filterChain)); } }); } + void deliverLdsUpdateWithNoRouterFilter() { + VirtualHost virtualHost = VirtualHost.create( + "virtual-host", + Collections.singletonList(AUTHORITY), + Collections.emptyList(), + Collections.emptyMap()); + ldsWatcher.onChanged( + new LdsUpdate( + 0, Collections.singletonList(virtualHost), ImmutableList.of())); + } + void deliverRdsNameWithFaultInjection( - final String rdsName, final HttpFault httpFilterFaultConfig) { + final String rdsName, FaultConfig httpFilterFaultConfig) { + if (httpFilterFaultConfig == null) { + httpFilterFaultConfig = FaultConfig.create( + null, null, null); + } + final ImmutableList filterChain = ImmutableList.of( + new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig), + new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG)); syncContext.execute(new Runnable() { @Override public void run() { - ldsWatcher.onChanged(new LdsUpdate(0, rdsName, true, httpFilterFaultConfig)); + ldsWatcher.onChanged(new LdsUpdate(0, rdsName, filterChain)); } }); } @@ -1634,7 +1514,8 @@ public class XdsNameResolverTest { if (!resourceName.equals(ldsResource)) { return; } - ldsWatcher.onChanged(new LdsUpdate(0, rdsName, false, null)); + ldsWatcher.onChanged( + new LdsUpdate(0, rdsName, null)); } }); } @@ -1652,13 +1533,20 @@ public class XdsNameResolverTest { } void deliverRdsUpdateWithFaultInjection( - final HttpFault virtualHostFaultConfig, final HttpFault routFaultConfig, - final HttpFault weightedClusterFaultConfig) { + final FaultConfig virtualHostFaultConfig, final FaultConfig routFaultConfig, + final FaultConfig weightedClusterFaultConfig) { syncContext.execute(new Runnable() { @Override public void run() { + ImmutableMap overrideConfig = weightedClusterFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of( + FAULT_FILTER_INSTANCE_NAME, weightedClusterFaultConfig); ClusterWeight clusterWeight = - ClusterWeight.create(cluster1, 100, weightedClusterFaultConfig); + ClusterWeight.create(cluster1, 100, overrideConfig); + overrideConfig = routFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of(FAULT_FILTER_INSTANCE_NAME, routFaultConfig); Route route = Route.create( RouteMatch.create( PathMatcher.fromPrefix("/", false), Collections.emptyList(), null), @@ -1666,11 +1554,16 @@ public class XdsNameResolverTest { Collections.singletonList(clusterWeight), Collections.emptyList(), null), - routFaultConfig); + overrideConfig); + overrideConfig = virtualHostFaultConfig == null + ? ImmutableMap.of() + : ImmutableMap.of( + FAULT_FILTER_INSTANCE_NAME, virtualHostFaultConfig); VirtualHost virtualHost = VirtualHost.create( "virtual-host", Collections.singletonList(AUTHORITY), - Collections.singletonList(route), virtualHostFaultConfig); + Collections.singletonList(route), + overrideConfig); rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); } });