From 311aca31e411da18e542686665aa14711aa7ec3e Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 17 Jun 2021 09:45:19 -0700 Subject: [PATCH 1/5] grpc-js-xds: Add HTTP Filters support --- packages/grpc-js-xds/package.json | 2 +- packages/grpc-js-xds/src/environment.ts | 4 +- .../generated/google/protobuf/EnumOptions.ts | 3 - .../google/protobuf/EnumValueOptions.ts | 5 - .../generated/google/protobuf/FieldOptions.ts | 10 - .../generated/google/protobuf/FileOptions.ts | 6 - .../google/protobuf/MessageOptions.ts | 6 - .../grpc-js-xds/src/generated/typed_struct.ts | 74 ++++++ .../src/generated/udpa/type/v1/TypedStruct.ts | 77 ++++++ packages/grpc-js-xds/src/http-filter.ts | 221 ++++++++++++++++++ packages/grpc-js-xds/src/protobuf-any.ts | 23 ++ packages/grpc-js-xds/src/resolver-xds.ts | 139 ++++++++++- packages/grpc-js-xds/src/route-action.ts | 27 ++- .../src/xds-stream-state/lds-state.ts | 9 + .../src/xds-stream-state/rds-state.ts | 25 ++ packages/grpc-js/src/call-stream.ts | 4 + packages/grpc-js/src/channel.ts | 1 + packages/grpc-js/src/resolver.ts | 2 + .../grpc-js/src/resolving-load-balancer.ts | 6 +- 19 files changed, 596 insertions(+), 48 deletions(-) create mode 100644 packages/grpc-js-xds/src/generated/typed_struct.ts create mode 100644 packages/grpc-js-xds/src/generated/udpa/type/v1/TypedStruct.ts create mode 100644 packages/grpc-js-xds/src/http-filter.ts create mode 100644 packages/grpc-js-xds/src/protobuf-any.ts diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index f69c8bf4..8b6005d5 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -12,7 +12,7 @@ "prepare": "npm run compile", "pretest": "npm run compile", "posttest": "npm run check", - "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto", + "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto", "generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto" }, "repository": { diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts index c2c7f2e0..18e9e70b 100644 --- a/packages/grpc-js-xds/src/environment.ts +++ b/packages/grpc-js-xds/src/environment.ts @@ -13,4 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. * - */ \ No newline at end of file + */ + +export const EXPERIMENTAL_FAULT_INJECTION = process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION; \ No newline at end of file diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/EnumOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/EnumOptions.ts index b92f699a..b92ade4f 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/EnumOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/EnumOptions.ts @@ -1,18 +1,15 @@ // Original file: null import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; -import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation'; export interface EnumOptions { 'allowAlias'?: (boolean); 'deprecated'?: (boolean); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; - '.udpa.annotations.enum_migrate'?: (_udpa_annotations_MigrateAnnotation); } export interface EnumOptions__Output { 'allowAlias': (boolean); 'deprecated': (boolean); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; - '.udpa.annotations.enum_migrate'?: (_udpa_annotations_MigrateAnnotation__Output); } diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/EnumValueOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/EnumValueOptions.ts index db277053..e60ee6f4 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/EnumValueOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/EnumValueOptions.ts @@ -1,18 +1,13 @@ // Original file: null import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; -import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation'; export interface EnumValueOptions { 'deprecated'?: (boolean); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; - '.envoy.annotations.disallowed_by_default_enum'?: (boolean); - '.udpa.annotations.enum_value_migrate'?: (_udpa_annotations_MigrateAnnotation); } export interface EnumValueOptions__Output { 'deprecated': (boolean); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; - '.envoy.annotations.disallowed_by_default_enum': (boolean); - '.udpa.annotations.enum_value_migrate'?: (_udpa_annotations_MigrateAnnotation__Output); } diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts index 63f8a015..530022af 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/FieldOptions.ts @@ -2,8 +2,6 @@ import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from '../../validate/FieldRules'; -import type { FieldSecurityAnnotation as _udpa_annotations_FieldSecurityAnnotation, FieldSecurityAnnotation__Output as _udpa_annotations_FieldSecurityAnnotation__Output } from '../../udpa/annotations/FieldSecurityAnnotation'; -import type { FieldMigrateAnnotation as _udpa_annotations_FieldMigrateAnnotation, FieldMigrateAnnotation__Output as _udpa_annotations_FieldMigrateAnnotation__Output } from '../../udpa/annotations/FieldMigrateAnnotation'; // Original file: null @@ -30,10 +28,6 @@ export interface FieldOptions { 'weak'?: (boolean); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; '.validate.rules'?: (_validate_FieldRules); - '.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation); - '.udpa.annotations.sensitive'?: (boolean); - '.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation); - '.envoy.annotations.disallowed_by_default'?: (boolean); } export interface FieldOptions__Output { @@ -45,8 +39,4 @@ export interface FieldOptions__Output { 'weak': (boolean); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; '.validate.rules'?: (_validate_FieldRules__Output); - '.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation__Output); - '.udpa.annotations.sensitive': (boolean); - '.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation__Output); - '.envoy.annotations.disallowed_by_default': (boolean); } diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/FileOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/FileOptions.ts index b2ddbb37..573e847c 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/FileOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/FileOptions.ts @@ -1,8 +1,6 @@ // Original file: null import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; -import type { FileMigrateAnnotation as _udpa_annotations_FileMigrateAnnotation, FileMigrateAnnotation__Output as _udpa_annotations_FileMigrateAnnotation__Output } from '../../udpa/annotations/FileMigrateAnnotation'; -import type { StatusAnnotation as _udpa_annotations_StatusAnnotation, StatusAnnotation__Output as _udpa_annotations_StatusAnnotation__Output } from '../../udpa/annotations/StatusAnnotation'; // Original file: null @@ -28,8 +26,6 @@ export interface FileOptions { 'objcClassPrefix'?: (string); 'csharpNamespace'?: (string); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; - '.udpa.annotations.file_migrate'?: (_udpa_annotations_FileMigrateAnnotation); - '.udpa.annotations.file_status'?: (_udpa_annotations_StatusAnnotation); } export interface FileOptions__Output { @@ -48,6 +44,4 @@ export interface FileOptions__Output { 'objcClassPrefix': (string); 'csharpNamespace': (string); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; - '.udpa.annotations.file_migrate'?: (_udpa_annotations_FileMigrateAnnotation__Output); - '.udpa.annotations.file_status'?: (_udpa_annotations_StatusAnnotation__Output); } diff --git a/packages/grpc-js-xds/src/generated/google/protobuf/MessageOptions.ts b/packages/grpc-js-xds/src/generated/google/protobuf/MessageOptions.ts index 219e4bfd..d3b5a54b 100644 --- a/packages/grpc-js-xds/src/generated/google/protobuf/MessageOptions.ts +++ b/packages/grpc-js-xds/src/generated/google/protobuf/MessageOptions.ts @@ -1,8 +1,6 @@ // Original file: null import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption'; -import type { VersioningAnnotation as _udpa_annotations_VersioningAnnotation, VersioningAnnotation__Output as _udpa_annotations_VersioningAnnotation__Output } from '../../udpa/annotations/VersioningAnnotation'; -import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation'; export interface MessageOptions { 'messageSetWireFormat'?: (boolean); @@ -11,8 +9,6 @@ export interface MessageOptions { 'mapEntry'?: (boolean); 'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[]; '.validate.disabled'?: (boolean); - '.udpa.annotations.versioning'?: (_udpa_annotations_VersioningAnnotation); - '.udpa.annotations.message_migrate'?: (_udpa_annotations_MigrateAnnotation); } export interface MessageOptions__Output { @@ -22,6 +18,4 @@ export interface MessageOptions__Output { 'mapEntry': (boolean); 'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[]; '.validate.disabled': (boolean); - '.udpa.annotations.versioning'?: (_udpa_annotations_VersioningAnnotation__Output); - '.udpa.annotations.message_migrate'?: (_udpa_annotations_MigrateAnnotation__Output); } diff --git a/packages/grpc-js-xds/src/generated/typed_struct.ts b/packages/grpc-js-xds/src/generated/typed_struct.ts new file mode 100644 index 00000000..78d781a2 --- /dev/null +++ b/packages/grpc-js-xds/src/generated/typed_struct.ts @@ -0,0 +1,74 @@ +import type * as grpc from '@grpc/grpc-js'; +import type { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader'; + + +type SubtypeConstructor any, Subtype> = { + new(...args: ConstructorParameters): Subtype; +}; + +export interface ProtoGrpcType { + google: { + protobuf: { + DescriptorProto: MessageTypeDefinition + Duration: MessageTypeDefinition + EnumDescriptorProto: MessageTypeDefinition + EnumOptions: MessageTypeDefinition + EnumValueDescriptorProto: MessageTypeDefinition + EnumValueOptions: MessageTypeDefinition + FieldDescriptorProto: MessageTypeDefinition + FieldOptions: MessageTypeDefinition + FileDescriptorProto: MessageTypeDefinition + FileDescriptorSet: MessageTypeDefinition + FileOptions: MessageTypeDefinition + GeneratedCodeInfo: MessageTypeDefinition + ListValue: MessageTypeDefinition + MessageOptions: MessageTypeDefinition + MethodDescriptorProto: MessageTypeDefinition + MethodOptions: MessageTypeDefinition + NullValue: EnumTypeDefinition + OneofDescriptorProto: MessageTypeDefinition + OneofOptions: MessageTypeDefinition + ServiceDescriptorProto: MessageTypeDefinition + ServiceOptions: MessageTypeDefinition + SourceCodeInfo: MessageTypeDefinition + Struct: MessageTypeDefinition + Timestamp: MessageTypeDefinition + UninterpretedOption: MessageTypeDefinition + Value: MessageTypeDefinition + } + } + udpa: { + type: { + v1: { + TypedStruct: MessageTypeDefinition + } + } + } + validate: { + AnyRules: MessageTypeDefinition + BoolRules: MessageTypeDefinition + BytesRules: MessageTypeDefinition + DoubleRules: MessageTypeDefinition + DurationRules: MessageTypeDefinition + EnumRules: MessageTypeDefinition + FieldRules: MessageTypeDefinition + Fixed32Rules: MessageTypeDefinition + Fixed64Rules: MessageTypeDefinition + FloatRules: MessageTypeDefinition + Int32Rules: MessageTypeDefinition + Int64Rules: MessageTypeDefinition + KnownRegex: EnumTypeDefinition + MapRules: MessageTypeDefinition + MessageRules: MessageTypeDefinition + RepeatedRules: MessageTypeDefinition + SFixed32Rules: MessageTypeDefinition + SFixed64Rules: MessageTypeDefinition + SInt32Rules: MessageTypeDefinition + SInt64Rules: MessageTypeDefinition + StringRules: MessageTypeDefinition + TimestampRules: MessageTypeDefinition + UInt32Rules: MessageTypeDefinition + UInt64Rules: MessageTypeDefinition + } +} + diff --git a/packages/grpc-js-xds/src/generated/udpa/type/v1/TypedStruct.ts b/packages/grpc-js-xds/src/generated/udpa/type/v1/TypedStruct.ts new file mode 100644 index 00000000..f9d84298 --- /dev/null +++ b/packages/grpc-js-xds/src/generated/udpa/type/v1/TypedStruct.ts @@ -0,0 +1,77 @@ +// Original file: deps/udpa/udpa/type/v1/typed_struct.proto + +import type { Struct as _google_protobuf_Struct, Struct__Output as _google_protobuf_Struct__Output } from '../../../google/protobuf/Struct'; + +/** + * A TypedStruct contains an arbitrary JSON serialized protocol buffer message with a URL that + * describes the type of the serialized message. This is very similar to google.protobuf.Any, + * instead of having protocol buffer binary, this employs google.protobuf.Struct as value. + * + * This message is intended to be embedded inside Any, so it shouldn't be directly referred + * from other UDPA messages. + * + * When packing an opaque extension config, packing the expected type into Any is preferred + * wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor + * is not available, for example if: + * - A control plane sends opaque message that is originally from external source in human readable + * format such as JSON or YAML. + * - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot + * serialize the message in protocol buffer binary format. + * - The DPLB doesn't have have the knowledge of the protocol buffer schema its plugin or extension + * uses. This has to be indicated in the DPLB capability negotiation. + * + * When a DPLB receives a TypedStruct in Any, it should: + * - Check if the type_url of the TypedStruct matches the type the extension expects. + * - Convert value to the type described in type_url and perform validation. + * TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link + * protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions). + */ +export interface TypedStruct { + /** + * A URL that uniquely identifies the type of the serialize protocol buffer message. + * This has same semantics and format described in google.protobuf.Any: + * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/any.proto + */ + 'type_url'?: (string); + /** + * A JSON representation of the above specified type. + */ + 'value'?: (_google_protobuf_Struct); +} + +/** + * A TypedStruct contains an arbitrary JSON serialized protocol buffer message with a URL that + * describes the type of the serialized message. This is very similar to google.protobuf.Any, + * instead of having protocol buffer binary, this employs google.protobuf.Struct as value. + * + * This message is intended to be embedded inside Any, so it shouldn't be directly referred + * from other UDPA messages. + * + * When packing an opaque extension config, packing the expected type into Any is preferred + * wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor + * is not available, for example if: + * - A control plane sends opaque message that is originally from external source in human readable + * format such as JSON or YAML. + * - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot + * serialize the message in protocol buffer binary format. + * - The DPLB doesn't have have the knowledge of the protocol buffer schema its plugin or extension + * uses. This has to be indicated in the DPLB capability negotiation. + * + * When a DPLB receives a TypedStruct in Any, it should: + * - Check if the type_url of the TypedStruct matches the type the extension expects. + * - Convert value to the type described in type_url and perform validation. + * TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link + * protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions). + */ +export interface TypedStruct__Output { + /** + * A URL that uniquely identifies the type of the serialize protocol buffer message. + * This has same semantics and format described in google.protobuf.Any: + * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/any.proto + */ + 'type_url': (string); + /** + * A JSON representation of the above specified type. + */ + 'value'?: (_google_protobuf_Struct__Output); +} diff --git a/packages/grpc-js-xds/src/http-filter.ts b/packages/grpc-js-xds/src/http-filter.ts new file mode 100644 index 00000000..e7ea3295 --- /dev/null +++ b/packages/grpc-js-xds/src/http-filter.ts @@ -0,0 +1,221 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This is a non-public, unstable API, but it's very convenient +import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; +import { experimental } from '@grpc/grpc-js'; +import { Any__Output } from './generated/google/protobuf/Any'; +import Filter = experimental.Filter; +import FilterFactory = experimental.FilterFactory; +import { TypedStruct__Output } from './generated/udpa/type/v1/TypedStruct'; +import { FilterConfig__Output } from './generated/envoy/config/route/v3/FilterConfig'; +import { HttpFilter__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpFilter'; + +const TYPED_STRUCT_URL = 'type.googleapis.com/udpa.type.v1.TypedStruct'; +const TYPED_STRUCT_NAME = 'udpa.type.v1.TypedStruct'; + +const FILTER_CONFIG_URL = 'type.googleapis.com/envoy.config.route.v3.FilterConfig'; +const FILTER_CONFIG_NAME = 'envoy.config.route.v3.FilterConfig'; + +const resourceRoot = loadProtosWithOptionsSync([ + 'udpa/type/v1/typed_struct.proto', + 'envoy/config/route/v3/route_components.proto'], { + keepCase: true, + includeDirs: [ + // Paths are relative to src/build + __dirname + '/../../deps/udpa/', + __dirname + '/../../deps/envoy-api/' + ], + } +); + +export interface HttpFilterConfig { + typeUrl: string; + config: any; +} + +export interface HttpFilterFactoryConstructor { + new(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory; +} + +export interface HttpFilterRegistryEntry { + parseTopLevelFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null; + parseOverrideFilterConfig(encodedConfig: Any__Output): HttpFilterConfig | null; + httpFilterConstructor: HttpFilterFactoryConstructor; +} + +const FILTER_REGISTRY = new Map(); + +export function registerHttpFilter(typeName: string, entry: HttpFilterRegistryEntry) { + FILTER_REGISTRY.set(typeName, entry); +} + +const toObjectOptions = { + longs: String, + enums: String, + defaults: true, + oneofs: true +} + +function parseAnyMessage(message: Any__Output): MessageType | null { + const messageType = resourceRoot.lookup(message.type_url); + if (messageType) { + const decodedMessage = (messageType as any).decode(message.value); + return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as MessageType; + } else { + return null; + } +} + +function getTopLevelFilterUrl(encodedConfig: Any__Output): string { + let typeUrl: string; + if (encodedConfig.type_url === TYPED_STRUCT_URL) { + const typedStruct = parseAnyMessage(encodedConfig) + if (typedStruct) { + return typedStruct.type_url; + } else { + throw new Error('Failed to parse TypedStruct'); + } + } else { + return encodedConfig.type_url; + } +} + +export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean { + if (!httpFilter.typed_config) { + return false; + } + const encodedConfig = httpFilter.typed_config; + let typeUrl: string; + try { + typeUrl = getTopLevelFilterUrl(encodedConfig); + } catch (e) { + return false; + } + const registryEntry = FILTER_REGISTRY.get(typeUrl); + if (registryEntry) { + const parsedConfig = registryEntry.parseTopLevelFilterConfig(encodedConfig); + return parsedConfig !== null; + } else { + if (httpFilter.is_optional) { + return true; + } else { + return false; + } + } +} + +export function validateOverrideFilter(encodedConfig: Any__Output): boolean { + let typeUrl: string; + let realConfig: Any__Output; + let isOptional = false; + if (encodedConfig.type_url === FILTER_CONFIG_URL) { + const filterConfig = parseAnyMessage(encodedConfig); + if (filterConfig) { + isOptional = filterConfig.is_optional; + if (filterConfig.config) { + realConfig = filterConfig.config; + } else { + return false; + } + } else { + return false; + } + } else { + realConfig = encodedConfig; + } + if (realConfig.type_url === TYPED_STRUCT_URL) { + const typedStruct = parseAnyMessage(encodedConfig); + if (typedStruct) { + typeUrl = typedStruct.type_url; + } else { + return false; + } + } else { + typeUrl = realConfig.type_url; + } + const registryEntry = FILTER_REGISTRY.get(typeUrl); + if (registryEntry) { + const parsedConfig = registryEntry.parseOverrideFilterConfig(encodedConfig); + return parsedConfig !== null; + } else { + if (isOptional) { + return true; + } else { + return false; + } + } +} + +export function parseTopLevelFilterConfig(encodedConfig: Any__Output) { + let typeUrl: string; + try { + typeUrl = getTopLevelFilterUrl(encodedConfig); + } catch (e) { + return null; + } + const registryEntry = FILTER_REGISTRY.get(typeUrl); + if (registryEntry) { + return registryEntry.parseTopLevelFilterConfig(encodedConfig); + } else { + // Filter type URL not found in registry + return null; + } +} + +export function parseOverrideFilterConfig(encodedConfig: Any__Output) { + let typeUrl: string; + let realConfig: Any__Output; + if (encodedConfig.type_url === FILTER_CONFIG_URL) { + const filterConfig = parseAnyMessage(encodedConfig); + if (filterConfig) { + if (filterConfig.config) { + realConfig = filterConfig.config; + } else { + return null; + } + } else { + return null; + } + } else { + realConfig = encodedConfig; + } + if (realConfig.type_url === TYPED_STRUCT_URL) { + const typedStruct = parseAnyMessage(encodedConfig); + if (typedStruct) { + typeUrl = typedStruct.type_url; + } else { + return null; + } + } else { + typeUrl = realConfig.type_url; + } + const registryEntry = FILTER_REGISTRY.get(typeUrl); + if (registryEntry) { + return registryEntry.parseOverrideFilterConfig(encodedConfig); + } else { + return null; + } +} + +export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig): FilterFactory | null { + const registryEntry = FILTER_REGISTRY.get(config.typeUrl); + if (registryEntry) { + return new registryEntry.httpFilterConstructor(config, overrideConfig); + } else { + return null; + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/protobuf-any.ts b/packages/grpc-js-xds/src/protobuf-any.ts new file mode 100644 index 00000000..cfee35f9 --- /dev/null +++ b/packages/grpc-js-xds/src/protobuf-any.ts @@ -0,0 +1,23 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This is a non-public, unstable API, but it's very convenient +import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; +import { Any__Output } from './generated/google/protobuf/Any'; + +function parseAnyMessage(encodedMessage: Any__Output) { + +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 892cf571..372d8b05 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -42,6 +42,12 @@ import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedCluster import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources'; import Duration = experimental.Duration; import { Duration__Output } from './generated/google/protobuf/Duration'; +import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter'; +import { EXPERIMENTAL_FAULT_INJECTION } from './environment'; +import Filter = experimental.Filter; +import FilterFactory = experimental.FilterFactory; +import BaseFilter = experimental.BaseFilter; +import CallStream = experimental.CallStream; const TRACER_NAME = 'xds_resolver'; @@ -203,6 +209,25 @@ function protoDurationToDuration(duration: Duration__Output): Duration { } } +class NoRouterFilter extends BaseFilter implements Filter { + constructor(private call: CallStream) { + super(); + } + + sendMetadata(metadata: Promise): Promise { + this.call.cancelWithStatus(status.UNAVAILABLE, 'no xDS HTTP router filter configured'); + return Promise.reject(new Error('no xDS HTTP router filter configured')); + } +} + +class NoRouterFilterFactory implements FilterFactory { + createFilter(callStream: CallStream): NoRouterFilter { + return new NoRouterFilter(callStream); + } +} + +const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router'; + class XdsResolver implements Resolver { private hasReportedSuccess = false; @@ -221,6 +246,9 @@ class XdsResolver implements Resolver { private latestDefaultTimeout: Duration | undefined = undefined; + private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = []; + private hasRouterFilter = false; + constructor( private target: GrpcUri, private listener: ResolverListener, @@ -235,6 +263,21 @@ class XdsResolver implements Resolver { } else { this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout); } + if (EXPERIMENTAL_FAULT_INJECTION) { + this.ldsHttpFilterConfigs = []; + this.hasRouterFilter = false; + for (const filter of httpConnectionManager.http_filters) { + if (filter.typed_config?.type_url === ROUTER_FILTER_URL) { + this.hasRouterFilter = true; + break; + } + // typed_config must be set here, or validation would have failed + const filterConfig = parseTopLevelFilterConfig(filter.typed_config!); + if (filterConfig) { + this.ldsHttpFilterConfigs.push({name: filter.name, config: filterConfig}); + } + } + } switch (httpConnectionManager.route_specifier) { case 'rds': { const routeConfigName = httpConnectionManager.rds!.route_config_name; @@ -314,6 +357,15 @@ class XdsResolver implements Resolver { this.reportResolutionError('No matching route found'); return; } + const virtualHostHttpFilterOverrides = new Map(); + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) { + const parsedConfig = parseOverrideFilterConfig(filter); + if (parsedConfig) { + virtualHostHttpFilterOverrides.set(name, parsedConfig); + } + } + } trace('Received virtual host config ' + JSON.stringify(virtualHost, undefined, 2)); const allConfigClusters = new Set(); const matchList: {matcher: Matcher, action: RouteAction}[] = []; @@ -334,20 +386,89 @@ class XdsResolver implements Resolver { if (timeout?.seconds === 0 && timeout.nanos === 0) { timeout = undefined; } + const routeHttpFilterOverrides = new Map(); + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) { + const parsedConfig = parseOverrideFilterConfig(filter); + if (parsedConfig) { + routeHttpFilterOverrides.set(name, parsedConfig); + } + } + } switch (route.route!.cluster_specifier) { case 'cluster_header': continue; case 'cluster':{ const cluster = route.route!.cluster!; allConfigClusters.add(cluster); - routeAction = new SingleClusterRouteAction(cluster, timeout); + const extraFilterFactories: FilterFactory[] = []; + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const filterConfig of this.ldsHttpFilterConfigs) { + if (routeHttpFilterOverrides.has(filterConfig.name)) { + const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!); + if (filter) { + extraFilterFactories.push(filter); + } + } else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) { + const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!); + if (filter) { + extraFilterFactories.push(filter); + } + } else { + const filter = createHttpFilter(filterConfig.config); + if (filter) { + extraFilterFactories.push(filter); + } + } + } + if (!this.hasRouterFilter) { + extraFilterFactories.push(new NoRouterFilterFactory()); + } + } + routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories); break; } case 'weighted_clusters': { const weightedClusters: WeightedCluster[] = []; for (const clusterWeight of route.route!.weighted_clusters!.clusters) { allConfigClusters.add(clusterWeight.name); - weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0}); + const extraFilterFactories: FilterFactory[] = []; + const clusterHttpFilterOverrides = new Map(); + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) { + const parsedConfig = parseOverrideFilterConfig(filter); + if (parsedConfig) { + clusterHttpFilterOverrides.set(name, parsedConfig); + } + } + for (const filterConfig of this.ldsHttpFilterConfigs) { + if (clusterHttpFilterOverrides.has(filterConfig.name)) { + const filter = createHttpFilter(filterConfig.config, clusterHttpFilterOverrides.get(filterConfig.name)!); + if (filter) { + extraFilterFactories.push(filter); + } + } else if (routeHttpFilterOverrides.has(filterConfig.name)) { + const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!); + if (filter) { + extraFilterFactories.push(filter); + } + } else if (virtualHostHttpFilterOverrides.has(filterConfig.name)) { + const filter = createHttpFilter(filterConfig.config, virtualHostHttpFilterOverrides.get(filterConfig.name)!); + if (filter) { + extraFilterFactories.push(filter); + } + } else { + const filter = createHttpFilter(filterConfig.config); + if (filter) { + extraFilterFactories.push(filter); + } + } + } + if (!this.hasRouterFilter) { + extraFilterFactories.push(new NoRouterFilterFactory()); + } + } + weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, extraFilterFactories: extraFilterFactories}); } routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout); } @@ -376,16 +497,17 @@ class XdsResolver implements Resolver { const configSelector: ConfigSelector = (methodName, metadata) => { for (const {matcher, action} of matchList) { if (matcher.apply(methodName, metadata)) { - const clusterName = action.getCluster(); - this.refCluster(clusterName); + const clusterResult = action.getCluster(); + this.refCluster(clusterResult.name); const onCommitted = () => { - this.unrefCluster(clusterName); + this.unrefCluster(clusterResult.name); } return { methodConfig: {name: [], timeout: action.getTimeout()}, onCommitted: onCommitted, - pickInformation: {cluster: clusterName}, - status: status.OK + pickInformation: {cluster: clusterResult.name}, + status: status.OK, + extraFilterFactories: clusterResult.extraFilterFactories }; } } @@ -393,7 +515,8 @@ class XdsResolver implements Resolver { methodConfig: {name: []}, // cluster won't be used here, but it's set because of some TypeScript weirdness pickInformation: {cluster: ''}, - status: status.UNAVAILABLE + status: status.UNAVAILABLE, + extraFilterFactories: [] }; }; trace('Created ConfigSelector with configuration:'); diff --git a/packages/grpc-js-xds/src/route-action.ts b/packages/grpc-js-xds/src/route-action.ts index 5574bcd9..a45bf02a 100644 --- a/packages/grpc-js-xds/src/route-action.ts +++ b/packages/grpc-js-xds/src/route-action.ts @@ -16,10 +16,17 @@ import { experimental } from '@grpc/grpc-js'; import Duration = experimental.Duration; +import Filter = experimental.Filter; +import FilterFactory = experimental.FilterFactory; + +export interface ClusterResult { + name: string; + extraFilterFactories: FilterFactory[]; +} export interface RouteAction { toString(): string; - getCluster(): string; + getCluster(): ClusterResult; getTimeout(): Duration | undefined; } @@ -33,10 +40,13 @@ function durationToLogString(duration: Duration) { } export class SingleClusterRouteAction implements RouteAction { - constructor(private cluster: string, private timeout: Duration | undefined) {} + constructor(private cluster: string, private timeout: Duration | undefined, private extraFilterFactories: FilterFactory[]) {} getCluster() { - return this.cluster; + return { + name: this.cluster, + extraFilterFactories: this.extraFilterFactories + }; } toString() { @@ -55,11 +65,13 @@ export class SingleClusterRouteAction implements RouteAction { export interface WeightedCluster { name: string; weight: number; + extraFilterFactories: FilterFactory[]; } interface ClusterChoice { name: string; numerator: number; + extraFilterFactories: FilterFactory[]; } export class WeightedClusterRouteAction implements RouteAction { @@ -72,7 +84,7 @@ export class WeightedClusterRouteAction implements RouteAction { let lastNumerator = 0; for (const clusterWeight of clusters) { lastNumerator += clusterWeight.weight; - this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator}); + this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator, extraFilterFactories: clusterWeight.extraFilterFactories}); } } @@ -80,11 +92,14 @@ export class WeightedClusterRouteAction implements RouteAction { const randomNumber = Math.random() * this.totalWeight; for (const choice of this.clusterChoices) { if (randomNumber < choice.numerator) { - return choice.name; + return { + name: choice.name, + extraFilterFactories: choice.extraFilterFactories + }; } } // This should be prevented by the validation rules - return ''; + return {name: '', extraFilterFactories: []}; } toString() { diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index eb76f799..10ada540 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -22,6 +22,8 @@ import { RdsState } from "./rds-state"; import { Watcher, XdsStreamState } from "./xds-stream-state"; import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager'; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources'; +import { validateTopLevelFilter } from '../http-filter'; +import { EXPERIMENTAL_FAULT_INJECTION } from '../environment'; const TRACER_NAME = 'xds_client'; @@ -100,6 +102,13 @@ export class LdsState implements XdsStreamState { return false; } const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value); + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const httpFilter of httpConnectionManager.http_filters) { + if (!validateTopLevelFilter(httpFilter)) { + return false; + } + } + } switch (httpConnectionManager.route_specifier) { case 'rds': return !!httpConnectionManager.rds?.config_source?.ads; diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 1e50f22e..9441a7ec 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -16,7 +16,9 @@ */ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { EXPERIMENTAL_FAULT_INJECTION } from "../environment"; import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration"; +import { validateOverrideFilter } from "../http-filter"; import { CdsLoadBalancingConfig } from "../load-balancer-cds"; import { Watcher, XdsStreamState } from "./xds-stream-state"; import ServiceConfig = experimental.ServiceConfig; @@ -112,6 +114,13 @@ export class RdsState implements XdsStreamState { return false; } } + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) { + if (!validateOverrideFilter(filterConfig)) { + return false; + } + } + } for (const route of virtualHost.routes) { const match = route.match; if (!match) { @@ -131,6 +140,13 @@ export class RdsState implements XdsStreamState { if ((route.route === undefined) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) { return false; } + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const filterConfig of Object.values(route.typed_per_filter_config ?? {})) { + if (!validateOverrideFilter(filterConfig)) { + return false; + } + } + } if (route.route!.cluster_specifier === 'weighted_clusters') { let weightSum = 0; for (const clusterWeight of route.route.weighted_clusters!.clusters) { @@ -139,6 +155,15 @@ export class RdsState implements XdsStreamState { if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) { return false; } + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const weightedCluster of route.route!.weighted_clusters!.clusters) { + for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) { + if (!validateOverrideFilter(filterConfig)) { + return false; + } + } + } + } } } } diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index ae634260..e9e1cb8d 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -721,6 +721,10 @@ export class Http2CallStream implements Call { this.configDeadline = configDeadline; } + addFilterFactories(extraFilterFactories: FilterFactory[]) { + this.filterStack.push(extraFilterFactories.map(filterFactory => filterFactory.createFilter(this))); + } + startRead() { /* If the stream has ended with an error, we should not emit any more * messages and we should communicate that the stream has ended */ diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index cd2fc94a..859a6e76 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -536,6 +536,7 @@ export class ChannelImplementation implements Channel { // Refreshing the filters makes the deadline filter pick up the new deadline stream.filterStack.refresh(); } + stream.addFilterFactories(callConfig.extraFilterFactories); this.tryPick(stream, metadata, callConfig); } else { stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 147ace30..cf5f7ee2 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -24,12 +24,14 @@ import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; import { Metadata } from './metadata'; import { Status } from './constants'; +import { Filter, FilterFactory } from './filter'; export interface CallConfig { methodConfig: MethodConfig; onCommitted?: () => void; pickInformation: {[key: string]: string}; status: Status; + extraFilterFactories: FilterFactory[]; } /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 94dd8c4a..3f84af37 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -58,7 +58,8 @@ function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSe return { methodConfig: methodConfig, pickInformation: {}, - status: Status.OK + status: Status.OK, + extraFilterFactories: [] }; } } @@ -67,7 +68,8 @@ function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSe return { methodConfig: {name: []}, pickInformation: {}, - status: Status.OK + status: Status.OK, + extraFilterFactories: [] }; } } From a5449155049351c81b4ffead4a3004217e504568 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 15 Jul 2021 11:29:28 -0700 Subject: [PATCH 2/5] Add router filter registry entry --- .../src/http-filter/router-filter.ts | 49 +++++++++++++++++++ packages/grpc-js-xds/src/index.ts | 2 + 2 files changed, 51 insertions(+) create mode 100644 packages/grpc-js-xds/src/http-filter/router-filter.ts diff --git a/packages/grpc-js-xds/src/http-filter/router-filter.ts b/packages/grpc-js-xds/src/http-filter/router-filter.ts new file mode 100644 index 00000000..81450c0f --- /dev/null +++ b/packages/grpc-js-xds/src/http-filter/router-filter.ts @@ -0,0 +1,49 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { experimental } from '@grpc/grpc-js'; +import { Any__Output } from '../generated/google/protobuf/Any'; +import { HttpFilterConfig, registerHttpFilter } from '../http-filter'; +import Filter = experimental.Filter; +import FilterFactory = experimental.FilterFactory; +import BaseFilter = experimental.BaseFilter; + +class RouterFilter extends BaseFilter implements Filter {} + +class RouterFilterFactory implements FilterFactory { + constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {} + + createFilter(callStream: experimental.CallStream): RouterFilter { + return new RouterFilter(); + } +} + +const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router'; + +function parseConfig(encodedConfig: Any__Output): HttpFilterConfig | null { + return { + typeUrl: ROUTER_FILTER_URL, + config: null + }; +} + +export function setup() { + registerHttpFilter(ROUTER_FILTER_URL, { + parseTopLevelFilterConfig: parseConfig, + parseOverrideFilterConfig: parseConfig, + httpFilterConstructor: RouterFilterFactory + }); +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 1b24d25e..2fee339d 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -22,6 +22,7 @@ import * as load_balancer_lrs from './load-balancer-lrs'; import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager'; +import * as router_filter from './http-filter/router-filter'; /** * Register the "xds:" name scheme with the @grpc/grpc-js library. @@ -34,4 +35,5 @@ export function register() { load_balancer_priority.setup(); load_balancer_weighted_target.setup(); load_balancer_xds_cluster_manager.setup(); + router_filter.setup(); } \ No newline at end of file From d0745b3a4caf84525357cbcca7f62ad625dd16fd Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 15 Jul 2021 14:56:47 -0700 Subject: [PATCH 3/5] Run call config filter factories before load balancing --- packages/grpc-js-xds/src/resolver-xds.ts | 6 +-- packages/grpc-js-xds/src/route-action.ts | 14 ++--- packages/grpc-js/src/call-stream.ts | 8 +-- packages/grpc-js/src/channel.ts | 51 ++++++++++++++----- packages/grpc-js/src/filter-stack.ts | 4 ++ packages/grpc-js/src/resolver.ts | 2 +- .../grpc-js/src/resolving-load-balancer.ts | 4 +- packages/grpc-js/src/subchannel.ts | 4 +- 8 files changed, 61 insertions(+), 32 deletions(-) diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 372d8b05..c48cc251 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -468,7 +468,7 @@ class XdsResolver implements Resolver { extraFilterFactories.push(new NoRouterFilterFactory()); } } - weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, extraFilterFactories: extraFilterFactories}); + weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories}); } routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout); } @@ -507,7 +507,7 @@ class XdsResolver implements Resolver { onCommitted: onCommitted, pickInformation: {cluster: clusterResult.name}, status: status.OK, - extraFilterFactories: clusterResult.extraFilterFactories + dynamicFilterFactories: clusterResult.dynamicFilterFactories }; } } @@ -516,7 +516,7 @@ class XdsResolver implements Resolver { // cluster won't be used here, but it's set because of some TypeScript weirdness pickInformation: {cluster: ''}, status: status.UNAVAILABLE, - extraFilterFactories: [] + dynamicFilterFactories: [] }; }; trace('Created ConfigSelector with configuration:'); diff --git a/packages/grpc-js-xds/src/route-action.ts b/packages/grpc-js-xds/src/route-action.ts index a45bf02a..d29e67b9 100644 --- a/packages/grpc-js-xds/src/route-action.ts +++ b/packages/grpc-js-xds/src/route-action.ts @@ -21,7 +21,7 @@ import FilterFactory = experimental.FilterFactory; export interface ClusterResult { name: string; - extraFilterFactories: FilterFactory[]; + dynamicFilterFactories: FilterFactory[]; } export interface RouteAction { @@ -45,7 +45,7 @@ export class SingleClusterRouteAction implements RouteAction { getCluster() { return { name: this.cluster, - extraFilterFactories: this.extraFilterFactories + dynamicFilterFactories: this.extraFilterFactories }; } @@ -65,13 +65,13 @@ export class SingleClusterRouteAction implements RouteAction { export interface WeightedCluster { name: string; weight: number; - extraFilterFactories: FilterFactory[]; + dynamicFilterFactories: FilterFactory[]; } interface ClusterChoice { name: string; numerator: number; - extraFilterFactories: FilterFactory[]; + dynamicFilterFactories: FilterFactory[]; } export class WeightedClusterRouteAction implements RouteAction { @@ -84,7 +84,7 @@ export class WeightedClusterRouteAction implements RouteAction { let lastNumerator = 0; for (const clusterWeight of clusters) { lastNumerator += clusterWeight.weight; - this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator, extraFilterFactories: clusterWeight.extraFilterFactories}); + this.clusterChoices.push({name: clusterWeight.name, numerator: lastNumerator, dynamicFilterFactories: clusterWeight.dynamicFilterFactories}); } } @@ -94,12 +94,12 @@ export class WeightedClusterRouteAction implements RouteAction { if (randomNumber < choice.numerator) { return { name: choice.name, - extraFilterFactories: choice.extraFilterFactories + dynamicFilterFactories: choice.dynamicFilterFactories }; } } // This should be prevented by the validation rules - return {name: '', extraFilterFactories: []}; + return {name: '', dynamicFilterFactories: []}; } toString() { diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index e9e1cb8d..accc275f 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -462,9 +462,9 @@ export class Http2CallStream implements Call { attachHttp2Stream( stream: http2.ClientHttp2Stream, subchannel: Subchannel, - extraFilters: FilterFactory[] + extraFilters: Filter[] ): void { - this.filterStack.push(extraFilters.map(filterFactory => filterFactory.createFilter(this))); + this.filterStack.push(extraFilters); if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { @@ -721,8 +721,8 @@ export class Http2CallStream implements Call { this.configDeadline = configDeadline; } - addFilterFactories(extraFilterFactories: FilterFactory[]) { - this.filterStack.push(extraFilterFactories.map(filterFactory => filterFactory.createFilter(this))); + addFilters(extraFilters: Filter[]) { + this.filterStack.push(extraFilters); } startRead() { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 859a6e76..8b5f31bf 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -41,6 +41,7 @@ import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; import { SurfaceCall } from './call'; +import { Filter } from './filter'; export enum ConnectivityState { IDLE, @@ -148,6 +149,7 @@ export class ChannelImplementation implements Channel { callStream: Http2CallStream; callMetadata: Metadata; callConfig: CallConfig; + dynamicFilters: Filter[]; }> = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; private defaultAuthority: string; @@ -237,8 +239,8 @@ export class ChannelImplementation implements Channel { const queueCopy = this.pickQueue.slice(); this.pickQueue = []; this.callRefTimerUnref(); - for (const { callStream, callMetadata, callConfig } of queueCopy) { - this.tryPick(callStream, callMetadata, callConfig); + for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { + this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } this.updateState(connectivityState); }, @@ -308,8 +310,8 @@ export class ChannelImplementation implements Channel { } } - private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { - this.pickQueue.push({ callStream, callMetadata, callConfig }); + private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[]) { + this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); this.callRefTimerRef(); } @@ -320,7 +322,10 @@ export class ChannelImplementation implements Channel { * @param callStream * @param callMetadata */ - private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { + private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[]) { + if (callStream.getStatus() !== null) { + return; + } const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation }); trace( LogVerbosity.DEBUG, @@ -357,7 +362,7 @@ export class ChannelImplementation implements Channel { ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); - this.pushPick(callStream, callMetadata, callConfig); + this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; } /* We need to clone the callMetadata here because the transparent @@ -370,10 +375,11 @@ export class ChannelImplementation implements Channel { const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); if (subchannelState === ConnectivityState.READY) { try { + const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); pickResult.subchannel!.startCallStream( finalMetadata, callStream, - pickResult.extraFilterFactories + [...dynamicFilters, ...pickExtraFilters] ); /* If we reach this point, the call stream has started * successfully */ @@ -406,7 +412,7 @@ export class ChannelImplementation implements Channel { (error as Error).message + '. Retrying pick' ); - this.tryPick(callStream, callMetadata, callConfig); + this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } else { trace( LogVerbosity.INFO, @@ -435,7 +441,7 @@ export class ChannelImplementation implements Channel { ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick' ); - this.tryPick(callStream, callMetadata, callConfig); + this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } }, (error: Error & { code: number }) => { @@ -449,11 +455,11 @@ export class ChannelImplementation implements Channel { } break; case PickResultType.QUEUE: - this.pushPick(callStream, callMetadata, callConfig); + this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pushPick(callStream, callMetadata, callConfig); + this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); } else { callStream.cancelWithStatus( pickResult.status!.code, @@ -536,8 +542,27 @@ export class ChannelImplementation implements Channel { // Refreshing the filters makes the deadline filter pick up the new deadline stream.filterStack.refresh(); } - stream.addFilterFactories(callConfig.extraFilterFactories); - this.tryPick(stream, metadata, callConfig); + if (callConfig.dynamicFilterFactories.length > 0) { + /* These dynamicFilters are the mechanism for implementing gRFC A39: + * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md + * We run them here instead of with the rest of the filters because + * that spec says "the xDS HTTP filters will run in between name + * resolution and load balancing". + * + * We use the filter stack here to simplify the multi-filter async + * waterfall logic, but we pass along the underlying list of filters + * to avoid having nested filter stacks when combining it with the + * original filter stack. We do not pass along the original filter + * factory list because these filters may need to persist data + * between sending headers and other operations. */ + const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories); + const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); + dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { + this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); + }); + } else { + this.tryPick(stream, metadata, callConfig, []); + } } else { stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); } diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index 4e0ccf07..a9e75442 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -81,6 +81,10 @@ export class FilterStack implements Filter { push(filters: Filter[]) { this.filters.unshift(...filters); } + + getFilters(): Filter[] { + return this.filters; + } } export class FilterStackFactory implements FilterFactory { diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index cf5f7ee2..57db665c 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -31,7 +31,7 @@ export interface CallConfig { onCommitted?: () => void; pickInformation: {[key: string]: string}; status: Status; - extraFilterFactories: FilterFactory[]; + dynamicFilterFactories: FilterFactory[]; } /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 3f84af37..7f6b41fd 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -59,7 +59,7 @@ function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSe methodConfig: methodConfig, pickInformation: {}, status: Status.OK, - extraFilterFactories: [] + dynamicFilterFactories: [] }; } } @@ -69,7 +69,7 @@ function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSe methodConfig: {name: []}, pickInformation: {}, status: Status.OK, - extraFilterFactories: [] + dynamicFilterFactories: [] }; } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index c50a68e7..6dc3e9b3 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -688,7 +688,7 @@ export class Subchannel { startCallStream( metadata: Metadata, callStream: Http2CallStream, - extraFilterFactories: FilterFactory[] + extraFilters: Filter[] ) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); @@ -720,7 +720,7 @@ export class Subchannel { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString); - callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories); + callStream.attachHttp2Stream(http2Stream, this, extraFilters); } /** From f03b4dd87fdc9a56e4ef652ccdb9213f7553e97c Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 20 Jul 2021 14:12:59 -0700 Subject: [PATCH 4/5] Validate uniqueness of http filter names --- packages/grpc-js-xds/src/xds-stream-state/rds-state.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 9441a7ec..0322218d 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -141,7 +141,12 @@ export class RdsState implements XdsStreamState { return false; } if (EXPERIMENTAL_FAULT_INJECTION) { - for (const filterConfig of Object.values(route.typed_per_filter_config ?? {})) { + const filterNames = new Set(); + for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) { + if (filterNames.has(name)) { + return false; + } + filterNames.add(name); if (!validateOverrideFilter(filterConfig)) { return false; } From 215cdcd1344b4b1426d989772ae6219770476777 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 22 Jul 2021 12:42:11 -0700 Subject: [PATCH 5/5] Check for router filter in validation step --- packages/grpc-js-xds/src/resolver-xds.ts | 33 ------------------- .../src/xds-stream-state/lds-state.ts | 20 ++++++++++- .../src/xds-stream-state/rds-state.ts | 5 --- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 59a3fec4..3eaabb11 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -46,8 +46,6 @@ import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTop import { EXPERIMENTAL_FAULT_INJECTION } from './environment'; import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; -import BaseFilter = experimental.BaseFilter; -import CallStream = experimental.CallStream; const TRACER_NAME = 'xds_resolver'; @@ -209,25 +207,6 @@ function protoDurationToDuration(duration: Duration__Output): Duration { } } -class NoRouterFilter extends BaseFilter implements Filter { - constructor(private call: CallStream) { - super(); - } - - sendMetadata(metadata: Promise): Promise { - this.call.cancelWithStatus(status.UNAVAILABLE, 'no xDS HTTP router filter configured'); - return Promise.reject(new Error('no xDS HTTP router filter configured')); - } -} - -class NoRouterFilterFactory implements FilterFactory { - createFilter(callStream: CallStream): NoRouterFilter { - return new NoRouterFilter(callStream); - } -} - -const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router'; - class XdsResolver implements Resolver { private hasReportedSuccess = false; @@ -247,7 +226,6 @@ class XdsResolver implements Resolver { private latestDefaultTimeout: Duration | undefined = undefined; private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = []; - private hasRouterFilter = false; constructor( private target: GrpcUri, @@ -265,12 +243,7 @@ class XdsResolver implements Resolver { } if (EXPERIMENTAL_FAULT_INJECTION) { this.ldsHttpFilterConfigs = []; - this.hasRouterFilter = false; for (const filter of httpConnectionManager.http_filters) { - if (filter.typed_config?.type_url === ROUTER_FILTER_URL) { - this.hasRouterFilter = true; - break; - } // typed_config must be set here, or validation would have failed const filterConfig = parseTopLevelFilterConfig(filter.typed_config!); if (filterConfig) { @@ -421,9 +394,6 @@ class XdsResolver implements Resolver { } } } - if (!this.hasRouterFilter) { - extraFilterFactories.push(new NoRouterFilterFactory()); - } } routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories); break; @@ -464,9 +434,6 @@ class XdsResolver implements Resolver { } } } - if (!this.hasRouterFilter) { - extraFilterFactories.push(new NoRouterFilterFactory()); - } } weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories}); } diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 10ada540..3efc64b9 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -31,6 +31,8 @@ function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } +const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router'; + export class LdsState implements XdsStreamState { versionInfo = ''; nonce = ''; @@ -103,10 +105,26 @@ export class LdsState implements XdsStreamState { } const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value); if (EXPERIMENTAL_FAULT_INJECTION) { - for (const httpFilter of httpConnectionManager.http_filters) { + const filterNames = new Set(); + for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) { + if (filterNames.has(httpFilter.name)) { + return false; + } + filterNames.add(httpFilter.name); if (!validateTopLevelFilter(httpFilter)) { return false; } + /* Validate that the last filter, and only the last filter, is the + * router filter. */ + if (index < httpConnectionManager.http_filters.length - 1) { + if (httpFilter.name === ROUTER_FILTER_URL) { + return false; + } + } else { + if (httpFilter.name !== ROUTER_FILTER_URL) { + return false; + } + } } } switch (httpConnectionManager.route_specifier) { diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 0322218d..f0469285 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -141,12 +141,7 @@ export class RdsState implements XdsStreamState { return false; } if (EXPERIMENTAL_FAULT_INJECTION) { - const filterNames = new Set(); for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) { - if (filterNames.has(name)) { - return false; - } - filterNames.add(name); if (!validateOverrideFilter(filterConfig)) { return false; }