diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index f5725d78..4375c77a 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -33,7 +33,10 @@ import { Node } from './generated/envoy/api/v2/core/Node'; import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v2/AggregatedDiscoveryService'; import { DiscoveryRequest } from './generated/envoy/api/v2/DiscoveryRequest'; import { DiscoveryResponse__Output } from './generated/envoy/api/v2/DiscoveryResponse'; -import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment'; +import { + ClusterLoadAssignment__Output, + ClusterLoadAssignment, +} from './generated/envoy/api/v2/ClusterLoadAssignment'; import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService'; import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest'; @@ -50,6 +53,7 @@ import { UpstreamLocalityStats } from './generated/envoy/api/v2/endpoint/Upstrea import { Listener__Output } from './generated/envoy/api/v2/Listener'; import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration'; +import { Any__Output } from './generated/google/protobuf/Any'; const TRACER_NAME = 'xds_client'; @@ -64,6 +68,13 @@ const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster'; const LDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Listener'; const RDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.RouteConfiguration'; +type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; +type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster'; +type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener'; +type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration'; + +type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl; + const HTTP_CONNECTION_MANGER_TYPE_URL = 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager'; @@ -211,6 +222,424 @@ class ClusterLoadReportMap { } } +interface XdsStreamState { + versionInfo: string; + nonce: string; + getResourceNames(): string[]; + /** + * Returns a string containing the error details if the message should be nacked, + * or null if it should be acked. + * @param responses + */ + handleResponses(responses: ResponseType[]): string | null; + + reportStreamError(status: StatusObject): void; +} + +class EdsState implements XdsStreamState { + public versionInfo = ''; + public nonce = ''; + + private watchers: Map< + string, + Watcher[] + > = new Map[]>(); + + private latestResponses: ClusterLoadAssignment__Output[] = []; + + constructor(private updateResourceNames: () => void) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param edsServiceName + * @param watcher + */ + addWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + let watchersEntry = this.watchers.get(edsServiceName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(edsServiceName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.cluster_name === edsServiceName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + const watchersEntry = this.watchers.get(edsServiceName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(edsServiceName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + /** + * Validate the ClusterLoadAssignment object by these rules: + * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto + * @param message + */ + private validateResponse(message: ClusterLoadAssignment__Output) { + for (const endpoint of message.endpoints) { + for (const lb of endpoint.lb_endpoints) { + const socketAddress = lb.endpoint?.address?.socket_address; + if (!socketAddress) { + return false; + } + if (socketAddress.port_specifier !== 'port_value') { + return false; + } + if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + return false; + } + } + } + return true; + } + + handleResponses(responses: ClusterLoadAssignment__Output[]) { + for (const message of responses) { + if (!this.validateResponse(message)) { + return 'ClusterLoadAssignment validation failed'; + } + } + this.latestResponses = responses; + for (const message of responses) { + const watchers = this.watchers.get(message.cluster_name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} + +class CdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private watchers: Map[]> = new Map< + string, + Watcher[] + >(); + + private latestResponses: Cluster__Output[] = []; + + constructor(private updateResourceNames: () => void) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param clusterName + * @param watcher + */ + addWatcher(clusterName: string, watcher: Watcher): void { + let watchersEntry = this.watchers.get(clusterName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(clusterName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.name === clusterName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher(clusterName: string, watcher: Watcher): void { + const watchersEntry = this.watchers.get(clusterName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(clusterName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + private validateResponse(message: Cluster__Output): boolean { + if (message.type !== 'EDS') { + return false; + } + if (!message.eds_cluster_config?.eds_config?.ads) { + return false; + } + if (message.lb_policy !== 'ROUND_ROBIN') { + return false; + } + if (message.lrs_server) { + if (!message.lrs_server.self) { + return false; + } + } + return true; + } + + handleResponses(responses: Cluster__Output[]): string | null { + for (const message of responses) { + if (!this.validateResponse(message)) { + return 'Cluster validation failed'; + } + } + this.latestResponses = responses; + for (const message of responses) { + const watchers = this.watchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} + +class RdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private routeConfigName: string | null = null; + + constructor( + private watcher: Watcher, + private updateResouceNames: () => void + ) {} + + getResourceNames(): string[] { + return this.routeConfigName ? [this.routeConfigName] : []; + } + + handleSingleMessage(message: RouteConfiguration__Output) { + for (const virtualHost of message.virtual_hosts) { + if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) { + const route = virtualHost.routes[virtualHost.routes.length - 1]; + if (route.match?.prefix === '' && route.route?.cluster) { + this.watcher.onValidUpdate({ + methodConfig: [], + loadBalancingConfig: [ + { + name: 'cds', + cds: { + cluster: route.route.cluster, + }, + }, + ], + }); + break; + } + } + } + /* If none of the routes match the one we are looking for, bubble up an + * error. */ + this.watcher.onResourceDoesNotExist(); + } + + handleResponses(responses: RouteConfiguration__Output[]): string | null { + if (this.routeConfigName !== null) { + for (const message of responses) { + if (message.name === this.routeConfigName) { + this.handleSingleMessage(message); + return null; + } + } + } + return null; + } + + setRouteConfigName(name: string | null) { + const oldName = this.routeConfigName; + this.routeConfigName = name; + if (name !== oldName) { + this.updateResouceNames(); + } + } + + reportStreamError(status: StatusObject): void { + this.watcher.onTransientError(status); + } +} + +class LdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + constructor(private targetName: string, private rdsState: RdsState) {} + + getResourceNames(): string[] { + return [this.targetName]; + } + + private validateResponse(message: Listener__Output): boolean { + if ( + !( + message.api_listener?.api_listener && + protoLoader.isAnyExtension(message.api_listener.api_listener) && + message.api_listener?.api_listener['@type'] === + HTTP_CONNECTION_MANGER_TYPE_URL + ) + ) { + return false; + } + const httpConnectionManager = message.api_listener + ?.api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': + return !!httpConnectionManager.rds?.config_source?.ads; + case 'route_config': + return true; + } + return false; + } + + handleResponses(responses: Listener__Output[]): string | null { + for (const message of responses) { + if (message.name === this.targetName) { + if (this.validateResponse(message)) { + // The validation step ensures that this is correct + const httpConnectionManager = message.api_listener! + .api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': + this.rdsState.setRouteConfigName( + httpConnectionManager.rds!.route_config_name + ); + break; + case 'route_config': + this.rdsState.setRouteConfigName(null); + this.rdsState.handleSingleMessage( + httpConnectionManager.route_config! + ); + break; + default: + // The validation rules should prevent this + } + } else { + return 'Listener validation failed'; + } + } + } + throw new Error('Method not implemented.'); + } + + reportStreamError(status: StatusObject): void { + // Nothing to do here + } +} + +interface AdsState { + [EDS_TYPE_URL]: EdsState; + [CDS_TYPE_URL]: CdsState; + [RDS_TYPE_URL]: RdsState; + [LDS_TYPE_URL]: LdsState; +} + +/** + * Map type URLs to their corresponding message types + */ +type OutputType = T extends EdsTypeUrl + ? ClusterLoadAssignment__Output + : T extends CdsTypeUrl + ? Cluster__Output + : T extends RdsTypeUrl + ? RouteConfiguration__Output + : Listener__Output; + +function getResponseMessages( + typeUrl: T, + resources: Any__Output[] +): OutputType[] { + const result: OutputType[] = []; + for (const resource of resources) { + if (protoLoader.isAnyExtension(resource) && resource['@type'] === typeUrl) { + result.push(resource as protoLoader.AnyExtension & OutputType); + } else { + throw new Error( + `Invalid resource type ${ + protoLoader.isAnyExtension(resource) + ? resource['@type'] + : resource.type_url + }` + ); + } + } + return result; +} + export class XdsClient { private adsNode: Node | null = null; private adsClient: AggregatedDiscoveryServiceClient | null = null; @@ -232,36 +661,30 @@ export class XdsClient { private hasShutdown = false; - private endpointWatchers: Map< - string, - Watcher[] - > = new Map[]>(); - private lastEdsVersionInfo = ''; - private lastEdsNonce = ''; - private latestEdsResponses: ClusterLoadAssignment__Output[] = []; - - private clusterWatchers: Map[]> = new Map< - string, - Watcher[] - >(); - private lastCdsVersionInfo = ''; - private lastCdsNonce = ''; - private latestCdsResponses: Cluster__Output[] = []; - - private lastLdsVersionInfo = ''; - private lastLdsNonce = ''; - private latestLdsResponse: Listener__Output | null = null; - - private routeConfigName: string | null = null; - private lastRdsVersionInfo = ''; - private lastRdsNonce = ''; - private latestRdsResponse: RouteConfiguration__Output | null = null; + private adsState: AdsState; constructor( - private targetName: string, - private serviceConfigWatcher: Watcher, + targetName: string, + serviceConfigWatcher: Watcher, channelOptions: ChannelOptions ) { + const edsState = new EdsState(() => { + this.updateNames(EDS_TYPE_URL); + }); + const cdsState = new CdsState(() => { + this.updateNames(CDS_TYPE_URL); + }); + const rdsState = new RdsState(serviceConfigWatcher, () => { + this.updateNames(RDS_TYPE_URL); + }); + const ldsState = new LdsState(targetName, rdsState); + this.adsState = { + [EDS_TYPE_URL]: edsState, + [CDS_TYPE_URL]: cdsState, + [RDS_TYPE_URL]: rdsState, + [LDS_TYPE_URL]: ldsState, + }; + const channelArgs = { ...channelOptions }; const channelArgsToRemove = [ /* The SSL target name override corresponds to the target, and this @@ -328,151 +751,44 @@ export class XdsClient { } private handleAdsResponse(message: DiscoveryResponse__Output) { + let errorString: string | null; + /* The cases in this switch statement look redundant but separating them + * out like this is necessary for the typechecker to validate the types + * as narrowly as we need it to. */ switch (message.type_url) { - case EDS_TYPE_URL: { - const edsResponses: ClusterLoadAssignment__Output[] = []; - for (const resource of message.resources) { - if ( - protoLoader.isAnyExtension(resource) && - resource['@type'] === EDS_TYPE_URL - ) { - const resp = resource as protoLoader.AnyExtension & - ClusterLoadAssignment__Output; - if (!this.validateEdsResponse(resp)) { - this.nackEds('ClusterLoadAssignment validation failed'); - return; - } - edsResponses.push(resp); - } else { - this.nackEds( - `Invalid resource type ${ - protoLoader.isAnyExtension(resource) - ? resource['@type'] - : resource.type_url - }` - ); - return; - } - } - for (const message of edsResponses) { - this.handleEdsResponse(message); - } - this.lastEdsVersionInfo = message.version_info; - this.lastEdsNonce = message.nonce; - this.latestEdsResponses = edsResponses; - this.ackEds(); + case EDS_TYPE_URL: + errorString = this.adsState[message.type_url].handleResponses( + getResponseMessages(message.type_url, message.resources) + ); break; - } - case CDS_TYPE_URL: { - const cdsResponses: Cluster__Output[] = []; - for (const resource of message.resources) { - if ( - protoLoader.isAnyExtension(resource) && - resource['@type'] === CDS_TYPE_URL - ) { - const resp = resource as protoLoader.AnyExtension & Cluster__Output; - if (!this.validateCdsResponse(resp)) { - this.nackCds('Cluster validation failed'); - return; - } - } else { - this.nackCds( - `Invalid resource type ${ - protoLoader.isAnyExtension(resource) - ? resource['@type'] - : resource.type_url - }` - ); - return; - } - } - for (const message of cdsResponses) { - this.handleCdsResponse(message); - } - this.lastCdsVersionInfo = message.version_info; - this.lastCdsNonce = message.nonce; - this.latestCdsResponses = cdsResponses; - this.ackCds(); + case CDS_TYPE_URL: + errorString = this.adsState[message.type_url].handleResponses( + getResponseMessages(message.type_url, message.resources) + ); break; - } - case LDS_TYPE_URL: { - let nackError: string | null = null; - for (const resource of message.resources) { - if ( - protoLoader.isAnyExtension(resource) && - resource['@type'] === LDS_TYPE_URL - ) { - const resp = resource as protoLoader.AnyExtension & - Listener__Output; - if (resp.name === this.targetName) { - if (this.validateLdsResponse(resp)) { - this.handleLdsResponse(resp); - this.lastLdsVersionInfo = message.version_info; - this.lastLdsNonce = message.nonce; - this.latestLdsResponse = resp; - } else { - nackError = 'Listener validation failed'; - } - break; - } - } else { - nackError = `Invalid resource type ${ - protoLoader.isAnyExtension(resource) - ? resource['@type'] - : resource.type_url - }`; - break; - } - } - if (nackError) { - this.nackLds(nackError); - } else { - this.ackLds(); - } + case RDS_TYPE_URL: + errorString = this.adsState[message.type_url].handleResponses( + getResponseMessages(message.type_url, message.resources) + ); break; - } - case RDS_TYPE_URL: { - let nackError: string | null = null; - if (this.routeConfigName === null) { - nackError = 'Unexpected RouteConfiguration response'; - } else { - for (const resource of message.resources) { - if ( - protoLoader.isAnyExtension(resource) && - resource['@type'] === RDS_TYPE_URL - ) { - const resp = resource as protoLoader.AnyExtension & - RouteConfiguration__Output; - if (resp.name === this.routeConfigName) { - if (this.validateRdsResponse(resp)) { - this.handleRdsResponse(resp); - this.lastRdsVersionInfo = message.version_info; - this.lastRdsNonce = message.nonce; - this.latestRdsResponse = resp; - } else { - nackError = 'RouteConfiguration validation failed'; - } - break; - } - } else { - nackError = `Invalid resource type ${ - protoLoader.isAnyExtension(resource) - ? resource['@type'] - : resource.type_url - }`; - break; - } - } - } - if (nackError) { - this.nackRds(nackError); - } else { - this.ackRds(); - } + case LDS_TYPE_URL: + errorString = this.adsState[message.type_url].handleResponses( + getResponseMessages(message.type_url, message.resources) + ); break; - } default: - this.nackUnknown(message.type_url, message.version_info, message.nonce); + errorString = `Unknown type_url ${message.type_url}`; + } + if (errorString === null) { + /* errorString can only be null in one of the first 4 cases, which + * implies that message.type_url is one of the 4 known type URLs, which + * means that this type assertion is valid. */ + const typeUrl = message.type_url as AdsTypeUrl; + this.adsState[typeUrl].nonce = message.nonce; + this.adsState[typeUrl].versionInfo = message.version_info; + this.ack(typeUrl); + } else { + this.nack(message.type_url, errorString); } } @@ -506,341 +822,79 @@ export class XdsClient { this.maybeStartAdsStream(); }); - this.adsCall.write({ - node: this.adsNode!, - type_url: LDS_TYPE_URL, - resource_names: [this.targetName], - }); - - if (this.routeConfigName) { - this.adsCall.write({ - node: this.adsNode!, - type_url: RDS_TYPE_URL, - resource_names: [this.routeConfigName], - }); + const allTypeUrls: AdsTypeUrl[] = [ + EDS_TYPE_URL, + CDS_TYPE_URL, + RDS_TYPE_URL, + LDS_TYPE_URL, + ]; + for (const typeUrl of allTypeUrls) { + const state = this.adsState[typeUrl]; + state.nonce = ''; + state.versionInfo = ''; + if (state.getResourceNames().length > 0) { + this.updateNames(typeUrl); + } } - - const clusterNames = Array.from(this.clusterWatchers.keys()); - if (clusterNames.length > 0) { - this.adsCall.write({ - node: this.adsNode!, - type_url: CDS_TYPE_URL, - resource_names: clusterNames, - }); - } - - const endpointWatcherNames = Array.from(this.endpointWatchers.keys()); - if (endpointWatcherNames.length > 0) { - this.adsCall.write({ - node: this.adsNode!, - type_url: EDS_TYPE_URL, - resource_names: endpointWatcherNames, - }); - } - } - - private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.adsNode!, - type_url: typeUrl, - version_info: versionInfo, - response_nonce: nonce, - error_detail: { - message: `Unknown type_url ${typeUrl}`, - }, - }); } /** - * Acknowledge an EDS update. This should be called after the local nonce and + * Acknowledge an update. This should be called after the local nonce and * version info are updated so that it sends the post-update values. */ - private ackEds() { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.adsNode!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo, - }); - } - - private ackCds() { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.adsNode!, - type_url: CDS_TYPE_URL, - resource_names: Array.from(this.clusterWatchers.keys()), - response_nonce: this.lastCdsNonce, - version_info: this.lastCdsVersionInfo, - }); - } - - private ackLds() { - this.adsCall?.write({ - node: this.adsNode!, - type_url: LDS_TYPE_URL, - resource_names: [this.targetName], - response_nonce: this.lastLdsNonce, - version_info: this.lastLdsVersionInfo, - }); - } - - private ackRds() { - this.adsCall?.write({ - node: this.adsNode!, - type_url: RDS_TYPE_URL, - resource_names: [this.routeConfigName!], - response_nonce: this.lastRdsNonce, - version_info: this.lastRdsVersionInfo, - }); + ack(typeUrl: AdsTypeUrl) { + this.updateNames(typeUrl); } /** - * Reject an EDS update. This should be called without updating the local + * Reject an update. This should be called without updating the local * nonce and version info. */ - private nackEds(message: string) { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.adsNode!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo, - error_detail: { - message, - }, - }); - } - - private nackCds(message: string) { - if (!this.adsCall) { - return; - } - this.adsCall.write({ - node: this.adsNode!, - type_url: CDS_TYPE_URL, - resource_names: Array.from(this.clusterWatchers.keys()), - response_nonce: this.lastCdsNonce, - version_info: this.lastCdsVersionInfo, - error_detail: { - message, - }, - }); - } - - private nackLds(message: string) { - this.adsCall?.write({ - node: this.adsNode!, - type_url: LDS_TYPE_URL, - resource_names: [this.targetName], - response_nonce: this.lastLdsNonce, - version_info: this.lastLdsVersionInfo, - error_detail: { - message, - }, - }); - } - - private nackRds(message: string) { - this.adsCall?.write({ - node: this.adsNode!, - type_url: RDS_TYPE_URL, - resource_names: this.routeConfigName ? [this.routeConfigName] : [], - response_nonce: this.lastRdsNonce, - version_info: this.lastRdsVersionInfo, - error_detail: { - message, - }, - }); - } - - /** - * Validate the ClusterLoadAssignment object by these rules: - * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto - * @param message - */ - private validateEdsResponse(message: ClusterLoadAssignment__Output): boolean { - for (const endpoint of message.endpoints) { - for (const lb of endpoint.lb_endpoints) { - const socketAddress = lb.endpoint?.address?.socket_address; - if (!socketAddress) { - return false; - } - if (socketAddress.port_specifier !== 'port_value') { - return false; - } - if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { - return false; - } - } - } - return true; - } - - private validateCdsResponse(message: Cluster__Output): boolean { - if (message.type !== 'EDS') { - return false; - } - if (!message.eds_cluster_config?.eds_config?.ads) { - return false; - } - if (message.lb_policy !== 'ROUND_ROBIN') { - return false; - } - if (message.lrs_server) { - if (!message.lrs_server.self) { - return false; - } - } - return true; - } - - private validateLdsResponse(message: Listener__Output): boolean { - if ( - !( - message.api_listener?.api_listener && - protoLoader.isAnyExtension(message.api_listener.api_listener) && - message.api_listener?.api_listener['@type'] === - HTTP_CONNECTION_MANGER_TYPE_URL - ) - ) { - return false; - } - const httpConnectionManager = message.api_listener - ?.api_listener as protoLoader.AnyExtension & - HttpConnectionManager__Output; - switch (httpConnectionManager.route_specifier) { - case 'rds': - if (!httpConnectionManager.rds?.config_source?.ads) { - return false; - } - break; - case 'route_config': - return this.validateRdsResponse(httpConnectionManager.route_config!); - } - return false; - } - - private validateRdsResponse(message: RouteConfiguration__Output): boolean { - return true; - } - - private handleEdsResponse(message: ClusterLoadAssignment__Output) { - const watchers = this.endpointWatchers.get(message.cluster_name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message); - } - } - - private handleCdsResponse(message: Cluster__Output) { - const watchers = this.clusterWatchers.get(message.name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message); - } - } - - private handleLdsResponse(message: Listener__Output) { - // The validation step ensures that this is correct - const httpConnectionManager = message.api_listener! - .api_listener as protoLoader.AnyExtension & HttpConnectionManager__Output; - switch (httpConnectionManager.route_specifier) { - case 'rds': - this.routeConfigName = httpConnectionManager.rds!.route_config_name; - this.updateRdsNames(); - break; - case 'route_config': - this.handleRdsResponse(httpConnectionManager.route_config!); - if (this.routeConfigName) { - this.routeConfigName = null; - this.updateRdsNames(); - } + private nack(typeUrl: string, message: string) { + let resourceNames: string[]; + let nonce: string; + let versionInfo: string; + switch (typeUrl) { + case EDS_TYPE_URL: + case CDS_TYPE_URL: + case RDS_TYPE_URL: + case LDS_TYPE_URL: + resourceNames = this.adsState[typeUrl].getResourceNames(); + nonce = this.adsState[typeUrl].nonce; + versionInfo = this.adsState[typeUrl].versionInfo; break; default: - // The validation rules should prevent this + resourceNames = []; + nonce = ''; + versionInfo = ''; } - } - - private handleRdsResponse(message: RouteConfiguration__Output) { - for (const virtualHost of message.virtual_hosts) { - if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) { - const route = virtualHost.routes[virtualHost.routes.length - 1]; - if (route.match?.prefix === '' && route.route?.cluster) { - this.serviceConfigWatcher.onValidUpdate({ - methodConfig: [], - loadBalancingConfig: [ - { - name: 'cds', - cds: { - cluster: route.route.cluster, - }, - }, - ], - }); - break; - } - } - } - /* If none of the routes match the one we are looking for, bubble up an - * error. */ - this.serviceConfigWatcher.onResourceDoesNotExist(); - } - - private updateEdsNames() { - if (this.adsCall) { - this.adsCall.write({ - node: this.adsNode!, - type_url: EDS_TYPE_URL, - resource_names: Array.from(this.endpointWatchers.keys()), - response_nonce: this.lastEdsNonce, - version_info: this.lastEdsVersionInfo, - }); - } - } - - private updateCdsNames() { - if (this.adsCall) { - this.adsCall.write({ - node: this.adsNode!, - type_url: CDS_TYPE_URL, - resource_names: Array.from(this.clusterWatchers.keys()), - response_nonce: this.lastCdsNonce, - version_info: this.lastCdsVersionInfo, - }); - } - } - - private updateRdsNames() { this.adsCall?.write({ node: this.adsNode!, - type_url: RDS_TYPE_URL, - resource_names: this.routeConfigName ? [this.routeConfigName] : [], - response_nonce: this.lastRdsNonce, - version_info: this.lastRdsVersionInfo, + type_url: typeUrl, + resource_names: resourceNames, + response_nonce: nonce, + version_info: versionInfo, + error_detail: { + message: message, + }, + }); + } + + private updateNames(typeUrl: AdsTypeUrl) { + this.adsCall?.write({ + node: this.adsNode!, + type_url: typeUrl, + resource_names: this.adsState[typeUrl].getResourceNames(), + response_nonce: this.adsState[typeUrl].nonce, + version_info: this.adsState[typeUrl].versionInfo, }); } private reportStreamError(status: StatusObject) { - for (const watcherList of [ - ...this.endpointWatchers.values(), - ...this.clusterWatchers.values(), - [this.serviceConfigWatcher], - ]) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } + this.adsState[EDS_TYPE_URL].reportStreamError(status); + this.adsState[CDS_TYPE_URL].reportStreamError(status); + this.adsState[RDS_TYPE_URL].reportStreamError(status); + this.adsState[LDS_TYPE_URL].reportStreamError(status); } private maybeStartLrsStream() { @@ -967,29 +1021,7 @@ export class XdsClient { watcher: Watcher ) { trace('Watcher added for endpoint ' + edsServiceName); - let watchersEntry = this.endpointWatchers.get(edsServiceName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.endpointWatchers.set(edsServiceName, watchersEntry); - } - watchersEntry.push(watcher); - if (addedServiceName) { - this.updateEdsNames(); - } - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - for (const message of this.latestEdsResponses) { - if (message.cluster_name === edsServiceName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - watcher.onValidUpdate(message); - }); - } - } + this.adsState[EDS_TYPE_URL].addWatcher(edsServiceName, watcher); } removeEndpointWatcher( @@ -997,67 +1029,17 @@ export class XdsClient { watcher: Watcher ) { trace('Watcher removed for endpoint ' + edsServiceName); - const watchersEntry = this.endpointWatchers.get(edsServiceName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.endpointWatchers.delete(edsServiceName); - } - } - if (removedServiceName) { - this.updateEdsNames(); - } + this.adsState[EDS_TYPE_URL].removeWatcher(edsServiceName, watcher); } addClusterWatcher(clusterName: string, watcher: Watcher) { trace('Watcher added for cluster ' + clusterName); - let watchersEntry = this.clusterWatchers.get(clusterName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.clusterWatchers.set(clusterName, watchersEntry); - } - watchersEntry.push(watcher); - if (addedServiceName) { - this.updateCdsNames(); - } - - /* If we have already received an update for the requested clusterName, - * immediately pass that update along to the watcher */ - for (const message of this.latestCdsResponses) { - if (message.name === clusterName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - watcher.onValidUpdate(message); - }); - } - } + this.adsState[CDS_TYPE_URL].addWatcher(clusterName, watcher); } removeClusterWatcher(clusterName: string, watcher: Watcher) { trace('Watcher removed for endpoint ' + clusterName); - const watchersEntry = this.clusterWatchers.get(clusterName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.endpointWatchers.delete(clusterName); - } - } - if (removedServiceName) { - this.updateCdsNames(); - } + this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher); } /**