From b5fd5b033ee6e876e4bc0304ff8c31a8bfd0b8a4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 3 Aug 2021 15:08:10 -0700 Subject: [PATCH] grpc-js-xds: Add fault injection HTTP filter --- .../src/http-filter/fault-injection-filter.ts | 333 ++++++++++++++++++ packages/grpc-js-xds/src/index.ts | 2 + 2 files changed, 335 insertions(+) create mode 100644 packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts diff --git a/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts b/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts new file mode 100644 index 00000000..825b7c05 --- /dev/null +++ b/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts @@ -0,0 +1,333 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This is a non-public, unstable API, but it's very convenient +import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; +import { experimental, Metadata, status } from '@grpc/grpc-js'; +import { Any__Output } from '../generated/google/protobuf/Any'; +import Filter = experimental.Filter; +import FilterFactory = experimental.FilterFactory; +import BaseFilter = experimental.BaseFilter; +import CallStream = experimental.CallStream; +import { HttpFilterConfig, registerHttpFilter } from '../http-filter'; +import { HTTPFault__Output } from '../generated/envoy/extensions/filters/http/fault/v3/HTTPFault'; +import { envoyFractionToFraction, Fraction } from '../fraction'; +import { Duration__Output } from '../generated/google/protobuf/Duration'; + +const resourceRoot = loadProtosWithOptionsSync([ + 'envoy/extendsion/filtesr/http/fault/v3/fault.proto'], { + keepCase: true, + includeDirs: [ + // Paths are relative to src/build + __dirname + '/../../deps/udpa/', + __dirname + '/../../deps/envoy-api/', + __dirname + '/../../deps/protoc-gen-validate/' + ], + } +); + +interface FixedDelayConfig { + kind: 'fixed'; + durationMs: number; + percentage: Fraction; +} + +interface HeaderDelayConfig { + kind: 'header'; + percentage: Fraction; +} + +interface GrpcAbortConfig { + kind: 'grpc'; + code: status; + percentage: Fraction; +} + +interface HeaderAbortConfig { + kind: 'header'; + percentage: Fraction; +} + +interface FaultInjectionConfig { + delay: FixedDelayConfig | HeaderDelayConfig | null; + abort: GrpcAbortConfig | HeaderAbortConfig | null; + maxActiveFaults: number; +} + +interface FaultInjectionFilterConfig extends HttpFilterConfig { + typeUrl: 'type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault'; + config: FaultInjectionConfig; +} + +const FAULT_INJECTION_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault'; + +const toObjectOptions = { + longs: String, + enums: String, + defaults: true, + oneofs: true +} + +function parseAnyMessage(message: Any__Output): MessageType | null { + const messageType = resourceRoot.lookup(message.type_url); + if (messageType) { + const decodedMessage = (messageType as any).decode(message.value); + return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as MessageType; + } else { + return null; + } +} + +function durationToMs(duration: Duration__Output): number { + return Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000; +} + +function httpCodeToGrpcStatus(code: number): status { + switch (code) { + case 400: return status.INTERNAL; + case 401: return status.UNAUTHENTICATED; + case 403: return status.PERMISSION_DENIED; + case 404: return status.UNIMPLEMENTED; + case 429: return status.UNAVAILABLE; + case 502: return status.UNAVAILABLE; + case 503: return status.UNAVAILABLE; + case 504: return status.UNAVAILABLE; + default: return status.UNKNOWN; + } +} + +function parseHTTPFaultConfig(encodedConfig: Any__Output): FaultInjectionFilterConfig | null { + if (encodedConfig.type_url !== FAULT_INJECTION_FILTER_URL) { + return null; + } + const parsedMessage = parseAnyMessage(encodedConfig); + if (parsedMessage === null) { + return null; + } + const result: FaultInjectionConfig = { + delay: null, + abort: null, + maxActiveFaults: Infinity + }; + // Parse delay field + if (parsedMessage.delay !== null) { + if (parsedMessage.delay.percentage === null) { + return null; + } + const percentage = envoyFractionToFraction(parsedMessage.delay.percentage); + switch (parsedMessage.delay.fault_delay_secifier /* sic */) { + case 'fixed_delay': + result.delay = { + kind: 'fixed', + durationMs: durationToMs(parsedMessage.delay.fixed_delay!), + percentage: percentage + }; + break; + case 'header_delay': + result.delay = { + kind: 'header', + percentage: percentage + }; + break; + default: + // Should not be possible + return null; + } + } + // Parse abort field + if (parsedMessage.abort !== null) { + if (parsedMessage.abort.percentage === null) { + return null; + } + const percentage = envoyFractionToFraction(parsedMessage.abort.percentage); + switch (parsedMessage.abort.error_type) { + case 'http_status': + result.abort = { + kind: 'grpc', + code: httpCodeToGrpcStatus(parsedMessage.abort.http_status!), + percentage: percentage + }; + break; + case 'grpc_status': + result.abort = { + kind: 'grpc', + code: parsedMessage.abort.grpc_status!, + percentage: percentage + } + break; + case 'header_abort': + result.abort = { + kind: 'header', + percentage: percentage + }; + break; + default: + // Should not be possible + return null; + } + } + // Parse max_active_faults field + if (parsedMessage.max_active_faults !== null) { + result.maxActiveFaults = parsedMessage.max_active_faults.value; + } + return { + typeUrl: FAULT_INJECTION_FILTER_URL, + config: result + }; +} + +function asyncTimeout(timeMs: number): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + resolve(); + }, timeMs); + }); +} + +/** + * Returns true with probability numerator/denominator. + * @param numerator + * @param denominator + */ +function rollRandomPercentage(numerator: number, denominator: number): boolean { + return Math.random() * denominator < numerator; +} + +const DELAY_DURATION_HEADER_KEY = 'x-envoy-fault-delay-request'; +const DELAY_PERCENTAGE_HEADER_KEY = 'x-envoy-fault-delay-request-percentage'; +const ABORT_GRPC_HEADER_KEY = 'x-envoy-fault-abort-grpc-request'; +const ABORT_HTTP_HEADER_KEY = 'x-envoy-fault-abort-request'; +const ABORT_PERCENTAGE_HEADER_KEY = 'x-envoy-fault-abort-request-percentage'; + +const NUMBER_REGEX = /\d+/; + +let totalActiveFaults = 0; + +class FaultInjectionFilter extends BaseFilter implements Filter { + constructor(private callStream: CallStream, private config: FaultInjectionConfig) { + super(); + } + + async sendMetadata(metadataPromise: Promise): Promise { + const metadata = await metadataPromise; + // Handle delay + if (totalActiveFaults < this.config.maxActiveFaults && this.config.delay) { + let duration = 0; + let numerator = this.config.delay.percentage.numerator; + const denominator = this.config.delay.percentage.denominator; + if (this.config.delay.kind === 'fixed') { + duration = this.config.delay.durationMs; + } else { + const durationHeader = metadata.get(DELAY_DURATION_HEADER_KEY); + for (const value of durationHeader) { + if (typeof value !== 'string') { + continue; + } + if (NUMBER_REGEX.test(value)) { + duration = Number.parseInt(value); + break; + } + } + const percentageHeader = metadata.get(DELAY_PERCENTAGE_HEADER_KEY); + for (const value of percentageHeader) { + if (typeof value !== 'string') { + continue; + } + if (NUMBER_REGEX.test(value)) { + numerator = Math.min(numerator, Number.parseInt(value)); + break; + } + } + } + if (rollRandomPercentage(numerator, denominator)) { + totalActiveFaults++; + await asyncTimeout(duration); + totalActiveFaults--; + } + } + // Handle abort + if (totalActiveFaults < this.config.maxActiveFaults && this.config.abort) { + let abortStatus: status | null = null; + let numerator = this.config.abort.percentage.numerator; + const denominator = this.config.abort.percentage.denominator; + if (this.config.abort.kind === 'grpc') { + abortStatus = this.config.abort.code; + } else { + const grpcStatusHeader = metadata.get(ABORT_GRPC_HEADER_KEY); + for (const value of grpcStatusHeader) { + if (typeof value !== 'string') { + continue; + } + if (NUMBER_REGEX.test(value)) { + abortStatus = Number.parseInt(value); + break; + } + } + /* Fall back to looking for HTTP status header if the gRPC status + * header is not present. */ + if (abortStatus === null) { + const httpStatusHeader = metadata.get(ABORT_HTTP_HEADER_KEY); + for (const value of httpStatusHeader) { + if (typeof value !== 'string') { + continue; + } + if (NUMBER_REGEX.test(value)) { + abortStatus = httpCodeToGrpcStatus(Number.parseInt(value)); + break; + } + } + } + const percentageHeader = metadata.get(ABORT_PERCENTAGE_HEADER_KEY); + for (const value of percentageHeader) { + if (typeof value !== 'string') { + continue; + } + if (NUMBER_REGEX.test(value)) { + numerator = Math.min(numerator, Number.parseInt(value)); + break; + } + } + } + if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) { + this.callStream.cancelWithStatus(abortStatus, 'Fault injected'); + } + } + return metadata; + } +} + +class FaultInjectionFilterFactory implements FilterFactory { + private config: FaultInjectionConfig; + constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) { + if (overrideConfig?.typeUrl === FAULT_INJECTION_FILTER_URL) { + this.config = overrideConfig.config; + } else { + this.config = config.config; + } + } + + createFilter(callStream: experimental.CallStream): FaultInjectionFilter { + return new FaultInjectionFilter(callStream, this.config); + } +} + +export function setup() { + registerHttpFilter(FAULT_INJECTION_FILTER_URL, { + parseTopLevelFilterConfig: parseHTTPFaultConfig, + parseOverrideFilterConfig: parseHTTPFaultConfig, + httpFilterConstructor: FaultInjectionFilterFactory + }); +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 2fee339d..7d35bcd1 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -23,6 +23,7 @@ import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager'; import * as router_filter from './http-filter/router-filter'; +import * as fault_injection_filter from './http-filter/fault-injection-filter'; /** * Register the "xds:" name scheme with the @grpc/grpc-js library. @@ -36,4 +37,5 @@ export function register() { load_balancer_weighted_target.setup(); load_balancer_xds_cluster_manager.setup(); router_filter.setup(); + fault_injection_filter.setup(); } \ No newline at end of file