grpc-js-xds: Implement xDS Server

This commit is contained in:
Michael Lumish 2024-03-04 09:29:46 -08:00
parent cfa8072099
commit bac66ad291
29 changed files with 2380 additions and 701 deletions

View File

@ -38,6 +38,7 @@
"@types/mocha": "^5.2.6",
"@types/node": "^13.11.1",
"@types/yargs": "^15.0.5",
"find-free-ports": "^3.1.1",
"gts": "^5.0.1",
"typescript": "^4.9.5",
"yargs": "^15.4.1"

View File

@ -0,0 +1,192 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as net from 'net';
import { CidrRange__Output } from './generated/envoy/config/core/v3/CidrRange';
const IPV4_COMPONENT_COUNT = 4n;
const IPV4_COMPONENT_SIZE = 8n;
const IPV4_COMPONENT_CAP = 1n << IPV4_COMPONENT_SIZE;
const IPV4_TOTAL_SIZE = IPV4_COMPONENT_COUNT * IPV4_COMPONENT_SIZE;
const IPV6_COMPONENT_SIZE = 16n;
const IPV6_COMPONENT_COUNT = 8n;
const IPV6_COMPONENT_CAP = 1n << IPV6_COMPONENT_SIZE;
const IPV6_TOTAL_SIZE = IPV6_COMPONENT_COUNT * IPV6_COMPONENT_SIZE;
export interface CidrRange {
addressPrefix: string;
prefixLen: number;
}
export function parseIPv4(address: string): bigint {
return address.split('.').map(component => BigInt(component)).reduce((accumulator, current) => accumulator * IPV4_COMPONENT_CAP + current, 0n);
}
export function parseIPv6(address: string): bigint {
/* If an IPv6 address contains two or more consecutive components with value
* which can be collectively represented with the string '::'. For example,
* the IPv6 adddress 0:0:0:0:0:0:0:1 can also be represented as ::1. Here we
* expand any :: into the correct number of individual components. */
const sections = address.split('::');
let components: string[];
if (sections.length === 1) {
components = sections[0].split(':');
} else if (sections.length === 2) {
const beginning = sections[0].split(':').filter(value => value !== '');
const end = sections[1].split(':').filter(value => value !== '');
components = beginning.concat(Array(8 - beginning.length - end.length).fill('0'), end);
} else {
throw new Error('Invalid IPv6 address contains more than one instance of ::');
}
return components.map(component => BigInt('0x' + component)).reduce((accumulator, current) => accumulator * 65536n + current, 0n);
}
function parseIP(address: string): bigint {
switch (net.isIP(address)) {
case 4:
return parseIPv4(address);
case 6:
return parseIPv6(address);
default:
throw new Error(`Invalid IP address ${address}`);
}
}
export function formatIPv4(address: bigint): string {
const reverseComponents: bigint[] = [];
for (let i = 0; i < IPV4_COMPONENT_COUNT; i++) {
reverseComponents.push(address % IPV4_COMPONENT_CAP);
address = address / IPV4_COMPONENT_CAP;
}
return reverseComponents.reverse().map(component => component.toString(10)).join('.');
}
export function formatIPv6(address: bigint): string {
const reverseComponents: bigint[] = [];
for (let i = 0; i < IPV6_COMPONENT_COUNT; i++) {
reverseComponents.push(address % IPV6_COMPONENT_CAP);
address = address / IPV6_COMPONENT_CAP;
}
const components = reverseComponents.reverse();
/* Find the longest run of consecutive 0 values in the list of components, to
* replace it with :: in the output */
let maxZeroRunIndex = 0;
let maxZeroRunLength = 0;
let inZeroRun = false;
let currentZeroRunIndex = 0;
let currentZeroRunLength = 0;
for (let i = 0; i < components.length; i++) {
if (components[i] === 0n) {
if (inZeroRun) {
currentZeroRunLength += 1;
} else {
inZeroRun = true;
currentZeroRunIndex = i;
currentZeroRunLength = 1;
}
if (currentZeroRunLength > maxZeroRunLength) {
maxZeroRunIndex = currentZeroRunIndex;
maxZeroRunLength = currentZeroRunLength;
}
} else {
currentZeroRunLength = 0;
inZeroRun = false;
}
}
if (maxZeroRunLength >= 2) {
const beginning = components.slice(0, maxZeroRunIndex);
const end = components.slice(maxZeroRunIndex + maxZeroRunLength);
return beginning.map(value => value.toString(16)).join(':') + '::' + end.map(value => value.toString(16)).join(':');
} else {
return components.map(value => value.toString(16)).join(':');
}
}
function getSubnetMaskIPv4(prefixLen: number) {
return ~((1n << (IPV4_TOTAL_SIZE - BigInt(prefixLen))) - 1n);
}
function getSubnetMaskIPv6(prefixLen: number) {
return ~((1n << (IPV6_TOTAL_SIZE - BigInt(prefixLen))) - 1n);
}
export function firstNBitsIPv4(address: string, prefixLen: number): string {
const addressNum = parseIPv4(address);
const prefixMask = getSubnetMaskIPv4(prefixLen);
return formatIPv4(addressNum & prefixMask);
}
export function firstNBitsIPv6(address: string, prefixLen: number): string {
const addressNum = parseIPv6(address);
const prefixMask = getSubnetMaskIPv6(prefixLen);
return formatIPv6(addressNum & prefixMask);
}
export function normalizeCidrRange(range: CidrRange): CidrRange {
switch (net.isIP(range.addressPrefix)) {
case 4: {
const prefixLen = Math.min(Math.max(range.prefixLen, 0), 32);
return {
addressPrefix: firstNBitsIPv4(range.addressPrefix, prefixLen),
prefixLen: prefixLen
};
}
case 6: {
const prefixLen = Math.min(Math.max(range.prefixLen, 0), 128);
return {
addressPrefix: firstNBitsIPv6(range.addressPrefix, prefixLen),
prefixLen: prefixLen
};
}
default:
throw new Error(`Invalid IP address prefix ${range.addressPrefix}`);
}
}
export function getCidrRangeSubnetMask(range: CidrRange): bigint {
switch (net.isIP(range.addressPrefix)) {
case 4:
return getSubnetMaskIPv4(range.prefixLen);
case 6:
return getSubnetMaskIPv6(range.prefixLen);
default:
throw new Error('Invalid CIDR range');
}
}
export function inCidrRange(range: CidrRange, address: string): boolean {
if (net.isIP(range.addressPrefix) !== net.isIP(address)) {
return false;
}
return (parseIP(address) & getCidrRangeSubnetMask(range)) === parseIP(range.addressPrefix);
}
export function cidrRangeEqual(range1: CidrRange | undefined, range2: CidrRange | undefined): boolean {
if (range1 === undefined && range2 === undefined) {
return true;
}
if (range1 === undefined || range2 === undefined) {
return false;
}
return range1.addressPrefix === range2.addressPrefix && range1.prefixLen === range2.prefixLen;
}
export function cidrRangeMessageToCidrRange(message: CidrRange__Output): CidrRange {
return {
addressPrefix: message.address_prefix,
prefixLen: message.prefix_len?.value ?? 0
};
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Types and function from https://stackoverflow.com/a/72059390/159388, with modifications
type ElementType<A> = A extends ReadonlyArray<infer T> ? T | undefined : never;
type ElementsOfAll<Inputs, R extends ReadonlyArray<unknown> = []> = Inputs extends readonly [infer F, ...infer M] ? ElementsOfAll<M, [...R, ElementType<F>]> : R;
type CartesianProduct<Inputs> = ElementsOfAll<Inputs>[];
/**
* Get the cross product or Cartesian product of a list of groups. The
* implementation is copied, with some modifications, from
* https://stackoverflow.com/a/72059390/159388.
* @param sets A list of groups of elements
* @returns A list of all possible combinations of one element from each group
* in sets. Empty groups will result in undefined in that slot in each
* combination.
*/
export function crossProduct<Sets extends ReadonlyArray<ReadonlyArray<unknown>>>(sets: Sets): CartesianProduct<Sets> {
/* The input is an array of arrays, and the expected output is an array of
* each possible combination of one element each of the input arrays, with
* the exception that if one of the input arrays is empty, each combination
* gets [undefined] in that slot.
*
* At each step in the reduce call, we start with the cross product of the
* first N groups, and the next group. For each combation, for each element
* of the next group, extend the combination with that element.
*
* The type assertion at the end is needed because TypeScript doesn't track
* the types well enough through the reduce calls to see that the result has
* the expected type.
*/
return sets.map(x => x.length === 0 ? [undefined] : x).reduce((combinations: unknown[][], nextGroup) => combinations.flatMap(combination => nextGroup.map(element => [...combination, element])), [[]] as unknown[][]) as CartesianProduct<Sets>;
}

View File

@ -31,6 +31,8 @@ import * as round_robin_lb from './lb-policy-registry/round-robin';
import * as typed_struct_lb from './lb-policy-registry/typed-struct';
import * as pick_first_lb from './lb-policy-registry/pick-first';
export { XdsServer } from './server';
/**
* Register the "xds:" name scheme with the @grpc/grpc-js library.
*/

View File

@ -29,11 +29,8 @@ import { Listener__Output } from './generated/envoy/config/listener/v3/Listener'
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
import { VirtualHost__Output } from './generated/envoy/config/route/v3/VirtualHost';
import { RouteMatch__Output } from './generated/envoy/config/route/v3/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderMatcher';
import ConfigSelector = experimental.ConfigSelector;
import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { envoyFractionToFraction, Fraction } from "./fraction";
import { Matcher } from './matcher';
import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources';
import Duration = experimental.Duration;
@ -47,6 +44,8 @@ import { ListenerResourceType } from './xds-resource-type/listener-resource-type
import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type';
import { protoDurationToDuration } from './duration';
import { loadXxhashApi } from './xxhash';
import { formatTemplateString } from './xds-bootstrap';
import { getPredicateForMatcher } from './route';
const TRACER_NAME = 'xds_resolver';
@ -97,8 +96,12 @@ function domainMatch(matchType: MatchType, domainPattern: string, expectedHostNa
}
}
function findVirtualHostForDomain(virutalHostList: VirtualHost__Output[], domain: string): VirtualHost__Output | null {
let targetVhost: VirtualHost__Output | null = null;
interface HasDomains {
domains: string[];
}
export function findVirtualHostForDomain<T extends HasDomains>(virutalHostList: T[], domain: string): T | null {
let targetVhost: T | null = null;
let bestMatchType: MatchType = MatchType.INVALID_MATCH;
let longestMatch = 0;
for (const virtualHost of virutalHostList) {
@ -130,81 +133,6 @@ function findVirtualHostForDomain(virutalHostList: VirtualHost__Output[], domain
const numberRegex = new RE2(/^-?\d+$/u);
function getPredicateForHeaderMatcher(headerMatch: HeaderMatcher__Output): Matcher {
let valueChecker: ValueMatcher;
switch (headerMatch.header_match_specifier) {
case 'exact_match':
valueChecker = new ExactValueMatcher(headerMatch.exact_match!, false);
break;
case 'safe_regex_match':
valueChecker = new SafeRegexValueMatcher(headerMatch.safe_regex_match!.regex);
break;
case 'range_match':
const start = BigInt(headerMatch.range_match!.start);
const end = BigInt(headerMatch.range_match!.end);
valueChecker = new RangeValueMatcher(start, end);
break;
case 'present_match':
valueChecker = new PresentValueMatcher();
break;
case 'prefix_match':
valueChecker = new PrefixValueMatcher(headerMatch.prefix_match!, false);
break;
case 'suffix_match':
valueChecker = new SuffixValueMatcher(headerMatch.suffix_match!, false);
break;
case 'string_match':
const stringMatch = headerMatch.string_match!
switch (stringMatch.match_pattern) {
case 'exact':
valueChecker = new ExactValueMatcher(stringMatch.exact!, stringMatch.ignore_case);
break;
case 'safe_regex':
valueChecker = new SafeRegexValueMatcher(stringMatch.safe_regex!.regex);
break;
case 'prefix':
valueChecker = new PrefixValueMatcher(stringMatch.prefix!, stringMatch.ignore_case);
break;
case 'suffix':
valueChecker = new SuffixValueMatcher(stringMatch.suffix!, stringMatch.ignore_case);
break;
case 'contains':
valueChecker = new ContainsValueMatcher(stringMatch.contains!, stringMatch.ignore_case);
break;
}
break;
default:
valueChecker = new RejectValueMatcher();
}
return new HeaderMatcher(headerMatch.name, valueChecker, headerMatch.invert_match);
}
function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
let pathMatcher: ValueMatcher;
const caseInsensitive = routeMatch.case_sensitive?.value === false;
switch (routeMatch.path_specifier) {
case 'prefix':
pathMatcher = new PathPrefixValueMatcher(routeMatch.prefix!, caseInsensitive);
break;
case 'path':
pathMatcher = new PathExactValueMatcher(routeMatch.path!, caseInsensitive);
break;
case 'safe_regex':
pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex);
break;
default:
pathMatcher = new RejectValueMatcher();
}
const headerMatchers: Matcher[] = routeMatch.headers.map(getPredicateForHeaderMatcher);
let runtimeFraction: Fraction | null;
if (!routeMatch.runtime_fraction?.default_value) {
runtimeFraction = null;
} else {
runtimeFraction = envoyFractionToFraction(routeMatch.runtime_fraction.default_value)
}
return new FullMatcher(pathMatcher, headerMatchers, runtimeFraction);
}
function protoDurationToSecondsString(duration: Duration__Output): string {
return `${duration.seconds + duration.nanos / 1_000_000_000}s`;
}
@ -215,23 +143,6 @@ function getDefaultRetryMaxInterval(baseInterval: string): string {
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
}
/**
* Encode a text string as a valid path of a URI, as specified in RFC-3986 section 3.3
* @param uriPath A value representing an unencoded URI path
* @returns
*/
function encodeURIPath(uriPath: string): string {
return uriPath.replace(/[^A-Za-z0-9._~!$&^()*+,;=/-]/g, substring => encodeURIComponent(substring));
}
function formatTemplateString(templateString: string, value: string): string {
if (templateString.startsWith('xdstp:')) {
return templateString.replace(/%s/g, encodeURIPath(value));
} else {
return templateString.replace(/%s/g, value);
}
}
export function getListenerResourceName(bootstrapConfig: BootstrapInfo, target: GrpcUri): string {
if (target.authority && target.authority !== '') {
if (target.authority in bootstrapConfig.authorities) {
@ -325,6 +236,7 @@ class XdsResolver implements Resolver {
case 'route_config':
if (this.latestRouteConfigName) {
RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher);
this.latestRouteConfigName = null;
}
this.handleRouteConfig(httpConnectionManager.route_config!);
break;
@ -342,6 +254,10 @@ class XdsResolver implements Resolver {
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': LDS resource does not exist');
if (this.latestRouteConfigName) {
RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher);
this.latestRouteConfigName = null;
}
this.reportResolutionError(`Listener ${this.target} does not exist`);
}
});

View File

@ -0,0 +1,94 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { RouteMatch__Output } from './generated/envoy/config/route/v3/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderMatcher';
import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { envoyFractionToFraction, Fraction } from "./fraction";
function getPredicateForHeaderMatcher(headerMatch: HeaderMatcher__Output): Matcher {
let valueChecker: ValueMatcher;
switch (headerMatch.header_match_specifier) {
case 'exact_match':
valueChecker = new ExactValueMatcher(headerMatch.exact_match!, false);
break;
case 'safe_regex_match':
valueChecker = new SafeRegexValueMatcher(headerMatch.safe_regex_match!.regex);
break;
case 'range_match':
const start = BigInt(headerMatch.range_match!.start);
const end = BigInt(headerMatch.range_match!.end);
valueChecker = new RangeValueMatcher(start, end);
break;
case 'present_match':
valueChecker = new PresentValueMatcher();
break;
case 'prefix_match':
valueChecker = new PrefixValueMatcher(headerMatch.prefix_match!, false);
break;
case 'suffix_match':
valueChecker = new SuffixValueMatcher(headerMatch.suffix_match!, false);
break;
case 'string_match':
const stringMatch = headerMatch.string_match!;
switch (stringMatch.match_pattern) {
case 'exact':
valueChecker = new ExactValueMatcher(stringMatch.exact!, stringMatch.ignore_case);
break;
case 'safe_regex':
valueChecker = new SafeRegexValueMatcher(stringMatch.safe_regex!.regex);
break;
case 'prefix':
valueChecker = new PrefixValueMatcher(stringMatch.prefix!, stringMatch.ignore_case);
break;
case 'suffix':
valueChecker = new SuffixValueMatcher(stringMatch.suffix!, stringMatch.ignore_case);
break;
case 'contains':
valueChecker = new ContainsValueMatcher(stringMatch.contains!, stringMatch.ignore_case);
break;
}
break;
default:
valueChecker = new RejectValueMatcher();
}
return new HeaderMatcher(headerMatch.name, valueChecker, headerMatch.invert_match);
}
export function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
let pathMatcher: ValueMatcher;
const caseInsensitive = routeMatch.case_sensitive?.value === false;
switch (routeMatch.path_specifier) {
case 'prefix':
pathMatcher = new PathPrefixValueMatcher(routeMatch.prefix!, caseInsensitive);
break;
case 'path':
pathMatcher = new PathExactValueMatcher(routeMatch.path!, caseInsensitive);
break;
case 'safe_regex':
pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex);
break;
default:
pathMatcher = new RejectValueMatcher();
}
const headerMatchers: Matcher[] = routeMatch.headers.map(getPredicateForHeaderMatcher);
let runtimeFraction: Fraction | null;
if (!routeMatch.runtime_fraction?.default_value) {
runtimeFraction = null;
} else {
runtimeFraction = envoyFractionToFraction(routeMatch.runtime_fraction.default_value);
}
return new FullMatcher(pathMatcher, headerMatchers, runtimeFraction);
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Listener__Output } from "./generated/envoy/config/listener/v3/Listener";
import { FilterChain__Output } from "./generated/envoy/config/listener/v3/FilterChain";
import { UInt32Value__Output } from "./generated/google/protobuf/UInt32Value";
import { CidrRange__Output } from "./generated/envoy/config/core/v3/CidrRange";
function nullableValueEquals<T>(first: T | null, second: T | null, valueEquals: (a: T, b: T) => boolean): boolean {
if (first === null && second === null) {
return true;
}
if (first === null && second === null) {
return false;
}
return valueEquals(first!, second!);
}
function arrayEquals<T>(first: T[], second: T[], elementEquals: (a: T, b: T) => boolean = (a, b) => a === b): boolean {
if (first.length !== second.length) {
return false;
}
for (let i = 0; i < first.length; i++) {
if (!elementEquals(first[i], second[i])) {
return false;
}
}
return true;
}
function uint32ValueEquals(first: UInt32Value__Output, second: UInt32Value__Output): boolean {
return first.value === second.value;
}
function cidrRangeEquals(first: CidrRange__Output, second: CidrRange__Output): boolean {
return first.address_prefix === second.address_prefix && nullableValueEquals(first.prefix_len, second.prefix_len, uint32ValueEquals);
}
function filterChainsEquivalent(first: FilterChain__Output, second: FilterChain__Output): boolean {
if (first.filters.length !== second.filters.length) {
return false;
}
for (let i = 0; i < first.filters.length; i++) {
const firstFilter = first.filters[i];
const secondFilter = second.filters[i];
if (!firstFilter.typed_config && !secondFilter.typed_config) {
continue;
}
if (!firstFilter.typed_config || !secondFilter.typed_config) {
return false;
}
if (firstFilter.typed_config.type_url !== secondFilter.typed_config.type_url) {
return false;
}
if (!firstFilter.typed_config.value.equals(secondFilter.typed_config.value)) {
return false;
}
}
if ((first.filter_chain_match === null) !== (second.filter_chain_match === null)) {
return false;
}
if (first.filter_chain_match) {
const firstMatch = first.filter_chain_match;
const secondMatch = second.filter_chain_match!;
if (firstMatch.address_suffix !== secondMatch.address_suffix) {
return false;
}
if (!arrayEquals(firstMatch.application_protocols, secondMatch.application_protocols)) {
return false;
}
if (!nullableValueEquals(firstMatch.destination_port, secondMatch.destination_port, uint32ValueEquals)) {
return false;
}
if (!arrayEquals(firstMatch.direct_source_prefix_ranges, secondMatch.direct_source_prefix_ranges, cidrRangeEquals)) {
return false;
}
if (!arrayEquals(firstMatch.prefix_ranges, secondMatch.prefix_ranges, cidrRangeEquals)) {
return false;
}
if (!arrayEquals(firstMatch.server_names, secondMatch.server_names)) {
return false;
}
if (!arrayEquals(firstMatch.source_ports, secondMatch.source_ports)) {
return false;
}
if (!arrayEquals(firstMatch.source_prefix_ranges, secondMatch.source_prefix_ranges, cidrRangeEquals)) {
return false;
}
if (firstMatch.source_type !== secondMatch.source_type) {
return false;
}
if (!nullableValueEquals(firstMatch.suffix_len, secondMatch.suffix_len, uint32ValueEquals)) {
return false;
}
if (firstMatch.transport_protocol !== secondMatch.transport_protocol) {
return false;
}
}
return true;
}
/**
* Tests whether two listener resources are equivalent with respect to the
* fields that the server uses.
* @param first
* @param second
*/
export function listenersEquivalent(first: Listener__Output, second: Listener__Output): boolean {
if (first.address?.socket_address?.address !== second.address?.socket_address?.address) {
return false;
}
if (first.address?.socket_address?.port_value !== second.address?.socket_address?.port_value) {
return false;
}
if (!nullableValueEquals(first.default_filter_chain, second.default_filter_chain, filterChainsEquivalent)) {
return false;
}
if (first.filter_chains.length !== second.filter_chains.length) {
return false;
}
for (let i = 0; i < first.filter_chains.length; i++) {
if (!filterChainsEquivalent(first.filter_chains[i], second.filter_chains[i])) {
return false;
}
}
return true;
}

View File

@ -0,0 +1,643 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConnectionInjector, Metadata, Server, ServerCredentials, ServerInterceptingCall, ServerInterceptor, ServerOptions, StatusObject, experimental, logVerbosity, status } from "@grpc/grpc-js";
import { BootstrapInfo, formatTemplateString, loadBootstrapInfo, validateBootstrapConfig } from "./xds-bootstrap";
import * as net from "net";
import HostPort = experimental.HostPort;
import splitHostPort = experimental.splitHostPort;
import createServerCredentialsWithInterceptors = experimental.createServerCredentialsWithInterceptors;
import { Watcher, XdsClient, getSingletonXdsClient } from "./xds-client";
import { Listener__Output } from "./generated/envoy/config/listener/v3/Listener";
import { RouteConfiguration__Output } from "./generated/envoy/config/route/v3/RouteConfiguration";
import { RouteConfigurationResourceType } from "./xds-resource-type/route-config-resource-type";
import { ListenerResourceType } from "./xds-resource-type/listener-resource-type";
import { FilterChainMatch__Output, _envoy_config_listener_v3_FilterChainMatch_ConnectionSourceType } from "./generated/envoy/config/listener/v3/FilterChainMatch";
import { CidrRange, cidrRangeEqual, cidrRangeMessageToCidrRange, inCidrRange, normalizeCidrRange } from "./cidr";
import { Matcher } from "./matcher";
import { listenersEquivalent } from "./server-listener";
import { HTTP_CONNECTION_MANGER_TYPE_URL, decodeSingleResource } from "./resources";
import { FilterChain__Output } from "./generated/envoy/config/listener/v3/FilterChain";
import { getPredicateForMatcher } from "./route";
import { crossProduct } from "./cross-product";
import { findVirtualHostForDomain } from "./resolver-xds";
import { LogVerbosity } from "@grpc/grpc-js/build/src/constants";
const TRACER_NAME = 'xds_server';
function trace(text: string) {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
/**
* Validates that a listening address to be bound is valid for use in the xDS
* server: It must be in the form IP:port, and port must be non-zero.
* @param listeningAddress
* @returns
*/
function isValidIpPort(hostPort: HostPort): boolean {
return hostPort !== null && (net.isIPv4(hostPort.host) || net.isIPv6(hostPort.host)) && hostPort.port !== undefined && hostPort.port > 0;
}
type ConnectionSourceType = keyof typeof _envoy_config_listener_v3_FilterChainMatch_ConnectionSourceType;
interface NormalizedFilterChainMatch {
prefixRange?: CidrRange;
sourceType: ConnectionSourceType;
sourcePrefixRange?: CidrRange;
sourcePort?: number;
}
interface RouteEntry {
matcher: Matcher;
isNonForwardingAction: boolean;
}
interface VirtualHostEntry {
domains: string[];
routes: RouteEntry[];
}
const routeErrorStatus = {
code: status.UNAVAILABLE,
details: 'Routing error'
};
interface ConfigParameters {
xdsClient: XdsClient;
createConnectionInjector: (credentials: ServerCredentials) => ConnectionInjector;
drainGraceTimeMs: number;
listenerResourceNameTemplate: string;
}
class FilterChainEntry {
private matchers: NormalizedFilterChainMatch[];
private rdsName: string | null = null;
private routeConfigWatcher: Watcher<RouteConfiguration__Output>;
private rdsError: string | null = null;
private virtualHosts: VirtualHostEntry[] | null = null;
private connectionInjector: ConnectionInjector;
private hasRouteConfigErrors = false;
constructor(private configParameters: ConfigParameters, filterChain: FilterChain__Output, credentials: ServerCredentials, onRouteConfigPopulated: () => void) {
this.matchers = normalizeFilterChainMatch(filterChain.filter_chain_match);
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, filterChain.filters[0].typed_config!.value);
trace('Populating FilterChainEntry from HttpConncectionManager ' + JSON.stringify(httpConnectionManager, undefined, 2));
this.routeConfigWatcher = new Watcher<RouteConfiguration__Output>({
onResourceChanged: (resource: RouteConfiguration__Output) => {
if (this.rdsError) {
experimental.log(logVerbosity.ERROR, 'Retrieved previously missing RouteConfiguration resource ' + this.rdsName);
}
this.rdsError = null;
this.handleRouteConfigurationResource(resource);
onRouteConfigPopulated();
},
onResourceDoesNotExist: () => {
this.virtualHosts = null;
this.rdsError = `Resource does not exist`;
this.logConfigurationError(this.rdsError);
onRouteConfigPopulated();
},
onError: (status: StatusObject) => {
if (!this.virtualHosts) {
this.rdsError = `Error retrieving resource: ${status.details}`;
this.logConfigurationError(this.rdsError);
}
onRouteConfigPopulated();
}
});
if (httpConnectionManager.route_config) {
this.handleRouteConfigurationResource(httpConnectionManager.route_config);
process.nextTick(onRouteConfigPopulated);
} else if (httpConnectionManager.rds) {
this.rdsName = httpConnectionManager.rds.route_config_name;
RouteConfigurationResourceType.startWatch(this.configParameters.xdsClient, this.rdsName, this.routeConfigWatcher);
}
const interceptor: ServerInterceptor = (methodDescriptor, call) => {
return new ServerInterceptingCall(call, {
start: (next) => {
next({
onReceiveMetadata: (metadata, next) => {
if (!this.virtualHosts) {
call.sendStatus(routeErrorStatus);
return;
}
const virtualHost = findVirtualHostForDomain(this.virtualHosts, call.getHost());
if (!virtualHost) {
call.sendStatus(routeErrorStatus);
return;
}
for (const route of virtualHost.routes) {
if (route.matcher.apply(methodDescriptor.path, metadata)) {
if (route.isNonForwardingAction) {
next(metadata);
} else {
call.sendStatus(routeErrorStatus);
}
return;
}
}
call.sendStatus(routeErrorStatus);
}
});
}
});
}
const interceptingCredentials = createServerCredentialsWithInterceptors(credentials, [interceptor]);
this.connectionInjector = configParameters.createConnectionInjector(interceptingCredentials);
}
private handleRouteConfigurationResource(routeConfig: RouteConfiguration__Output) {
let hasRouteConfigErrors = false;
this.virtualHosts = [];
for (const virtualHost of routeConfig.virtual_hosts) {
const virtualHostEntry: VirtualHostEntry = {
domains: virtualHost.domains,
routes: []
};
for (const route of virtualHost.routes) {
const routeEntry: RouteEntry = {
matcher: getPredicateForMatcher(route.match!),
isNonForwardingAction: route.action === 'non_forwarding_action'
};
if (!routeEntry.isNonForwardingAction) {
hasRouteConfigErrors = true;
this.logConfigurationError('For domains matching [' + virtualHostEntry.domains + '] requests will be rejected for routes matching ' + routeEntry.matcher.toString());
}
virtualHostEntry.routes.push(routeEntry);
}
this.virtualHosts.push(virtualHostEntry);
}
if (this.hasRouteConfigErrors && !hasRouteConfigErrors) {
experimental.log(logVerbosity.ERROR, 'Routes will no longer reject requests for RouteConfiguration ' + this.rdsName);
}
this.hasRouteConfigErrors = hasRouteConfigErrors;
}
private logConfigurationError(text: string) {
experimental.log(logVerbosity.ERROR, 'RouteConfiguration error (' + this.rdsName + '): ' + text);
}
getMatchers(): NormalizedFilterChainMatch[] {
return this.matchers;
}
isPopulated(): boolean {
return !!(this.virtualHosts || this.rdsError);
}
handleConnection(socket: net.Socket) {
this.connectionInjector.injectConnection(socket);
}
shutdown() {
this.connectionInjector.drain(this.configParameters.drainGraceTimeMs);
this.connectionInjector.destroy();
if (this.rdsName) {
RouteConfigurationResourceType.cancelWatch(this.configParameters.xdsClient, this.rdsName, this.routeConfigWatcher);
}
}
drain(graceTimeMs: number) {
this.connectionInjector.drain(graceTimeMs);
}
}
class ListenerConfig {
private filterChainEntries: FilterChainEntry[];
private defaultFilterChain: FilterChainEntry | null = null;
private reportedReadyToUse = false;
constructor(private configParameters: ConfigParameters, private resource: Listener__Output, credentials: ServerCredentials, private onReadyToUse: () => void) {
trace('Populating ListenerConfig from listener ' + resource.name);
this.filterChainEntries = [];
for (const filterChain of resource.filter_chains) {
this.filterChainEntries.push(new FilterChainEntry(configParameters, filterChain, credentials, () => this.maybeReportReadyToUse()));
}
if (resource.default_filter_chain) {
this.defaultFilterChain = new FilterChainEntry(configParameters, resource.default_filter_chain, credentials, () => this.maybeReportReadyToUse());
}
}
private maybeReportReadyToUse() {
if (this.reportedReadyToUse) {
return;
}
for (const entry of this.filterChainEntries) {
if (!entry.isPopulated()) {
return;
}
}
if (this.defaultFilterChain && !this.defaultFilterChain.isPopulated()) {
return;
}
this.reportedReadyToUse = true;
this.onReadyToUse();
}
isEquivalentResource(listener: Listener__Output): boolean {
return listenersEquivalent(listener, this.resource);
}
handleConnection(socket: net.Socket) {
const matchingFilter = selectMostSpecificallyMatchingFilter(this.filterChainEntries, socket) ?? this.defaultFilterChain;
if (!matchingFilter) {
socket.destroy();
return;
}
matchingFilter.handleConnection(socket);
}
shutdown() {
for (const entry of this.filterChainEntries) {
entry.shutdown();
}
this.defaultFilterChain?.shutdown();
}
drain(graceTimeMs: number) {
this.filterChainEntries.forEach(entry => entry.drain(graceTimeMs));
this.defaultFilterChain?.drain(graceTimeMs);
}
}
interface ServingStatusListener {
(servingStatus: StatusObject): void;
}
class BoundPortEntry {
private listenerWatcher: Watcher<Listener__Output>;
private servingStatus: StatusObject;
private tcpServer: net.Server;
private currentConfig: ListenerConfig | null = null;
private pendingConfig: ListenerConfig | null = null;
private servingStatusListeners: Set<ServingStatusListener> = new Set();
constructor(private configParameters: ConfigParameters, private boundAddress: string, private credentials: ServerCredentials) {
this.listenerWatcher = new Watcher<Listener__Output>({
onResourceChanged: (resource: Listener__Output) => {
trace('Received listener resource ' + resource.name);
this.handleListenerResource(resource);
},
onResourceDoesNotExist: () => {
this.currentConfig?.shutdown();
this.currentConfig = null;
this.pendingConfig?.shutdown();
this.pendingConfig = null;
},
onError: (status: StatusObject) => {
if (!this.currentConfig && !this.pendingConfig) {
this.updateServingStatus(status);
}
}
});
this.tcpServer = new net.Server(socket => {
if (this.currentConfig && this.servingStatus.code === status.OK) {
this.currentConfig.handleConnection(socket);
} else {
socket.destroy();
}
});
this.servingStatus = {
code: status.UNAVAILABLE,
details: 'Not yet serving',
metadata: new Metadata()
};
const resourceName = formatTemplateString(configParameters.listenerResourceNameTemplate, boundAddress);
trace('Watching for listener resource ' + resourceName);
ListenerResourceType.startWatch(configParameters.xdsClient, resourceName, this.listenerWatcher);
}
private updateServingStatus(status: StatusObject) {
this.servingStatus = status;
this.servingStatusListeners.forEach(listener => listener(status));
}
private handleListenerResource(listener: Listener__Output) {
trace('handleListenerResource(' + listener.name + ')');
if (!listener.address?.socket_address) {
const errorText = `No socket_address set in Listener resource for port ${this.boundAddress}`;
trace('Error handling listener resource: ' + errorText);
this.updateServingStatus({
code: status.UNAVAILABLE,
details: errorText,
metadata: new Metadata()
});
return;
}
const listeningAddress = splitHostPort(this.boundAddress);
if (!listeningAddress) {
const errorText = `Could not parse bound address ${this.boundAddress}`;
trace('Error handling listener resource: ' + errorText);
this.updateServingStatus({
code: status.UNAVAILABLE,
details: errorText,
metadata: new Metadata()
});
return;
}
if (!(listener.address.socket_address.address === listeningAddress.host && listener.address.socket_address.port_value === listeningAddress.port)) {
const errorText = `socket_address mismatch for port ${this.boundAddress}: got '${listener.address.socket_address.address}:${listener.address.socket_address.port_value}'`;
trace('Error handling listener resource: ' + errorText);
this.updateServingStatus({
code: status.UNAVAILABLE,
details: errorText,
metadata: new Metadata()
});
return;
}
if (this.currentConfig?.isEquivalentResource(listener)) {
trace('Listener resource equivalent to current resource');
this.pendingConfig?.shutdown();
this.pendingConfig = null;
return;
}
if (this.pendingConfig?.isEquivalentResource(listener)) {
trace('Listener resource equivalent to pending resource');
return;
}
this.pendingConfig?.shutdown();
this.pendingConfig = new ListenerConfig(this.configParameters, listener, this.credentials, () => this.startUsingPendingConfig());
}
private maybeStartServing() {
if (this.currentConfig && this.tcpServer.listening) {
this.updateServingStatus({
code: status.OK,
details: '',
metadata: new Metadata()
});
}
}
private startUsingPendingConfig() {
this.currentConfig?.shutdown();
this.currentConfig = this.pendingConfig;
this.pendingConfig = null;
if (!this.tcpServer.listening) {
const listeningAddress = splitHostPort(this.boundAddress);
if (listeningAddress) {
this.tcpServer.listen(listeningAddress?.port, () => {
this.maybeStartServing();
})
}
}
this.maybeStartServing();
}
addServingStatusListener(listener: ServingStatusListener) {
this.servingStatusListeners.add(listener);
}
removeServingStatusListener(listener: ServingStatusListener) {
this.servingStatusListeners.delete(listener);
}
drain(graceTimeMs: number) {
this.currentConfig?.drain(graceTimeMs);
}
unbind() {
this.currentConfig?.shutdown();
this.pendingConfig?.shutdown();
this.tcpServer.close();
const resourceName = formatTemplateString(this.configParameters.listenerResourceNameTemplate, this.boundAddress);
ListenerResourceType.cancelWatch(this.configParameters.xdsClient, resourceName, this.listenerWatcher);
}
}
function normalizeFilterChainMatch(filterChainMatch: FilterChainMatch__Output | null): NormalizedFilterChainMatch[] {
if (!filterChainMatch) {
return [];
}
if (filterChainMatch.destination_port) {
return [];
}
if (filterChainMatch.server_names.length > 0) {
return [];
}
if (filterChainMatch.transport_protocol !== 'raw_buffer') {
return [];
}
if (filterChainMatch.application_protocols.length > 0) {
return [];
}
const normalizedPrefixRanges = filterChainMatch.prefix_ranges.map(cidrRangeMessageToCidrRange).map(normalizeCidrRange);
const normalizedSourcePrefixRanges = filterChainMatch.source_prefix_ranges.map(cidrRangeMessageToCidrRange).map(normalizeCidrRange);
const fieldCrossProduct = crossProduct([normalizedPrefixRanges, normalizedSourcePrefixRanges, filterChainMatch.source_ports] as [CidrRange[], CidrRange[], number[]]);
return fieldCrossProduct.map(([prefixRange, sourcePrefixRange, sourcePort]) => ({prefixRange, sourceType: filterChainMatch.source_type, sourcePrefixRange, sourcePort}));
}
function isSameIpOrLoopback(remoteAddress: string, localAddress: string): boolean {
return remoteAddress === '127.0.0.1' || remoteAddress === '::1' || remoteAddress === localAddress;
}
interface MatchFieldEvaluator<MatcherType, FieldType> {
isMatch: (matcher: MatcherType, field: FieldType) => boolean;
matcherEqual: (matcher1: MatcherType, matcher2: MatcherType) => boolean;
/**
* Returns true if matcher1 is more specific than matcher2.
*
* Note: this comparison will only make sense if the field value in
* consideration matches both matchers.
* @param matcher1
* @param matcher2
* @returns
*/
isMoreSpecific: (matcher1: MatcherType, matcher2: MatcherType) => boolean;
}
type FieldType<MatcherType> = MatcherType extends CidrRange ? (string | undefined) : MatcherType extends (ConnectionSourceType) ? {localAddress: string, remoteAddress?: (string | undefined)} : MatcherType extends number ? number | undefined : never;
function cidrRangeMatch(range: CidrRange | undefined, address: string | undefined): boolean {
return !range || (!!address && inCidrRange(range, address));
}
function cidrRangeMoreSpecific(range1: CidrRange | undefined, range2: CidrRange | undefined): boolean {
if (!range2) {
return !!range1;
}
return !!range1 && range1.prefixLen > range2.prefixLen;
}
function sourceTypeMatch(sourceType: ConnectionSourceType, addresses: {localAddress: string, remoteAddress?: (string | undefined)}): boolean {
switch (sourceType) {
case "ANY":
return true;
case "SAME_IP_OR_LOOPBACK":
return !!addresses.remoteAddress && isSameIpOrLoopback(addresses.remoteAddress, addresses.localAddress);
case "EXTERNAL":
return !!addresses.remoteAddress && !isSameIpOrLoopback(addresses.remoteAddress, addresses.localAddress);
}
}
const cidrRangeEvaluator: MatchFieldEvaluator<CidrRange | undefined, string | undefined> = {
isMatch: cidrRangeMatch,
matcherEqual: cidrRangeEqual,
isMoreSpecific: cidrRangeMoreSpecific
};
const sourceTypeEvaluator: MatchFieldEvaluator<ConnectionSourceType, {localAddress: string, remoteAddress?: (string | undefined)}> = {
isMatch: sourceTypeMatch,
matcherEqual: (matcher1, matcher2) => matcher1 === matcher2,
isMoreSpecific: (matcher1, matcher2) => matcher1 !== 'ANY' && matcher2 === 'ANY'
};
const portEvaluator: MatchFieldEvaluator<number | undefined, number | undefined> = {
isMatch: (matcher, actual) => matcher === undefined || matcher === actual,
matcherEqual: (matcher1, matcher2) => matcher1 === matcher2,
isMoreSpecific: (matcher1, matcher2) => matcher1 !== undefined && matcher2 === undefined
}
function selectMostSpecificMatcherForField<FieldName extends keyof NormalizedFilterChainMatch>(fieldName: FieldName, evaluator: MatchFieldEvaluator<NormalizedFilterChainMatch[FieldName], FieldType<NormalizedFilterChainMatch[FieldName]>>, matchers: NormalizedFilterChainMatch[], fieldValue: FieldType<NormalizedFilterChainMatch[FieldName]>): NormalizedFilterChainMatch[] {
let filteredCandidates: NormalizedFilterChainMatch[] = [];
for (const candidate of matchers) {
const fieldMatcher = candidate[fieldName];
if (!evaluator.isMatch(fieldMatcher, fieldValue)) {
continue;
}
if (filteredCandidates.length === 0) {
filteredCandidates.push(candidate);
} else if (evaluator.matcherEqual(fieldMatcher, filteredCandidates[0][fieldName])) {
filteredCandidates.push(candidate);
} else if (evaluator.isMoreSpecific(fieldMatcher, filteredCandidates[0][fieldName])) {
filteredCandidates = [candidate];
}
}
return filteredCandidates;
}
function selectMostSpecificallyMatchingFilter(filterChains: FilterChainEntry[], socket: net.Socket): FilterChainEntry | null {
let matcherMap: Map<NormalizedFilterChainMatch, FilterChainEntry> = new Map(filterChains.map(chain => chain.getMatchers().map(matcher => ([matcher, chain] as [NormalizedFilterChainMatch, FilterChainEntry]))).flat());
let matcherCandidates = Array.from(matcherMap.keys());
matcherCandidates = selectMostSpecificMatcherForField('prefixRange', cidrRangeEvaluator, matcherCandidates, socket.localAddress);
matcherCandidates = selectMostSpecificMatcherForField('sourceType', sourceTypeEvaluator, matcherCandidates, socket);
matcherCandidates = selectMostSpecificMatcherForField('sourcePrefixRange', cidrRangeEvaluator, matcherCandidates, socket.remoteAddress);
matcherCandidates = selectMostSpecificMatcherForField('sourcePort', portEvaluator, matcherCandidates, socket.remotePort);
if (matcherCandidates.length === 1) {
return matcherMap.get(matcherCandidates[0])!
} else if (matcherCandidates.length === 0) {
return null;
} else {
throw new Error('Duplicate matcher found for incoming connection');
}
}
const BOOTSTRAP_CONFIG_KEY = 'grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config';
// Default drain grace time of 10 minutes
const DEFAULT_DRAIN_GRACE_TIME_MS = 10 * 60 * 1000;
export interface XdsServerOptions extends ServerOptions {
drainGraceTimeMs?: number;
}
export class XdsServer extends Server {
private listenerResourceNameTemplate: string;
private boundPortMap: Map<string, BoundPortEntry> = new Map();
private xdsClient: XdsClient;
private drainGraceTimeMs: number;
constructor(options?: XdsServerOptions) {
super(options);
let bootstrapConfig: BootstrapInfo;
if (options?.[BOOTSTRAP_CONFIG_KEY]) {
const parsedConfig = JSON.parse(options[BOOTSTRAP_CONFIG_KEY]);
bootstrapConfig = validateBootstrapConfig(parsedConfig);
this.xdsClient = new XdsClient(bootstrapConfig);
} else {
bootstrapConfig = loadBootstrapInfo();
this.xdsClient = getSingletonXdsClient();
}
if (!bootstrapConfig.serverListenerResourceNameTemplate) {
throw new Error('Bootstrap file missing required field server_listener_resource_name_template');
}
this.listenerResourceNameTemplate = bootstrapConfig.serverListenerResourceNameTemplate;
this.drainGraceTimeMs = options?.drainGraceTimeMs ?? DEFAULT_DRAIN_GRACE_TIME_MS;
}
/**
* Bind a port using configuration retrieved from the xDS control plane.
* @param port Port to bind in the format [IP address]:[port number] (e.g. 0.0.0.0:443)
* @param creds Server credentials object to bind
* @param callback
*/
override bindAsync(port: string, creds: ServerCredentials, callback: (error: Error | null, port: number) => void): void {
// Validate port string has the form IP:port
const hostPort = splitHostPort(port);
if (!hostPort || !isValidIpPort(hostPort)) {
throw new Error(`Listening port string must have the format IP:port with non-zero port, got ${port}`);
}
const configParameters: ConfigParameters = {
createConnectionInjector: (credentials) => this.createConnectionInjector(credentials),
drainGraceTimeMs: this.drainGraceTimeMs,
listenerResourceNameTemplate: this.listenerResourceNameTemplate,
xdsClient: this.xdsClient
};
const portEntry = new BoundPortEntry(configParameters, port, creds);
const servingStatusListener: ServingStatusListener = statusObject => {
if (statusObject.code === status.OK) {
callback(null, hostPort.port!);
portEntry.removeServingStatusListener(servingStatusListener);
}
}
portEntry.addServingStatusListener(servingStatusListener);
this.boundPortMap.set(port, portEntry);
}
override drain(port: string, graceTimeMs: number): void {
const boundPort = this.boundPortMap.get(port);
boundPort?.drain(graceTimeMs);
}
override unbind(port: string): void {
const boundPort = this.boundPortMap.get(port);
if (!boundPort) {
return;
}
boundPort.unbind();
this.boundPortMap.delete(port);
}
override tryShutdown(callback: (error?: Error) => void): void {
for (const portEntry of this.boundPortMap.values()) {
portEntry.unbind();
}
this.boundPortMap.clear();
super.tryShutdown(callback);
}
override forceShutdown(): void {
for (const portEntry of this.boundPortMap.values()) {
portEntry.unbind();
}
this.boundPortMap.clear();
super.forceShutdown();
}
addServingStateListener(port: string, listener: ServingStatusListener) {
const portEntry = this.boundPortMap.get(port);
if (portEntry) {
portEntry.addServingStatusListener(listener);
}
}
removeServingStateListener(port: string, listener: ServingStatusListener) {
const portEntry = this.boundPortMap.get(port);
if (portEntry) {
portEntry.removeServingStatusListener(listener);
}
}
}

View File

@ -56,6 +56,7 @@ export interface BootstrapInfo {
node: Node;
authorities: {[authorityName: string]: Authority};
clientDefaultListenerResourceNameTemplate: string;
serverListenerResourceNameTemplate: string | null;
}
const KNOWN_SERVER_FEATURES = ['ignore_resource_deletion'];
@ -308,6 +309,11 @@ function validateAuthoritiesMap(obj: any): {[authorityName: string]: Authority}
export function validateBootstrapConfig(obj: any): BootstrapInfo {
const xdsServers = obj.xds_servers.map(validateXdsServerConfig);
const node = validateNode(obj.node);
if ('server_listener_resource_name_template' in obj) {
if (typeof obj.server_listener_resource_name_template !== 'string') {
throw new Error(`server_listener_resource_name_template: expected string, got ${typeof obj.server_listener_resource_name_template}`);
}
}
if (EXPERIMENTAL_FEDERATION) {
if ('client_default_listener_resource_name_template' in obj) {
if (typeof obj.client_default_listener_resource_name_template !== 'string') {
@ -318,14 +324,16 @@ export function validateBootstrapConfig(obj: any): BootstrapInfo {
xdsServers: xdsServers,
node: node,
authorities: validateAuthoritiesMap(obj.authorities),
clientDefaultListenerResourceNameTemplate: obj.client_default_listener_resource_name_template ?? '%s'
clientDefaultListenerResourceNameTemplate: obj.client_default_listener_resource_name_template ?? '%s',
serverListenerResourceNameTemplate: obj.server_listener_resource_name_template ?? null
};
} else {
return {
xdsServers: xdsServers,
node: node,
authorities: {},
clientDefaultListenerResourceNameTemplate: '%s'
clientDefaultListenerResourceNameTemplate: '%s',
serverListenerResourceNameTemplate: obj.server_listener_resource_name_template ?? null
};
}
}
@ -395,3 +403,21 @@ export function loadBootstrapInfo(): BootstrapInfo {
'The GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG environment variables need to be set to the path to the bootstrap file to use xDS'
);
}
/**
* Encode a text string as a valid path of a URI, as specified in RFC-3986 section 3.3
* @param uriPath A value representing an unencoded URI path
* @returns
*/
export function encodeURIPath(uriPath: string): string {
return uriPath.replace(/[^A-Za-z0-9._~!$&^()*+,;=/-]/g, substring => encodeURIComponent(substring));
}
export function formatTemplateString(templateString: string, value: string): string {
if (templateString.startsWith('xdstp:')) {
return templateString.replace(/%s/g, encodeURIPath(value));
} else {
return templateString.replace(/%s/g, value);
}
}

View File

@ -199,6 +199,7 @@ class AdsResponseParser {
return;
}
if (!this.result.type) {
this.adsCallState.client.trace('Received resource for uninitialized type ' + resource.type_url);
return;
}
const decodeContext: XdsDecodeContext = {
@ -228,6 +229,7 @@ class AdsResponseParser {
const resourceState = this.adsCallState.client.xdsClient.authorityStateMap.get(parsedName.authority)?.resourceMap.get(this.result.type)?.get(parsedName.key);
if (!resourceState) {
// No subscription for this resource
this.adsCallState.client.trace('Received resource of type ' + this.result.type.getTypeUrl() + ' named ' + decodeResult.name + ' with no subscriptions');
return;
}
if (resourceState.deletionIgnored) {
@ -248,8 +250,9 @@ class AdsResponseParser {
return;
}
if (!decodeResult.value) {
this.adsCallState.client.trace('Failed to parse resource of type ' + this.result.type.getTypeUrl());
return;
}
}
this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2));
this.result.haveValidResources = true;
if (this.result.type.resourcesEqual(resourceState.cachedResource, decodeResult.value)) {
@ -263,6 +266,7 @@ class AdsResponseParser {
version: this.result.version!
};
process.nextTick(() => {
trace('Notifying ' + resourceState.watchers.size + ' watchers of ' + this.result.type?.getTypeUrl() + ' update');
for (const watcher of resourceState.watchers) {
watcher.onGenericResourceChanged(decodeResult.value!);
}

View File

@ -24,6 +24,11 @@ import { XdsDecodeContext, XdsDecodeResult, XdsResourceType } from "./xds-resour
import { getTopLevelFilterUrl, validateTopLevelFilter } from "../http-filter";
import { RouteConfigurationResourceType } from "./route-config-resource-type";
import { Watcher, XdsClient } from "../xds-client";
import { CidrRange, cidrRangeEqual, cidrRangeMessageToCidrRange, normalizeCidrRange } from "../cidr";
import { FilterChainMatch__Output, _envoy_config_listener_v3_FilterChainMatch_ConnectionSourceType } from "../generated/envoy/config/listener/v3/FilterChainMatch";
import { crossProduct } from "../cross-product";
import { FilterChain__Output } from "../generated/envoy/config/listener/v3/FilterChain";
import { HttpConnectionManager__Output } from "../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager";
const TRACER_NAME = 'xds_client';
@ -33,6 +38,110 @@ function trace(text: string): void {
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
type ConnectionSourceType = keyof typeof _envoy_config_listener_v3_FilterChainMatch_ConnectionSourceType;
interface NormalizedFilterChainMatch {
destinationPort?: number;
prefixRange?: CidrRange;
sourceType: ConnectionSourceType;
sourcePrefixRange?: CidrRange;
sourcePort?: number;
serverName?: string;
transportProtocol: string;
applicationProtocol?: string;
}
function normalizedFilterChainMatchEquals(first: NormalizedFilterChainMatch, second: NormalizedFilterChainMatch) {
return (
first.destinationPort === second.destinationPort &&
cidrRangeEqual(first.prefixRange, second.prefixRange) &&
first.sourceType === second.sourceType &&
cidrRangeEqual(first.sourcePrefixRange, second.sourcePrefixRange) &&
first.sourcePort === second.sourcePort &&
first.serverName === second.serverName &&
first.transportProtocol === second.transportProtocol &&
first.applicationProtocol === second.applicationProtocol
);
}
function normalizeFilterChainMatch(filterChainMatch: FilterChainMatch__Output): NormalizedFilterChainMatch[] {
const prefixRanges = filterChainMatch.prefix_ranges.map(cidrRangeMessageToCidrRange).map(normalizeCidrRange);
const sourcePrefixRanges = filterChainMatch.source_prefix_ranges.map(cidrRangeMessageToCidrRange).map(normalizeCidrRange);
const sourcePorts = filterChainMatch.source_ports;
const serverNames = filterChainMatch.server_names;
const applicationProtocols = filterChainMatch.application_protocols;
const fieldCrossProduct = crossProduct([prefixRanges, sourcePrefixRanges, sourcePorts, serverNames, applicationProtocols] as [CidrRange[], CidrRange[], number[], string[], string[]]);
return fieldCrossProduct.map(([prefixRange, sourcePrefixRange, sourcePort, serverName, applicationProtocol]) => ({
destinationPort: filterChainMatch.destination_port?.value,
prefixRange,
sourceType: filterChainMatch.source_type,
sourcePrefixRange,
sourcePort,
serverName,
transportProtocol: filterChainMatch.transport_protocol,
applicationProtocol: applicationProtocol
}));
}
function validateHttpConnectionManager(httpConnectionManager: HttpConnectionManager__Output): boolean {
if (EXPERIMENTAL_FAULT_INJECTION) {
const filterNames = new Set<string>();
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
if (filterNames.has(httpFilter.name)) {
trace('LDS response validation failed: duplicate HTTP filter name ' + httpFilter.name);
return false;
}
filterNames.add(httpFilter.name);
if (!validateTopLevelFilter(httpFilter)) {
trace('LDS response validation failed: ' + httpFilter.name + ' filter validation failed');
return false;
}
/* Validate that the last filter, and only the last filter, is the
* router filter. */
const filterUrl = getTopLevelFilterUrl(httpFilter.typed_config!)
if (index < httpConnectionManager.http_filters.length - 1) {
if (filterUrl === ROUTER_FILTER_URL) {
trace('LDS response validation failed: router filter is before end of list');
return false;
}
} else {
if (filterUrl !== ROUTER_FILTER_URL) {
trace('LDS response validation failed: final filter is ' + filterUrl);
return false;
}
}
}
}
switch (httpConnectionManager.route_specifier) {
case 'rds':
if (!httpConnectionManager.rds?.config_source?.ads && !httpConnectionManager.rds?.config_source?.self) {
return false;
}
break;
case 'route_config':
if (!RouteConfigurationResourceType.get().validateResource(httpConnectionManager.route_config!)) {
return false;
}
break;
default: return false;
}
return true;
}
function validateFilterChain(filterChain: FilterChain__Output): boolean {
if (filterChain.filters.length !== 1) {
return false;
}
if (filterChain.filters[0].typed_config?.type_url !== HTTP_CONNECTION_MANGER_TYPE_URL) {
return false;
}
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, filterChain.filters[0].typed_config.value);
if (!validateHttpConnectionManager(httpConnectionManager)) {
return false;
}
return true;
}
export class ListenerResourceType extends XdsResourceType {
private static singleton: ListenerResourceType = new ListenerResourceType();
private constructor() {
@ -56,47 +165,34 @@ export class ListenerResourceType extends XdsResourceType {
return null;
}
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value);
if (EXPERIMENTAL_FAULT_INJECTION) {
const filterNames = new Set<string>();
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
if (filterNames.has(httpFilter.name)) {
trace('LDS response validation failed: duplicate HTTP filter name ' + httpFilter.name);
return null;
}
filterNames.add(httpFilter.name);
if (!validateTopLevelFilter(httpFilter)) {
trace('LDS response validation failed: ' + httpFilter.name + ' filter validation failed');
return null;
}
/* Validate that the last filter, and only the last filter, is the
* router filter. */
const filterUrl = getTopLevelFilterUrl(httpFilter.typed_config!)
if (index < httpConnectionManager.http_filters.length - 1) {
if (filterUrl === ROUTER_FILTER_URL) {
trace('LDS response validation failed: router filter is before end of list');
return null;
}
} else {
if (filterUrl !== ROUTER_FILTER_URL) {
trace('LDS response validation failed: final filter is ' + filterUrl);
if (!validateHttpConnectionManager(httpConnectionManager)) {
return null;
}
if (message.listener_filters.length > 0) {
return null;
}
if (message.use_original_dst?.value === true) {
return null;
}
const seenMatches: NormalizedFilterChainMatch[] = [];
for (const filterChain of message.filter_chains) {
if (filterChain.filter_chain_match) {
const normalizedMatches = normalizeFilterChainMatch(filterChain.filter_chain_match);
for (const match of normalizedMatches) {
if (seenMatches.some(prevMatch => normalizedFilterChainMatchEquals(match, prevMatch))) {
return null;
}
seenMatches.push(match);
}
}
if (!validateFilterChain(filterChain)) {
return null;
}
}
switch (httpConnectionManager.route_specifier) {
case 'rds':
if (!httpConnectionManager.rds?.config_source?.ads && !httpConnectionManager.rds?.config_source?.self) {
return null;
}
return message;
case 'route_config':
if (!RouteConfigurationResourceType.get().validateResource(httpConnectionManager.route_config!)) {
return null;
}
return message;
if (message.default_filter_chain && !validateFilterChain(message.default_filter_chain)) {
return null;
}
return null;
return message;
}
decode(context: XdsDecodeContext, resource: Any__Output): XdsDecodeResult {

View File

@ -127,41 +127,50 @@ export class RouteConfigurationResourceType extends XdsResourceType {
return null;
}
}
if (route.action !== 'route') {
return null;
}
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
switch (route.action) {
case 'route': {
if (route.action !== 'route') {
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return null;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum === 0 || weightSum > UINT32_MAX) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return null;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum === 0 || weightSum > UINT32_MAX) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return null;
}
}
}
}
}
break;
}
case 'non_forwarding_action':
continue;
default:
return null;
}
}
}

View File

@ -21,6 +21,11 @@ import { ProtoGrpcType } from "./generated/echo";
import { EchoRequest__Output } from "./generated/grpc/testing/EchoRequest";
import { EchoResponse } from "./generated/grpc/testing/EchoResponse";
import * as net from 'net';
import { XdsServer } from "../src";
import { ControlPlaneServer } from "./xds-server";
import { findFreePorts } from 'find-free-ports';
const loadedProtos = loadPackageDefinition(loadSync(
[
'grpc/testing/echo.proto'
@ -38,12 +43,13 @@ const loadedProtos = loadPackageDefinition(loadSync(
],
})) as unknown as ProtoGrpcType;
const BOOTSTRAP_CONFIG_KEY = 'grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config';
export class Backend {
private server: Server | null = null;
private receivedCallCount = 0;
private callListeners: (() => void)[] = [];
private port: number | null = null;
constructor() {
constructor(private port: number, private useXdsServer: boolean) {
}
Echo(call: ServerUnaryCall<EchoRequest__Output, EchoResponse>, callback: sendUnaryData<EchoResponse>) {
// call.request.params is currently ignored
@ -72,25 +78,28 @@ export class Backend {
this.callListeners.push(listener);
}
start(callback: (error: Error | null, port: number) => void) {
start(controlPlaneServer: ControlPlaneServer, callback: (error: Error | null, port: number) => void) {
if (this.server) {
throw new Error("Backend already running");
}
this.server = new Server();
this.server.addService(loadedProtos.grpc.testing.EchoTestService.service, this as unknown as UntypedServiceImplementation);
const boundPort = this.port ?? 0;
this.server.bindAsync(`localhost:${boundPort}`, ServerCredentials.createInsecure(), (error, port) => {
if (this.useXdsServer) {
this.server = new XdsServer({[BOOTSTRAP_CONFIG_KEY]: controlPlaneServer.getBootstrapInfoString()});
} else {
this.server = new Server();
}
const server = this.server;
server.addService(loadedProtos.grpc.testing.EchoTestService.service, this as unknown as UntypedServiceImplementation);
server.bindAsync(`[::1]:${this.port}`, ServerCredentials.createInsecure(), (error, port) => {
if (!error) {
this.port = port;
this.server!.start();
}
callback(error, port);
})
});
}
startAsync(): Promise<number> {
startAsync(controlPlaneServer: ControlPlaneServer): Promise<number> {
return new Promise((resolve, reject) => {
this.start((error, port) => {
this.start(controlPlaneServer, (error, port) => {
if (error) {
reject(error);
} else {
@ -101,9 +110,6 @@ export class Backend {
}
getPort(): number {
if (this.port === null) {
throw new Error('Port not set. Backend not yet started.');
}
return this.port;
}
@ -138,3 +144,8 @@ export class Backend {
});
}
}
export async function createBackends(count: number, useXdsServer?: boolean): Promise<Backend[]> {
const ports = await findFreePorts(count);
return ports.map(port => new Backend(port, useXdsServer ?? true));
}

View File

@ -19,7 +19,7 @@ import { ChannelOptions, credentials, loadPackageDefinition, ServiceError } from
import { loadSync } from "@grpc/proto-loader";
import { ProtoGrpcType } from "./generated/echo";
import { EchoTestServiceClient } from "./generated/grpc/testing/EchoTestService";
import { XdsServer } from "./xds-server";
import { ControlPlaneServer } from "./xds-server";
const loadedProtos = loadPackageDefinition(loadSync(
[
@ -50,7 +50,7 @@ export class XdsTestClient {
clearInterval(this.callInterval);
}
static createFromServer(targetName: string, xdsServer: XdsServer, options?: ChannelOptions) {
static createFromServer(targetName: string, xdsServer: ControlPlaneServer, options?: ChannelOptions) {
return new XdsTestClient(`xds:///${targetName}`, xdsServer.getBootstrapInfoString(), options);
}
@ -82,6 +82,14 @@ export class XdsTestClient {
});
}
sendOneCallAsync(): Promise<ServiceError | null> {
return new Promise((resolve, reject) => {
this.sendOneCall(error => {
resolve(error)
});
});
}
sendNCalls(count: number, callback: (error: ServiceError| null) => void) {
const sendInner = (count: number, callback: (error: ServiceError| null) => void) => {
if (count === 0) {
@ -99,6 +107,18 @@ export class XdsTestClient {
sendInner(count, callback);
}
sendNCallsAsync(count: number): Promise<void> {
return new Promise((resolve, reject) => {
this.sendNCalls(count, error => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
getConnectivityState() {
return this.client.getChannel().getConnectivityState(false);
}

View File

@ -29,6 +29,7 @@ import { LocalityLbEndpoints } from "../src/generated/envoy/config/endpoint/v3/L
import { LbEndpoint } from "../src/generated/envoy/config/endpoint/v3/LbEndpoint";
import { ClusterConfig } from "../src/generated/envoy/extensions/clusters/aggregate/v3/ClusterConfig";
import { Any } from "../src/generated/google/protobuf/Any";
import { ControlPlaneServer } from "./xds-server";
interface Endpoint {
locality: Locality;
@ -64,7 +65,7 @@ export interface FakeCluster {
getClusterConfig(): Cluster;
getAllClusterConfigs(): Cluster[];
getName(): string;
startAllBackends(): Promise<any>;
startAllBackends(controlPlaneServer: ControlPlaneServer): Promise<any>;
haveAllBackendsReceivedTraffic(): boolean;
waitForAllBackendsToReceiveTraffic(): Promise<void>;
}
@ -121,8 +122,8 @@ export class FakeEdsCluster implements FakeCluster {
return this.clusterName;
}
startAllBackends(): Promise<any> {
return Promise.all(this.endpoints.map(endpoint => Promise.all(endpoint.backends.map(backend => backend.startAsync()))));
startAllBackends(controlPlaneServer: ControlPlaneServer): Promise<any> {
return Promise.all(this.endpoints.map(endpoint => Promise.all(endpoint.backends.map(backend => backend.startAsync(controlPlaneServer)))));
}
haveAllBackendsReceivedTraffic(): boolean {
@ -192,8 +193,8 @@ export class FakeDnsCluster implements FakeCluster {
getName(): string {
return this.name;
}
startAllBackends(): Promise<any> {
return this.backend.startAsync();
startAllBackends(controlPlaneServer: ControlPlaneServer): Promise<any> {
return this.backend.startAsync(controlPlaneServer);
}
haveAllBackendsReceivedTraffic(): boolean {
return this.backend.getCallCount() > 0;
@ -231,8 +232,8 @@ export class FakeAggregateCluster implements FakeCluster {
getName(): string {
return this.name;
}
startAllBackends(): Promise<any> {
return Promise.all(this.children.map(child => child.startAllBackends()));
startAllBackends(controlPlaneServer: ControlPlaneServer): Promise<any> {
return Promise.all(this.children.map(child => child.startAllBackends(controlPlaneServer)));
}
haveAllBackendsReceivedTraffic(): boolean {
for (const child of this.children) {
@ -320,12 +321,12 @@ export class FakeRouteGroup {
};
}
startAllBackends(): Promise<any> {
startAllBackends(controlPlaneServer: ControlPlaneServer): Promise<any> {
return Promise.all(this.routes.map(route => {
if (route.cluster) {
return route.cluster.startAllBackends();
return route.cluster.startAllBackends(controlPlaneServer);
} else if (route.weightedClusters) {
return Promise.all(route.weightedClusters.map(clusterWeight => clusterWeight.cluster.startAllBackends()));
return Promise.all(route.weightedClusters.map(clusterWeight => clusterWeight.cluster.startAllBackends(controlPlaneServer)));
} else {
return Promise.resolve();
}
@ -359,3 +360,69 @@ export class FakeRouteGroup {
}));
}
}
const DEFAULT_BASE_SERVER_LISTENER: Listener = {
default_filter_chain: {
filter_chain_match: {
source_type: 'SAME_IP_OR_LOOPBACK'
}
}
};
const DEFAULT_BASE_SERVER_ROUTE_CONFIG: RouteConfiguration = {
virtual_hosts: [{
domains: ['*'],
routes: [{
match: {
prefix: ''
},
action: 'non_forwarding_action',
non_forwarding_action: {}
}]
}]
};
export class FakeServerRoute {
private listener: Listener;
private routeConfiguration: RouteConfiguration;
constructor(port: number, routeName: string, baseListener?: Listener | undefined, baseRouteConfiguration?: RouteConfiguration) {
this.listener = baseListener ?? DEFAULT_BASE_SERVER_LISTENER;
this.listener.name = `[::1]:${port}`;
this.listener.address = {
socket_address: {
address: '::1',
port_value: port
}
}
const httpConnectionManager: HttpConnectionManager & AnyExtension = {
'@type': HTTP_CONNECTION_MANGER_TYPE_URL,
rds: {
route_config_name: routeName,
config_source: {ads: {}}
}
};
this.listener.api_listener = {
api_listener: httpConnectionManager
}
const filterList = [{
typed_config: httpConnectionManager
}];
if (this.listener.default_filter_chain) {
this.listener.default_filter_chain.filters = filterList;
}
for (const filterChain of this.listener.filter_chains ?? []) {
filterChain.filters = filterList;
}
this.routeConfiguration = baseRouteConfiguration ?? DEFAULT_BASE_SERVER_ROUTE_CONFIG;
this.routeConfiguration.name = routeName;
}
getRouteConfiguration(): RouteConfiguration {
return this.routeConfiguration;
}
getListener(): Listener {
return this.listener;
}
}

View File

@ -0,0 +1,164 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import assert = require('assert');
import { CidrRange, formatIPv4, formatIPv6, normalizeCidrRange, parseIPv4, parseIPv6 } from '../src/cidr';
describe('parseIPv4 and formatIPv4', () => {
describe('Should transform as expected', () => {
const TEST_CASES = [
{
address: '0.0.0.0',
parsed: 0n
},
{
address: '255.255.255.255',
parsed: 0xffffffffn
},
{
address: '10.0.0.1',
parsed: 0x0a000001n
},
{
address: '10.0.0.0',
parsed: 0x0a000000n
},
{
address: '192.168.0.1',
parsed: 0xc0a80001n
},
{
address: '192.168.0.0',
parsed: 0xc0a80000n
}
];
for (const {address, parsed} of TEST_CASES) {
it(address, () => {
assert.strictEqual(parseIPv4(address), parsed);
assert.strictEqual(formatIPv4(parsed), address);
});
}
});
});
describe('parseIPv6 and formatIPv6', () => {
describe('Should transform as expected', () => {
const TEST_CASES = [
{
address: '::',
parsed: 0n
},
{
address: 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff',
parsed: 0xffffffffffffffffffffffffffffffffn
},
{
address: '::1',
parsed: 1n
},
// The example address in the IPv6 Wikipedia article
{
address: '2001:db8::ff00:42:8329',
parsed: 0x20010db8000000000000ff0000428329n
}
];
for (const {address, parsed} of TEST_CASES) {
it(address, () => {
assert.strictEqual(parseIPv6(address), parsed);
assert.strictEqual(formatIPv6(parsed), address);
});
}
});
});
describe('CIDR range normalization', () => {
const TEST_CASES: {input: CidrRange, output: CidrRange}[] = [
{
input: {
addressPrefix: '192.168.0.0',
prefixLen: 24
},
output: {
addressPrefix: '192.168.0.0',
prefixLen: 24
},
},
{
input: {
addressPrefix: '192.168.0.128',
prefixLen: 24
},
output: {
addressPrefix: '192.168.0.0',
prefixLen: 24
},
},
{
input: {
addressPrefix: '192.168.0.1',
prefixLen: 24
},
output: {
addressPrefix: '192.168.0.0',
prefixLen: 24
},
},
{
input: {
addressPrefix: '192.168.0.1',
prefixLen: -1
},
output: {
addressPrefix: '0.0.0.0',
prefixLen: 0
},
},
{
input: {
addressPrefix: '192.168.0.1',
prefixLen: 33
},
output: {
addressPrefix: '192.168.0.1',
prefixLen: 32
},
},
{
input: {
addressPrefix: 'fe80::',
prefixLen: 10
},
output: {
addressPrefix: 'fe80::',
prefixLen: 10
},
},
{
input: {
addressPrefix: 'fe80::1',
prefixLen: 10
},
output: {
addressPrefix: 'fe80::',
prefixLen: 10
},
},
];
for (const {input, output} of TEST_CASES) {
it(`${input.addressPrefix}/${input.prefixLen} -> ${output.addressPrefix}/${output.prefixLen}`, () => {
assert.deepStrictEqual(normalizeCidrRange(input), output);
})
}
});

View File

@ -17,18 +17,18 @@
import { register } from "../src";
import assert = require("assert");
import { XdsServer } from "./xds-server";
import { ControlPlaneServer } from "./xds-server";
import { XdsTestClient } from "./client";
import { FakeAggregateCluster, FakeDnsCluster, FakeEdsCluster, FakeRouteGroup } from "./framework";
import { Backend } from "./backend";
import { FakeAggregateCluster, FakeDnsCluster, FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { Backend, createBackends } from "./backend";
register();
describe('Cluster types', () => {
let xdsServer: XdsServer;
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
@ -39,35 +39,51 @@ describe('Cluster types', () => {
});
describe('Logical DNS Clusters', () => {
it('Should successfully make RPCs', done => {
const cluster = new FakeDnsCluster('dnsCluster', new Backend());
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
createBackends(1).then(([backend]) => {
const cluster = new FakeDnsCluster('dnsCluster', backend);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(error => {
done(error);
});
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(error => {
done(error);
});
}, reason => done(reason));
}, reason => done(reason));
});
});
/* These tests pass on Node 18 fail on Node 16, probably because of
* https://github.com/nodejs/node/issues/42713 */
describe.skip('Aggregate DNS Clusters', () => {
it('Should result in prioritized clusters', () => {
const backend1 = new Backend();
const backend2 = new Backend();
it('Should result in prioritized clusters', async () => {
const [backend1, backend2] = await createBackends(2, false);
const cluster1 = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1], locality:{region: 'region1'}}]);
const cluster2 = new FakeEdsCluster('cluster2', 'endpoint2', [{backends: [backend2], locality:{region: 'region2'}}]);
const aggregateCluster = new FakeAggregateCluster('aggregateCluster', [cluster1, cluster2]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: aggregateCluster}]);
return routeGroup.startAllBackends().then(() => {
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute1');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
const serverRoute2 = new FakeServerRoute(backend2.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
return routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster1.getEndpointConfig());
xdsServer.setCdsResource(cluster1.getClusterConfig());
xdsServer.setEdsResource(cluster2.getEndpointConfig());
@ -75,29 +91,34 @@ describe('Cluster types', () => {
xdsServer.setCdsResource(aggregateCluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
return cluster1.waitForAllBackendsToReceiveTraffic();
}).then(() => backend1.shutdownAsync()
).then(() => cluster2.waitForAllBackendsToReceiveTraffic()
).then(() => backend1.startAsync()
).then(() => backend1.startAsync(xdsServer)
).then(() => cluster1.waitForAllBackendsToReceiveTraffic());
});
it('Should handle a diamond dependency', () => {
const backend1 = new Backend();
const backend2 = new Backend();
it('Should handle a diamond dependency', async () => {
const [backend1, backend2] = await createBackends(2);
const cluster1 = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1], locality:{region: 'region1'}}]);
const cluster2 = new FakeEdsCluster('cluster2', 'endpoint2', [{backends: [backend2], locality:{region: 'region2'}}]);
const aggregateCluster1 = new FakeAggregateCluster('aggregateCluster1', [cluster1, cluster2]);
const aggregateCluster2 = new FakeAggregateCluster('aggregateCluster2', [cluster1, aggregateCluster1]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: aggregateCluster2}]);
return Promise.all([backend1.startAsync(), backend2.startAsync()]).then(() => {
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute1');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
const serverRoute2 = new FakeServerRoute(backend2.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
return Promise.all([backend1.startAsync(xdsServer), backend2.startAsync(xdsServer)]).then(() => {
xdsServer.setEdsResource(cluster1.getEndpointConfig());
xdsServer.setCdsResource(cluster1.getClusterConfig());
xdsServer.setEdsResource(cluster2.getEndpointConfig());
@ -106,74 +127,72 @@ describe('Cluster types', () => {
xdsServer.setCdsResource(aggregateCluster2.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
return cluster1.waitForAllBackendsToReceiveTraffic();
}).then(() => backend1.shutdownAsync()
).then(() => cluster2.waitForAllBackendsToReceiveTraffic()
).then(() => backend1.startAsync()
).then(() => backend1.startAsync(xdsServer)
).then(() => cluster1.waitForAllBackendsToReceiveTraffic());
});
it('Should handle EDS then DNS cluster order', () => {
const backend1 = new Backend();
const backend2 = new Backend();
it('Should handle EDS then DNS cluster order', async () => {
const [backend1, backend2] = await createBackends(2);
const cluster1 = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1], locality:{region: 'region1'}}]);
const cluster2 = new FakeDnsCluster('cluster2', backend2);
const aggregateCluster = new FakeAggregateCluster('aggregateCluster', [cluster1, cluster2]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: aggregateCluster}]);
return routeGroup.startAllBackends().then(() => {
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute1');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
const serverRoute2 = new FakeServerRoute(backend2.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
return routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster1.getEndpointConfig());
xdsServer.setCdsResource(cluster1.getClusterConfig());
xdsServer.setCdsResource(cluster2.getClusterConfig());
xdsServer.setCdsResource(aggregateCluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
return cluster1.waitForAllBackendsToReceiveTraffic();
}).then(() => backend1.shutdownAsync()
).then(() => cluster2.waitForAllBackendsToReceiveTraffic()
).then(() => backend1.startAsync()
).then(() => backend1.startAsync(xdsServer)
).then(() => cluster1.waitForAllBackendsToReceiveTraffic());
});
it('Should handle DNS then EDS cluster order', () => {
const backend1 = new Backend();
const backend2 = new Backend();
it('Should handle DNS then EDS cluster order', async () => {
const [backend1, backend2] = await createBackends(2);
const cluster1 = new FakeDnsCluster('cluster1', backend1);
const cluster2 = new FakeEdsCluster('cluster2', 'endpoint2', [{backends: [backend2], locality:{region: 'region2'}}]);
const aggregateCluster = new FakeAggregateCluster('aggregateCluster', [cluster1, cluster2]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: aggregateCluster}]);
return routeGroup.startAllBackends().then(() => {
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute1');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
const serverRoute2 = new FakeServerRoute(backend2.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
return routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setCdsResource(cluster1.getClusterConfig());
xdsServer.setEdsResource(cluster2.getEndpointConfig());
xdsServer.setCdsResource(cluster2.getClusterConfig());
xdsServer.setCdsResource(aggregateCluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
return cluster1.waitForAllBackendsToReceiveTraffic();
}).then(() => backend1.shutdownAsync()
).then(() => cluster2.waitForAllBackendsToReceiveTraffic()
).then(() => backend1.startAsync()
).then(() => backend1.startAsync(xdsServer)
).then(() => cluster1.waitForAllBackendsToReceiveTraffic());
});
});

View File

@ -15,10 +15,10 @@
*
*/
import { Backend } from "./backend";
import { Backend, createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
@ -27,10 +27,10 @@ import { connectivityState } from "@grpc/grpc-js";
register();
describe('core xDS functionality', () => {
let xdsServer: XdsServer;
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
@ -39,56 +39,65 @@ describe('core xDS functionality', () => {
client?.close();
xdsServer?.shutdownServer();
})
it('should route requests to the single backend', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
it('should route requests to the single backend', async () => {
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
}, reason => done(reason));
}, reason => done(reason));
await routeGroup.startAllBackends(xdsServer);
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
await routeGroup.waitForAllBackendsToReceiveTraffic();
client.stopCalls();
});
it('should be able to enter and exit idle', function(done) {
this.timeout(5000);
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
createBackends(1).then(([backend]) => {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer, {
'grpc.client_idle_timeout_ms': 1000,
});
client.sendOneCall(error => {
assert.ifError(error);
assert.strictEqual(client.getConnectivityState(), connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client.getConnectivityState(), connectivityState.IDLE);
client.sendOneCall(error => {
done(error);
})
}, 1100);
});
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer, {
'grpc.client_idle_timeout_ms': 1000,
});
client.sendOneCall(error => {
assert.ifError(error);
assert.strictEqual(client.getConnectivityState(), connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client.getConnectivityState(), connectivityState.IDLE);
client.sendOneCall(error => {
done(error);
})
}, 1100);
});
}, reason => done(reason));
}, reason => done(reason));
});
});

View File

@ -17,10 +17,10 @@
import { AnyExtension } from "@grpc/proto-loader";
import { Any } from "../src/generated/google/protobuf/Any";
import { Backend } from "./backend";
import { Backend, createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
import * as assert from 'assert';
import { WrrLocality } from "../src/generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
import { TypedStruct } from "../src/generated/xds/type/v3/TypedStruct";
@ -119,10 +119,10 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
registerLoadBalancerType(LB_POLICY_NAME, RpcBehaviorLoadBalancer, RpcBehaviorLoadBalancingConfig);
describe('Custom LB policies', () => {
let xdsServer: XdsServer;
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
@ -131,28 +131,32 @@ describe('Custom LB policies', () => {
client?.close();
xdsServer?.shutdownServer();
});
it('Should handle round_robin', done => {
it('Should handle round_robin', async () => {
const lbPolicy: Any = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin'
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should handle xds_wrr_locality with round_robin child', done => {
it('Should handle xds_wrr_locality with round_robin child', async () => {
const lbPolicy: WrrLocality & AnyExtension = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality',
endpoint_picking_policy: {
@ -168,24 +172,28 @@ describe('Custom LB policies', () => {
]
}
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should handle a typed_struct policy', done => {
it('Should handle a typed_struct policy', async () => {
const lbPolicy: TypedStruct & AnyExtension = {
'@type': 'type.googleapis.com/xds.type.v3.TypedStruct',
type_url: 'round_robin',
@ -193,24 +201,28 @@ describe('Custom LB policies', () => {
fields: {}
}
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should handle xds_wrr_locality with an unrecognized first child', done => {
it('Should handle xds_wrr_locality with an unrecognized first child', async () => {
const invalidChildPolicy: TypedStruct & AnyExtension = {
'@type': 'type.googleapis.com/xds.type.v3.TypedStruct',
type_url: 'test.ThisLoadBalancerDoesNotExist',
@ -239,24 +251,28 @@ describe('Custom LB policies', () => {
]
}
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should handle a custom LB policy', done => {
it('Should handle a custom LB policy', async () => {
const childPolicy: TypedStruct & AnyExtension = {
'@type': 'type.googleapis.com/xds.type.v3.TypedStruct',
type_url: 'test.RpcBehaviorLoadBalancer',
@ -279,47 +295,53 @@ describe('Custom LB policies', () => {
]
}
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(error => {
assert.strictEqual(error?.code, 15);
done();
});
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert(error);
assert.strictEqual(error.code, 15);
});
it('Should handle pick_first', done => {
it('Should handle pick_first', async () => {
const lbPolicy: PickFirst & AnyExtension = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst',
shuffle_address_list: true
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
});

View File

@ -15,16 +15,16 @@
*
*/
import { Backend } from "./backend";
import { Backend, createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
import assert = require("assert");
/* Test cases in this file are derived from examples in the xDS federation proposal
* https://github.com/grpc/proposal/blob/master/A47-xds-federation.md */
describe('Federation', () => {
let xdsServers: XdsServer[] = [];
let xdsServers: ControlPlaneServer[] = [];
let xdsClient: XdsTestClient;
afterEach(() => {
xdsClient?.close();
@ -36,30 +36,33 @@ describe('Federation', () => {
describe('Bootstrap Config Contains No New Fields', () => {
let bootstrap: string;
beforeEach((done) => {
const xdsServer = new XdsServer();
const xdsServer = new ControlPlaneServer();
xdsServers.push(xdsServer);
xdsServer.startServer(error => {
xdsServer.startServer(async error => {
if (error) {
done(error);
return;
}
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('server.example.com', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
const bootstrapInfo = {
xds_servers: [xdsServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
const bootstrapInfo = {
xds_servers: [xdsServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
});
it('Should accept an old-style name', (done) => {
@ -78,35 +81,38 @@ describe('Federation', () => {
describe('New-Style Names on gRPC Client', () => {
let bootstrap: string;
beforeEach((done) => {
const xdsServer = new XdsServer();
const xdsServer = new ControlPlaneServer();
xdsServers.push(xdsServer);
xdsServer.startServer(error => {
xdsServer.startServer(async error => {
if (error) {
done(error);
return;
}
const cluster = new FakeEdsCluster('xdstp://xds.authority.com/envoy.config.cluster.v3.Cluster/cluster1', 'xdstp://xds.authority.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('xdstp://xds.authority.com/envoy.config.cluster.v3.Cluster/cluster1', 'xdstp://xds.authority.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint1', [{backends: [backend], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('xdstp://xds.authority.com/envoy.config.listener.v3.Listener/server.example.com', 'xdstp://xds.authority.com/envoy.config.route.v3.RouteConfiguration/route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
const bootstrapInfo = {
xds_servers: [xdsServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
},
"client_default_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/%s",
"authorities": {
"xds.authority.com": {
}
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
const bootstrapInfo = {
xds_servers: [xdsServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
},
"client_default_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/%s",
"authorities": {
"xds.authority.com": {
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
});
it('Should accept a target with no authority', (done) => {
@ -125,59 +131,65 @@ describe('Federation', () => {
let defaultRouteGroup: FakeRouteGroup;
let otherRouteGroup: FakeRouteGroup;
beforeEach((done) => {
const defaultServer = new XdsServer();
const defaultServer = new ControlPlaneServer();
xdsServers.push(defaultServer);
const otherServer = new XdsServer();
const otherServer = new ControlPlaneServer();
xdsServers.push(otherServer);
defaultServer.startServer(error => {
if (error) {
done(error);
return;
}
otherServer.startServer(error => {
otherServer.startServer(async error => {
if (error) {
done(error);
return;
}
const defaultCluster = new FakeEdsCluster('xdstp://xds.authority.com/envoy.config.cluster.v3.Cluster/cluster1', 'xdstp://xds.authority.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
const [defaultBackend, otherBackend] = await createBackends(2);
const defaultServerRoute = new FakeServerRoute(defaultBackend.getPort(), 'serverRoute');
defaultServer.setRdsResource(defaultServerRoute.getRouteConfiguration());
defaultServer.setLdsResource(defaultServerRoute.getListener());
const otherServerRoute = new FakeServerRoute(otherBackend.getPort(), 'serverRoute');
otherServer.setRdsResource(otherServerRoute.getRouteConfiguration());
otherServer.setLdsResource(otherServerRoute.getListener());
const defaultCluster = new FakeEdsCluster('xdstp://xds.authority.com/envoy.config.cluster.v3.Cluster/cluster1', 'xdstp://xds.authority.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint1', [{backends: [defaultBackend], locality:{region: 'region1'}}]);
defaultRouteGroup = new FakeRouteGroup('xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/server.example.com?project_id=1234', 'xdstp://xds.authority.com/envoy.config.route.v3.RouteConfiguration/route1', [{cluster: defaultCluster}]);
const otherCluster = new FakeEdsCluster('xdstp://xds.other.com/envoy.config.cluster.v3.Cluster/cluster2', 'xdstp://xds.other.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint2', [{backends: [new Backend()], locality:{region: 'region2'}}]);
const otherCluster = new FakeEdsCluster('xdstp://xds.other.com/envoy.config.cluster.v3.Cluster/cluster2', 'xdstp://xds.other.com/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoint2', [{backends: [otherBackend], locality:{region: 'region2'}}]);
otherRouteGroup = new FakeRouteGroup('xdstp://xds.other.com/envoy.config.listener.v3.Listener/server.other.com', 'xdstp://xds.other.com/envoy.config.route.v3.RouteConfiguration/route2', [{cluster: otherCluster}]);
Promise.all([defaultRouteGroup.startAllBackends(), otherRouteGroup.startAllBackends()]).then(() => {
defaultServer.setEdsResource(defaultCluster.getEndpointConfig());
defaultServer.setCdsResource(defaultCluster.getClusterConfig());
defaultServer.setRdsResource(defaultRouteGroup.getRouteConfiguration());
defaultServer.setLdsResource(defaultRouteGroup.getListener());
otherServer.setEdsResource(otherCluster.getEndpointConfig());
otherServer.setCdsResource(otherCluster.getClusterConfig());
otherServer.setRdsResource(otherRouteGroup.getRouteConfiguration());
otherServer.setLdsResource(otherRouteGroup.getListener());
const bootstrapInfo = {
xds_servers: [defaultServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
defaultServer.setEdsResource(defaultCluster.getEndpointConfig());
defaultServer.setCdsResource(defaultCluster.getClusterConfig());
defaultServer.setRdsResource(defaultRouteGroup.getRouteConfiguration());
defaultServer.setLdsResource(defaultRouteGroup.getListener());
otherServer.setEdsResource(otherCluster.getEndpointConfig());
otherServer.setCdsResource(otherCluster.getClusterConfig());
otherServer.setRdsResource(otherRouteGroup.getRouteConfiguration());
otherServer.setLdsResource(otherRouteGroup.getListener());
await Promise.all([defaultRouteGroup.startAllBackends(defaultServer), otherRouteGroup.startAllBackends(otherServer)]);
const bootstrapInfo = {
xds_servers: [defaultServer.getBootstrapServerConfig()],
node: {
id: 'test',
locality: {}
},
// Resource name template for xds: target URIs with no authority.
"client_default_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234",
// Resource name template for xDS-enabled gRPC servers.
"server_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/%s?project_id=1234",
// Authorities map.
"authorities": {
"xds.authority.com": {
"client_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234"
},
// Resource name template for xds: target URIs with no authority.
"client_default_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234",
// Resource name template for xDS-enabled gRPC servers.
"server_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/%s?project_id=1234",
// Authorities map.
"authorities": {
"xds.authority.com": {
"client_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234"
},
"xds.other.com": {
"xds_servers": [otherServer.getBootstrapServerConfig()]
}
"xds.other.com": {
"xds_servers": [otherServer.getBootstrapServerConfig()]
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
}
};
bootstrap = JSON.stringify(bootstrapInfo);
done();
});
});
});

View File

@ -18,18 +18,18 @@
import * as assert from 'assert';
import { register } from "../src";
import { Cluster } from '../src/generated/envoy/config/cluster/v3/Cluster';
import { Backend } from "./backend";
import { Backend, createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
register();
describe('Validation errors', () => {
let xdsServer: XdsServer;
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
@ -39,122 +39,142 @@ describe('Validation errors', () => {
xdsServer?.shutdownServer();
});
it('Should continue to use a valid resource after receiving an invalid EDS update', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid EDS resource
const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]};
xdsServer.setEdsResource(invalidEdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
createBackends(1).then(([backend]) => {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid EDS resource
const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]};
xdsServer.setEdsResource(invalidEdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
});
});
}, reason => done(reason));
}, reason => done(reason));
}, reason => done(reason));
});
it('Should continue to use a valid resource after receiving an invalid CDS update', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid CDS resource
const invalidCdsResource: Cluster = {name: cluster.getClusterConfig().name, type: 'EDS'};
xdsServer.setCdsResource(invalidCdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
createBackends(1).then(([backend]) => {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid CDS resource
const invalidCdsResource: Cluster = {name: cluster.getClusterConfig().name, type: 'EDS'};
xdsServer.setCdsResource(invalidCdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
});
});
}, reason => done(reason));
}, reason => done(reason));
}, reason => done(reason));
});
it('Should continue to use a valid resource after receiving an invalid RDS update', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid RDS resource
const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]};
xdsServer.setRdsResource(invalidRdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
createBackends(1).then(([backend]) => {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid RDS resource
const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]};
xdsServer.setRdsResource(invalidRdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
});
});
}, reason => done(reason));
}, reason => done(reason));
}, reason => done(reason));
});
it('Should continue to use a valid resource after receiving an invalid LDS update', done => {
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid LDS resource
const invalidLdsResource = {name: routeGroup.getListener().name};
xdsServer.setLdsResource(invalidLdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
createBackends(1).then(([backend]) => {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality: {region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends(xdsServer).then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
// After backends receive calls, set invalid LDS resource
const invalidLdsResource = {name: routeGroup.getListener().name};
xdsServer.setLdsResource(invalidLdsResource);
let seenNack = false;
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
if (seenNack) {
return;
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
seenNack = true;
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
client.stopCalls();
done();
});
}
});
});
}, reason => done(reason));
}, reason => done(reason));
}, reason => done(reason));
});

View File

@ -15,10 +15,10 @@
*
*/
import { Backend } from "./backend";
import { Backend, createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
@ -30,10 +30,10 @@ import { EXPERIMENTAL_RING_HASH } from "../src/environment";
register();
describe('Ring hash LB policy', () => {
let xdsServer: XdsServer;
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
@ -42,132 +42,136 @@ describe('Ring hash LB policy', () => {
client?.close();
xdsServer?.shutdownServer();
});
it('Should route requests to the single backend with the old lbPolicy field', function(done) {
it('Should route requests to the single backend with the old lbPolicy field', async function() {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], 'RING_HASH');
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
return client.sendOneCallAsync();
});
it('Should route requests to the single backend with the new load_balancing_policy field', function(done) {
it('Should route requests to the single backend with the new load_balancing_policy field', async function() {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const lbPolicy: AnyExtension & RingHash = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash',
hash_function: 'XX_HASH'
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
return client.sendOneCallAsync();
});
it('Should route all identical requests to the same backend', function(done) {
it('Should route all identical requests to the same backend', async function() {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const backend1 = new Backend();
const backend2 = new Backend()
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend1, backend2] = await createBackends(2);
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute1');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
const serverRoute2 = new FakeServerRoute(backend1.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendNCalls(10, error => {
assert.ifError(error);
assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0));
done();
})
}, reason => done(reason));
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
await client.sendNCallsAsync(10);
assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0));
});
it('Should fallback to a second backend if the first one goes down', function(done) {
it('Should fallback to a second backend if the first one goes down', async function() {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const backends = [new Backend(), new Backend(), new Backend()];
const backends = await createBackends(3);
for (const backend of backends) {
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
}
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: backends, locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
routeGroup.startAllBackends(xdsServer);
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
await client.sendNCallsAsync(100);
let backendWithTraffic: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic = i;
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendNCalls(100, error => {
assert.ifError(error);
let backendWithTraffic: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic = i;
}
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic} and ${i} both got traffic`);
}
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic} and ${i} both got traffic`);
}
}
assert.notStrictEqual(backendWithTraffic, null, 'No backend got traffic');
await backends[backendWithTraffic!].shutdownAsync();
backends[backendWithTraffic!].resetCallCount();
await client.sendNCallsAsync(100);
let backendWithTraffic2: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic2 === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic2 = i;
}
assert.notStrictEqual(backendWithTraffic, null, 'No backend got traffic');
backends[backendWithTraffic!].shutdown(error => {
assert.ifError(error);
backends[backendWithTraffic!].resetCallCount();
client.sendNCalls(100, error => {
assert.ifError(error);
let backendWithTraffic2: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic2 === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic2 = i;
}
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic2} and ${i} both got traffic`);
}
}
assert.notStrictEqual(backendWithTraffic2, null, 'No backend got traffic');
assert.notStrictEqual(backendWithTraffic2, backendWithTraffic, `Traffic went to the same backend ${backendWithTraffic} after shutdown`);
done();
});
});
});
}, reason => done(reason));
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic2} and ${i} both got traffic`);
}
}
assert.notStrictEqual(backendWithTraffic2, null, 'No backend got traffic');
assert.notStrictEqual(backendWithTraffic2, backendWithTraffic, `Traffic went to the same backend ${backendWithTraffic} after shutdown`);
})
});

View File

@ -0,0 +1,81 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { createBackends } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup, FakeServerRoute } from "./framework";
import { ControlPlaneServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
import { connectivityState, status } from "@grpc/grpc-js";
register();
describe('xDS server', () => {
describe('Route handling', () => {
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new ControlPlaneServer();
xdsServer.startServer(error => {
done(error);
});
});
afterEach(() => {
client?.close();
xdsServer?.shutdownServer();
});
it('should reject requests to invalid routes', async () => {
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute', undefined, {
virtual_hosts: [{
domains: ['*'],
routes: [{
match: {
prefix: ''
},
action: 'route',
route: {
cluster: 'any'
}
}]
}]
});
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
await routeGroup.startAllBackends(xdsServer);
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert(error);
assert.strictEqual(error.code, status.UNAVAILABLE);
assert.strictEqual(error.details, 'Routing error');
});
});
});

View File

@ -15,7 +15,7 @@
*
*/
import { ServerDuplexStream, Server, UntypedServiceImplementation, ServerCredentials, loadPackageDefinition } from "@grpc/grpc-js";
import { ServerDuplexStream, Server, UntypedServiceImplementation, ServerCredentials, loadPackageDefinition, experimental, logVerbosity } from "@grpc/grpc-js";
import { AnyExtension, loadSync } from "@grpc/proto-loader";
import { EventEmitter } from "stream";
import { Cluster } from "../src/generated/envoy/config/cluster/v3/Cluster";
@ -32,6 +32,12 @@ import * as lrsTypes from '../src/generated/lrs';
import { LoadStatsRequest__Output } from "../src/generated/envoy/service/load_stats/v3/LoadStatsRequest";
import { LoadStatsResponse } from "../src/generated/envoy/service/load_stats/v3/LoadStatsResponse";
const TRACER_NAME = 'control_plane_server';
function trace(text: string) {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const loadedProtos = loadPackageDefinition(loadSync(
[
'envoy/service/discovery/v3/ads.proto',
@ -110,7 +116,7 @@ function isAdsTypeUrl(value: string): value is AdsTypeUrl {
return ADS_TYPE_URLS.has(value);
}
export class XdsServer {
export class ControlPlaneServer {
private resourceMap: ResourceMap = {
[EDS_TYPE_URL]: {
resourceTypeVersion: 0,
@ -134,6 +140,7 @@ export class XdsServer {
private clients = new Map<string, ServerDuplexStream<DiscoveryRequest__Output, DiscoveryResponse>>();
private server: Server | null = null;
private port: number | null = null;
private nextStreamId: number = 0;
addResponseListener(listener: ResponseListener) {
this.responseListeners.add(listener);
@ -144,6 +151,7 @@ export class XdsServer {
}
setResource<T extends AdsTypeUrl>(resource: ResourceAny<T>, name: string) {
trace(`Set resource type_url=${resource['@type']} name=${name}`);
const resourceTypeState = this.resourceMap[resource["@type"]] as ResourceTypeState<T>;
resourceTypeState.resourceTypeVersion += 1;
let resourceState: ResourceState<T> | undefined = resourceTypeState.resourceNameMap.get(name);
@ -160,18 +168,22 @@ export class XdsServer {
}
setLdsResource(resource: Listener) {
trace(`setLdsResource(${resource.name!})`);
this.setResource({...resource, '@type': LDS_TYPE_URL}, resource.name!);
}
setRdsResource(resource: RouteConfiguration) {
trace(`setRdsResource(${resource.name!})`);
this.setResource({...resource, '@type': RDS_TYPE_URL}, resource.name!);
}
setCdsResource(resource: Cluster) {
trace(`setCdsResource(${resource.name!})`);
this.setResource({...resource, '@type': CDS_TYPE_URL}, resource.name!);
}
setEdsResource(resource: ClusterLoadAssignment) {
trace(`setEdsResource(${resource.cluster_name!})`);
this.setResource({...resource, '@type': EDS_TYPE_URL}, resource.cluster_name!);
}
@ -271,6 +283,7 @@ export class XdsServer {
const requestedResourceNames = new Set(request.resource_names);
const resourceTypeState = this.resourceMap[request.type_url];
const updatedResources = new Set<string>();
trace(`Received request type_url=${request.type_url} names=[${Array.from(requestedResourceNames)}]`);
for (const resourceName of requestedResourceNames) {
if (this.maybeSubscribe(request.type_url, clientName, resourceName) || resourceTypeState.resourceNameMap.get(resourceName)!.resourceTypeVersion > clientResourceVersion) {
updatedResources.add(resourceName);
@ -282,8 +295,14 @@ export class XdsServer {
}
}
private getStreamId(): number {
const id = this.nextStreamId;
this.nextStreamId += 1;
return id;
}
StreamAggregatedResources(call: ServerDuplexStream<DiscoveryRequest__Output, DiscoveryResponse>) {
const clientName = call.getPeer();
const clientName = `${call.getPeer()}(${this.getStreamId()})`;
this.clients.set(clientName, call);
call.on('data', (request: DiscoveryRequest__Output) => {
this.handleRequest(clientName, request);
@ -319,7 +338,6 @@ export class XdsServer {
if (!error) {
this.server = server;
this.port = port;
server.start();
}
callback(error, port);
});
@ -348,7 +366,8 @@ export class XdsServer {
node: {
id: 'test',
locality: {}
}
},
server_listener_resource_name_template: '%s'
}
return JSON.stringify(bootstrapInfo);
}

View File

@ -6,7 +6,7 @@ export {
ConfigSelector,
createResolver,
} from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { GrpcUri, uriToString, splitHostPort, HostPort } from './uri-parser';
export { Duration, durationToMs } from './duration';
export { BackoffTimeout } from './backoff-timeout';
export {
@ -52,3 +52,5 @@ export {
SuccessRateEjectionConfig,
FailurePercentageEjectionConfig,
} from './load-balancer-outlier-detection';
export { createServerCredentialsWithInterceptors } from './server-credentials';

View File

@ -48,6 +48,7 @@ import {
} from './make-client';
import { Metadata, MetadataOptions, MetadataValue } from './metadata';
import {
ConnectionInjector,
Server,
ServerOptions,
UntypedHandleCall,
@ -227,7 +228,7 @@ export const setLogVerbosity = (verbosity: LogVerbosity): void => {
logging.setLoggerVerbosity(verbosity);
};
export { Server, ServerOptions };
export { ConnectionInjector, Server, ServerOptions };
export { ServerCredentials };
export { KeyCertPair };

View File

@ -18,6 +18,7 @@
import { SecureServerOptions } from 'http2';
import { CIPHER_SUITES, getDefaultRootsData } from './tls-helpers';
import { SecureContextOptions } from 'tls';
import { ServerInterceptor } from '.';
export interface KeyCertPair {
private_key: Buffer;
@ -51,6 +52,9 @@ export abstract class ServerCredentials {
_getSettings(): SecureServerOptions | null {
return this.latestContextOptions;
}
_getInterceptors(): ServerInterceptor[] {
return [];
}
abstract _equals(other: ServerCredentials): boolean;
static createInsecure(): ServerCredentials {
@ -214,3 +218,42 @@ class SecureServerCredentials extends ServerCredentials {
return true;
}
}
class InterceptorServerCredentials extends ServerCredentials {
constructor(private readonly childCredentials: ServerCredentials, private readonly interceptors: ServerInterceptor[]) {
super();
}
_isSecure(): boolean {
return this.childCredentials._isSecure();
}
_equals(other: ServerCredentials): boolean {
if (!(other instanceof InterceptorServerCredentials)) {
return false;
}
if (!(this.childCredentials._equals(other.childCredentials))) {
return false;
}
if (this.interceptors.length !== other.interceptors.length) {
return false;
}
for (let i = 0; i < this.interceptors.length; i++) {
if (this.interceptors[i] !== other.interceptors[i]) {
return false;
}
}
return true;
}
override _getInterceptors(): ServerInterceptor[] {
return this.interceptors;
}
override _addWatcher(watcher: SecureContextWatcher): void {
this.childCredentials._addWatcher(watcher);
}
override _removeWatcher(watcher: SecureContextWatcher): void {
this.childCredentials._removeWatcher(watcher);
}
}
export function createServerCredentialsWithInterceptors(credentials: ServerCredentials, interceptors: ServerInterceptor[]): ServerCredentials {
return new InterceptorServerCredentials(credentials, interceptors);
}

View File

@ -317,6 +317,10 @@ export interface ServerInterceptingCallInterface {
* Return the call deadline set by the client. The value is Infinity if there is no deadline.
*/
getDeadline(): Deadline;
/**
* Return the host requested by the client in the ":authority" header.
*/
getHost(): string;
}
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
@ -391,6 +395,9 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
getDeadline(): Deadline {
return this.nextCall.getDeadline();
}
getHost(): string {
return this.nextCall.getHost();
}
}
export interface ServerInterceptor {
@ -455,6 +462,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
private isReadPending = false;
private receivedHalfClose = false;
private streamEnded = false;
private host: string;
constructor(
private readonly stream: http2.ServerHttp2Stream,
@ -508,6 +516,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
this.host = headers[':authority'] ?? headers.host!;
const metadata = Metadata.fromHttp2Headers(headers);
if (logging.isTracerEnabled(TRACER_NAME)) {
@ -861,6 +870,9 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
getDeadline(): Deadline {
return this.deadline;
}
getHost(): string {
return this.host;
}
}
export function getServerInterceptingCall(

View File

@ -546,7 +546,7 @@ export class Server {
}
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
this._setupHandlers(http2Server, credentials._getInterceptors());
return http2Server;
}
@ -1189,6 +1189,7 @@ export class Server {
}
private _channelzHandler(
extraInterceptors: ServerInterceptor[],
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
) {
@ -1248,7 +1249,7 @@ export class Server {
}
}
const call = getServerInterceptingCall(this.interceptors, stream, headers, callEventTracker, handler, this.options);
const call = getServerInterceptingCall([...extraInterceptors, ...this.interceptors], stream, headers, callEventTracker, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
@ -1262,6 +1263,7 @@ export class Server {
}
private _streamHandler(
extraInterceptors: ServerInterceptor[],
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
) {
@ -1281,7 +1283,7 @@ export class Server {
return;
}
const call = getServerInterceptingCall(this.interceptors, stream, headers, null, handler, this.options);
const call = getServerInterceptingCall([...extraInterceptors, ...this.interceptors], stream, headers, null, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
@ -1322,7 +1324,8 @@ export class Server {
}
private _setupHandlers(
http2Server: http2.Http2Server | http2.Http2SecureServer
http2Server: http2.Http2Server | http2.Http2SecureServer,
extraInterceptors: ServerInterceptor[]
): void {
if (http2Server === null) {
return;
@ -1343,7 +1346,7 @@ export class Server {
? this._channelzHandler
: this._streamHandler;
http2Server.on('stream', handler.bind(this));
http2Server.on('stream', handler.bind(this, extraInterceptors));
http2Server.on('session', session => {
const channelzRef = registerChannelzSocket(