mirror of https://github.com/grpc/grpc-node.git
Merge pull request #1853 from murgatroid99/grpc-js-xds_http_filters
grpc-js-xds: Add HTTP Filters support
This commit is contained in:
commit
92cc956dec
|
@ -12,7 +12,7 @@
|
|||
"prepare": "npm run compile",
|
||||
"pretest": "npm run compile",
|
||||
"posttest": "npm run check",
|
||||
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto",
|
||||
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto",
|
||||
"generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto"
|
||||
},
|
||||
"repository": {
|
||||
|
|
|
@ -13,4 +13,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
*/
|
||||
|
||||
export const EXPERIMENTAL_FAULT_INJECTION = process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION;
|
|
@ -1,18 +1,15 @@
|
|||
// Original file: null
|
||||
|
||||
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
|
||||
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
|
||||
|
||||
export interface EnumOptions {
|
||||
'allowAlias'?: (boolean);
|
||||
'deprecated'?: (boolean);
|
||||
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
|
||||
'.udpa.annotations.enum_migrate'?: (_udpa_annotations_MigrateAnnotation);
|
||||
}
|
||||
|
||||
export interface EnumOptions__Output {
|
||||
'allowAlias': (boolean);
|
||||
'deprecated': (boolean);
|
||||
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
|
||||
'.udpa.annotations.enum_migrate'?: (_udpa_annotations_MigrateAnnotation__Output);
|
||||
}
|
||||
|
|
|
@ -1,18 +1,13 @@
|
|||
// Original file: null
|
||||
|
||||
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
|
||||
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
|
||||
|
||||
export interface EnumValueOptions {
|
||||
'deprecated'?: (boolean);
|
||||
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
|
||||
'.envoy.annotations.disallowed_by_default_enum'?: (boolean);
|
||||
'.udpa.annotations.enum_value_migrate'?: (_udpa_annotations_MigrateAnnotation);
|
||||
}
|
||||
|
||||
export interface EnumValueOptions__Output {
|
||||
'deprecated': (boolean);
|
||||
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
|
||||
'.envoy.annotations.disallowed_by_default_enum': (boolean);
|
||||
'.udpa.annotations.enum_value_migrate'?: (_udpa_annotations_MigrateAnnotation__Output);
|
||||
}
|
||||
|
|
|
@ -2,8 +2,6 @@
|
|||
|
||||
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
|
||||
import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from '../../validate/FieldRules';
|
||||
import type { FieldSecurityAnnotation as _udpa_annotations_FieldSecurityAnnotation, FieldSecurityAnnotation__Output as _udpa_annotations_FieldSecurityAnnotation__Output } from '../../udpa/annotations/FieldSecurityAnnotation';
|
||||
import type { FieldMigrateAnnotation as _udpa_annotations_FieldMigrateAnnotation, FieldMigrateAnnotation__Output as _udpa_annotations_FieldMigrateAnnotation__Output } from '../../udpa/annotations/FieldMigrateAnnotation';
|
||||
|
||||
// Original file: null
|
||||
|
||||
|
@ -30,10 +28,6 @@ export interface FieldOptions {
|
|||
'weak'?: (boolean);
|
||||
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
|
||||
'.validate.rules'?: (_validate_FieldRules);
|
||||
'.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation);
|
||||
'.udpa.annotations.sensitive'?: (boolean);
|
||||
'.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation);
|
||||
'.envoy.annotations.disallowed_by_default'?: (boolean);
|
||||
}
|
||||
|
||||
export interface FieldOptions__Output {
|
||||
|
@ -45,8 +39,4 @@ export interface FieldOptions__Output {
|
|||
'weak': (boolean);
|
||||
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
|
||||
'.validate.rules'?: (_validate_FieldRules__Output);
|
||||
'.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation__Output);
|
||||
'.udpa.annotations.sensitive': (boolean);
|
||||
'.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation__Output);
|
||||
'.envoy.annotations.disallowed_by_default': (boolean);
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
// Original file: null
|
||||
|
||||
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
|
||||
import type { FileMigrateAnnotation as _udpa_annotations_FileMigrateAnnotation, FileMigrateAnnotation__Output as _udpa_annotations_FileMigrateAnnotation__Output } from '../../udpa/annotations/FileMigrateAnnotation';
|
||||
import type { StatusAnnotation as _udpa_annotations_StatusAnnotation, StatusAnnotation__Output as _udpa_annotations_StatusAnnotation__Output } from '../../udpa/annotations/StatusAnnotation';
|
||||
|
||||
// Original file: null
|
||||
|
||||
|
@ -28,8 +26,6 @@ export interface FileOptions {
|
|||
'objcClassPrefix'?: (string);
|
||||
'csharpNamespace'?: (string);
|
||||
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
|
||||
'.udpa.annotations.file_migrate'?: (_udpa_annotations_FileMigrateAnnotation);
|
||||
'.udpa.annotations.file_status'?: (_udpa_annotations_StatusAnnotation);
|
||||
}
|
||||
|
||||
export interface FileOptions__Output {
|
||||
|
@ -48,6 +44,4 @@ export interface FileOptions__Output {
|
|||
'objcClassPrefix': (string);
|
||||
'csharpNamespace': (string);
|
||||
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
|
||||
'.udpa.annotations.file_migrate'?: (_udpa_annotations_FileMigrateAnnotation__Output);
|
||||
'.udpa.annotations.file_status'?: (_udpa_annotations_StatusAnnotation__Output);
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
// Original file: null
|
||||
|
||||
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
|
||||
import type { VersioningAnnotation as _udpa_annotations_VersioningAnnotation, VersioningAnnotation__Output as _udpa_annotations_VersioningAnnotation__Output } from '../../udpa/annotations/VersioningAnnotation';
|
||||
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
|
||||
|
||||
export interface MessageOptions {
|
||||
'messageSetWireFormat'?: (boolean);
|
||||
|
@ -11,8 +9,6 @@ export interface MessageOptions {
|
|||
'mapEntry'?: (boolean);
|
||||
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
|
||||
'.validate.disabled'?: (boolean);
|
||||
'.udpa.annotations.versioning'?: (_udpa_annotations_VersioningAnnotation);
|
||||
'.udpa.annotations.message_migrate'?: (_udpa_annotations_MigrateAnnotation);
|
||||
}
|
||||
|
||||
export interface MessageOptions__Output {
|
||||
|
@ -22,6 +18,4 @@ export interface MessageOptions__Output {
|
|||
'mapEntry': (boolean);
|
||||
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
|
||||
'.validate.disabled': (boolean);
|
||||
'.udpa.annotations.versioning'?: (_udpa_annotations_VersioningAnnotation__Output);
|
||||
'.udpa.annotations.message_migrate'?: (_udpa_annotations_MigrateAnnotation__Output);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
import type * as grpc from '@grpc/grpc-js';
|
||||
import type { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
|
||||
|
||||
|
||||
type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype> = {
|
||||
new(...args: ConstructorParameters<Constructor>): Subtype;
|
||||
};
|
||||
|
||||
export interface ProtoGrpcType {
|
||||
google: {
|
||||
protobuf: {
|
||||
DescriptorProto: MessageTypeDefinition
|
||||
Duration: MessageTypeDefinition
|
||||
EnumDescriptorProto: MessageTypeDefinition
|
||||
EnumOptions: MessageTypeDefinition
|
||||
EnumValueDescriptorProto: MessageTypeDefinition
|
||||
EnumValueOptions: MessageTypeDefinition
|
||||
FieldDescriptorProto: MessageTypeDefinition
|
||||
FieldOptions: MessageTypeDefinition
|
||||
FileDescriptorProto: MessageTypeDefinition
|
||||
FileDescriptorSet: MessageTypeDefinition
|
||||
FileOptions: MessageTypeDefinition
|
||||
GeneratedCodeInfo: MessageTypeDefinition
|
||||
ListValue: MessageTypeDefinition
|
||||
MessageOptions: MessageTypeDefinition
|
||||
MethodDescriptorProto: MessageTypeDefinition
|
||||
MethodOptions: MessageTypeDefinition
|
||||
NullValue: EnumTypeDefinition
|
||||
OneofDescriptorProto: MessageTypeDefinition
|
||||
OneofOptions: MessageTypeDefinition
|
||||
ServiceDescriptorProto: MessageTypeDefinition
|
||||
ServiceOptions: MessageTypeDefinition
|
||||
SourceCodeInfo: MessageTypeDefinition
|
||||
Struct: MessageTypeDefinition
|
||||
Timestamp: MessageTypeDefinition
|
||||
UninterpretedOption: MessageTypeDefinition
|
||||
Value: MessageTypeDefinition
|
||||
}
|
||||
}
|
||||
udpa: {
|
||||
type: {
|
||||
v1: {
|
||||
TypedStruct: MessageTypeDefinition
|
||||
}
|
||||
}
|
||||
}
|
||||
validate: {
|
||||
AnyRules: MessageTypeDefinition
|
||||
BoolRules: MessageTypeDefinition
|
||||
BytesRules: MessageTypeDefinition
|
||||
DoubleRules: MessageTypeDefinition
|
||||
DurationRules: MessageTypeDefinition
|
||||
EnumRules: MessageTypeDefinition
|
||||
FieldRules: MessageTypeDefinition
|
||||
Fixed32Rules: MessageTypeDefinition
|
||||
Fixed64Rules: MessageTypeDefinition
|
||||
FloatRules: MessageTypeDefinition
|
||||
Int32Rules: MessageTypeDefinition
|
||||
Int64Rules: MessageTypeDefinition
|
||||
KnownRegex: EnumTypeDefinition
|
||||
MapRules: MessageTypeDefinition
|
||||
MessageRules: MessageTypeDefinition
|
||||
RepeatedRules: MessageTypeDefinition
|
||||
SFixed32Rules: MessageTypeDefinition
|
||||
SFixed64Rules: MessageTypeDefinition
|
||||
SInt32Rules: MessageTypeDefinition
|
||||
SInt64Rules: MessageTypeDefinition
|
||||
StringRules: MessageTypeDefinition
|
||||
TimestampRules: MessageTypeDefinition
|
||||
UInt32Rules: MessageTypeDefinition
|
||||
UInt64Rules: MessageTypeDefinition
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
// Original file: deps/udpa/udpa/type/v1/typed_struct.proto
|
||||
|
||||
import type { Struct as _google_protobuf_Struct, Struct__Output as _google_protobuf_Struct__Output } from '../../../google/protobuf/Struct';
|
||||
|
||||
/**
|
||||
* A TypedStruct contains an arbitrary JSON serialized protocol buffer message with a URL that
|
||||
* describes the type of the serialized message. This is very similar to google.protobuf.Any,
|
||||
* instead of having protocol buffer binary, this employs google.protobuf.Struct as value.
|
||||
*
|
||||
* This message is intended to be embedded inside Any, so it shouldn't be directly referred
|
||||
* from other UDPA messages.
|
||||
*
|
||||
* When packing an opaque extension config, packing the expected type into Any is preferred
|
||||
* wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor
|
||||
* is not available, for example if:
|
||||
* - A control plane sends opaque message that is originally from external source in human readable
|
||||
* format such as JSON or YAML.
|
||||
* - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot
|
||||
* serialize the message in protocol buffer binary format.
|
||||
* - The DPLB doesn't have have the knowledge of the protocol buffer schema its plugin or extension
|
||||
* uses. This has to be indicated in the DPLB capability negotiation.
|
||||
*
|
||||
* When a DPLB receives a TypedStruct in Any, it should:
|
||||
* - Check if the type_url of the TypedStruct matches the type the extension expects.
|
||||
* - Convert value to the type described in type_url and perform validation.
|
||||
* TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link
|
||||
* protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions).
|
||||
*/
|
||||
export interface TypedStruct {
|
||||
/**
|
||||
* A URL that uniquely identifies the type of the serialize protocol buffer message.
|
||||
* This has same semantics and format described in google.protobuf.Any:
|
||||
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/any.proto
|
||||
*/
|
||||
'type_url'?: (string);
|
||||
/**
|
||||
* A JSON representation of the above specified type.
|
||||
*/
|
||||
'value'?: (_google_protobuf_Struct);
|
||||
}
|
||||
|
||||
/**
|
||||
* A TypedStruct contains an arbitrary JSON serialized protocol buffer message with a URL that
|
||||
* describes the type of the serialized message. This is very similar to google.protobuf.Any,
|
||||
* instead of having protocol buffer binary, this employs google.protobuf.Struct as value.
|
||||
*
|
||||
* This message is intended to be embedded inside Any, so it shouldn't be directly referred
|
||||
* from other UDPA messages.
|
||||
*
|
||||
* When packing an opaque extension config, packing the expected type into Any is preferred
|
||||
* wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor
|
||||
* is not available, for example if:
|
||||
* - A control plane sends opaque message that is originally from external source in human readable
|
||||
* format such as JSON or YAML.
|
||||
* - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot
|
||||
* serialize the message in protocol buffer binary format.
|
||||
* - The DPLB doesn't have have the knowledge of the protocol buffer schema its plugin or extension
|
||||
* uses. This has to be indicated in the DPLB capability negotiation.
|
||||
*
|
||||
* When a DPLB receives a TypedStruct in Any, it should:
|
||||
* - Check if the type_url of the TypedStruct matches the type the extension expects.
|
||||
* - Convert value to the type described in type_url and perform validation.
|
||||
* TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link
|
||||
* protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions).
|
||||
*/
|
||||
export interface TypedStruct__Output {
|
||||
/**
|
||||
* A URL that uniquely identifies the type of the serialize protocol buffer message.
|
||||
* This has same semantics and format described in google.protobuf.Any:
|
||||
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/any.proto
|
||||
*/
|
||||
'type_url': (string);
|
||||
/**
|
||||
* A JSON representation of the above specified type.
|
||||
*/
|
||||
'value'?: (_google_protobuf_Struct__Output);
|
||||
}
|
|
@ -0,0 +1,221 @@
|
|||
/*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// This is a non-public, unstable API, but it's very convenient
|
||||
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
||||
import { experimental } from '@grpc/grpc-js';
|
||||
import { Any__Output } from './generated/google/protobuf/Any';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import { TypedStruct__Output } from './generated/udpa/type/v1/TypedStruct';
|
||||
import { FilterConfig__Output } from './generated/envoy/config/route/v3/FilterConfig';
|
||||
import { HttpFilter__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpFilter';
|
||||
|
||||
const TYPED_STRUCT_URL = 'type.googleapis.com/udpa.type.v1.TypedStruct';
|
||||
const TYPED_STRUCT_NAME = 'udpa.type.v1.TypedStruct';
|
||||
|
||||
const FILTER_CONFIG_URL = 'type.googleapis.com/envoy.config.route.v3.FilterConfig';
|
||||
const FILTER_CONFIG_NAME = 'envoy.config.route.v3.FilterConfig';
|
||||
|
||||
const resourceRoot = loadProtosWithOptionsSync([
|
||||
'udpa/type/v1/typed_struct.proto',
|
||||
'envoy/config/route/v3/route_components.proto'], {
|
||||
keepCase: true,
|
||||
includeDirs: [
|
||||
// Paths are relative to src/build
|
||||
__dirname + '/../../deps/udpa/',
|
||||
__dirname + '/../../deps/envoy-api/'
|
||||
],
|
||||
}
|
||||
);
|
||||
|
||||
export interface HttpFilterConfig {
|
||||
typeUrl: string;
|
||||
config: any;
|
||||
}
|
||||
|
||||
export interface HttpFilterFactoryConstructor<FilterType extends Filter> {
|
||||
new(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory<FilterType>;
|
||||
}
|
||||
|
||||
export interface HttpFilterRegistryEntry {
|
||||
parseTopLevelFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null;
|
||||
parseOverrideFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null;
|
||||
httpFilterConstructor: HttpFilterFactoryConstructor<Filter>;
|
||||
}
|
||||
|
||||
const FILTER_REGISTRY = new Map<string, HttpFilterRegistryEntry>();
|
||||
|
||||
export function registerHttpFilter(typeName: string, entry: HttpFilterRegistryEntry) {
|
||||
FILTER_REGISTRY.set(typeName, entry);
|
||||
}
|
||||
|
||||
const toObjectOptions = {
|
||||
longs: String,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true
|
||||
}
|
||||
|
||||
function parseAnyMessage<MessageType>(message: Any__Output): MessageType | null {
|
||||
const messageType = resourceRoot.lookup(message.type_url);
|
||||
if (messageType) {
|
||||
const decodedMessage = (messageType as any).decode(message.value);
|
||||
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as MessageType;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function getTopLevelFilterUrl(encodedConfig: Any__Output): string {
|
||||
let typeUrl: string;
|
||||
if (encodedConfig.type_url === TYPED_STRUCT_URL) {
|
||||
const typedStruct = parseAnyMessage<TypedStruct__Output>(encodedConfig)
|
||||
if (typedStruct) {
|
||||
return typedStruct.type_url;
|
||||
} else {
|
||||
throw new Error('Failed to parse TypedStruct');
|
||||
}
|
||||
} else {
|
||||
return encodedConfig.type_url;
|
||||
}
|
||||
}
|
||||
|
||||
export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean {
|
||||
if (!httpFilter.typed_config) {
|
||||
return false;
|
||||
}
|
||||
const encodedConfig = httpFilter.typed_config;
|
||||
let typeUrl: string;
|
||||
try {
|
||||
typeUrl = getTopLevelFilterUrl(encodedConfig);
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
const parsedConfig = registryEntry.parseTopLevelFilterConfig(encodedConfig);
|
||||
return parsedConfig !== null;
|
||||
} else {
|
||||
if (httpFilter.is_optional) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function validateOverrideFilter(encodedConfig: Any__Output): boolean {
|
||||
let typeUrl: string;
|
||||
let realConfig: Any__Output;
|
||||
let isOptional = false;
|
||||
if (encodedConfig.type_url === FILTER_CONFIG_URL) {
|
||||
const filterConfig = parseAnyMessage<FilterConfig__Output>(encodedConfig);
|
||||
if (filterConfig) {
|
||||
isOptional = filterConfig.is_optional;
|
||||
if (filterConfig.config) {
|
||||
realConfig = filterConfig.config;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
realConfig = encodedConfig;
|
||||
}
|
||||
if (realConfig.type_url === TYPED_STRUCT_URL) {
|
||||
const typedStruct = parseAnyMessage<TypedStruct__Output>(encodedConfig);
|
||||
if (typedStruct) {
|
||||
typeUrl = typedStruct.type_url;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
typeUrl = realConfig.type_url;
|
||||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
const parsedConfig = registryEntry.parseOverrideFilterConfig(encodedConfig);
|
||||
return parsedConfig !== null;
|
||||
} else {
|
||||
if (isOptional) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function parseTopLevelFilterConfig(encodedConfig: Any__Output) {
|
||||
let typeUrl: string;
|
||||
try {
|
||||
typeUrl = getTopLevelFilterUrl(encodedConfig);
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
return registryEntry.parseTopLevelFilterConfig(encodedConfig);
|
||||
} else {
|
||||
// Filter type URL not found in registry
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function parseOverrideFilterConfig(encodedConfig: Any__Output) {
|
||||
let typeUrl: string;
|
||||
let realConfig: Any__Output;
|
||||
if (encodedConfig.type_url === FILTER_CONFIG_URL) {
|
||||
const filterConfig = parseAnyMessage<FilterConfig__Output>(encodedConfig);
|
||||
if (filterConfig) {
|
||||
if (filterConfig.config) {
|
||||
realConfig = filterConfig.config;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
realConfig = encodedConfig;
|
||||
}
|
||||
if (realConfig.type_url === TYPED_STRUCT_URL) {
|
||||
const typedStruct = parseAnyMessage<TypedStruct__Output>(encodedConfig);
|
||||
if (typedStruct) {
|
||||
typeUrl = typedStruct.type_url;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
typeUrl = realConfig.type_url;
|
||||
}
|
||||
const registryEntry = FILTER_REGISTRY.get(typeUrl);
|
||||
if (registryEntry) {
|
||||
return registryEntry.parseOverrideFilterConfig(encodedConfig);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory<Filter> | null {
|
||||
const registryEntry = FILTER_REGISTRY.get(config.typeUrl);
|
||||
if (registryEntry) {
|
||||
return new registryEntry.httpFilterConstructor(config, overrideConfig);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { experimental } from '@grpc/grpc-js';
|
||||
import { Any__Output } from '../generated/google/protobuf/Any';
|
||||
import { HttpFilterConfig, registerHttpFilter } from '../http-filter';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import BaseFilter = experimental.BaseFilter;
|
||||
|
||||
class RouterFilter extends BaseFilter implements Filter {}
|
||||
|
||||
class RouterFilterFactory implements FilterFactory<RouterFilter> {
|
||||
constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {}
|
||||
|
||||
createFilter(callStream: experimental.CallStream): RouterFilter {
|
||||
return new RouterFilter();
|
||||
}
|
||||
}
|
||||
|
||||
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
|
||||
|
||||
function parseConfig(encodedConfig: Any__Output): HttpFilterConfig | null {
|
||||
return {
|
||||
typeUrl: ROUTER_FILTER_URL,
|
||||
config: null
|
||||
};
|
||||
}
|
||||
|
||||
export function setup() {
|
||||
registerHttpFilter(ROUTER_FILTER_URL, {
|
||||
parseTopLevelFilterConfig: parseConfig,
|
||||
parseOverrideFilterConfig: parseConfig,
|
||||
httpFilterConstructor: RouterFilterFactory
|
||||
});
|
||||
}
|
|
@ -22,6 +22,7 @@ import * as load_balancer_lrs from './load-balancer-lrs';
|
|||
import * as load_balancer_priority from './load-balancer-priority';
|
||||
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
|
||||
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
|
||||
import * as router_filter from './http-filter/router-filter';
|
||||
|
||||
/**
|
||||
* Register the "xds:" name scheme with the @grpc/grpc-js library.
|
||||
|
@ -34,4 +35,5 @@ export function register() {
|
|||
load_balancer_priority.setup();
|
||||
load_balancer_weighted_target.setup();
|
||||
load_balancer_xds_cluster_manager.setup();
|
||||
router_filter.setup();
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// This is a non-public, unstable API, but it's very convenient
|
||||
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
||||
import { Any__Output } from './generated/google/protobuf/Any';
|
||||
|
||||
function parseAnyMessage(encodedMessage: Any__Output) {
|
||||
|
||||
}
|
|
@ -42,6 +42,10 @@ import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedCluster
|
|||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources';
|
||||
import Duration = experimental.Duration;
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from './environment';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
|
||||
const TRACER_NAME = 'xds_resolver';
|
||||
|
||||
|
@ -221,6 +225,8 @@ class XdsResolver implements Resolver {
|
|||
|
||||
private latestDefaultTimeout: Duration | undefined = undefined;
|
||||
|
||||
private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = [];
|
||||
|
||||
constructor(
|
||||
private target: GrpcUri,
|
||||
private listener: ResolverListener,
|
||||
|
@ -235,6 +241,16 @@ class XdsResolver implements Resolver {
|
|||
} else {
|
||||
this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout);
|
||||
}
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
this.ldsHttpFilterConfigs = [];
|
||||
for (const filter of httpConnectionManager.http_filters) {
|
||||
// typed_config must be set here, or validation would have failed
|
||||
const filterConfig = parseTopLevelFilterConfig(filter.typed_config!);
|
||||
if (filterConfig) {
|
||||
this.ldsHttpFilterConfigs.push({name: filter.name, config: filterConfig});
|
||||
}
|
||||
}
|
||||
}
|
||||
switch (httpConnectionManager.route_specifier) {
|
||||
case 'rds': {
|
||||
const routeConfigName = httpConnectionManager.rds!.route_config_name;
|
||||
|
@ -314,6 +330,15 @@ class XdsResolver implements Resolver {
|
|||
this.reportResolutionError('No matching route found');
|
||||
return;
|
||||
}
|
||||
const virtualHostHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
virtualHostHttpFilterOverrides.set(name, parsedConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
trace('Received virtual host config ' + JSON.stringify(virtualHost, undefined, 2));
|
||||
const allConfigClusters = new Set<string>();
|
||||
const matchList: {matcher: Matcher, action: RouteAction}[] = [];
|
||||
|
@ -334,20 +359,83 @@ class XdsResolver implements Resolver {
|
|||
if (timeout?.seconds === 0 && timeout.nanos === 0) {
|
||||
timeout = undefined;
|
||||
}
|
||||
const routeHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
routeHttpFilterOverrides.set(name, parsedConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
switch (route.route!.cluster_specifier) {
|
||||
case 'cluster_header':
|
||||
continue;
|
||||
case 'cluster':{
|
||||
const cluster = route.route!.cluster!;
|
||||
allConfigClusters.add(cluster);
|
||||
routeAction = new SingleClusterRouteAction(cluster, timeout);
|
||||
const extraFilterFactories: FilterFactory<Filter>[] = [];
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filterConfig of this.ldsHttpFilterConfigs) {
|
||||
if (routeHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else {
|
||||
const filter = createHttpFilter(filterConfig.config);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories);
|
||||
break;
|
||||
}
|
||||
case 'weighted_clusters': {
|
||||
const weightedClusters: WeightedCluster[] = [];
|
||||
for (const clusterWeight of route.route!.weighted_clusters!.clusters) {
|
||||
allConfigClusters.add(clusterWeight.name);
|
||||
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0});
|
||||
const extraFilterFactories: FilterFactory<Filter>[] = [];
|
||||
const clusterHttpFilterOverrides = new Map<string, HttpFilterConfig>();
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) {
|
||||
const parsedConfig = parseOverrideFilterConfig(filter);
|
||||
if (parsedConfig) {
|
||||
clusterHttpFilterOverrides.set(name, parsedConfig);
|
||||
}
|
||||
}
|
||||
for (const filterConfig of this.ldsHttpFilterConfigs) {
|
||||
if (clusterHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, clusterHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (routeHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) {
|
||||
const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
} else {
|
||||
const filter = createHttpFilter(filterConfig.config);
|
||||
if (filter) {
|
||||
extraFilterFactories.push(filter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
|
||||
}
|
||||
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout);
|
||||
}
|
||||
|
@ -376,16 +464,17 @@ class XdsResolver implements Resolver {
|
|||
const configSelector: ConfigSelector = (methodName, metadata) => {
|
||||
for (const {matcher, action} of matchList) {
|
||||
if (matcher.apply(methodName, metadata)) {
|
||||
const clusterName = action.getCluster();
|
||||
this.refCluster(clusterName);
|
||||
const clusterResult = action.getCluster();
|
||||
this.refCluster(clusterResult.name);
|
||||
const onCommitted = () => {
|
||||
this.unrefCluster(clusterName);
|
||||
this.unrefCluster(clusterResult.name);
|
||||
}
|
||||
return {
|
||||
methodConfig: {name: [], timeout: action.getTimeout()},
|
||||
onCommitted: onCommitted,
|
||||
pickInformation: {cluster: clusterName},
|
||||
status: status.OK
|
||||
pickInformation: {cluster: clusterResult.name},
|
||||
status: status.OK,
|
||||
dynamicFilterFactories: clusterResult.dynamicFilterFactories
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -393,7 +482,8 @@ class XdsResolver implements Resolver {
|
|||
methodConfig: {name: []},
|
||||
// cluster won't be used here, but it's set because of some TypeScript weirdness
|
||||
pickInformation: {cluster: ''},
|
||||
status: status.UNAVAILABLE
|
||||
status: status.UNAVAILABLE,
|
||||
dynamicFilterFactories: []
|
||||
};
|
||||
};
|
||||
trace('Created ConfigSelector with configuration:');
|
||||
|
|
|
@ -16,10 +16,17 @@
|
|||
|
||||
import { experimental } from '@grpc/grpc-js';
|
||||
import Duration = experimental.Duration;
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
|
||||
export interface ClusterResult {
|
||||
name: string;
|
||||
dynamicFilterFactories: FilterFactory<Filter>[];
|
||||
}
|
||||
|
||||
export interface RouteAction {
|
||||
toString(): string;
|
||||
getCluster(): string;
|
||||
getCluster(): ClusterResult;
|
||||
getTimeout(): Duration | undefined;
|
||||
}
|
||||
|
||||
|
@ -33,10 +40,13 @@ function durationToLogString(duration: Duration) {
|
|||
}
|
||||
|
||||
export class SingleClusterRouteAction implements RouteAction {
|
||||
constructor(private cluster: string, private timeout: Duration | undefined) {}
|
||||
constructor(private cluster: string, private timeout: Duration | undefined, private extraFilterFactories: FilterFactory<Filter>[]) {}
|
||||
|
||||
getCluster() {
|
||||
return this.cluster;
|
||||
return {
|
||||
name: this.cluster,
|
||||
dynamicFilterFactories: this.extraFilterFactories
|
||||
};
|
||||
}
|
||||
|
||||
toString() {
|
||||
|
@ -55,11 +65,13 @@ export class SingleClusterRouteAction implements RouteAction {
|
|||
export interface WeightedCluster {
|
||||
name: string;
|
||||
weight: number;
|
||||
dynamicFilterFactories: FilterFactory<Filter>[];
|
||||
}
|
||||
|
||||
interface ClusterChoice {
|
||||
name: string;
|
||||
numerator: number;
|
||||
dynamicFilterFactories: FilterFactory<Filter>[];
|
||||
}
|
||||
|
||||
export class WeightedClusterRouteAction implements RouteAction {
|
||||
|
@ -72,7 +84,7 @@ export class WeightedClusterRouteAction implements RouteAction {
|
|||
let lastNumerator = 0;
|
||||
for (const clusterWeight of clusters) {
|
||||
lastNumerator += clusterWeight.weight;
|
||||
this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator});
|
||||
this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator, dynamicFilterFactories: clusterWeight.dynamicFilterFactories});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,11 +92,14 @@ export class WeightedClusterRouteAction implements RouteAction {
|
|||
const randomNumber = Math.random() * this.totalWeight;
|
||||
for (const choice of this.clusterChoices) {
|
||||
if (randomNumber < choice.numerator) {
|
||||
return choice.name;
|
||||
return {
|
||||
name: choice.name,
|
||||
dynamicFilterFactories: choice.dynamicFilterFactories
|
||||
};
|
||||
}
|
||||
}
|
||||
// This should be prevented by the validation rules
|
||||
return '';
|
||||
return {name: '', dynamicFilterFactories: []};
|
||||
}
|
||||
|
||||
toString() {
|
||||
|
|
|
@ -22,6 +22,8 @@ import { RdsState } from "./rds-state";
|
|||
import { Watcher, XdsStreamState } from "./xds-stream-state";
|
||||
import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
|
||||
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources';
|
||||
import { validateTopLevelFilter } from '../http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from '../environment';
|
||||
|
||||
const TRACER_NAME = 'xds_client';
|
||||
|
||||
|
@ -29,6 +31,8 @@ function trace(text: string): void {
|
|||
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
|
||||
|
||||
export class LdsState implements XdsStreamState<Listener__Output> {
|
||||
versionInfo = '';
|
||||
nonce = '';
|
||||
|
@ -100,6 +104,29 @@ export class LdsState implements XdsStreamState<Listener__Output> {
|
|||
return false;
|
||||
}
|
||||
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value);
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
const filterNames = new Set<string>();
|
||||
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
|
||||
if (filterNames.has(httpFilter.name)) {
|
||||
return false;
|
||||
}
|
||||
filterNames.add(httpFilter.name);
|
||||
if (!validateTopLevelFilter(httpFilter)) {
|
||||
return false;
|
||||
}
|
||||
/* Validate that the last filter, and only the last filter, is the
|
||||
* router filter. */
|
||||
if (index < httpConnectionManager.http_filters.length - 1) {
|
||||
if (httpFilter.name === ROUTER_FILTER_URL) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (httpFilter.name !== ROUTER_FILTER_URL) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
switch (httpConnectionManager.route_specifier) {
|
||||
case 'rds':
|
||||
return !!httpConnectionManager.rds?.config_source?.ads;
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
*/
|
||||
|
||||
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
|
||||
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
|
||||
import { validateOverrideFilter } from "../http-filter";
|
||||
import { CdsLoadBalancingConfig } from "../load-balancer-cds";
|
||||
import { Watcher, XdsStreamState } from "./xds-stream-state";
|
||||
import ServiceConfig = experimental.ServiceConfig;
|
||||
|
@ -112,6 +114,13 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const route of virtualHost.routes) {
|
||||
const match = route.match;
|
||||
if (!match) {
|
||||
|
@ -131,6 +140,13 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
|
|||
if ((route.route === undefined) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
|
||||
return false;
|
||||
}
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (route.route!.cluster_specifier === 'weighted_clusters') {
|
||||
let weightSum = 0;
|
||||
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
|
||||
|
@ -139,6 +155,15 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
|
|||
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
|
||||
return false;
|
||||
}
|
||||
if (EXPERIMENTAL_FAULT_INJECTION) {
|
||||
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
|
||||
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
|
||||
if (!validateOverrideFilter(filterConfig)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -466,11 +466,9 @@ export class Http2CallStream implements Call {
|
|||
attachHttp2Stream(
|
||||
stream: http2.ClientHttp2Stream,
|
||||
subchannel: Subchannel,
|
||||
extraFilters: FilterFactory<Filter>[]
|
||||
extraFilters: Filter[]
|
||||
): void {
|
||||
this.filterStack.push(
|
||||
extraFilters.map((filterFactory) => filterFactory.createFilter(this))
|
||||
);
|
||||
this.filterStack.push(extraFilters);
|
||||
if (this.finalStatus !== null) {
|
||||
stream.close(NGHTTP2_CANCEL);
|
||||
} else {
|
||||
|
@ -736,6 +734,10 @@ export class Http2CallStream implements Call {
|
|||
this.configDeadline = configDeadline;
|
||||
}
|
||||
|
||||
addFilters(extraFilters: Filter[]) {
|
||||
this.filterStack.push(extraFilters);
|
||||
}
|
||||
|
||||
startRead() {
|
||||
/* If the stream has ended with an error, we should not emit any more
|
||||
* messages and we should communicate that the stream has ended */
|
||||
|
|
|
@ -46,6 +46,8 @@ import { mapProxyName } from './http_proxy';
|
|||
import { GrpcUri, parseUri, uriToString } from './uri-parser';
|
||||
import { ServerSurfaceCall } from './server-call';
|
||||
import { SurfaceCall } from './call';
|
||||
import { Filter } from './filter';
|
||||
|
||||
import { ConnectivityState } from './connectivity-state';
|
||||
|
||||
/**
|
||||
|
@ -146,6 +148,7 @@ export class ChannelImplementation implements Channel {
|
|||
callStream: Http2CallStream;
|
||||
callMetadata: Metadata;
|
||||
callConfig: CallConfig;
|
||||
dynamicFilters: Filter[];
|
||||
}> = [];
|
||||
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
||||
private defaultAuthority: string;
|
||||
|
@ -235,8 +238,8 @@ export class ChannelImplementation implements Channel {
|
|||
const queueCopy = this.pickQueue.slice();
|
||||
this.pickQueue = [];
|
||||
this.callRefTimerUnref();
|
||||
for (const { callStream, callMetadata, callConfig } of queueCopy) {
|
||||
this.tryPick(callStream, callMetadata, callConfig);
|
||||
for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
|
||||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
}
|
||||
this.updateState(connectivityState);
|
||||
},
|
||||
|
@ -329,9 +332,10 @@ export class ChannelImplementation implements Channel {
|
|||
private pushPick(
|
||||
callStream: Http2CallStream,
|
||||
callMetadata: Metadata,
|
||||
callConfig: CallConfig
|
||||
callConfig: CallConfig,
|
||||
dynamicFilters: Filter[]
|
||||
) {
|
||||
this.pickQueue.push({ callStream, callMetadata, callConfig });
|
||||
this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
|
||||
this.callRefTimerRef();
|
||||
}
|
||||
|
||||
|
@ -345,7 +349,8 @@ export class ChannelImplementation implements Channel {
|
|||
private tryPick(
|
||||
callStream: Http2CallStream,
|
||||
callMetadata: Metadata,
|
||||
callConfig: CallConfig
|
||||
callConfig: CallConfig,
|
||||
dynamicFilters: Filter[]
|
||||
) {
|
||||
const pickResult = this.currentPicker.pick({
|
||||
metadata: callMetadata,
|
||||
|
@ -386,7 +391,7 @@ export class ChannelImplementation implements Channel {
|
|||
' has state ' +
|
||||
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
|
||||
);
|
||||
this.pushPick(callStream, callMetadata, callConfig);
|
||||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
break;
|
||||
}
|
||||
/* We need to clone the callMetadata here because the transparent
|
||||
|
@ -399,10 +404,11 @@ export class ChannelImplementation implements Channel {
|
|||
const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
|
||||
if (subchannelState === ConnectivityState.READY) {
|
||||
try {
|
||||
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
|
||||
pickResult.subchannel!.startCallStream(
|
||||
finalMetadata,
|
||||
callStream,
|
||||
pickResult.extraFilterFactories
|
||||
[...dynamicFilters, ...pickExtraFilters]
|
||||
);
|
||||
/* If we reach this point, the call stream has started
|
||||
* successfully */
|
||||
|
@ -435,7 +441,7 @@ export class ChannelImplementation implements Channel {
|
|||
(error as Error).message +
|
||||
'. Retrying pick'
|
||||
);
|
||||
this.tryPick(callStream, callMetadata, callConfig);
|
||||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
} else {
|
||||
trace(
|
||||
LogVerbosity.INFO,
|
||||
|
@ -466,7 +472,7 @@ export class ChannelImplementation implements Channel {
|
|||
ConnectivityState[subchannelState] +
|
||||
' after metadata filters. Retrying pick'
|
||||
);
|
||||
this.tryPick(callStream, callMetadata, callConfig);
|
||||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
}
|
||||
},
|
||||
(error: Error & { code: number }) => {
|
||||
|
@ -480,11 +486,11 @@ export class ChannelImplementation implements Channel {
|
|||
}
|
||||
break;
|
||||
case PickResultType.QUEUE:
|
||||
this.pushPick(callStream, callMetadata, callConfig);
|
||||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
break;
|
||||
case PickResultType.TRANSIENT_FAILURE:
|
||||
if (callMetadata.getOptions().waitForReady) {
|
||||
this.pushPick(callStream, callMetadata, callConfig);
|
||||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
||||
} else {
|
||||
callStream.cancelWithStatus(
|
||||
pickResult.status!.code,
|
||||
|
@ -572,7 +578,27 @@ export class ChannelImplementation implements Channel {
|
|||
// Refreshing the filters makes the deadline filter pick up the new deadline
|
||||
stream.filterStack.refresh();
|
||||
}
|
||||
this.tryPick(stream, metadata, callConfig);
|
||||
if (callConfig.dynamicFilterFactories.length > 0) {
|
||||
/* These dynamicFilters are the mechanism for implementing gRFC A39:
|
||||
* https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
|
||||
* We run them here instead of with the rest of the filters because
|
||||
* that spec says "the xDS HTTP filters will run in between name
|
||||
* resolution and load balancing".
|
||||
*
|
||||
* We use the filter stack here to simplify the multi-filter async
|
||||
* waterfall logic, but we pass along the underlying list of filters
|
||||
* to avoid having nested filter stacks when combining it with the
|
||||
* original filter stack. We do not pass along the original filter
|
||||
* factory list because these filters may need to persist data
|
||||
* between sending headers and other operations. */
|
||||
const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
|
||||
const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
|
||||
dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
|
||||
this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
|
||||
});
|
||||
} else {
|
||||
this.tryPick(stream, metadata, callConfig, []);
|
||||
}
|
||||
} else {
|
||||
stream.cancelWithStatus(
|
||||
callConfig.status,
|
||||
|
|
|
@ -81,6 +81,10 @@ export class FilterStack implements Filter {
|
|||
push(filters: Filter[]) {
|
||||
this.filters.unshift(...filters);
|
||||
}
|
||||
|
||||
getFilters(): Filter[] {
|
||||
return this.filters;
|
||||
}
|
||||
}
|
||||
|
||||
export class FilterStackFactory implements FilterFactory<FilterStack> {
|
||||
|
|
|
@ -22,12 +22,14 @@ import { GrpcUri, uriToString } from './uri-parser';
|
|||
import { ChannelOptions } from './channel-options';
|
||||
import { Metadata } from './metadata';
|
||||
import { Status } from './constants';
|
||||
import { Filter, FilterFactory } from './filter';
|
||||
|
||||
export interface CallConfig {
|
||||
methodConfig: MethodConfig;
|
||||
onCommitted?: () => void;
|
||||
pickInformation: { [key: string]: string };
|
||||
status: Status;
|
||||
dynamicFilterFactories: FilterFactory<Filter>[];
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -67,6 +67,7 @@ function getDefaultConfigSelector(
|
|||
methodConfig: methodConfig,
|
||||
pickInformation: {},
|
||||
status: Status.OK,
|
||||
dynamicFilterFactories: []
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -76,6 +77,7 @@ function getDefaultConfigSelector(
|
|||
methodConfig: { name: [] },
|
||||
pickInformation: {},
|
||||
status: Status.OK,
|
||||
dynamicFilterFactories: []
|
||||
};
|
||||
};
|
||||
}
|
||||
|
|
|
@ -655,7 +655,7 @@ export class Subchannel {
|
|||
startCallStream(
|
||||
metadata: Metadata,
|
||||
callStream: Http2CallStream,
|
||||
extraFilterFactories: FilterFactory<Filter>[]
|
||||
extraFilters: Filter[]
|
||||
) {
|
||||
const headers = metadata.toHttp2Headers();
|
||||
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
|
||||
|
@ -694,7 +694,7 @@ export class Subchannel {
|
|||
' with headers\n' +
|
||||
headersString
|
||||
);
|
||||
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories);
|
||||
callStream.attachHttp2Stream(http2Stream, this, extraFilters);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue