mirror of https://github.com/grpc/grpc-node.git
grpc-js-xds: Add support for server http filters
This commit is contained in:
parent
c4580fa80b
commit
ff679ae473
|
@ -16,7 +16,7 @@
|
|||
|
||||
// This is a non-public, unstable API, but it's very convenient
|
||||
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
||||
import { experimental, logVerbosity } from '@grpc/grpc-js';
|
||||
import { experimental, logVerbosity, ServerInterceptor } from '@grpc/grpc-js';
|
||||
import { Any__Output } from './generated/google/protobuf/Any';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
|
@ -64,7 +64,8 @@ export interface HttpFilterFactoryConstructor<FilterType extends Filter> {
|
|||
export interface HttpFilterRegistryEntry {
|
||||
parseTopLevelFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null;
|
||||
parseOverrideFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null;
|
||||
httpFilterConstructor: HttpFilterFactoryConstructor<Filter>;
|
||||
httpFilterConstructor?: HttpFilterFactoryConstructor<Filter> | undefined;
|
||||
createServerFilter?: ((config: HttpFilterConfig, overrideConfigMap: Map<string, HttpFilterConfig>) => ServerInterceptor) | undefined;
|
||||
}
|
||||
|
||||
const FILTER_REGISTRY = new Map<string, HttpFilterRegistryEntry>();
|
||||
|
@ -106,7 +107,7 @@ export function getTopLevelFilterUrl(encodedConfig: Any__Output): string {
|
|||
}
|
||||
}
|
||||
|
||||
export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean {
|
||||
export function validateTopLevelFilter(httpFilter: HttpFilter__Output, client: boolean): boolean {
|
||||
if (!httpFilter.typed_config) {
|
||||
trace(httpFilter.name + ' validation failed: typed_config unset');
|
||||
return false;
|
||||
|
@ -121,6 +122,17 @@ export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean
|
|||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
if (!httpFilter.is_optional) {
|
||||
if (client) {
|
||||
if (!registryEntry.httpFilterConstructor) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (!registryEntry.createServerFilter) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
const parsedConfig = registryEntry.parseTopLevelFilterConfig(encodedConfig);
|
||||
if (parsedConfig === null) {
|
||||
trace(httpFilter.name + ' validation failed: config parsing failed');
|
||||
|
@ -185,7 +197,7 @@ export function validateOverrideFilter(encodedConfig: Any__Output): boolean {
|
|||
}
|
||||
}
|
||||
|
||||
export function parseTopLevelFilterConfig(encodedConfig: Any__Output) {
|
||||
export function parseTopLevelFilterConfig(encodedConfig: Any__Output, client: boolean) {
|
||||
let typeUrl: string;
|
||||
try {
|
||||
typeUrl = getTopLevelFilterUrl(encodedConfig);
|
||||
|
@ -194,6 +206,15 @@ export function parseTopLevelFilterConfig(encodedConfig: Any__Output) {
|
|||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
if (client) {
|
||||
if (!registryEntry.httpFilterConstructor) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
if (!registryEntry.createServerFilter) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return registryEntry.parseTopLevelFilterConfig(encodedConfig);
|
||||
} else {
|
||||
// Filter type URL not found in registry
|
||||
|
@ -236,11 +257,20 @@ export function parseOverrideFilterConfig(encodedConfig: Any__Output) {
|
|||
}
|
||||
}
|
||||
|
||||
export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory<Filter> | null {
|
||||
export function createClientHttpFilter(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory<Filter> | null {
|
||||
const registryEntry = FILTER_REGISTRY.get(config.typeUrl);
|
||||
if (registryEntry) {
|
||||
if (registryEntry && registryEntry.httpFilterConstructor) {
|
||||
return new registryEntry.httpFilterConstructor(config, overrideConfig);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function createServerHttpFilter(config: HttpFilterConfig, overrideConfigMap: Map<string, HttpFilterConfig>): ServerInterceptor | null {
|
||||
const registryEntry = FILTER_REGISTRY.get(config.typeUrl);
|
||||
if (registryEntry && registryEntry.createServerFilter) {
|
||||
return registryEntry.createServerFilter(config, overrideConfigMap);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -213,8 +213,8 @@ function asyncTimeout(timeMs: number): Promise<void> {
|
|||
|
||||
/**
|
||||
* Returns true with probability numerator/denominator.
|
||||
* @param numerator
|
||||
* @param denominator
|
||||
* @param numerator
|
||||
* @param denominator
|
||||
*/
|
||||
function rollRandomPercentage(numerator: number, denominator: number): boolean {
|
||||
return Math.random() * denominator < numerator;
|
||||
|
@ -344,4 +344,4 @@ export function setup() {
|
|||
parseOverrideFilterConfig: parseHTTPFaultConfig,
|
||||
httpFilterConstructor: FaultInjectionFilterFactory
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { experimental } from '@grpc/grpc-js';
|
||||
import { experimental, ServerInterceptingCall, ServerInterceptor } from '@grpc/grpc-js';
|
||||
import { Any__Output } from '../generated/google/protobuf/Any';
|
||||
import { HttpFilterConfig, registerHttpFilter } from '../http-filter';
|
||||
import Filter = experimental.Filter;
|
||||
|
@ -31,6 +31,21 @@ class RouterFilterFactory implements FilterFactory<RouterFilter> {
|
|||
}
|
||||
}
|
||||
|
||||
function createServerHttpFilter(config: HttpFilterConfig, overrideConfigMap: Map<string, HttpFilterConfig>): ServerInterceptor {
|
||||
return (methodDescriptor, call) => {
|
||||
return new ServerInterceptingCall(call, {
|
||||
start: next => {
|
||||
next({
|
||||
onReceiveMetadata: (metadata, next) => {
|
||||
metadata.remove('grpc-route');
|
||||
next(metadata);
|
||||
}
|
||||
})
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
|
||||
|
||||
function parseConfig(encodedConfig: Any__Output): HttpFilterConfig | null {
|
||||
|
@ -44,6 +59,7 @@ export function setup() {
|
|||
registerHttpFilter(ROUTER_FILTER_URL, {
|
||||
parseTopLevelFilterConfig: parseConfig,
|
||||
parseOverrideFilterConfig: parseConfig,
|
||||
httpFilterConstructor: RouterFilterFactory
|
||||
httpFilterConstructor: RouterFilterFactory,
|
||||
createServerFilter: createServerHttpFilter
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, Wei
|
|||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources';
|
||||
import Duration = experimental.Duration;
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
import { createClientHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY, EXPERIMENTAL_RING_HASH } from './environment';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
|
@ -171,7 +171,7 @@ class XdsResolver implements Resolver {
|
|||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filter of httpConnectionManager.http_filters) {
|
||||
// typed_config must be set here, or validation would have failed
|
||||
const filterConfig = parseTopLevelFilterConfig(filter.typed_config!);
|
||||
const filterConfig = parseTopLevelFilterConfig(filter.typed_config!, true);
|
||||
if (filterConfig) {
|
||||
ldsHttpFilterConfigs.push({name: filter.name, config: filterConfig});
|
||||
}
|
||||
|
@ -273,17 +273,17 @@ class XdsResolver implements Resolver {
|
|||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filterConfig of ldsHttpFilterConfigs) {
|
||||
if (routeHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
const filter = createClientHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
const filter = createClientHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else {
|
||||
const filter = createHttpFilter(filterConfig.config);
|
||||
const filter = createClientHttpFilter(filterConfig.config);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
|
@ -308,22 +308,22 @@ class XdsResolver implements Resolver {
|
|||
}
|
||||
for (const filterConfig of ldsHttpFilterConfigs) {
|
||||
if (clusterHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, clusterHttpFilterOverrides.get(filterConfig.name)!);
|
||||
const filter = createClientHttpFilter(filterConfig.config, clusterHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (routeHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
const filter = createClientHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
const filter = createClientHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else {
|
||||
const filter = createHttpFilter(filterConfig.config);
|
||||
const filter = createClientHttpFilter(filterConfig.config);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import { findVirtualHostForDomain } from "./xds-dependency-manager";
|
|||
import { LogVerbosity } from "@grpc/grpc-js/build/src/constants";
|
||||
import { XdsServerCredentials } from "./xds-credentials";
|
||||
import { CertificateValidationContext__Output } from "./generated/envoy/extensions/transport_sockets/tls/v3/CertificateValidationContext";
|
||||
import { createServerHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from "./http-filter";
|
||||
|
||||
const TRACER_NAME = 'xds_server';
|
||||
|
||||
|
@ -64,6 +65,7 @@ interface NormalizedFilterChainMatch {
|
|||
}
|
||||
|
||||
interface RouteEntry {
|
||||
id: string;
|
||||
matcher: Matcher;
|
||||
isNonForwardingAction: boolean;
|
||||
}
|
||||
|
@ -94,6 +96,10 @@ class FilterChainEntry {
|
|||
private virtualHosts: VirtualHostEntry[] | null = null;
|
||||
private connectionInjector: ConnectionInjector;
|
||||
private hasRouteConfigErrors = false;
|
||||
/**
|
||||
* filter name -> route ID -> config
|
||||
*/
|
||||
private overrideConfigMaps = new Map<string, Map<string, HttpFilterConfig>>();
|
||||
constructor(private configParameters: ConfigParameters, filterChain: FilterChain__Output, credentials: ServerCredentials, onRouteConfigPopulated: () => void) {
|
||||
this.matchers = normalizeFilterChainMatch(filterChain.filter_chain_match);
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, filterChain.filters[0].typed_config!.value);
|
||||
|
@ -145,6 +151,7 @@ class FilterChainEntry {
|
|||
for (const route of virtualHost.routes) {
|
||||
if (route.matcher.apply(methodDescriptor.path, metadata)) {
|
||||
if (route.isNonForwardingAction) {
|
||||
metadata.set('grpc-route', route.id);
|
||||
next(metadata);
|
||||
} else {
|
||||
call.sendStatus(routeErrorStatus);
|
||||
|
@ -158,6 +165,18 @@ class FilterChainEntry {
|
|||
}
|
||||
});
|
||||
}
|
||||
const httpFilterInterceptors: ServerInterceptor[] = [];
|
||||
for (const filter of httpConnectionManager.http_filters) {
|
||||
const filterConfig = parseTopLevelFilterConfig(filter.typed_config!, false);
|
||||
if (filterConfig) {
|
||||
const filterOverrideConfigMap = new Map<string, HttpFilterConfig>();
|
||||
this.overrideConfigMaps.set(filterConfig.typeUrl, filterOverrideConfigMap);
|
||||
const filterInterceptor = createServerHttpFilter(filterConfig, filterOverrideConfigMap);
|
||||
if (filterInterceptor) {
|
||||
httpFilterInterceptors.push(filterInterceptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (credentials instanceof XdsServerCredentials) {
|
||||
if (filterChain.transport_socket) {
|
||||
trace('Using secure credentials');
|
||||
|
@ -193,20 +212,24 @@ class FilterChainEntry {
|
|||
credentials = credentials.getFallbackCredentials();
|
||||
}
|
||||
}
|
||||
const interceptingCredentials = createServerCredentialsWithInterceptors(credentials, [interceptor]);
|
||||
const interceptingCredentials = createServerCredentialsWithInterceptors(credentials, [interceptor, ...httpFilterInterceptors]);
|
||||
this.connectionInjector = configParameters.createConnectionInjector(interceptingCredentials);
|
||||
}
|
||||
|
||||
private handleRouteConfigurationResource(routeConfig: RouteConfiguration__Output) {
|
||||
let hasRouteConfigErrors = false;
|
||||
this.virtualHosts = [];
|
||||
for (const virtualHost of routeConfig.virtual_hosts) {
|
||||
for (const overrideMap of this.overrideConfigMaps.values()) {
|
||||
overrideMap.clear();
|
||||
}
|
||||
for (const [virtualHostIndex, virtualHost] of routeConfig.virtual_hosts.entries()) {
|
||||
const virtualHostEntry: VirtualHostEntry = {
|
||||
domains: virtualHost.domains,
|
||||
routes: []
|
||||
};
|
||||
for (const route of virtualHost.routes) {
|
||||
for (const [routeIndex, route] of virtualHost.routes.entries()) {
|
||||
const routeEntry: RouteEntry = {
|
||||
id: `virtualhost=${virtualHostIndex} route=${routeIndex}`,
|
||||
matcher: getPredicateForMatcher(route.match!),
|
||||
isNonForwardingAction: route.action === 'non_forwarding_action'
|
||||
};
|
||||
|
@ -215,6 +238,12 @@ class FilterChainEntry {
|
|||
this.logConfigurationError('For domains matching [' + virtualHostEntry.domains + '] requests will be rejected for routes matching ' + routeEntry.matcher.toString());
|
||||
}
|
||||
virtualHostEntry.routes.push(routeEntry);
|
||||
for (const [filterName, overrideConfig] of Object.entries(route.typed_per_filter_config)) {
|
||||
const parsedConfig = parseOverrideFilterConfig(overrideConfig);
|
||||
if (parsedConfig) {
|
||||
this.overrideConfigMaps.get(filterName)?.set(routeEntry.id, parsedConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.virtualHosts.push(virtualHostEntry);
|
||||
}
|
||||
|
|
|
@ -87,9 +87,10 @@ function normalizeFilterChainMatch(filterChainMatch: FilterChainMatch__Output):
|
|||
|
||||
/**
|
||||
* @param httpConnectionManager
|
||||
* @param
|
||||
* @returns A list of validation errors, if there are any. An empty list indicates success
|
||||
*/
|
||||
function validateHttpConnectionManager(httpConnectionManager: HttpConnectionManager__Output): string[] {
|
||||
function validateHttpConnectionManager(httpConnectionManager: HttpConnectionManager__Output, client: boolean): string[] {
|
||||
const errors: string[] = [];
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
const filterNames = new Set<string>();
|
||||
|
@ -98,7 +99,7 @@ function validateHttpConnectionManager(httpConnectionManager: HttpConnectionMana
|
|||
errors.push(`duplicate HTTP filter name: ${httpFilter.name}`);
|
||||
}
|
||||
filterNames.add(httpFilter.name);
|
||||
if (!validateTopLevelFilter(httpFilter)) {
|
||||
if (!validateTopLevelFilter(httpFilter, client)) {
|
||||
errors.push(`${httpFilter.name} filter validation failed`);
|
||||
}
|
||||
/* Validate that the last filter, and only the last filter, is the
|
||||
|
@ -237,7 +238,7 @@ function validateFilterChain(context: XdsDecodeContext, filterChain: FilterChain
|
|||
if (filterChain.filters.length === 1) {
|
||||
if (filterChain.filters[0].typed_config?.type_url === HTTP_CONNECTION_MANGER_TYPE_URL) {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, filterChain.filters[0].typed_config.value);
|
||||
errors.push(...validateHttpConnectionManager(httpConnectionManager).map(error => `filters[0].typed_config: ${error}`));
|
||||
errors.push(...validateHttpConnectionManager(httpConnectionManager, false).map(error => `filters[0].typed_config: ${error}`));
|
||||
} else {
|
||||
errors.push(`Unexpected value of filters[0].typed_config.type_url: ${filterChain.filters[0].typed_config?.type_url}`);
|
||||
}
|
||||
|
@ -270,7 +271,7 @@ export class ListenerResourceType extends XdsResourceType {
|
|||
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL
|
||||
) {
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value);
|
||||
errors.push(...validateHttpConnectionManager(httpConnectionManager).map(error => `api_listener.api_listener: ${error}`));
|
||||
errors.push(...validateHttpConnectionManager(httpConnectionManager, true).map(error => `api_listener.api_listener: ${error}`));
|
||||
} else {
|
||||
errors.push(`api_listener.api_listener.type_url != ${HTTP_CONNECTION_MANGER_TYPE_URL}`);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue