From 64a0b0ad7cd28e73e1e28ecad32437652f1bcc60 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 17 Aug 2021 14:59:22 -0700 Subject: [PATCH] grpc-js-xds: Distinguish v2 and v3 when handling messages --- packages/grpc-js-xds/src/resolver-xds.ts | 24 ++++++++++--------- packages/grpc-js-xds/src/xds-client.ts | 23 ++++++++++++++---- .../src/xds-stream-state/cds-state.ts | 9 ++++--- .../src/xds-stream-state/eds-state.ts | 9 ++++--- .../src/xds-stream-state/lds-state.ts | 17 +++++++------ .../src/xds-stream-state/rds-state.ts | 19 ++++++++------- .../src/xds-stream-state/xds-stream-state.ts | 9 +++++-- 7 files changed, 72 insertions(+), 38 deletions(-) diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index f6287ea6..ab036184 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -212,6 +212,7 @@ class XdsResolver implements Resolver { private latestRouteConfigName: string | null = null; private latestRouteConfig: RouteConfiguration__Output | null = null; + private latestRouteConfigIsV2 = false; private clusterRefcounts = new Map(); @@ -225,7 +226,7 @@ class XdsResolver implements Resolver { private channelOptions: ChannelOptions ) { this.ldsWatcher = { - onValidUpdate: (update: Listener__Output) => { + onValidUpdate: (update: Listener__Output, isV2: boolean) => { const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value); const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout; if (defaultTimeout === null || defaultTimeout === undefined) { @@ -233,7 +234,7 @@ class XdsResolver implements Resolver { } else { this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout); } - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { this.ldsHttpFilterConfigs = []; for (const filter of httpConnectionManager.http_filters) { // typed_config must be set here, or validation would have failed @@ -259,7 +260,7 @@ class XdsResolver implements Resolver { if (this.latestRouteConfigName) { getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); } - this.handleRouteConfig(httpConnectionManager.route_config!); + this.handleRouteConfig(httpConnectionManager.route_config!, isV2); break; default: // This is prevented by the validation rules @@ -279,8 +280,8 @@ class XdsResolver implements Resolver { } }; this.rdsWatcher = { - onValidUpdate: (update: RouteConfiguration__Output) => { - this.handleRouteConfig(update); + onValidUpdate: (update: RouteConfiguration__Output, isV2: boolean) => { + this.handleRouteConfig(update, isV2); }, onTransientError: (error: StatusObject) => { /* A transient error only needs to bubble up as a failure if we have @@ -310,20 +311,21 @@ class XdsResolver implements Resolver { refCount.refCount -= 1; if (!refCount.inLastConfig && refCount.refCount === 0) { this.clusterRefcounts.delete(clusterName); - this.handleRouteConfig(this.latestRouteConfig!); + this.handleRouteConfig(this.latestRouteConfig!, this.latestRouteConfigIsV2); } } } - private handleRouteConfig(routeConfig: RouteConfiguration__Output) { + private handleRouteConfig(routeConfig: RouteConfiguration__Output, isV2: boolean) { this.latestRouteConfig = routeConfig; + this.latestRouteConfigIsV2 = isV2; const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path); if (virtualHost === null) { this.reportResolutionError('No matching route found'); return; } const virtualHostHttpFilterOverrides = new Map(); - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { @@ -352,7 +354,7 @@ class XdsResolver implements Resolver { timeout = undefined; } const routeHttpFilterOverrides = new Map(); - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { @@ -367,7 +369,7 @@ class XdsResolver implements Resolver { const cluster = route.route!.cluster!; allConfigClusters.add(cluster); const extraFilterFactories: FilterFactory[] = []; - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const filterConfig of this.ldsHttpFilterConfigs) { if (routeHttpFilterOverrides.has(filterConfig.name)) { const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!); @@ -396,7 +398,7 @@ class XdsResolver implements Resolver { allConfigClusters.add(clusterWeight.name); const extraFilterFactories: FilterFactory[] = []; const clusterHttpFilterOverrides = new Map(); - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) { const parsedConfig = parseOverrideFilterConfig(filter); if (parsedConfig) { diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 6828a8a8..952ae81f 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -435,32 +435,47 @@ export class XdsClient { private handleAdsResponse(message: DiscoveryResponse__Output) { let errorString: string | null; let serviceKind: AdsServiceKind; + let isV2: boolean; + switch (message.type_url) { + case EDS_TYPE_URL_V2: + case CDS_TYPE_URL_V2: + case RDS_TYPE_URL_V2: + case LDS_TYPE_URL_V2: + isV2 = true; + break; + default: + isV2 = false; + } switch (message.type_url) { case EDS_TYPE_URL_V2: case EDS_TYPE_URL_V3: errorString = this.adsState.eds.handleResponses( - getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources) + getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources), + isV2 ); serviceKind = 'eds'; break; case CDS_TYPE_URL_V2: case CDS_TYPE_URL_V3: errorString = this.adsState.cds.handleResponses( - getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources) + getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources), + isV2 ); serviceKind = 'cds'; break; case RDS_TYPE_URL_V2: case RDS_TYPE_URL_V3: errorString = this.adsState.rds.handleResponses( - getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources) + getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources), + isV2 ); serviceKind = 'rds'; break; case LDS_TYPE_URL_V2: case LDS_TYPE_URL_V3: errorString = this.adsState.lds.handleResponses( - getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources) + getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources), + isV2 ); serviceKind = 'lds'; break; diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts index a6e89805..7720c567 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -36,6 +36,7 @@ export class CdsState implements XdsStreamState { >(); private latestResponses: Cluster__Output[] = []; + private latestIsV2 = false; constructor( private edsState: EdsState, @@ -61,13 +62,14 @@ export class CdsState implements XdsStreamState { /* If we have already received an update for the requested edsServiceName, * immediately pass that update along to the watcher */ + const isV2 = this.latestIsV2; for (const message of this.latestResponses) { if (message.name === clusterName) { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); }); } } @@ -134,7 +136,7 @@ export class CdsState implements XdsStreamState { } } - handleResponses(responses: Cluster__Output[]): string | null { + handleResponses(responses: Cluster__Output[], isV2: boolean): string | null { for (const message of responses) { if (!this.validateResponse(message)) { trace('CDS validation failed for message ' + JSON.stringify(message)); @@ -142,6 +144,7 @@ export class CdsState implements XdsStreamState { } } this.latestResponses = responses; + this.latestIsV2 = isV2; const allEdsServiceNames: Set = new Set(); const allClusterNames: Set = new Set(); for (const message of responses) { @@ -152,7 +155,7 @@ export class CdsState implements XdsStreamState { ); const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); } } trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index a0fb5f4d..dbf18ef8 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -36,6 +36,7 @@ export class EdsState implements XdsStreamState { > = new Map[]>(); private latestResponses: ClusterLoadAssignment__Output[] = []; + private latestIsV2 = false; constructor(private updateResourceNames: () => void) {} @@ -61,13 +62,14 @@ export class EdsState implements XdsStreamState { /* If we have already received an update for the requested edsServiceName, * immediately pass that update along to the watcher */ + const isV2 = this.latestIsV2; for (const message of this.latestResponses) { if (message.cluster_name === edsServiceName) { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); }); } } @@ -143,7 +145,7 @@ export class EdsState implements XdsStreamState { } } - handleResponses(responses: ClusterLoadAssignment__Output[]) { + handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) { for (const message of responses) { if (!this.validateResponse(message)) { trace('EDS validation failed for message ' + JSON.stringify(message)); @@ -151,12 +153,13 @@ export class EdsState implements XdsStreamState { } } this.latestResponses = responses; + this.latestIsV2 = isV2; const allClusterNames: Set = new Set(); for (const message of responses) { allClusterNames.add(message.cluster_name); const watchers = this.watchers.get(message.cluster_name) ?? []; for (const watcher of watchers) { - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); } } trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 8ab6ed9d..7e8ec045 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -39,6 +39,7 @@ export class LdsState implements XdsStreamState { private watchers: Map[]> = new Map[]>(); private latestResponses: Listener__Output[] = []; + private latestIsV2 = false; constructor(private rdsState: RdsState, private updateResourceNames: () => void) {} @@ -55,13 +56,14 @@ export class LdsState implements XdsStreamState { /* If we have already received an update for the requested edsServiceName, * immediately pass that update along to the watcher */ + const isV2 = this.latestIsV2; for (const message of this.latestResponses) { if (message.name === targetName) { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { trace('Reporting existing RDS update for new watcher for targetName ' + targetName); - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); }); } } @@ -93,7 +95,7 @@ export class LdsState implements XdsStreamState { return Array.from(this.watchers.keys()); } - private validateResponse(message: Listener__Output): boolean { + private validateResponse(message: Listener__Output, isV2: boolean): boolean { if ( !( message.api_listener?.api_listener && @@ -104,7 +106,7 @@ export class LdsState implements XdsStreamState { return false; } const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value); - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { const filterNames = new Set(); for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) { if (filterNames.has(httpFilter.name)) { @@ -136,7 +138,7 @@ export class LdsState implements XdsStreamState { case 'rds': return !!httpConnectionManager.rds?.config_source?.ads; case 'route_config': - return this.rdsState.validateResponse(httpConnectionManager.route_config!); + return this.rdsState.validateResponse(httpConnectionManager.route_config!, isV2); } return false; } @@ -151,20 +153,21 @@ export class LdsState implements XdsStreamState { } } - handleResponses(responses: Listener__Output[]): string | null { + handleResponses(responses: Listener__Output[], isV2: boolean): string | null { for (const message of responses) { - if (!this.validateResponse(message)) { + if (!this.validateResponse(message, isV2)) { trace('LDS validation failed for message ' + JSON.stringify(message)); return 'LDS Error: Route validation failed'; } } this.latestResponses = responses; + this.latestIsV2 = isV2; const allTargetNames = new Set(); for (const message of responses) { allTargetNames.add(message.name); const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); } } trace('Received RDS response with route config names ' + Array.from(allTargetNames)); diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 58f11f0b..9194529d 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -45,6 +45,7 @@ export class RdsState implements XdsStreamState { private watchers: Map[]> = new Map[]>(); private latestResponses: RouteConfiguration__Output[] = []; + private latestIsV2 = false; constructor(private updateResourceNames: () => void) {} @@ -61,13 +62,14 @@ export class RdsState implements XdsStreamState { /* If we have already received an update for the requested edsServiceName, * immediately pass that update along to the watcher */ + const isV2 = this.latestIsV2; for (const message of this.latestResponses) { if (message.name === routeConfigName) { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName); - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); }); } } @@ -99,7 +101,7 @@ export class RdsState implements XdsStreamState { return Array.from(this.watchers.keys()); } - validateResponse(message: RouteConfiguration__Output): boolean { + validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean { // https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation for (const virtualHost of message.virtual_hosts) { for (const domainPattern of virtualHost.domains) { @@ -114,7 +116,7 @@ export class RdsState implements XdsStreamState { return false; } } - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { return false; @@ -140,7 +142,7 @@ export class RdsState implements XdsStreamState { if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) { return false; } - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { return false; @@ -155,7 +157,7 @@ export class RdsState implements XdsStreamState { if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) { return false; } - if (EXPERIMENTAL_FAULT_INJECTION) { + if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) { for (const weightedCluster of route.route!.weighted_clusters!.clusters) { for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) { if (!validateOverrideFilter(filterConfig)) { @@ -180,20 +182,21 @@ export class RdsState implements XdsStreamState { } } - handleResponses(responses: RouteConfiguration__Output[]): string | null { + handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null { for (const message of responses) { - if (!this.validateResponse(message)) { + if (!this.validateResponse(message, isV2)) { trace('RDS validation failed for message ' + JSON.stringify(message)); return 'RDS Error: Route validation failed'; } } this.latestResponses = responses; + this.latestIsV2 = isV2; const allRouteConfigNames = new Set(); for (const message of responses) { allRouteConfigNames.add(message.name); const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { - watcher.onValidUpdate(message); + watcher.onValidUpdate(message, isV2); } } trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames)); diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 83db1781..14f3d1c7 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -18,7 +18,12 @@ import { StatusObject } from "@grpc/grpc-js"; export interface Watcher { - onValidUpdate(update: UpdateType): void; + /* Including the isV2 flag here is a bit of a kludge. It would probably be + * better for XdsStreamState#handleResponses to transform the protobuf + * message type into a library-specific configuration object type, to + * remove a lot of duplicate logic, including logic for handling that + * flag. */ + onValidUpdate(update: UpdateType, isV2: boolean): void; onTransientError(error: StatusObject): void; onResourceDoesNotExist(): void; } @@ -32,7 +37,7 @@ export interface XdsStreamState { * or null if it should be acked. * @param responses */ - handleResponses(responses: ResponseType[]): string | null; + handleResponses(responses: ResponseType[], isV2: boolean): string | null; reportStreamError(status: StatusObject): void; } \ No newline at end of file