Add routing and traffic splitting functionality

This commit is contained in:
Michael Lumish 2021-02-24 14:25:41 -08:00
parent c953a0e212
commit 131b604f2c
9 changed files with 723 additions and 602 deletions

View File

@ -42,7 +42,8 @@
"yargs": "^15.4.1"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14"
"@grpc/proto-loader": "^0.6.0-pre14",
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.2"

View File

@ -0,0 +1,18 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export const GRPC_XDS_EXPERIMENTAL_ROUTING = (process.env.GRPC_XDS_EXPERIMENTAL_ROUTING === 'true');

View File

@ -16,7 +16,7 @@
*/
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
@ -66,7 +66,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {
export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private watcher: Watcher<Cluster__Output>;
private isWatcherActive = false;
@ -127,7 +126,6 @@ export class CdsLoadBalancer implements LoadBalancer {
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.xdsClient = attributes.xdsClient;
this.latestAttributes = attributes;
/* If the cluster is changing, disable the old watcher before adding the new
@ -137,7 +135,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.latestConfig?.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
@ -153,7 +151,7 @@ export class CdsLoadBalancer implements LoadBalancer {
if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
}
@ -167,7 +165,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);

View File

@ -16,7 +16,7 @@
*/
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
@ -33,6 +33,7 @@ import PickResultType = experimental.PickResultType;
import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimental';
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target';
import { LrsLoadBalancingConfig } from './load-balancer-lrs';
import { Watcher } from './xds-stream-state/xds-stream-state';
const TRACER_NAME = 'eds_balancer';
@ -122,11 +123,10 @@ export class EdsLoadBalancer implements LoadBalancer {
* requests.
*/
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private edsServiceName: string | null = null;
private watcher: Watcher<ClusterLoadAssignment__Output>;
/**
* Indicates whether the watcher has already been passed to this.xdsClient
* Indicates whether the watcher has already been passed to the xdsClient
* and is getting updates.
*/
private isWatcherActive = false;
@ -434,14 +434,13 @@ export class EdsLoadBalancer implements LoadBalancer {
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient;
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();
/* If the name is changing, disable the old watcher before adding the new
* one */
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
this.isWatcherActive = false;
@ -454,12 +453,12 @@ export class EdsLoadBalancer implements LoadBalancer {
if (!this.isWatcherActive) {
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = true;
}
if (lbConfig.getLrsLoadReportingServerName()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
@ -480,7 +479,7 @@ export class EdsLoadBalancer implements LoadBalancer {
destroy(): void {
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
if (this.edsServiceName) {
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
}
this.childBalancer.destroy();
}

View File

@ -14,7 +14,11 @@
* limitations under the License.
*/
import { XdsClient } from './xds-client';
import * as protoLoader from '@grpc/proto-loader';
import { RE2 } from 're2-wasm';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions } from '@grpc/grpc-js';
import Resolver = experimental.Resolver;
import GrpcUri = experimental.GrpcUri;
@ -22,6 +26,17 @@ import ResolverListener = experimental.ResolverListener;
import uriToString = experimental.uriToString;
import ServiceConfig = experimental.ServiceConfig;
import registerResolver = experimental.registerResolver;
import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { Watcher } from './xds-stream-state/xds-stream-state';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { GRPC_XDS_EXPERIMENTAL_ROUTING } from './environment';
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { VirtualHost__Output } from './generated/envoy/api/v2/route/VirtualHost';
import { RouteMatch__Output } from './generated/envoy/api/v2/route/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/api/v2/route/HeaderMatcher';
import ConfigSelector = experimental.ConfigSelector;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
const TRACER_NAME = 'xds_resolver';
@ -29,15 +44,404 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
// Better match type has smaller value.
enum MatchType {
EXACT_MATCH,
SUFFIX_MATCH,
PREFIX_MATCH,
UNIVERSE_MATCH,
INVALID_MATCH,
};
function domainPatternMatchType(domainPattern: string): MatchType {
if (domainPattern.length === 0) {
return MatchType.INVALID_MATCH;
}
if (domainPattern.indexOf('*') < 0) {
return MatchType.EXACT_MATCH;
}
if (domainPattern === '*') {
return MatchType.UNIVERSE_MATCH;
}
if (domainPattern.startsWith('*')) {
return MatchType.SUFFIX_MATCH;
}
if (domainPattern.endsWith('*')) {
return MatchType.PREFIX_MATCH;
}
return MatchType.INVALID_MATCH;
}
function domainMatch(matchType: MatchType, domainPattern: string, expectedHostName: string) {
switch (matchType) {
case MatchType.EXACT_MATCH:
return expectedHostName === domainPattern;
case MatchType.SUFFIX_MATCH:
return expectedHostName.endsWith(domainPattern.substring(1));
case MatchType.PREFIX_MATCH:
return expectedHostName.startsWith(domainPattern.substring(0, domainPattern.length - 1));
case MatchType.UNIVERSE_MATCH:
return true;
case MatchType.INVALID_MATCH:
return false;
}
}
function findVirtualHostForDomain(virutalHostList: VirtualHost__Output[], domain: string): VirtualHost__Output | null {
let targetVhost: VirtualHost__Output | null = null;
let bestMatchType: MatchType = MatchType.INVALID_MATCH;
let longestMatch = 0;
for (const virtualHost of virutalHostList) {
for (const domainPattern of virtualHost.domains) {
const matchType = domainPatternMatchType(domainPattern);
// If we already have a match of a better type, skip this one
if (matchType > bestMatchType) {
continue;
}
// If we already have a longer match of the same type, skip this one
if (matchType === bestMatchType && domainPattern.length <= longestMatch) {
continue;
}
if (domainMatch(matchType, domainPattern, domain)) {
targetVhost = virtualHost;
bestMatchType = matchType;
longestMatch = domainPattern.length;
}
if (bestMatchType === MatchType.EXACT_MATCH) {
break;
}
}
if (bestMatchType === MatchType.EXACT_MATCH) {
break;
}
}
return targetVhost;
}
interface Matcher {
(methodName: string, metadata: Metadata): boolean;
}
const numberRegex = new RE2(/^-?\d+$/u);
function getPredicateForHeaderMatcher(headerMatch: HeaderMatcher__Output): Matcher {
let valueChecker: (value: string) => boolean;
switch (headerMatch.header_match_specifier) {
case 'exact_match':
valueChecker = value => value === headerMatch.exact_match;
break;
case 'safe_regex_match':
const regex = new RE2(`^${headerMatch.safe_regex_match}$`, 'u');
valueChecker = value => regex.test(value);
break;
case 'range_match':
const start = BigInt(headerMatch.range_match!.start);
const end = BigInt(headerMatch.range_match!.end);
valueChecker = value => {
if (!numberRegex.test(value)) {
return false;
}
const numberValue = BigInt(value);
return start <= numberValue && numberValue < end;
}
break;
case 'present_match':
valueChecker = value => true;
break;
case 'prefix_match':
valueChecker = value => value.startsWith(headerMatch.prefix_match!);
break;
case 'suffix_match':
valueChecker = value => value.endsWith(headerMatch.suffix_match!);
break;
default:
// Should be prevented by validation rules
return (methodName, metadata) => false;
}
const headerMatcher: Matcher = (methodName, metadata) => {
if (headerMatch.name.endsWith('-bin')) {
return false;
}
let value: string;
if (headerMatch.name === 'content-type') {
value = 'application/grpc';
} else {
const valueArray = metadata.get(headerMatch.name);
if (valueArray.length === 0) {
return false;
} else {
value = valueArray.join(',');
}
}
return valueChecker(value);
}
if (headerMatch.invert_match) {
return (methodName, metadata) => !headerMatcher(methodName, metadata);
} else {
return headerMatcher;
}
}
const RUNTIME_FRACTION_DENOMINATOR_VALUES = {
HUNDRED: 100,
TEN_THOUSAND: 10_000,
MILLION: 1_000_000
}
function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
let pathMatcher: Matcher;
switch (routeMatch.path_specifier) {
case 'prefix':
if (routeMatch.case_sensitive?.value === false) {
const prefix = routeMatch.prefix!.toLowerCase();
pathMatcher = (methodName, metadata) => (methodName.toLowerCase().startsWith(prefix));
} else {
const prefix = routeMatch.prefix!;
pathMatcher = (methodName, metadata) => (methodName.startsWith(prefix));
}
break;
case 'path':
if (routeMatch.case_sensitive?.value === false) {
const path = routeMatch.path!.toLowerCase();
pathMatcher = (methodName, metadata) => (methodName.toLowerCase() === path);
} else {
const path = routeMatch.path!;
pathMatcher = (methodName, metadata) => (methodName === path);
}
break;
case 'safe_regex':
const flags = routeMatch.case_sensitive?.value === false ? 'ui' : 'u';
const regex = new RE2(`^${routeMatch.safe_regex!.regex!}$`, flags);
pathMatcher = (methodName, metadata) => (regex.test(methodName));
break;
default:
// Should be prevented by validation rules
return (methodName, metadata) => false;
}
const headerMatchers: Matcher[] = routeMatch.headers.map(getPredicateForHeaderMatcher);
let runtimeFractionHandler: () => boolean;
if (!routeMatch.runtime_fraction?.default_value) {
runtimeFractionHandler = () => true;
} else {
const numerator = routeMatch.runtime_fraction.default_value.numerator;
const denominator = RUNTIME_FRACTION_DENOMINATOR_VALUES[routeMatch.runtime_fraction.default_value.denominator];
runtimeFractionHandler = () => {
const randomNumber = Math.random() * denominator;
return randomNumber < numerator;
}
}
return (methodName, metadata) => pathMatcher(methodName, metadata) && headerMatchers.every(matcher => matcher(methodName, metadata)) && runtimeFractionHandler();
}
class XdsResolver implements Resolver {
private hasReportedSuccess = false;
private xdsClient: XdsClient | null = null;
private ldsWatcher: Watcher<Listener__Output>;
private rdsWatcher: Watcher<RouteConfiguration__Output>
private isLdsWatcherActive = false;
/**
* The latest route config name from an LDS response. The RDS watcher is
* actively watching that name if and only if this is not null.
*/
private latestRouteConfigName: string | null = null;
private latestRouteConfig: RouteConfiguration__Output | null = null;
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
constructor(
private target: GrpcUri,
private listener: ResolverListener,
private channelOptions: ChannelOptions
) {}
) {
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
const httpConnectionManager = update.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds': {
const routeConfigName = httpConnectionManager.rds!.route_config_name;
if (this.latestRouteConfigName !== routeConfigName) {
if (this.latestRouteConfigName !== null) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
this.latestRouteConfigName = routeConfigName;
}
break;
}
case 'route_config':
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
this.handleRouteConfig(httpConnectionManager.route_config!);
break;
default:
// This is prevented by the validation rules
}
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': LDS resource does not exist');
this.reportResolutionError(`Listener ${this.target} does not exist`);
}
};
this.rdsWatcher = {
onValidUpdate: (update: RouteConfiguration__Output) => {
this.handleRouteConfig(update);
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ' and route config ' + this.latestRouteConfigName + ': RDS resource does not exist');
this.reportResolutionError(`Route config ${this.latestRouteConfigName} does not exist`);
}
}
}
private refCluster(clusterName: string) {
const refCount = this.clusterRefcounts.get(clusterName);
if (refCount) {
refCount.refCount += 1;
}
}
private unrefCluster(clusterName: string) {
const refCount = this.clusterRefcounts.get(clusterName);
if (refCount) {
refCount.refCount -= 1;
if (!refCount.inLastConfig && refCount.refCount === 0) {
this.clusterRefcounts.delete(clusterName);
this.handleRouteConfig(this.latestRouteConfig!);
}
}
}
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
this.latestRouteConfig = routeConfig;
if (GRPC_XDS_EXPERIMENTAL_ROUTING) {
const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path);
if (virtualHost === null) {
this.reportResolutionError('No matching route found');
return;
}
const allConfigClusters = new Set<string>();
const matchList: {matcher: Matcher, action: () => string}[] = [];
for (const route of virtualHost.routes) {
let routeAction: () => string;
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
case 'cluster':{
const cluster = route.route!.cluster!;
allConfigClusters.add(cluster);
routeAction = () => cluster;
break;
}
case 'weighted_clusters': {
let lastNumerator = 0;
// clusterChoices is essentially the weighted choices represented as a CDF
const clusterChoices: {cluster: string, numerator: number}[] = [];
for (const clusterWeight of route.route!.weighted_clusters!.clusters) {
allConfigClusters.add(clusterWeight.name);
lastNumerator = lastNumerator + (clusterWeight.weight?.value ?? 0);
clusterChoices.push({cluster: clusterWeight.name, numerator: lastNumerator});
}
routeAction = () => {
const randomNumber = Math.random() * (route.route!.weighted_clusters!.total_weight?.value ?? 100);
for (const choice of clusterChoices) {
if (randomNumber < choice.numerator) {
return choice.cluster;
}
}
// This should be prevented by the validation rules
return '';
}
}
}
const routeMatcher = getPredicateForMatcher(route.match!);
matchList.push({matcher: routeMatcher, action: routeAction});
}
// Mark clusters that are not in this config, and remove ones with no references
for (const [name, refCount] of Array.from(this.clusterRefcounts.entries())) {
if (!allConfigClusters.has(name)) {
refCount.inLastConfig = false;
if (refCount.refCount === 0) {
this.clusterRefcounts.delete(name);
}
}
}
for (const name of allConfigClusters) {
if (this.clusterRefcounts.has(name)) {
this.clusterRefcounts.get(name)!.inLastConfig = true;
} else {
this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0});
}
}
const configSelector: ConfigSelector = (methodName, metadata) => {
for (const {matcher, action} of matchList) {
if (matcher(methodName, metadata)) {
const clusterName = action();
this.refCluster(clusterName);
const onCommitted = () => {
this.unrefCluster(clusterName);
}
return {
methodConfig: {name: []},
onCommitted: onCommitted,
pickInformation: {cluster: clusterName},
status: status.OK
};
}
}
return {
methodConfig: {name: []},
// pickInformation won't be used here, but it's set because of some TypeScript weirdness
pickInformation: {cluster: ''},
status: status.UNAVAILABLE
};
};
const clusterConfigMap = new Map<string, {child_policy: LoadBalancingConfig[]}>();
for (const clusterName of this.clusterRefcounts.keys()) {
clusterConfigMap.set(clusterName, {child_policy: [new CdsLoadBalancingConfig(clusterName)]});
}
// TODO: Create xdsClusterManagerLoadBalancingConfig and report successful resolution
} else {
for (const virtualHost of routeConfig.virtual_hosts) {
if (virtualHost.domains.indexOf(this.target.path) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + uriToString(this.target) + ' with cluster ' + route.route.cluster);
this.listener.onSuccessfulResolution([], {
methodConfig: [],
loadBalancingConfig: [
new CdsLoadBalancingConfig(route.route.cluster)
],
}, null, null, {});
this.hasReportedSuccess = true;
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
}
}
}
this.reportResolutionError('No matching route found');
}
}
private reportResolutionError(reason: string) {
this.listener.onError({
@ -51,38 +455,18 @@ class XdsResolver implements Resolver {
updateResolution(): void {
// Wait until updateResolution is called once to start the xDS requests
if (this.xdsClient === null) {
if (!this.isLdsWatcherActive) {
trace('Starting resolution for target ' + uriToString(this.target));
this.xdsClient = new XdsClient(
this.target.path,
{
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, null, {
xdsClient: this.xdsClient,
});
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist');
this.reportResolutionError("Resource does not exist");
},
},
this.channelOptions
);
getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher);
this.isLdsWatcherActive = true;
}
}
destroy() {
this.xdsClient?.shutdown();
getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher);
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
}
static getDefaultAuthority(target: GrpcUri) {

View File

@ -50,6 +50,11 @@ import BackoffTimeout = experimental.BackoffTimeout;
import ServiceConfig = experimental.ServiceConfig;
import createGoogleDefaultCredentials = experimental.createGoogleDefaultCredentials;
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { EdsState } from './xds-stream-state/eds-state';
import { CdsState } from './xds-stream-state/cds-state';
import { RdsState } from './xds-stream-state/rds-state';
import { LdsState } from './xds-stream-state/lds-state';
import { Watcher } from './xds-stream-state/xds-stream-state';
const TRACER_NAME = 'xds_client';
@ -131,12 +136,6 @@ function localityEqual(
);
}
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
@ -219,450 +218,6 @@ class ClusterLoadReportMap {
}
}
interface XdsStreamState<ResponseType> {
versionInfo: string;
nonce: string;
getResourceNames(): string[];
/**
* Returns a string containing the error details if the message should be nacked,
* or null if it should be acked.
* @param responses
*/
handleResponses(responses: ResponseType[]): string | null;
reportStreamError(status: StatusObject): void;
}
class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';
private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
constructor(private updateResourceNames: () => void) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();
private latestResponses: Cluster__Output[] = [];
constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
if (!message.eds_cluster_config?.eds_config?.ads) {
return false;
}
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
}
}
return true;
}
/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
edsServiceName === '' ? message.name : edsServiceName
);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private routeConfigName: string | null = null;
constructor(
private targetName: string,
private watcher: Watcher<ServiceConfig>,
private updateResouceNames: () => void
) {}
getResourceNames(): string[] {
return this.routeConfigName ? [this.routeConfigName] : [];
}
handleSingleMessage(message: RouteConfiguration__Output) {
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.targetName) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster);
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
new CdsLoadBalancingConfig(route.route.cluster)
],
});
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
}
}
}
trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains));
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
trace('Received RDS response with route config names ' + responses.map(message => message.name));
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
this.handleSingleMessage(message);
return null;
}
}
}
return null;
}
setRouteConfigName(name: string | null) {
const oldName = this.routeConfigName;
this.routeConfigName = name;
if (name !== oldName) {
this.updateResouceNames();
}
}
reportStreamError(status: StatusObject): void {
this.watcher.onTransientError(status);
}
}
class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
constructor(private targetName: string, private rdsState: RdsState) {}
getResourceNames(): string[] {
return [this.targetName];
}
private validateResponse(message: Listener__Output): boolean {
if (
!(
message.api_listener?.api_listener &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
message.api_listener?.api_listener['@type'] ===
HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return false;
}
const httpConnectionManager = message.api_listener
?.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return true;
}
return false;
}
handleResponses(responses: Listener__Output[]): string | null {
trace('Received LDS update with names ' + responses.map(message => message.name));
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
// The validation step ensures that this is correct
const httpConnectionManager = message.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name);
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
trace('Received LDS update with route configuration');
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
);
break;
default:
// The validation rules should prevent this
}
} else {
trace('LRS validation error for message ' + JSON.stringify(message));
return 'LRS Error: Listener validation failed';
}
}
}
return null;
}
reportStreamError(status: StatusObject): void {
// Nothing to do here
}
}
interface AdsState {
[EDS_TYPE_URL]: EdsState;
[CDS_TYPE_URL]: CdsState;
@ -728,21 +283,19 @@ export class XdsClient {
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;
constructor(
targetName: string,
serviceConfigWatcher: Watcher<ServiceConfig>,
channelOptions: ChannelOptions
) {
constructor() {
const edsState = new EdsState(() => {
this.updateNames(EDS_TYPE_URL);
});
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
});
const rdsState = new RdsState(targetName, serviceConfigWatcher, () => {
const rdsState = new RdsState(() => {
this.updateNames(RDS_TYPE_URL);
});
const ldsState = new LdsState(targetName, rdsState);
const ldsState = new LdsState(rdsState, () => {
this.updateNames(LDS_TYPE_URL);
});
this.adsState = {
[EDS_TYPE_URL]: edsState,
[CDS_TYPE_URL]: cdsState,
@ -750,26 +303,10 @@ export class XdsClient {
[LDS_TYPE_URL]: ldsState,
};
const channelArgs = { ...channelOptions };
const channelArgsToRemove = [
/* The SSL target name override corresponds to the target, and this
* client has its own target */
'grpc.ssl_target_name_override',
/* The default authority also corresponds to the target */
'grpc.default_authority',
/* This client will have its own specific keepalive time setting */
'grpc.keepalive_time_ms',
/* The service config specifies the load balancing policy. This channel
* needs its own separate load balancing policy setting. In particular,
* recursively using an xDS load balancer for the xDS client would be
* bad */
'grpc.service_config',
];
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
const channelArgs = {
// 5 minutes
'grpc.keepalive_time_ms': 5 * 60 * 1000
}
// 5 minutes
channelArgs['grpc.keepalive_time_ms'] = 5 * 60 * 1000;
this.adsBackoff = new BackoffTimeout(() => {
this.maybeStartAdsStream();
@ -823,14 +360,12 @@ export class XdsClient {
channelCreds,
channelArgs
);
this.maybeStartAdsStream();
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
bootstrapInfo.xdsServers[0].serverUri,
channelCreds,
{channelOverride: this.adsClient.getChannel()}
);
this.maybeStartLrsStream();
},
(error) => {
trace('Failed to initialize xDS Client. ' + error.message);
@ -986,6 +521,16 @@ export class XdsClient {
}
private updateNames(typeUrl: AdsTypeUrl) {
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
this.adsCall?.end();
this.lrsCall?.end();
return;
}
this.maybeStartAdsStream();
this.maybeStartLrsStream();
trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames());
this.adsCall?.write({
node: this.adsNode!,
@ -1159,6 +704,26 @@ export class XdsClient {
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}
addRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher added for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].addWatcher(routeConfigName, watcher);
}
removeRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher removed for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].removeWatcher(routeConfigName, watcher);
}
addListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher added for listener ' + targetName);
this.adsState[LDS_TYPE_URL].addWatcher(targetName, watcher);
}
removeListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher removed for listener ' + targetName);
this.adsState[LDS_TYPE_URL].removeWatcher(targetName, watcher);
}
/**
*
* @param lrsServer The target name of the server to send stats to. An empty
@ -1241,7 +806,7 @@ export class XdsClient {
};
}
shutdown(): void {
private shutdown(): void {
this.adsCall?.cancel();
this.adsClient?.close();
this.lrsCall?.cancel();
@ -1249,3 +814,12 @@ export class XdsClient {
this.hasShutdown = true;
}
}
let singletonXdsClient: XdsClient | null = null;
export function getSingletonXdsClient(): XdsClient {
if (singletonXdsClient === null) {
singletonXdsClient = new XdsClient();
}
return singletonXdsClient;
}

View File

@ -19,7 +19,7 @@ import * as protoLoader from '@grpc/proto-loader';
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Listener__Output } from "../generated/envoy/api/v2/Listener";
import { RdsState } from "./rds-state";
import { XdsStreamState } from "./xds-stream-state";
import { Watcher, XdsStreamState } from "./xds-stream-state";
import { HttpConnectionManager__Output } from '../generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
const TRACER_NAME = 'xds_client';
@ -35,10 +35,60 @@ export class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
constructor(private targetName: string, private rdsState: RdsState) {}
private watchers: Map<string, Watcher<Listener__Output>[]> = new Map<string, Watcher<Listener__Output>[]>();
private latestResponses: Listener__Output[] = [];
constructor(private rdsState: RdsState, private updateResourceNames: () => void) {}
addWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Adding RDS watcher for targetName ' + targetName);
let watchersEntry = this.watchers.get(targetName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(targetName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === targetName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for targetName ' + targetName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(targetName: string, watcher: Watcher<Listener__Output>): void {
trace('Removing RDS watcher for targetName ' + targetName);
const watchersEntry = this.watchers.get(targetName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(targetName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return [this.targetName];
return Array.from(this.watchers.keys());
}
private validateResponse(message: Listener__Output): boolean {
@ -59,47 +109,47 @@ export class LdsState implements XdsStreamState<Listener__Output> {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return true;
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
}
return false;
}
handleResponses(responses: Listener__Output[]): string | null {
trace('Received LDS update with names ' + responses.map(message => message.name));
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
// The validation step ensures that this is correct
const httpConnectionManager = message.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name);
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
trace('Received LDS update with route configuration');
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
);
break;
default:
// The validation rules should prevent this
}
} else {
trace('LRS validation error for message ' + JSON.stringify(message));
return 'LRS Error: Listener validation failed';
private handleMissingNames(allTargetNames: Set<string>) {
for (const [targetName, watcherList] of this.watchers.entries()) {
if (!allTargetNames.has(targetName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Listener__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('LDS validation failed for message ' + JSON.stringify(message));
return 'LDS Error: Route validation failed';
}
}
this.latestResponses = responses;
const allTargetNames = new Set<string>();
for (const message of responses) {
allTargetNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received RDS response with route config names ' + Array.from(allTargetNames));
this.handleMissingNames(allTargetNames);
return null;
}
reportStreamError(status: StatusObject): void {
// Nothing to do here
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -16,6 +16,7 @@
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { GRPC_XDS_EXPERIMENTAL_ROUTING } from "../environment";
import { RouteConfiguration__Output } from "../generated/envoy/api/v2/RouteConfiguration";
import { CdsLoadBalancingConfig } from "../load-balancer-cds";
import { Watcher, XdsStreamState } from "./xds-stream-state";
@ -27,68 +28,164 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex'];
const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'exact_match',
'safe_regex_match',
'range_match',
'present_match',
'prefix_match',
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private routeConfigName: string | null = null;
private watchers: Map<string, Watcher<RouteConfiguration__Output>[]> = new Map<string, Watcher<RouteConfiguration__Output>[]>();
private latestResponses: RouteConfiguration__Output[] = [];
constructor(
private targetName: string,
private watcher: Watcher<ServiceConfig>,
private updateResouceNames: () => void
) {}
constructor(private updateResourceNames: () => void) {}
getResourceNames(): string[] {
return this.routeConfigName ? [this.routeConfigName] : [];
addWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Adding RDS watcher for routeConfigName ' + routeConfigName);
let watchersEntry = this.watchers.get(routeConfigName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(routeConfigName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === routeConfigName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
handleSingleMessage(message: RouteConfiguration__Output) {
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.targetName) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster);
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
new CdsLoadBalancingConfig(route.route.cluster)
],
});
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
removeWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>): void {
trace('Removing RDS watcher for routeConfigName ' + routeConfigName);
const watchersEntry = this.watchers.get(routeConfigName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(routeConfigName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
validateResponse(message: RouteConfiguration__Output): boolean {
if (GRPC_XDS_EXPERIMENTAL_ROUTING) {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
for (const domainPattern of virtualHost.domains) {
const starIndex = domainPattern.indexOf('*');
const lastStarIndex = domainPattern.lastIndexOf('*');
// A domain pattern can have at most one wildcard *
if (starIndex !== lastStarIndex) {
return false;
}
// A wildcard * can either be absent or at the beginning or end of the pattern
if (!(starIndex === -1 || starIndex === 0 || starIndex === domainPattern.length - 1)) {
return false;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
return false;
}
if (SUPPORTED_PATH_SPECIFIERS.indexOf(match.path_specifier) < 0) {
return false;
}
for (const headers of match.headers) {
if (SUPPPORTED_HEADER_MATCH_SPECIFIERS.indexOf(headers.header_match_specifier) < 0) {
return false;
}
}
if (route.action !== 'route') {
return false;
}
if ((route.route === undefined) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return false;
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
return false;
}
}
}
}
return true;
} else {
return true;
}
}
private handleMissingNames(allRouteConfigNames: Set<string>) {
for (const [routeConfigName, watcherList] of this.watchers.entries()) {
if (!allRouteConfigNames.has(routeConfigName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains));
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
trace('Received RDS response with route config names ' + responses.map(message => message.name));
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
this.handleSingleMessage(message);
return null;
}
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('RDS validation failed for message ' + JSON.stringify(message));
return 'RDS Error: Route validation failed';
}
}
this.latestResponses = responses;
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames));
this.handleMissingNames(allRouteConfigNames);
return null;
}
setRouteConfigName(name: string | null) {
const oldName = this.routeConfigName;
this.routeConfigName = name;
if (name !== oldName) {
this.updateResouceNames();
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
reportStreamError(status: StatusObject): void {
this.watcher.onTransientError(status);
}
}

View File

@ -1,5 +1,5 @@
export { trace } from './logging';
export { Resolver, ResolverListener, registerResolver } from './resolver';
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig } from './service-config';
export { createGoogleDefaultCredentials } from './channel-credentials';